001/*******************************************************************************
002The MIT License (MIT)
003
004Copyright (c) 2024 KILLCODING.COM
005
006Permission is hereby granted, free of charge, to any person obtaining a copy
007of this software and associated documentation files (the "Software"), to deal
008in the Software without restriction, including without limitation the rights
009to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
010copies of the Software, and to permit persons to whom the Software is
011furnished to do so, subject to the following conditions:
012
013The above copyright notice and this permission notice shall be included in
014all copies or substantial portions of the Software.
015
016THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
017IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
018FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
019AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
020LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
021OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
022THE SOFTWARE.
023*****************************************************************************/
024package com.killcoding.file;
025
026import java.io.File;
027import com.killcoding.datasource.DriverDataSource;
028import com.killcoding.tool.ConfigProperties;
029import com.killcoding.tool.CommonTools;
030import com.killcoding.log.Logger;
031import com.killcoding.log.LoggerFactory;
032import java.io.FileInputStream;
033import java.io.IOException;
034import java.sql.Connection;
035import java.sql.SQLException;
036import com.killcoding.datasource.CacheDriverExecutor;
037import java.util.Map;
038import java.util.HashMap;
039import com.killcoding.datasource.Clock;
040import java.sql.Timestamp;
041import java.nio.file.Files;
042import java.util.concurrent.ConcurrentHashMap;
043import java.util.List;
044import java.util.ArrayList;
045import java.util.Arrays;
046import java.io.ByteArrayOutputStream;
047import java.sql.Blob;
048import java.nio.file.Paths;
049import java.nio.file.Path;
050import java.net.URI;
051import java.io.ByteArrayInputStream;
052import java.nio.ByteBuffer;
053import java.util.concurrent.ExecutorService;
054import java.util.concurrent.Executors;
055import java.util.concurrent.Future;
056import java.nio.file.LinkOption;
057import javax.crypto.Cipher;
058import javax.crypto.CipherInputStream;
059import javax.crypto.CipherOutputStream;
060import javax.crypto.KeyGenerator;
061import javax.crypto.SecretKey;
062import javax.crypto.SecretKeyFactory;
063import javax.crypto.spec.DESKeySpec;
064import javax.crypto.spec.IvParameterSpec;
065import javax.crypto.spec.SecretKeySpec;
066import com.killcoding.file.DiskFile;
067import java.util.Collection;
068import com.killcoding.cache.CacheArray;
069import com.killcoding.cache.CacheArrayFilter;
070import java.nio.file.attribute.BasicFileAttributes;
071import java.util.concurrent.Callable;
072import com.killcoding.datasource.IdentityCard;
073import java.util.Collections;
074import java.util.concurrent.ThreadLocalRandom;
075import com.killcoding.tool.CipherTools;
076import com.killcoding.cache.Cache;
077import java.util.Date;
078
079public class RemoteFile extends BaseFile implements RemoteFileSQL {
080
081        private static final Map<String, DriverDataSource> DATASOURCE_REMOTE_FILE = new ConcurrentHashMap<String, DriverDataSource>();
082
083        private static final Map<String, ConfigProperties> CONFIG_PROPS_REMOTE_FILE = new ConcurrentHashMap<String, ConfigProperties>();
084
085        private static final Map<String, Long> QUEUE_PATH_MAPPING = new ConcurrentHashMap<String, Long>();
086
087        private static final Map<String, ConfigProperties> DATA_MAPPING = new ConcurrentHashMap<String, ConfigProperties>();
088
089        private static final Map<String, List<String>> DATA_MAPPING_LIST = new ConcurrentHashMap<String, List<String>>();
090
091        private static final Map<String, Integer> DATA_MAPPING_INDEX = new ConcurrentHashMap<String, Integer>();
092
093    protected static boolean FIRST_LOADED = false;
094    
095    public static Integer MAX_POOL_SIZE = 100;
096        public static Double MAX_CONNECT_USAGE = 0.5D;
097        public static Double MAX_MEMORY_USAGE = 0.8D;
098        public static Integer MAX_QUEUE_SIZE = 10;
099        public static Long MAX_FILE_SZIE = -1L;
100        public static boolean SHOW_LINK = false;
101        public static boolean SHOW_HIDDEN = false;
102        public static boolean COPY_STRUCTURE_ONLY = false;
103        public static Integer MAX_FILE_PARENT_PATH_LENGTH = 200;
104        public static Integer MAX_FILE_NAME_LENGTH = 200;
105        public static Integer CACHE_SECONDS = 5;
106        public static Date SYNC_CUTOFF_TIME = null;
107        public static Date DELETE_CUTOFF_TIME = null;
108
109        private static IdentityCard identityCard = null;
110        private static File appRemoteFile = null;
111        private DriverDataSource dataSourceReader = null;
112        private DriverDataSource dataSourceWriter = null;
113        private String dataSourceReaderPath = null;
114        private String dataSourceWriterPath = null;
115        private String modifyUserId = null;
116        private Integer filePartSize = null;
117        private Boolean identityCardEnable = false;
118        private String identityCardName = null;
119        private String identityCardPrefix = null;
120        private Integer identityCardSuffix = 0;
121        private Timestamp modifyTime = null;
122
123    protected ConfigProperties configProperties = null;
124    protected String fileId = null;
125    protected String tableName = null;
126        protected Integer dataMappingTable = null;
127        protected String dataMappingDs = null;
128        protected DriverDataSource dataMappingDataSourceReader = null;
129        protected DriverDataSource dataMappingDataSourceWriter = null;
130        protected String dsKey = null;
131
132        public static Long BEFORE_THE_QUEUE_TIME = 60000L;
133        public static Integer BATCH_SIZE = 10;
134        public static String DEFAULE_MODIFY_USER_ID = null;
135
136        private Logger log = null;
137
138        private static ExecutorService mergePool = null;
139
140        public RemoteFile(String path) {
141                super(path);
142                log = LoggerFactory.getLogger(getClass());
143                fileId = CommonTools.generateId(16);
144                initGlobal();
145        }
146
147        public static synchronized void initPool(int poolSize) {
148                if (mergePool == null) {
149                        MAX_POOL_SIZE = poolSize;
150                        LoggerFactory.getLogger(RemoteFile.class).mark("MAX_POOL_SIZE={}", MAX_POOL_SIZE);
151                        mergePool = Executors.newFixedThreadPool(MAX_POOL_SIZE);
152                }
153        }
154
155        private synchronized void init() {
156                dsKey = CommonTools.md5(this.appRemoteFile.getAbsolutePath());
157                dataSourceReader = DATASOURCE_REMOTE_FILE.get(String.format("%s.reader", dsKey));
158                dataSourceWriter = DATASOURCE_REMOTE_FILE.get(String.format("%s.writer", dsKey));
159                configProperties = CONFIG_PROPS_REMOTE_FILE.get(dsKey);
160                if (dataSourceReader == null && dataSourceWriter == null) {
161                        try {
162                                if (!appRemoteFile.exists())
163                                        throw new IOException(String.format("Not exist '%s'.", appRemoteFile.getAbsolutePath()));
164
165                                configProperties = new ConfigProperties();
166                                Runnable updatedRun = new Runnable() {
167                                        @Override
168                                        public void run() {
169                                                try {
170                                                        reload();
171                                                        LoggerFactory.getLogger(RemoteFile.class).mark("RemoteFile Reloaded");
172                                                } catch (Exception e) {
173                                                        log.error(e.getMessage(), e);
174                                                }
175                                        }
176                                };
177                                configProperties.load(appRemoteFile, updatedRun);
178
179                                CONFIG_PROPS_REMOTE_FILE.put(dsKey, configProperties);
180
181                                String dataSourcePath = configProperties.getString("DataSource");
182                                dataSourceReaderPath = configProperties.getString("DataSourceReader", dataSourcePath);
183                                dataSourceWriterPath = configProperties.getString("DataSourceWriter", dataSourcePath);
184                                reload();
185                        } catch (Exception e) {
186                                log.error(e.getMessage(), e);
187                        }
188                }
189                if (configProperties != null) {
190                        String dataSourcePath = configProperties.getString("DataSource");
191                        dataSourceReaderPath = configProperties.getString("DataSourceReader", dataSourcePath);
192                        dataSourceWriterPath = configProperties.getString("DataSourceWriter", dataSourcePath);
193                        reload();
194                }
195                calculateDataMappingDataSourceWriter(this);
196        }
197
198        private synchronized void initGlobal() {
199            if(this.appRemoteFile == null){
200                this.appRemoteFile = CommonTools.getSystemProperties(RemoteFile.class, "APP_REMOTE_FILE",
201                                        "RemoteFile.properties");
202            }
203                init();
204        }
205
206        private synchronized void reload() {
207                try {
208                        LOGIC_TIMEOUT_MS = configProperties.getMilliSeconds("LogicTimeout", 300000L);
209                        LOGIC_CHECK_TIMEOUT_MS = configProperties.getMilliSeconds("LogicCheckTimeout", 60000L);
210                        MAX_FILE_SZIE = configProperties.getFileSize("MaxFileSize", 1024 * 1024 * 1L);
211                        SHOW_HIDDEN = configProperties.getBoolean("ShowHidden", false);
212                        SHOW_LINK = configProperties.getBoolean("ShowLink", false);
213                        COPY_STRUCTURE_ONLY = configProperties.getBoolean("CopyStructureOnly", false);
214                        MAX_QUEUE_SIZE = configProperties.getInteger("MaxQueueSize", 10);
215                        BATCH_SIZE = configProperties.getInteger("BatchSize", 10);
216                        MAX_CONNECT_USAGE = configProperties.getDouble("MaxConnectUsage", 0.5D);
217                        MAX_MEMORY_USAGE = configProperties.getDouble("MaxMemoryUsage", 0.8D);
218                        BEFORE_THE_QUEUE_TIME = configProperties.getMilliSeconds("BeforeTheQueueTime", 15000L);
219                        MAX_FILE_PARENT_PATH_LENGTH = configProperties.getInteger("MaxFileParentPathLength", 200);
220                        MAX_FILE_NAME_LENGTH = configProperties.getInteger("MaxFileNameLength", 200);
221                        CACHE_SECONDS = configProperties.getInteger("CacheSeconds", 5);
222                        
223                        SYNC_CUTOFF_TIME = configProperties.getDateTime("SyncCutoffTime","0000-01-01 00:00:00.0000");
224                        DELETE_CUTOFF_TIME = configProperties.getDateTime("DeleteCutoffTime","0000-01-01 00:00:00.0000");
225                        
226                        if(BEFORE_THE_QUEUE_TIME < 0) BEFORE_THE_QUEUE_TIME = 15000L;
227                        
228                        if(MAX_CONNECT_USAGE < 0) MAX_CONNECT_USAGE = 0D;
229                        
230                        if(MAX_MEMORY_USAGE < 0) MAX_MEMORY_USAGE = 0D;
231                        
232                        if(BATCH_SIZE <= 0) BATCH_SIZE = 10;
233                        
234                        if(MAX_QUEUE_SIZE <= 0) MAX_QUEUE_SIZE = 10;
235                        
236                        if(MAX_FILE_SZIE <= 0) MAX_FILE_SZIE = 1024 * 1024 * 1L;
237                        
238                        if(LOGIC_TIMEOUT_MS <= 0) LOGIC_TIMEOUT_MS = 900000L;
239                        
240                        if(LOGIC_CHECK_TIMEOUT_MS < 0) LOGIC_CHECK_TIMEOUT_MS = 60000L;
241                        
242                        if(MAX_FILE_PARENT_PATH_LENGTH <= 0) MAX_FILE_PARENT_PATH_LENGTH = 200;
243                        
244                        if(MAX_FILE_NAME_LENGTH <= 0) MAX_FILE_NAME_LENGTH = 200;
245                        
246                        if(CACHE_SECONDS < 0) CACHE_SECONDS = 0;
247
248                        log.debug("LOGIC_TIMEOUT_MS={}", LOGIC_TIMEOUT_MS);
249                        log.debug("LOGIC_CHECK_TIMEOUT_MS={}", LOGIC_CHECK_TIMEOUT_MS);
250                        log.debug("MAX_FILE_SZIE={}", MAX_FILE_SZIE);
251                        log.debug("SHOW_HIDDEN={}", SHOW_HIDDEN);
252                        log.debug("SHOW_LINK={}", SHOW_LINK);
253                        log.debug("COPY_STRUCTURE_ONLY={}", COPY_STRUCTURE_ONLY);
254                        log.debug("MAX_QUEUE_SIZE={}", MAX_QUEUE_SIZE);
255                        log.debug("BATCH_SIZE={}", BATCH_SIZE);
256                        log.debug("MAX_CONNECT_USAGE={}", MAX_CONNECT_USAGE);
257                        log.debug("MAX_MEMORY_USAGE={}", MAX_MEMORY_USAGE);
258                        log.debug("MAX_FILE_PARENT_PATH_LENGTH={}", MAX_FILE_PARENT_PATH_LENGTH);
259                        log.debug("MAX_FILE_NAME_LENGTH={}", MAX_FILE_NAME_LENGTH);
260                        log.debug("CACHE_SECONDS={}", CACHE_SECONDS);
261
262                        String dataSourcePath = configProperties.getString("DataSource");
263                        String reloadDataSourceReaderPath = configProperties.getString("DataSourceReader", dataSourcePath);
264                        String reloadDataSourceWriterPath = configProperties.getString("DataSourceWriter", dataSourcePath);
265                        filePartSize = configProperties.getFileSize("FilePartSize", 1024 * 1024 * 1L).intValue();
266                        
267                        if(filePartSize <= 0) filePartSize = 1024 * 1024 * 1;
268                        
269                        tableName = configProperties.getString("TableName", "remote_file");
270                        identityCardEnable = configProperties.getBoolean("IdentityCardEnable", false);
271                        identityCardName = configProperties.getString("IdentityCardName", "RemoteFile");
272                        identityCardPrefix = configProperties.getString("IdentityCardPrefix", "RMF");
273                        identityCardSuffix = configProperties.getInteger("IdentityCardSuffix", 0);
274
275                        String readerDsKey = String.format("%s.reader", dsKey);
276                        String writerDsKey = String.format("%s.writer", dsKey);
277                        /**
278                        May be referenced when expansion is needed
279                        if (dataSourceReader == null || !dataSourceReaderPath.equals(reloadDataSourceReaderPath)) {
280                                DriverDataSource currentDs = DATASOURCE_REMOTE_FILE.get(readerDsKey);
281                                if (currentDs != null) {
282                                        currentDs.closeAll();
283                        }
284                        **/
285                        if (dataSourceReader == null || !dataSourceReaderPath.equals(reloadDataSourceReaderPath)) {
286                                dataSourceReaderPath = reloadDataSourceReaderPath;
287                                dataSourceReader = new DriverDataSource(new File(dataSourceReaderPath));
288                                DATASOURCE_REMOTE_FILE.put(readerDsKey, dataSourceReader);
289                        }
290                        /**
291                        May be referenced when expansion is needed
292                        if (dataSourceWriter == null || !dataSourceWriterPath.equals(reloadDataSourceWriterPath)) {
293                                DriverDataSource currentDs = DATASOURCE_REMOTE_FILE.get(writerDsKey);
294                                if (currentDs != null) {
295                                        currentDs.closeAll();
296                        }
297                        **/
298                        if (dataSourceWriter == null || !dataSourceWriterPath.equals(reloadDataSourceWriterPath)) {
299                                dataSourceWriterPath = reloadDataSourceWriterPath;
300                                dataSourceWriter = new DriverDataSource(new File(dataSourceWriterPath));
301                                DATASOURCE_REMOTE_FILE.put(writerDsKey, dataSourceWriter);
302                        }
303
304                        loadDataMapping(this);
305                        
306                        FIRST_LOADED = true;
307                } catch (Exception e) {
308                        log.error(e.getMessage(), e);
309                }
310        }
311
312        public String formatSql(String sql) {
313                return String.format(sql, tableName);
314        }
315
316        public String formatSqlForFilePartData(String sql) {
317                return String.format(sql, getDataMappingTableName());
318        }
319
320        public DiskFile writeToDisk(DiskFile diskFile) throws IOException {
321                if (!exists()) {
322                        log.mark(String.format("The remote file '%s' does not exist.", this.getPath()));
323                }
324                RemoteFile the = this;
325
326                Timestamp rfModifyTime = the.getModifyTime();
327                if (rfModifyTime != null) {
328                        diskFile.setModifyTimeMsFromClock(rfModifyTime.getTime());
329                }
330                if (isLink()) {
331                        if (isDebug(the)) {
332                                LoggerFactory.getLogger(RemoteFile.class).mark("CreateDiskLink - {}", diskFile.getPath());
333                        }
334                        diskFile.createLink(new String(readAllBytes(),BaseFile.CHARSET));
335                } else if (isFile()) {
336                        merge(new FilePart() {
337                                final List<byte[]> partDataList = new ArrayList<byte[]>();
338                                boolean isWritingDf = false;
339                                DiskFile realDf = null;
340
341                                @Override
342                                protected void process(int partIndex, byte[] data) throws IOException {
343                                        Thread.currentThread()
344                                                        .setName(String.format("RemoteFile-writeToDisk-merge-%s", diskFile.getOrigin().getName()));
345                                        if (syncRoot != null) {
346                                                if (!diskFile.isLogicModify()) {
347                                                        throw new IOException(
348                                                                        String.format("The file '%s' missing modify logic lock.", the.getPath()));
349                                                }
350                                        }
351                                        if (diskFile != null)
352                                                diskFile.logicModify();
353                                        try {
354                                                String fileName = diskFile.getOrigin().getName();
355                                                isWritingDf = fileName.matches(TMP_WRITING_SWP);
356                                                if (isWritingDf) {
357                                                        if (partIndex == 0) {
358                                                                if (diskFile.exists()) {
359                                                                        diskFile.delete();
360                                                                }
361                                                                if (isDebug(the)) {
362                                                                        LoggerFactory.getLogger(RemoteFile.class).mark("CreateDiskFile - {}",
363                                                                                        diskFile.getPath());
364                                                                }
365                                                                String realFileName = fileName.replaceFirst(BaseFile.TMP_WRITING_FOR_REPLACE, "");
366                                                                String realDfPath = String.format("%s/%s", diskFile.getParent(), realFileName);
367                                                                realDf = new DiskFile(realDfPath);
368                                                                realDf.copyAttrs(realDf, diskFile.isCopyStructureOnly(), diskFile.syncRoot);
369                                                                if(!diskFile.manualOpen(true)){
370                                                                        throw new IOException(
371                                                                                        String.format("The disk file '%s' cannot open.", diskFile.getPath()));  
372                                                                }
373                                                        }
374                                                        partDataList.add(decrypt(data));
375                                                        if (partDataList.size() >= BATCH_SIZE) {
376
377                                                                if (realDf != null)
378                                                                        realDf.logicModify();
379
380                                                                ByteArrayOutputStream ops = new ByteArrayOutputStream();
381                                                                int size = partDataList.size();
382                                                                for (int i = 0; i < size; i++) {
383                                                                        byte[] partData = partDataList.get(i);
384                                                                        ops.write(partData);
385                                                                }
386                                                                diskFile.manualWrite(ops.toByteArray());
387                                                                partDataList.clear();
388                                                        }
389                                                } else {
390                                                        if (partIndex == 0) {
391                                                                if (diskFile.exists()) {
392                                                                        throw new IOException(
393                                                                                        String.format("The remote file '%s' already exist.", diskFile.getPath()));
394                                                                }
395                                                                if (isDebug(the)) {
396                                                                        LoggerFactory.getLogger(RemoteFile.class).mark("CreateDiskFile - {}",
397                                                                                        diskFile.getPath());
398                                                                }
399                                                                if(!diskFile.manualOpen(true)){
400                                                                        throw new IOException(
401                                                                                        String.format("The disk file '%s' cannot open.", diskFile.getPath()));
402                                                                }
403                                                        }
404                                                        partDataList.add(decrypt(data));
405                                                        if (partDataList.size() >= BATCH_SIZE) {
406                                                                ByteArrayOutputStream ops = new ByteArrayOutputStream();
407                                                                int size = partDataList.size();
408                                                                for (int i = 0; i < size; i++) {
409                                                                        byte[] partData = partDataList.get(i);
410                                                                        ops.write(partData);
411                                                                }
412                                                                diskFile.manualWrite(ops.toByteArray());
413                                                                partDataList.clear();
414                                                        }
415                                                }
416
417                                        } catch (Exception e) {
418                                                log.warn(e.getMessage(), e);
419                                                if (diskFile != null) {
420                                                        diskFile.manualClose();
421                                                        diskFile.delete();
422                                                }
423                                                throw new IOException(e.getMessage(), e);
424                                        }
425                                }
426
427                                @Override
428                                protected void completed(int lastPartIndex, Timestamp modifyTime, long fileSize) throws IOException {
429                                        if (partDataList.size() > 0) {
430                                                ByteArrayOutputStream ops = new ByteArrayOutputStream();
431                                                int size = partDataList.size();
432                                                for (int i = 0; i < size; i++) {
433                                                        byte[] partData = partDataList.get(i);
434                                                        ops.write(partData);
435                                                }
436                                                diskFile.manualWrite(ops.toByteArray());
437                                                partDataList.clear();
438                                        }
439                                        diskFile.manualClose();
440                                        if (syncRoot != null && !diskFile.isLogicModify()) {
441                                                diskFile.delete();
442                                                throw new IOException(
443                                                                String.format("The file '%s' missing modify logic lock.", diskFile.getPath()));
444                                        }
445                                        try {
446                                                if (isWritingDf) {
447                                                        if (diskFile.exists()) {
448                                                                diskFile.moveTo(realDf.getPath());
449                                                                if (isDebug(the)) {
450                                                                        LoggerFactory.getLogger(RemoteFile.class).mark("CompletedDiskFile - {}",
451                                                                                        realDf.getPath());
452                                                                }
453                                                        }
454                                                        if (realDf.exists())
455                                                                the.syncedToDisk();
456                                                }
457                                                if (realDf == null) {
458                                                        if (diskFile != null) {
459                                                                if (diskFile.getOrigin().getName().startsWith(".")) {
460                                                                        Files.setAttribute(Paths.get(diskFile.getPath()), "dos:hidden", true);
461                                                                }
462                                                        }
463                                                } else {
464                                                        if (realDf.getOrigin().getName().startsWith(".")) {
465                                                                Files.setAttribute(Paths.get(realDf.getPath()), "dos:hidden", true);
466                                                        }
467                                                }
468
469                                                RemoteFile.removeQueuePathMapping(the.getPath());
470
471                                                if (diskFile != null)
472                                                        diskFile.removeLogicModify();
473
474                                                if (realDf != null)
475                                                        realDf.removeLogicModify();
476
477                                        } catch (Exception e) {
478                                                log.error(e.getMessage(), e);
479                                        }
480                                }
481
482                                @Override
483                                protected void ended(int lastPartIndex, long fileSize) {
484                                        try {
485                                                if (diskFile != null) {
486                                                        diskFile.manualClose();
487                                                }
488                                        } catch (Exception e) {
489                                                log.error(e.getMessage(), e);
490                                        }
491                                }
492                        });
493                } else {
494                        List<Map<String, Object>> writeList = list(getPath(), "%");
495                        writeList.addAll(list(parsePath(getPath() + "/%"), "%"));
496
497                        int size = writeList.size();
498                        if (size > 0) {
499                                if (!diskFile.exists()) {
500                                        if (isDebug(the)) {
501                                                log.mark("CreateDiskDir - {} ", diskFile.getPath());
502                                        }
503                                        diskFile.mkdirs();
504                                }
505                        }
506
507                        for (Map<String, Object> item : writeList) {
508                                RemoteFile rf = new RemoteFile(
509                                                String.format("%s/%s", item.get("file_parent_path"), item.get("file_name")));
510
511                                if (rf.getPath().equals(getPath()))
512                                        continue;
513
514                                String dfPath = rf.getPath().replaceFirst(getPath(), diskFile.getPath());
515                                DiskFile df = new DiskFile(dfPath);
516                                df.copyAttrs(df, false, this.syncRoot);
517                                Timestamp mt = rf.getModifyTime();
518                                if (mt != null) {
519                                        df.setModifyTimeMsFromClock(mt.getTime());
520                                }
521                                if (rf.isDir()) {
522                                        if (!df.exists()) {
523                                                if (isDebug(the)) {
524                                                        log.mark("CreateDiskDir - {}", diskFile.getPath());
525                                                }
526                                                df.mkdirs();
527                                        }
528                                } else {
529                                        rf.writeToDisk(df);
530                                }
531                        }
532                }
533                return diskFile;
534        }
535
536        public DiskFile writeToDisk() throws IOException {
537                DiskFile df = new DiskFile(this.origin.getAbsolutePath());
538                df.copyAttrs(df, false, this.syncRoot);
539                return writeToDisk(df);
540        }
541
542        public DiskFile writeToDisk(String path) throws IOException {
543                DiskFile df = new DiskFile(path);
544                df.copyAttrs(df, false, this.syncRoot);
545                return writeToDisk(df);
546        }
547
548        public boolean isCompleted() throws IOException {
549                return getCompletedRecord() != null;
550        }
551
552        public Map<String, Object> getCompletedRecord() throws IOException {
553                String sql = formatSql(SQL_SELECT_FOR_COMPLETED);
554                Map dataRecord = new HashMap<String, Object>();
555                dataRecord.put("file_name", origin.getName());
556                dataRecord.put("file_parent_path", getParent());
557                String cacheKey = String.format("first-%s-%s",sql,dataRecord);
558                Map<String,Object> record = (Map<String,Object>)Cache.get(cacheKey);
559                if(record == null){
560                        CacheDriverExecutor executor = null;
561                        Connection conn = null;
562                        try {
563                                conn = dataSourceReader.getConnection();
564                                conn.setAutoCommit(true);
565                                executor = new CacheDriverExecutor(conn);
566                                record = executor.first(sql, dataRecord);
567                                if(record != null){
568                                    Cache.set(cacheKey,record,CACHE_SECONDS);
569                                }
570                        } catch (SQLException e) {
571                                throw new IOException(e.getMessage(), e);
572                        } finally {
573                                try {
574                                        if (executor != null)
575                                                executor.close();
576                                } catch (Exception e) {
577                                        log.error(e.getMessage(), e);
578                                }
579                        }
580                }
581                return record;
582        }
583
584        public Map<String, Object> getRecordById(Object id) throws IOException {
585                String sql = formatSql(SQL_SELECT_GET_RECORD_BY_ID);
586                Map dataRecord = new HashMap<String, Object>();
587                dataRecord.put("id", id);
588                Map<String, Object> record = null;
589                if (record == null) {
590                        CacheDriverExecutor executor = null;
591                        Connection conn = null;
592                        try {
593                                conn = dataSourceReader.getConnection();
594                                conn.setAutoCommit(true);
595                                executor = new CacheDriverExecutor(conn);
596                                record = executor.first(sql, dataRecord);
597                        } catch (SQLException e) {
598                                throw new IOException(e.getMessage(), e);
599                        } finally {
600                                try {
601                                        if (executor != null)
602                                                executor.close();
603                                } catch (Exception e) {
604                                        log.error(e.getMessage(), e);
605                                }
606                        }
607                }
608                return record;
609        }
610
611        public Map<String, Object> getRecordByFileId(Object fileId) throws IOException {
612                String sql = formatSql(SQL_SELECT_GET_RECORD_BY_FILE_ID);
613                Map dataRecord = new HashMap<String, Object>();
614                dataRecord.put("file_id", fileId);
615                Map<String, Object> record = null;
616                if (record == null) {
617                        CacheDriverExecutor executor = null;
618                        Connection conn = null;
619                        try {
620                                conn = dataSourceReader.getConnection();
621                                conn.setAutoCommit(true);
622                                executor = new CacheDriverExecutor(conn);
623                                record = executor.first(sql, dataRecord);
624                        } catch (SQLException e) {
625                                throw new IOException(e.getMessage(), e);
626                        } finally {
627                                try {
628                                        if (executor != null)
629                                                executor.close();
630                                } catch (Exception e) {
631                                        log.error(e.getMessage(), e);
632                                }
633                        }
634                }
635                return record;
636        }
637
638        public Map<String, Object> getLastUpdateRecord() throws IOException {
639                String sql = formatSql(SQL_SELECT_FOR_LAST_UPDATE);
640                Map dataRecord = new HashMap<String, Object>();
641                dataRecord.put("file_name", origin.getName());
642                dataRecord.put("file_parent_path", getParent());
643                Map<String, Object> record = null;
644                CacheDriverExecutor executor = null;
645                Connection conn = null;
646                try {
647                        conn = dataSourceReader.getConnection();
648                        conn.setAutoCommit(true);
649                        executor = new CacheDriverExecutor(conn);
650                        record = executor.first(sql, dataRecord);
651                        return record;
652                } catch (SQLException e) {
653                        throw new IOException(e.getMessage(), e);
654                } finally {
655                        try {
656                                if (executor != null)
657                                        executor.close();
658                        } catch (Exception e) {
659                                log.error(e.getMessage(), e);
660                        }
661                }
662        }
663        
664        protected Timestamp getFirstCreatedAt() throws IOException {
665                String sql = formatSql(SQL_SELECT_FOR_FIRST_CREATED_AT);
666                Map dataRecord = new HashMap<String, Object>();
667                dataRecord.put("file_parent_path", getPath());
668                dataRecord.put("like_file_parent_path", parsePath(getPath() + "/%"));
669                dataRecord.put("deleted_on_hostnames", "%;" + CommonTools.getHostname() + ";%");
670                CacheDriverExecutor executor = null;
671                Connection conn = null;
672                try {
673                        conn = dataSourceReader.getConnection();
674                        conn.setAutoCommit(true);
675                        executor = new CacheDriverExecutor(conn);
676                        Map<String, Object> record = executor.first(sql, dataRecord);
677                        return record == null ? null : (Timestamp)record.get("created_at");
678                } catch (SQLException e) {
679                        throw new IOException(e.getMessage(), e);
680                } finally {
681                        try {
682                                if (executor != null)
683                                        executor.close();
684                        } catch (Exception e) {
685                                log.error(e.getMessage(), e);
686                        }
687                }
688        }       
689        
690        protected Integer getMaxFilePartDataTable() throws IOException {
691                String sql = formatSql(SQL_SELECT_FOR_MAX_DATA_TABLE);
692                Map dataRecord = new HashMap<String, Object>();
693                dataRecord.put("file_parent_path", getPath());
694                dataRecord.put("like_file_parent_path", parsePath(getPath() + "/%"));
695                CacheDriverExecutor executor = null;
696                Connection conn = null;
697                try {
698                        conn = dataSourceReader.getConnection();
699                        conn.setAutoCommit(true);
700                        executor = new CacheDriverExecutor(conn);
701                        Map<String, Object> record = executor.first(sql, dataRecord);
702                        return record == null ? 0 : Integer.parseInt(record.get("file_part_data_table")+"");
703                } catch (SQLException e) {
704                        throw new IOException(e.getMessage(), e);
705                } finally {
706                        try {
707                                if (executor != null)
708                                        executor.close();
709                        } catch (Exception e) {
710                                log.error(e.getMessage(), e);
711                        }
712                }
713        }       
714
715        private Map<String, Object> getSelectByPathForCheckExists() throws IOException {
716                String sql = formatSql(SQL_SELECT_BY_PATH_FOR_CHECK_EXISTS);
717                Map dataRecord = new HashMap<String, Object>();
718                dataRecord.put("file_name", origin.getName());
719                dataRecord.put("file_parent_path", getParent());
720                Map<String, Object> record = null;
721                if (record == null) {
722                        CacheDriverExecutor executor = null;
723                        Connection conn = null;
724                        try {
725                                conn = dataSourceReader.getConnection();
726                                conn.setAutoCommit(true);
727                                executor = new CacheDriverExecutor(conn);
728                                record = executor.first(sql, dataRecord);
729                                return record;
730                        } catch (SQLException e) {
731                                throw new IOException(e.getMessage(), e);
732                        } finally {
733                                try {
734                                        if (executor != null)
735                                                executor.close();
736                                } catch (Exception e) {
737                                        log.error(e.getMessage(), e);
738                                }
739                        }
740                }
741                return record;
742        }
743
744        private int countAllSubFiles(String fileParentPath, String fileName) throws IOException {
745                String sql = formatSql(SQL_SELECT_FOR_COUNT_ALL_SUB_FILES);
746                Map dataRecord = new HashMap<String, Object>();
747                dataRecord.put("file_name", fileName);
748                dataRecord.put("file_parent_path", fileParentPath);
749                Map<String, Object> record = null;
750                if (record == null) {
751                        CacheDriverExecutor executor = null;
752                        Connection conn = null;
753                        try {
754                                conn = dataSourceReader.getConnection();
755                                conn.setAutoCommit(true);
756                                executor = new CacheDriverExecutor(conn);
757                                record = executor.first(sql, dataRecord);
758                                return Integer.parseInt(record.get("as_count") + "");
759                        } catch (SQLException e) {
760                                throw new IOException(e.getMessage(), e);
761                        } finally {
762                                try {
763                                        if (executor != null)
764                                                executor.close();
765                                } catch (Exception e) {
766                                        log.error(e.getMessage(), e);
767                                }
768                        }
769                }
770                return -1;
771        }
772
773        private Map<String, Object> getFirstFilePart() throws IOException {
774                String sql = formatSql(SQL_SELECT_FOR_FIRST_FILE);
775                Map dataRecord = new HashMap<String, Object>();
776                dataRecord.put("file_name", origin.getName());
777                dataRecord.put("file_parent_path", getParent());
778                String cacheKey = String.format("first-%s-%s",sql,dataRecord);
779                Map<String, Object> record = (Map<String,Object>)Cache.get(cacheKey);
780                if(record == null){
781                        CacheDriverExecutor executor = null;
782                        Connection conn = null;
783                        try {
784                                conn = dataSourceReader.getConnection();
785                                conn.setAutoCommit(true);
786                                executor = new CacheDriverExecutor(conn);
787                                record = executor.first(sql, dataRecord);
788                                if(record != null){
789                                    Cache.set(cacheKey,record,CACHE_SECONDS);
790                                }
791                        } catch (SQLException e) {
792                                throw new IOException(e.getMessage(), e);
793                        } finally {
794                                try {
795                                        if (executor != null)
796                                                executor.close();
797                                } catch (Exception e) {
798                                        log.error(e.getMessage(), e);
799                                }
800                        }
801                }
802                return record;
803        }
804        
805        private Map<String, Object> getFirstFilePartRefresh() throws IOException {
806                String sql = formatSql(SQL_SELECT_FOR_FIRST_FILE);
807                Map dataRecord = new HashMap<String, Object>();
808                dataRecord.put("file_name", origin.getName());
809                dataRecord.put("file_parent_path", getParent());
810                String cacheKey = String.format("first-%s-%s",sql,dataRecord);
811                Cache.remove(cacheKey);
812                CacheDriverExecutor executor = null;
813                Connection conn = null;
814                try {
815                        conn = dataSourceReader.getConnection();
816                        conn.setAutoCommit(true);
817                        executor = new CacheDriverExecutor(conn);
818                        Map<String,Object> record =  executor.first(sql, dataRecord);
819                        if(record != null){
820                            Cache.set(cacheKey,record,CACHE_SECONDS);
821                        }
822                        return record;
823                } catch (SQLException e) {
824                        throw new IOException(e.getMessage(), e);
825                } finally {
826                        try {
827                                if (executor != null)
828                                        executor.close();
829                        } catch (Exception e) {
830                                log.error(e.getMessage(), e);
831                        }
832                }
833        }       
834
835        private Map<String, Object> getLastIndexRecord() throws IOException {
836                String sql = formatSql(SQL_SELECT_FOR_LAST_FILE_PART);
837                Map dataRecord = new HashMap<String, Object>();
838                dataRecord.put("file_name", origin.getName());
839                dataRecord.put("file_parent_path", getParent());
840                dataRecord.put("file_id", fileId);
841                Map<String, Object> record = null;
842                if (record == null) {
843                        CacheDriverExecutor executor = null;
844                        Connection conn = null;
845                        try {
846                                conn = dataSourceReader.getConnection();
847                                conn.setAutoCommit(true);
848                                executor = new CacheDriverExecutor(conn);
849                                record = executor.first(sql, dataRecord);
850                        } catch (SQLException e) {
851                                throw new IOException(e.getMessage(), e);
852                        } finally {
853                                try {
854                                        if (executor != null)
855                                                executor.close();
856                                } catch (Exception e) {
857                                        log.error(e.getMessage(), e);
858                                }
859                        }
860                }
861                return record;
862        }
863
864        public boolean isTimeout(long timeoutMs) throws IOException {
865                CacheDriverExecutor executor = null;
866                Connection conn = null;
867                try {
868                        Clock clock = new Clock();
869                        conn = dataSourceReader.getConnection();
870                        conn.setAutoCommit(true);
871                        executor = new CacheDriverExecutor(conn);
872                        Map dataRecord = new HashMap<String, Object>();
873                        dataRecord.put("file_name", origin.getName());
874                        dataRecord.put("file_parent_path", getParent());
875                        Map<String, Object> completedRecord = getCompletedRecord();
876
877                        if (completedRecord != null)
878                                return false;
879
880                        Map<String, Object> record = executor.first(formatSql(SQL_SELECT_FOR_TIMEOUT), dataRecord);
881
882                        if (record == null)
883                                return false;
884
885                        boolean completed = executor.first(formatSql(SQL_SELECT_CHECK_COMPLETED), record) != null;
886                        if (!completed) {
887                                boolean timeout = (clock.getTime() - ((Timestamp) record.get("updated_at")).getTime()) > timeoutMs;
888
889                                return timeout;
890                        }
891                        return false;
892                } catch (SQLException e) {
893                        throw new IOException(e.getMessage(), e);
894                } finally {
895                        try {
896                                if (executor != null)
897                                        executor.close();
898                        } catch (Exception e) {
899                                log.error(e.getMessage(), e);
900                        }
901                }
902        }
903
904        private Map<String, Object> readPart(Map<String, Object> rfRecord) throws IOException {
905                CacheDriverExecutor executor = null;
906                Connection conn = null;
907                try {
908                        calculateDataMappingDataSourceReader(this, rfRecord);
909                        if(this.dataMappingTable <= -1){
910                            return null;
911                        }
912                        Clock clock = new Clock();
913                        conn = dataMappingDataSourceReader.getConnection();
914                        conn.setAutoCommit(true);
915                        executor = new CacheDriverExecutor(conn);
916                        Map<String, Object> dataRecord = new HashMap<String, Object>();
917                        dataRecord.put("remote_file_id", rfRecord.get("id"));
918                        return executor.first(formatSqlForFilePartData(SQL_SELECT_BY_RF_ID_FROM_FILE_DATA), dataRecord);
919                } catch (SQLException e) {
920                        throw new IOException(e.getMessage(), e);
921                } finally {
922                        try {
923                                if (executor != null)
924                                        executor.close();
925                        } catch (Exception e) {
926                                log.error(e.getMessage(), e);
927                        }
928                }
929        }
930
931        public void merge(FilePart filePart) throws IOException {
932                if (isLink()) {
933                        throw new IOException(String.format("The file '%s' is a link.", origin.getAbsolutePath()));
934                }
935                if (isDir()) {
936                        throw new IOException(String.format("The file '%s' is a folder.", origin.getAbsolutePath()));
937                }
938                CacheDriverExecutor executor = null;
939                Connection conn = null;
940                try {
941                        Clock clock = new Clock();
942                        conn = dataSourceReader.getConnection();
943                        conn.setAutoCommit(true);
944                        executor = new CacheDriverExecutor(conn);
945                        Map dataRecord = getCompletedRecord();
946                        if (dataRecord != null) {
947                                CacheArray rows = new CacheArray();
948                                CacheArrayFilter filter = new CacheArrayFilter(CACHE_ARRAY_FILTER_TIMER) {
949                                        long fileSize = 0L;
950                                        Map<String, Object> fp = null;
951                                        Map<String, Object> item = null;
952
953                                        @Override
954                                        public void execute(Integer index, Object o) {
955
956                                                try {
957                                                        item = (Map<String, Object>) o;
958                                                        fp = readPart(item);
959                                                        Object fpData = null;
960                                                        
961                                                        if(fp == null){
962                                                            fpData = new byte[]{};
963                                                        }else{
964                                                            fpData = fp.get("file_part_data");
965                                                        }
966                                                        Thread.currentThread()
967                                                                        .setName(String.format("RemoteFile-merge-%s", item.get("file_name")));
968                                                        byte[] partData = null;
969                                                        if (fpData instanceof byte[]) {
970                                                                partData = (byte[]) fpData;
971                                                        }
972                                                        fileSize += partData.length;
973                                                        filePart.process(index, partData);
974                                                } catch (Exception e) {
975                                                        log.error(e.getMessage(), e);
976                                                        this.terminated = true;
977                                                }
978                                        }
979
980                                        @Override
981                                        public void completed(Integer size) {
982                                                try {
983                                                        if (item != null) {
984                                                                Thread.currentThread()
985                                                                                .setName(String.format("RemoteFile-merge-completed-%s", item.get("file_name")));
986                                                        }
987                                                        filePart.completed(size, new Clock().getSqlTimestamp(), fileSize);
988                                                } catch (IOException e) {
989                                                        log.error(e.getMessage(), e);
990                                                }
991                                        }
992
993                                        @Override
994                                        public void terminated() {
995                                                if (item != null) {
996                                                        Thread.currentThread()
997                                                                        .setName(String.format("RemoteFile-merge-trminated-%s", item.get("file_name")));
998                                                        log.error("Merge terminated: {}/{}", item.get("file_parent_path"), item.get("file_name"));
999                                                }
1000                                        }
1001                                };
1002                                rows.filter(filter);
1003                                executor.find(formatSql(SQL_SELECT_FILE_HAVE_DATA), dataRecord, rows);
1004                        }
1005                } catch (SQLException e) {
1006                        throw new IOException(e.getMessage(), e);
1007                } finally {
1008                        if (executor != null) {
1009                                try {
1010                                        executor.close();
1011                                } catch (Exception e) {
1012                                        throw new IOException(e.getMessage(), e);
1013                                }
1014                        }
1015                }
1016        }
1017
1018        protected void copyFrom(boolean copyStructureOnly, DiskFile diskFile) throws IOException {
1019                var the = this;
1020                if (diskFile.isLink()) {
1021                        the.setModifyTime(diskFile.getModifyTimeForClock());
1022                        the.createLink(diskFile.readLink());
1023                }
1024                if (diskFile.isDir()) {
1025                        the.setModifyTime(diskFile.getModifyTimeForClock());
1026                        the.mkdirs();
1027                }
1028                if (diskFile.isFile()) {
1029                        the.setModifyTime(diskFile.getModifyTimeForClock());
1030                        if (copyStructureOnly) {
1031                                the.createStructureFile(diskFile.size());
1032                                diskFile.removeLogicAccess();
1033                                DiskFile.removeQueuePathMapping(diskFile.getPath());                            
1034                        } else {
1035                                diskFile.split(filePartSize, new FilePart() {
1036                                        final List<byte[]> partDataList = new ArrayList<byte[]>();
1037                                        int currentFilePartStartIndex = 0;
1038
1039                                        @Override
1040                                        protected void process(int partIndex, byte[] data) throws IOException {
1041                                                if (partIndex == 0) {
1042                                                        if (diskFile.syncRoot != null) {
1043                                                                if (diskFile.isLogicModify()) {
1044                                                                        throw new IOException(
1045                                                                                        String.format("The file '%s' is already logic locked by another thread.",
1046                                                                                                        diskFile.getPath()));
1047                                                                }
1048                                                                if (!diskFile.isLogicAccess()) {
1049                                                                        throw new IOException(String.format("The file '%s' missing access logic lock.",
1050                                                                                        diskFile.getPath()));
1051                                                                }
1052                                                        }
1053                                                        if (the.exists(true)) {
1054                                                                throw new IOException(
1055                                                                                String.format("The remote file '%s' already exist.", the.getPath()));
1056                                                        }
1057                                                        if (isDebug(the)) {
1058                                                                LoggerFactory.getLogger(RemoteFile.class).mark("CreateRemoteFile - {}", the.getPath());
1059                                                        }
1060                                                }
1061
1062                                                partDataList.add(data);
1063                                                if (partDataList.size() >= BATCH_SIZE) {
1064                                                        diskFile.logicAccess();
1065                                                        if(!the.writeToBatch(currentFilePartStartIndex, partDataList)){
1066                                                                throw new IOException(String.format("The remote file '%s' write failed.",the.getPath()));
1067                                                        }
1068                                                        partDataList.clear();
1069                                                        currentFilePartStartIndex += BATCH_SIZE;
1070                                                }
1071                                        }
1072
1073                                        @Override
1074                                        protected void completed(int lastPartIndex, Timestamp modifyTime, long fileSize)
1075                                                        throws IOException {
1076                                                try {
1077                                                        if (diskFile.syncRoot != null) {
1078                                                                if (diskFile.isLogicModify()) {
1079                                                                        throw new IOException(
1080                                                                                        String.format("The file '%s' is already logic locked by another thread.",
1081                                                                                                        diskFile.getPath()));
1082                                                                }
1083                                                                if (!diskFile.isLogicAccess()) {
1084                                                                        throw new IOException(String.format("The file '%s' missing access logic lock.",
1085                                                                                        diskFile.getPath()));
1086                                                                }
1087                                                        }
1088                                                        /**This is 0 bytes file**/
1089                                                        if (lastPartIndex == -1) {
1090
1091                                                                if (the.exists(true)) {
1092                                                                        the.forceDeleteFile(false);
1093                                                                }
1094
1095                                                                the.setModifyTime(modifyTime);
1096                                                                the.createEmptyFile();
1097                                                        } else {
1098                                                                the.setModifyTime(modifyTime);
1099                                                                if (partDataList.size() > 0) {
1100                                                                        if(!the.writeToBatch(currentFilePartStartIndex, partDataList)){
1101                                                                            throw new IOException(String.format("The remote file '%s' write failed.",the.getPath()));
1102                                                                        }
1103                                                                        partDataList.clear();
1104                                                                }
1105                                                                the.complete(lastPartIndex, fileSize);
1106                                                        }
1107                                                        if (isDebug(the)) {
1108                                                                LoggerFactory.getLogger(RemoteFile.class).mark("CompletedRemoteFile - {}",
1109                                                                                the.getPath());
1110                                                        }
1111                                                        DiskFile.removeQueuePathMapping(diskFile.getPath());
1112                                                } catch (IOException e) {
1113                                                        throw e;
1114                                                }
1115                                        }
1116
1117                                        @Override
1118                                        protected void ended(int lastPartIndex, long fileSize) {
1119
1120                                        }
1121
1122                                });
1123                        }
1124                }
1125        }
1126
1127        private boolean createEmptyFile() throws IOException {
1128            boolean allowed = beforeWrite();
1129            
1130            if(!allowed) return false;
1131                
1132                Clock clock = new Clock();
1133                Timestamp currentTime = clock.getSqlTimestamp();
1134                CacheDriverExecutor executor = null;
1135                Connection conn = null;
1136                try {
1137                    calculateDataMappingDataSourceWriter(this);
1138                        conn = dataSourceWriter.getConnection();
1139                        conn.setAutoCommit(true);
1140                        executor = new CacheDriverExecutor(conn);
1141                        Map dataRecord = new HashMap<String, Object>();
1142                        dataRecord.put("id",
1143                                        generateId(identityCardEnable, identityCardName, identityCardPrefix, identityCardSuffix));
1144                        dataRecord.put("file_name", origin.getName());
1145                        dataRecord.put("file_parent_path", getParent());
1146                        dataRecord.put("file_type", "F");
1147                        dataRecord.put("file_part_index", 0);
1148                        dataRecord.put("file_part_size", 0);
1149                        dataRecord.put("file_part_data_ds", "NONE");
1150                        dataRecord.put("file_part_data_table", this.dataMappingTable);
1151                        dataRecord.put("file_part_last_index", 0);
1152                        dataRecord.put("file_size", 0);
1153                        dataRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime);
1154                        dataRecord.put("file_deleted", 0);
1155                        dataRecord.put("created_user_id", getDefaultModifyUserId());
1156                        dataRecord.put("updated_user_id", getDefaultModifyUserId());
1157                        dataRecord.put("created_at", currentTime);
1158                        dataRecord.put("updated_at", currentTime);
1159                        dataRecord.put("file_id", fileId);
1160                        dataRecord.put("source_hostname", CommonTools.getHostname());
1161                        dataRecord.put("synced_on_hostnames", String.format(";%s;", CommonTools.getHostname()));
1162                        dataRecord.put("deleted_on_hostnames", ";");
1163                        executor.execute(formatSql(SQL_INSERT_INTO_FILE), dataRecord);
1164                        
1165                        afterWrite();
1166                        
1167                        return true;
1168                } catch (Exception e) {
1169                        throw new IOException(e.getMessage(), e);
1170                } finally {
1171                        try {
1172                                if (executor != null)
1173                                        executor.close();
1174                        } catch (Exception e) {
1175                                log.error(e.getMessage(), e);
1176                        }
1177                }
1178        }
1179
1180        private boolean createStructureFile(long fileSize) throws IOException {
1181            boolean allowed = beforeWrite();
1182            
1183            if(!allowed) return false;
1184            
1185                log.debug("CreateStructureFile: {}", getPath());
1186                Clock clock = new Clock();
1187                Timestamp currentTime = clock.getSqlTimestamp();
1188                CacheDriverExecutor executor = null;
1189                Connection conn = null;
1190                try {
1191                    calculateDataMappingDataSourceWriter(this);
1192                        conn = dataSourceWriter.getConnection();
1193                        conn.setAutoCommit(true);
1194                        executor = new CacheDriverExecutor(conn);
1195                        Map dataRecord = new HashMap<String, Object>();
1196                        dataRecord.put("id",
1197                                        generateId(identityCardEnable, identityCardName, identityCardPrefix, identityCardSuffix));
1198                        dataRecord.put("file_name", origin.getName());
1199                        dataRecord.put("file_parent_path", getParent());
1200                        dataRecord.put("file_type", "F");
1201                        dataRecord.put("file_part_index", 0);
1202                        dataRecord.put("file_part_size", fileSize);
1203                        dataRecord.put("file_part_data_ds", "NONE");
1204                        dataRecord.put("file_part_data_table", this.dataMappingTable);
1205                        dataRecord.put("file_part_last_index", 0);
1206                        dataRecord.put("file_size", fileSize);
1207                        dataRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime);
1208                        dataRecord.put("file_deleted", 0);
1209                        dataRecord.put("created_user_id", getDefaultModifyUserId());
1210                        dataRecord.put("updated_user_id", getDefaultModifyUserId());
1211                        dataRecord.put("created_at", currentTime);
1212                        dataRecord.put("updated_at", currentTime);
1213                        dataRecord.put("file_id", fileId);
1214                        dataRecord.put("source_hostname", CommonTools.getHostname());
1215                        dataRecord.put("synced_on_hostnames", String.format(";%s;", CommonTools.getHostname()));
1216                        dataRecord.put("deleted_on_hostnames", ";");
1217                        executor.execute(formatSql(SQL_INSERT_INTO_FILE), dataRecord);
1218                        
1219                        afterWrite();
1220                        
1221                        return true;
1222                } catch (Exception e) {
1223                        throw new IOException(e.getMessage(), e);
1224                } finally {
1225                        try {
1226                                if (executor != null)
1227                                        executor.close();
1228                        } catch (Exception e) {
1229                                log.error(e.getMessage(), e);
1230                        }
1231                }
1232        }
1233
1234        protected boolean syncedToDisk() throws IOException {
1235                Clock clock = new Clock();
1236                CacheDriverExecutor executor = null;
1237                Connection conn = null;
1238                try {
1239                        conn = dataSourceWriter.getConnection();
1240                        conn.setAutoCommit(true);
1241                        executor = new CacheDriverExecutor(conn);
1242                        Map<String, Object> record = getCompletedRecord();
1243                        if (record == null) {
1244                                log.debug(String.format("Remmote file not found '%s'.", origin.getAbsolutePath()));
1245                        }
1246                        if (record != null) {
1247                            String hostname = CommonTools.getHostname();
1248                                String syncedOnHostnames = (String) record.get("synced_on_hostnames");
1249                                syncedOnHostnames = syncedOnHostnames.replaceAll(String.format(";%s;",hostname),";");
1250                                String shn = String.format("%s%s;", syncedOnHostnames, hostname);
1251                                Map<String, Object> dataRecord = new HashMap<String, Object>();
1252                            dataRecord.put("file_parent_path",record.get("file_parent_path"));
1253                            dataRecord.put("file_name", record.get("file_name"));
1254                                dataRecord.put("synced_on_hostnames",shn);
1255                                dataRecord.put("updated_at", clock.getSqlTimestamp());
1256                                dataRecord.put("updated_user_id", getDefaultModifyUserId());
1257                                executor.execute(formatSql(SQL_UPDATE_FOR_SYNCED_TO_DISK), dataRecord);
1258                                return true;
1259                        }
1260                        return false;
1261                } catch (SQLException e) {
1262                        throw new IOException(e.getMessage(), e);
1263                } finally {
1264                        try {
1265                                if (executor != null)
1266                                        executor.close();
1267                        } catch (Exception e) {
1268                                log.error(e.getMessage(), e);
1269                        }
1270                }
1271        }
1272
1273        protected boolean deletedToDiskAndSub(Object id) throws IOException {
1274                Clock clock = new Clock();
1275                CacheDriverExecutor executor = null;
1276                Connection conn = null;
1277                try {
1278                        conn = dataSourceWriter.getConnection();
1279                        conn.setAutoCommit(true);
1280                        executor = new CacheDriverExecutor(conn);
1281                        Map<String, Object> record = getRecordById(id);
1282                        if (record == null) {
1283                                log.debug(String.format("Remmote file not found '%s'.", origin.getAbsolutePath()));
1284                        }
1285                        if (record != null) {
1286                            String hostname = CommonTools.getHostname();
1287                                String deletedOnHostnames = (String) record.get("deleted_on_hostnames");
1288                                deletedOnHostnames = deletedOnHostnames.replaceAll(String.format(";%s;",hostname),";");
1289                                String dhn = String.format("%s%s;", deletedOnHostnames, hostname);
1290                                Timestamp now = clock.getSqlTimestamp();
1291                                final Map<String, Object> dataRecord = new HashMap<String, Object>();
1292                            dataRecord.put("file_parent_path", record.get("file_parent_path"));
1293                            dataRecord.put("file_name", record.get("file_name"));
1294                                dataRecord.put("deleted_on_hostnames", dhn);
1295                                dataRecord.put("updated_at", now);
1296                                dataRecord.put("updated_user_id", getDefaultModifyUserId());
1297                                executor.execute(formatSql(SQL_UPDATE_FOR_DELETED_TO_DISK), dataRecord);
1298                                if (record.get("file_type").equals("D")) {
1299                                        String dirPath = String.format("%s/%s", record.get("file_parent_path"),
1300                                                        record.get("file_name"));
1301                                        dataRecord.put("file_parent_path", dirPath);
1302                                        dataRecord.put("like_file_parent_path", dirPath + "/%");
1303                                        executor.execute(formatSql(SQL_UPDATE_FOR_DELETED_TO_DISK_SUB_FILES), dataRecord);
1304                                }
1305                                return true;
1306                        }
1307                        return false;
1308                } catch (SQLException e) {
1309                        throw new IOException(e.getMessage(), e);
1310                } finally {
1311                        try {
1312                                if (executor != null)
1313                                        executor.close();
1314                        } catch (Exception e) {
1315                                log.error(e.getMessage(), e);
1316                        }
1317                }
1318        }
1319
1320        private boolean complete(int filePartLastIndex, long fileSize) throws IOException {
1321                Clock clock = new Clock();
1322                CacheDriverExecutor executor = null;
1323                Timestamp currentTime = clock.getSqlTimestamp();
1324                Connection conn = null;
1325                try {
1326                        conn = dataSourceWriter.getConnection();
1327                        conn.setAutoCommit(true);
1328                        executor = new CacheDriverExecutor(conn);
1329                        Map dataRecord = new HashMap<String, Object>();
1330                        dataRecord.put("file_id", fileId);
1331                        dataRecord.put("file_part_last_index", filePartLastIndex);
1332                        dataRecord.put("file_size", fileSize);
1333                        dataRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime);
1334                        dataRecord.put("updated_at", currentTime);
1335                        dataRecord.put("updated_user_id", getDefaultModifyUserId());
1336                        int rows = executor.execute(formatSql(SQL_UPDATE_FOR_COMPLETED), dataRecord);
1337                        
1338                        afterWrite();
1339                        
1340                        return rows > 0;
1341                } catch (SQLException e) {
1342                        throw new IOException(e.getMessage(), e);
1343                } finally {
1344                        try {
1345                                if (executor != null)
1346                                        executor.close();
1347                        } catch (Exception e) {
1348                                log.error(e.getMessage(), e);
1349                        }
1350                }
1351        }
1352
1353        @Override
1354        public boolean complete() throws IOException {
1355                Clock clock = new Clock();
1356                Timestamp currentTime = clock.getSqlTimestamp();
1357                CacheDriverExecutor executor = null;
1358                Connection conn = null;
1359                try {
1360                        conn = dataSourceWriter.getConnection();
1361                        conn.setAutoCommit(true);
1362                        executor = new CacheDriverExecutor(conn);
1363                        Map dataRecord = new HashMap<String, Object>();
1364                        dataRecord.put("file_name", origin.getName());
1365                        dataRecord.put("file_parent_path", getParent());
1366                        dataRecord.put("file_id", fileId);
1367
1368                        Map<String, Object> sumRecord = executor.first(formatSql(SQL_SELECT_FOR_SUM_FILE_PART_SIZE_INDEX),
1369                                        dataRecord);
1370
1371                        if (sumRecord == null) {
1372                                log.debug(String.format("Remmote file not found '%s'.", origin.getAbsolutePath()));
1373                        }
1374
1375                        if (sumRecord != null) {
1376                                long fileSize = Long.parseLong(sumRecord.get("file_part_size") + "");
1377                                long filePartLastIndex = Integer.parseInt(sumRecord.get("file_part_index") + "");
1378
1379                                dataRecord.put("file_part_last_index", filePartLastIndex);
1380                                dataRecord.put("file_size", fileSize);
1381                                dataRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime);
1382                                dataRecord.put("updated_at", currentTime);
1383                                dataRecord.put("updated_user_id", getDefaultModifyUserId());
1384                                int rows = executor.execute(formatSql(SQL_UPDATE_FOR_COMPLETED), dataRecord);
1385                                
1386                                afterWrite();
1387                                
1388                                return rows > 0;
1389                        }
1390                        return false;
1391                } catch (SQLException e) {
1392                        throw new IOException(e.getMessage(), e);
1393                } finally {
1394                        try {
1395                                if (executor != null)
1396                                        executor.close();
1397                        } catch (Exception e) {
1398                                log.error(e.getMessage(), e);
1399                        }
1400                }
1401        }
1402
1403        public boolean forceDelete() throws IOException {
1404                return forceDeleteAll(true);
1405        }
1406
1407        public boolean forceDeleteDir(boolean realDeleted) throws IOException {
1408                return forceDelete(realDeleted, "D");
1409        }
1410
1411        public boolean forceDeleteLink(boolean realDeleted) throws IOException {
1412                return forceDelete(realDeleted, "L");
1413        }
1414
1415        public boolean forceDeleteFile(boolean realDeleted) throws IOException {
1416                return forceDelete(realDeleted, "F");
1417        }
1418
1419        public boolean forceDelete(boolean realDeleted, String fileType) throws IOException {
1420                if (fileType.equalsIgnoreCase("D")) {
1421                        return forceDeleteAll(realDeleted);
1422                } else {
1423                        boolean allowed = beforeDelete(realDeleted);
1424                        if (allowed) {
1425                                CacheDriverExecutor executor = null;
1426                                Connection conn = null;
1427                                try {
1428                                        Clock clock = new Clock();
1429                                        Timestamp currentTime = clock.getSqlTimestamp();
1430                                        conn = dataSourceWriter.getConnection();
1431                                        conn.setAutoCommit(true);
1432                                        executor = new CacheDriverExecutor(conn);
1433                                        Map dataRecord = new HashMap<String, Object>();
1434                                        dataRecord.put("file_deleted", realDeleted ? 1 : 0);
1435                                        dataRecord.put("deleted", realDeleted ? 0 : 1);
1436                                        dataRecord.put("file_name", origin.getName());
1437                                        dataRecord.put("file_type", fileType.toUpperCase());
1438                                        dataRecord.put("file_parent_path", getParent());
1439                                        dataRecord.put("updated_user_id", getDefaultModifyUserId());
1440                                        dataRecord.put("deleted_user_id", getDefaultModifyUserId());
1441                                        dataRecord.put("updated_at", currentTime);
1442                                        dataRecord.put("deleted_at", currentTime);
1443                                        dataRecord.put("root_folder", getPath());
1444                                        dataRecord.put("like_subfolder", parsePath(getPath() + "/%"));
1445                                        int resultRows = executor.execute(formatSql(SQL_UPDATE_FOR_DELETE_FILE_LINK), dataRecord);
1446                                        log.debug("ForceDelete({}): {} -> {}", realDeleted, getPath(), resultRows);
1447                    
1448                    afterDelete(realDeleted);
1449                    
1450                                        return resultRows > 0;
1451                                } catch (SQLException e) {
1452                                        throw new IOException(e.getMessage(), e);
1453                                } finally {
1454                                        try {
1455                                                if (executor != null)
1456                                                        executor.close();
1457                                        } catch (Exception e) {
1458                                                log.error(e.getMessage(), e);
1459                                        }
1460                                }
1461                        }
1462                }
1463                return false;
1464        }
1465
1466        public boolean forceDeleteAll(boolean realDeleted) throws IOException {
1467                boolean allowed = beforeDelete(realDeleted);
1468                if (allowed) {
1469                        CacheDriverExecutor executor = null;
1470                        Connection conn = null;
1471                        try {
1472                                Clock clock = new Clock();
1473                                Timestamp currentTime = clock.getSqlTimestamp();
1474                                conn = dataSourceWriter.getConnection();
1475                                conn.setAutoCommit(true);
1476                                executor = new CacheDriverExecutor(conn);
1477                                Map dataRecord = new HashMap<String, Object>();
1478                                dataRecord.put("file_deleted", realDeleted ? 1 : 0);
1479                                dataRecord.put("deleted", realDeleted ? 0 : 1);
1480                                dataRecord.put("file_name", origin.getName());
1481                                dataRecord.put("file_parent_path", getParent());
1482                                dataRecord.put("updated_user_id", getDefaultModifyUserId());
1483                                dataRecord.put("deleted_user_id", getDefaultModifyUserId());
1484                                dataRecord.put("updated_at", currentTime);
1485                                dataRecord.put("deleted_at", currentTime);
1486                                dataRecord.put("root_folder", getPath());
1487                                dataRecord.put("like_subfolder", parsePath(getPath() + "/%"));
1488                                int resultRows = executor.execute(formatSql(SQL_UPDATE_FOR_DELETE_ALL), dataRecord);
1489                                log.debug("ForceDelete({}): {} -> {}", realDeleted, getPath(), resultRows);
1490
1491                afterDelete(realDeleted);
1492                
1493                                return resultRows > 0;
1494                        } catch (SQLException e) {
1495                                throw new IOException(e.getMessage(), e);
1496                        } finally {
1497                                try {
1498                                        if (executor != null)
1499                                                executor.close();
1500                                } catch (Exception e) {
1501                                        log.error(e.getMessage(), e);
1502                                }
1503                        }
1504                }
1505                return false;
1506        }
1507
1508        @Override
1509        public boolean delete() throws IOException {
1510                return forceDeleteAll(true);
1511        }
1512
1513        public boolean delete(boolean realDeleted) throws IOException {
1514                return forceDeleteAll(realDeleted);
1515        }
1516
1517        @Override
1518        public boolean beforeDelete(boolean realDeleted) {
1519                return true;
1520        }
1521        
1522        @Override
1523        public void afterDelete(boolean realDeleted) {
1524            
1525        }       
1526        
1527        @Override
1528        public boolean beforeMkdirs() {
1529            return true;
1530        }
1531        
1532        @Override
1533        public void afterMkdirs() {
1534            
1535        }
1536        
1537        @Override
1538        public boolean beforeCreateLink(String target) {
1539            return true;
1540        }
1541        
1542        @Override
1543        public void afterCreateLink(String target) {
1544            
1545        }
1546        
1547        @Override
1548        public boolean beforeWrite(){
1549            return true;
1550        }
1551        
1552        @Override
1553        public void afterWrite(){
1554            
1555        }       
1556
1557        @Override
1558        public boolean exists() throws IOException {
1559                return getFirstFilePart() != null;
1560        }
1561        
1562        public boolean exists(boolean refresh) throws IOException {
1563            if(refresh){
1564                return getFirstFilePartRefresh() != null;
1565            }else{
1566                    return exists();
1567            }
1568        }       
1569
1570        @Override
1571        public boolean mkdirs() throws IOException {
1572                return mkdirs(false);
1573        }
1574
1575        public boolean mkdirs(boolean checkSourceHostname) throws IOException {
1576            
1577            boolean allowed = beforeMkdirs();
1578            
1579            if(!allowed) return false;
1580            
1581                Clock clock = new Clock();
1582                CacheDriverExecutor executor = null;
1583                Connection conn = null;
1584                try {
1585                        if (getFirstFilePart() != null) {
1586                                return false;
1587                        }
1588                        if (checkSourceHostname) {
1589                                String lastSourceHostname = getLastSourceHostname();
1590                                if (lastSourceHostname != null) {
1591                                        String clientHostname = CommonTools.getHostname();
1592                                        boolean sameHostname = clientHostname.equals(lastSourceHostname);
1593                                        if (!sameHostname) {
1594                                                return false;
1595                                        }
1596                                }
1597                        }
1598                        log.debug("Mkidrs: {}", getPath());
1599                        calculateDataMappingDataSourceWriter(this);
1600                        Timestamp currentTime = clock.getSqlTimestamp();
1601                        conn = dataSourceWriter.getConnection();
1602                        conn.setAutoCommit(true);
1603                        executor = new CacheDriverExecutor(conn);
1604                        Map dataRecord = new HashMap<String, Object>();
1605                        dataRecord.put("id",
1606                                        generateId(identityCardEnable, identityCardName, identityCardPrefix, identityCardSuffix));
1607                        dataRecord.put("file_name", origin.getName());
1608                        dataRecord.put("file_parent_path", getParent());
1609                        dataRecord.put("file_type", "D");
1610                        dataRecord.put("file_part_index", 0);
1611                        dataRecord.put("file_part_size", 0);
1612                        dataRecord.put("file_part_data_ds", "NONE");
1613                        dataRecord.put("file_part_data_table", this.dataMappingTable);
1614                        dataRecord.put("file_part_last_index", 0);
1615                        dataRecord.put("file_size", 0);
1616                        dataRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime);
1617                        dataRecord.put("file_deleted", 0);
1618                        dataRecord.put("created_user_id", getDefaultModifyUserId());
1619                        dataRecord.put("updated_user_id", getDefaultModifyUserId());
1620                        dataRecord.put("created_at", currentTime);
1621                        dataRecord.put("updated_at", currentTime);
1622                        dataRecord.put("file_id", fileId);
1623                        dataRecord.put("source_hostname", CommonTools.getHostname());
1624                        dataRecord.put("synced_on_hostnames", String.format(";%s;", CommonTools.getHostname()));
1625                        dataRecord.put("deleted_on_hostnames", ";");
1626                        int rows = executor.execute(formatSql(SQL_INSERT_INTO_FILE), dataRecord);
1627                        
1628                        afterMkdirs();
1629                        
1630                        return rows > 0;
1631                } catch (Exception e) {
1632                        throw new IOException(e.getMessage(), e);
1633                } finally {
1634                        try {
1635                                if (executor != null)
1636                                        executor.close();
1637                        } catch (Exception e) {
1638                                log.error(e.getMessage(), e);
1639                        }
1640                }
1641        }
1642
1643        @Override
1644        public boolean write(byte[] data, boolean append) throws IOException {
1645                return writeTo(data, append);
1646        }
1647
1648        public synchronized boolean writeBlock(byte[] data, boolean append) throws IOException {
1649                return writeTo(data, append);
1650        }
1651
1652        private boolean writeToBatch(int currentFilePartStartIndex, List<byte[]> partDataList) throws IOException {
1653            
1654            boolean allowed = beforeWrite();
1655            
1656            if(!allowed) return false;
1657            
1658                CacheDriverExecutor executor = null;
1659                CacheDriverExecutor dmExecutor = null;
1660                List<Map<String, Object>> rfList = new ArrayList<Map<String, Object>>();
1661                List<Map<String, Object>> dmList = new ArrayList<Map<String, Object>>();
1662                try {
1663                        Clock clock = new Clock();
1664                        log.debug("Write: {}", getPath());
1665                        int size = partDataList.size();
1666                        Timestamp currentTime = clock.getSqlTimestamp();
1667                        String hostname = CommonTools.getHostname();
1668                        calculateDataMappingDataSourceWriter(this);
1669                        for (int filePartIndex = 0; filePartIndex < size; filePartIndex++) {
1670                                byte[] data = partDataList.get(filePartIndex);
1671                                String remoteFileId = generateId(identityCardEnable, identityCardName, identityCardPrefix,
1672                                                identityCardSuffix);
1673                                Map rfRecord = new HashMap<String, Object>();
1674                                rfRecord.put("id", remoteFileId);
1675                                rfRecord.put("file_name", origin.getName());
1676                                rfRecord.put("file_parent_path", getParent());
1677                                rfRecord.put("file_type", "F");
1678                                rfRecord.put("file_part_index", filePartIndex + currentFilePartStartIndex);
1679                                rfRecord.put("file_part_size", data.length);
1680                                rfRecord.put("file_part_data_ds", this.dataMappingDs);
1681                                rfRecord.put("file_part_data_table", this.dataMappingTable);
1682                                rfRecord.put("file_part_last_index", -1);
1683                                rfRecord.put("file_size", 0);
1684                                rfRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime);
1685                                rfRecord.put("file_deleted", 0);
1686                                rfRecord.put("created_user_id", getDefaultModifyUserId());
1687                                rfRecord.put("updated_user_id", getDefaultModifyUserId());
1688                                rfRecord.put("created_at", currentTime);
1689                                rfRecord.put("updated_at", currentTime);
1690                                rfRecord.put("file_id", fileId);
1691                                rfRecord.put("source_hostname", hostname);
1692                                rfRecord.put("synced_on_hostnames", String.format(";%s;", hostname));
1693                                rfRecord.put("deleted_on_hostnames", ";");
1694                                rfList.add(rfRecord);
1695
1696                                Map<String, Object> dataRecord = new HashMap<String, Object>();
1697                                dataRecord.put("id", generateDataId(identityCardEnable, identityCardName, identityCardSuffix, this));
1698                                dataRecord.put("file_part_data", encrypt(data));
1699                                dataRecord.put("remote_file_id", remoteFileId);
1700                                dataRecord.put("file_id", fileId);
1701                                dataRecord.put("created_at", currentTime);
1702                                dataRecord.put("updated_at", currentTime);
1703                                dmList.add(dataRecord);
1704                        }
1705                        Connection conn = dataSourceWriter.getConnection();
1706                        conn.setAutoCommit(true);
1707                        executor = new CacheDriverExecutor(conn);
1708                        executor.executeBatch(formatSql(SQL_INSERT_INTO_FILE), rfList);
1709
1710                        conn = dataMappingDataSourceWriter.getConnection();
1711                        conn.setAutoCommit(true);
1712                        dmExecutor = new CacheDriverExecutor(conn);
1713                        dmExecutor.executeBatch(formatSqlForFilePartData(SQL_INSERT_INTO_FILE_DATA), dmList);
1714                        
1715                        return true;
1716                } catch (Exception e) {
1717                        throw new IOException(e.getMessage(), e);
1718                } finally {
1719                        try {
1720                                if (executor != null)
1721                                        executor.close();
1722                        } catch (Exception e) {
1723                                log.error(e.getMessage(), e);
1724                        }
1725                        try {
1726                                if (dmExecutor != null)
1727                                        dmExecutor.close();
1728                        } catch (Exception e) {
1729                                log.error(e.getMessage(), e);
1730                        }
1731                }
1732        }
1733
1734        private boolean writeTo(byte[] data, boolean append) throws IOException {
1735            
1736            boolean allowed = beforeWrite();
1737            
1738            if(!allowed) return false;
1739            
1740                CacheDriverExecutor executor = null;
1741                CacheDriverExecutor dmExecutor = null;
1742                try {
1743                        if (!append) {
1744                                if (exists())
1745                                        throw new IOException(
1746                                                        String.format("The remote file '%s' already exists.", origin.getAbsolutePath()));
1747                        }
1748                        Clock clock = new Clock();
1749                        Timestamp currentTime = clock.getSqlTimestamp();
1750                        Map<String, Object> lastRecord = getLastIndexRecord();
1751                        int filePartIndex = 0;
1752                        if (lastRecord != null) {
1753                                if (lastRecord.get("file_type").equals("D")) {
1754                                        throw new IOException(String.format("This is a folder '%s'.", origin.getAbsolutePath()));
1755                                }
1756                                if (lastRecord.get("file_type").equals("L")) {
1757                                        throw new IOException(String.format("This is a link '%s'.", origin.getAbsolutePath()));
1758                                }
1759                                if (append) {
1760                                        filePartIndex = Integer.parseInt(lastRecord.get("file_part_index") + "") + 1;
1761                                }
1762                                boolean isDeleted = false;
1763                                Object deletedValue = lastRecord.get("deleted");
1764                                if (deletedValue instanceof Boolean) {
1765                                        isDeleted = (Boolean) deletedValue;
1766                                } else {
1767                                        isDeleted = Integer.parseInt(deletedValue + "") == 1;
1768                                }
1769                                if (isDeleted) {
1770                                        throw new IOException(String.format("The file '%s' is deleted.", origin.getAbsolutePath()));
1771                                }
1772                        }
1773                        calculateDataMappingDataSourceWriter(this);
1774                        log.debug("Write: {}", getPath());
1775                        String remoteFileId = generateId(identityCardEnable, identityCardName, identityCardPrefix,
1776                                        identityCardSuffix);
1777                        Map rfRecord = new HashMap<String, Object>();
1778                        rfRecord.put("id", remoteFileId);
1779                        rfRecord.put("file_name", origin.getName());
1780                        rfRecord.put("file_parent_path", getParent());
1781                        rfRecord.put("file_type", "F");
1782                        rfRecord.put("file_part_index", filePartIndex);
1783                        rfRecord.put("file_part_size", data.length);
1784                        rfRecord.put("file_part_data_ds", this.dataMappingDs);
1785                        rfRecord.put("file_part_data_table", this.dataMappingTable);
1786                        rfRecord.put("file_part_last_index", -1);
1787                        rfRecord.put("file_size", 0);
1788                        rfRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime);
1789                        rfRecord.put("file_deleted", 0);
1790                        rfRecord.put("created_user_id", getDefaultModifyUserId());
1791                        rfRecord.put("updated_user_id", getDefaultModifyUserId());
1792                        rfRecord.put("created_at", currentTime);
1793                        rfRecord.put("updated_at", currentTime);
1794                        rfRecord.put("file_id", fileId);
1795                        rfRecord.put("source_hostname", CommonTools.getHostname());
1796                        rfRecord.put("synced_on_hostnames", String.format(";%s;", CommonTools.getHostname()));
1797                        rfRecord.put("deleted_on_hostnames", ";");
1798
1799                        Connection conn = dataSourceWriter.getConnection();
1800                        conn.setAutoCommit(true);
1801                        executor = new CacheDriverExecutor(conn);
1802                        executor.execute(formatSql(SQL_INSERT_INTO_FILE), rfRecord);
1803
1804                        Map dataRecord = new HashMap<String, Object>();
1805                        dataRecord.put("id", generateDataId(identityCardEnable, identityCardName, identityCardSuffix, this));
1806                        dataRecord.put("file_part_data", encrypt(data));
1807                        dataRecord.put("remote_file_id", remoteFileId);
1808                        dataRecord.put("file_id", fileId);
1809                        dataRecord.put("created_at", currentTime);
1810                        dataRecord.put("updated_at", currentTime);
1811
1812                        conn = dataMappingDataSourceWriter.getConnection();
1813                        conn.setAutoCommit(true);
1814                        dmExecutor = new CacheDriverExecutor(conn);
1815                        dmExecutor.execute(formatSqlForFilePartData(SQL_INSERT_INTO_FILE_DATA), dataRecord);
1816
1817                        return true;
1818                } catch (Exception e) {
1819                        throw new IOException(e.getMessage(), e);
1820                } finally {
1821                        try {
1822                                if (executor != null)
1823                                        executor.close();
1824                        } catch (Exception e) {
1825                                log.error(e.getMessage(), e);
1826                        }
1827                        try {
1828                                if (dmExecutor != null)
1829                                        dmExecutor.close();
1830                        } catch (Exception e) {
1831                                log.error(e.getMessage(), e);
1832                        }
1833                }
1834        }
1835
1836        @Override
1837        public boolean write(byte[] data) throws IOException {
1838                return write(data, false);
1839        }
1840
1841        @Override
1842        public boolean write(String data, boolean append) throws IOException {
1843                return write(data.getBytes(CHARSET), append);
1844        }
1845
1846        @Override
1847        public boolean write(String data) throws IOException {
1848                return write(data.getBytes(CHARSET));
1849        }
1850
1851        @Override
1852        public boolean createLink(String target) throws IOException {
1853            boolean allowed = beforeCreateLink(target);
1854            
1855            if(!allowed) return false;
1856            
1857                Clock clock = new Clock();
1858                CacheDriverExecutor executor = null;
1859                CacheDriverExecutor dmExecutor = null;
1860                try {
1861                        if (getFirstFilePart() != null) {
1862                                return false;
1863                        }
1864                        log.debug("CreateLink: {}", getPath());
1865                        Timestamp currentTime = clock.getSqlTimestamp();
1866                        calculateDataMappingDataSourceWriter(this);
1867                        String remoteFileId = generateId(identityCardEnable, identityCardName, identityCardPrefix,
1868                                        identityCardSuffix);
1869                        byte[] data = target.getBytes(CHARSET);
1870                        Map rfRecord = new HashMap<String, Object>();
1871                        rfRecord.put("id", remoteFileId);
1872                        rfRecord.put("file_name", origin.getName());
1873                        rfRecord.put("file_parent_path", getParent());
1874                        rfRecord.put("file_type", "L");
1875                        rfRecord.put("file_part_index", 0);
1876                        rfRecord.put("file_part_size", data.length);
1877                        rfRecord.put("file_part_data_ds", this.dataMappingDs);
1878                        rfRecord.put("file_part_data_table", this.dataMappingTable);
1879                        rfRecord.put("file_part_last_index", -1);
1880                        rfRecord.put("file_size", 0);
1881                        rfRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime);
1882                        rfRecord.put("file_deleted", 0);
1883                        rfRecord.put("created_user_id", getDefaultModifyUserId());
1884                        rfRecord.put("updated_user_id", getDefaultModifyUserId());
1885                        rfRecord.put("created_at", currentTime);
1886                        rfRecord.put("updated_at", currentTime);
1887                        rfRecord.put("file_id", fileId);
1888                        rfRecord.put("source_hostname", CommonTools.getHostname());
1889                        rfRecord.put("synced_on_hostnames", String.format(";%s;", CommonTools.getHostname()));
1890                        rfRecord.put("deleted_on_hostnames", ";");
1891
1892                        Connection conn = dataSourceWriter.getConnection();
1893                        conn.setAutoCommit(true);
1894                        executor = new CacheDriverExecutor(conn);
1895                        int rows = executor.execute(formatSql(SQL_INSERT_INTO_FILE), rfRecord);
1896
1897                        Map dataRecord = new HashMap<String, Object>();
1898                        dataRecord.put("id", generateDataId(identityCardEnable, identityCardName, identityCardSuffix, this));
1899                        dataRecord.put("file_part_data", encrypt(data));
1900                        dataRecord.put("remote_file_id", remoteFileId);
1901                        dataRecord.put("file_id", fileId);
1902                        dataRecord.put("created_at", currentTime);
1903                        dataRecord.put("updated_at", currentTime);
1904
1905                        conn = dataMappingDataSourceWriter.getConnection();
1906                        conn.setAutoCommit(true);
1907                        dmExecutor = new CacheDriverExecutor(conn);
1908                        dmExecutor.execute(formatSqlForFilePartData(SQL_INSERT_INTO_FILE_DATA), dataRecord);
1909
1910                        complete();
1911            
1912            afterCreateLink(target);
1913            
1914                        return rows > 0;
1915                } catch (Exception e) {
1916                        throw new IOException(e.getMessage(), e);
1917                } finally {
1918                        try {
1919                                if (executor != null)
1920                                        executor.close();
1921                        } catch (Exception e) {
1922                                log.error(e.getMessage(), e);
1923                        }
1924                        try {
1925                                if (dmExecutor != null)
1926                                        dmExecutor.close();
1927                        } catch (Exception e) {
1928                                log.error(e.getMessage(), e);
1929                        }
1930                }
1931        }
1932
1933        @Override
1934        public String readLink() throws IOException {
1935                return new String(readAllBytes(),BaseFile.CHARSET);
1936        }
1937
1938        @Override
1939        public byte[] readAllBytes() throws IOException {
1940                return readAllBytesFrom();
1941        }
1942
1943        private byte[] readAllBytesFrom() throws IOException {
1944                if (isDir()) {
1945                        throw new IOException(String.format("The file '%s' is a folder.", origin.getAbsolutePath()));
1946                }
1947                if (!exists()) {
1948                        throw new IOException(String.format("The file '%s' does not exist.", origin.getAbsolutePath()));
1949                }
1950                CacheDriverExecutor executor = null;
1951                Connection conn = null;
1952                ByteArrayOutputStream baos = null;
1953                try {
1954                        conn = dataSourceReader.getConnection();
1955                        conn.setAutoCommit(true);
1956                        executor = new CacheDriverExecutor(conn);
1957                        Map dataRecord = getCompletedRecord();
1958                        List<Map<String, Object>> filePartList = executor.find(formatSql(SQL_SELECT_FILE), dataRecord);
1959                        baos = new ByteArrayOutputStream();
1960                        for (Map<String, Object> fpItem : filePartList) {
1961                                Map<String, Object> fp = readPart(fpItem);
1962                                Object fpData = null;
1963                                
1964                                if(fp == null){
1965                                    fpData = new byte[]{};
1966                                }else{
1967                                    fpData = fp.get("file_part_data");  
1968                                }
1969
1970                                if (fpData instanceof byte[]) {
1971                                        byte[] partData = (byte[]) fpData;
1972                                        baos.write(decrypt(partData));
1973                                        baos.flush();
1974                                }
1975                        }
1976                        return baos == null ? null : baos.toByteArray();
1977                } catch (Exception e) {
1978                        throw new IOException(e.getMessage(), e);
1979                } finally {
1980                        try {
1981                                if (baos != null)
1982                                        baos.close();
1983                        } catch (Exception e) {
1984                                log.error(e.getMessage(), e);
1985                        }
1986                        try {
1987                                if (executor != null)
1988                                        executor.close();
1989                        } catch (Exception e) {
1990                                log.error(e.getMessage(), e);
1991                        }
1992                }
1993        }
1994
1995        @Override
1996        public boolean isFile() throws IOException {
1997                Map<String, Object> lastRecord = getFirstFilePart();
1998
1999                if (lastRecord == null) {
2000                        return false;
2001                }
2002                return lastRecord.get("file_type").equals("F");
2003        }
2004
2005        @Override
2006        public boolean isDirectory() throws IOException {
2007                Map<String, Object> lastRecord = getFirstFilePart();
2008
2009                if (lastRecord == null) {
2010                        return false;
2011                }
2012                return lastRecord.get("file_type").equals("D");
2013        }
2014
2015        @Override
2016        public boolean isLink() throws IOException {
2017                Map<String, Object> lastRecord = getFirstFilePart();
2018
2019                if (lastRecord == null) {
2020                        return false;
2021                }
2022                return lastRecord.get("file_type").equals("L");
2023        }
2024
2025        @Override
2026        public String readAllString() throws IOException {
2027                ByteArrayOutputStream baos = null;
2028                try {
2029                        byte[] bytes = readAllBytes();
2030                        
2031                        if(bytes == null) return null;
2032                        
2033                        baos = new ByteArrayOutputStream();
2034                        baos.write(bytes);
2035                        baos.flush();
2036                        return baos.toString();
2037                } finally {
2038                        if (baos != null) {
2039                                try {
2040                                        baos.close();
2041                                } catch (IOException e) {
2042                                        log.error(e.getMessage(), e);
2043                                }
2044                        }
2045                }
2046        }
2047
2048        @Override
2049        public long size() throws IOException {
2050                Map<String, Object> record = getCompletedRecord();
2051                if (record != null) {
2052                        return Long.parseLong(record.get("file_size") + "");
2053                }
2054                return 0L;
2055        }
2056
2057        public List<Map<String, Object>> list() throws IOException {
2058                return list(getPath(), "%");
2059        }
2060
2061        public List<Map<String, Object>> list(String likeFileName) throws IOException {
2062                return list(getPath(), likeFileName);
2063        }
2064
2065        public List<Map<String, Object>> list(String likeFolderName, String likeFileName) throws IOException {
2066                CacheDriverExecutor executor = null;
2067                Connection conn = null;
2068                try {
2069                        conn = dataSourceReader.getConnection();
2070                        conn.setAutoCommit(true);
2071                        executor = new CacheDriverExecutor(conn);
2072                        Map<String, Object> dataRecord = new HashMap<String, Object>();
2073                        if (CommonTools.isBlank(likeFileName)) {
2074                                dataRecord.put("file_name", "%");
2075                        } else {
2076                                dataRecord.put("file_name", likeFileName);
2077                        }
2078                        if (CommonTools.isBlank(likeFolderName)) {
2079                                dataRecord.put("file_parent_path", "%");
2080                        } else {
2081                                dataRecord.put("file_parent_path", likeFolderName);
2082                        }
2083                        return executor.find(formatSql(SQL_SELECT_FOR_LIST_ALL), dataRecord);
2084                } catch (SQLException e) {
2085                        throw new IOException(e.getMessage(), e);
2086                } finally {
2087                        if (executor != null) {
2088                                try {
2089                                        executor.close();
2090                                } catch (Exception e) {
2091                                        throw new IOException(e.getMessage(), e);
2092                                }
2093                        }
2094                }
2095        }
2096
2097        public void listAll(String likeFileName, CacheArray rows) throws IOException {
2098                listAll(getPath(), likeFileName, rows);
2099        }
2100
2101        public void listAll(CacheArray rows) throws IOException {
2102                listAll(getPath(), "%", rows);
2103        }
2104
2105        public void listAll(String likeFolderName, String likeFileName, CacheArray rows) throws IOException {
2106                CacheDriverExecutor executor = null;
2107                Connection conn = null;
2108                try {
2109                        conn = dataSourceReader.getConnection();
2110                        conn.setAutoCommit(true);
2111                        executor = new CacheDriverExecutor(conn);
2112                        Map<String, Object> dataRecord = new HashMap<String, Object>();
2113                        if (CommonTools.isBlank(likeFileName)) {
2114                                dataRecord.put("file_name", "%");
2115                        } else {
2116                                dataRecord.put("file_name", likeFileName);
2117                        }
2118                        if (CommonTools.isBlank(likeFolderName)) {
2119                                dataRecord.put("file_parent_path", "%");
2120                        } else {
2121                                dataRecord.put("file_parent_path", likeFolderName);
2122                        }
2123                        executor.find(formatSql(SQL_SELECT_FOR_LIST_ALL), dataRecord, rows);
2124                } catch (SQLException e) {
2125                        throw new IOException(e.getMessage(), e);
2126                } finally {
2127                        if (executor != null) {
2128                                try {
2129                                        executor.close();
2130                                } catch (Exception e) {
2131                                        throw new IOException(e.getMessage(), e);
2132                                }
2133                        }
2134                }
2135        }
2136
2137        protected void listAllForDelete(String likeFolderName, String likeFileName, CacheArray rows) throws IOException {
2138                CacheDriverExecutor executor = null;
2139                Connection conn = null;
2140                try {
2141                    long deleteCutoffTimeMs = 0L;
2142                    
2143                    if(DELETE_CUTOFF_TIME != null) deleteCutoffTimeMs = DELETE_CUTOFF_TIME.getTime();
2144                    
2145                        Timestamp currentTime = new Clock().getSqlTimestamp();
2146                        
2147                        conn = dataSourceReader.getConnection();
2148                        conn.setAutoCommit(true);
2149                        executor = new CacheDriverExecutor(conn);
2150                        Map<String, Object> dataRecord = new HashMap<String, Object>();
2151                        if (CommonTools.isBlank(likeFileName)) {
2152                                dataRecord.put("file_name", "%");
2153                        } else {
2154                                dataRecord.put("file_name", likeFileName);
2155                        }
2156                        if (CommonTools.isBlank(likeFolderName)) {
2157                                dataRecord.put("file_parent_path", "%");
2158                        } else {
2159                                dataRecord.put("file_parent_path", likeFolderName);
2160                        }
2161                        dataRecord.put("deleted_on_hostnames", "%;" + CommonTools.getHostname() + ";%");
2162                        dataRecord.put("begin_time", new Timestamp(deleteCutoffTimeMs));
2163                        dataRecord.put("end_time", currentTime);
2164                        dataRecord.put("created_at", new Timestamp(currentTime.getTime() - BEFORE_THE_QUEUE_TIME));
2165                        executor.find(0, MAX_QUEUE_SIZE,formatSql(SQL_SELECT_FOR_LIST_ALL_DELETE), dataRecord, rows);
2166                } catch (SQLException e) {
2167                        throw new IOException(e.getMessage(), e);
2168                } finally {
2169                        if (executor != null) {
2170                                try {
2171                                        executor.close();
2172                                } catch (Exception e) {
2173                                        throw new IOException(e.getMessage(), e);
2174                                }
2175                        }
2176                }
2177        }
2178
2179        protected void listAllForSync(String likeFolderName, String likeFileName, CacheArray rows) throws IOException {
2180                CacheDriverExecutor executor = null;
2181                Connection conn = null;
2182                try {
2183                    long deleteCutoffTimeMs = 0L;
2184                    
2185                    if(DELETE_CUTOFF_TIME != null) deleteCutoffTimeMs = SYNC_CUTOFF_TIME.getTime();
2186                    
2187                        Timestamp currentTime = new Clock().getSqlTimestamp();
2188                        
2189                        conn = dataSourceReader.getConnection();
2190                        conn.setAutoCommit(true);
2191                        executor = new CacheDriverExecutor(conn);
2192                        Map<String, Object> dataRecord = new HashMap<String, Object>();
2193                        if (CommonTools.isBlank(likeFileName)) {
2194                                dataRecord.put("file_name", "%");
2195                        } else {
2196                                dataRecord.put("file_name", likeFileName);
2197                        }
2198                        if (CommonTools.isBlank(likeFolderName)) {
2199                                dataRecord.put("file_parent_path", "%");
2200                        } else {
2201                                dataRecord.put("file_parent_path", likeFolderName);
2202                        }
2203                        dataRecord.put("synced_on_hostnames", "%;" + CommonTools.getHostname() + ";%");
2204                        dataRecord.put("begin_time", new Timestamp(deleteCutoffTimeMs));
2205                        dataRecord.put("end_time", currentTime);
2206                        dataRecord.put("created_at", new Timestamp(currentTime.getTime() - BEFORE_THE_QUEUE_TIME));
2207                        executor.find(0, MAX_QUEUE_SIZE, formatSql(SQL_SELECT_FOR_LIST_ALL_SYNC), dataRecord, rows);
2208                } catch (SQLException e) {
2209                        throw new IOException(e.getMessage(), e);
2210                } finally {
2211                        if (executor != null) {
2212                                try {
2213                                        executor.close();
2214                                } catch (Exception e) {
2215                                        throw new IOException(e.getMessage(), e);
2216                                }
2217                        }
2218                }
2219        }
2220
2221        public void setModifyUserId(String modifyUserId) {
2222                this.modifyUserId = modifyUserId;
2223        }
2224
2225        private String getDefaultModifyUserId() {
2226                if (CommonTools.isBlank(modifyUserId)) {
2227                        return DEFAULE_MODIFY_USER_ID;
2228                } else {
2229                        return modifyUserId;
2230                }
2231        }
2232
2233        @Override
2234        public void setModifyTime(Timestamp modifyTime) throws IOException {
2235                this.modifyTime = modifyTime;
2236        }
2237
2238        @Override
2239        public Timestamp getModifyTime() throws IOException {
2240                Map<String, Object> record = getFirstFilePart();
2241                if (record != null) {
2242                        return (Timestamp) record.get("file_modify_time");
2243                }
2244                return null;
2245        }
2246
2247        public boolean isTimeout() throws IOException {
2248                return isTimeout(LOGIC_TIMEOUT_MS);
2249        }
2250
2251        @Override
2252        public void copyTo(String toPath) throws IOException {
2253                log.debug("CopyTo: {} -> {}", getPath(), toPath);
2254                if (!exists()) {
2255                        log.mark(String.format("The remote file '%s' does not exist.", this.getPath()));
2256                }
2257                RemoteFile destRf = new RemoteFile(toPath);
2258                final Timestamp originModifyTime = this.getModifyTime();
2259                destRf.setModifyUserId(modifyUserId);
2260                destRf.setModifyTime(originModifyTime);
2261                if (isLink()) {
2262                        if (destRf.exists(true)) {
2263                                destRf.forceDeleteLink(false);
2264                        }
2265                        destRf.createLink(readLink());
2266                }
2267                if (isFile()) {
2268                        final RemoteFile _destRf = destRf;
2269                        if (_destRf.exists(true)) {
2270                                _destRf.forceDeleteFile(false);
2271                        }
2272                        merge(new FilePart() {
2273                                @Override
2274                                protected void process(int partIndex, byte[] data) {
2275                                        try {
2276                                                _destRf.write(decrypt(data), partIndex > 0);
2277                                        } catch (Exception e) {
2278                                                log.error(e.getMessage(), e);
2279                                        }
2280                                }
2281
2282                                @Override
2283                                protected void completed(int lastPartIndex, Timestamp modifyTime, long fileSize) {
2284                                        try {
2285                                                /**This is 0 bytes file**/
2286                                                if (lastPartIndex == -1) {
2287
2288                                                        if (_destRf.exists(true))
2289                                                                _destRf.forceDeleteFile(false);
2290
2291                                                        _destRf.setModifyTime(originModifyTime);
2292                                                        _destRf.createEmptyFile();
2293                                                } else {
2294                                                        _destRf.setModifyTime(originModifyTime);
2295                                                        _destRf.complete();
2296                                                }
2297                                        } catch (IOException e) {
2298                                                log.error(e.getMessage(), e);
2299                                        }
2300                                }
2301
2302                                @Override
2303                                protected void ended(int lastPartIndex, long fileSize) {
2304
2305                                }
2306
2307                        });
2308                }
2309                if (isDir()) {
2310                        List<Map<String, Object>> copyList = list(getPath(), "%");
2311                        int size = copyList.size();
2312                        if (size > 0) {
2313                                if (!destRf.exists(true)) {
2314                                        destRf.mkdirs();
2315                                }
2316                        }
2317                        String copyRootPath = getPath();
2318                        copyList = list(parsePath(getPath() + "/%"), "%");
2319                        if (copyList.size() > 0) {
2320                                if (!destRf.exists(true)) {
2321                                        destRf.mkdirs();
2322                                }
2323                        }
2324                        for (Map<String, Object> item : copyList) {
2325                                String itemParentPath = (String) item.get("file_parent_path");
2326                                String destParentPath = itemParentPath.replaceFirst(copyRootPath + "/", toPath);
2327                                RemoteFile sourceRf = new RemoteFile(
2328                                                String.format("%s/%s", item.get("file_parent_path"), item.get("file_name")));
2329                                String destRfPath = new File(String.format("%s/%s", destParentPath, item.get("file_name")))
2330                                                .getAbsolutePath();
2331                                if (sourceRf.isDir()) {
2332                                        RemoteFile tmpRf = new RemoteFile(destRfPath + "/");
2333                                        tmpRf.setModifyTime(sourceRf.getModifyTime());
2334                                        if (!tmpRf.exists(true)) {
2335                                                tmpRf.mkdirs();
2336                                        }
2337                                } else {
2338                                        sourceRf.copyTo(destRfPath);
2339                                }
2340                        }
2341                }
2342        }
2343
2344        @Override
2345        public void moveTo(String toPath) throws IOException {
2346                log.debug("MoveTo: {} -> {}", getPath(), toPath);
2347                RemoteFile destRf = new RemoteFile(toPath);
2348                copyTo(toPath);
2349                boolean exists = this.exists(true);
2350                if (exists && destRf.exists(true) && destRf.isCompleted()) {
2351                        forceDeleteAll(true);
2352                }
2353        }
2354
2355        private void startFullAsyncThreadForDelete(boolean subFolders, long timerMs, String replaceTo,
2356                        Runnable completedCallback) {
2357                RemoteFile the = this;
2358                try {
2359                        if (syncRoot == null) {
2360                                File replaceToFile = new File(replaceTo);
2361                                if (replaceToFile.getParent() == null) {
2362                                        throw new IOException("Cannot execute delete task.");
2363                                }
2364                                String replaceToParent = replacePath(replaceToFile.getParent());
2365                                syncRoot = new File(String.format("%s/%s", replaceToParent, replaceToFile.getName()));
2366                        }
2367
2368                        CacheArray rows = new CacheArray();
2369                        CacheArrayFilter filter = new CacheArrayFilter(CACHE_ARRAY_FILTER_TIMER) {
2370                                @Override
2371                                public void execute(Integer index, Object o) {
2372                                        Map<String, Object> item = (Map<String, Object>) o;
2373                                        String fileName = (String) item.get("file_name");
2374                                        String fullPath = String.format("%s/%s", item.get("file_parent_path"), fileName);
2375                                        Thread.currentThread()
2376                                                        .setName(String.format("RemoteFile-startFullAsyncThreadForDelete-%s", fileName));
2377                                        RemoteFile rf = new RemoteFile(fullPath);
2378                                        if(isDebug(rf)){
2379                                            debugLog("CheckUsage",rf,this);
2380                                        }                                       
2381                                        if (memoryUsage() < RemoteFile.MAX_MEMORY_USAGE) {
2382                                                rf.fileId = (String) item.get("file_id");
2383                                        String toDf = rf.getPath();
2384                                        if (!CommonTools.isBlank(replaceTo)) {
2385                                                if (subFolders) {
2386                                                        toDf = fullPath.replaceFirst(getPath() + "/", replaceTo + "/");
2387                                                } else {
2388                                                        toDf = fullPath.replaceFirst(getPath(), replaceTo + "/");
2389                                                }
2390                                        }
2391                                        DiskFile df = new DiskFile(toDf);
2392                                        df.copyAttrs(df, false, the.syncRoot);
2393                                        executeSyncRunnable(this, item, df, rf);            
2394                                        } else{
2395                                            log.warn("Over max memory usage 'MAX_MEMORY_USAGE={}'.",MAX_MEMORY_USAGE);
2396                                        }
2397                                }
2398
2399                                @Override
2400                                public void terminated() {
2401                                        log.warn("Terminated: {}-{}", Thread.currentThread().getName(), Thread.currentThread().getId());
2402                                }
2403
2404                                @Override
2405                                public void completed(Integer size) {
2406                                        Thread.currentThread().setName(String
2407                                                        .format("RemoteFile-startFullAsyncThreadForDelete-completed-%s", the.origin.getName()));
2408                                        if (isStopSync()) {
2409                                                LoggerFactory.getLogger(RemoteFile.class).mark("Stoped start delete task thread.");
2410                                        }
2411                                        if (timerMs <= 0) {
2412                                                LoggerFactory.getLogger(RemoteFile.class).mark("Completed start delete task thread.");
2413                                        }
2414                                        if (!isStopSync() && timerMs > 0) {
2415                                                try {
2416                                                        Thread.sleep(timerMs);
2417                                                        if (!isStopSync()) {
2418                                                                startFullAsyncThreadForDelete(!subFolders, timerMs, replaceTo, completedCallback);
2419                                                        }
2420                                                } catch (Exception e) {
2421                                                        log.error(e.getMessage(), e);
2422                                                }
2423                                        }
2424                                        if (completedCallback != null) {
2425                                                completedCallback.run();
2426                                        }
2427                                }
2428
2429                        };
2430                        rows.filter(filter);
2431                        listAllForDelete(getPath() + (subFolders ? "/%" : ""), "%", rows);
2432                } catch (Exception e) {
2433                        log.error(e.getMessage(), e);
2434                        if (timerMs > 0) {
2435                                try {
2436                                        Thread.sleep(timerMs);
2437                                        startFullAsyncThreadForDelete(subFolders, timerMs, replaceTo, completedCallback);
2438                                } catch (InterruptedException ee) {
2439                                        log.error(ee.getMessage(), ee);
2440                                }
2441                        }
2442                }
2443        }
2444
2445        private void startFullAsyncThread(boolean subFolders, long timerMs, String replaceTo, Runnable completedCallback) {
2446                RemoteFile the = this;
2447                try {
2448                        if (syncRoot == null) {
2449                                File replaceToFile = new File(replaceTo);
2450                                if (replaceToFile.getParent() == null) {
2451                                        throw new IOException("Cannot sync the root path.");
2452                                }
2453                                String replaceToParent = replacePath(replaceToFile.getParent());
2454                                syncRoot = new File(String.format("%s/%s", replaceToParent, replaceToFile.getName()));
2455                        }
2456
2457                        CacheArray rows = new CacheArray();
2458                        CacheArrayFilter filter = new CacheArrayFilter(CACHE_ARRAY_FILTER_TIMER) {
2459                                @Override
2460                                public void execute(Integer index, Object o) {
2461                                        Map<String, Object> item = (Map<String, Object>) o;
2462                                        String fileName = (String) item.get("file_name");
2463                                        Thread.currentThread().setName(String.format("RemoteFile-startFullAsyncThread-%s", fileName));
2464                                        String fullPath = String.format("%s/%s", item.get("file_parent_path"), fileName);
2465                                        RemoteFile rf = new RemoteFile(fullPath);
2466                                        if(isDebug(the)){
2467                                            debugLog("CheckUsage",the,this);
2468                            }
2469                                        if (memoryUsage() < RemoteFile.MAX_MEMORY_USAGE) {
2470                                                String toDf = rf.getPath();
2471                                                if (!CommonTools.isBlank(replaceTo)) {
2472                                                        if (subFolders) {
2473                                                                toDf = fullPath.replaceFirst(getPath() + "/", replaceTo + "/");
2474                                                        } else {
2475                                                                toDf = fullPath.replaceFirst(getPath(), replaceTo + "/");
2476                                                        }
2477                                                }
2478                                                DiskFile df = new DiskFile(toDf);
2479                                                df.copyAttrs(df, false, the.syncRoot);
2480                                                executeSyncRunnable(this, item, df, rf);
2481                                        } else{
2482                                             log.warn("Over max memory usage 'MAX_MEMORY_USAGE={}'.",MAX_MEMORY_USAGE);
2483                                        }
2484                                }
2485
2486                                @Override
2487                                public void terminated() {
2488                                        log.warn("Terminated: {}-{}", Thread.currentThread().getName(), Thread.currentThread().getId());
2489                                }
2490
2491                                @Override
2492                                public void completed(Integer size) {
2493                                        Thread.currentThread().setName(
2494                                                        String.format("RemoteFile-startFullAsyncThread-completed-%s", the.origin.getName()));
2495                                        if (isStopSync()) {
2496                                                LoggerFactory.getLogger(RemoteFile.class).mark("Stoped start full async thread.");
2497                                        }
2498                                        if (timerMs <= 0) {
2499                                                LoggerFactory.getLogger(RemoteFile.class).mark("Completed start full async thread.");
2500                                        }
2501                                        if (!isStopSync() && timerMs > 0) {
2502                                                try {
2503                                                        Thread.sleep(timerMs);
2504                                                        if (!isStopSync()) {
2505                                                                startFullAsyncThread(!subFolders, timerMs, replaceTo, completedCallback);
2506                                                        }
2507                                                } catch (Exception e) {
2508                                                        log.error(e.getMessage(), e);
2509                                                }
2510                                        }
2511                                        if (completedCallback != null) {
2512                                                completedCallback.run();
2513                                        }
2514                                }
2515
2516                        };
2517                        rows.filter(filter);
2518                        listAllForSync(getPath() + (subFolders ? "/%" : ""), "%", rows);
2519                } catch (Exception e) {
2520                        log.error(e.getMessage(), e);
2521                        if (timerMs > 0) {
2522                                try {
2523                                        Thread.sleep(timerMs);
2524                                        startFullAsyncThread(subFolders, timerMs, replaceTo, completedCallback);
2525                                } catch (InterruptedException ee) {
2526                                        log.error(ee.getMessage(), ee);
2527                                }
2528                        }
2529                }
2530        }
2531
2532        public void startFullAsync(long timerMs) throws IOException {
2533                startFullAsync(timerMs, getPath() + "/", null);
2534        }
2535
2536        public void startFullAsync(long timerMs, Runnable completedCallback) throws IOException {
2537                startFullAsync(timerMs, getPath() + "/", completedCallback);
2538        }
2539
2540        public void startFullAsync(long timerMs, String replaceTo) throws IOException {
2541                startFullAsync(timerMs, replaceTo, null);
2542        }
2543
2544        public void startFullAsync(long timerMs, String replaceTo, Runnable completedCallback) throws IOException {
2545
2546                stopSync = false;
2547
2548                RemoteFile the = this;
2549                Clock clock = new Clock();
2550                Timestamp toTime = new Timestamp(clock.getTime() - timerMs);
2551                Executors.newFixedThreadPool(1).execute(new Runnable() {
2552
2553                        @Override
2554                        public void run() {
2555                                startFullAsyncThread(false, timerMs, replaceTo, completedCallback);
2556                        }
2557
2558                });
2559
2560                Executors.newFixedThreadPool(1).execute(new Runnable() {
2561
2562                        @Override
2563                        public void run() {
2564                                startFullAsyncThreadForDelete(false, timerMs, replaceTo, completedCallback);
2565                        }
2566
2567                });
2568        }
2569
2570        public DriverDataSource getDataSourceReader() {
2571                return dataSourceReader;
2572        }
2573
2574        public DriverDataSource getDataSourceWriter() {
2575                return dataSourceWriter;
2576        }
2577
2578        public ConfigProperties getConfigProperties() {
2579                return configProperties;
2580        }
2581
2582        private byte[] encrypt(byte[] data) throws Exception {
2583                boolean useEncrypt = configProperties.getBoolean("EncryptEnable", false);
2584                if (useEncrypt) {
2585            String aesKey = configProperties.getString("EncryptAesKey");
2586            return CipherTools.AESEncrypt(aesKey,data);
2587                } else {
2588                        return data;
2589                }
2590        }
2591
2592        private byte[] decrypt(byte[] encData) throws Exception {
2593                boolean useEncrypt = configProperties.getBoolean("EncryptEnable", false);
2594                if (useEncrypt) {
2595            String aesKey = configProperties.getString("EncryptAesKey");
2596            return CipherTools.AESDecrypt(aesKey,encData);
2597                } else {
2598                        return encData;
2599                }
2600        }
2601
2602        public String getSyncedOnHostnames() throws IOException {
2603                Map<String, Object> record = getCompletedRecord();
2604                if (record != null) {
2605                        return (String) record.get("synced_on_hostnames");
2606                }
2607                return null;
2608        }
2609
2610        public String getDeletedOnHostnames() throws IOException {
2611                Map<String, Object> record = getCompletedRecord();
2612                if (record != null) {
2613                        return (String) record.get("deleted_on_hostnames");
2614                }
2615                return null;
2616        }
2617
2618        public String getSourceHostname() throws IOException {
2619                Map<String, Object> record = getCompletedRecord();
2620                if (record != null) {
2621                        return (String) record.get("source_hostname");
2622                }
2623                return null;
2624        }
2625
2626        protected String getLastSourceHostname() throws IOException {
2627                Map<String, Object> record = getLastUpdateRecord();
2628                if (record != null) {
2629                        return (String) record.get("source_hostname");
2630                }
2631                return null;
2632        }
2633
2634        protected Timestamp getLastOperateTime() throws IOException {
2635                Map<String, Object> record = getLastUpdateRecord();
2636                if (record != null) {
2637                        Timestamp updatedAt = (Timestamp) record.get("updated_at");
2638                        Timestamp deletedAt = (Timestamp) record.get("deleted_at");
2639
2640                        return deletedAt == null ? updatedAt : deletedAt;
2641                }
2642                return null;
2643        }
2644
2645        protected Timestamp getLastModifyTime() throws IOException {
2646                Map<String, Object> record = getLastUpdateRecord();
2647                if (record != null) {
2648                        return (Timestamp) record.get("file_modify_time");
2649                }
2650                return null;
2651        }
2652
2653        protected boolean isLastOperateDelete() throws IOException {
2654                Map<String, Object> record = getLastUpdateRecord();
2655                if (record != null) {
2656                        Timestamp deletedAt = (Timestamp) record.get("deleted_at");
2657
2658                        return deletedAt == null ? false : true;
2659                }
2660                return false;
2661        }
2662
2663        protected boolean isLastOperateRealDelete() throws IOException {
2664                Map<String, Object> record = getLastUpdateRecord();
2665                if (record != null) {
2666                        Object deletedValue = record.get("file_deleted");
2667                        if (deletedValue != null) {
2668                                if (deletedValue instanceof Boolean) {
2669                                        return (Boolean) deletedValue;
2670                                } else {
2671                                        return Integer.parseInt(deletedValue + "") == 1;
2672                                }
2673                        }
2674                }
2675                return false;
2676        }
2677        
2678        protected void listAllForScanDelete(Integer filePartDataTable, CacheArray rows) throws IOException {
2679                CacheDriverExecutor executor = null;
2680                Connection conn = null;
2681                try {
2682                    long deleteCutoffTimeMs = 0L;
2683                    
2684                    if(DELETE_CUTOFF_TIME != null) deleteCutoffTimeMs = DELETE_CUTOFF_TIME.getTime();
2685                    
2686                        Timestamp currentTime = new Clock().getSqlTimestamp();
2687                        conn = dataSourceReader.getConnection();
2688                        conn.setAutoCommit(true);
2689                        executor = new CacheDriverExecutor(conn);
2690                        Map<String, Object> dataRecord = new HashMap<String, Object>();
2691                        dataRecord.put("file_parent_path", getPath());
2692                        dataRecord.put("like_file_parent_path", parsePath(getPath() + "/%"));
2693                        dataRecord.put("deleted_on_hostnames", "%;" + CommonTools.getHostname() + ";%");
2694                        dataRecord.put("begin_time", new Timestamp(deleteCutoffTimeMs));
2695                        dataRecord.put("end_time", currentTime);
2696                        dataRecord.put("created_at", new Timestamp(currentTime.getTime() - BEFORE_THE_QUEUE_TIME));
2697                        dataRecord.put("file_part_data_table", filePartDataTable);
2698                        executor.find(formatSql(SQL_SELECT_FOR_LIST_ALL_SCAN_DELETE), dataRecord, rows);
2699                } catch (SQLException e) {
2700                        throw new IOException(e.getMessage(), e);
2701                } finally {
2702                        if (executor != null) {
2703                                try {
2704                                        executor.close();
2705                                } catch (Exception e) {
2706                                        throw new IOException(e.getMessage(), e);
2707                                }
2708                        }
2709                }
2710        }
2711
2712        protected boolean isSyncedOnHostname() throws IOException {
2713                Map<String, Object> record = getCompletedRecord();
2714                if (record != null) {
2715                        String syncedOnHostnames = (String) record.get("synced_on_hostnames");
2716                        return syncedOnHostnames.indexOf(";" + CommonTools.getHostname() + ";") > -1;
2717                }
2718                return false;
2719        }
2720
2721        protected boolean isSyncedOnHostname(Object id) throws IOException {
2722                Map<String, Object> record = getRecordById(id);
2723                if (record != null) {
2724                        String syncedOnHostnames = (String) record.get("synced_on_hostnames");
2725                        return syncedOnHostnames.indexOf(";" + CommonTools.getHostname() + ";") > -1;
2726                }
2727                return false;
2728        }
2729
2730        protected boolean isDeletedOnHostname(Object id) throws IOException {
2731                Map<String, Object> record = getRecordById(id);
2732                if (record != null) {
2733                        String deletedOnHostnames = (String) record.get("deleted_on_hostnames");
2734                        return deletedOnHostnames.indexOf(";" + CommonTools.getHostname() + ";") > -1;
2735                }
2736                return false;
2737        }
2738
2739        public RemoteFile getParentFile() {
2740                return new RemoteFile(getParent());
2741        }
2742
2743        private void executeSyncRunnable(final CacheArrayFilter filter, final Map<String, Object> item, final DiskFile df,
2744                        final RemoteFile rf) {
2745                if (!df.isLogicModify()) {
2746                        df.logicModify();
2747                        if (mergePool == null) {
2748                                LoggerFactory.getLogger(RemoteFile.class).mark("MAX_POOL_SIZE={}", MAX_POOL_SIZE);
2749                                mergePool = Executors.newFixedThreadPool(MAX_POOL_SIZE);
2750                        }
2751                        mergePool.execute(getSyncRunnable(filter, item, rf, df));
2752                }
2753        }
2754
2755        private Runnable getSyncRunnable(final CacheArrayFilter filter, final Map<String, Object> item, final RemoteFile rf,
2756                        final DiskFile df) {
2757                Runnable runnable = new Runnable() {
2758                        @Override
2759                        public void run() {
2760                                try {
2761                                        String fileName = (String) item.get("file_name");
2762                                        Thread.currentThread()
2763                                                        .setName(String.format("RemoteFile-getSyncRunnable-%s-%s", fileName, item.get("id")));
2764                                        boolean isDebug = isDebug(rf);
2765                                        boolean isDeleted = false;
2766                                        Object deletedValue = item.get("file_deleted");
2767
2768                                        if (deletedValue instanceof Boolean) {
2769                                                isDeleted = (Boolean) deletedValue;
2770                                        } else {
2771                                                isDeleted = Integer.parseInt(deletedValue + "") == 1;
2772                                        }
2773
2774                                        String clientHostname = CommonTools.getHostname();
2775                                        String lastSourceHostname = rf.getLastSourceHostname();
2776
2777                                        boolean isClientHostname = lastSourceHostname != null && !lastSourceHostname.equals(clientHostname);
2778                                        boolean rootExists = syncRoot != null && syncRoot.exists();
2779
2780                        if(!rootExists){
2781                            Logger.systemError(RemoteFile.class,"The root path '{}' does not exist.",syncRoot.getPath());
2782                        }
2783                        
2784                                        if (isDeleted && rootExists) {
2785                                                if (df.exists()) {
2786                                                        if (isDebug) {
2787                                                                LoggerFactory.getLogger(RemoteFile.class).mark("DeleteDiskFile - {}", df.getPath());
2788                                                        }
2789                                                        df.delete();
2790                                                        rf.deletedToDiskAndSub(item.get("id"));
2791                                                } else {
2792                                                        rf.deletedToDiskAndSub(item.get("id"));
2793                                                }
2794                                                df.removeLogicModify();
2795                                        }
2796
2797                                        String sourceHostname = rf.getSourceHostname();
2798                                        isClientHostname = sourceHostname != null && !sourceHostname.equals(clientHostname);
2799                                        boolean checkHaveParent = configProperties.getBoolean("CheckHaveParent", false);
2800                                        boolean haveParent = true;
2801                                        if (checkHaveParent) {
2802                                                haveParent = df.getOrigin().getParentFile().exists();
2803                                        }
2804                                        String fileType = (String) item.get("file_type");
2805
2806                                        if (!isDeleted && isClientHostname && haveParent && rootExists) {
2807                                                Timestamp rfModifyTime = rf.getModifyTime();
2808                                                df.setModifyTimeMsFromClock(rfModifyTime.getTime());
2809
2810                                                if (fileType.equals("L")) {
2811                                                        if (df.exists() && !df.isLink()) {
2812                                                                df.delete();
2813                                                        }
2814                                                        if (df.exists()) {
2815                                                                Timestamp dfModifyTime = df.getModifyTimeForClock();
2816                                                                if (rfModifyTime.getTime() > dfModifyTime.getTime()) {
2817                                                                        if (isDebug) {
2818                                                                                LoggerFactory.getLogger(RemoteFile.class).mark("ReCreateDiskLink - {}",
2819                                                                                                df.getPath());
2820                                                                        }
2821                                                                        df.delete();
2822                                                                        if (df.createLink(rf.readLink())) {
2823                                                                                if (df.getOrigin().getName().startsWith(".")) {
2824                                                                                        Files.setAttribute(Paths.get(df.getPath()), "dos:hidden", true);
2825                                                                                }
2826                                                                                rf.syncedToDisk();
2827                                                                        }
2828                                                                } else {
2829                                                                        rf.syncedToDisk();
2830                                                                }
2831                                                        } else {
2832                                                                if (isDebug) {
2833                                                                        LoggerFactory.getLogger(RemoteFile.class).mark("CreateDiskLink - {}", df.getPath());
2834                                                                }
2835                                                                if (df.createLink(rf.readLink())) {
2836                                                                        if (df.getOrigin().getName().startsWith(".")) {
2837                                                                                Files.setAttribute(Paths.get(df.getPath()), "dos:hidden", true);
2838                                                                        }
2839                                                                        rf.syncedToDisk();
2840                                                                }
2841                                                        }
2842                                                        df.removeLogicModify();
2843                                                }
2844                                                if (fileType.equals("F")) {
2845                                                        if (df.exists()) {
2846                                                                Timestamp dfModifyTime = df.getModifyTimeForClock();
2847                                                                boolean changed = rfModifyTime.getTime() > dfModifyTime.getTime();
2848                                                                if (changed) {
2849                                                                        DiskFile writingDf = new DiskFile(df.getPath() + TMP_WRITING_FILE);
2850                                                                        writingDf.copyAttrs(writingDf, df.isCopyStructureOnly(), df.syncRoot);
2851                                                                        writingDf.setModifyTimeMsFromClock(rfModifyTime.getTime());
2852                                                                        if (!writingDf.isLogicModify()) {
2853                                                                                writingDf.logicModify();
2854                                                                                if (RemoteFile.addQueuePathMapping(rf.getPath())) {
2855                                                                                        if (isDebug(rf)) {
2856                                                                                                LoggerFactory.getLogger(RemoteFile.class).mark("DiskQueuing - {}",
2857                                                                                                                df.getPath());
2858                                                                                        }
2859                                                                                        rf.writeToDisk(writingDf);
2860                                                                                }
2861                                                                        }
2862                                                                } else {
2863                                                                        rf.syncedToDisk();
2864                                                                        df.removeLogicModify();
2865                                                                }
2866                                                        } else {
2867                                                                if (rf.exists()) {
2868                                                                        DiskFile writingDf = new DiskFile(df.getPath() + TMP_WRITING_FILE);
2869                                                                        writingDf.copyAttrs(writingDf, df.isCopyStructureOnly(), df.syncRoot);
2870                                                                        writingDf.setModifyTimeMsFromClock(rfModifyTime.getTime());
2871                                                                        if (!writingDf.isLogicModify()) {
2872                                                                                writingDf.logicModify();
2873                                                                                if (RemoteFile.addQueuePathMapping(rf.getPath())) {
2874                                                                                        if (isDebug(rf)) {
2875                                                                                                LoggerFactory.getLogger(RemoteFile.class).mark("DiskQueuing - {}",
2876                                                                                                                df.getPath());
2877                                                                                        }
2878                                                                                        rf.writeToDisk(writingDf);
2879                                                                                }
2880                                                                        }
2881                                                                }
2882                                                        }
2883                                                }
2884                                                if (fileType.equals("D")) {
2885                                                        if (rf.exists()) {
2886                                                                if (isDebug) {
2887                                                                        LoggerFactory.getLogger(RemoteFile.class).mark("CreateDiskDir - {}", df.getPath());
2888                                                                }
2889                                                                if (df.exists()) {
2890                                                                        if (df.isDir()) {
2891                                                                                rf.syncedToDisk();
2892                                                                        } else {
2893                                                                                df.delete();
2894                                                                                if (df.mkdirs()) {
2895                                                                                        if (df.getOrigin().getName().startsWith(".")) {
2896                                                                                                Files.setAttribute(Paths.get(df.getPath()), "dos:hidden", true);
2897                                                                                        }
2898                                                                                        rf.syncedToDisk();
2899                                                                                }
2900                                                                        }
2901                                                                } else {
2902                                                                        if (df.mkdirs()) {
2903                                                                                if (df.getOrigin().getName().startsWith(".")) {
2904                                                                                        Files.setAttribute(Paths.get(df.getPath()), "dos:hidden", true);
2905                                                                                }
2906                                                                                rf.syncedToDisk();
2907                                                                        }
2908                                                                }
2909                                                        }
2910                                                        df.removeLogicModify();
2911                                                }
2912                                        }
2913                                } catch (Exception e) {
2914                                        log.error(e.getMessage(), e);
2915                                }
2916                        }
2917                };
2918                return runnable;
2919        }
2920
2921        public void startArchiveForFileDeleted(long timerMs, String archiveTime) throws IOException {
2922                long archiveMs = ConfigProperties.parseTimeToMs(archiveTime, 0L);
2923                startArchiveForFileDeleted(timerMs, archiveMs, null);
2924        }
2925
2926        public void startArchiveForFileDeleted(long timerMs, long archiveMs) throws IOException {
2927                startArchiveForFileDeleted(timerMs, archiveMs, null);
2928        }
2929
2930        public void startArchiveForFileDeleted(long timerMs, String archiveTime, Runnable completedCallback)
2931                        throws IOException {
2932                long archiveMs = ConfigProperties.parseTimeToMs(archiveTime, 0L);
2933                startArchiveForFileDeleted(timerMs, archiveMs, completedCallback);
2934        }
2935
2936        public void startArchiveForFileDeleted(long timerMs, long archiveMs, Runnable completedCallback)
2937                        throws IOException {
2938                final RemoteFile the = this;
2939                if (archiveMs <= 0) {
2940                        throw new IOException(String.format(
2941                                        "This parameter 'archiveMs' cannot be '%s', please use a number greater than 0.", archiveMs));
2942                }
2943
2944                stopSync = false;
2945        
2946                CacheDriverExecutor executor = null;
2947                Connection conn = null;
2948                try {
2949                    Thread.currentThread().setName(String.format("RemoteFile-archiveForFileDeleted-%s", getPath()));
2950                        conn = dataSourceReader.getConnection();
2951                        conn.setAutoCommit(true);
2952                        executor = new CacheDriverExecutor(conn);
2953                        Timestamp beforeTime = new Timestamp(new Clock().getTime() - archiveMs);
2954                        Map<String, Object> dataRecord = new HashMap<String, Object>();
2955                        dataRecord.put("file_parent_path", getPath());
2956                        dataRecord.put("like_file_parent_path", parsePath(getPath() + "/%"));
2957                        dataRecord.put("before_deleted_at", beforeTime);
2958                        CacheArray rows = new CacheArray();
2959                        CacheArrayFilter filter = new CacheArrayFilter(CACHE_ARRAY_FILTER_TIMER) {
2960
2961                                @Override
2962                                public void completed(Integer size) {
2963                                        if (isStopSync()) {
2964                                                LoggerFactory.getLogger(RemoteFile.class).mark("Stoped archive for file deleted.");
2965                                        }
2966                                        if (timerMs <= 0) {
2967                                                LoggerFactory.getLogger(RemoteFile.class).mark("Completed archive for file deleted.");
2968                                        }
2969                                        
2970                                        if (completedCallback != null) {
2971                                                completedCallback.run();
2972                                        }
2973                                        
2974                                        if (!isStopSync() && timerMs > 0) {
2975                                            while(true){
2976                                                
2977                                                if(isStopSync()) break;
2978                                                
2979                                                try {
2980                                                        Thread.sleep(timerMs);
2981                                                        if (!isStopSync()) {
2982                                                            boolean allowed = checkUsage(the);
2983                                                            if(allowed){
2984                                                                    startArchiveForFileDeleted(timerMs, archiveMs, completedCallback);
2985                                                                    break;
2986                                                            }
2987                                                        }
2988                                                } catch (Exception e) {
2989                                                        log.error(e.getMessage(), e);
2990                                                }
2991                                            }
2992                                        }
2993
2994                                }
2995
2996                                @Override
2997                                public void execute(Integer index, Object o) {
2998                                    Thread.currentThread().setName(String.format("RemoteFile-execute-archiveForFileDeleted-%s", index));
2999                                        if(isDebug(the)){
3000                                            debugLog("CheckUsage",the,this);
3001                                        }                                   
3002                                        if (memoryUsage() < RemoteFile.MAX_MEMORY_USAGE) {
3003                                                CacheDriverExecutor _executor = null;
3004                                                try {
3005                                                        Map<String, Object> item = (Map<String, Object>) o;
3006                                                        boolean notDir = !item.get("file_type").equals("D");
3007                                                        if(notDir){
3008                                                        calculateDataMappingDataSourceReader(the, item);
3009                                                        Object fileId = item.get("file_id");
3010                                                        Map<String, Object> dataRecord = new HashMap<String, Object>();
3011                                                        dataRecord.put("file_id", fileId);
3012                                                        dataRecord.put("deleted_at", new Clock().getSqlTimestamp());
3013                                                        Thread.currentThread().setName(String.format("RemoteFile-archiveForFileDeleted-%s", fileId));
3014                                            Connection _conn = dataMappingDataSourceReader.getConnection();
3015                                                        _conn.setAutoCommit(true);
3016                                                        _executor = new CacheDriverExecutor(_conn);
3017                                        _executor.execute(formatSqlForFilePartData(SQL_UPDATE_BY_FILE_ID_FOR_CLEAR_FILE_DATA), dataRecord);
3018                                                        }
3019                                                } catch (Exception e) {
3020                                                        log.error(e.getMessage(), e);
3021                                                } finally {
3022                                                        try {
3023                                                                if (_executor != null)
3024                                                                        _executor.close();
3025                                                        } catch (Exception e) {
3026                                                                log.error(e.getMessage(), e);
3027                                                        }
3028                                                }
3029                                        }else{
3030                                            log.warn("Over max memory usage 'MAX_MEMORY_USAGE={}'.",MAX_MEMORY_USAGE);
3031                                        }
3032                                }
3033                        };
3034                        rows.filter(filter);
3035                        final CacheDriverExecutor finalExecutor = executor;
3036                        Executors.newFixedThreadPool(1).execute(new Runnable(){
3037                            @Override
3038                            public void run(){
3039                                try{
3040                                     finalExecutor.find(formatSql(SQL_SELECT_FOR_ARCHIVE_FILE_DELETED), dataRecord, rows);
3041                                }catch(Exception e){
3042                                    log.error(e.getMessage(),e);
3043                                }
3044                            }
3045                        });
3046                } catch (SQLException e) {
3047                        throw new IOException(e.getMessage(), e);
3048                } finally {
3049                        if (executor != null) {
3050                                try {
3051                                        executor.close();
3052                                } catch (Exception e) {
3053                                        throw new IOException(e.getMessage(), e);
3054                                }
3055                        }
3056                }
3057        }
3058
3059        public void startArchiveForDeleted(long timerMs, String archiveTime) throws IOException {
3060                long archiveMs = ConfigProperties.parseTimeToMs(archiveTime, 0L);
3061                startArchiveForDeleted(timerMs, archiveMs, null);
3062        }
3063
3064        public void startArchiveForDeleted(long timerMs, long archiveMs) throws IOException {
3065                startArchiveForDeleted(timerMs, archiveMs, null);
3066        }
3067
3068        public void startArchiveForDeleted(long timerMs, String archiveTime, Runnable completedCallback)
3069                        throws IOException {
3070                long archiveMs = ConfigProperties.parseTimeToMs(archiveTime, 0L);
3071                startArchiveForDeleted(timerMs, archiveMs, completedCallback);
3072        }
3073
3074        public void startArchiveForDeleted(long timerMs, long archiveMs, Runnable completedCallback) throws IOException {
3075            final RemoteFile the = this;
3076                if (archiveMs <= 0) {
3077                        throw new IOException(String.format(
3078                                        "This parameter 'archiveMs' cannot be '%s', please use a number greater than 0.", archiveMs));
3079                }
3080
3081                stopSync = false;
3082
3083                CacheDriverExecutor executor = null;
3084                Connection conn = null;
3085                try {
3086                    Thread.currentThread().setName(String.format("RemoteFile-archiveForDeleted-%s", getPath()));
3087                        conn = dataSourceReader.getConnection();
3088                        conn.setAutoCommit(true);
3089                        executor = new CacheDriverExecutor(conn);
3090                        Timestamp beforeTime = new Timestamp(new Clock().getTime() - archiveMs);
3091                        Map<String, Object> dataRecord = new HashMap<String, Object>();
3092                        dataRecord.put("file_parent_path", getPath());
3093                        dataRecord.put("like_file_parent_path", parsePath(getPath() + "/%"));
3094                        dataRecord.put("before_deleted_at", beforeTime);
3095
3096                        CacheArray rows = new CacheArray();
3097                        CacheArrayFilter filter = new CacheArrayFilter(CACHE_ARRAY_FILTER_TIMER) {
3098
3099                                @Override
3100                                public void completed(Integer size) {
3101                                        if (isStopSync()) {
3102                                                LoggerFactory.getLogger(RemoteFile.class).mark("Stoped archive for deleted.");
3103                                        }
3104                                        if (timerMs <= 0) {
3105                                                LoggerFactory.getLogger(RemoteFile.class).mark("Completed archive for deleted.");
3106                                        }
3107                                        
3108                                        if (completedCallback != null) {
3109                                                completedCallback.run();
3110                                        }
3111                                        
3112                                        if (!isStopSync() && timerMs > 0) {
3113                                            while(true){
3114                                                
3115                                                if(isStopSync()) break;
3116                                                
3117                                                try {
3118                                                        Thread.sleep(timerMs);
3119                                                        if (!isStopSync()) {
3120                                                            boolean allowed = checkUsage(the);
3121                                                            if(allowed){
3122                                                                    startArchiveForDeleted(timerMs, archiveMs, completedCallback);
3123                                                                    break;
3124                                                            }
3125                                                        }
3126                                                } catch (Exception e) {
3127                                                        log.error(e.getMessage(), e);
3128                                                }
3129                                            }
3130                                        }
3131                                }
3132
3133                                @Override
3134                                public void execute(Integer index, Object o) {
3135                                    Thread.currentThread().setName(String.format("RemoteFile-execute-archiveForDeleted-%s", index));
3136                                        CacheDriverExecutor dmExecutor = null;
3137                                        CacheDriverExecutor dmDelExecutor = null;
3138                                        CacheDriverExecutor executor = null;
3139                                        if(isDebug(the)){
3140                                            debugLog("CheckUsage",the,this);
3141                                        }                                       
3142                                        if (memoryUsage() < RemoteFile.MAX_MEMORY_USAGE) {
3143                                                try {
3144                                                        Clock clock = new Clock();
3145                                                        Timestamp currentTime = clock.getSqlTimestamp();
3146                                                        Map<String, Object> item = (Map<String, Object>) o;
3147                                                        Object fileId = item.get("file_id");
3148                                                        Thread.currentThread().setName(String.format("RemoteFile-archiveForDeleted-%s", fileId));
3149                                                        calculateDataMappingDataSourceReader(the, item);
3150                                                        Map<String, Object> dataRecord = null;
3151                                                        boolean haveSubFiles = false;
3152                                                        /**Need use dataMappingDataSourceReader**/
3153                                                        if (item.get("file_type").equals("D")) {
3154                                                                int count = countAllSubFiles(item.get("file_parent_path") + "",
3155                                                                                item.get("file_name") + "");
3156                                                                haveSubFiles = count > 0;
3157                                                        }
3158                                                        if (!haveSubFiles) {
3159                                                                if (!item.get("file_type").equals("D")) {
3160                                                                        Connection dmConn = dataMappingDataSourceReader.getConnection();
3161                                                                        dmConn.setAutoCommit(true);
3162                                                                        dmExecutor = new CacheDriverExecutor(dmConn);
3163                                                                        dataRecord = new HashMap<String, Object>();
3164                                                                        dataRecord.put("file_id", item.get("file_id"));
3165                                                                        dataRecord.put("deleted_at", currentTime);
3166                                                                        dmExecutor.execute(
3167                                                                                        formatSqlForFilePartData(SQL_UPDATE_BY_FILE_ID_FOR_CLEAR_FILE_DATA),
3168                                                                                        dataRecord);
3169                                                                }
3170                                                                Connection conn = dataSourceWriter.getConnection();
3171                                                                conn.setAutoCommit(true);
3172                                                                executor = new CacheDriverExecutor(conn);
3173                                                                dataRecord = new HashMap<String, Object>();
3174                                                                dataRecord.put("file_id", item.get("file_id"));
3175                                                                executor.execute(formatSql(SQL_DELETE_BY_FILE_ID_FROM_FILE), dataRecord);
3176                                                        }
3177
3178                                                        Connection dmDelConn = dataMappingDataSourceReader.getConnection();
3179                                                        dmDelConn.setAutoCommit(true);
3180                                                        dmDelExecutor = new CacheDriverExecutor(dmDelConn);
3181                                                        dataRecord = new HashMap<String, Object>();
3182                                                        dataRecord.put("deleted_at", new Timestamp(currentTime.getTime() - archiveMs));
3183                                                        dmDelExecutor.execute(formatSqlForFilePartData(SQL_DELETE_BY_RF_ID_FROM_FILE_DATA),
3184                                                                        dataRecord);
3185                                                } catch (Exception e) {
3186                                                        log.error(e.getMessage(), e);
3187                                                } finally {
3188                                                        try {
3189                                                                if (dmExecutor != null)
3190                                                                        dmExecutor.close();
3191                                                        } catch (Exception e) {
3192                                                                log.error(e.getMessage(), e);
3193                                                        }
3194                                                        try {
3195                                                                if (dmDelExecutor != null)
3196                                                                        dmDelExecutor.close();
3197                                                        } catch (Exception e) {
3198                                                                log.error(e.getMessage(), e);
3199                                                        }
3200                                                        try {
3201                                                                if (executor != null)
3202                                                                        executor.close();
3203                                                        } catch (Exception e) {
3204                                                                log.error(e.getMessage(), e);
3205                                                        }
3206                                                }
3207
3208                                        }else{
3209                                            log.warn("Over max memory usage 'MAX_MEMORY_USAGE={}'.",MAX_MEMORY_USAGE);
3210                                        }
3211                                }
3212                        };
3213                        rows.filter(filter);
3214                        final CacheDriverExecutor finalExecutor = executor;
3215                        Executors.newFixedThreadPool(1).execute(new Runnable(){
3216                            @Override
3217                            public void run(){
3218                                try{
3219                                    finalExecutor.find(formatSql(SQL_SELECT_FOR_ARCHIVE_DELETED), dataRecord, rows);
3220                                }catch(Exception e){
3221                                    log.error(e.getMessage(),e);
3222                                }
3223                            }
3224                        });                     
3225                } catch (SQLException e) {
3226                        throw new IOException(e.getMessage(), e);
3227                } finally {
3228                        if (executor != null) {
3229                                try {
3230                                        executor.close();
3231                                } catch (Exception e) {
3232                                        throw new IOException(e.getMessage(), e);
3233                                }
3234                        }
3235                }
3236        }
3237        
3238        public void startArchiveForTimeout(long timerMs) throws IOException {
3239            startArchiveForTimeout(timerMs,null);
3240        }
3241        
3242        public void startArchiveForTimeout(long timerMs,Runnable completedCallback) throws IOException {
3243            final RemoteFile the = this;
3244            
3245                stopSync = false;
3246
3247                CacheDriverExecutor executor = null;
3248                Connection conn = null;
3249                try {
3250                    Thread.currentThread().setName(String.format("RemoteFile-archiveForTimeout-%s", getPath()));
3251                        conn = dataSourceReader.getConnection();
3252                        conn.setAutoCommit(true);
3253                        executor = new CacheDriverExecutor(conn);
3254                        Timestamp beforeTime = new Timestamp(new Clock().getTime() - (LOGIC_TIMEOUT_MS*BATCH_SIZE));
3255                        Map<String, Object> dataRecord = new HashMap<String, Object>();
3256                        dataRecord.put("file_parent_path", getPath());
3257                        dataRecord.put("like_file_parent_path", parsePath(getPath() + "/%"));
3258                        dataRecord.put("before_updated_at", beforeTime);
3259
3260                        CacheArray rows = new CacheArray();
3261                        CacheArrayFilter filter = new CacheArrayFilter(CACHE_ARRAY_FILTER_TIMER) {
3262
3263                                @Override
3264                                public void completed(Integer size) {
3265                                        if (isStopSync()) {
3266                                                LoggerFactory.getLogger(RemoteFile.class).mark("Stoped archive for timeout deleted.");
3267                                        }
3268                                        if (timerMs <= 0) {
3269                                                LoggerFactory.getLogger(RemoteFile.class).mark("Completed archive for timeout deleted.");
3270                                        }
3271                                        
3272                                        if (completedCallback != null) {
3273                                                completedCallback.run();
3274                                        }
3275                                        
3276                                        if (!isStopSync() && timerMs > 0) {
3277                                            while(true){
3278                                                
3279                                                if(isStopSync()) break;
3280                                                
3281                                                try {
3282                                                        Thread.sleep(timerMs);
3283                                                        if (!isStopSync()) {
3284                                                            boolean allowed = checkUsage(the);
3285                                                            if(allowed){
3286                                                                    startArchiveForTimeout(timerMs, completedCallback);
3287                                                                    break;
3288                                                            }
3289                                                        }
3290                                                } catch (Exception e) {
3291                                                        log.error(e.getMessage(), e);
3292                                                }
3293                                            }
3294                                        }
3295
3296                                }
3297
3298                                @Override
3299                                public void execute(Integer index, Object o) {
3300                                    Thread.currentThread().setName(String.format("RemoteFile-execute-archiveForTimeout-%s", index));
3301                                        if(isDebug(the)){
3302                                            debugLog("CheckUsage",the,this);
3303                                        }                                   
3304                                        CacheDriverExecutor executor = null;
3305                                        if (memoryUsage() < RemoteFile.MAX_MEMORY_USAGE) {
3306                                                try {
3307                                                        Clock clock = new Clock();
3308                                                        Timestamp currentTime = clock.getSqlTimestamp();
3309                                                        Map<String, Object> item = (Map<String, Object>) o;
3310                                                        Object fileId = item.get("file_id");
3311                                                        Thread.currentThread().setName(String.format("RemoteFile-archiveForTimeout-%s", fileId));
3312                                                        Connection conn = dataSourceWriter.getConnection();
3313                                                        conn.setAutoCommit(true);
3314                                                        executor = new CacheDriverExecutor(conn);
3315                                                        Map<String,Object> dataRecord = new HashMap<String, Object>();
3316                                                        dataRecord.put("file_id",fileId);
3317                                                        dataRecord.put("deleted_at",currentTime);
3318                                                        dataRecord.put("deleted_user_id",getDefaultModifyUserId());
3319                                                        executor.execute(formatSql(SQL_UPDATE_FOR_TIMEOUT_DELETE), dataRecord);
3320                                                } catch (Exception e) {
3321                                                        log.error(e.getMessage(), e);
3322                                                } finally {
3323                                                        try {
3324                                                                if (executor != null)
3325                                                                        executor.close();
3326                                                        } catch (Exception e) {
3327                                                                log.error(e.getMessage(), e);
3328                                                        }
3329                                                }
3330
3331                                        }else{
3332                                             log.warn("Over max memory usage 'MAX_MEMORY_USAGE={}'.",MAX_MEMORY_USAGE);
3333                                        }
3334                                }
3335                        };
3336                        rows.filter(filter);
3337                        final CacheDriverExecutor finalExecutor = executor;
3338                        Executors.newFixedThreadPool(1).execute(new Runnable(){
3339                            @Override
3340                            public void run(){
3341                                try{
3342                                    finalExecutor.find(formatSql(SQL_SELECT_FOR_ALL_TIMEOUT), dataRecord, rows);
3343                                }catch(Exception e){
3344                                    log.error(e.getMessage(),e);
3345                                }
3346                            }
3347                        });                             
3348                } catch (SQLException e) {
3349                        throw new IOException(e.getMessage(), e);
3350                } finally {
3351                        if (executor != null) {
3352                                try {
3353                                        executor.close();
3354                                } catch (Exception e) {
3355                                        throw new IOException(e.getMessage(), e);
3356                                }
3357                        }
3358                }
3359        }       
3360
3361        public static Map<String, DriverDataSource> getDriverDataSourceMap() {
3362                return Collections.synchronizedMap(DATASOURCE_REMOTE_FILE);
3363        }
3364
3365        private static synchronized String generateId(boolean identityCardEnable, String identityCardName,
3366                        String identityCardPrefix, int identityCardSuffix) throws SQLException {
3367                if (identityCardEnable) {
3368                        if (identityCard == null) {
3369                                identityCard = new IdentityCard(identityCardName);
3370                        }
3371                        return identityCard.generateId(16, identityCardPrefix, "", identityCardSuffix);
3372                } else {
3373                        return CommonTools.generateId(16);
3374                }
3375        }
3376
3377        private static synchronized String generateDataId(boolean identityCardEnable, String identityCardName,
3378                        int identityCardSuffix, RemoteFile theRf) throws SQLException {
3379                if (identityCardEnable) {
3380                        if (identityCard == null) {
3381                                identityCard = new IdentityCard(identityCardName + "Data");
3382                        }
3383                        return identityCard.generateId(16, String.format("%04d", theRf.dataMappingTable), "", identityCardSuffix);
3384                } else {
3385                        return CommonTools.generateId(16);
3386                }
3387        }
3388
3389        protected static synchronized boolean addQueuePathMapping(String path) {
3390                Long addTimeMs = null;
3391
3392                boolean allowed = checkQueuePathMapping();
3393
3394                if (!allowed)
3395                        return false;
3396
3397                addTimeMs = QUEUE_PATH_MAPPING.get(path);
3398
3399                if (addTimeMs == null) {
3400                        QUEUE_PATH_MAPPING.put(path, System.currentTimeMillis());
3401                        return true;
3402                }
3403
3404                return false;
3405        }
3406
3407        public static synchronized boolean checkQueuePathMapping() {
3408                Long addTimeMs = null;
3409                Map<String, Long> copyQueueMap = getQueuePathMapping();
3410                List<String> keyList = new ArrayList<String>(copyQueueMap.keySet());
3411                for (String key : keyList) {
3412                        addTimeMs = copyQueueMap.get(key);
3413                        if (addTimeMs != null) {
3414                                boolean timeout = (System.currentTimeMillis() - addTimeMs) >= LOGIC_TIMEOUT_MS;
3415
3416                                if (timeout) {
3417                                        QUEUE_PATH_MAPPING.remove(key);
3418                                }
3419                        }
3420                }
3421
3422                if (QUEUE_PATH_MAPPING.keySet().size() >= MAX_QUEUE_SIZE)
3423                        return false;
3424
3425                return true;
3426        }
3427
3428        protected static synchronized void removeQueuePathMapping(String path) {
3429                QUEUE_PATH_MAPPING.remove(path);
3430        }
3431
3432        public static synchronized Map<String, Long> getQueuePathMapping() {
3433                return Collections.synchronizedMap(QUEUE_PATH_MAPPING);
3434        }
3435
3436        protected static synchronized void loadDataMapping(RemoteFile rf) throws Exception {
3437                List<String> dmArray = rf.configProperties.getArray("DataMapping", new String[] {});
3438                int size = dmArray.size();
3439                List<String> dmList = DATA_MAPPING_LIST.get(rf.dsKey);
3440
3441                if (dmList == null)
3442                        dmList = new ArrayList<String>();
3443
3444                if (!DATA_MAPPING_INDEX.containsKey(rf.dsKey))
3445                        DATA_MAPPING_INDEX.put(rf.dsKey, 0);
3446
3447                for (int i = 0; i < size; i++) {
3448                        String dm = dmArray.get(i);
3449                        String originKey = String.format("DataMapping[%s]", i);
3450                        Map<String, Object> dmMap = rf.configProperties.getMap(originKey);
3451                        String shortKey = String.format("%s.%s", rf.dsKey, dm);
3452                        if (dmMap != null) {
3453                                String dmKey = String.format("%s.%s", rf.dsKey, originKey);
3454
3455                                if (!DATA_MAPPING.containsKey(shortKey)) {
3456                                        ConfigProperties dmCp = new ConfigProperties();
3457                                        dmCp.putAllAndReturnSlef(dmMap);
3458                                        int priority = dmCp.getInteger(String.format("%s.Priority", originKey), 0);
3459                                        if (i == 0 && priority < 1) {
3460                                                throw new Exception("First DataMapping priority cannot be less than 1.");
3461                                        }
3462                                        String dataSourceFileOriginValue = dmCp.getString(String.format("%s.DataSource", originKey));
3463                                        String dataSourceReaderFileOriginValue = dmCp
3464                                                        .getString(String.format("%s.DataSourceReader", originKey), dataSourceFileOriginValue);
3465                                        String dataSourceWriterFileOriginValue = dmCp
3466                                                        .getString(String.format("%s.DataSourceWriter", originKey), dataSourceFileOriginValue);
3467
3468                                        String dmDsReaderKey = String.format("%s.reader", CommonTools.md5(dataSourceReaderFileOriginValue));
3469                                        String dmDsWriterKey = String.format("%s.writer", CommonTools.md5(dataSourceWriterFileOriginValue));
3470                                        DriverDataSource dmDataSourceReader = DATASOURCE_REMOTE_FILE.get(dmDsReaderKey);
3471                                        DriverDataSource dmDataSourceWriter = DATASOURCE_REMOTE_FILE.get(dmDsWriterKey);
3472                                        if (dmDataSourceReader == null) {
3473                                                DATASOURCE_REMOTE_FILE.put(dmDsReaderKey,
3474                                                                new DriverDataSource(new File(dataSourceReaderFileOriginValue)));
3475                                        }
3476                                        if (dmDataSourceWriter == null) {
3477                                                DATASOURCE_REMOTE_FILE.put(dmDsWriterKey,
3478                                                                new DriverDataSource(new File(dataSourceWriterFileOriginValue)));
3479                                        }
3480                                        String tableRangOriginValue = dmCp.getString(String.format("%s.TableRange", originKey), "0");
3481                                        ConfigProperties shortCp = new ConfigProperties();
3482                                        List<Integer> tableRange = new ArrayList<Integer>();
3483                                        if (tableRangOriginValue.contains("-")) {
3484                                                String[] ranges = tableRangOriginValue.split("-");
3485                                                Integer begin = Integer.parseInt(ranges[0].trim());
3486                                                Integer end = Integer.parseInt(ranges[1].trim());
3487                                                for (int j = begin; j <= end; j++) {
3488                                                        tableRange.add(begin + j);
3489                                                }
3490                                        } else if (tableRangOriginValue.contains(",")) {
3491                                                String[] ranges = tableRangOriginValue.split(",");
3492                                                int rangesLen = ranges.length;
3493                                                for (int j = 0; j < rangesLen; j++) {
3494                                                        tableRange.add(Integer.parseInt(ranges[j].trim()));
3495                                                }
3496                                        } else {
3497                                                tableRange.add(Integer.parseInt(tableRangOriginValue.trim()));
3498                                        }
3499                                        shortCp.put("TableRange", tableRange);
3500                                        shortCp.putAndReturnSlef("DataSourceReader", dmDsReaderKey);
3501                                        shortCp.putAndReturnSlef("DataSourceWriter", dmDsWriterKey);
3502                                        shortCp.putAndReturnSlef("Priority", priority);
3503                                        shortCp.putAndReturnSlef("UsingCount", 0);
3504                                        shortCp.putAndReturnSlef("Name", dm);
3505                                        int randomIndex = ThreadLocalRandom.current().nextInt(0, tableRange.size());
3506                                        shortCp.putAndReturnSlef("Table", tableRange.get(randomIndex));
3507                                        DATA_MAPPING.put(shortKey, shortCp);
3508                                        dmList.add(dm);
3509                                        DATA_MAPPING_LIST.put(rf.dsKey, dmList);
3510                                }
3511                        }
3512                }
3513
3514        }
3515
3516        protected static synchronized void calculateDataMappingDataSourceReader(RemoteFile rf,
3517                        Map<String, Object> rfRecord) {
3518                rf.dataMappingDs = (String) rfRecord.get("file_part_data_ds");
3519                rf.dataMappingTable = (Integer) rfRecord.get("file_part_data_table");
3520                String shortKey = String.format("%s.%s", rf.dsKey, rf.dataMappingDs);
3521                ConfigProperties cp = DATA_MAPPING.get(shortKey);
3522                if (cp != null) {
3523                        String dmDsReaderKey = cp.getString("DataSourceReader");
3524                        rf.dataMappingDataSourceReader = DATASOURCE_REMOTE_FILE.get(dmDsReaderKey);
3525                }
3526        }
3527
3528        protected static synchronized void calculateDataMappingDataSourceWriter(RemoteFile rf) {
3529                if (DATA_MAPPING_INDEX.get(rf.dsKey) >= DATA_MAPPING_LIST.size() - 1) {
3530                        DATA_MAPPING_INDEX.put(rf.dsKey, 0);
3531                }
3532                ConfigProperties cp = getDataMapping(rf, DATA_MAPPING_INDEX.get(rf.dsKey));
3533                List<Integer> tableRange = (List<Integer>) cp.get("TableRange");
3534                Integer priority = cp.getInteger("Priority", 0);
3535                Integer usingCount = cp.getInteger("UsingCount", 0);
3536                String key = String.format("%s.%s", rf.dsKey, cp.getString("Name"));
3537                if (usingCount < priority) {
3538                        rf.dataMappingDataSourceReader = DATASOURCE_REMOTE_FILE.get(cp.getString("DataSourceReader"));
3539                        rf.dataMappingDataSourceWriter = DATASOURCE_REMOTE_FILE.get(cp.getString("DataSourceWriter"));
3540                        rf.dataMappingDs = cp.getString("Name");
3541                        rf.dataMappingTable = cp.getInteger("Table");
3542                        usingCount++;
3543                        if (usingCount >= priority) {
3544                                DATA_MAPPING_INDEX.put(rf.dsKey, DATA_MAPPING_INDEX.get(rf.dsKey) + 1);
3545                                cp.putAndReturnSlef("UsingCount", 0);
3546                                int randomIndex = ThreadLocalRandom.current().nextInt(0, tableRange.size());
3547                                cp.putAndReturnSlef("Table", tableRange.get(randomIndex));
3548                                DATA_MAPPING.put(key, cp);
3549                        } else {
3550                                cp.putAndReturnSlef("UsingCount", usingCount);
3551                                int randomIndex = ThreadLocalRandom.current().nextInt(0, tableRange.size());
3552                                cp.putAndReturnSlef("Table", tableRange.get(randomIndex));
3553                                DATA_MAPPING.put(key, cp);
3554                        }
3555                } else {
3556                        DATA_MAPPING_INDEX.put(rf.dsKey, DATA_MAPPING_INDEX.get(rf.dsKey) + 1);
3557                        calculateDataMappingDataSourceWriter(rf);
3558                }
3559        }
3560
3561        protected static synchronized ConfigProperties getDataMapping(RemoteFile rf, int index) {
3562                List<String> dmList = DATA_MAPPING_LIST.get(rf.dsKey);
3563                String dm = dmList.get(index);
3564                String key = String.format("%s.%s", rf.dsKey, dm);
3565                return DATA_MAPPING.get(key);
3566        }
3567
3568        protected String getDataMappingTableName() {
3569                return String.format("%s_data_%s", this.tableName, this.dataMappingTable);
3570        }
3571        
3572        private static void debugLog(String point,RemoteFile rf,CacheArrayFilter filter){
3573                debugLog(RemoteFile.class,point,rf,filter);
3574        }       
3575
3576}