001/*******************************************************************************
002The MIT License (MIT)
003
004Copyright (c) 2024 KILLCODING.COM
005
006Permission is hereby granted, free of charge, to any person obtaining a copy
007of this software and associated documentation files (the "Software"), to deal
008in the Software without restriction, including without limitation the rights
009to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
010copies of the Software, and to permit persons to whom the Software is
011furnished to do so, subject to the following conditions:
012
013The above copyright notice and this permission notice shall be included in
014all copies or substantial portions of the Software.
015
016THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
017IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
018FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
019AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
020LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
021OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
022THE SOFTWARE.
023*****************************************************************************/
024package com.killcoding.file;
025
026import java.io.File;
027import com.killcoding.datasource.DriverDataSource;
028import com.killcoding.tool.ConfigProperties;
029import com.killcoding.tool.CommonTools;
030import com.killcoding.log.Logger;
031import com.killcoding.log.LoggerFactory;
032import java.io.FileInputStream;
033import java.io.IOException;
034import java.sql.Connection;
035import java.sql.SQLException;
036import com.killcoding.datasource.CacheDriverExecutor;
037import java.util.Map;
038import java.util.HashMap;
039import com.killcoding.datasource.Clock;
040import java.sql.Timestamp;
041import java.nio.file.Files;
042import java.util.concurrent.ConcurrentHashMap;
043import java.util.List;
044import java.util.ArrayList;
045import java.util.Arrays;
046import java.io.ByteArrayOutputStream;
047import java.sql.Blob;
048import java.nio.file.Paths;
049import java.nio.file.Path;
050import java.net.URI;
051import java.io.ByteArrayInputStream;
052import java.nio.ByteBuffer;
053import java.util.concurrent.ExecutorService;
054import java.util.concurrent.Executors;
055import java.util.concurrent.Future;
056import java.nio.file.LinkOption;
057import javax.crypto.Cipher;
058import javax.crypto.CipherInputStream;
059import javax.crypto.CipherOutputStream;
060import javax.crypto.KeyGenerator;
061import javax.crypto.SecretKey;
062import javax.crypto.SecretKeyFactory;
063import javax.crypto.spec.DESKeySpec;
064import javax.crypto.spec.IvParameterSpec;
065import javax.crypto.spec.SecretKeySpec;
066import com.killcoding.file.DiskFile;
067import java.util.Collection;
068import com.killcoding.cache.CacheArray;
069import com.killcoding.cache.CacheArrayFilter;
070import java.nio.file.attribute.BasicFileAttributes;
071import java.util.concurrent.Callable;
072import com.killcoding.datasource.IdentityCard;
073import java.util.Collections;
074import java.util.concurrent.ThreadLocalRandom;
075import com.killcoding.tool.CipherTools;
076import com.killcoding.cache.Cache;
077import java.util.Date;
078
079public class RemoteFile extends BaseFile implements RemoteFileSQL {
080
081        private static final Map<String, DriverDataSource> DATASOURCE_REMOTE_FILE = new ConcurrentHashMap<String, DriverDataSource>();
082
083        private static final Map<String, ConfigProperties> CONFIG_PROPS_REMOTE_FILE = new ConcurrentHashMap<String, ConfigProperties>();
084
085        private static final Map<String, Long> QUEUE_PATH_MAPPING = new ConcurrentHashMap<String, Long>();
086
087        private static final Map<String, ConfigProperties> DATA_MAPPING = new ConcurrentHashMap<String, ConfigProperties>();
088
089        private static final Map<String, List<String>> DATA_MAPPING_LIST = new ConcurrentHashMap<String, List<String>>();
090
091        private static final Map<String, Integer> DATA_MAPPING_INDEX = new ConcurrentHashMap<String, Integer>();
092
093    protected static boolean FIRST_LOADED = false;
094    
095    public static Integer MAX_POOL_SIZE = 100;
096        public static Double MAX_CONNECT_USAGE = 0.5D;
097        public static Double MAX_MEMORY_USAGE = 0.8D;
098        public static Integer MAX_QUEUE_SIZE = 10;
099        public static Long MAX_FILE_SZIE = -1L;
100        public static boolean SHOW_LINK = false;
101        public static boolean SHOW_HIDDEN = false;
102        public static boolean COPY_STRUCTURE_ONLY = false;
103        public static Integer MAX_FILE_PARENT_PATH_LENGTH = 200;
104        public static Integer MAX_FILE_NAME_LENGTH = 200;
105        public static Integer CACHE_SECONDS = 5;
106        public static Date SYNC_CUTOFF_TIME = null;
107        public static Date DELETE_CUTOFF_TIME = null;
108
109        private static IdentityCard identityCard = null;
110        private static File appRemoteFile = null;
111        private DriverDataSource dataSourceReader = null;
112        private DriverDataSource dataSourceWriter = null;
113        private String dataSourceReaderPath = null;
114        private String dataSourceWriterPath = null;
115        private String modifyUserId = null;
116        private Integer filePartSize = null;
117        private Boolean identityCardEnable = false;
118        private String identityCardName = null;
119        private String identityCardPrefix = null;
120        private Integer identityCardSuffix = 0;
121        private Timestamp modifyTime = null;
122
123    protected ConfigProperties configProperties = null;
124    protected String fileId = null;
125    protected String tableName = null;
126        protected Integer dataMappingTable = null;
127        protected String dataMappingDs = null;
128        protected DriverDataSource dataMappingDataSourceReader = null;
129        protected DriverDataSource dataMappingDataSourceWriter = null;
130        protected String dsKey = null;
131
132        public static Long BEFORE_THE_QUEUE_TIME = 60000L;
133        public static Integer BATCH_SIZE = 10;
134        public static String DEFAULE_MODIFY_USER_ID = null;
135
136        private Logger log = null;
137
138        private static ExecutorService mergePool = null;
139
/**
 * Creates a remote file handle for {@code path}.
 *
 * <p>Assigns a freshly generated 16-character file id and the instance logger, then runs
 * {@code initGlobal()} to bind the instance to the shared configuration and data sources.
 *
 * @param path path handed to the {@code BaseFile} constructor
 */
public RemoteFile(String path) {
    super(path);
    this.fileId = CommonTools.generateId(16);
    // The logger must be in place before initGlobal(): init()/reload() log through it.
    this.log = LoggerFactory.getLogger(this.getClass());
    this.initGlobal();
}
146
147        public static synchronized void initPool(int poolSize) {
148                if (mergePool == null) {
149                        MAX_POOL_SIZE = poolSize;
150                        LoggerFactory.getLogger(RemoteFile.class).mark("MAX_POOL_SIZE={}", MAX_POOL_SIZE);
151                        mergePool = Executors.newFixedThreadPool(MAX_POOL_SIZE);
152                }
153        }
154
155        private synchronized void init() {
156                dsKey = CommonTools.md5(this.appRemoteFile.getAbsolutePath());
157                dataSourceReader = DATASOURCE_REMOTE_FILE.get(String.format("%s.reader", dsKey));
158                dataSourceWriter = DATASOURCE_REMOTE_FILE.get(String.format("%s.writer", dsKey));
159                configProperties = CONFIG_PROPS_REMOTE_FILE.get(dsKey);
160                if (dataSourceReader == null && dataSourceWriter == null) {
161                        try {
162                                if (!appRemoteFile.exists())
163                                        throw new IOException(String.format("Not exist '%s'.", appRemoteFile.getAbsolutePath()));
164
165                                configProperties = new ConfigProperties();
166                                Runnable updatedRun = new Runnable() {
167                                        @Override
168                                        public void run() {
169                                                try {
170                                                        reload();
171                                                        LoggerFactory.getLogger(RemoteFile.class).mark("RemoteFile Reloaded");
172                                                } catch (Exception e) {
173                                                        log.error(e.getMessage(), e);
174                                                }
175                                        }
176                                };
177                                configProperties.load(appRemoteFile, updatedRun);
178
179                                CONFIG_PROPS_REMOTE_FILE.put(dsKey, configProperties);
180
181                                String dataSourcePath = configProperties.getString("DataSource");
182                                dataSourceReaderPath = configProperties.getString("DataSourceReader", dataSourcePath);
183                                dataSourceWriterPath = configProperties.getString("DataSourceWriter", dataSourcePath);
184                                reload();
185                        } catch (Exception e) {
186                                log.error(e.getMessage(), e);
187                        }
188                }
189                if (configProperties != null) {
190                        String dataSourcePath = configProperties.getString("DataSource");
191                        dataSourceReaderPath = configProperties.getString("DataSourceReader", dataSourcePath);
192                        dataSourceWriterPath = configProperties.getString("DataSourceWriter", dataSourcePath);
193                        reload();
194                }
195                calculateDataMappingDataSourceWriter(this);
196        }
197
198        private synchronized void initGlobal() {
199            if(this.appRemoteFile == null){
200                this.appRemoteFile = CommonTools.getSystemProperties(RemoteFile.class, "APP_REMOTE_FILE",
201                                        "RemoteFile.properties");
202            }
203                init();
204        }
205
206        private synchronized void reload() {
207                try {
208                        LOGIC_TIMEOUT_MS = configProperties.getMilliSeconds("LogicTimeout", 300000L);
209                        LOGIC_CHECK_TIMEOUT_MS = configProperties.getMilliSeconds("LogicCheckTimeout", 60000L);
210                        MAX_FILE_SZIE = configProperties.getFileSize("MaxFileSize", 1024 * 1024 * 1L);
211                        SHOW_HIDDEN = configProperties.getBoolean("ShowHidden", false);
212                        SHOW_LINK = configProperties.getBoolean("ShowLink", false);
213                        COPY_STRUCTURE_ONLY = configProperties.getBoolean("CopyStructureOnly", false);
214                        MAX_QUEUE_SIZE = configProperties.getInteger("MaxQueueSize", 10);
215                        BATCH_SIZE = configProperties.getInteger("BatchSize", 10);
216                        MAX_CONNECT_USAGE = configProperties.getDouble("MaxConnectUsage", 0.5D);
217                        MAX_MEMORY_USAGE = configProperties.getDouble("MaxMemoryUsage", 0.8D);
218                        BEFORE_THE_QUEUE_TIME = configProperties.getMilliSeconds("BeforeTheQueueTime", 15000L);
219                        MAX_FILE_PARENT_PATH_LENGTH = configProperties.getInteger("MaxFileParentPathLength", 200);
220                        MAX_FILE_NAME_LENGTH = configProperties.getInteger("MaxFileNameLength", 200);
221                        CACHE_SECONDS = configProperties.getInteger("CacheSeconds", 5);
222                        
223                        SYNC_CUTOFF_TIME = configProperties.getDateTime("SyncCutoffTime","0000-01-01 00:00:00.0000");
224                        DELETE_CUTOFF_TIME = configProperties.getDateTime("DeleteCutoffTime","0000-01-01 00:00:00.0000");
225                        
226                        if(BEFORE_THE_QUEUE_TIME < 0) BEFORE_THE_QUEUE_TIME = 15000L;
227                        
228                        if(MAX_CONNECT_USAGE < 0) MAX_CONNECT_USAGE = 0D;
229                        
230                        if(MAX_MEMORY_USAGE < 0) MAX_MEMORY_USAGE = 0D;
231                        
232                        if(BATCH_SIZE <= 0) BATCH_SIZE = 10;
233                        
234                        if(MAX_QUEUE_SIZE <= 0) MAX_QUEUE_SIZE = 10;
235                        
236                        if(MAX_FILE_SZIE <= 0) MAX_FILE_SZIE = 1024 * 1024 * 1L;
237                        
238                        if(LOGIC_TIMEOUT_MS <= 0) LOGIC_TIMEOUT_MS = 900000L;
239                        
240                        if(LOGIC_CHECK_TIMEOUT_MS < 0) LOGIC_CHECK_TIMEOUT_MS = 60000L;
241                        
242                        if(MAX_FILE_PARENT_PATH_LENGTH <= 0) MAX_FILE_PARENT_PATH_LENGTH = 200;
243                        
244                        if(MAX_FILE_NAME_LENGTH <= 0) MAX_FILE_NAME_LENGTH = 200;
245                        
246                        if(CACHE_SECONDS < 0) CACHE_SECONDS = 0;
247
248                        log.debug("LOGIC_TIMEOUT_MS={}", LOGIC_TIMEOUT_MS);
249                        log.debug("LOGIC_CHECK_TIMEOUT_MS={}", LOGIC_CHECK_TIMEOUT_MS);
250                        log.debug("MAX_FILE_SZIE={}", MAX_FILE_SZIE);
251                        log.debug("SHOW_HIDDEN={}", SHOW_HIDDEN);
252                        log.debug("SHOW_LINK={}", SHOW_LINK);
253                        log.debug("COPY_STRUCTURE_ONLY={}", COPY_STRUCTURE_ONLY);
254                        log.debug("MAX_QUEUE_SIZE={}", MAX_QUEUE_SIZE);
255                        log.debug("BATCH_SIZE={}", BATCH_SIZE);
256                        log.debug("MAX_CONNECT_USAGE={}", MAX_CONNECT_USAGE);
257                        log.debug("MAX_MEMORY_USAGE={}", MAX_MEMORY_USAGE);
258                        log.debug("MAX_FILE_PARENT_PATH_LENGTH={}", MAX_FILE_PARENT_PATH_LENGTH);
259                        log.debug("MAX_FILE_NAME_LENGTH={}", MAX_FILE_NAME_LENGTH);
260                        log.debug("CACHE_SECONDS={}", CACHE_SECONDS);
261
262                        String dataSourcePath = configProperties.getString("DataSource");
263                        String reloadDataSourceReaderPath = configProperties.getString("DataSourceReader", dataSourcePath);
264                        String reloadDataSourceWriterPath = configProperties.getString("DataSourceWriter", dataSourcePath);
265                        filePartSize = configProperties.getFileSize("FilePartSize", 1024 * 1024 * 1L).intValue();
266                        
267                        if(filePartSize <= 0) filePartSize = 1024 * 1024 * 1;
268                        
269                        tableName = configProperties.getString("TableName", "remote_file");
270                        identityCardEnable = configProperties.getBoolean("IdentityCardEnable", false);
271                        identityCardName = configProperties.getString("IdentityCardName", "RemoteFile");
272                        identityCardPrefix = configProperties.getString("IdentityCardPrefix", "RMF");
273                        identityCardSuffix = configProperties.getInteger("IdentityCardSuffix", 0);
274
275                        String readerDsKey = String.format("%s.reader", dsKey);
276                        String writerDsKey = String.format("%s.writer", dsKey);
277                        /**
278                        May be referenced when expansion is needed
279                        if (dataSourceReader == null || !dataSourceReaderPath.equals(reloadDataSourceReaderPath)) {
280                                DriverDataSource currentDs = DATASOURCE_REMOTE_FILE.get(readerDsKey);
281                                if (currentDs != null) {
282                                        currentDs.closeAll();
283                        }
284                        **/
285                        if (dataSourceReader == null || !dataSourceReaderPath.equals(reloadDataSourceReaderPath)) {
286                                dataSourceReaderPath = reloadDataSourceReaderPath;
287                                dataSourceReader = new DriverDataSource(new File(dataSourceReaderPath));
288                                DATASOURCE_REMOTE_FILE.put(readerDsKey, dataSourceReader);
289                        }
290                        /**
291                        May be referenced when expansion is needed
292                        if (dataSourceWriter == null || !dataSourceWriterPath.equals(reloadDataSourceWriterPath)) {
293                                DriverDataSource currentDs = DATASOURCE_REMOTE_FILE.get(writerDsKey);
294                                if (currentDs != null) {
295                                        currentDs.closeAll();
296                        }
297                        **/
298                        if (dataSourceWriter == null || !dataSourceWriterPath.equals(reloadDataSourceWriterPath)) {
299                                dataSourceWriterPath = reloadDataSourceWriterPath;
300                                dataSourceWriter = new DriverDataSource(new File(dataSourceWriterPath));
301                                DATASOURCE_REMOTE_FILE.put(writerDsKey, dataSourceWriter);
302                        }
303
304                        loadDataMapping(this);
305                        
306                        FIRST_LOADED = true;
307                } catch (Exception e) {
308                        log.error(e.getMessage(), e);
309                }
310        }
311
312        public String formatSql(String sql) {
313                return String.format(sql, tableName);
314        }
315
316        public String formatSqlForFilePartData(String sql) {
317                return String.format(sql, getDataMappingTableName());
318        }
319
320        public DiskFile writeToDisk(DiskFile diskFile) throws IOException {
321                if (!exists()) {
322                        log.mark(String.format("The remote file '%s' does not exist.", this.getPath()));
323                }
324                RemoteFile the = this;
325
326                Timestamp rfModifyTime = the.getModifyTime();
327                if (rfModifyTime != null) {
328                        diskFile.setModifyTimeMsFromClock(rfModifyTime.getTime());
329                }
330                if (isLink()) {
331                        if (isDebug(the)) {
332                                LoggerFactory.getLogger(RemoteFile.class).mark("CreateDiskLink - {}", diskFile.getPath());
333                        }
334                        diskFile.createLink(new String(readAllBytes(),BaseFile.CHARSET));
335                } else if (isFile()) {
336                        merge(new FilePart() {
337                                final List<byte[]> partDataList = new ArrayList<byte[]>();
338                                boolean isWritingDf = false;
339                                DiskFile realDf = null;
340
341                                @Override
342                                protected void process(int partIndex, byte[] data) throws IOException {
343                                        Thread.currentThread()
344                                                        .setName(String.format("RemoteFile-writeToDisk-merge-%s", diskFile.getOrigin().getName()));
345                                        if (syncRoot != null) {
346                                                if (!diskFile.isLogicModify()) {
347                                                        throw new IOException(
348                                                                        String.format("The file '%s' missing modify logic lock.", the.getPath()));
349                                                }
350                                        }
351                                        if (diskFile != null)
352                                                diskFile.logicModify();
353                                        try {
354                                                String fileName = diskFile.getOrigin().getName();
355                                                isWritingDf = fileName.matches(TMP_WRITING_SWP);
356                                                if (isWritingDf) {
357                                                        if (partIndex == 0) {
358                                                                if (diskFile.exists()) {
359                                                                        diskFile.delete();
360                                                                }
361                                                                if (isDebug(the)) {
362                                                                        LoggerFactory.getLogger(RemoteFile.class).mark("CreateDiskFile - {}",
363                                                                                        diskFile.getPath());
364                                                                }
365                                                                String realFileName = fileName.replaceFirst(BaseFile.TMP_WRITING_FOR_REPLACE, "");
366                                                                String realDfPath = String.format("%s/%s", diskFile.getParent(), realFileName);
367                                                                realDf = new DiskFile(realDfPath);
368                                                                realDf.copyAttrs(realDf, diskFile.isCopyStructureOnly(), diskFile.syncRoot);
369                                                                if(!diskFile.manualOpen(true)){
370                                                                        throw new IOException(
371                                                                                        String.format("The disk file '%s' cannot open.", diskFile.getPath()));  
372                                                                }
373                                                        }
374                                                        partDataList.add(decrypt(data));
375                                                        if (partDataList.size() >= BATCH_SIZE) {
376
377                                                                if (realDf != null)
378                                                                        realDf.logicModify();
379
380                                                                ByteArrayOutputStream ops = new ByteArrayOutputStream();
381                                                                int size = partDataList.size();
382                                                                for (int i = 0; i < size; i++) {
383                                                                        byte[] partData = partDataList.get(i);
384                                                                        ops.write(partData);
385                                                                }
386                                                                diskFile.manualWrite(ops.toByteArray());
387                                                                partDataList.clear();
388                                                        }
389                                                } else {
390                                                        if (partIndex == 0) {
391                                                                if (diskFile.exists()) {
392                                                                        throw new IOException(
393                                                                                        String.format("The remote file '%s' already exist.", diskFile.getPath()));
394                                                                }
395                                                                if (isDebug(the)) {
396                                                                        LoggerFactory.getLogger(RemoteFile.class).mark("CreateDiskFile - {}",
397                                                                                        diskFile.getPath());
398                                                                }
399                                                                if(!diskFile.manualOpen(true)){
400                                                                        throw new IOException(
401                                                                                        String.format("The disk file '%s' cannot open.", diskFile.getPath()));
402                                                                }
403                                                        }
404                                                        partDataList.add(decrypt(data));
405                                                        if (partDataList.size() >= BATCH_SIZE) {
406                                                                ByteArrayOutputStream ops = new ByteArrayOutputStream();
407                                                                int size = partDataList.size();
408                                                                for (int i = 0; i < size; i++) {
409                                                                        byte[] partData = partDataList.get(i);
410                                                                        ops.write(partData);
411                                                                }
412                                                                diskFile.manualWrite(ops.toByteArray());
413                                                                partDataList.clear();
414                                                        }
415                                                }
416
417                                        } catch (Exception e) {
418                                                log.warn(e.getMessage(), e);
419                                                if (diskFile != null) {
420                                                        diskFile.manualClose();
421                                                        diskFile.delete();
422                                                }
423                                                throw new IOException(e.getMessage(), e);
424                                        }
425                                }
426
427                                @Override
428                                protected void completed(int lastPartIndex, Timestamp modifyTime, long fileSize) throws IOException {
429                                        if (partDataList.size() > 0) {
430                                                ByteArrayOutputStream ops = new ByteArrayOutputStream();
431                                                int size = partDataList.size();
432                                                for (int i = 0; i < size; i++) {
433                                                        byte[] partData = partDataList.get(i);
434                                                        ops.write(partData);
435                                                }
436                                                diskFile.manualWrite(ops.toByteArray());
437                                                partDataList.clear();
438                                        }
439                                        diskFile.manualClose();
440                                        if (syncRoot != null && !diskFile.isLogicModify()) {
441                                                diskFile.delete();
442                                                throw new IOException(
443                                                                String.format("The file '%s' missing modify logic lock.", diskFile.getPath()));
444                                        }
445                                        try {
446                                                if (isWritingDf) {
447                                                        if (diskFile.exists()) {
448                                                                diskFile.moveTo(realDf.getPath());
449                                                                if (isDebug(the)) {
450                                                                        LoggerFactory.getLogger(RemoteFile.class).mark("CompletedDiskFile - {}",
451                                                                                        realDf.getPath());
452                                                                }
453                                                        }
454                                                        if (realDf.exists())
455                                                                the.syncedToDisk();
456                                                }
457                                                if (realDf == null) {
458                                                        if (diskFile != null) {
459                                                                if (diskFile.getOrigin().getName().startsWith(".")) {
460                                                                        Files.setAttribute(Paths.get(diskFile.getPath()), "dos:hidden", true);
461                                                                }
462                                                        }
463                                                } else {
464                                                        if (realDf.getOrigin().getName().startsWith(".")) {
465                                                                Files.setAttribute(Paths.get(realDf.getPath()), "dos:hidden", true);
466                                                        }
467                                                }
468
469                                                RemoteFile.removeQueuePathMapping(the.getPath());
470
471                                                if (diskFile != null)
472                                                        diskFile.removeLogicModify();
473
474                                                if (realDf != null)
475                                                        realDf.removeLogicModify();
476
477                                        } catch (Exception e) {
478                                                log.error(e.getMessage(), e);
479                                        }
480                                }
481
482                                @Override
483                                protected void ended(int lastPartIndex, long fileSize) {
484                                        try {
485                                                if (diskFile != null) {
486                                                        diskFile.manualClose();
487                                                }
488                                        } catch (Exception e) {
489                                                log.error(e.getMessage(), e);
490                                        }
491                                }
492                        });
493                } else {
494                        List<Map<String, Object>> writeList = list(getPath(), "%");
495                        writeList.addAll(list(parsePath(getPath() + "/%"), "%"));
496
497                        int size = writeList.size();
498                        if (size > 0) {
499                                if (!diskFile.exists()) {
500                                        if (isDebug(the)) {
501                                                log.mark("CreateDiskDir - {} ", diskFile.getPath());
502                                        }
503                                        diskFile.mkdirs();
504                                }
505                        }
506
507                        for (Map<String, Object> item : writeList) {
508                                RemoteFile rf = new RemoteFile(
509                                                String.format("%s/%s", item.get("file_parent_path"), item.get("file_name")));
510
511                                if (rf.getPath().equals(getPath()))
512                                        continue;
513
514                                String dfPath = rf.getPath().replaceFirst(getPath(), diskFile.getPath());
515                                DiskFile df = new DiskFile(dfPath);
516                                df.copyAttrs(df, false, this.syncRoot);
517                                Timestamp mt = rf.getModifyTime();
518                                if (mt != null) {
519                                        df.setModifyTimeMsFromClock(mt.getTime());
520                                }
521                                if (rf.isDir()) {
522                                        if (!df.exists()) {
523                                                if (isDebug(the)) {
524                                                        log.mark("CreateDiskDir - {}", diskFile.getPath());
525                                                }
526                                                df.mkdirs();
527                                        }
528                                } else {
529                                        rf.writeToDisk(df);
530                                }
531                        }
532                }
533                return diskFile;
534        }
535
536        public DiskFile writeToDisk() throws IOException {
537                DiskFile df = new DiskFile(this.origin.getAbsolutePath());
538                df.copyAttrs(df, false, this.syncRoot);
539                return writeToDisk(df);
540        }
541
542        public DiskFile writeToDisk(String path) throws IOException {
543                DiskFile df = new DiskFile(path);
544                df.copyAttrs(df, false, this.syncRoot);
545                return writeToDisk(df);
546        }
547
548        public boolean isCompleted() throws IOException {
549                return getCompletedRecord() != null;
550        }
551
552        public Map<String, Object> getCompletedRecord() throws IOException {
553                String sql = formatSql(SQL_SELECT_FOR_COMPLETED);
554                Map dataRecord = new HashMap<String, Object>();
555                dataRecord.put("file_name", origin.getName());
556                dataRecord.put("file_parent_path", getParent());
557                String cacheKey = String.format("first-%s-%s",sql,dataRecord);
558                Map<String,Object> record = (Map<String,Object>)Cache.get(cacheKey);
559                if(record == null){
560                        CacheDriverExecutor executor = null;
561                        Connection conn = null;
562                        try {
563                                conn = dataSourceReader.getConnection();
564                                conn.setAutoCommit(true);
565                                executor = new CacheDriverExecutor(conn);
566                                record = executor.first(sql, dataRecord);
567                                if(record != null){
568                                    Cache.set(cacheKey,record,CACHE_SECONDS);
569                                }
570                        } catch (SQLException e) {
571                                throw new IOException(e.getMessage(), e);
572                        } finally {
573                                try {
574                                        if (executor != null)
575                                                executor.close();
576                                } catch (Exception e) {
577                                        log.error(e.getMessage(), e);
578                                }
579                        }
580                }
581                return record;
582        }
583
584        public Map<String, Object> getRecordById(Object id) throws IOException {
585                String sql = formatSql(SQL_SELECT_GET_RECORD_BY_ID);
586                Map dataRecord = new HashMap<String, Object>();
587                dataRecord.put("id", id);
588                Map<String, Object> record = null;
589                if (record == null) {
590                        CacheDriverExecutor executor = null;
591                        Connection conn = null;
592                        try {
593                                conn = dataSourceReader.getConnection();
594                                conn.setAutoCommit(true);
595                                executor = new CacheDriverExecutor(conn);
596                                record = executor.first(sql, dataRecord);
597                        } catch (SQLException e) {
598                                throw new IOException(e.getMessage(), e);
599                        } finally {
600                                try {
601                                        if (executor != null)
602                                                executor.close();
603                                } catch (Exception e) {
604                                        log.error(e.getMessage(), e);
605                                }
606                        }
607                }
608                return record;
609        }
610
611        public Map<String, Object> getRecordByFileId(Object fileId) throws IOException {
612                String sql = formatSql(SQL_SELECT_GET_RECORD_BY_FILE_ID);
613                Map dataRecord = new HashMap<String, Object>();
614                dataRecord.put("file_id", fileId);
615                Map<String, Object> record = null;
616                if (record == null) {
617                        CacheDriverExecutor executor = null;
618                        Connection conn = null;
619                        try {
620                                conn = dataSourceReader.getConnection();
621                                conn.setAutoCommit(true);
622                                executor = new CacheDriverExecutor(conn);
623                                record = executor.first(sql, dataRecord);
624                        } catch (SQLException e) {
625                                throw new IOException(e.getMessage(), e);
626                        } finally {
627                                try {
628                                        if (executor != null)
629                                                executor.close();
630                                } catch (Exception e) {
631                                        log.error(e.getMessage(), e);
632                                }
633                        }
634                }
635                return record;
636        }
637
638        public Map<String, Object> getLastUpdateRecord() throws IOException {
639                String sql = formatSql(SQL_SELECT_FOR_LAST_UPDATE);
640                Map dataRecord = new HashMap<String, Object>();
641                dataRecord.put("file_name", origin.getName());
642                dataRecord.put("file_parent_path", getParent());
643                Map<String, Object> record = null;
644                CacheDriverExecutor executor = null;
645                Connection conn = null;
646                try {
647                        conn = dataSourceReader.getConnection();
648                        conn.setAutoCommit(true);
649                        executor = new CacheDriverExecutor(conn);
650                        record = executor.first(sql, dataRecord);
651                        return record;
652                } catch (SQLException e) {
653                        throw new IOException(e.getMessage(), e);
654                } finally {
655                        try {
656                                if (executor != null)
657                                        executor.close();
658                        } catch (Exception e) {
659                                log.error(e.getMessage(), e);
660                        }
661                }
662        }
663        
664        protected Timestamp getFirstCreatedAt() throws IOException {
665                String sql = formatSql(SQL_SELECT_FOR_FIRST_CREATED_AT);
666                Map dataRecord = new HashMap<String, Object>();
667                dataRecord.put("file_parent_path", getPath());
668                dataRecord.put("like_file_parent_path", parsePath(getPath() + "/%"));
669                dataRecord.put("deleted_on_hostnames", "%;" + CommonTools.getHostname() + ";%");
670                CacheDriverExecutor executor = null;
671                Connection conn = null;
672                try {
673                        conn = dataSourceReader.getConnection();
674                        conn.setAutoCommit(true);
675                        executor = new CacheDriverExecutor(conn);
676                        Map<String, Object> record = executor.first(sql, dataRecord);
677                        return record == null ? null : (Timestamp)record.get("created_at");
678                } catch (SQLException e) {
679                        throw new IOException(e.getMessage(), e);
680                } finally {
681                        try {
682                                if (executor != null)
683                                        executor.close();
684                        } catch (Exception e) {
685                                log.error(e.getMessage(), e);
686                        }
687                }
688        }       
689        
690        protected Integer getMaxFilePartDataTable() throws IOException {
691                String sql = formatSql(SQL_SELECT_FOR_MAX_DATA_TABLE);
692                Map dataRecord = new HashMap<String, Object>();
693                dataRecord.put("file_parent_path", getPath());
694                dataRecord.put("like_file_parent_path", parsePath(getPath() + "/%"));
695                CacheDriverExecutor executor = null;
696                Connection conn = null;
697                try {
698                        conn = dataSourceReader.getConnection();
699                        conn.setAutoCommit(true);
700                        executor = new CacheDriverExecutor(conn);
701                        Map<String, Object> record = executor.first(sql, dataRecord);
702                        return record == null ? 0 : Integer.parseInt(record.get("file_part_data_table")+"");
703                } catch (SQLException e) {
704                        throw new IOException(e.getMessage(), e);
705                } finally {
706                        try {
707                                if (executor != null)
708                                        executor.close();
709                        } catch (Exception e) {
710                                log.error(e.getMessage(), e);
711                        }
712                }
713        }       
714
715        private Map<String, Object> getSelectByPathForCheckExists() throws IOException {
716                String sql = formatSql(SQL_SELECT_BY_PATH_FOR_CHECK_EXISTS);
717                Map dataRecord = new HashMap<String, Object>();
718                dataRecord.put("file_name", origin.getName());
719                dataRecord.put("file_parent_path", getParent());
720                Map<String, Object> record = null;
721                if (record == null) {
722                        CacheDriverExecutor executor = null;
723                        Connection conn = null;
724                        try {
725                                conn = dataSourceReader.getConnection();
726                                conn.setAutoCommit(true);
727                                executor = new CacheDriverExecutor(conn);
728                                record = executor.first(sql, dataRecord);
729                                return record;
730                        } catch (SQLException e) {
731                                throw new IOException(e.getMessage(), e);
732                        } finally {
733                                try {
734                                        if (executor != null)
735                                                executor.close();
736                                } catch (Exception e) {
737                                        log.error(e.getMessage(), e);
738                                }
739                        }
740                }
741                return record;
742        }
743
744        private int countAllSubFiles(String fileParentPath, String fileName) throws IOException {
745                String sql = formatSql(SQL_SELECT_FOR_COUNT_ALL_SUB_FILES);
746                Map dataRecord = new HashMap<String, Object>();
747                dataRecord.put("file_name", fileName);
748                dataRecord.put("file_parent_path", fileParentPath);
749                Map<String, Object> record = null;
750                if (record == null) {
751                        CacheDriverExecutor executor = null;
752                        Connection conn = null;
753                        try {
754                                conn = dataSourceReader.getConnection();
755                                conn.setAutoCommit(true);
756                                executor = new CacheDriverExecutor(conn);
757                                record = executor.first(sql, dataRecord);
758                                return Integer.parseInt(record.get("as_count") + "");
759                        } catch (SQLException e) {
760                                throw new IOException(e.getMessage(), e);
761                        } finally {
762                                try {
763                                        if (executor != null)
764                                                executor.close();
765                                } catch (Exception e) {
766                                        log.error(e.getMessage(), e);
767                                }
768                        }
769                }
770                return -1;
771        }
772
773        private Map<String, Object> getFirstFilePart() throws IOException {
774                String sql = formatSql(SQL_SELECT_FOR_FIRST_FILE);
775                Map dataRecord = new HashMap<String, Object>();
776                dataRecord.put("file_name", origin.getName());
777                dataRecord.put("file_parent_path", getParent());
778                String cacheKey = String.format("first-%s-%s",sql,dataRecord);
779                Map<String, Object> record = (Map<String,Object>)Cache.get(cacheKey);
780                if(record == null){
781                        CacheDriverExecutor executor = null;
782                        Connection conn = null;
783                        try {
784                                conn = dataSourceReader.getConnection();
785                                conn.setAutoCommit(true);
786                                executor = new CacheDriverExecutor(conn);
787                                record = executor.first(sql, dataRecord);
788                                if(record != null){
789                                    Cache.set(cacheKey,record,CACHE_SECONDS);
790                                }
791                        } catch (SQLException e) {
792                                throw new IOException(e.getMessage(), e);
793                        } finally {
794                                try {
795                                        if (executor != null)
796                                                executor.close();
797                                } catch (Exception e) {
798                                        log.error(e.getMessage(), e);
799                                }
800                        }
801                }
802                return record;
803        }
804        
805        private Map<String, Object> getFirstFilePartRefresh() throws IOException {
806                String sql = formatSql(SQL_SELECT_FOR_FIRST_FILE);
807                Map dataRecord = new HashMap<String, Object>();
808                dataRecord.put("file_name", origin.getName());
809                dataRecord.put("file_parent_path", getParent());
810                String cacheKey = String.format("first-%s-%s",sql,dataRecord);
811                Cache.remove(cacheKey);
812                CacheDriverExecutor executor = null;
813                Connection conn = null;
814                try {
815                        conn = dataSourceReader.getConnection();
816                        conn.setAutoCommit(true);
817                        executor = new CacheDriverExecutor(conn);
818                        Map<String,Object> record =  executor.first(sql, dataRecord);
819                        if(record != null){
820                            Cache.set(cacheKey,record,CACHE_SECONDS);
821                        }
822                        return record;
823                } catch (SQLException e) {
824                        throw new IOException(e.getMessage(), e);
825                } finally {
826                        try {
827                                if (executor != null)
828                                        executor.close();
829                        } catch (Exception e) {
830                                log.error(e.getMessage(), e);
831                        }
832                }
833        }       
834
835        private Map<String, Object> getLastIndexRecord() throws IOException {
836                String sql = formatSql(SQL_SELECT_FOR_LAST_FILE_PART);
837                Map dataRecord = new HashMap<String, Object>();
838                dataRecord.put("file_name", origin.getName());
839                dataRecord.put("file_parent_path", getParent());
840                dataRecord.put("file_id", fileId);
841                Map<String, Object> record = null;
842                if (record == null) {
843                        CacheDriverExecutor executor = null;
844                        Connection conn = null;
845                        try {
846                                conn = dataSourceReader.getConnection();
847                                conn.setAutoCommit(true);
848                                executor = new CacheDriverExecutor(conn);
849                                record = executor.first(sql, dataRecord);
850                        } catch (SQLException e) {
851                                throw new IOException(e.getMessage(), e);
852                        } finally {
853                                try {
854                                        if (executor != null)
855                                                executor.close();
856                                } catch (Exception e) {
857                                        log.error(e.getMessage(), e);
858                                }
859                        }
860                }
861                return record;
862        }
863
864        public boolean isTimeout(long timeoutMs) throws IOException {
865                CacheDriverExecutor executor = null;
866                Connection conn = null;
867                try {
868                        Clock clock = new Clock();
869                        conn = dataSourceReader.getConnection();
870                        conn.setAutoCommit(true);
871                        executor = new CacheDriverExecutor(conn);
872                        Map dataRecord = new HashMap<String, Object>();
873                        dataRecord.put("file_name", origin.getName());
874                        dataRecord.put("file_parent_path", getParent());
875                        Map<String, Object> completedRecord = getCompletedRecord();
876
877                        if (completedRecord != null)
878                                return false;
879
880                        Map<String, Object> record = executor.first(formatSql(SQL_SELECT_FOR_TIMEOUT), dataRecord);
881
882                        if (record == null)
883                                return false;
884
885                        boolean completed = executor.first(formatSql(SQL_SELECT_CHECK_COMPLETED), record) != null;
886                        if (!completed) {
887                                boolean timeout = (clock.getTime() - ((Timestamp) record.get("updated_at")).getTime()) > timeoutMs;
888
889                                return timeout;
890                        }
891                        return false;
892                } catch (SQLException e) {
893                        throw new IOException(e.getMessage(), e);
894                } finally {
895                        try {
896                                if (executor != null)
897                                        executor.close();
898                        } catch (Exception e) {
899                                log.error(e.getMessage(), e);
900                        }
901                }
902        }
903
904        private Map<String, Object> readPart(Map<String, Object> rfRecord) throws IOException {
905                CacheDriverExecutor executor = null;
906                Connection conn = null;
907                try {
908                        calculateDataMappingDataSourceReader(this, rfRecord);
909                        if(this.dataMappingTable <= -1){
910                            return null;
911                        }
912                        Clock clock = new Clock();
913                        conn = dataMappingDataSourceReader.getConnection();
914                        conn.setAutoCommit(true);
915                        executor = new CacheDriverExecutor(conn);
916                        Map<String, Object> dataRecord = new HashMap<String, Object>();
917                        dataRecord.put("remote_file_id", rfRecord.get("id"));
918                        return executor.first(formatSqlForFilePartData(SQL_SELECT_BY_RF_ID_FROM_FILE_DATA), dataRecord);
919                } catch (SQLException e) {
920                        throw new IOException(e.getMessage(), e);
921                } finally {
922                        try {
923                                if (executor != null)
924                                        executor.close();
925                        } catch (Exception e) {
926                                log.error(e.getMessage(), e);
927                        }
928                }
929        }
930
931        public void merge(FilePart filePart) throws IOException {
932                if (isLink()) {
933                        throw new IOException(String.format("The file '%s' is a link.", origin.getAbsolutePath()));
934                }
935                if (isDir()) {
936                        throw new IOException(String.format("The file '%s' is a folder.", origin.getAbsolutePath()));
937                }
938                CacheDriverExecutor executor = null;
939                Connection conn = null;
940                try {
941                        Clock clock = new Clock();
942                        conn = dataSourceReader.getConnection();
943                        conn.setAutoCommit(true);
944                        executor = new CacheDriverExecutor(conn);
945                        Map dataRecord = getCompletedRecord();
946                        if (dataRecord != null) {
947                                CacheArray rows = new CacheArray();
948                                CacheArrayFilter filter = new CacheArrayFilter(CACHE_ARRAY_FILTER_TIMER) {
949                                        long fileSize = 0L;
950                                        Map<String, Object> fp = null;
951                                        Map<String, Object> item = null;
952
953                                        @Override
954                                        public void execute(Integer index, Object o) {
955
956                                                try {
957                                                        item = (Map<String, Object>) o;
958                                                        fp = readPart(item);
959                                                        Object fpData = null;
960                                                        
961                                                        if(fp == null){
962                                                            fpData = new byte[]{};
963                                                        }else{
964                                                            fpData = fp.get("file_part_data");
965                                                        }
966                                                        Thread.currentThread()
967                                                                        .setName(String.format("RemoteFile-merge-%s", item.get("file_name")));
968                                                        byte[] partData = null;
969                                                        if (fpData instanceof byte[]) {
970                                                                partData = (byte[]) fpData;
971                                                        }
972                                                        fileSize += partData.length;
973                                                        filePart.process(index, partData);
974                                                } catch (Exception e) {
975                                                        log.error(e.getMessage(), e);
976                                                        this.terminated = true;
977                                                }
978                                        }
979
980                                        @Override
981                                        public void completed(Integer size) {
982                                                try {
983                                                        if (item != null) {
984                                                                Thread.currentThread()
985                                                                                .setName(String.format("RemoteFile-merge-completed-%s", item.get("file_name")));
986                                                        }
987                                                        filePart.completed(size, new Clock().getSqlTimestamp(), fileSize);
988                                                } catch (IOException e) {
989                                                        log.error(e.getMessage(), e);
990                                                }
991                                        }
992
993                                        @Override
994                                        public void terminated() {
995                                                if (item != null) {
996                                                        Thread.currentThread()
997                                                                        .setName(String.format("RemoteFile-merge-trminated-%s", item.get("file_name")));
998                                                        log.error("Merge terminated: {}/{}", item.get("file_parent_path"), item.get("file_name"));
999                                                }
1000                                        }
1001                                };
1002                                rows.filter(filter);
1003                                executor.find(formatSql(SQL_SELECT_FILE_HAVE_DATA), dataRecord, rows);
1004                        }
1005                } catch (SQLException e) {
1006                        throw new IOException(e.getMessage(), e);
1007                } finally {
1008                        if (executor != null) {
1009                                try {
1010                                        executor.close();
1011                                } catch (Exception e) {
1012                                        throw new IOException(e.getMessage(), e);
1013                                }
1014                        }
1015                }
1016        }
1017
1018        protected void copyFrom(boolean copyStructureOnly, DiskFile diskFile) throws IOException {
1019                var the = this;
1020                if (diskFile.isLink()) {
1021                        the.setModifyTime(diskFile.getModifyTimeForClock());
1022                        the.createLink(diskFile.readLink());
1023                }
1024                if (diskFile.isDir()) {
1025                        the.setModifyTime(diskFile.getModifyTimeForClock());
1026                        the.mkdirs();
1027                }
1028                if (diskFile.isFile()) {
1029                        the.setModifyTime(diskFile.getModifyTimeForClock());
1030                        if (copyStructureOnly) {
1031                                the.createStructureFile(diskFile.size());
1032                                diskFile.removeLogicAccess();
1033                                DiskFile.removeQueuePathMapping(diskFile.getPath());                            
1034                        } else {
1035                                diskFile.split(filePartSize, new FilePart() {
1036                                        final List<byte[]> partDataList = new ArrayList<byte[]>();
1037                                        int currentFilePartStartIndex = 0;
1038
1039                                        @Override
1040                                        protected void process(int partIndex, byte[] data) throws IOException {
1041                                                if (partIndex == 0) {
1042                                                        if (diskFile.syncRoot != null) {
1043                                                                if (diskFile.isLogicModify()) {
1044                                                                        throw new IOException(
1045                                                                                        String.format("The file '%s' is already logic locked by another thread.",
1046                                                                                                        diskFile.getPath()));
1047                                                                }
1048                                                        }
1049                                                        if (the.exists(true)) {
1050                                                                throw new IOException(
1051                                                                                String.format("The remote file '%s' already exist.", the.getPath()));
1052                                                        }
1053                                                        if (isDebug(the)) {
1054                                                                LoggerFactory.getLogger(RemoteFile.class).mark("CreateRemoteFile - {}", the.getPath());
1055                                                        }
1056                                                }
1057
1058                                                partDataList.add(data);
1059                                                if (partDataList.size() >= BATCH_SIZE) {
1060                                                        diskFile.logicAccess();
1061                                                        if(!the.writeToBatch(currentFilePartStartIndex, partDataList)){
1062                                                                throw new IOException(String.format("The remote file '%s' write failed.",the.getPath()));
1063                                                        }
1064                                                        partDataList.clear();
1065                                                        currentFilePartStartIndex += BATCH_SIZE;
1066                                                }
1067                                        }
1068
1069                                        @Override
1070                                        protected void completed(int lastPartIndex, Timestamp modifyTime, long fileSize)
1071                                                        throws IOException {
1072                                                try {
1073                                                        if (diskFile.syncRoot != null) {
1074                                                                if (diskFile.isLogicModify()) {
1075                                                                        throw new IOException(
1076                                                                                        String.format("The file '%s' is already logic locked by another thread.",
1077                                                                                                        diskFile.getPath()));
1078                                                                }
1079                                                        }
1080                                                        /**This is 0 bytes file**/
1081                                                        if (lastPartIndex == -1) {
1082
1083                                                                if (the.exists(true)) {
1084                                                                        the.forceDeleteFile(false);
1085                                                                }
1086
1087                                                                the.setModifyTime(modifyTime);
1088                                                                the.createEmptyFile();
1089                                                        } else {
1090                                                                the.setModifyTime(modifyTime);
1091                                                                if (partDataList.size() > 0) {
1092                                                                        if(!the.writeToBatch(currentFilePartStartIndex, partDataList)){
1093                                                                            throw new IOException(String.format("The remote file '%s' write failed.",the.getPath()));
1094                                                                        }
1095                                                                        partDataList.clear();
1096                                                                }
1097                                                                the.complete(lastPartIndex, fileSize);
1098                                                        }
1099                                                        if (isDebug(the)) {
1100                                                                LoggerFactory.getLogger(RemoteFile.class).mark("CompletedRemoteFile - {}",
1101                                                                                the.getPath());
1102                                                        }
1103                                                        DiskFile.removeQueuePathMapping(diskFile.getPath());
1104                                                } catch (IOException e) {
1105                                                        throw e;
1106                                                }
1107                                        }
1108
1109                                        @Override
1110                                        protected void ended(int lastPartIndex, long fileSize) {
1111
1112                                        }
1113
1114                                });
1115                        }
1116                }
1117        }
1118
1119        private boolean createEmptyFile() throws IOException {
1120            boolean allowed = beforeWrite();
1121            
1122            if(!allowed) return false;
1123                
1124                Clock clock = new Clock();
1125                Timestamp currentTime = clock.getSqlTimestamp();
1126                CacheDriverExecutor executor = null;
1127                Connection conn = null;
1128                try {
1129                    calculateDataMappingDataSourceWriter(this);
1130                        conn = dataSourceWriter.getConnection();
1131                        conn.setAutoCommit(true);
1132                        executor = new CacheDriverExecutor(conn);
1133                        Map dataRecord = new HashMap<String, Object>();
1134                        dataRecord.put("id",
1135                                        generateId(identityCardEnable, identityCardName, identityCardPrefix, identityCardSuffix));
1136                        dataRecord.put("file_name", origin.getName());
1137                        dataRecord.put("file_parent_path", getParent());
1138                        dataRecord.put("file_type", "F");
1139                        dataRecord.put("file_part_index", 0);
1140                        dataRecord.put("file_part_size", 0);
1141                        dataRecord.put("file_part_data_ds", "NONE");
1142                        dataRecord.put("file_part_data_table", this.dataMappingTable);
1143                        dataRecord.put("file_part_last_index", 0);
1144                        dataRecord.put("file_size", 0);
1145                        dataRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime);
1146                        dataRecord.put("file_deleted", 0);
1147                        dataRecord.put("created_user_id", getDefaultModifyUserId());
1148                        dataRecord.put("updated_user_id", getDefaultModifyUserId());
1149                        dataRecord.put("created_at", currentTime);
1150                        dataRecord.put("updated_at", currentTime);
1151                        dataRecord.put("file_id", fileId);
1152                        dataRecord.put("source_hostname", CommonTools.getHostname());
1153                        dataRecord.put("synced_on_hostnames", String.format(";%s;", CommonTools.getHostname()));
1154                        dataRecord.put("deleted_on_hostnames", ";");
1155                        executor.execute(formatSql(SQL_INSERT_INTO_FILE), dataRecord);
1156                        
1157                        afterWrite();
1158                        
1159                        return true;
1160                } catch (Exception e) {
1161                        throw new IOException(e.getMessage(), e);
1162                } finally {
1163                        try {
1164                                if (executor != null)
1165                                        executor.close();
1166                        } catch (Exception e) {
1167                                log.error(e.getMessage(), e);
1168                        }
1169                }
1170        }
1171
1172        private boolean createStructureFile(long fileSize) throws IOException {
1173            boolean allowed = beforeWrite();
1174            
1175            if(!allowed) return false;
1176            
1177                log.debug("CreateStructureFile: {}", getPath());
1178                Clock clock = new Clock();
1179                Timestamp currentTime = clock.getSqlTimestamp();
1180                CacheDriverExecutor executor = null;
1181                Connection conn = null;
1182                try {
1183                    calculateDataMappingDataSourceWriter(this);
1184                        conn = dataSourceWriter.getConnection();
1185                        conn.setAutoCommit(true);
1186                        executor = new CacheDriverExecutor(conn);
1187                        Map dataRecord = new HashMap<String, Object>();
1188                        dataRecord.put("id",
1189                                        generateId(identityCardEnable, identityCardName, identityCardPrefix, identityCardSuffix));
1190                        dataRecord.put("file_name", origin.getName());
1191                        dataRecord.put("file_parent_path", getParent());
1192                        dataRecord.put("file_type", "F");
1193                        dataRecord.put("file_part_index", 0);
1194                        dataRecord.put("file_part_size", fileSize);
1195                        dataRecord.put("file_part_data_ds", "NONE");
1196                        dataRecord.put("file_part_data_table", this.dataMappingTable);
1197                        dataRecord.put("file_part_last_index", 0);
1198                        dataRecord.put("file_size", fileSize);
1199                        dataRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime);
1200                        dataRecord.put("file_deleted", 0);
1201                        dataRecord.put("created_user_id", getDefaultModifyUserId());
1202                        dataRecord.put("updated_user_id", getDefaultModifyUserId());
1203                        dataRecord.put("created_at", currentTime);
1204                        dataRecord.put("updated_at", currentTime);
1205                        dataRecord.put("file_id", fileId);
1206                        dataRecord.put("source_hostname", CommonTools.getHostname());
1207                        dataRecord.put("synced_on_hostnames", String.format(";%s;", CommonTools.getHostname()));
1208                        dataRecord.put("deleted_on_hostnames", ";");
1209                        executor.execute(formatSql(SQL_INSERT_INTO_FILE), dataRecord);
1210                        
1211                        afterWrite();
1212                        
1213                        return true;
1214                } catch (Exception e) {
1215                        throw new IOException(e.getMessage(), e);
1216                } finally {
1217                        try {
1218                                if (executor != null)
1219                                        executor.close();
1220                        } catch (Exception e) {
1221                                log.error(e.getMessage(), e);
1222                        }
1223                }
1224        }
1225
1226        protected boolean syncedToDisk() throws IOException {
1227                Clock clock = new Clock();
1228                CacheDriverExecutor executor = null;
1229                Connection conn = null;
1230                try {
1231                        conn = dataSourceWriter.getConnection();
1232                        conn.setAutoCommit(true);
1233                        executor = new CacheDriverExecutor(conn);
1234                        Map<String, Object> record = getCompletedRecord();
1235                        if (record == null) {
1236                                log.debug(String.format("Remmote file not found '%s'.", origin.getAbsolutePath()));
1237                        }
1238                        if (record != null) {
1239                            String hostname = CommonTools.getHostname();
1240                                String syncedOnHostnames = (String) record.get("synced_on_hostnames");
1241                                syncedOnHostnames = syncedOnHostnames.replaceAll(String.format(";%s;",hostname),";");
1242                                String shn = String.format("%s%s;", syncedOnHostnames, hostname);
1243                                Map<String, Object> dataRecord = new HashMap<String, Object>();
1244                            dataRecord.put("file_parent_path",record.get("file_parent_path"));
1245                            dataRecord.put("file_name", record.get("file_name"));
1246                                dataRecord.put("synced_on_hostnames",shn);
1247                                dataRecord.put("updated_at", clock.getSqlTimestamp());
1248                                dataRecord.put("updated_user_id", getDefaultModifyUserId());
1249                                executor.execute(formatSql(SQL_UPDATE_FOR_SYNCED_TO_DISK), dataRecord);
1250                                return true;
1251                        }
1252                        return false;
1253                } catch (SQLException e) {
1254                        throw new IOException(e.getMessage(), e);
1255                } finally {
1256                        try {
1257                                if (executor != null)
1258                                        executor.close();
1259                        } catch (Exception e) {
1260                                log.error(e.getMessage(), e);
1261                        }
1262                }
1263        }
1264
1265        protected boolean deletedToDiskAndSub(Object id) throws IOException {
1266                Clock clock = new Clock();
1267                CacheDriverExecutor executor = null;
1268                Connection conn = null;
1269                try {
1270                        conn = dataSourceWriter.getConnection();
1271                        conn.setAutoCommit(true);
1272                        executor = new CacheDriverExecutor(conn);
1273                        Map<String, Object> record = getRecordById(id);
1274                        if (record == null) {
1275                                log.debug(String.format("Remmote file not found '%s'.", origin.getAbsolutePath()));
1276                        }
1277                        if (record != null) {
1278                            String hostname = CommonTools.getHostname();
1279                                String deletedOnHostnames = (String) record.get("deleted_on_hostnames");
1280                                deletedOnHostnames = deletedOnHostnames.replaceAll(String.format(";%s;",hostname),";");
1281                                String dhn = String.format("%s%s;", deletedOnHostnames, hostname);
1282                                Timestamp now = clock.getSqlTimestamp();
1283                                final Map<String, Object> dataRecord = new HashMap<String, Object>();
1284                            dataRecord.put("file_parent_path", record.get("file_parent_path"));
1285                            dataRecord.put("file_name", record.get("file_name"));
1286                                dataRecord.put("deleted_on_hostnames", dhn);
1287                                dataRecord.put("updated_at", now);
1288                                dataRecord.put("updated_user_id", getDefaultModifyUserId());
1289                                executor.execute(formatSql(SQL_UPDATE_FOR_DELETED_TO_DISK), dataRecord);
1290                                if (record.get("file_type").equals("D")) {
1291                                        String dirPath = String.format("%s/%s", record.get("file_parent_path"),
1292                                                        record.get("file_name"));
1293                                        dataRecord.put("file_parent_path", dirPath);
1294                                        dataRecord.put("like_file_parent_path", dirPath + "/%");
1295                                        executor.execute(formatSql(SQL_UPDATE_FOR_DELETED_TO_DISK_SUB_FILES), dataRecord);
1296                                }
1297                                return true;
1298                        }
1299                        return false;
1300                } catch (SQLException e) {
1301                        throw new IOException(e.getMessage(), e);
1302                } finally {
1303                        try {
1304                                if (executor != null)
1305                                        executor.close();
1306                        } catch (Exception e) {
1307                                log.error(e.getMessage(), e);
1308                        }
1309                }
1310        }
1311
1312        private boolean complete(int filePartLastIndex, long fileSize) throws IOException {
1313                Clock clock = new Clock();
1314                CacheDriverExecutor executor = null;
1315                Timestamp currentTime = clock.getSqlTimestamp();
1316                Connection conn = null;
1317                try {
1318                        conn = dataSourceWriter.getConnection();
1319                        conn.setAutoCommit(true);
1320                        executor = new CacheDriverExecutor(conn);
1321                        Map dataRecord = new HashMap<String, Object>();
1322                        dataRecord.put("file_id", fileId);
1323                        dataRecord.put("file_part_last_index", filePartLastIndex);
1324                        dataRecord.put("file_size", fileSize);
1325                        dataRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime);
1326                        dataRecord.put("updated_at", currentTime);
1327                        dataRecord.put("updated_user_id", getDefaultModifyUserId());
1328                        int rows = executor.execute(formatSql(SQL_UPDATE_FOR_COMPLETED), dataRecord);
1329                        
1330                        afterWrite();
1331                        
1332                        return rows > 0;
1333                } catch (SQLException e) {
1334                        throw new IOException(e.getMessage(), e);
1335                } finally {
1336                        try {
1337                                if (executor != null)
1338                                        executor.close();
1339                        } catch (Exception e) {
1340                                log.error(e.getMessage(), e);
1341                        }
1342                }
1343        }
1344
1345        @Override
1346        public boolean complete() throws IOException {
1347                Clock clock = new Clock();
1348                Timestamp currentTime = clock.getSqlTimestamp();
1349                CacheDriverExecutor executor = null;
1350                Connection conn = null;
1351                try {
1352                        conn = dataSourceWriter.getConnection();
1353                        conn.setAutoCommit(true);
1354                        executor = new CacheDriverExecutor(conn);
1355                        Map dataRecord = new HashMap<String, Object>();
1356                        dataRecord.put("file_name", origin.getName());
1357                        dataRecord.put("file_parent_path", getParent());
1358                        dataRecord.put("file_id", fileId);
1359
1360                        Map<String, Object> sumRecord = executor.first(formatSql(SQL_SELECT_FOR_SUM_FILE_PART_SIZE_INDEX),
1361                                        dataRecord);
1362
1363                        if (sumRecord == null) {
1364                                log.debug(String.format("Remmote file not found '%s'.", origin.getAbsolutePath()));
1365                        }
1366
1367                        if (sumRecord != null) {
1368                                long fileSize = Long.parseLong(sumRecord.get("file_part_size") + "");
1369                                long filePartLastIndex = Integer.parseInt(sumRecord.get("file_part_index") + "");
1370
1371                                dataRecord.put("file_part_last_index", filePartLastIndex);
1372                                dataRecord.put("file_size", fileSize);
1373                                dataRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime);
1374                                dataRecord.put("updated_at", currentTime);
1375                                dataRecord.put("updated_user_id", getDefaultModifyUserId());
1376                                int rows = executor.execute(formatSql(SQL_UPDATE_FOR_COMPLETED), dataRecord);
1377                                
1378                                afterWrite();
1379                                
1380                                return rows > 0;
1381                        }
1382                        return false;
1383                } catch (SQLException e) {
1384                        throw new IOException(e.getMessage(), e);
1385                } finally {
1386                        try {
1387                                if (executor != null)
1388                                        executor.close();
1389                        } catch (Exception e) {
1390                                log.error(e.getMessage(), e);
1391                        }
1392                }
1393        }
1394
1395        public boolean forceDelete() throws IOException {
1396                return forceDeleteAll(true);
1397        }
1398
1399        public boolean forceDeleteDir(boolean realDeleted) throws IOException {
1400                return forceDelete(realDeleted, "D");
1401        }
1402
1403        public boolean forceDeleteLink(boolean realDeleted) throws IOException {
1404                return forceDelete(realDeleted, "L");
1405        }
1406
1407        public boolean forceDeleteFile(boolean realDeleted) throws IOException {
1408                return forceDelete(realDeleted, "F");
1409        }
1410
1411        public boolean forceDelete(boolean realDeleted, String fileType) throws IOException {
1412                if (fileType.equalsIgnoreCase("D")) {
1413                        return forceDeleteAll(realDeleted);
1414                } else {
1415                        boolean allowed = beforeDelete(realDeleted);
1416                        if (allowed) {
1417                                CacheDriverExecutor executor = null;
1418                                Connection conn = null;
1419                                try {
1420                                        Clock clock = new Clock();
1421                                        Timestamp currentTime = clock.getSqlTimestamp();
1422                                        conn = dataSourceWriter.getConnection();
1423                                        conn.setAutoCommit(true);
1424                                        executor = new CacheDriverExecutor(conn);
1425                                        Map dataRecord = new HashMap<String, Object>();
1426                                        dataRecord.put("file_deleted", realDeleted ? 1 : 0);
1427                                        dataRecord.put("deleted", realDeleted ? 0 : 1);
1428                                        dataRecord.put("file_name", origin.getName());
1429                                        dataRecord.put("file_type", fileType.toUpperCase());
1430                                        dataRecord.put("file_parent_path", getParent());
1431                                        dataRecord.put("updated_user_id", getDefaultModifyUserId());
1432                                        dataRecord.put("deleted_user_id", getDefaultModifyUserId());
1433                                        dataRecord.put("updated_at", currentTime);
1434                                        dataRecord.put("deleted_at", currentTime);
1435                                        dataRecord.put("root_folder", getPath());
1436                                        dataRecord.put("like_subfolder", parsePath(getPath() + "/%"));
1437                                        int resultRows = executor.execute(formatSql(SQL_UPDATE_FOR_DELETE_FILE_LINK), dataRecord);
1438                                        log.debug("ForceDelete({}): {} -> {}", realDeleted, getPath(), resultRows);
1439                    
1440                    afterDelete(realDeleted);
1441                    
1442                                        return resultRows > 0;
1443                                } catch (SQLException e) {
1444                                        throw new IOException(e.getMessage(), e);
1445                                } finally {
1446                                        try {
1447                                                if (executor != null)
1448                                                        executor.close();
1449                                        } catch (Exception e) {
1450                                                log.error(e.getMessage(), e);
1451                                        }
1452                                }
1453                        }
1454                }
1455                return false;
1456        }
1457
1458        public boolean forceDeleteAll(boolean realDeleted) throws IOException {
1459                boolean allowed = beforeDelete(realDeleted);
1460                if (allowed) {
1461                        CacheDriverExecutor executor = null;
1462                        Connection conn = null;
1463                        try {
1464                                Clock clock = new Clock();
1465                                Timestamp currentTime = clock.getSqlTimestamp();
1466                                conn = dataSourceWriter.getConnection();
1467                                conn.setAutoCommit(true);
1468                                executor = new CacheDriverExecutor(conn);
1469                                Map dataRecord = new HashMap<String, Object>();
1470                                dataRecord.put("file_deleted", realDeleted ? 1 : 0);
1471                                dataRecord.put("deleted", realDeleted ? 0 : 1);
1472                                dataRecord.put("file_name", origin.getName());
1473                                dataRecord.put("file_parent_path", getParent());
1474                                dataRecord.put("updated_user_id", getDefaultModifyUserId());
1475                                dataRecord.put("deleted_user_id", getDefaultModifyUserId());
1476                                dataRecord.put("updated_at", currentTime);
1477                                dataRecord.put("deleted_at", currentTime);
1478                                dataRecord.put("root_folder", getPath());
1479                                dataRecord.put("like_subfolder", parsePath(getPath() + "/%"));
1480                                int resultRows = executor.execute(formatSql(SQL_UPDATE_FOR_DELETE_ALL), dataRecord);
1481                                log.debug("ForceDelete({}): {} -> {}", realDeleted, getPath(), resultRows);
1482
1483                afterDelete(realDeleted);
1484                
1485                                return resultRows > 0;
1486                        } catch (SQLException e) {
1487                                throw new IOException(e.getMessage(), e);
1488                        } finally {
1489                                try {
1490                                        if (executor != null)
1491                                                executor.close();
1492                                } catch (Exception e) {
1493                                        log.error(e.getMessage(), e);
1494                                }
1495                        }
1496                }
1497                return false;
1498        }
1499
1500        @Override
1501        public boolean delete() throws IOException {
1502                return forceDeleteAll(true);
1503        }
1504
1505        public boolean delete(boolean realDeleted) throws IOException {
1506                return forceDeleteAll(realDeleted);
1507        }
1508
1509        @Override
1510        public boolean beforeDelete(boolean realDeleted) {
1511                return true;
1512        }
1513        
1514        @Override
1515        public void afterDelete(boolean realDeleted) {
1516            
1517        }       
1518        
1519        @Override
1520        public boolean beforeMkdirs() {
1521            return true;
1522        }
1523        
1524        @Override
1525        public void afterMkdirs() {
1526            
1527        }
1528        
1529        @Override
1530        public boolean beforeCreateLink(String target) {
1531            return true;
1532        }
1533        
1534        @Override
1535        public void afterCreateLink(String target) {
1536            
1537        }
1538        
1539        @Override
1540        public boolean beforeWrite(){
1541            return true;
1542        }
1543        
1544        @Override
1545        public void afterWrite(){
1546            
1547        }       
1548
1549        @Override
1550        public boolean exists() throws IOException {
1551                return getFirstFilePart() != null;
1552        }
1553        
1554        public boolean exists(boolean refresh) throws IOException {
1555            if(refresh){
1556                return getFirstFilePartRefresh() != null;
1557            }else{
1558                    return exists();
1559            }
1560        }       
1561
1562        @Override
1563        public boolean mkdirs() throws IOException {
1564                return mkdirs(false);
1565        }
1566
1567        public boolean mkdirs(boolean checkSourceHostname) throws IOException {
1568            
1569            boolean allowed = beforeMkdirs();
1570            
1571            if(!allowed) return false;
1572            
1573                Clock clock = new Clock();
1574                CacheDriverExecutor executor = null;
1575                Connection conn = null;
1576                try {
1577                        if (getFirstFilePart() != null) {
1578                                return false;
1579                        }
1580                        if (checkSourceHostname) {
1581                                String lastSourceHostname = getLastSourceHostname();
1582                                if (lastSourceHostname != null) {
1583                                        String clientHostname = CommonTools.getHostname();
1584                                        boolean sameHostname = clientHostname.equals(lastSourceHostname);
1585                                        if (!sameHostname) {
1586                                                return false;
1587                                        }
1588                                }
1589                        }
1590                        log.debug("Mkidrs: {}", getPath());
1591                        calculateDataMappingDataSourceWriter(this);
1592                        Timestamp currentTime = clock.getSqlTimestamp();
1593                        conn = dataSourceWriter.getConnection();
1594                        conn.setAutoCommit(true);
1595                        executor = new CacheDriverExecutor(conn);
1596                        Map dataRecord = new HashMap<String, Object>();
1597                        dataRecord.put("id",
1598                                        generateId(identityCardEnable, identityCardName, identityCardPrefix, identityCardSuffix));
1599                        dataRecord.put("file_name", origin.getName());
1600                        dataRecord.put("file_parent_path", getParent());
1601                        dataRecord.put("file_type", "D");
1602                        dataRecord.put("file_part_index", 0);
1603                        dataRecord.put("file_part_size", 0);
1604                        dataRecord.put("file_part_data_ds", "NONE");
1605                        dataRecord.put("file_part_data_table", this.dataMappingTable);
1606                        dataRecord.put("file_part_last_index", 0);
1607                        dataRecord.put("file_size", 0);
1608                        dataRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime);
1609                        dataRecord.put("file_deleted", 0);
1610                        dataRecord.put("created_user_id", getDefaultModifyUserId());
1611                        dataRecord.put("updated_user_id", getDefaultModifyUserId());
1612                        dataRecord.put("created_at", currentTime);
1613                        dataRecord.put("updated_at", currentTime);
1614                        dataRecord.put("file_id", fileId);
1615                        dataRecord.put("source_hostname", CommonTools.getHostname());
1616                        dataRecord.put("synced_on_hostnames", String.format(";%s;", CommonTools.getHostname()));
1617                        dataRecord.put("deleted_on_hostnames", ";");
1618                        int rows = executor.execute(formatSql(SQL_INSERT_INTO_FILE), dataRecord);
1619                        
1620                        afterMkdirs();
1621                        
1622                        return rows > 0;
1623                } catch (Exception e) {
1624                        throw new IOException(e.getMessage(), e);
1625                } finally {
1626                        try {
1627                                if (executor != null)
1628                                        executor.close();
1629                        } catch (Exception e) {
1630                                log.error(e.getMessage(), e);
1631                        }
1632                }
1633        }
1634
1635        @Override
1636        public boolean write(byte[] data, boolean append) throws IOException {
1637                return writeTo(data, append);
1638        }
1639
1640        public synchronized boolean writeBlock(byte[] data, boolean append) throws IOException {
1641                return writeTo(data, append);
1642        }
1643
1644        private boolean writeToBatch(int currentFilePartStartIndex, List<byte[]> partDataList) throws IOException {
1645            
1646            boolean allowed = beforeWrite();
1647            
1648            if(!allowed) return false;
1649            
1650                CacheDriverExecutor executor = null;
1651                CacheDriverExecutor dmExecutor = null;
1652                List<Map<String, Object>> rfList = new ArrayList<Map<String, Object>>();
1653                List<Map<String, Object>> dmList = new ArrayList<Map<String, Object>>();
1654                try {
1655                        Clock clock = new Clock();
1656                        log.debug("Write: {}", getPath());
1657                        int size = partDataList.size();
1658                        Timestamp currentTime = clock.getSqlTimestamp();
1659                        String hostname = CommonTools.getHostname();
1660                        calculateDataMappingDataSourceWriter(this);
1661                        for (int filePartIndex = 0; filePartIndex < size; filePartIndex++) {
1662                                byte[] data = partDataList.get(filePartIndex);
1663                                String remoteFileId = generateId(identityCardEnable, identityCardName, identityCardPrefix,
1664                                                identityCardSuffix);
1665                                Map rfRecord = new HashMap<String, Object>();
1666                                rfRecord.put("id", remoteFileId);
1667                                rfRecord.put("file_name", origin.getName());
1668                                rfRecord.put("file_parent_path", getParent());
1669                                rfRecord.put("file_type", "F");
1670                                rfRecord.put("file_part_index", filePartIndex + currentFilePartStartIndex);
1671                                rfRecord.put("file_part_size", data.length);
1672                                rfRecord.put("file_part_data_ds", this.dataMappingDs);
1673                                rfRecord.put("file_part_data_table", this.dataMappingTable);
1674                                rfRecord.put("file_part_last_index", -1);
1675                                rfRecord.put("file_size", 0);
1676                                rfRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime);
1677                                rfRecord.put("file_deleted", 0);
1678                                rfRecord.put("created_user_id", getDefaultModifyUserId());
1679                                rfRecord.put("updated_user_id", getDefaultModifyUserId());
1680                                rfRecord.put("created_at", currentTime);
1681                                rfRecord.put("updated_at", currentTime);
1682                                rfRecord.put("file_id", fileId);
1683                                rfRecord.put("source_hostname", hostname);
1684                                rfRecord.put("synced_on_hostnames", String.format(";%s;", hostname));
1685                                rfRecord.put("deleted_on_hostnames", ";");
1686                                rfList.add(rfRecord);
1687
1688                                Map<String, Object> dataRecord = new HashMap<String, Object>();
1689                                dataRecord.put("id", generateDataId(identityCardEnable, identityCardName, identityCardSuffix, this));
1690                                dataRecord.put("file_part_data", encrypt(data));
1691                                dataRecord.put("remote_file_id", remoteFileId);
1692                                dataRecord.put("file_id", fileId);
1693                                dataRecord.put("created_at", currentTime);
1694                                dataRecord.put("updated_at", currentTime);
1695                                dmList.add(dataRecord);
1696                        }
1697                        Connection conn = dataSourceWriter.getConnection();
1698                        conn.setAutoCommit(true);
1699                        executor = new CacheDriverExecutor(conn);
1700                        executor.executeBatch(formatSql(SQL_INSERT_INTO_FILE), rfList);
1701
1702                        conn = dataMappingDataSourceWriter.getConnection();
1703                        conn.setAutoCommit(true);
1704                        dmExecutor = new CacheDriverExecutor(conn);
1705                        dmExecutor.executeBatch(formatSqlForFilePartData(SQL_INSERT_INTO_FILE_DATA), dmList);
1706                        
1707                        return true;
1708                } catch (Exception e) {
1709                        throw new IOException(e.getMessage(), e);
1710                } finally {
1711                        try {
1712                                if (executor != null)
1713                                        executor.close();
1714                        } catch (Exception e) {
1715                                log.error(e.getMessage(), e);
1716                        }
1717                        try {
1718                                if (dmExecutor != null)
1719                                        dmExecutor.close();
1720                        } catch (Exception e) {
1721                                log.error(e.getMessage(), e);
1722                        }
1723                }
1724        }
1725
1726        private boolean writeTo(byte[] data, boolean append) throws IOException {
1727            
1728            boolean allowed = beforeWrite();
1729            
1730            if(!allowed) return false;
1731            
1732                CacheDriverExecutor executor = null;
1733                CacheDriverExecutor dmExecutor = null;
1734                try {
1735                        if (!append) {
1736                                if (exists())
1737                                        throw new IOException(
1738                                                        String.format("The remote file '%s' already exists.", origin.getAbsolutePath()));
1739                        }
1740                        Clock clock = new Clock();
1741                        Timestamp currentTime = clock.getSqlTimestamp();
1742                        Map<String, Object> lastRecord = getLastIndexRecord();
1743                        int filePartIndex = 0;
1744                        if (lastRecord != null) {
1745                                if (lastRecord.get("file_type").equals("D")) {
1746                                        throw new IOException(String.format("This is a folder '%s'.", origin.getAbsolutePath()));
1747                                }
1748                                if (lastRecord.get("file_type").equals("L")) {
1749                                        throw new IOException(String.format("This is a link '%s'.", origin.getAbsolutePath()));
1750                                }
1751                                if (append) {
1752                                        filePartIndex = Integer.parseInt(lastRecord.get("file_part_index") + "") + 1;
1753                                }
1754                                boolean isDeleted = false;
1755                                Object deletedValue = lastRecord.get("deleted");
1756                                if (deletedValue instanceof Boolean) {
1757                                        isDeleted = (Boolean) deletedValue;
1758                                } else {
1759                                        isDeleted = Integer.parseInt(deletedValue + "") == 1;
1760                                }
1761                                if (isDeleted) {
1762                                        throw new IOException(String.format("The file '%s' is deleted.", origin.getAbsolutePath()));
1763                                }
1764                        }
1765                        calculateDataMappingDataSourceWriter(this);
1766                        log.debug("Write: {}", getPath());
1767                        String remoteFileId = generateId(identityCardEnable, identityCardName, identityCardPrefix,
1768                                        identityCardSuffix);
1769                        Map rfRecord = new HashMap<String, Object>();
1770                        rfRecord.put("id", remoteFileId);
1771                        rfRecord.put("file_name", origin.getName());
1772                        rfRecord.put("file_parent_path", getParent());
1773                        rfRecord.put("file_type", "F");
1774                        rfRecord.put("file_part_index", filePartIndex);
1775                        rfRecord.put("file_part_size", data.length);
1776                        rfRecord.put("file_part_data_ds", this.dataMappingDs);
1777                        rfRecord.put("file_part_data_table", this.dataMappingTable);
1778                        rfRecord.put("file_part_last_index", -1);
1779                        rfRecord.put("file_size", 0);
1780                        rfRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime);
1781                        rfRecord.put("file_deleted", 0);
1782                        rfRecord.put("created_user_id", getDefaultModifyUserId());
1783                        rfRecord.put("updated_user_id", getDefaultModifyUserId());
1784                        rfRecord.put("created_at", currentTime);
1785                        rfRecord.put("updated_at", currentTime);
1786                        rfRecord.put("file_id", fileId);
1787                        rfRecord.put("source_hostname", CommonTools.getHostname());
1788                        rfRecord.put("synced_on_hostnames", String.format(";%s;", CommonTools.getHostname()));
1789                        rfRecord.put("deleted_on_hostnames", ";");
1790
1791                        Connection conn = dataSourceWriter.getConnection();
1792                        conn.setAutoCommit(true);
1793                        executor = new CacheDriverExecutor(conn);
1794                        executor.execute(formatSql(SQL_INSERT_INTO_FILE), rfRecord);
1795
1796                        Map dataRecord = new HashMap<String, Object>();
1797                        dataRecord.put("id", generateDataId(identityCardEnable, identityCardName, identityCardSuffix, this));
1798                        dataRecord.put("file_part_data", encrypt(data));
1799                        dataRecord.put("remote_file_id", remoteFileId);
1800                        dataRecord.put("file_id", fileId);
1801                        dataRecord.put("created_at", currentTime);
1802                        dataRecord.put("updated_at", currentTime);
1803
1804                        conn = dataMappingDataSourceWriter.getConnection();
1805                        conn.setAutoCommit(true);
1806                        dmExecutor = new CacheDriverExecutor(conn);
1807                        dmExecutor.execute(formatSqlForFilePartData(SQL_INSERT_INTO_FILE_DATA), dataRecord);
1808
1809                        return true;
1810                } catch (Exception e) {
1811                        throw new IOException(e.getMessage(), e);
1812                } finally {
1813                        try {
1814                                if (executor != null)
1815                                        executor.close();
1816                        } catch (Exception e) {
1817                                log.error(e.getMessage(), e);
1818                        }
1819                        try {
1820                                if (dmExecutor != null)
1821                                        dmExecutor.close();
1822                        } catch (Exception e) {
1823                                log.error(e.getMessage(), e);
1824                        }
1825                }
1826        }
1827
1828        @Override
1829        public boolean write(byte[] data) throws IOException {
1830                return write(data, false);
1831        }
1832
1833        @Override
1834        public boolean write(String data, boolean append) throws IOException {
1835                return write(data.getBytes(CHARSET), append);
1836        }
1837
1838        @Override
1839        public boolean write(String data) throws IOException {
1840                return write(data.getBytes(CHARSET));
1841        }
1842
1843        @Override
1844        public boolean createLink(String target) throws IOException {
1845            boolean allowed = beforeCreateLink(target);
1846            
1847            if(!allowed) return false;
1848            
1849                Clock clock = new Clock();
1850                CacheDriverExecutor executor = null;
1851                CacheDriverExecutor dmExecutor = null;
1852                try {
1853                        if (getFirstFilePart() != null) {
1854                                return false;
1855                        }
1856                        log.debug("CreateLink: {}", getPath());
1857                        Timestamp currentTime = clock.getSqlTimestamp();
1858                        calculateDataMappingDataSourceWriter(this);
1859                        String remoteFileId = generateId(identityCardEnable, identityCardName, identityCardPrefix,
1860                                        identityCardSuffix);
1861                        byte[] data = target.getBytes(CHARSET);
1862                        Map rfRecord = new HashMap<String, Object>();
1863                        rfRecord.put("id", remoteFileId);
1864                        rfRecord.put("file_name", origin.getName());
1865                        rfRecord.put("file_parent_path", getParent());
1866                        rfRecord.put("file_type", "L");
1867                        rfRecord.put("file_part_index", 0);
1868                        rfRecord.put("file_part_size", data.length);
1869                        rfRecord.put("file_part_data_ds", this.dataMappingDs);
1870                        rfRecord.put("file_part_data_table", this.dataMappingTable);
1871                        rfRecord.put("file_part_last_index", -1);
1872                        rfRecord.put("file_size", 0);
1873                        rfRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime);
1874                        rfRecord.put("file_deleted", 0);
1875                        rfRecord.put("created_user_id", getDefaultModifyUserId());
1876                        rfRecord.put("updated_user_id", getDefaultModifyUserId());
1877                        rfRecord.put("created_at", currentTime);
1878                        rfRecord.put("updated_at", currentTime);
1879                        rfRecord.put("file_id", fileId);
1880                        rfRecord.put("source_hostname", CommonTools.getHostname());
1881                        rfRecord.put("synced_on_hostnames", String.format(";%s;", CommonTools.getHostname()));
1882                        rfRecord.put("deleted_on_hostnames", ";");
1883
1884                        Connection conn = dataSourceWriter.getConnection();
1885                        conn.setAutoCommit(true);
1886                        executor = new CacheDriverExecutor(conn);
1887                        int rows = executor.execute(formatSql(SQL_INSERT_INTO_FILE), rfRecord);
1888
1889                        Map dataRecord = new HashMap<String, Object>();
1890                        dataRecord.put("id", generateDataId(identityCardEnable, identityCardName, identityCardSuffix, this));
1891                        dataRecord.put("file_part_data", encrypt(data));
1892                        dataRecord.put("remote_file_id", remoteFileId);
1893                        dataRecord.put("file_id", fileId);
1894                        dataRecord.put("created_at", currentTime);
1895                        dataRecord.put("updated_at", currentTime);
1896
1897                        conn = dataMappingDataSourceWriter.getConnection();
1898                        conn.setAutoCommit(true);
1899                        dmExecutor = new CacheDriverExecutor(conn);
1900                        dmExecutor.execute(formatSqlForFilePartData(SQL_INSERT_INTO_FILE_DATA), dataRecord);
1901
1902                        complete();
1903            
1904            afterCreateLink(target);
1905            
1906                        return rows > 0;
1907                } catch (Exception e) {
1908                        throw new IOException(e.getMessage(), e);
1909                } finally {
1910                        try {
1911                                if (executor != null)
1912                                        executor.close();
1913                        } catch (Exception e) {
1914                                log.error(e.getMessage(), e);
1915                        }
1916                        try {
1917                                if (dmExecutor != null)
1918                                        dmExecutor.close();
1919                        } catch (Exception e) {
1920                                log.error(e.getMessage(), e);
1921                        }
1922                }
1923        }
1924
1925        @Override
1926        public String readLink() throws IOException {
1927                return new String(readAllBytes(),BaseFile.CHARSET);
1928        }
1929
1930        @Override
1931        public byte[] readAllBytes() throws IOException {
1932                return readAllBytesFrom();
1933        }
1934
1935        private byte[] readAllBytesFrom() throws IOException {
1936                if (isDir()) {
1937                        throw new IOException(String.format("The file '%s' is a folder.", origin.getAbsolutePath()));
1938                }
1939                if (!exists()) {
1940                        throw new IOException(String.format("The file '%s' does not exist.", origin.getAbsolutePath()));
1941                }
1942                CacheDriverExecutor executor = null;
1943                Connection conn = null;
1944                ByteArrayOutputStream baos = null;
1945                try {
1946                        conn = dataSourceReader.getConnection();
1947                        conn.setAutoCommit(true);
1948                        executor = new CacheDriverExecutor(conn);
1949                        Map dataRecord = getCompletedRecord();
1950                        List<Map<String, Object>> filePartList = executor.find(formatSql(SQL_SELECT_FILE), dataRecord);
1951                        baos = new ByteArrayOutputStream();
1952                        for (Map<String, Object> fpItem : filePartList) {
1953                                Map<String, Object> fp = readPart(fpItem);
1954                                Object fpData = null;
1955                                
1956                                if(fp == null){
1957                                    fpData = new byte[]{};
1958                                }else{
1959                                    fpData = fp.get("file_part_data");  
1960                                }
1961
1962                                if (fpData instanceof byte[]) {
1963                                        byte[] partData = (byte[]) fpData;
1964                                        baos.write(decrypt(partData));
1965                                        baos.flush();
1966                                }
1967                        }
1968                        return baos == null ? null : baos.toByteArray();
1969                } catch (Exception e) {
1970                        throw new IOException(e.getMessage(), e);
1971                } finally {
1972                        try {
1973                                if (baos != null)
1974                                        baos.close();
1975                        } catch (Exception e) {
1976                                log.error(e.getMessage(), e);
1977                        }
1978                        try {
1979                                if (executor != null)
1980                                        executor.close();
1981                        } catch (Exception e) {
1982                                log.error(e.getMessage(), e);
1983                        }
1984                }
1985        }
1986
1987        @Override
1988        public boolean isFile() throws IOException {
1989                Map<String, Object> lastRecord = getFirstFilePart();
1990
1991                if (lastRecord == null) {
1992                        return false;
1993                }
1994                return lastRecord.get("file_type").equals("F");
1995        }
1996
1997        @Override
1998        public boolean isDirectory() throws IOException {
1999                Map<String, Object> lastRecord = getFirstFilePart();
2000
2001                if (lastRecord == null) {
2002                        return false;
2003                }
2004                return lastRecord.get("file_type").equals("D");
2005        }
2006
2007        @Override
2008        public boolean isLink() throws IOException {
2009                Map<String, Object> lastRecord = getFirstFilePart();
2010
2011                if (lastRecord == null) {
2012                        return false;
2013                }
2014                return lastRecord.get("file_type").equals("L");
2015        }
2016
2017        @Override
2018        public String readAllString() throws IOException {
2019                ByteArrayOutputStream baos = null;
2020                try {
2021                        byte[] bytes = readAllBytes();
2022                        
2023                        if(bytes == null) return null;
2024                        
2025                        baos = new ByteArrayOutputStream();
2026                        baos.write(bytes);
2027                        baos.flush();
2028                        return baos.toString();
2029                } finally {
2030                        if (baos != null) {
2031                                try {
2032                                        baos.close();
2033                                } catch (IOException e) {
2034                                        log.error(e.getMessage(), e);
2035                                }
2036                        }
2037                }
2038        }
2039
2040        @Override
2041        public long size() throws IOException {
2042                Map<String, Object> record = getCompletedRecord();
2043                if (record != null) {
2044                        return Long.parseLong(record.get("file_size") + "");
2045                }
2046                return 0L;
2047        }
2048
2049        public List<Map<String, Object>> list() throws IOException {
2050                return list(getPath(), "%");
2051        }
2052
2053        public List<Map<String, Object>> list(String likeFileName) throws IOException {
2054                return list(getPath(), likeFileName);
2055        }
2056
2057        public List<Map<String, Object>> list(String likeFolderName, String likeFileName) throws IOException {
2058                CacheDriverExecutor executor = null;
2059                Connection conn = null;
2060                try {
2061                        conn = dataSourceReader.getConnection();
2062                        conn.setAutoCommit(true);
2063                        executor = new CacheDriverExecutor(conn);
2064                        Map<String, Object> dataRecord = new HashMap<String, Object>();
2065                        if (CommonTools.isBlank(likeFileName)) {
2066                                dataRecord.put("file_name", "%");
2067                        } else {
2068                                dataRecord.put("file_name", likeFileName);
2069                        }
2070                        if (CommonTools.isBlank(likeFolderName)) {
2071                                dataRecord.put("file_parent_path", "%");
2072                        } else {
2073                                dataRecord.put("file_parent_path", likeFolderName);
2074                        }
2075                        return executor.find(formatSql(SQL_SELECT_FOR_LIST_ALL), dataRecord);
2076                } catch (SQLException e) {
2077                        throw new IOException(e.getMessage(), e);
2078                } finally {
2079                        if (executor != null) {
2080                                try {
2081                                        executor.close();
2082                                } catch (Exception e) {
2083                                        throw new IOException(e.getMessage(), e);
2084                                }
2085                        }
2086                }
2087        }
2088
2089        public void listAll(String likeFileName, CacheArray rows) throws IOException {
2090                listAll(getPath(), likeFileName, rows);
2091        }
2092
2093        public void listAll(CacheArray rows) throws IOException {
2094                listAll(getPath(), "%", rows);
2095        }
2096
2097        public void listAll(String likeFolderName, String likeFileName, CacheArray rows) throws IOException {
2098                CacheDriverExecutor executor = null;
2099                Connection conn = null;
2100                try {
2101                        conn = dataSourceReader.getConnection();
2102                        conn.setAutoCommit(true);
2103                        executor = new CacheDriverExecutor(conn);
2104                        Map<String, Object> dataRecord = new HashMap<String, Object>();
2105                        if (CommonTools.isBlank(likeFileName)) {
2106                                dataRecord.put("file_name", "%");
2107                        } else {
2108                                dataRecord.put("file_name", likeFileName);
2109                        }
2110                        if (CommonTools.isBlank(likeFolderName)) {
2111                                dataRecord.put("file_parent_path", "%");
2112                        } else {
2113                                dataRecord.put("file_parent_path", likeFolderName);
2114                        }
2115                        executor.find(formatSql(SQL_SELECT_FOR_LIST_ALL), dataRecord, rows);
2116                } catch (SQLException e) {
2117                        throw new IOException(e.getMessage(), e);
2118                } finally {
2119                        if (executor != null) {
2120                                try {
2121                                        executor.close();
2122                                } catch (Exception e) {
2123                                        throw new IOException(e.getMessage(), e);
2124                                }
2125                        }
2126                }
2127        }
2128
2129        protected void listAllForDelete(String likeFolderName, String likeFileName, CacheArray rows) throws IOException {
2130                CacheDriverExecutor executor = null;
2131                Connection conn = null;
2132                try {
2133                    long deleteCutoffTimeMs = 0L;
2134                    
2135                    if(DELETE_CUTOFF_TIME != null) deleteCutoffTimeMs = DELETE_CUTOFF_TIME.getTime();
2136                    
2137                        Timestamp currentTime = new Clock().getSqlTimestamp();
2138                        
2139                        conn = dataSourceReader.getConnection();
2140                        conn.setAutoCommit(true);
2141                        executor = new CacheDriverExecutor(conn);
2142                        Map<String, Object> dataRecord = new HashMap<String, Object>();
2143                        if (CommonTools.isBlank(likeFileName)) {
2144                                dataRecord.put("file_name", "%");
2145                        } else {
2146                                dataRecord.put("file_name", likeFileName);
2147                        }
2148                        if (CommonTools.isBlank(likeFolderName)) {
2149                                dataRecord.put("file_parent_path", "%");
2150                        } else {
2151                                dataRecord.put("file_parent_path", likeFolderName);
2152                        }
2153                        dataRecord.put("deleted_on_hostnames", "%;" + CommonTools.getHostname() + ";%");
2154                        dataRecord.put("begin_time", new Timestamp(deleteCutoffTimeMs));
2155                        dataRecord.put("end_time", currentTime);
2156                        dataRecord.put("created_at", new Timestamp(currentTime.getTime() - BEFORE_THE_QUEUE_TIME));
2157                        executor.find(0, MAX_QUEUE_SIZE,formatSql(SQL_SELECT_FOR_LIST_ALL_DELETE), dataRecord, rows);
2158                } catch (SQLException e) {
2159                        throw new IOException(e.getMessage(), e);
2160                } finally {
2161                        if (executor != null) {
2162                                try {
2163                                        executor.close();
2164                                } catch (Exception e) {
2165                                        throw new IOException(e.getMessage(), e);
2166                                }
2167                        }
2168                }
2169        }
2170
2171        protected void listAllForSync(String likeFolderName, String likeFileName, CacheArray rows) throws IOException {
2172                CacheDriverExecutor executor = null;
2173                Connection conn = null;
2174                try {
2175                    long deleteCutoffTimeMs = 0L;
2176                    
2177                    if(DELETE_CUTOFF_TIME != null) deleteCutoffTimeMs = SYNC_CUTOFF_TIME.getTime();
2178                    
2179                        Timestamp currentTime = new Clock().getSqlTimestamp();
2180                        
2181                        conn = dataSourceReader.getConnection();
2182                        conn.setAutoCommit(true);
2183                        executor = new CacheDriverExecutor(conn);
2184                        Map<String, Object> dataRecord = new HashMap<String, Object>();
2185                        if (CommonTools.isBlank(likeFileName)) {
2186                                dataRecord.put("file_name", "%");
2187                        } else {
2188                                dataRecord.put("file_name", likeFileName);
2189                        }
2190                        if (CommonTools.isBlank(likeFolderName)) {
2191                                dataRecord.put("file_parent_path", "%");
2192                        } else {
2193                                dataRecord.put("file_parent_path", likeFolderName);
2194                        }
2195                        dataRecord.put("synced_on_hostnames", "%;" + CommonTools.getHostname() + ";%");
2196                        dataRecord.put("begin_time", new Timestamp(deleteCutoffTimeMs));
2197                        dataRecord.put("end_time", currentTime);
2198                        dataRecord.put("created_at", new Timestamp(currentTime.getTime() - BEFORE_THE_QUEUE_TIME));
2199                        executor.find(0, MAX_QUEUE_SIZE, formatSql(SQL_SELECT_FOR_LIST_ALL_SYNC), dataRecord, rows);
2200                } catch (SQLException e) {
2201                        throw new IOException(e.getMessage(), e);
2202                } finally {
2203                        if (executor != null) {
2204                                try {
2205                                        executor.close();
2206                                } catch (Exception e) {
2207                                        throw new IOException(e.getMessage(), e);
2208                                }
2209                        }
2210                }
2211        }
2212
2213        public void setModifyUserId(String modifyUserId) {
2214                this.modifyUserId = modifyUserId;
2215        }
2216
2217        private String getDefaultModifyUserId() {
2218                if (CommonTools.isBlank(modifyUserId)) {
2219                        return DEFAULE_MODIFY_USER_ID;
2220                } else {
2221                        return modifyUserId;
2222                }
2223        }
2224
2225        @Override
2226        public void setModifyTime(Timestamp modifyTime) throws IOException {
2227                this.modifyTime = modifyTime;
2228        }
2229
2230        @Override
2231        public Timestamp getModifyTime() throws IOException {
2232                Map<String, Object> record = getFirstFilePart();
2233                if (record != null) {
2234                        return (Timestamp) record.get("file_modify_time");
2235                }
2236                return null;
2237        }
2238
2239        public boolean isTimeout() throws IOException {
2240                return isTimeout(LOGIC_TIMEOUT_MS);
2241        }
2242
2243        @Override
2244        public void copyTo(String toPath) throws IOException {
2245                log.debug("CopyTo: {} -> {}", getPath(), toPath);
2246                if (!exists()) {
2247                        log.mark(String.format("The remote file '%s' does not exist.", this.getPath()));
2248                }
2249                RemoteFile destRf = new RemoteFile(toPath);
2250                final Timestamp originModifyTime = this.getModifyTime();
2251                destRf.setModifyUserId(modifyUserId);
2252                destRf.setModifyTime(originModifyTime);
2253                if (isLink()) {
2254                        if (destRf.exists(true)) {
2255                                destRf.forceDeleteLink(false);
2256                        }
2257                        destRf.createLink(readLink());
2258                }
2259                if (isFile()) {
2260                        final RemoteFile _destRf = destRf;
2261                        if (_destRf.exists(true)) {
2262                                _destRf.forceDeleteFile(false);
2263                        }
2264                        merge(new FilePart() {
2265                                @Override
2266                                protected void process(int partIndex, byte[] data) {
2267                                        try {
2268                                                _destRf.write(decrypt(data), partIndex > 0);
2269                                        } catch (Exception e) {
2270                                                log.error(e.getMessage(), e);
2271                                        }
2272                                }
2273
2274                                @Override
2275                                protected void completed(int lastPartIndex, Timestamp modifyTime, long fileSize) {
2276                                        try {
2277                                                /**This is 0 bytes file**/
2278                                                if (lastPartIndex == -1) {
2279
2280                                                        if (_destRf.exists(true))
2281                                                                _destRf.forceDeleteFile(false);
2282
2283                                                        _destRf.setModifyTime(originModifyTime);
2284                                                        _destRf.createEmptyFile();
2285                                                } else {
2286                                                        _destRf.setModifyTime(originModifyTime);
2287                                                        _destRf.complete();
2288                                                }
2289                                        } catch (IOException e) {
2290                                                log.error(e.getMessage(), e);
2291                                        }
2292                                }
2293
2294                                @Override
2295                                protected void ended(int lastPartIndex, long fileSize) {
2296
2297                                }
2298
2299                        });
2300                }
2301                if (isDir()) {
2302                        List<Map<String, Object>> copyList = list(getPath(), "%");
2303                        int size = copyList.size();
2304                        if (size > 0) {
2305                                if (!destRf.exists(true)) {
2306                                        destRf.mkdirs();
2307                                }
2308                        }
2309                        String copyRootPath = getPath();
2310                        copyList = list(parsePath(getPath() + "/%"), "%");
2311                        if (copyList.size() > 0) {
2312                                if (!destRf.exists(true)) {
2313                                        destRf.mkdirs();
2314                                }
2315                        }
2316                        for (Map<String, Object> item : copyList) {
2317                                String itemParentPath = (String) item.get("file_parent_path");
2318                                String destParentPath = itemParentPath.replaceFirst(copyRootPath + "/", toPath);
2319                                RemoteFile sourceRf = new RemoteFile(
2320                                                String.format("%s/%s", item.get("file_parent_path"), item.get("file_name")));
2321                                String destRfPath = new File(String.format("%s/%s", destParentPath, item.get("file_name")))
2322                                                .getAbsolutePath();
2323                                if (sourceRf.isDir()) {
2324                                        RemoteFile tmpRf = new RemoteFile(destRfPath + "/");
2325                                        tmpRf.setModifyTime(sourceRf.getModifyTime());
2326                                        if (!tmpRf.exists(true)) {
2327                                                tmpRf.mkdirs();
2328                                        }
2329                                } else {
2330                                        sourceRf.copyTo(destRfPath);
2331                                }
2332                        }
2333                }
2334        }
2335
2336        @Override
2337        public void moveTo(String toPath) throws IOException {
2338                log.debug("MoveTo: {} -> {}", getPath(), toPath);
2339                RemoteFile destRf = new RemoteFile(toPath);
2340                copyTo(toPath);
2341                boolean exists = this.exists(true);
2342                if (exists && destRf.exists(true) && destRf.isCompleted()) {
2343                        forceDeleteAll(true);
2344                }
2345        }
2346
2347        private void startFullAsyncThreadForDelete(boolean subFolders, long timerMs, String replaceTo,
2348                        Runnable completedCallback) {
2349                RemoteFile the = this;
2350                try {
2351                        if (syncRoot == null) {
2352                                File replaceToFile = new File(replaceTo);
2353                                if (replaceToFile.getParent() == null) {
2354                                        throw new IOException("Cannot execute delete task.");
2355                                }
2356                                String replaceToParent = replacePath(replaceToFile.getParent());
2357                                syncRoot = new File(String.format("%s/%s", replaceToParent, replaceToFile.getName()));
2358                        }
2359
2360                        CacheArray rows = new CacheArray();
2361                        CacheArrayFilter filter = new CacheArrayFilter(CACHE_ARRAY_FILTER_TIMER) {
2362                                @Override
2363                                public void execute(Integer index, Object o) {
2364                                        Map<String, Object> item = (Map<String, Object>) o;
2365                                        String fileName = (String) item.get("file_name");
2366                                        String fullPath = String.format("%s/%s", item.get("file_parent_path"), fileName);
2367                                        Thread.currentThread()
2368                                                        .setName(String.format("RemoteFile-startFullAsyncThreadForDelete-%s", fileName));
2369                                        RemoteFile rf = new RemoteFile(fullPath);
2370                                        if(isDebug(rf)){
2371                                            debugLog("CheckUsage",rf,this);
2372                                        }                                       
2373                                        if (memoryUsage() < RemoteFile.MAX_MEMORY_USAGE) {
2374                                                rf.fileId = (String) item.get("file_id");
2375                                        String toDf = rf.getPath();
2376                                        if (!CommonTools.isBlank(replaceTo)) {
2377                                                if (subFolders) {
2378                                                        toDf = fullPath.replaceFirst(getPath() + "/", replaceTo + "/");
2379                                                } else {
2380                                                        toDf = fullPath.replaceFirst(getPath(), replaceTo + "/");
2381                                                }
2382                                        }
2383                                        DiskFile df = new DiskFile(toDf);
2384                                        df.copyAttrs(df, false, the.syncRoot);
2385                                        executeSyncRunnable(this, item, df, rf);            
2386                                        } else{
2387                                            log.warn("Over max memory usage 'MAX_MEMORY_USAGE={}'.",MAX_MEMORY_USAGE);
2388                                        }
2389                                }
2390
2391                                @Override
2392                                public void terminated() {
2393                                        log.warn("Terminated: {}-{}", Thread.currentThread().getName(), Thread.currentThread().getId());
2394                                }
2395
2396                                @Override
2397                                public void completed(Integer size) {
2398                                        Thread.currentThread().setName(String
2399                                                        .format("RemoteFile-startFullAsyncThreadForDelete-completed-%s", the.origin.getName()));
2400                                        if (isStopSync()) {
2401                                                LoggerFactory.getLogger(RemoteFile.class).mark("Stoped start delete task thread.");
2402                                        }
2403                                        if (timerMs <= 0) {
2404                                                LoggerFactory.getLogger(RemoteFile.class).mark("Completed start delete task thread.");
2405                                        }
2406                                        if (!isStopSync() && timerMs > 0) {
2407                                                try {
2408                                                        Thread.sleep(timerMs);
2409                                                        if (!isStopSync()) {
2410                                                                startFullAsyncThreadForDelete(!subFolders, timerMs, replaceTo, completedCallback);
2411                                                        }
2412                                                } catch (Exception e) {
2413                                                        log.error(e.getMessage(), e);
2414                                                }
2415                                        }
2416                                        if (completedCallback != null) {
2417                                                completedCallback.run();
2418                                        }
2419                                }
2420
2421                        };
2422                        rows.filter(filter);
2423                        listAllForDelete(getPath() + (subFolders ? "/%" : ""), "%", rows);
2424                } catch (Exception e) {
2425                        log.error(e.getMessage(), e);
2426                        if (timerMs > 0) {
2427                                try {
2428                                        Thread.sleep(timerMs);
2429                                        startFullAsyncThreadForDelete(subFolders, timerMs, replaceTo, completedCallback);
2430                                } catch (InterruptedException ee) {
2431                                        log.error(ee.getMessage(), ee);
2432                                }
2433                        }
2434                }
2435        }
2436
2437        private void startFullAsyncThread(boolean subFolders, long timerMs, String replaceTo, Runnable completedCallback) {
2438                RemoteFile the = this;
2439                try {
2440                        if (syncRoot == null) {
2441                                File replaceToFile = new File(replaceTo);
2442                                if (replaceToFile.getParent() == null) {
2443                                        throw new IOException("Cannot sync the root path.");
2444                                }
2445                                String replaceToParent = replacePath(replaceToFile.getParent());
2446                                syncRoot = new File(String.format("%s/%s", replaceToParent, replaceToFile.getName()));
2447                        }
2448
2449                        CacheArray rows = new CacheArray();
2450                        CacheArrayFilter filter = new CacheArrayFilter(CACHE_ARRAY_FILTER_TIMER) {
2451                                @Override
2452                                public void execute(Integer index, Object o) {
2453                                        Map<String, Object> item = (Map<String, Object>) o;
2454                                        String fileName = (String) item.get("file_name");
2455                                        Thread.currentThread().setName(String.format("RemoteFile-startFullAsyncThread-%s", fileName));
2456                                        String fullPath = String.format("%s/%s", item.get("file_parent_path"), fileName);
2457                                        RemoteFile rf = new RemoteFile(fullPath);
2458                                        if(isDebug(the)){
2459                                            debugLog("CheckUsage",the,this);
2460                            }
2461                                        if (memoryUsage() < RemoteFile.MAX_MEMORY_USAGE) {
2462                                                String toDf = rf.getPath();
2463                                                if (!CommonTools.isBlank(replaceTo)) {
2464                                                        if (subFolders) {
2465                                                                toDf = fullPath.replaceFirst(getPath() + "/", replaceTo + "/");
2466                                                        } else {
2467                                                                toDf = fullPath.replaceFirst(getPath(), replaceTo + "/");
2468                                                        }
2469                                                }
2470                                                DiskFile df = new DiskFile(toDf);
2471                                                df.copyAttrs(df, false, the.syncRoot);
2472                                                executeSyncRunnable(this, item, df, rf);
2473                                        } else{
2474                                             log.warn("Over max memory usage 'MAX_MEMORY_USAGE={}'.",MAX_MEMORY_USAGE);
2475                                        }
2476                                }
2477
2478                                @Override
2479                                public void terminated() {
2480                                        log.warn("Terminated: {}-{}", Thread.currentThread().getName(), Thread.currentThread().getId());
2481                                }
2482
2483                                @Override
2484                                public void completed(Integer size) {
2485                                        Thread.currentThread().setName(
2486                                                        String.format("RemoteFile-startFullAsyncThread-completed-%s", the.origin.getName()));
2487                                        if (isStopSync()) {
2488                                                LoggerFactory.getLogger(RemoteFile.class).mark("Stoped start full async thread.");
2489                                        }
2490                                        if (timerMs <= 0) {
2491                                                LoggerFactory.getLogger(RemoteFile.class).mark("Completed start full async thread.");
2492                                        }
2493                                        if (!isStopSync() && timerMs > 0) {
2494                                                try {
2495                                                        Thread.sleep(timerMs);
2496                                                        if (!isStopSync()) {
2497                                                                startFullAsyncThread(!subFolders, timerMs, replaceTo, completedCallback);
2498                                                        }
2499                                                } catch (Exception e) {
2500                                                        log.error(e.getMessage(), e);
2501                                                }
2502                                        }
2503                                        if (completedCallback != null) {
2504                                                completedCallback.run();
2505                                        }
2506                                }
2507
2508                        };
2509                        rows.filter(filter);
2510                        listAllForSync(getPath() + (subFolders ? "/%" : ""), "%", rows);
2511                } catch (Exception e) {
2512                        log.error(e.getMessage(), e);
2513                        if (timerMs > 0) {
2514                                try {
2515                                        Thread.sleep(timerMs);
2516                                        startFullAsyncThread(subFolders, timerMs, replaceTo, completedCallback);
2517                                } catch (InterruptedException ee) {
2518                                        log.error(ee.getMessage(), ee);
2519                                }
2520                        }
2521                }
2522        }
2523
2524        public void startFullAsync(long timerMs) throws IOException {
2525                startFullAsync(timerMs, getPath() + "/", null);
2526        }
2527
2528        public void startFullAsync(long timerMs, Runnable completedCallback) throws IOException {
2529                startFullAsync(timerMs, getPath() + "/", completedCallback);
2530        }
2531
2532        public void startFullAsync(long timerMs, String replaceTo) throws IOException {
2533                startFullAsync(timerMs, replaceTo, null);
2534        }
2535
2536        public void startFullAsync(long timerMs, String replaceTo, Runnable completedCallback) throws IOException {
2537
2538                stopSync = false;
2539
2540                RemoteFile the = this;
2541                Clock clock = new Clock();
2542                Timestamp toTime = new Timestamp(clock.getTime() - timerMs);
2543                Executors.newFixedThreadPool(1).execute(new Runnable() {
2544
2545                        @Override
2546                        public void run() {
2547                                startFullAsyncThread(false, timerMs, replaceTo, completedCallback);
2548                        }
2549
2550                });
2551
2552                Executors.newFixedThreadPool(1).execute(new Runnable() {
2553
2554                        @Override
2555                        public void run() {
2556                                startFullAsyncThreadForDelete(false, timerMs, replaceTo, completedCallback);
2557                        }
2558
2559                });
2560        }
2561
2562        public DriverDataSource getDataSourceReader() {
2563                return dataSourceReader;
2564        }
2565
2566        public DriverDataSource getDataSourceWriter() {
2567                return dataSourceWriter;
2568        }
2569
2570        public ConfigProperties getConfigProperties() {
2571                return configProperties;
2572        }
2573
2574        private byte[] encrypt(byte[] data) throws Exception {
2575                boolean useEncrypt = configProperties.getBoolean("EncryptEnable", false);
2576                if (useEncrypt) {
2577            String aesKey = configProperties.getString("EncryptAesKey");
2578            return CipherTools.AESEncrypt(aesKey,data);
2579                } else {
2580                        return data;
2581                }
2582        }
2583
2584        private byte[] decrypt(byte[] encData) throws Exception {
2585                boolean useEncrypt = configProperties.getBoolean("EncryptEnable", false);
2586                if (useEncrypt) {
2587            String aesKey = configProperties.getString("EncryptAesKey");
2588            return CipherTools.AESDecrypt(aesKey,encData);
2589                } else {
2590                        return encData;
2591                }
2592        }
2593
2594        public String getSyncedOnHostnames() throws IOException {
2595                Map<String, Object> record = getCompletedRecord();
2596                if (record != null) {
2597                        return (String) record.get("synced_on_hostnames");
2598                }
2599                return null;
2600        }
2601
2602        public String getDeletedOnHostnames() throws IOException {
2603                Map<String, Object> record = getCompletedRecord();
2604                if (record != null) {
2605                        return (String) record.get("deleted_on_hostnames");
2606                }
2607                return null;
2608        }
2609
2610        public String getSourceHostname() throws IOException {
2611                Map<String, Object> record = getCompletedRecord();
2612                if (record != null) {
2613                        return (String) record.get("source_hostname");
2614                }
2615                return null;
2616        }
2617
2618        protected String getLastSourceHostname() throws IOException {
2619                Map<String, Object> record = getLastUpdateRecord();
2620                if (record != null) {
2621                        return (String) record.get("source_hostname");
2622                }
2623                return null;
2624        }
2625
2626        protected Timestamp getLastOperateTime() throws IOException {
2627                Map<String, Object> record = getLastUpdateRecord();
2628                if (record != null) {
2629                        Timestamp updatedAt = (Timestamp) record.get("updated_at");
2630                        Timestamp deletedAt = (Timestamp) record.get("deleted_at");
2631
2632                        return deletedAt == null ? updatedAt : deletedAt;
2633                }
2634                return null;
2635        }
2636
2637        protected Timestamp getLastModifyTime() throws IOException {
2638                Map<String, Object> record = getLastUpdateRecord();
2639                if (record != null) {
2640                        return (Timestamp) record.get("file_modify_time");
2641                }
2642                return null;
2643        }
2644
2645        protected boolean isLastOperateDelete() throws IOException {
2646                Map<String, Object> record = getLastUpdateRecord();
2647                if (record != null) {
2648                        Timestamp deletedAt = (Timestamp) record.get("deleted_at");
2649
2650                        return deletedAt == null ? false : true;
2651                }
2652                return false;
2653        }
2654
2655        protected boolean isLastOperateRealDelete() throws IOException {
2656                Map<String, Object> record = getLastUpdateRecord();
2657                if (record != null) {
2658                        Object deletedValue = record.get("file_deleted");
2659                        if (deletedValue != null) {
2660                                if (deletedValue instanceof Boolean) {
2661                                        return (Boolean) deletedValue;
2662                                } else {
2663                                        return Integer.parseInt(deletedValue + "") == 1;
2664                                }
2665                        }
2666                }
2667                return false;
2668        }
2669        
2670        protected void listAllForScanDelete(Integer filePartDataTable, CacheArray rows) throws IOException {
2671                CacheDriverExecutor executor = null;
2672                Connection conn = null;
2673                try {
2674                    long deleteCutoffTimeMs = 0L;
2675                    
2676                    if(DELETE_CUTOFF_TIME != null) deleteCutoffTimeMs = DELETE_CUTOFF_TIME.getTime();
2677                    
2678                        Timestamp currentTime = new Clock().getSqlTimestamp();
2679                        conn = dataSourceReader.getConnection();
2680                        conn.setAutoCommit(true);
2681                        executor = new CacheDriverExecutor(conn);
2682                        Map<String, Object> dataRecord = new HashMap<String, Object>();
2683                        dataRecord.put("file_parent_path", getPath());
2684                        dataRecord.put("like_file_parent_path", parsePath(getPath() + "/%"));
2685                        dataRecord.put("deleted_on_hostnames", "%;" + CommonTools.getHostname() + ";%");
2686                        dataRecord.put("begin_time", new Timestamp(deleteCutoffTimeMs));
2687                        dataRecord.put("end_time", currentTime);
2688                        dataRecord.put("created_at", new Timestamp(currentTime.getTime() - BEFORE_THE_QUEUE_TIME));
2689                        dataRecord.put("file_part_data_table", filePartDataTable);
2690                        executor.find(formatSql(SQL_SELECT_FOR_LIST_ALL_SCAN_DELETE), dataRecord, rows);
2691                } catch (SQLException e) {
2692                        throw new IOException(e.getMessage(), e);
2693                } finally {
2694                        if (executor != null) {
2695                                try {
2696                                        executor.close();
2697                                } catch (Exception e) {
2698                                        throw new IOException(e.getMessage(), e);
2699                                }
2700                        }
2701                }
2702        }
2703
2704        protected boolean isSyncedOnHostname() throws IOException {
2705                Map<String, Object> record = getCompletedRecord();
2706                if (record != null) {
2707                        String syncedOnHostnames = (String) record.get("synced_on_hostnames");
2708                        return syncedOnHostnames.indexOf(";" + CommonTools.getHostname() + ";") > -1;
2709                }
2710                return false;
2711        }
2712
2713        protected boolean isSyncedOnHostname(Object id) throws IOException {
2714                Map<String, Object> record = getRecordById(id);
2715                if (record != null) {
2716                        String syncedOnHostnames = (String) record.get("synced_on_hostnames");
2717                        return syncedOnHostnames.indexOf(";" + CommonTools.getHostname() + ";") > -1;
2718                }
2719                return false;
2720        }
2721
2722        protected boolean isDeletedOnHostname(Object id) throws IOException {
2723                Map<String, Object> record = getRecordById(id);
2724                if (record != null) {
2725                        String deletedOnHostnames = (String) record.get("deleted_on_hostnames");
2726                        return deletedOnHostnames.indexOf(";" + CommonTools.getHostname() + ";") > -1;
2727                }
2728                return false;
2729        }
2730
2731        public RemoteFile getParentFile() {
2732                return new RemoteFile(getParent());
2733        }
2734
2735        private void executeSyncRunnable(final CacheArrayFilter filter, final Map<String, Object> item, final DiskFile df,
2736                        final RemoteFile rf) {
2737                if (!df.isLogicModify()) {
2738                        df.logicModify();
2739                        if (mergePool == null) {
2740                                LoggerFactory.getLogger(RemoteFile.class).mark("MAX_POOL_SIZE={}", MAX_POOL_SIZE);
2741                                mergePool = Executors.newFixedThreadPool(MAX_POOL_SIZE);
2742                        }
2743                        mergePool.execute(getSyncRunnable(filter, item, rf, df));
2744                }
2745        }
2746
2747        private Runnable getSyncRunnable(final CacheArrayFilter filter, final Map<String, Object> item, final RemoteFile rf,
2748                        final DiskFile df) {
2749                Runnable runnable = new Runnable() {
2750                        @Override
2751                        public void run() {
2752                                try {
2753                                        String fileName = (String) item.get("file_name");
2754                                        Thread.currentThread()
2755                                                        .setName(String.format("RemoteFile-getSyncRunnable-%s-%s", fileName, item.get("id")));
2756                                        boolean isDebug = isDebug(rf);
2757                                        boolean isDeleted = false;
2758                                        Object deletedValue = item.get("file_deleted");
2759
2760                                        if (deletedValue instanceof Boolean) {
2761                                                isDeleted = (Boolean) deletedValue;
2762                                        } else {
2763                                                isDeleted = Integer.parseInt(deletedValue + "") == 1;
2764                                        }
2765
2766                                        String clientHostname = CommonTools.getHostname();
2767                                        String lastSourceHostname = rf.getLastSourceHostname();
2768
2769                                        boolean isClientHostname = lastSourceHostname != null && !lastSourceHostname.equals(clientHostname);
2770                                        boolean rootExists = syncRoot != null && syncRoot.exists();
2771
2772                        if(!rootExists){
2773                            Logger.systemError(RemoteFile.class,"The root path '{}' does not exist.",syncRoot.getPath());
2774                        }
2775                        
2776                                        if (isDeleted && rootExists) {
2777                                                if (df.exists()) {
2778                                                        if (isDebug) {
2779                                                                LoggerFactory.getLogger(RemoteFile.class).mark("DeleteDiskFile - {}", df.getPath());
2780                                                        }
2781                                                        df.delete();
2782                                                        rf.deletedToDiskAndSub(item.get("id"));
2783                                                } else {
2784                                                        rf.deletedToDiskAndSub(item.get("id"));
2785                                                }
2786                                                df.removeLogicModify();
2787                                        }
2788
2789                                        String sourceHostname = rf.getSourceHostname();
2790                                        isClientHostname = sourceHostname != null && !sourceHostname.equals(clientHostname);
2791                                        boolean checkHaveParent = configProperties.getBoolean("CheckHaveParent", false);
2792                                        boolean haveParent = true;
2793                                        if (checkHaveParent) {
2794                                                haveParent = df.getOrigin().getParentFile().exists();
2795                                        }
2796                                        String fileType = (String) item.get("file_type");
2797
2798                                        if (!isDeleted && isClientHostname && haveParent && rootExists) {
2799                                                Timestamp rfModifyTime = rf.getModifyTime();
2800                                                df.setModifyTimeMsFromClock(rfModifyTime.getTime());
2801
2802                                                if (fileType.equals("L")) {
2803                                                        if (df.exists() && !df.isLink()) {
2804                                                                df.delete();
2805                                                        }
2806                                                        if (df.exists()) {
2807                                                                Timestamp dfModifyTime = df.getModifyTimeForClock();
2808                                                                if (rfModifyTime.getTime() > dfModifyTime.getTime()) {
2809                                                                        if (isDebug) {
2810                                                                                LoggerFactory.getLogger(RemoteFile.class).mark("ReCreateDiskLink - {}",
2811                                                                                                df.getPath());
2812                                                                        }
2813                                                                        df.delete();
2814                                                                        if (df.createLink(rf.readLink())) {
2815                                                                                if (df.getOrigin().getName().startsWith(".")) {
2816                                                                                        Files.setAttribute(Paths.get(df.getPath()), "dos:hidden", true);
2817                                                                                }
2818                                                                                rf.syncedToDisk();
2819                                                                        }
2820                                                                } else {
2821                                                                        rf.syncedToDisk();
2822                                                                }
2823                                                        } else {
2824                                                                if (isDebug) {
2825                                                                        LoggerFactory.getLogger(RemoteFile.class).mark("CreateDiskLink - {}", df.getPath());
2826                                                                }
2827                                                                if (df.createLink(rf.readLink())) {
2828                                                                        if (df.getOrigin().getName().startsWith(".")) {
2829                                                                                Files.setAttribute(Paths.get(df.getPath()), "dos:hidden", true);
2830                                                                        }
2831                                                                        rf.syncedToDisk();
2832                                                                }
2833                                                        }
2834                                                        df.removeLogicModify();
2835                                                }
2836                                                if (fileType.equals("F")) {
2837                                                        if (df.exists()) {
2838                                                                Timestamp dfModifyTime = df.getModifyTimeForClock();
2839                                                                boolean changed = rfModifyTime.getTime() > dfModifyTime.getTime();
2840                                                                if (changed) {
2841                                                                        DiskFile writingDf = new DiskFile(df.getPath() + TMP_WRITING_FILE);
2842                                                                        writingDf.copyAttrs(writingDf, df.isCopyStructureOnly(), df.syncRoot);
2843                                                                        writingDf.setModifyTimeMsFromClock(rfModifyTime.getTime());
2844                                                                        if (!writingDf.isLogicModify()) {
2845                                                                                writingDf.logicModify();
2846                                                                                if (RemoteFile.addQueuePathMapping(rf.getPath())) {
2847                                                                                        if (isDebug(rf)) {
2848                                                                                                LoggerFactory.getLogger(RemoteFile.class).mark("DiskQueuing - {}",
2849                                                                                                                df.getPath());
2850                                                                                        }
2851                                                                                        rf.writeToDisk(writingDf);
2852                                                                                }
2853                                                                        }
2854                                                                } else {
2855                                                                        rf.syncedToDisk();
2856                                                                        df.removeLogicModify();
2857                                                                }
2858                                                        } else {
2859                                                                if (rf.exists()) {
2860                                                                        DiskFile writingDf = new DiskFile(df.getPath() + TMP_WRITING_FILE);
2861                                                                        writingDf.copyAttrs(writingDf, df.isCopyStructureOnly(), df.syncRoot);
2862                                                                        writingDf.setModifyTimeMsFromClock(rfModifyTime.getTime());
2863                                                                        if (!writingDf.isLogicModify()) {
2864                                                                                writingDf.logicModify();
2865                                                                                if (RemoteFile.addQueuePathMapping(rf.getPath())) {
2866                                                                                        if (isDebug(rf)) {
2867                                                                                                LoggerFactory.getLogger(RemoteFile.class).mark("DiskQueuing - {}",
2868                                                                                                                df.getPath());
2869                                                                                        }
2870                                                                                        rf.writeToDisk(writingDf);
2871                                                                                }
2872                                                                        }
2873                                                                }
2874                                                        }
2875                                                }
2876                                                if (fileType.equals("D")) {
2877                                                        if (rf.exists()) {
2878                                                                if (isDebug) {
2879                                                                        LoggerFactory.getLogger(RemoteFile.class).mark("CreateDiskDir - {}", df.getPath());
2880                                                                }
2881                                                                if (df.exists()) {
2882                                                                        if (df.isDir()) {
2883                                                                                rf.syncedToDisk();
2884                                                                        } else {
2885                                                                                df.delete();
2886                                                                                if (df.mkdirs()) {
2887                                                                                        if (df.getOrigin().getName().startsWith(".")) {
2888                                                                                                Files.setAttribute(Paths.get(df.getPath()), "dos:hidden", true);
2889                                                                                        }
2890                                                                                        rf.syncedToDisk();
2891                                                                                }
2892                                                                        }
2893                                                                } else {
2894                                                                        if (df.mkdirs()) {
2895                                                                                if (df.getOrigin().getName().startsWith(".")) {
2896                                                                                        Files.setAttribute(Paths.get(df.getPath()), "dos:hidden", true);
2897                                                                                }
2898                                                                                rf.syncedToDisk();
2899                                                                        }
2900                                                                }
2901                                                        }
2902                                                        df.removeLogicModify();
2903                                                }
2904                                        }
2905                                } catch (Exception e) {
2906                                        log.error(e.getMessage(), e);
2907                                }
2908                        }
2909                };
2910                return runnable;
2911        }
2912
2913        public void startArchiveForFileDeleted(long timerMs, String archiveTime) throws IOException {
2914                long archiveMs = ConfigProperties.parseTimeToMs(archiveTime, 0L);
2915                startArchiveForFileDeleted(timerMs, archiveMs, null);
2916        }
2917
2918        public void startArchiveForFileDeleted(long timerMs, long archiveMs) throws IOException {
2919                startArchiveForFileDeleted(timerMs, archiveMs, null);
2920        }
2921
2922        public void startArchiveForFileDeleted(long timerMs, String archiveTime, Runnable completedCallback)
2923                        throws IOException {
2924                long archiveMs = ConfigProperties.parseTimeToMs(archiveTime, 0L);
2925                startArchiveForFileDeleted(timerMs, archiveMs, completedCallback);
2926        }
2927
2928        public void startArchiveForFileDeleted(long timerMs, long archiveMs, Runnable completedCallback)
2929                        throws IOException {
2930                final RemoteFile the = this;
2931                if (archiveMs <= 0) {
2932                        throw new IOException(String.format(
2933                                        "This parameter 'archiveMs' cannot be '%s', please use a number greater than 0.", archiveMs));
2934                }
2935
2936                stopSync = false;
2937        
2938                CacheDriverExecutor executor = null;
2939                Connection conn = null;
2940                try {
2941                    Thread.currentThread().setName(String.format("RemoteFile-archiveForFileDeleted-%s", getPath()));
2942                        conn = dataSourceReader.getConnection();
2943                        conn.setAutoCommit(true);
2944                        executor = new CacheDriverExecutor(conn);
2945                        Timestamp beforeTime = new Timestamp(new Clock().getTime() - archiveMs);
2946                        Map<String, Object> dataRecord = new HashMap<String, Object>();
2947                        dataRecord.put("file_parent_path", getPath());
2948                        dataRecord.put("like_file_parent_path", parsePath(getPath() + "/%"));
2949                        dataRecord.put("before_deleted_at", beforeTime);
2950                        CacheArray rows = new CacheArray();
2951                        CacheArrayFilter filter = new CacheArrayFilter(CACHE_ARRAY_FILTER_TIMER) {
2952
2953                                @Override
2954                                public void completed(Integer size) {
2955                                        if (isStopSync()) {
2956                                                LoggerFactory.getLogger(RemoteFile.class).mark("Stoped archive for file deleted.");
2957                                        }
2958                                        if (timerMs <= 0) {
2959                                                LoggerFactory.getLogger(RemoteFile.class).mark("Completed archive for file deleted.");
2960                                        }
2961                                        
2962                                        if (completedCallback != null) {
2963                                                completedCallback.run();
2964                                        }
2965                                        
2966                                        if (!isStopSync() && timerMs > 0) {
2967                                            while(true){
2968                                                
2969                                                if(isStopSync()) break;
2970                                                
2971                                                try {
2972                                                        Thread.sleep(timerMs);
2973                                                        if (!isStopSync()) {
2974                                                            boolean allowed = checkUsage(the);
2975                                                            if(allowed){
2976                                                                    startArchiveForFileDeleted(timerMs, archiveMs, completedCallback);
2977                                                                    break;
2978                                                            }
2979                                                        }
2980                                                } catch (Exception e) {
2981                                                        log.error(e.getMessage(), e);
2982                                                }
2983                                            }
2984                                        }
2985
2986                                }
2987
2988                                @Override
2989                                public void execute(Integer index, Object o) {
2990                                    Thread.currentThread().setName(String.format("RemoteFile-execute-archiveForFileDeleted-%s", index));
2991                                        if(isDebug(the)){
2992                                            debugLog("CheckUsage",the,this);
2993                                        }                                   
2994                                        if (memoryUsage() < RemoteFile.MAX_MEMORY_USAGE) {
2995                                                CacheDriverExecutor _executor = null;
2996                                                try {
2997                                                        Map<String, Object> item = (Map<String, Object>) o;
2998                                                        boolean notDir = !item.get("file_type").equals("D");
2999                                                        if(notDir){
3000                                                        calculateDataMappingDataSourceReader(the, item);
3001                                                        Object fileId = item.get("file_id");
3002                                                        Map<String, Object> dataRecord = new HashMap<String, Object>();
3003                                                        dataRecord.put("file_id", fileId);
3004                                                        dataRecord.put("deleted_at", new Clock().getSqlTimestamp());
3005                                                        Thread.currentThread().setName(String.format("RemoteFile-archiveForFileDeleted-%s", fileId));
3006                                            Connection _conn = dataMappingDataSourceReader.getConnection();
3007                                                        _conn.setAutoCommit(true);
3008                                                        _executor = new CacheDriverExecutor(_conn);
3009                                        _executor.execute(formatSqlForFilePartData(SQL_UPDATE_BY_FILE_ID_FOR_CLEAR_FILE_DATA), dataRecord);
3010                                                        }
3011                                                } catch (Exception e) {
3012                                                        log.error(e.getMessage(), e);
3013                                                } finally {
3014                                                        try {
3015                                                                if (_executor != null)
3016                                                                        _executor.close();
3017                                                        } catch (Exception e) {
3018                                                                log.error(e.getMessage(), e);
3019                                                        }
3020                                                }
3021                                        }else{
3022                                            log.warn("Over max memory usage 'MAX_MEMORY_USAGE={}'.",MAX_MEMORY_USAGE);
3023                                        }
3024                                }
3025                        };
3026                        rows.filter(filter);
3027                        final CacheDriverExecutor finalExecutor = executor;
3028                        Executors.newFixedThreadPool(1).execute(new Runnable(){
3029                            @Override
3030                            public void run(){
3031                                try{
3032                                     finalExecutor.find(formatSql(SQL_SELECT_FOR_ARCHIVE_FILE_DELETED), dataRecord, rows);
3033                                }catch(Exception e){
3034                                    log.error(e.getMessage(),e);
3035                                }
3036                            }
3037                        });
3038                } catch (SQLException e) {
3039                        throw new IOException(e.getMessage(), e);
3040                } finally {
3041                        if (executor != null) {
3042                                try {
3043                                        executor.close();
3044                                } catch (Exception e) {
3045                                        throw new IOException(e.getMessage(), e);
3046                                }
3047                        }
3048                }
3049        }
3050
3051        public void startArchiveForDeleted(long timerMs, String archiveTime) throws IOException {
3052                long archiveMs = ConfigProperties.parseTimeToMs(archiveTime, 0L);
3053                startArchiveForDeleted(timerMs, archiveMs, null);
3054        }
3055
3056        public void startArchiveForDeleted(long timerMs, long archiveMs) throws IOException {
3057                startArchiveForDeleted(timerMs, archiveMs, null);
3058        }
3059
3060        public void startArchiveForDeleted(long timerMs, String archiveTime, Runnable completedCallback)
3061                        throws IOException {
3062                long archiveMs = ConfigProperties.parseTimeToMs(archiveTime, 0L);
3063                startArchiveForDeleted(timerMs, archiveMs, completedCallback);
3064        }
3065
3066        public void startArchiveForDeleted(long timerMs, long archiveMs, Runnable completedCallback) throws IOException {
3067            final RemoteFile the = this;
3068                if (archiveMs <= 0) {
3069                        throw new IOException(String.format(
3070                                        "This parameter 'archiveMs' cannot be '%s', please use a number greater than 0.", archiveMs));
3071                }
3072
3073                stopSync = false;
3074
3075                CacheDriverExecutor executor = null;
3076                Connection conn = null;
3077                try {
3078                    Thread.currentThread().setName(String.format("RemoteFile-archiveForDeleted-%s", getPath()));
3079                        conn = dataSourceReader.getConnection();
3080                        conn.setAutoCommit(true);
3081                        executor = new CacheDriverExecutor(conn);
3082                        Timestamp beforeTime = new Timestamp(new Clock().getTime() - archiveMs);
3083                        Map<String, Object> dataRecord = new HashMap<String, Object>();
3084                        dataRecord.put("file_parent_path", getPath());
3085                        dataRecord.put("like_file_parent_path", parsePath(getPath() + "/%"));
3086                        dataRecord.put("before_deleted_at", beforeTime);
3087
3088                        CacheArray rows = new CacheArray();
3089                        CacheArrayFilter filter = new CacheArrayFilter(CACHE_ARRAY_FILTER_TIMER) {
3090
3091                                @Override
3092                                public void completed(Integer size) {
3093                                        if (isStopSync()) {
3094                                                LoggerFactory.getLogger(RemoteFile.class).mark("Stoped archive for deleted.");
3095                                        }
3096                                        if (timerMs <= 0) {
3097                                                LoggerFactory.getLogger(RemoteFile.class).mark("Completed archive for deleted.");
3098                                        }
3099                                        
3100                                        if (completedCallback != null) {
3101                                                completedCallback.run();
3102                                        }
3103                                        
3104                                        if (!isStopSync() && timerMs > 0) {
3105                                            while(true){
3106                                                
3107                                                if(isStopSync()) break;
3108                                                
3109                                                try {
3110                                                        Thread.sleep(timerMs);
3111                                                        if (!isStopSync()) {
3112                                                            boolean allowed = checkUsage(the);
3113                                                            if(allowed){
3114                                                                    startArchiveForDeleted(timerMs, archiveMs, completedCallback);
3115                                                                    break;
3116                                                            }
3117                                                        }
3118                                                } catch (Exception e) {
3119                                                        log.error(e.getMessage(), e);
3120                                                }
3121                                            }
3122                                        }
3123                                }
3124
3125                                @Override
3126                                public void execute(Integer index, Object o) {
3127                                    Thread.currentThread().setName(String.format("RemoteFile-execute-archiveForDeleted-%s", index));
3128                                        CacheDriverExecutor dmExecutor = null;
3129                                        CacheDriverExecutor dmDelExecutor = null;
3130                                        CacheDriverExecutor executor = null;
3131                                        if(isDebug(the)){
3132                                            debugLog("CheckUsage",the,this);
3133                                        }                                       
3134                                        if (memoryUsage() < RemoteFile.MAX_MEMORY_USAGE) {
3135                                                try {
3136                                                        Clock clock = new Clock();
3137                                                        Timestamp currentTime = clock.getSqlTimestamp();
3138                                                        Map<String, Object> item = (Map<String, Object>) o;
3139                                                        Object fileId = item.get("file_id");
3140                                                        Thread.currentThread().setName(String.format("RemoteFile-archiveForDeleted-%s", fileId));
3141                                                        calculateDataMappingDataSourceReader(the, item);
3142                                                        Map<String, Object> dataRecord = null;
3143                                                        boolean haveSubFiles = false;
3144                                                        /**Need use dataMappingDataSourceReader**/
3145                                                        if (item.get("file_type").equals("D")) {
3146                                                                int count = countAllSubFiles(item.get("file_parent_path") + "",
3147                                                                                item.get("file_name") + "");
3148                                                                haveSubFiles = count > 0;
3149                                                        }
3150                                                        if (!haveSubFiles) {
3151                                                                if (!item.get("file_type").equals("D")) {
3152                                                                        Connection dmConn = dataMappingDataSourceReader.getConnection();
3153                                                                        dmConn.setAutoCommit(true);
3154                                                                        dmExecutor = new CacheDriverExecutor(dmConn);
3155                                                                        dataRecord = new HashMap<String, Object>();
3156                                                                        dataRecord.put("file_id", item.get("file_id"));
3157                                                                        dataRecord.put("deleted_at", currentTime);
3158                                                                        dmExecutor.execute(
3159                                                                                        formatSqlForFilePartData(SQL_UPDATE_BY_FILE_ID_FOR_CLEAR_FILE_DATA),
3160                                                                                        dataRecord);
3161                                                                }
3162                                                                Connection conn = dataSourceWriter.getConnection();
3163                                                                conn.setAutoCommit(true);
3164                                                                executor = new CacheDriverExecutor(conn);
3165                                                                dataRecord = new HashMap<String, Object>();
3166                                                                dataRecord.put("file_id", item.get("file_id"));
3167                                                                executor.execute(formatSql(SQL_DELETE_BY_FILE_ID_FROM_FILE), dataRecord);
3168                                                        }
3169
3170                                                        Connection dmDelConn = dataMappingDataSourceReader.getConnection();
3171                                                        dmDelConn.setAutoCommit(true);
3172                                                        dmDelExecutor = new CacheDriverExecutor(dmDelConn);
3173                                                        dataRecord = new HashMap<String, Object>();
3174                                                        dataRecord.put("deleted_at", new Timestamp(currentTime.getTime() - archiveMs));
3175                                                        dmDelExecutor.execute(formatSqlForFilePartData(SQL_DELETE_BY_RF_ID_FROM_FILE_DATA),
3176                                                                        dataRecord);
3177                                                } catch (Exception e) {
3178                                                        log.error(e.getMessage(), e);
3179                                                } finally {
3180                                                        try {
3181                                                                if (dmExecutor != null)
3182                                                                        dmExecutor.close();
3183                                                        } catch (Exception e) {
3184                                                                log.error(e.getMessage(), e);
3185                                                        }
3186                                                        try {
3187                                                                if (dmDelExecutor != null)
3188                                                                        dmDelExecutor.close();
3189                                                        } catch (Exception e) {
3190                                                                log.error(e.getMessage(), e);
3191                                                        }
3192                                                        try {
3193                                                                if (executor != null)
3194                                                                        executor.close();
3195                                                        } catch (Exception e) {
3196                                                                log.error(e.getMessage(), e);
3197                                                        }
3198                                                }
3199
3200                                        }else{
3201                                            log.warn("Over max memory usage 'MAX_MEMORY_USAGE={}'.",MAX_MEMORY_USAGE);
3202                                        }
3203                                }
3204                        };
3205                        rows.filter(filter);
3206                        final CacheDriverExecutor finalExecutor = executor;
3207                        Executors.newFixedThreadPool(1).execute(new Runnable(){
3208                            @Override
3209                            public void run(){
3210                                try{
3211                                    finalExecutor.find(formatSql(SQL_SELECT_FOR_ARCHIVE_DELETED), dataRecord, rows);
3212                                }catch(Exception e){
3213                                    log.error(e.getMessage(),e);
3214                                }
3215                            }
3216                        });                     
3217                } catch (SQLException e) {
3218                        throw new IOException(e.getMessage(), e);
3219                } finally {
3220                        if (executor != null) {
3221                                try {
3222                                        executor.close();
3223                                } catch (Exception e) {
3224                                        throw new IOException(e.getMessage(), e);
3225                                }
3226                        }
3227                }
3228        }
3229        
        /**
         * Starts the timeout-archive pass without a completion callback.
         *
         * <p>Delegates to {@link #startArchiveForTimeout(long, Runnable)} with a
         * {@code null} callback.
         *
         * @param timerMs forwarded unchanged to the two-argument overload
         * @throws IOException propagated from the two-argument overload
         */
        public void startArchiveForTimeout(long timerMs) throws IOException {
            startArchiveForTimeout(timerMs,null);
        }
3233        
3234        public void startArchiveForTimeout(long timerMs,Runnable completedCallback) throws IOException {
3235            final RemoteFile the = this;
3236            
3237                stopSync = false;
3238
3239                CacheDriverExecutor executor = null;
3240                Connection conn = null;
3241                try {
3242                    Thread.currentThread().setName(String.format("RemoteFile-archiveForTimeout-%s", getPath()));
3243                        conn = dataSourceReader.getConnection();
3244                        conn.setAutoCommit(true);
3245                        executor = new CacheDriverExecutor(conn);
3246                        Timestamp beforeTime = new Timestamp(new Clock().getTime() - (LOGIC_TIMEOUT_MS*BATCH_SIZE));
3247                        Map<String, Object> dataRecord = new HashMap<String, Object>();
3248                        dataRecord.put("file_parent_path", getPath());
3249                        dataRecord.put("like_file_parent_path", parsePath(getPath() + "/%"));
3250                        dataRecord.put("before_updated_at", beforeTime);
3251
3252                        CacheArray rows = new CacheArray();
3253                        CacheArrayFilter filter = new CacheArrayFilter(CACHE_ARRAY_FILTER_TIMER) {
3254
3255                                @Override
3256                                public void completed(Integer size) {
3257                                        if (isStopSync()) {
3258                                                LoggerFactory.getLogger(RemoteFile.class).mark("Stoped archive for timeout deleted.");
3259                                        }
3260                                        if (timerMs <= 0) {
3261                                                LoggerFactory.getLogger(RemoteFile.class).mark("Completed archive for timeout deleted.");
3262                                        }
3263                                        
3264                                        if (completedCallback != null) {
3265                                                completedCallback.run();
3266                                        }
3267                                        
3268                                        if (!isStopSync() && timerMs > 0) {
3269                                            while(true){
3270                                                
3271                                                if(isStopSync()) break;
3272                                                
3273                                                try {
3274                                                        Thread.sleep(timerMs);
3275                                                        if (!isStopSync()) {
3276                                                            boolean allowed = checkUsage(the);
3277                                                            if(allowed){
3278                                                                    startArchiveForTimeout(timerMs, completedCallback);
3279                                                                    break;
3280                                                            }
3281                                                        }
3282                                                } catch (Exception e) {
3283                                                        log.error(e.getMessage(), e);
3284                                                }
3285                                            }
3286                                        }
3287
3288                                }
3289
3290                                @Override
3291                                public void execute(Integer index, Object o) {
3292                                    Thread.currentThread().setName(String.format("RemoteFile-execute-archiveForTimeout-%s", index));
3293                                        if(isDebug(the)){
3294                                            debugLog("CheckUsage",the,this);
3295                                        }                                   
3296                                        CacheDriverExecutor executor = null;
3297                                        if (memoryUsage() < RemoteFile.MAX_MEMORY_USAGE) {
3298                                                try {
3299                                                        Clock clock = new Clock();
3300                                                        Timestamp currentTime = clock.getSqlTimestamp();
3301                                                        Map<String, Object> item = (Map<String, Object>) o;
3302                                                        Object fileId = item.get("file_id");
3303                                                        Thread.currentThread().setName(String.format("RemoteFile-archiveForTimeout-%s", fileId));
3304                                                        Connection conn = dataSourceWriter.getConnection();
3305                                                        conn.setAutoCommit(true);
3306                                                        executor = new CacheDriverExecutor(conn);
3307                                                        Map<String,Object> dataRecord = new HashMap<String, Object>();
3308                                                        dataRecord.put("file_id",fileId);
3309                                                        dataRecord.put("deleted_at",currentTime);
3310                                                        dataRecord.put("deleted_user_id",getDefaultModifyUserId());
3311                                                        executor.execute(formatSql(SQL_UPDATE_FOR_TIMEOUT_DELETE), dataRecord);
3312                                                } catch (Exception e) {
3313                                                        log.error(e.getMessage(), e);
3314                                                } finally {
3315                                                        try {
3316                                                                if (executor != null)
3317                                                                        executor.close();
3318                                                        } catch (Exception e) {
3319                                                                log.error(e.getMessage(), e);
3320                                                        }
3321                                                }
3322
3323                                        }else{
3324                                             log.warn("Over max memory usage 'MAX_MEMORY_USAGE={}'.",MAX_MEMORY_USAGE);
3325                                        }
3326                                }
3327                        };
3328                        rows.filter(filter);
3329                        final CacheDriverExecutor finalExecutor = executor;
3330                        Executors.newFixedThreadPool(1).execute(new Runnable(){
3331                            @Override
3332                            public void run(){
3333                                try{
3334                                    finalExecutor.find(formatSql(SQL_SELECT_FOR_ALL_TIMEOUT), dataRecord, rows);
3335                                }catch(Exception e){
3336                                    log.error(e.getMessage(),e);
3337                                }
3338                            }
3339                        });                             
3340                } catch (SQLException e) {
3341                        throw new IOException(e.getMessage(), e);
3342                } finally {
3343                        if (executor != null) {
3344                                try {
3345                                        executor.close();
3346                                } catch (Exception e) {
3347                                        throw new IOException(e.getMessage(), e);
3348                                }
3349                        }
3350                }
3351        }       
3352
        /**
         * Returns the static data-source registry wrapped by
         * {@link Collections#synchronizedMap(Map)}.
         *
         * <p>The result is a view over {@code DATASOURCE_REMOTE_FILE}, not a copy:
         * changes made through it are applied to the underlying map.
         *
         * @return synchronized view of {@code DATASOURCE_REMOTE_FILE}
         */
        public static Map<String, DriverDataSource> getDriverDataSourceMap() {
                return Collections.synchronizedMap(DATASOURCE_REMOTE_FILE);
        }
3356
3357        private static synchronized String generateId(boolean identityCardEnable, String identityCardName,
3358                        String identityCardPrefix, int identityCardSuffix) throws SQLException {
3359                if (identityCardEnable) {
3360                        if (identityCard == null) {
3361                                identityCard = new IdentityCard(identityCardName);
3362                        }
3363                        return identityCard.generateId(16, identityCardPrefix, "", identityCardSuffix);
3364                } else {
3365                        return CommonTools.generateId(16);
3366                }
3367        }
3368
3369        private static synchronized String generateDataId(boolean identityCardEnable, String identityCardName,
3370                        int identityCardSuffix, RemoteFile theRf) throws SQLException {
3371                if (identityCardEnable) {
3372                        if (identityCard == null) {
3373                                identityCard = new IdentityCard(identityCardName + "Data");
3374                        }
3375                        return identityCard.generateId(16, String.format("%04d", theRf.dataMappingTable), "", identityCardSuffix);
3376                } else {
3377                        return CommonTools.generateId(16);
3378                }
3379        }
3380
3381        protected static synchronized boolean addQueuePathMapping(String path) {
3382                Long addTimeMs = null;
3383
3384                boolean allowed = checkQueuePathMapping();
3385
3386                if (!allowed)
3387                        return false;
3388
3389                addTimeMs = QUEUE_PATH_MAPPING.get(path);
3390
3391                if (addTimeMs == null) {
3392                        QUEUE_PATH_MAPPING.put(path, System.currentTimeMillis());
3393                        return true;
3394                }
3395
3396                return false;
3397        }
3398
3399        public static synchronized boolean checkQueuePathMapping() {
3400                Long addTimeMs = null;
3401                Map<String, Long> copyQueueMap = getQueuePathMapping();
3402                List<String> keyList = new ArrayList<String>(copyQueueMap.keySet());
3403                for (String key : keyList) {
3404                        addTimeMs = copyQueueMap.get(key);
3405                        if (addTimeMs != null) {
3406                                boolean timeout = (System.currentTimeMillis() - addTimeMs) >= LOGIC_TIMEOUT_MS;
3407
3408                                if (timeout) {
3409                                        QUEUE_PATH_MAPPING.remove(key);
3410                                }
3411                        }
3412                }
3413
3414                if (QUEUE_PATH_MAPPING.keySet().size() >= MAX_QUEUE_SIZE)
3415                        return false;
3416
3417                return true;
3418        }
3419
        /**
         * Removes a path from the queue map; the map's return value is ignored.
         *
         * @param path the queued path to remove from {@code QUEUE_PATH_MAPPING}
         */
        protected static synchronized void removeQueuePathMapping(String path) {
                QUEUE_PATH_MAPPING.remove(path);
        }
3423
        /**
         * Returns the static queue map wrapped by
         * {@link Collections#synchronizedMap(Map)}.
         *
         * <p>The result is a view over {@code QUEUE_PATH_MAPPING}, not a copy: changes
         * made through it are applied to the underlying map.
         *
         * @return synchronized view mapping each queued path to the time in milliseconds
         *         at which it was added
         */
        public static synchronized Map<String, Long> getQueuePathMapping() {
                return Collections.synchronizedMap(QUEUE_PATH_MAPPING);
        }
3427
3428        protected static synchronized void loadDataMapping(RemoteFile rf) throws Exception {
3429                List<String> dmArray = rf.configProperties.getArray("DataMapping", new String[] {});
3430                int size = dmArray.size();
3431                List<String> dmList = DATA_MAPPING_LIST.get(rf.dsKey);
3432
3433                if (dmList == null)
3434                        dmList = new ArrayList<String>();
3435
3436                if (!DATA_MAPPING_INDEX.containsKey(rf.dsKey))
3437                        DATA_MAPPING_INDEX.put(rf.dsKey, 0);
3438
3439                for (int i = 0; i < size; i++) {
3440                        String dm = dmArray.get(i);
3441                        String originKey = String.format("DataMapping[%s]", i);
3442                        Map<String, Object> dmMap = rf.configProperties.getMap(originKey);
3443                        String shortKey = String.format("%s.%s", rf.dsKey, dm);
3444                        if (dmMap != null) {
3445                                String dmKey = String.format("%s.%s", rf.dsKey, originKey);
3446
3447                                if (!DATA_MAPPING.containsKey(shortKey)) {
3448                                        ConfigProperties dmCp = new ConfigProperties();
3449                                        dmCp.putAllAndReturnSlef(dmMap);
3450                                        int priority = dmCp.getInteger(String.format("%s.Priority", originKey), 0);
3451                                        if (i == 0 && priority < 1) {
3452                                                throw new Exception("First DataMapping priority cannot be less than 1.");
3453                                        }
3454                                        String dataSourceFileOriginValue = dmCp.getString(String.format("%s.DataSource", originKey));
3455                                        String dataSourceReaderFileOriginValue = dmCp
3456                                                        .getString(String.format("%s.DataSourceReader", originKey), dataSourceFileOriginValue);
3457                                        String dataSourceWriterFileOriginValue = dmCp
3458                                                        .getString(String.format("%s.DataSourceWriter", originKey), dataSourceFileOriginValue);
3459
3460                                        String dmDsReaderKey = String.format("%s.reader", CommonTools.md5(dataSourceReaderFileOriginValue));
3461                                        String dmDsWriterKey = String.format("%s.writer", CommonTools.md5(dataSourceWriterFileOriginValue));
3462                                        DriverDataSource dmDataSourceReader = DATASOURCE_REMOTE_FILE.get(dmDsReaderKey);
3463                                        DriverDataSource dmDataSourceWriter = DATASOURCE_REMOTE_FILE.get(dmDsWriterKey);
3464                                        if (dmDataSourceReader == null) {
3465                                                DATASOURCE_REMOTE_FILE.put(dmDsReaderKey,
3466                                                                new DriverDataSource(new File(dataSourceReaderFileOriginValue)));
3467                                        }
3468                                        if (dmDataSourceWriter == null) {
3469                                                DATASOURCE_REMOTE_FILE.put(dmDsWriterKey,
3470                                                                new DriverDataSource(new File(dataSourceWriterFileOriginValue)));
3471                                        }
3472                                        String tableRangOriginValue = dmCp.getString(String.format("%s.TableRange", originKey), "0");
3473                                        ConfigProperties shortCp = new ConfigProperties();
3474                                        List<Integer> tableRange = new ArrayList<Integer>();
3475                                        if (tableRangOriginValue.contains("-")) {
3476                                                String[] ranges = tableRangOriginValue.split("-");
3477                                                Integer begin = Integer.parseInt(ranges[0].trim());
3478                                                Integer end = Integer.parseInt(ranges[1].trim());
3479                                                for (int j = begin; j <= end; j++) {
3480                                                        tableRange.add(begin + j);
3481                                                }
3482                                        } else if (tableRangOriginValue.contains(",")) {
3483                                                String[] ranges = tableRangOriginValue.split(",");
3484                                                int rangesLen = ranges.length;
3485                                                for (int j = 0; j < rangesLen; j++) {
3486                                                        tableRange.add(Integer.parseInt(ranges[j].trim()));
3487                                                }
3488                                        } else {
3489                                                tableRange.add(Integer.parseInt(tableRangOriginValue.trim()));
3490                                        }
3491                                        shortCp.put("TableRange", tableRange);
3492                                        shortCp.putAndReturnSlef("DataSourceReader", dmDsReaderKey);
3493                                        shortCp.putAndReturnSlef("DataSourceWriter", dmDsWriterKey);
3494                                        shortCp.putAndReturnSlef("Priority", priority);
3495                                        shortCp.putAndReturnSlef("UsingCount", 0);
3496                                        shortCp.putAndReturnSlef("Name", dm);
3497                                        int randomIndex = ThreadLocalRandom.current().nextInt(0, tableRange.size());
3498                                        shortCp.putAndReturnSlef("Table", tableRange.get(randomIndex));
3499                                        DATA_MAPPING.put(shortKey, shortCp);
3500                                        dmList.add(dm);
3501                                        DATA_MAPPING_LIST.put(rf.dsKey, dmList);
3502                                }
3503                        }
3504                }
3505
3506        }
3507
3508        protected static synchronized void calculateDataMappingDataSourceReader(RemoteFile rf,
3509                        Map<String, Object> rfRecord) {
3510                rf.dataMappingDs = (String) rfRecord.get("file_part_data_ds");
3511                rf.dataMappingTable = (Integer) rfRecord.get("file_part_data_table");
3512                String shortKey = String.format("%s.%s", rf.dsKey, rf.dataMappingDs);
3513                ConfigProperties cp = DATA_MAPPING.get(shortKey);
3514                if (cp != null) {
3515                        String dmDsReaderKey = cp.getString("DataSourceReader");
3516                        rf.dataMappingDataSourceReader = DATASOURCE_REMOTE_FILE.get(dmDsReaderKey);
3517                }
3518        }
3519
3520        protected static synchronized void calculateDataMappingDataSourceWriter(RemoteFile rf) {
3521                if (DATA_MAPPING_INDEX.get(rf.dsKey) >= DATA_MAPPING_LIST.size() - 1) {
3522                        DATA_MAPPING_INDEX.put(rf.dsKey, 0);
3523                }
3524                ConfigProperties cp = getDataMapping(rf, DATA_MAPPING_INDEX.get(rf.dsKey));
3525                List<Integer> tableRange = (List<Integer>) cp.get("TableRange");
3526                Integer priority = cp.getInteger("Priority", 0);
3527                Integer usingCount = cp.getInteger("UsingCount", 0);
3528                String key = String.format("%s.%s", rf.dsKey, cp.getString("Name"));
3529                if (usingCount < priority) {
3530                        rf.dataMappingDataSourceReader = DATASOURCE_REMOTE_FILE.get(cp.getString("DataSourceReader"));
3531                        rf.dataMappingDataSourceWriter = DATASOURCE_REMOTE_FILE.get(cp.getString("DataSourceWriter"));
3532                        rf.dataMappingDs = cp.getString("Name");
3533                        rf.dataMappingTable = cp.getInteger("Table");
3534                        usingCount++;
3535                        if (usingCount >= priority) {
3536                                DATA_MAPPING_INDEX.put(rf.dsKey, DATA_MAPPING_INDEX.get(rf.dsKey) + 1);
3537                                cp.putAndReturnSlef("UsingCount", 0);
3538                                int randomIndex = ThreadLocalRandom.current().nextInt(0, tableRange.size());
3539                                cp.putAndReturnSlef("Table", tableRange.get(randomIndex));
3540                                DATA_MAPPING.put(key, cp);
3541                        } else {
3542                                cp.putAndReturnSlef("UsingCount", usingCount);
3543                                int randomIndex = ThreadLocalRandom.current().nextInt(0, tableRange.size());
3544                                cp.putAndReturnSlef("Table", tableRange.get(randomIndex));
3545                                DATA_MAPPING.put(key, cp);
3546                        }
3547                } else {
3548                        DATA_MAPPING_INDEX.put(rf.dsKey, DATA_MAPPING_INDEX.get(rf.dsKey) + 1);
3549                        calculateDataMappingDataSourceWriter(rf);
3550                }
3551        }
3552
3553        protected static synchronized ConfigProperties getDataMapping(RemoteFile rf, int index) {
3554                List<String> dmList = DATA_MAPPING_LIST.get(rf.dsKey);
3555                String dm = dmList.get(index);
3556                String key = String.format("%s.%s", rf.dsKey, dm);
3557                return DATA_MAPPING.get(key);
3558        }
3559
        /**
         * Builds the name of the data table for this instance's mapping.
         *
         * @return {@code tableName}, the literal {@code _data_} and
         *         {@code dataMappingTable} joined together
         */
        protected String getDataMappingTableName() {
                return String.format("%s_data_%s", this.tableName, this.dataMappingTable);
        }
3563        
        /**
         * Convenience overload that forwards to the four-argument {@code debugLog}
         * (defined outside this part of the file) with {@code RemoteFile.class} as the
         * first argument.
         *
         * @param point label passed through to the four-argument overload
         * @param rf instance passed through to the four-argument overload
         * @param filter filter passed through to the four-argument overload
         */
        private static void debugLog(String point,RemoteFile rf,CacheArrayFilter filter){
                debugLog(RemoteFile.class,point,rf,filter);
        }
3567
3568}