001/*******************************************************************************
002The MIT License (MIT)
003
004Copyright (c) 2024 KILLCODING.COM
005
006Permission is hereby granted, free of charge, to any person obtaining a copy
007of this software and associated documentation files (the "Software"), to deal
008in the Software without restriction, including without limitation the rights
009to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
010copies of the Software, and to permit persons to whom the Software is
011furnished to do so, subject to the following conditions:
012
013The above copyright notice and this permission notice shall be included in
014all copies or substantial portions of the Software.
015
016THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
017IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
018FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
019AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
020LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
021OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
022THE SOFTWARE.
023*****************************************************************************/
024package com.killcoding.file;
025
026import java.io.File;
027import com.killcoding.datasource.DriverDataSource;
028import com.killcoding.tool.ConfigProperties;
029import com.killcoding.tool.CommonTools;
030import com.killcoding.log.Logger;
031import com.killcoding.log.LoggerFactory;
032import java.io.FileInputStream;
033import java.io.IOException;
034import java.sql.Connection;
035import java.sql.SQLException;
036import com.killcoding.datasource.CacheDriverExecutor;
037import java.util.Map;
038import java.util.HashMap;
039import com.killcoding.datasource.Clock;
040import java.sql.Timestamp;
041import java.nio.file.Files;
042import java.util.concurrent.ConcurrentHashMap;
043import java.util.List;
044import java.util.ArrayList;
045import java.util.Arrays;
046import java.io.ByteArrayOutputStream;
047import java.sql.Blob;
048import java.nio.file.Paths;
049import java.nio.file.Path;
050import java.net.URI;
051import java.io.ByteArrayInputStream;
052import java.nio.ByteBuffer;
053import java.util.concurrent.ExecutorService;
054import java.util.concurrent.Executors;
055import java.util.concurrent.Future;
056import java.nio.file.LinkOption;
057import javax.crypto.Cipher;
058import javax.crypto.CipherInputStream;
059import javax.crypto.CipherOutputStream;
060import javax.crypto.KeyGenerator;
061import javax.crypto.SecretKey;
062import javax.crypto.SecretKeyFactory;
063import javax.crypto.spec.DESKeySpec;
064import javax.crypto.spec.IvParameterSpec;
065import javax.crypto.spec.SecretKeySpec;
066import com.killcoding.file.DiskFile;
067import java.util.Collection;
068import com.killcoding.cache.CacheArray;
069import com.killcoding.cache.CacheArrayFilter;
070import java.nio.file.attribute.BasicFileAttributes;
071import java.util.concurrent.Callable;
072import com.killcoding.datasource.IdentityCard;
073import java.util.Collections;
074import java.util.concurrent.ThreadLocalRandom;
075import com.killcoding.tool.CipherTools;
076import com.killcoding.cache.Cache;
077import java.util.Date;
078
079public class RemoteFile extends BaseFile implements RemoteFileSQL {
080
081        private static final Map<String, DriverDataSource> DATASOURCE_REMOTE_FILE = new ConcurrentHashMap<String, DriverDataSource>();
082
083        private static final Map<String, ConfigProperties> CONFIG_PROPS_REMOTE_FILE = new ConcurrentHashMap<String, ConfigProperties>();
084
085        private static final Map<String, Long> QUEUE_PATH_MAPPING = new ConcurrentHashMap<String, Long>();
086
087        private static final Map<String, ConfigProperties> DATA_MAPPING = new ConcurrentHashMap<String, ConfigProperties>();
088
089        private static final Map<String, List<String>> DATA_MAPPING_LIST = new ConcurrentHashMap<String, List<String>>();
090
091        private static final Map<String, Integer> DATA_MAPPING_INDEX = new ConcurrentHashMap<String, Integer>();
092
093    protected static boolean FIRST_LOADED = false;
094    
095    public static Integer MAX_POOL_SIZE = 100;
096        public static Double MAX_CONNECT_USAGE = 0.5D;
097        public static Double MAX_MEMORY_USAGE = 0.8D;
098        public static Integer MAX_QUEUE_SIZE = 10;
099        public static Long MAX_FILE_SZIE = -1L;
100        public static boolean SHOW_LINK = false;
101        public static boolean SHOW_HIDDEN = false;
102        public static boolean COPY_STRUCTURE_ONLY = false;
103        public static Integer MAX_FILE_PARENT_PATH_LENGTH = 200;
104        public static Integer MAX_FILE_NAME_LENGTH = 200;
105        public static Integer CACHE_SECONDS = 5;
106        public static Date SYNC_CUTOFF_TIME = null;
107        public static Date DELETE_CUTOFF_TIME = null;
108
109        private static IdentityCard identityCard = null;
110        private static File appRemoteFile = null;
111        private DriverDataSource dataSourceReader = null;
112        private DriverDataSource dataSourceWriter = null;
113        private String dataSourceReaderPath = null;
114        private String dataSourceWriterPath = null;
115        private String modifyUserId = null;
116        private Integer filePartSize = null;
117        private Boolean identityCardEnable = false;
118        private String identityCardName = null;
119        private String identityCardPrefix = null;
120        private Integer identityCardSuffix = 0;
121        private Timestamp modifyTime = null;
122
123    protected ConfigProperties configProperties = null;
124    protected String fileId = null;
125    protected String tableName = null;
126        protected Integer dataMappingTable = null;
127        protected String dataMappingDs = null;
128        protected DriverDataSource dataMappingDataSourceReader = null;
129        protected DriverDataSource dataMappingDataSourceWriter = null;
130        protected String dsKey = null;
131
132        public static Long BEFORE_THE_QUEUE_TIME = 60000L;
133        public static Integer BATCH_SIZE = 10;
134        public static String DEFAULE_MODIFY_USER_ID = null;
135
136        private Logger log = null;
137
138        private static ExecutorService mergePool = null;
139
140        public RemoteFile(String path) {
141                super(path);
142                log = LoggerFactory.getLogger(getClass());
143                fileId = CommonTools.generateId(16);
144                initGlobal();
145        }
146        
147        public RemoteFile(File origin) {
148                super(origin);
149                log = LoggerFactory.getLogger(getClass());
150                fileId = CommonTools.generateId(16);
151                initGlobal();
152        }       
153
154        public static synchronized void initPool(int poolSize) {
155                if (mergePool == null) {
156                        MAX_POOL_SIZE = poolSize;
157                        LoggerFactory.getLogger(RemoteFile.class).mark("MAX_POOL_SIZE={}", MAX_POOL_SIZE);
158                        mergePool = Executors.newFixedThreadPool(MAX_POOL_SIZE);
159                }
160        }
161
162        private synchronized void init() {
163                dsKey = CommonTools.md5(this.appRemoteFile.getAbsolutePath());
164                dataSourceReader = DATASOURCE_REMOTE_FILE.get(String.format("%s.reader", dsKey));
165                dataSourceWriter = DATASOURCE_REMOTE_FILE.get(String.format("%s.writer", dsKey));
166                configProperties = CONFIG_PROPS_REMOTE_FILE.get(dsKey);
167                if (dataSourceReader == null && dataSourceWriter == null) {
168                        try {
169                                if (!appRemoteFile.exists())
170                                        throw new IOException(String.format("Not exist '%s'.", appRemoteFile.getAbsolutePath()));
171
172                                configProperties = new ConfigProperties();
173                                Runnable updatedRun = new Runnable() {
174                                        @Override
175                                        public void run() {
176                                                try {
177                                                        reload();
178                                                        LoggerFactory.getLogger(RemoteFile.class).mark("RemoteFile Reloaded");
179                                                } catch (Exception e) {
180                                                        log.error(e.getMessage(), e);
181                                                }
182                                        }
183                                };
184                                configProperties.load(appRemoteFile, updatedRun);
185
186                                CONFIG_PROPS_REMOTE_FILE.put(dsKey, configProperties);
187
188                                String dataSourcePath = configProperties.getString("DataSource");
189                                dataSourceReaderPath = configProperties.getString("DataSourceReader", dataSourcePath);
190                                dataSourceWriterPath = configProperties.getString("DataSourceWriter", dataSourcePath);
191                                reload();
192                        } catch (Exception e) {
193                                log.error(e.getMessage(), e);
194                        }
195                }
196                if (configProperties != null) {
197                        String dataSourcePath = configProperties.getString("DataSource");
198                        dataSourceReaderPath = configProperties.getString("DataSourceReader", dataSourcePath);
199                        dataSourceWriterPath = configProperties.getString("DataSourceWriter", dataSourcePath);
200                        reload();
201                }
202                calculateDataMappingDataSourceWriter(this);
203        }
204
205        private synchronized void initGlobal() {
206            if(this.appRemoteFile == null){
207                this.appRemoteFile = CommonTools.getSystemProperties(RemoteFile.class, "APP_REMOTE_FILE",
208                                        "RemoteFile.properties");
209            }
210                init();
211        }
212
213        private synchronized void reload() {
214                try {
215                        LOGIC_TIMEOUT_MS = configProperties.getMilliSeconds("LogicTimeout", 300000L);
216                        LOGIC_ACCESS_TIMEOUT_MS = configProperties.getMilliSeconds("LogicAccessTimeout", 60000L);
217                        LOGIC_CHECK_TIMEOUT_MS = configProperties.getMilliSeconds("LogicCheckTimeout", 60000L);
218                        MAX_FILE_SZIE = configProperties.getFileSize("MaxFileSize", 1024 * 1024 * 1L);
219                        SHOW_HIDDEN = configProperties.getBoolean("ShowHidden", false);
220                        SHOW_LINK = configProperties.getBoolean("ShowLink", false);
221                        COPY_STRUCTURE_ONLY = configProperties.getBoolean("CopyStructureOnly", false);
222                        MAX_QUEUE_SIZE = configProperties.getInteger("MaxQueueSize", 10);
223                        BATCH_SIZE = configProperties.getInteger("BatchSize", 10);
224                        MAX_CONNECT_USAGE = configProperties.getDouble("MaxConnectUsage", 0.5D);
225                        MAX_MEMORY_USAGE = configProperties.getDouble("MaxMemoryUsage", 0.8D);
226                        BEFORE_THE_QUEUE_TIME = configProperties.getMilliSeconds("BeforeTheQueueTime", 15000L);
227                        MAX_FILE_PARENT_PATH_LENGTH = configProperties.getInteger("MaxFileParentPathLength", 200);
228                        MAX_FILE_NAME_LENGTH = configProperties.getInteger("MaxFileNameLength", 200);
229                        CACHE_SECONDS = configProperties.getInteger("CacheSeconds", 5);
230                        
231                        SYNC_CUTOFF_TIME = configProperties.getDateTime("SyncCutoffTime","0000-01-01 00:00:00.0000");
232                        DELETE_CUTOFF_TIME = configProperties.getDateTime("DeleteCutoffTime","0000-01-01 00:00:00.0000");
233                        
234                        if(BEFORE_THE_QUEUE_TIME < 0) BEFORE_THE_QUEUE_TIME = 15000L;
235                        
236                        if(MAX_CONNECT_USAGE < 0) MAX_CONNECT_USAGE = 0D;
237                        
238                        if(MAX_MEMORY_USAGE < 0) MAX_MEMORY_USAGE = 0D;
239                        
240                        if(BATCH_SIZE <= 0) BATCH_SIZE = 10;
241                        
242                        if(MAX_QUEUE_SIZE <= 0) MAX_QUEUE_SIZE = 10;
243                        
244                        if(MAX_FILE_SZIE <= 0) MAX_FILE_SZIE = 1024 * 1024 * 1L;
245                        
246                        if(LOGIC_TIMEOUT_MS <= 0) LOGIC_TIMEOUT_MS = 900000L;
247                        
248                        if(LOGIC_ACCESS_TIMEOUT_MS < 0) LOGIC_ACCESS_TIMEOUT_MS = 60000L;
249                        
250                        if(LOGIC_CHECK_TIMEOUT_MS < 0) LOGIC_CHECK_TIMEOUT_MS = 60000L;
251                        
252                        if(MAX_FILE_PARENT_PATH_LENGTH <= 0) MAX_FILE_PARENT_PATH_LENGTH = 200;
253                        
254                        if(MAX_FILE_NAME_LENGTH <= 0) MAX_FILE_NAME_LENGTH = 200;
255                        
256                        if(CACHE_SECONDS < 0) CACHE_SECONDS = 0;
257
258                        log.debug("LOGIC_TIMEOUT_MS={}", LOGIC_TIMEOUT_MS);
259                        log.debug("LOGIC_ACCESS_TIMEOUT_MS={}", LOGIC_ACCESS_TIMEOUT_MS);
260                        log.debug("LOGIC_CHECK_TIMEOUT_MS={}", LOGIC_CHECK_TIMEOUT_MS);
261                        log.debug("MAX_FILE_SZIE={}", MAX_FILE_SZIE);
262                        log.debug("SHOW_HIDDEN={}", SHOW_HIDDEN);
263                        log.debug("SHOW_LINK={}", SHOW_LINK);
264                        log.debug("COPY_STRUCTURE_ONLY={}", COPY_STRUCTURE_ONLY);
265                        log.debug("MAX_QUEUE_SIZE={}", MAX_QUEUE_SIZE);
266                        log.debug("BATCH_SIZE={}", BATCH_SIZE);
267                        log.debug("MAX_CONNECT_USAGE={}", MAX_CONNECT_USAGE);
268                        log.debug("MAX_MEMORY_USAGE={}", MAX_MEMORY_USAGE);
269                        log.debug("MAX_FILE_PARENT_PATH_LENGTH={}", MAX_FILE_PARENT_PATH_LENGTH);
270                        log.debug("MAX_FILE_NAME_LENGTH={}", MAX_FILE_NAME_LENGTH);
271                        log.debug("CACHE_SECONDS={}", CACHE_SECONDS);
272
273                        String dataSourcePath = configProperties.getString("DataSource");
274                        String reloadDataSourceReaderPath = configProperties.getString("DataSourceReader", dataSourcePath);
275                        String reloadDataSourceWriterPath = configProperties.getString("DataSourceWriter", dataSourcePath);
276                        filePartSize = configProperties.getFileSize("FilePartSize", 1024 * 1024 * 1L).intValue();
277                        
278                        if(filePartSize <= 0) filePartSize = 1024 * 1024 * 1;
279                        
280                        tableName = configProperties.getString("TableName", "remote_file");
281                        identityCardEnable = configProperties.getBoolean("IdentityCardEnable", false);
282                        identityCardName = configProperties.getString("IdentityCardName", "RemoteFile");
283                        identityCardPrefix = configProperties.getString("IdentityCardPrefix", "RMF");
284                        identityCardSuffix = configProperties.getInteger("IdentityCardSuffix", 0);
285
286                        String readerDsKey = String.format("%s.reader", dsKey);
287                        String writerDsKey = String.format("%s.writer", dsKey);
288                        /**
289                        May be referenced when expansion is needed
290                        if (dataSourceReader == null || !dataSourceReaderPath.equals(reloadDataSourceReaderPath)) {
291                                DriverDataSource currentDs = DATASOURCE_REMOTE_FILE.get(readerDsKey);
292                                if (currentDs != null) {
293                                        currentDs.closeAll();
294                        }
295                        **/
296                        if (dataSourceReader == null || !dataSourceReaderPath.equals(reloadDataSourceReaderPath)) {
297                                dataSourceReaderPath = reloadDataSourceReaderPath;
298                                dataSourceReader = new DriverDataSource(new File(dataSourceReaderPath));
299                                DATASOURCE_REMOTE_FILE.put(readerDsKey, dataSourceReader);
300                        }
301                        /**
302                        May be referenced when expansion is needed
303                        if (dataSourceWriter == null || !dataSourceWriterPath.equals(reloadDataSourceWriterPath)) {
304                                DriverDataSource currentDs = DATASOURCE_REMOTE_FILE.get(writerDsKey);
305                                if (currentDs != null) {
306                                        currentDs.closeAll();
307                        }
308                        **/
309                        if (dataSourceWriter == null || !dataSourceWriterPath.equals(reloadDataSourceWriterPath)) {
310                                dataSourceWriterPath = reloadDataSourceWriterPath;
311                                dataSourceWriter = new DriverDataSource(new File(dataSourceWriterPath));
312                                DATASOURCE_REMOTE_FILE.put(writerDsKey, dataSourceWriter);
313                        }
314
315                        loadDataMapping(this);
316                        
317                        FIRST_LOADED = true;
318                } catch (Exception e) {
319                        log.error(e.getMessage(), e);
320                }
321        }
322
323        public String formatSql(String sql) {
324                return String.format(sql, tableName);
325        }
326
327        public String formatSqlForFilePartData(String sql) {
328                return String.format(sql, getDataMappingTableName());
329        }
330
331        public DiskFile writeToDisk(DiskFile diskFile) throws IOException {
332                if (!exists()) {
333                        log.mark(String.format("The remote file '%s' does not exist.", this.getPath()));
334                }
335                RemoteFile the = this;
336
337                Timestamp rfModifyTime = the.getModifyTime();
338                if (rfModifyTime != null) {
339                        diskFile.setModifyTimeMsFromClock(rfModifyTime.getTime());
340                }
341                if (isLink()) {
342                        if (isDebug(the)) {
343                                LoggerFactory.getLogger(RemoteFile.class).mark("CreateDiskLink - {}", diskFile.getPath());
344                        }
345                        diskFile.createLink(new String(readAllBytes(),BaseFile.CHARSET));
346                } else if (isFile()) {
347                        merge(new FilePart() {
348                                final List<byte[]> partDataList = new ArrayList<byte[]>();
349                                boolean isWritingDf = false;
350                                DiskFile realDf = null;
351
352                                @Override
353                                protected void process(int partIndex, byte[] data) throws IOException {
354                                        Thread.currentThread()
355                                                        .setName(String.format("RemoteFile-writeToDisk-merge-%s", diskFile.getOrigin().getName()));
356
357                                        try {
358
359                                        if (syncRoot != null) {
360                                                if (!diskFile.isLogicModify()) {
361                                                        throw new IOException(
362                                                                        String.format("The file '%s' missing modify logic lock.", the.getPath()));
363                                                }
364                                        }
365                                        
366                                            if (diskFile != null)
367                                                    diskFile.logicModify();
368                                                    
369                                                String fileName = diskFile.getOrigin().getName();
370                                                isWritingDf = fileName.matches(TMP_WRITING_SWP);
371                                                if (isWritingDf) {
372                                                        if (partIndex == 0) {
373                                                                if (diskFile.exists()) {
374                                                                        diskFile.delete();
375                                                                }
376                                                                if (isDebug(the)) {
377                                                                        LoggerFactory.getLogger(RemoteFile.class).mark("CreateDiskFile - {}",
378                                                                                        diskFile.getPath());
379                                                                }
380                                                                String realFileName = fileName.replaceFirst(BaseFile.TMP_WRITING_FOR_REPLACE, "");
381                                                                String realDfPath = String.format("%s/%s", diskFile.getParent(), realFileName);
382                                                                realDf = new DiskFile(realDfPath);
383                                                                DiskFile.copyAttrs(realDf, diskFile.isCopyStructureOnly(), diskFile.syncRoot);
384                                                                if(!diskFile.manualOpen(true)){
385                                                                        throw new IOException(
386                                                                                        String.format("The disk file '%s' cannot open.", diskFile.getPath()));  
387                                                                }
388                                                        }
389                                                        partDataList.add(decrypt(data));
390                                                        if (partDataList.size() >= BATCH_SIZE) {
391
392                                                                if (realDf != null)
393                                                                        realDf.logicModify();
394
395                                                                ByteArrayOutputStream ops = new ByteArrayOutputStream();
396                                                                int size = partDataList.size();
397                                                                for (int i = 0; i < size; i++) {
398                                                                        byte[] partData = partDataList.get(i);
399                                                                        ops.write(partData);
400                                                                }
401                                                                diskFile.manualWrite(ops.toByteArray());
402                                                                partDataList.clear();
403                                                        }
404                                                } else {
405                                                        if (partIndex == 0) {
406                                                                if (diskFile.exists()) {
407                                                                        throw new IOException(
408                                                                                        String.format("The remote file '%s' already exist.", diskFile.getPath()));
409                                                                }
410                                                                if (isDebug(the)) {
411                                                                        LoggerFactory.getLogger(RemoteFile.class).mark("CreateDiskFile - {}",
412                                                                                        diskFile.getPath());
413                                                                }
414                                                                if(!diskFile.manualOpen(true)){
415                                                                        throw new IOException(
416                                                                                        String.format("The disk file '%s' cannot open.", diskFile.getPath()));
417                                                                }
418                                                        }
419                                                        partDataList.add(decrypt(data));
420                                                        if (partDataList.size() >= BATCH_SIZE) {
421                                                                ByteArrayOutputStream ops = new ByteArrayOutputStream();
422                                                                int size = partDataList.size();
423                                                                for (int i = 0; i < size; i++) {
424                                                                        byte[] partData = partDataList.get(i);
425                                                                        ops.write(partData);
426                                                                }
427                                                                diskFile.manualWrite(ops.toByteArray());
428                                                                partDataList.clear();
429                                                        }
430                                                }
431
432                                        } catch (Exception e) {
433                                                log.warn(e.getMessage(), e);
434                                                if (diskFile != null) {
435                                                        diskFile.manualClose();
436                                                        diskFile.delete();
437                                                }
438                                                throw new IOException(e.getMessage(), e);
439                                        }
440                                }
441
442                                @Override
443                                protected void completed(int lastPartIndex, Timestamp modifyTime, long fileSize) throws IOException {
444                                        if (partDataList.size() > 0) {
445                                                ByteArrayOutputStream ops = new ByteArrayOutputStream();
446                                                int size = partDataList.size();
447                                                for (int i = 0; i < size; i++) {
448                                                        byte[] partData = partDataList.get(i);
449                                                        ops.write(partData);
450                                                }
451                                                diskFile.manualWrite(ops.toByteArray());
452                                                partDataList.clear();
453                                        }
454                                        diskFile.manualClose();
455                                        try {
456                                        if (syncRoot != null && !diskFile.isLogicModify()) {
457                                                diskFile.delete();
458                                                throw new IOException(
459                                                                String.format("The file '%s' missing modify logic lock.", diskFile.getPath()));
460                                        }                                           
461                                                if (isWritingDf) {
462                                                        if (diskFile.exists()) {
463                                                                diskFile.moveTo(realDf.getPath());
464                                                                if (isDebug(the)) {
465                                                                        LoggerFactory.getLogger(RemoteFile.class).mark("CompletedDiskFile - {}",
466                                                                                        realDf.getPath());
467                                                                }
468                                                        }
469                                                        if (realDf.exists())
470                                                                the.syncedToDisk();
471                                                }
472                                                if (realDf == null) {
473                                                        if (diskFile != null) {
474                                                                if (diskFile.getOrigin().getName().startsWith(".")) {
475                                                                        Files.setAttribute(Paths.get(diskFile.getPath()), "dos:hidden", true);
476                                                                }
477                                                        }
478                                                } else {
479                                                        if (realDf.getOrigin().getName().startsWith(".")) {
480                                                                Files.setAttribute(Paths.get(realDf.getPath()), "dos:hidden", true);
481                                                        }
482                                                }
483
484                                                RemoteFile.removeQueuePathMapping(the.getPath());
485
486                                                if (diskFile != null)
487                                                        diskFile.removeLogicModify();
488
489                                                if (realDf != null)
490                                                        realDf.removeLogicModify();
491
492                                        } catch (Exception e) {
493                                                log.error(e.getMessage(), e);
494                                        }
495                                }
496
497                                @Override
498                                protected void ended(int lastPartIndex, long fileSize) {
499                                        try {
500                                                if (diskFile != null) {
501                                                        diskFile.manualClose();
502                                                }
503                                        } catch (Exception e) {
504                                                log.error(e.getMessage(), e);
505                                        }
506                                }
507                        });
508                } else {
509                        List<Map<String, Object>> writeList = list(getPath(), "%");
510                        writeList.addAll(list(parsePath(getPath() + "/%"), "%"));
511
512                        int size = writeList.size();
513                        if (size > 0) {
514                                if (!diskFile.exists()) {
515                                        if (isDebug(the)) {
516                                                log.mark("CreateDiskDir - {} ", diskFile.getPath());
517                                        }
518                                        diskFile.mkdirs();
519                                }
520                        }
521
522                        for (Map<String, Object> item : writeList) {
523                                RemoteFile rf = new RemoteFile(
524                                                String.format("%s/%s", item.get("file_parent_path"), item.get("file_name")));
525
526                                if (rf.getPath().equals(getPath()))
527                                        continue;
528
529                                String dfPath = rf.getPath().replaceFirst(getPath(), diskFile.getPath());
530                                DiskFile df = new DiskFile(dfPath);
531                                DiskFile.copyAttrs(df, false, this.syncRoot);
532                                Timestamp mt = rf.getModifyTime();
533                                if (mt != null) {
534                                        df.setModifyTimeMsFromClock(mt.getTime());
535                                }
536                                if (rf.isDir()) {
537                                        if (!df.exists()) {
538                                                if (isDebug(the)) {
539                                                        log.mark("CreateDiskDir - {}", diskFile.getPath());
540                                                }
541                                                df.mkdirs();
542                                        }
543                                } else {
544                                        rf.writeToDisk(df);
545                                }
546                        }
547                }
548                return diskFile;
549        }
550
551        public DiskFile writeToDisk() throws IOException {
552                DiskFile df = new DiskFile(this.origin.getAbsolutePath());
553                DiskFile.copyAttrs(df, false, this.syncRoot);
554                return writeToDisk(df);
555        }
556
557        public DiskFile writeToDisk(String path) throws IOException {
558                DiskFile df = new DiskFile(path);
559                DiskFile.copyAttrs(df, false, this.syncRoot);
560                return writeToDisk(df);
561        }
562
563        public boolean isCompleted() throws IOException {
564                return getCompletedRecord() != null;
565        }
566
567        public Map<String, Object> getCompletedRecord() throws IOException {
568                String sql = formatSql(SQL_SELECT_FOR_COMPLETED);
569                Map dataRecord = new HashMap<String, Object>();
570                dataRecord.put("file_name", origin.getName());
571                dataRecord.put("file_parent_path", getParent());
572                String cacheKey = String.format("first-%s-%s",sql,dataRecord);
573                Map<String,Object> record = (Map<String,Object>)Cache.get(cacheKey);
574                if(record == null){
575                        CacheDriverExecutor executor = null;
576                        Connection conn = null;
577                        try {
578                                conn = dataSourceReader.getConnection();
579                                conn.setAutoCommit(true);
580                                executor = new CacheDriverExecutor(conn);
581                                record = executor.first(sql, dataRecord);
582                                if(record != null){
583                                    Cache.set(cacheKey,record,CACHE_SECONDS);
584                                }
585                        } catch (SQLException e) {
586                                throw new IOException(e.getMessage(), e);
587                        } finally {
588                                try {
589                                        if (executor != null)
590                                                executor.close();
591                                } catch (Exception e) {
592                                        log.error(e.getMessage(), e);
593                                }
594                        }
595                }
596                return record;
597        }
598
599        public Map<String, Object> getRecordById(Object id) throws IOException {
600                String sql = formatSql(SQL_SELECT_GET_RECORD_BY_ID);
601                Map dataRecord = new HashMap<String, Object>();
602                dataRecord.put("id", id);
603                Map<String, Object> record = null;
604                if (record == null) {
605                        CacheDriverExecutor executor = null;
606                        Connection conn = null;
607                        try {
608                                conn = dataSourceReader.getConnection();
609                                conn.setAutoCommit(true);
610                                executor = new CacheDriverExecutor(conn);
611                                record = executor.first(sql, dataRecord);
612                        } catch (SQLException e) {
613                                throw new IOException(e.getMessage(), e);
614                        } finally {
615                                try {
616                                        if (executor != null)
617                                                executor.close();
618                                } catch (Exception e) {
619                                        log.error(e.getMessage(), e);
620                                }
621                        }
622                }
623                return record;
624        }
625
626        public Map<String, Object> getRecordByFileId(Object fileId) throws IOException {
627                String sql = formatSql(SQL_SELECT_GET_RECORD_BY_FILE_ID);
628                Map dataRecord = new HashMap<String, Object>();
629                dataRecord.put("file_id", fileId);
630                Map<String, Object> record = null;
631                if (record == null) {
632                        CacheDriverExecutor executor = null;
633                        Connection conn = null;
634                        try {
635                                conn = dataSourceReader.getConnection();
636                                conn.setAutoCommit(true);
637                                executor = new CacheDriverExecutor(conn);
638                                record = executor.first(sql, dataRecord);
639                        } catch (SQLException e) {
640                                throw new IOException(e.getMessage(), e);
641                        } finally {
642                                try {
643                                        if (executor != null)
644                                                executor.close();
645                                } catch (Exception e) {
646                                        log.error(e.getMessage(), e);
647                                }
648                        }
649                }
650                return record;
651        }
652
653        public Map<String, Object> getLastUpdateRecord() throws IOException {
654                String sql = formatSql(SQL_SELECT_FOR_LAST_UPDATE);
655                Map dataRecord = new HashMap<String, Object>();
656                dataRecord.put("file_name", origin.getName());
657                dataRecord.put("file_parent_path", getParent());
658                Map<String, Object> record = null;
659                CacheDriverExecutor executor = null;
660                Connection conn = null;
661                try {
662                        conn = dataSourceReader.getConnection();
663                        conn.setAutoCommit(true);
664                        executor = new CacheDriverExecutor(conn);
665                        record = executor.first(sql, dataRecord);
666                        return record;
667                } catch (SQLException e) {
668                        throw new IOException(e.getMessage(), e);
669                } finally {
670                        try {
671                                if (executor != null)
672                                        executor.close();
673                        } catch (Exception e) {
674                                log.error(e.getMessage(), e);
675                        }
676                }
677        }
678        
679        protected Timestamp getFirstCreatedAt() throws IOException {
680                String sql = formatSql(SQL_SELECT_FOR_FIRST_CREATED_AT);
681                Map dataRecord = new HashMap<String, Object>();
682                dataRecord.put("file_parent_path", getPath());
683                dataRecord.put("like_file_parent_path", parsePath(getPath() + "/%"));
684                dataRecord.put("deleted_on_hostnames", "%;" + CommonTools.getHostname() + ";%");
685                CacheDriverExecutor executor = null;
686                Connection conn = null;
687                try {
688                        conn = dataSourceReader.getConnection();
689                        conn.setAutoCommit(true);
690                        executor = new CacheDriverExecutor(conn);
691                        Map<String, Object> record = executor.first(sql, dataRecord);
692                        return record == null ? null : (Timestamp)record.get("created_at");
693                } catch (SQLException e) {
694                        throw new IOException(e.getMessage(), e);
695                } finally {
696                        try {
697                                if (executor != null)
698                                        executor.close();
699                        } catch (Exception e) {
700                                log.error(e.getMessage(), e);
701                        }
702                }
703        }       
704        
705        protected Integer getMaxFilePartDataTable() throws IOException {
706                String sql = formatSql(SQL_SELECT_FOR_MAX_DATA_TABLE);
707                Map dataRecord = new HashMap<String, Object>();
708                dataRecord.put("file_parent_path", getPath());
709                dataRecord.put("like_file_parent_path", parsePath(getPath() + "/%"));
710                CacheDriverExecutor executor = null;
711                Connection conn = null;
712                try {
713                        conn = dataSourceReader.getConnection();
714                        conn.setAutoCommit(true);
715                        executor = new CacheDriverExecutor(conn);
716                        Map<String, Object> record = executor.first(sql, dataRecord);
717                        return record == null ? 0 : Integer.parseInt(record.get("file_part_data_table")+"");
718                } catch (SQLException e) {
719                        throw new IOException(e.getMessage(), e);
720                } finally {
721                        try {
722                                if (executor != null)
723                                        executor.close();
724                        } catch (Exception e) {
725                                log.error(e.getMessage(), e);
726                        }
727                }
728        }       
729
730        private Map<String, Object> getSelectByPathForCheckExists() throws IOException {
731                String sql = formatSql(SQL_SELECT_BY_PATH_FOR_CHECK_EXISTS);
732                Map dataRecord = new HashMap<String, Object>();
733                dataRecord.put("file_name", origin.getName());
734                dataRecord.put("file_parent_path", getParent());
735                Map<String, Object> record = null;
736                if (record == null) {
737                        CacheDriverExecutor executor = null;
738                        Connection conn = null;
739                        try {
740                                conn = dataSourceReader.getConnection();
741                                conn.setAutoCommit(true);
742                                executor = new CacheDriverExecutor(conn);
743                                record = executor.first(sql, dataRecord);
744                                return record;
745                        } catch (SQLException e) {
746                                throw new IOException(e.getMessage(), e);
747                        } finally {
748                                try {
749                                        if (executor != null)
750                                                executor.close();
751                                } catch (Exception e) {
752                                        log.error(e.getMessage(), e);
753                                }
754                        }
755                }
756                return record;
757        }
758
759        private int countAllSubFiles(String fileParentPath, String fileName) throws IOException {
760                String sql = formatSql(SQL_SELECT_FOR_COUNT_ALL_SUB_FILES);
761                Map dataRecord = new HashMap<String, Object>();
762                dataRecord.put("file_name", fileName);
763                dataRecord.put("file_parent_path", fileParentPath);
764                Map<String, Object> record = null;
765                if (record == null) {
766                        CacheDriverExecutor executor = null;
767                        Connection conn = null;
768                        try {
769                                conn = dataSourceReader.getConnection();
770                                conn.setAutoCommit(true);
771                                executor = new CacheDriverExecutor(conn);
772                                record = executor.first(sql, dataRecord);
773                                return Integer.parseInt(record.get("as_count") + "");
774                        } catch (SQLException e) {
775                                throw new IOException(e.getMessage(), e);
776                        } finally {
777                                try {
778                                        if (executor != null)
779                                                executor.close();
780                                } catch (Exception e) {
781                                        log.error(e.getMessage(), e);
782                                }
783                        }
784                }
785                return -1;
786        }
787
788        private Map<String, Object> getFirstFilePart() throws IOException {
789                String sql = formatSql(SQL_SELECT_FOR_FIRST_FILE);
790                Map dataRecord = new HashMap<String, Object>();
791                dataRecord.put("file_name", origin.getName());
792                dataRecord.put("file_parent_path", getParent());
793                String cacheKey = String.format("first-%s-%s",sql,dataRecord);
794                Map<String, Object> record = (Map<String,Object>)Cache.get(cacheKey);
795                if(record == null){
796                        CacheDriverExecutor executor = null;
797                        Connection conn = null;
798                        try {
799                                conn = dataSourceReader.getConnection();
800                                conn.setAutoCommit(true);
801                                executor = new CacheDriverExecutor(conn);
802                                record = executor.first(sql, dataRecord);
803                                if(record != null){
804                                    Cache.set(cacheKey,record,CACHE_SECONDS);
805                                }
806                        } catch (SQLException e) {
807                                throw new IOException(e.getMessage(), e);
808                        } finally {
809                                try {
810                                        if (executor != null)
811                                                executor.close();
812                                } catch (Exception e) {
813                                        log.error(e.getMessage(), e);
814                                }
815                        }
816                }
817                return record;
818        }
819        
820        private Map<String, Object> getFirstFilePartRefresh() throws IOException {
821                String sql = formatSql(SQL_SELECT_FOR_FIRST_FILE);
822                Map dataRecord = new HashMap<String, Object>();
823                dataRecord.put("file_name", origin.getName());
824                dataRecord.put("file_parent_path", getParent());
825                String cacheKey = String.format("first-%s-%s",sql,dataRecord);
826                Cache.remove(cacheKey);
827                CacheDriverExecutor executor = null;
828                Connection conn = null;
829                try {
830                        conn = dataSourceReader.getConnection();
831                        conn.setAutoCommit(true);
832                        executor = new CacheDriverExecutor(conn);
833                        Map<String,Object> record =  executor.first(sql, dataRecord);
834                        if(record != null){
835                            Cache.set(cacheKey,record,CACHE_SECONDS);
836                        }
837                        return record;
838                } catch (SQLException e) {
839                        throw new IOException(e.getMessage(), e);
840                } finally {
841                        try {
842                                if (executor != null)
843                                        executor.close();
844                        } catch (Exception e) {
845                                log.error(e.getMessage(), e);
846                        }
847                }
848        }       
849
850        private Map<String, Object> getLastIndexRecord() throws IOException {
851                String sql = formatSql(SQL_SELECT_FOR_LAST_FILE_PART);
852                Map dataRecord = new HashMap<String, Object>();
853                dataRecord.put("file_name", origin.getName());
854                dataRecord.put("file_parent_path", getParent());
855                dataRecord.put("file_id", fileId);
856                Map<String, Object> record = null;
857                if (record == null) {
858                        CacheDriverExecutor executor = null;
859                        Connection conn = null;
860                        try {
861                                conn = dataSourceReader.getConnection();
862                                conn.setAutoCommit(true);
863                                executor = new CacheDriverExecutor(conn);
864                                record = executor.first(sql, dataRecord);
865                        } catch (SQLException e) {
866                                throw new IOException(e.getMessage(), e);
867                        } finally {
868                                try {
869                                        if (executor != null)
870                                                executor.close();
871                                } catch (Exception e) {
872                                        log.error(e.getMessage(), e);
873                                }
874                        }
875                }
876                return record;
877        }
878
879        public boolean isTimeout(long timeoutMs) throws IOException {
880                CacheDriverExecutor executor = null;
881                Connection conn = null;
882                try {
883                        Clock clock = new Clock();
884                        conn = dataSourceReader.getConnection();
885                        conn.setAutoCommit(true);
886                        executor = new CacheDriverExecutor(conn);
887                        Map dataRecord = new HashMap<String, Object>();
888                        dataRecord.put("file_name", origin.getName());
889                        dataRecord.put("file_parent_path", getParent());
890                        Map<String, Object> completedRecord = getCompletedRecord();
891
892                        if (completedRecord != null)
893                                return false;
894
895                        Map<String, Object> record = executor.first(formatSql(SQL_SELECT_FOR_TIMEOUT), dataRecord);
896
897                        if (record == null)
898                                return false;
899
900                        boolean completed = executor.first(formatSql(SQL_SELECT_CHECK_COMPLETED), record) != null;
901                        if (!completed) {
902                                boolean timeout = (clock.getTime() - ((Timestamp) record.get("updated_at")).getTime()) > timeoutMs;
903
904                                return timeout;
905                        }
906                        return false;
907                } catch (SQLException e) {
908                        throw new IOException(e.getMessage(), e);
909                } finally {
910                        try {
911                                if (executor != null)
912                                        executor.close();
913                        } catch (Exception e) {
914                                log.error(e.getMessage(), e);
915                        }
916                }
917        }
918
919        private Map<String, Object> readPart(Map<String, Object> rfRecord) throws IOException {
920                CacheDriverExecutor executor = null;
921                Connection conn = null;
922                try {
923                        calculateDataMappingDataSourceReader(this, rfRecord);
924                        if(this.dataMappingTable <= -1){
925                            return null;
926                        }
927                        Clock clock = new Clock();
928                        conn = dataMappingDataSourceReader.getConnection();
929                        conn.setAutoCommit(true);
930                        executor = new CacheDriverExecutor(conn);
931                        Map<String, Object> dataRecord = new HashMap<String, Object>();
932                        dataRecord.put("remote_file_id", rfRecord.get("id"));
933                        return executor.first(formatSqlForFilePartData(SQL_SELECT_BY_RF_ID_FROM_FILE_DATA), dataRecord);
934                } catch (SQLException e) {
935                        throw new IOException(e.getMessage(), e);
936                } finally {
937                        try {
938                                if (executor != null)
939                                        executor.close();
940                        } catch (Exception e) {
941                                log.error(e.getMessage(), e);
942                        }
943                }
944        }
945
946        public void merge(FilePart filePart) throws IOException {
947                if (isLink()) {
948                        throw new IOException(String.format("The file '%s' is a link.", origin.getAbsolutePath()));
949                }
950                if (isDir()) {
951                        throw new IOException(String.format("The file '%s' is a folder.", origin.getAbsolutePath()));
952                }
953                CacheDriverExecutor executor = null;
954                Connection conn = null;
955                try {
956                        Clock clock = new Clock();
957                        conn = dataSourceReader.getConnection();
958                        conn.setAutoCommit(true);
959                        executor = new CacheDriverExecutor(conn);
960                        Map dataRecord = getCompletedRecord();
961                        if (dataRecord != null) {
962                                CacheArray rows = new CacheArray();
963                                CacheArrayFilter filter = new CacheArrayFilter(CACHE_ARRAY_FILTER_TIMER) {
964                                        long fileSize = 0L;
965                                        Map<String, Object> fp = null;
966                                        Map<String, Object> item = null;
967
968                                        @Override
969                                        public void execute(Integer index, Object o) {
970
971                                                try {
972                                                        item = (Map<String, Object>) o;
973                                                        fp = readPart(item);
974                                                        Object fpData = null;
975                                                        
976                                                        if(fp == null){
977                                                            fpData = new byte[]{};
978                                                        }else{
979                                                            fpData = fp.get("file_part_data");
980                                                        }
981                                                        Thread.currentThread()
982                                                                        .setName(String.format("RemoteFile-merge-%s", item.get("file_name")));
983                                                        byte[] partData = null;
984                                                        if (fpData instanceof byte[]) {
985                                                                partData = (byte[]) fpData;
986                                                        }
987                                                        fileSize += partData.length;
988                                                        filePart.process(index, partData);
989                                                } catch (Exception e) {
990                                                        log.error(e.getMessage(), e);
991                                                        this.terminated = true;
992                                                }
993                                        }
994
995                                        @Override
996                                        public void completed(Integer size) {
997                                                try {
998                                                        if (item != null) {
999                                                                Thread.currentThread()
1000                                                                                .setName(String.format("RemoteFile-merge-completed-%s", item.get("file_name")));
1001                                                        }
1002                                                        filePart.completed(size, new Clock().getSqlTimestamp(), fileSize);
1003                                                } catch (IOException e) {
1004                                                        log.error(e.getMessage(), e);
1005                                                }
1006                                        }
1007
1008                                        @Override
1009                                        public void terminated() {
1010                                                if (item != null) {
1011                                                        Thread.currentThread()
1012                                                                        .setName(String.format("RemoteFile-merge-trminated-%s", item.get("file_name")));
1013                                                        log.error("Merge terminated: {}/{}", item.get("file_parent_path"), item.get("file_name"));
1014                                                }
1015                                        }
1016                                };
1017                                rows.filter(filter);
1018                                executor.find(formatSql(SQL_SELECT_FILE_HAVE_DATA), dataRecord, rows);
1019                        }
1020                } catch (SQLException e) {
1021                        throw new IOException(e.getMessage(), e);
1022                } finally {
1023                        if (executor != null) {
1024                                try {
1025                                        executor.close();
1026                                } catch (Exception e) {
1027                                        throw new IOException(e.getMessage(), e);
1028                                }
1029                        }
1030                }
1031        }
1032
1033        protected void copyFrom(boolean copyStructureOnly, DiskFile diskFile) throws IOException {
1034                var the = this;
1035                if (diskFile.isLink()) {
1036                        the.setModifyTime(diskFile.getModifyTimeForClock());
1037                        the.createLink(diskFile.readLink());
1038                }
1039                if (diskFile.isDir()) {
1040                        the.setModifyTime(diskFile.getModifyTimeForClock());
1041                        the.mkdirs();
1042                }
1043                if (diskFile.isFile()) {
1044                        the.setModifyTime(diskFile.getModifyTimeForClock());
1045                        if (copyStructureOnly) {
1046                                the.createStructureFile(diskFile.size());
1047                                diskFile.removeLogicAccess();
1048                                DiskFile.removeQueuePathMapping(diskFile.getPath());                            
1049                        } else {
1050                                diskFile.split(filePartSize, new FilePart() {
1051                                        final List<byte[]> partDataList = new ArrayList<byte[]>();
1052                                        int currentFilePartStartIndex = 0;
1053
1054                                        @Override
1055                                        protected void process(int partIndex, byte[] data) throws IOException {
1056                                            try{
1057                                                if (partIndex == 0) {
1058                                                        if (diskFile.syncRoot != null) {
1059                                                                if (diskFile.isLogicModify()) {
1060                                                                        throw new IOException(
1061                                                                                        String.format("The file '%s' is already logic locked by another thread.",
1062                                                                                                        diskFile.getPath()));
1063                                                                }
1064                                                        }
1065                                                        if (the.exists(true)) {
1066                                                                throw new IOException(
1067                                                                                String.format("The remote file '%s' already exist.", the.getPath()));
1068                                                        }
1069                                                        if (isDebug(the)) {
1070                                                                LoggerFactory.getLogger(RemoteFile.class).mark("CreateRemoteFile - {}", the.getPath());
1071                                                        }
1072                                                }
1073    
1074                                                partDataList.add(data);
1075                                                if (partDataList.size() >= BATCH_SIZE) {
1076                                                        diskFile.logicAccess();
1077                                                        if(!the.writeToBatch(currentFilePartStartIndex, partDataList)){
1078                                                                throw new IOException(String.format("The remote file '%s' write failed.",the.getPath()));
1079                                                        }
1080                                                        partDataList.clear();
1081                                                        currentFilePartStartIndex += BATCH_SIZE;
1082                                                }
1083                                            }catch(Exception e){
1084                                                throw new IOException(e.getMessage(),e);
1085                                            }
1086                                        }
1087
1088                                        @Override
1089                                        protected void completed(int lastPartIndex, Timestamp modifyTime, long fileSize)
1090                                                        throws IOException {
1091                                                try {
1092                                                        if (diskFile.syncRoot != null) {
1093                                                                if (diskFile.isLogicModify()) {
1094                                                                        throw new IOException(
1095                                                                                        String.format("The file '%s' is already logic locked by another thread.",
1096                                                                                                        diskFile.getPath()));
1097                                                                }
1098                                                        }
1099                                                        /**This is 0 bytes file**/
1100                                                        if (lastPartIndex == -1) {
1101
1102                                                                if (the.exists(true)) {
1103                                                                        the.forceDeleteFile(false);
1104                                                                }
1105
1106                                                                the.setModifyTime(modifyTime);
1107                                                                the.createEmptyFile();
1108                                                        } else {
1109                                                                the.setModifyTime(modifyTime);
1110                                                                if (partDataList.size() > 0) {
1111                                                                        if(!the.writeToBatch(currentFilePartStartIndex, partDataList)){
1112                                                                            throw new IOException(String.format("The remote file '%s' write failed.",the.getPath()));
1113                                                                        }
1114                                                                        partDataList.clear();
1115                                                                }
1116                                                                the.complete(lastPartIndex, fileSize);
1117                                                        }
1118                                                        if (isDebug(the)) {
1119                                                                LoggerFactory.getLogger(RemoteFile.class).mark("CompletedRemoteFile - {}",
1120                                                                                the.getPath());
1121                                                        }
1122                                                        DiskFile.removeQueuePathMapping(diskFile.getPath());
1123                                                } catch (Exception e) {
1124                                                        throw new IOException(e.getMessage(),e);
1125                                                }
1126                                        }
1127
1128                                        @Override
1129                                        protected void ended(int lastPartIndex, long fileSize) {
1130
1131                                        }
1132
1133                                });
1134                        }
1135                }
1136        }
1137
1138        private boolean createEmptyFile() throws IOException {
1139            boolean allowed = beforeWrite();
1140            
1141            if(!allowed) return false;
1142                
1143                Clock clock = new Clock();
1144                Timestamp currentTime = clock.getSqlTimestamp();
1145                CacheDriverExecutor executor = null;
1146                Connection conn = null;
1147                try {
1148                    calculateDataMappingDataSourceWriter(this);
1149                        conn = dataSourceWriter.getConnection();
1150                        conn.setAutoCommit(true);
1151                        executor = new CacheDriverExecutor(conn);
1152                        Map dataRecord = new HashMap<String, Object>();
1153                        dataRecord.put("id",
1154                                        generateId(identityCardEnable, identityCardName, identityCardPrefix, identityCardSuffix));
1155                        dataRecord.put("file_name", origin.getName());
1156                        dataRecord.put("file_parent_path", getParent());
1157                        dataRecord.put("file_type", "F");
1158                        dataRecord.put("file_part_index", 0);
1159                        dataRecord.put("file_part_size", 0);
1160                        dataRecord.put("file_part_data_ds", "NONE");
1161                        dataRecord.put("file_part_data_table", this.dataMappingTable);
1162                        dataRecord.put("file_part_last_index", 0);
1163                        dataRecord.put("file_size", 0);
1164                        dataRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime);
1165                        dataRecord.put("file_deleted", 0);
1166                        dataRecord.put("created_user_id", getDefaultModifyUserId());
1167                        dataRecord.put("updated_user_id", getDefaultModifyUserId());
1168                        dataRecord.put("created_at", currentTime);
1169                        dataRecord.put("updated_at", currentTime);
1170                        dataRecord.put("file_id", fileId);
1171                        dataRecord.put("source_hostname", CommonTools.getHostname());
1172                        dataRecord.put("synced_on_hostnames", String.format(";%s;", CommonTools.getHostname()));
1173                        dataRecord.put("deleted_on_hostnames", ";");
1174                        executor.execute(formatSql(SQL_INSERT_INTO_FILE), dataRecord);
1175                        
1176                        afterWrite();
1177                        
1178                        return true;
1179                } catch (Exception e) {
1180                        throw new IOException(e.getMessage(), e);
1181                } finally {
1182                        try {
1183                                if (executor != null)
1184                                        executor.close();
1185                        } catch (Exception e) {
1186                                log.error(e.getMessage(), e);
1187                        }
1188                }
1189        }
1190
1191        private boolean createStructureFile(long fileSize) throws IOException {
1192            boolean allowed = beforeWrite();
1193            
1194            if(!allowed) return false;
1195            
1196                log.debug("CreateStructureFile: {}", getPath());
1197                Clock clock = new Clock();
1198                Timestamp currentTime = clock.getSqlTimestamp();
1199                CacheDriverExecutor executor = null;
1200                Connection conn = null;
1201                try {
1202                    calculateDataMappingDataSourceWriter(this);
1203                        conn = dataSourceWriter.getConnection();
1204                        conn.setAutoCommit(true);
1205                        executor = new CacheDriverExecutor(conn);
1206                        Map dataRecord = new HashMap<String, Object>();
1207                        dataRecord.put("id",
1208                                        generateId(identityCardEnable, identityCardName, identityCardPrefix, identityCardSuffix));
1209                        dataRecord.put("file_name", origin.getName());
1210                        dataRecord.put("file_parent_path", getParent());
1211                        dataRecord.put("file_type", "F");
1212                        dataRecord.put("file_part_index", 0);
1213                        dataRecord.put("file_part_size", fileSize);
1214                        dataRecord.put("file_part_data_ds", "NONE");
1215                        dataRecord.put("file_part_data_table", this.dataMappingTable);
1216                        dataRecord.put("file_part_last_index", 0);
1217                        dataRecord.put("file_size", fileSize);
1218                        dataRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime);
1219                        dataRecord.put("file_deleted", 0);
1220                        dataRecord.put("created_user_id", getDefaultModifyUserId());
1221                        dataRecord.put("updated_user_id", getDefaultModifyUserId());
1222                        dataRecord.put("created_at", currentTime);
1223                        dataRecord.put("updated_at", currentTime);
1224                        dataRecord.put("file_id", fileId);
1225                        dataRecord.put("source_hostname", CommonTools.getHostname());
1226                        dataRecord.put("synced_on_hostnames", String.format(";%s;", CommonTools.getHostname()));
1227                        dataRecord.put("deleted_on_hostnames", ";");
1228                        executor.execute(formatSql(SQL_INSERT_INTO_FILE), dataRecord);
1229                        
1230                        afterWrite();
1231                        
1232                        return true;
1233                } catch (Exception e) {
1234                        throw new IOException(e.getMessage(), e);
1235                } finally {
1236                        try {
1237                                if (executor != null)
1238                                        executor.close();
1239                        } catch (Exception e) {
1240                                log.error(e.getMessage(), e);
1241                        }
1242                }
1243        }
1244
1245        protected boolean syncedToDisk() throws IOException {
1246                Clock clock = new Clock();
1247                CacheDriverExecutor executor = null;
1248                Connection conn = null;
1249                try {
1250                        conn = dataSourceWriter.getConnection();
1251                        conn.setAutoCommit(true);
1252                        executor = new CacheDriverExecutor(conn);
1253                        Map<String, Object> record = getCompletedRecord();
1254                        if (record == null) {
1255                                log.debug(String.format("Remmote file not found '%s'.", origin.getAbsolutePath()));
1256                        }
1257                        if (record != null) {
1258                            String hostname = CommonTools.getHostname();
1259                                String syncedOnHostnames = (String) record.get("synced_on_hostnames");
1260                                syncedOnHostnames = syncedOnHostnames.replaceAll(String.format(";%s;",hostname),";");
1261                                String shn = String.format("%s%s;", syncedOnHostnames, hostname);
1262                                Map<String, Object> dataRecord = new HashMap<String, Object>();
1263                            dataRecord.put("file_parent_path",record.get("file_parent_path"));
1264                            dataRecord.put("file_name", record.get("file_name"));
1265                                dataRecord.put("synced_on_hostnames",shn);
1266                                dataRecord.put("updated_at", clock.getSqlTimestamp());
1267                                dataRecord.put("updated_user_id", getDefaultModifyUserId());
1268                                executor.execute(formatSql(SQL_UPDATE_FOR_SYNCED_TO_DISK), dataRecord);
1269                                return true;
1270                        }
1271                        return false;
1272                } catch (SQLException e) {
1273                        throw new IOException(e.getMessage(), e);
1274                } finally {
1275                        try {
1276                                if (executor != null)
1277                                        executor.close();
1278                        } catch (Exception e) {
1279                                log.error(e.getMessage(), e);
1280                        }
1281                }
1282        }
1283
1284        protected boolean deletedToDiskAndSub(Object id) throws IOException {
1285                Clock clock = new Clock();
1286                CacheDriverExecutor executor = null;
1287                Connection conn = null;
1288                try {
1289                        conn = dataSourceWriter.getConnection();
1290                        conn.setAutoCommit(true);
1291                        executor = new CacheDriverExecutor(conn);
1292                        Map<String, Object> record = getRecordById(id);
1293                        if (record == null) {
1294                                log.debug(String.format("Remmote file not found '%s'.", origin.getAbsolutePath()));
1295                        }
1296                        if (record != null) {
1297                            String hostname = CommonTools.getHostname();
1298                                String deletedOnHostnames = (String) record.get("deleted_on_hostnames");
1299                                deletedOnHostnames = deletedOnHostnames.replaceAll(String.format(";%s;",hostname),";");
1300                                String dhn = String.format("%s%s;", deletedOnHostnames, hostname);
1301                                Timestamp now = clock.getSqlTimestamp();
1302                                final Map<String, Object> dataRecord = new HashMap<String, Object>();
1303                            dataRecord.put("file_parent_path", record.get("file_parent_path"));
1304                            dataRecord.put("file_name", record.get("file_name"));
1305                                dataRecord.put("deleted_on_hostnames", dhn);
1306                                dataRecord.put("updated_at", now);
1307                                dataRecord.put("updated_user_id", getDefaultModifyUserId());
1308                                executor.execute(formatSql(SQL_UPDATE_FOR_DELETED_TO_DISK), dataRecord);
1309                                if (record.get("file_type").equals("D")) {
1310                                        String dirPath = String.format("%s/%s", record.get("file_parent_path"),
1311                                                        record.get("file_name"));
1312                                        dataRecord.put("file_parent_path", dirPath);
1313                                        dataRecord.put("like_file_parent_path", dirPath + "/%");
1314                                        executor.execute(formatSql(SQL_UPDATE_FOR_DELETED_TO_DISK_SUB_FILES), dataRecord);
1315                                }
1316                                return true;
1317                        }
1318                        return false;
1319                } catch (SQLException e) {
1320                        throw new IOException(e.getMessage(), e);
1321                } finally {
1322                        try {
1323                                if (executor != null)
1324                                        executor.close();
1325                        } catch (Exception e) {
1326                                log.error(e.getMessage(), e);
1327                        }
1328                }
1329        }
1330
1331        private boolean complete(int filePartLastIndex, long fileSize) throws IOException {
1332                Clock clock = new Clock();
1333                CacheDriverExecutor executor = null;
1334                Timestamp currentTime = clock.getSqlTimestamp();
1335                Connection conn = null;
1336                try {
1337                        conn = dataSourceWriter.getConnection();
1338                        conn.setAutoCommit(true);
1339                        executor = new CacheDriverExecutor(conn);
1340                        Map dataRecord = new HashMap<String, Object>();
1341                        dataRecord.put("file_id", fileId);
1342                        dataRecord.put("file_part_last_index", filePartLastIndex);
1343                        dataRecord.put("file_size", fileSize);
1344                        dataRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime);
1345                        dataRecord.put("updated_at", currentTime);
1346                        dataRecord.put("updated_user_id", getDefaultModifyUserId());
1347                        int rows = executor.execute(formatSql(SQL_UPDATE_FOR_COMPLETED), dataRecord);
1348                        
1349                        afterWrite();
1350                        
1351                        return rows > 0;
1352                } catch (SQLException e) {
1353                        throw new IOException(e.getMessage(), e);
1354                } finally {
1355                        try {
1356                                if (executor != null)
1357                                        executor.close();
1358                        } catch (Exception e) {
1359                                log.error(e.getMessage(), e);
1360                        }
1361                }
1362        }
1363
1364        @Override
1365        public boolean complete() throws IOException {
1366                Clock clock = new Clock();
1367                Timestamp currentTime = clock.getSqlTimestamp();
1368                CacheDriverExecutor executor = null;
1369                Connection conn = null;
1370                try {
1371                        conn = dataSourceWriter.getConnection();
1372                        conn.setAutoCommit(true);
1373                        executor = new CacheDriverExecutor(conn);
1374                        Map dataRecord = new HashMap<String, Object>();
1375                        dataRecord.put("file_name", origin.getName());
1376                        dataRecord.put("file_parent_path", getParent());
1377                        dataRecord.put("file_id", fileId);
1378
1379                        Map<String, Object> sumRecord = executor.first(formatSql(SQL_SELECT_FOR_SUM_FILE_PART_SIZE_INDEX),
1380                                        dataRecord);
1381
1382                        if (sumRecord == null) {
1383                                log.debug(String.format("Remmote file not found '%s'.", origin.getAbsolutePath()));
1384                        }
1385
1386                        if (sumRecord != null) {
1387                                long fileSize = Long.parseLong(sumRecord.get("file_part_size") + "");
1388                                long filePartLastIndex = Integer.parseInt(sumRecord.get("file_part_index") + "");
1389
1390                                dataRecord.put("file_part_last_index", filePartLastIndex);
1391                                dataRecord.put("file_size", fileSize);
1392                                dataRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime);
1393                                dataRecord.put("updated_at", currentTime);
1394                                dataRecord.put("updated_user_id", getDefaultModifyUserId());
1395                                int rows = executor.execute(formatSql(SQL_UPDATE_FOR_COMPLETED), dataRecord);
1396                                
1397                                afterWrite();
1398                                
1399                                return rows > 0;
1400                        }
1401                        return false;
1402                } catch (SQLException e) {
1403                        throw new IOException(e.getMessage(), e);
1404                } finally {
1405                        try {
1406                                if (executor != null)
1407                                        executor.close();
1408                        } catch (Exception e) {
1409                                log.error(e.getMessage(), e);
1410                        }
1411                }
1412        }
1413
1414        public boolean forceDelete() throws IOException {
1415                return forceDeleteAll(true);
1416        }
1417
1418        public boolean forceDeleteDir(boolean realDeleted) throws IOException {
1419                return forceDelete(realDeleted, "D");
1420        }
1421
1422        public boolean forceDeleteLink(boolean realDeleted) throws IOException {
1423                return forceDelete(realDeleted, "L");
1424        }
1425
1426        public boolean forceDeleteFile(boolean realDeleted) throws IOException {
1427                return forceDelete(realDeleted, "F");
1428        }
1429
1430        public boolean forceDelete(boolean realDeleted, String fileType) throws IOException {
1431                if (fileType.equalsIgnoreCase("D")) {
1432                        return forceDeleteAll(realDeleted);
1433                } else {
1434                        boolean allowed = beforeDelete(realDeleted);
1435                        if (allowed) {
1436                                CacheDriverExecutor executor = null;
1437                                Connection conn = null;
1438                                try {
1439                                        Clock clock = new Clock();
1440                                        Timestamp currentTime = clock.getSqlTimestamp();
1441                                        conn = dataSourceWriter.getConnection();
1442                                        conn.setAutoCommit(true);
1443                                        executor = new CacheDriverExecutor(conn);
1444                                        Map dataRecord = new HashMap<String, Object>();
1445                                        dataRecord.put("file_deleted", realDeleted ? 1 : 0);
1446                                        dataRecord.put("deleted", realDeleted ? 0 : 1);
1447                                        dataRecord.put("file_name", origin.getName());
1448                                        dataRecord.put("file_type", fileType.toUpperCase());
1449                                        dataRecord.put("file_parent_path", getParent());
1450                                        dataRecord.put("updated_user_id", getDefaultModifyUserId());
1451                                        dataRecord.put("deleted_user_id", getDefaultModifyUserId());
1452                                        dataRecord.put("updated_at", currentTime);
1453                                        dataRecord.put("deleted_at", currentTime);
1454                                        dataRecord.put("root_folder", getPath());
1455                                        dataRecord.put("like_subfolder", parsePath(getPath() + "/%"));
1456                                        int resultRows = executor.execute(formatSql(SQL_UPDATE_FOR_DELETE_FILE_LINK), dataRecord);
1457                                        log.debug("ForceDelete({}): {} -> {}", realDeleted, getPath(), resultRows);
1458                    
1459                    afterDelete(realDeleted);
1460                    
1461                                        return resultRows > 0;
1462                                } catch (SQLException e) {
1463                                        throw new IOException(e.getMessage(), e);
1464                                } finally {
1465                                        try {
1466                                                if (executor != null)
1467                                                        executor.close();
1468                                        } catch (Exception e) {
1469                                                log.error(e.getMessage(), e);
1470                                        }
1471                                }
1472                        }
1473                }
1474                return false;
1475        }
1476
1477        public boolean forceDeleteAll(boolean realDeleted) throws IOException {
1478                boolean allowed = beforeDelete(realDeleted);
1479                if (allowed) {
1480                        CacheDriverExecutor executor = null;
1481                        Connection conn = null;
1482                        try {
1483                                Clock clock = new Clock();
1484                                Timestamp currentTime = clock.getSqlTimestamp();
1485                                conn = dataSourceWriter.getConnection();
1486                                conn.setAutoCommit(true);
1487                                executor = new CacheDriverExecutor(conn);
1488                                Map dataRecord = new HashMap<String, Object>();
1489                                dataRecord.put("file_deleted", realDeleted ? 1 : 0);
1490                                dataRecord.put("deleted", realDeleted ? 0 : 1);
1491                                dataRecord.put("file_name", origin.getName());
1492                                dataRecord.put("file_parent_path", getParent());
1493                                dataRecord.put("updated_user_id", getDefaultModifyUserId());
1494                                dataRecord.put("deleted_user_id", getDefaultModifyUserId());
1495                                dataRecord.put("updated_at", currentTime);
1496                                dataRecord.put("deleted_at", currentTime);
1497                                dataRecord.put("root_folder", getPath());
1498                                dataRecord.put("like_subfolder", parsePath(getPath() + "/%"));
1499                                int resultRows = executor.execute(formatSql(SQL_UPDATE_FOR_DELETE_ALL), dataRecord);
1500                                log.debug("ForceDelete({}): {} -> {}", realDeleted, getPath(), resultRows);
1501
1502                afterDelete(realDeleted);
1503                
1504                                return resultRows > 0;
1505                        } catch (SQLException e) {
1506                                throw new IOException(e.getMessage(), e);
1507                        } finally {
1508                                try {
1509                                        if (executor != null)
1510                                                executor.close();
1511                                } catch (Exception e) {
1512                                        log.error(e.getMessage(), e);
1513                                }
1514                        }
1515                }
1516                return false;
1517        }
1518
1519        @Override
1520        public boolean delete() throws IOException {
1521                return forceDeleteAll(true);
1522        }
1523
1524        public boolean delete(boolean realDeleted) throws IOException {
1525                return forceDeleteAll(realDeleted);
1526        }
1527
1528        @Override
1529        public boolean beforeDelete(boolean realDeleted) {
1530                return true;
1531        }
1532        
1533        @Override
1534        public void afterDelete(boolean realDeleted) {
1535            
1536        }       
1537        
1538        @Override
1539        public boolean beforeMkdirs() {
1540            return true;
1541        }
1542        
1543        @Override
1544        public void afterMkdirs() {
1545            
1546        }
1547        
1548        @Override
1549        public boolean beforeCreateLink(String target) {
1550            return true;
1551        }
1552        
1553        @Override
1554        public void afterCreateLink(String target) {
1555            
1556        }
1557        
1558        @Override
1559        public boolean beforeWrite(){
1560            return true;
1561        }
1562        
1563        @Override
1564        public void afterWrite(){
1565            
1566        }       
1567
1568        @Override
1569        public boolean exists() throws IOException {
1570                return getFirstFilePart() != null;
1571        }
1572        
1573        public boolean exists(boolean refresh) throws IOException {
1574            if(refresh){
1575                return getFirstFilePartRefresh() != null;
1576            }else{
1577                    return exists();
1578            }
1579        }       
1580
1581        @Override
1582        public boolean mkdirs() throws IOException {
1583                return mkdirs(false);
1584        }
1585
1586        public boolean mkdirs(boolean checkSourceHostname) throws IOException {
1587            
1588            boolean allowed = beforeMkdirs();
1589            
1590            if(!allowed) return false;
1591            
1592                Clock clock = new Clock();
1593                CacheDriverExecutor executor = null;
1594                Connection conn = null;
1595                try {
1596                        if (getFirstFilePart() != null) {
1597                                return false;
1598                        }
1599                        if (checkSourceHostname) {
1600                                String lastSourceHostname = getLastSourceHostname();
1601                                if (lastSourceHostname != null) {
1602                                        String clientHostname = CommonTools.getHostname();
1603                                        boolean sameHostname = clientHostname.equals(lastSourceHostname);
1604                                        if (!sameHostname) {
1605                                                return false;
1606                                        }
1607                                }
1608                        }
1609                        log.debug("Mkidrs: {}", getPath());
1610                        calculateDataMappingDataSourceWriter(this);
1611                        Timestamp currentTime = clock.getSqlTimestamp();
1612                        conn = dataSourceWriter.getConnection();
1613                        conn.setAutoCommit(true);
1614                        executor = new CacheDriverExecutor(conn);
1615                        Map dataRecord = new HashMap<String, Object>();
1616                        dataRecord.put("id",
1617                                        generateId(identityCardEnable, identityCardName, identityCardPrefix, identityCardSuffix));
1618                        dataRecord.put("file_name", origin.getName());
1619                        dataRecord.put("file_parent_path", getParent());
1620                        dataRecord.put("file_type", "D");
1621                        dataRecord.put("file_part_index", 0);
1622                        dataRecord.put("file_part_size", 0);
1623                        dataRecord.put("file_part_data_ds", "NONE");
1624                        dataRecord.put("file_part_data_table", this.dataMappingTable);
1625                        dataRecord.put("file_part_last_index", 0);
1626                        dataRecord.put("file_size", 0);
1627                        dataRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime);
1628                        dataRecord.put("file_deleted", 0);
1629                        dataRecord.put("created_user_id", getDefaultModifyUserId());
1630                        dataRecord.put("updated_user_id", getDefaultModifyUserId());
1631                        dataRecord.put("created_at", currentTime);
1632                        dataRecord.put("updated_at", currentTime);
1633                        dataRecord.put("file_id", fileId);
1634                        dataRecord.put("source_hostname", CommonTools.getHostname());
1635                        dataRecord.put("synced_on_hostnames", String.format(";%s;", CommonTools.getHostname()));
1636                        dataRecord.put("deleted_on_hostnames", ";");
1637                        int rows = executor.execute(formatSql(SQL_INSERT_INTO_FILE), dataRecord);
1638                        
1639                        afterMkdirs();
1640                        
1641                        return rows > 0;
1642                } catch (Exception e) {
1643                        throw new IOException(e.getMessage(), e);
1644                } finally {
1645                        try {
1646                                if (executor != null)
1647                                        executor.close();
1648                        } catch (Exception e) {
1649                                log.error(e.getMessage(), e);
1650                        }
1651                }
1652        }
1653
1654        @Override
1655        public boolean write(byte[] data, boolean append) throws IOException {
1656                return writeTo(data, append);
1657        }
1658
1659        public synchronized boolean writeBlock(byte[] data, boolean append) throws IOException {
1660                return writeTo(data, append);
1661        }
1662
1663        private boolean writeToBatch(int currentFilePartStartIndex, List<byte[]> partDataList) throws IOException {
1664            
1665            boolean allowed = beforeWrite();
1666            
1667            if(!allowed) return false;
1668            
1669                CacheDriverExecutor executor = null;
1670                CacheDriverExecutor dmExecutor = null;
1671                List<Map<String, Object>> rfList = new ArrayList<Map<String, Object>>();
1672                List<Map<String, Object>> dmList = new ArrayList<Map<String, Object>>();
1673                try {
1674                        Clock clock = new Clock();
1675                        log.debug("Write: {}", getPath());
1676                        int size = partDataList.size();
1677                        Timestamp currentTime = clock.getSqlTimestamp();
1678                        String hostname = CommonTools.getHostname();
1679                        calculateDataMappingDataSourceWriter(this);
1680                        for (int filePartIndex = 0; filePartIndex < size; filePartIndex++) {
1681                                byte[] data = partDataList.get(filePartIndex);
1682                                String remoteFileId = generateId(identityCardEnable, identityCardName, identityCardPrefix,
1683                                                identityCardSuffix);
1684                                Map rfRecord = new HashMap<String, Object>();
1685                                rfRecord.put("id", remoteFileId);
1686                                rfRecord.put("file_name", origin.getName());
1687                                rfRecord.put("file_parent_path", getParent());
1688                                rfRecord.put("file_type", "F");
1689                                rfRecord.put("file_part_index", filePartIndex + currentFilePartStartIndex);
1690                                rfRecord.put("file_part_size", data.length);
1691                                rfRecord.put("file_part_data_ds", this.dataMappingDs);
1692                                rfRecord.put("file_part_data_table", this.dataMappingTable);
1693                                rfRecord.put("file_part_last_index", -1);
1694                                rfRecord.put("file_size", 0);
1695                                rfRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime);
1696                                rfRecord.put("file_deleted", 0);
1697                                rfRecord.put("created_user_id", getDefaultModifyUserId());
1698                                rfRecord.put("updated_user_id", getDefaultModifyUserId());
1699                                rfRecord.put("created_at", currentTime);
1700                                rfRecord.put("updated_at", currentTime);
1701                                rfRecord.put("file_id", fileId);
1702                                rfRecord.put("source_hostname", hostname);
1703                                rfRecord.put("synced_on_hostnames", String.format(";%s;", hostname));
1704                                rfRecord.put("deleted_on_hostnames", ";");
1705                                rfList.add(rfRecord);
1706
1707                                Map<String, Object> dataRecord = new HashMap<String, Object>();
1708                                dataRecord.put("id", generateDataId(identityCardEnable, identityCardName, identityCardSuffix, this));
1709                                dataRecord.put("file_part_data", encrypt(data));
1710                                dataRecord.put("remote_file_id", remoteFileId);
1711                                dataRecord.put("file_id", fileId);
1712                                dataRecord.put("created_at", currentTime);
1713                                dataRecord.put("updated_at", currentTime);
1714                                dmList.add(dataRecord);
1715                        }
1716                        Connection conn = dataSourceWriter.getConnection();
1717                        conn.setAutoCommit(true);
1718                        executor = new CacheDriverExecutor(conn);
1719                        executor.executeBatch(formatSql(SQL_INSERT_INTO_FILE), rfList);
1720
1721                        conn = dataMappingDataSourceWriter.getConnection();
1722                        conn.setAutoCommit(true);
1723                        dmExecutor = new CacheDriverExecutor(conn);
1724                        dmExecutor.executeBatch(formatSqlForFilePartData(SQL_INSERT_INTO_FILE_DATA), dmList);
1725                        
1726                        return true;
1727                } catch (Exception e) {
1728                        throw new IOException(e.getMessage(), e);
1729                } finally {
1730                        try {
1731                                if (executor != null)
1732                                        executor.close();
1733                        } catch (Exception e) {
1734                                log.error(e.getMessage(), e);
1735                        }
1736                        try {
1737                                if (dmExecutor != null)
1738                                        dmExecutor.close();
1739                        } catch (Exception e) {
1740                                log.error(e.getMessage(), e);
1741                        }
1742                }
1743        }
1744
1745        private boolean writeTo(byte[] data, boolean append) throws IOException {
1746            
1747            boolean allowed = beforeWrite();
1748            
1749            if(!allowed) return false;
1750            
1751                CacheDriverExecutor executor = null;
1752                CacheDriverExecutor dmExecutor = null;
1753                try {
1754                        if (!append) {
1755                                if (exists())
1756                                        throw new IOException(
1757                                                        String.format("The remote file '%s' already exists.", origin.getAbsolutePath()));
1758                        }
1759                        Clock clock = new Clock();
1760                        Timestamp currentTime = clock.getSqlTimestamp();
1761                        Map<String, Object> lastRecord = getLastIndexRecord();
1762                        int filePartIndex = 0;
1763                        if (lastRecord != null) {
1764                                if (lastRecord.get("file_type").equals("D")) {
1765                                        throw new IOException(String.format("This is a folder '%s'.", origin.getAbsolutePath()));
1766                                }
1767                                if (lastRecord.get("file_type").equals("L")) {
1768                                        throw new IOException(String.format("This is a link '%s'.", origin.getAbsolutePath()));
1769                                }
1770                                if (append) {
1771                                        filePartIndex = Integer.parseInt(lastRecord.get("file_part_index") + "") + 1;
1772                                }
1773                                boolean isDeleted = false;
1774                                Object deletedValue = lastRecord.get("deleted");
1775                                if (deletedValue instanceof Boolean) {
1776                                        isDeleted = (Boolean) deletedValue;
1777                                } else {
1778                                        isDeleted = Integer.parseInt(deletedValue + "") == 1;
1779                                }
1780                                if (isDeleted) {
1781                                        throw new IOException(String.format("The file '%s' is deleted.", origin.getAbsolutePath()));
1782                                }
1783                        }
1784                        calculateDataMappingDataSourceWriter(this);
1785                        log.debug("Write: {}", getPath());
1786                        String remoteFileId = generateId(identityCardEnable, identityCardName, identityCardPrefix,
1787                                        identityCardSuffix);
1788                        Map rfRecord = new HashMap<String, Object>();
1789                        rfRecord.put("id", remoteFileId);
1790                        rfRecord.put("file_name", origin.getName());
1791                        rfRecord.put("file_parent_path", getParent());
1792                        rfRecord.put("file_type", "F");
1793                        rfRecord.put("file_part_index", filePartIndex);
1794                        rfRecord.put("file_part_size", data.length);
1795                        rfRecord.put("file_part_data_ds", this.dataMappingDs);
1796                        rfRecord.put("file_part_data_table", this.dataMappingTable);
1797                        rfRecord.put("file_part_last_index", -1);
1798                        rfRecord.put("file_size", 0);
1799                        rfRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime);
1800                        rfRecord.put("file_deleted", 0);
1801                        rfRecord.put("created_user_id", getDefaultModifyUserId());
1802                        rfRecord.put("updated_user_id", getDefaultModifyUserId());
1803                        rfRecord.put("created_at", currentTime);
1804                        rfRecord.put("updated_at", currentTime);
1805                        rfRecord.put("file_id", fileId);
1806                        rfRecord.put("source_hostname", CommonTools.getHostname());
1807                        rfRecord.put("synced_on_hostnames", String.format(";%s;", CommonTools.getHostname()));
1808                        rfRecord.put("deleted_on_hostnames", ";");
1809
1810                        Connection conn = dataSourceWriter.getConnection();
1811                        conn.setAutoCommit(true);
1812                        executor = new CacheDriverExecutor(conn);
1813                        executor.execute(formatSql(SQL_INSERT_INTO_FILE), rfRecord);
1814
1815                        Map dataRecord = new HashMap<String, Object>();
1816                        dataRecord.put("id", generateDataId(identityCardEnable, identityCardName, identityCardSuffix, this));
1817                        dataRecord.put("file_part_data", encrypt(data));
1818                        dataRecord.put("remote_file_id", remoteFileId);
1819                        dataRecord.put("file_id", fileId);
1820                        dataRecord.put("created_at", currentTime);
1821                        dataRecord.put("updated_at", currentTime);
1822
1823                        conn = dataMappingDataSourceWriter.getConnection();
1824                        conn.setAutoCommit(true);
1825                        dmExecutor = new CacheDriverExecutor(conn);
1826                        dmExecutor.execute(formatSqlForFilePartData(SQL_INSERT_INTO_FILE_DATA), dataRecord);
1827
1828                        return true;
1829                } catch (Exception e) {
1830                        throw new IOException(e.getMessage(), e);
1831                } finally {
1832                        try {
1833                                if (executor != null)
1834                                        executor.close();
1835                        } catch (Exception e) {
1836                                log.error(e.getMessage(), e);
1837                        }
1838                        try {
1839                                if (dmExecutor != null)
1840                                        dmExecutor.close();
1841                        } catch (Exception e) {
1842                                log.error(e.getMessage(), e);
1843                        }
1844                }
1845        }
1846
1847        @Override
1848        public boolean write(byte[] data) throws IOException {
1849                return write(data, false);
1850        }
1851
1852        @Override
1853        public boolean write(String data, boolean append) throws IOException {
1854                return write(data.getBytes(CHARSET), append);
1855        }
1856
1857        @Override
1858        public boolean write(String data) throws IOException {
1859                return write(data.getBytes(CHARSET));
1860        }
1861
1862        @Override
1863        public boolean createLink(String target) throws IOException {
1864            boolean allowed = beforeCreateLink(target);
1865            
1866            if(!allowed) return false;
1867            
1868                Clock clock = new Clock();
1869                CacheDriverExecutor executor = null;
1870                CacheDriverExecutor dmExecutor = null;
1871                try {
1872                        if (getFirstFilePart() != null) {
1873                                return false;
1874                        }
1875                        log.debug("CreateLink: {}", getPath());
1876                        Timestamp currentTime = clock.getSqlTimestamp();
1877                        calculateDataMappingDataSourceWriter(this);
1878                        String remoteFileId = generateId(identityCardEnable, identityCardName, identityCardPrefix,
1879                                        identityCardSuffix);
1880                        byte[] data = target.getBytes(CHARSET);
1881                        Map rfRecord = new HashMap<String, Object>();
1882                        rfRecord.put("id", remoteFileId);
1883                        rfRecord.put("file_name", origin.getName());
1884                        rfRecord.put("file_parent_path", getParent());
1885                        rfRecord.put("file_type", "L");
1886                        rfRecord.put("file_part_index", 0);
1887                        rfRecord.put("file_part_size", data.length);
1888                        rfRecord.put("file_part_data_ds", this.dataMappingDs);
1889                        rfRecord.put("file_part_data_table", this.dataMappingTable);
1890                        rfRecord.put("file_part_last_index", -1);
1891                        rfRecord.put("file_size", 0);
1892                        rfRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime);
1893                        rfRecord.put("file_deleted", 0);
1894                        rfRecord.put("created_user_id", getDefaultModifyUserId());
1895                        rfRecord.put("updated_user_id", getDefaultModifyUserId());
1896                        rfRecord.put("created_at", currentTime);
1897                        rfRecord.put("updated_at", currentTime);
1898                        rfRecord.put("file_id", fileId);
1899                        rfRecord.put("source_hostname", CommonTools.getHostname());
1900                        rfRecord.put("synced_on_hostnames", String.format(";%s;", CommonTools.getHostname()));
1901                        rfRecord.put("deleted_on_hostnames", ";");
1902
1903                        Connection conn = dataSourceWriter.getConnection();
1904                        conn.setAutoCommit(true);
1905                        executor = new CacheDriverExecutor(conn);
1906                        int rows = executor.execute(formatSql(SQL_INSERT_INTO_FILE), rfRecord);
1907
1908                        Map dataRecord = new HashMap<String, Object>();
1909                        dataRecord.put("id", generateDataId(identityCardEnable, identityCardName, identityCardSuffix, this));
1910                        dataRecord.put("file_part_data", encrypt(data));
1911                        dataRecord.put("remote_file_id", remoteFileId);
1912                        dataRecord.put("file_id", fileId);
1913                        dataRecord.put("created_at", currentTime);
1914                        dataRecord.put("updated_at", currentTime);
1915
1916                        conn = dataMappingDataSourceWriter.getConnection();
1917                        conn.setAutoCommit(true);
1918                        dmExecutor = new CacheDriverExecutor(conn);
1919                        dmExecutor.execute(formatSqlForFilePartData(SQL_INSERT_INTO_FILE_DATA), dataRecord);
1920
1921                        complete();
1922            
1923            afterCreateLink(target);
1924            
1925                        return rows > 0;
1926                } catch (Exception e) {
1927                        throw new IOException(e.getMessage(), e);
1928                } finally {
1929                        try {
1930                                if (executor != null)
1931                                        executor.close();
1932                        } catch (Exception e) {
1933                                log.error(e.getMessage(), e);
1934                        }
1935                        try {
1936                                if (dmExecutor != null)
1937                                        dmExecutor.close();
1938                        } catch (Exception e) {
1939                                log.error(e.getMessage(), e);
1940                        }
1941                }
1942        }
1943
1944        @Override
1945        public String readLink() throws IOException {
1946                return new String(readAllBytes(),BaseFile.CHARSET);
1947        }
1948
1949        @Override
1950        public byte[] readAllBytes() throws IOException {
1951                return readAllBytesFrom();
1952        }
1953
1954        private byte[] readAllBytesFrom() throws IOException {
1955                if (isDir()) {
1956                        throw new IOException(String.format("The file '%s' is a folder.", origin.getAbsolutePath()));
1957                }
1958                if (!exists()) {
1959                        throw new IOException(String.format("The file '%s' does not exist.", origin.getAbsolutePath()));
1960                }
1961                CacheDriverExecutor executor = null;
1962                Connection conn = null;
1963                ByteArrayOutputStream baos = null;
1964                try {
1965                        conn = dataSourceReader.getConnection();
1966                        conn.setAutoCommit(true);
1967                        executor = new CacheDriverExecutor(conn);
1968                        Map dataRecord = getCompletedRecord();
1969                        List<Map<String, Object>> filePartList = executor.find(formatSql(SQL_SELECT_FILE), dataRecord);
1970                        baos = new ByteArrayOutputStream();
1971                        for (Map<String, Object> fpItem : filePartList) {
1972                                Map<String, Object> fp = readPart(fpItem);
1973                                Object fpData = null;
1974                                
1975                                if(fp == null){
1976                                    fpData = new byte[]{};
1977                                }else{
1978                                    fpData = fp.get("file_part_data");  
1979                                }
1980
1981                                if (fpData instanceof byte[]) {
1982                                        byte[] partData = (byte[]) fpData;
1983                                        baos.write(decrypt(partData));
1984                                        baos.flush();
1985                                }
1986                        }
1987                        return baos == null ? null : baos.toByteArray();
1988                } catch (Exception e) {
1989                        throw new IOException(e.getMessage(), e);
1990                } finally {
1991                        try {
1992                                if (baos != null)
1993                                        baos.close();
1994                        } catch (Exception e) {
1995                                log.error(e.getMessage(), e);
1996                        }
1997                        try {
1998                                if (executor != null)
1999                                        executor.close();
2000                        } catch (Exception e) {
2001                                log.error(e.getMessage(), e);
2002                        }
2003                }
2004        }
2005
2006        @Override
2007        public boolean isFile() throws IOException {
2008                Map<String, Object> lastRecord = getFirstFilePart();
2009
2010                if (lastRecord == null) {
2011                        return false;
2012                }
2013                return lastRecord.get("file_type").equals("F");
2014        }
2015
2016        @Override
2017        public boolean isDirectory() throws IOException {
2018                Map<String, Object> lastRecord = getFirstFilePart();
2019
2020                if (lastRecord == null) {
2021                        return false;
2022                }
2023                return lastRecord.get("file_type").equals("D");
2024        }
2025
2026        @Override
2027        public boolean isLink() throws IOException {
2028                Map<String, Object> lastRecord = getFirstFilePart();
2029
2030                if (lastRecord == null) {
2031                        return false;
2032                }
2033                return lastRecord.get("file_type").equals("L");
2034        }
2035
2036        @Override
2037        public String readAllString() throws IOException {
2038                ByteArrayOutputStream baos = null;
2039                try {
2040                        byte[] bytes = readAllBytes();
2041                        
2042                        if(bytes == null) return null;
2043                        
2044                        baos = new ByteArrayOutputStream();
2045                        baos.write(bytes);
2046                        baos.flush();
2047                        return baos.toString();
2048                } finally {
2049                        if (baos != null) {
2050                                try {
2051                                        baos.close();
2052                                } catch (IOException e) {
2053                                        log.error(e.getMessage(), e);
2054                                }
2055                        }
2056                }
2057        }
2058
2059        @Override
2060        public long size() throws IOException {
2061                Map<String, Object> record = getCompletedRecord();
2062                if (record != null) {
2063                        return Long.parseLong(record.get("file_size") + "");
2064                }
2065                return 0L;
2066        }
2067
2068        public List<Map<String, Object>> list() throws IOException {
2069                return list(getPath(), "%");
2070        }
2071
2072        public List<Map<String, Object>> list(String likeFileName) throws IOException {
2073                return list(getPath(), likeFileName);
2074        }
2075
2076        public List<Map<String, Object>> list(String likeFolderName, String likeFileName) throws IOException {
2077                CacheDriverExecutor executor = null;
2078                Connection conn = null;
2079                try {
2080                        conn = dataSourceReader.getConnection();
2081                        conn.setAutoCommit(true);
2082                        executor = new CacheDriverExecutor(conn);
2083                        Map<String, Object> dataRecord = new HashMap<String, Object>();
2084                        if (CommonTools.isBlank(likeFileName)) {
2085                                dataRecord.put("file_name", "%");
2086                        } else {
2087                                dataRecord.put("file_name", likeFileName);
2088                        }
2089                        if (CommonTools.isBlank(likeFolderName)) {
2090                                dataRecord.put("file_parent_path", "%");
2091                        } else {
2092                                dataRecord.put("file_parent_path", likeFolderName);
2093                        }
2094                        return executor.find(formatSql(SQL_SELECT_FOR_LIST_ALL), dataRecord);
2095                } catch (SQLException e) {
2096                        throw new IOException(e.getMessage(), e);
2097                } finally {
2098                        if (executor != null) {
2099                                try {
2100                                        executor.close();
2101                                } catch (Exception e) {
2102                                        throw new IOException(e.getMessage(), e);
2103                                }
2104                        }
2105                }
2106        }
2107
2108        public void listAll(String likeFileName, CacheArray rows) throws IOException {
2109                listAll(getPath(), likeFileName, rows);
2110        }
2111
2112        public void listAll(CacheArray rows) throws IOException {
2113                listAll(getPath(), "%", rows);
2114        }
2115
2116        public void listAll(String likeFolderName, String likeFileName, CacheArray rows) throws IOException {
2117                CacheDriverExecutor executor = null;
2118                Connection conn = null;
2119                try {
2120                        conn = dataSourceReader.getConnection();
2121                        conn.setAutoCommit(true);
2122                        executor = new CacheDriverExecutor(conn);
2123                        Map<String, Object> dataRecord = new HashMap<String, Object>();
2124                        if (CommonTools.isBlank(likeFileName)) {
2125                                dataRecord.put("file_name", "%");
2126                        } else {
2127                                dataRecord.put("file_name", likeFileName);
2128                        }
2129                        if (CommonTools.isBlank(likeFolderName)) {
2130                                dataRecord.put("file_parent_path", "%");
2131                        } else {
2132                                dataRecord.put("file_parent_path", likeFolderName);
2133                        }
2134                        executor.find(formatSql(SQL_SELECT_FOR_LIST_ALL), dataRecord, rows);
2135                } catch (SQLException e) {
2136                        throw new IOException(e.getMessage(), e);
2137                } finally {
2138                        if (executor != null) {
2139                                try {
2140                                        executor.close();
2141                                } catch (Exception e) {
2142                                        throw new IOException(e.getMessage(), e);
2143                                }
2144                        }
2145                }
2146        }
2147
2148        protected void listAllForDelete(String likeFolderName, String likeFileName, CacheArray rows) throws IOException {
2149                CacheDriverExecutor executor = null;
2150                Connection conn = null;
2151                try {
2152                    long deleteCutoffTimeMs = 0L;
2153                    
2154                    if(DELETE_CUTOFF_TIME != null) deleteCutoffTimeMs = DELETE_CUTOFF_TIME.getTime();
2155                    
2156                        Timestamp currentTime = new Clock().getSqlTimestamp();
2157                        
2158                        conn = dataSourceReader.getConnection();
2159                        conn.setAutoCommit(true);
2160                        executor = new CacheDriverExecutor(conn);
2161                        Map<String, Object> dataRecord = new HashMap<String, Object>();
2162                        if (CommonTools.isBlank(likeFileName)) {
2163                                dataRecord.put("file_name", "%");
2164                        } else {
2165                                dataRecord.put("file_name", likeFileName);
2166                        }
2167                        if (CommonTools.isBlank(likeFolderName)) {
2168                                dataRecord.put("file_parent_path", "%");
2169                        } else {
2170                                dataRecord.put("file_parent_path", likeFolderName);
2171                        }
2172                        dataRecord.put("deleted_on_hostnames", "%;" + CommonTools.getHostname() + ";%");
2173                        dataRecord.put("begin_time", new Timestamp(deleteCutoffTimeMs));
2174                        dataRecord.put("end_time", currentTime);
2175                        dataRecord.put("created_at", new Timestamp(currentTime.getTime() - BEFORE_THE_QUEUE_TIME));
2176                        executor.find(0, MAX_QUEUE_SIZE,formatSql(SQL_SELECT_FOR_LIST_ALL_DELETE), dataRecord, rows);
2177                } catch (SQLException e) {
2178                        throw new IOException(e.getMessage(), e);
2179                } finally {
2180                        if (executor != null) {
2181                                try {
2182                                        executor.close();
2183                                } catch (Exception e) {
2184                                        throw new IOException(e.getMessage(), e);
2185                                }
2186                        }
2187                }
2188        }
2189
2190        protected void listAllForSync(String likeFolderName, String likeFileName, CacheArray rows) throws IOException {
2191                CacheDriverExecutor executor = null;
2192                Connection conn = null;
2193                try {
2194                    long deleteCutoffTimeMs = 0L;
2195                    
2196                    if(DELETE_CUTOFF_TIME != null) deleteCutoffTimeMs = SYNC_CUTOFF_TIME.getTime();
2197                    
2198                        Timestamp currentTime = new Clock().getSqlTimestamp();
2199                        
2200                        conn = dataSourceReader.getConnection();
2201                        conn.setAutoCommit(true);
2202                        executor = new CacheDriverExecutor(conn);
2203                        Map<String, Object> dataRecord = new HashMap<String, Object>();
2204                        if (CommonTools.isBlank(likeFileName)) {
2205                                dataRecord.put("file_name", "%");
2206                        } else {
2207                                dataRecord.put("file_name", likeFileName);
2208                        }
2209                        if (CommonTools.isBlank(likeFolderName)) {
2210                                dataRecord.put("file_parent_path", "%");
2211                        } else {
2212                                dataRecord.put("file_parent_path", likeFolderName);
2213                        }
2214                        dataRecord.put("synced_on_hostnames", "%;" + CommonTools.getHostname() + ";%");
2215                        dataRecord.put("begin_time", new Timestamp(deleteCutoffTimeMs));
2216                        dataRecord.put("end_time", currentTime);
2217                        dataRecord.put("created_at", new Timestamp(currentTime.getTime() - BEFORE_THE_QUEUE_TIME));
2218                        executor.find(0, MAX_QUEUE_SIZE, formatSql(SQL_SELECT_FOR_LIST_ALL_SYNC), dataRecord, rows);
2219                } catch (SQLException e) {
2220                        throw new IOException(e.getMessage(), e);
2221                } finally {
2222                        if (executor != null) {
2223                                try {
2224                                        executor.close();
2225                                } catch (Exception e) {
2226                                        throw new IOException(e.getMessage(), e);
2227                                }
2228                        }
2229                }
2230        }
2231
2232        public void setModifyUserId(String modifyUserId) {
2233                this.modifyUserId = modifyUserId;
2234        }
2235
2236        private String getDefaultModifyUserId() {
2237                if (CommonTools.isBlank(modifyUserId)) {
2238                        return DEFAULE_MODIFY_USER_ID;
2239                } else {
2240                        return modifyUserId;
2241                }
2242        }
2243
2244        @Override
2245        public void setModifyTime(Timestamp modifyTime) throws IOException {
2246                this.modifyTime = modifyTime;
2247        }
2248
2249        @Override
2250        public Timestamp getModifyTime() throws IOException {
2251                Map<String, Object> record = getFirstFilePart();
2252                if (record != null) {
2253                        return (Timestamp) record.get("file_modify_time");
2254                }
2255                return null;
2256        }
2257
2258        public boolean isTimeout() throws IOException {
2259                return isTimeout(LOGIC_TIMEOUT_MS);
2260        }
2261
2262        @Override
2263        public void copyTo(String toPath) throws IOException {
2264                log.debug("CopyTo: {} -> {}", getPath(), toPath);
2265                if (!exists()) {
2266                        log.mark(String.format("The remote file '%s' does not exist.", this.getPath()));
2267                }
2268                RemoteFile destRf = new RemoteFile(toPath);
2269                final Timestamp originModifyTime = this.getModifyTime();
2270                destRf.setModifyUserId(modifyUserId);
2271                destRf.setModifyTime(originModifyTime);
2272                if (isLink()) {
2273                        if (destRf.exists(true)) {
2274                                destRf.forceDeleteLink(false);
2275                        }
2276                        destRf.createLink(readLink());
2277                }
2278                if (isFile()) {
2279                        final RemoteFile _destRf = destRf;
2280                        if (_destRf.exists(true)) {
2281                                _destRf.forceDeleteFile(false);
2282                        }
2283                        merge(new FilePart() {
2284                                @Override
2285                                protected void process(int partIndex, byte[] data) {
2286                                        try {
2287                                                _destRf.write(decrypt(data), partIndex > 0);
2288                                        } catch (Exception e) {
2289                                                log.error(e.getMessage(), e);
2290                                        }
2291                                }
2292
2293                                @Override
2294                                protected void completed(int lastPartIndex, Timestamp modifyTime, long fileSize) {
2295                                        try {
2296                                                /**This is 0 bytes file**/
2297                                                if (lastPartIndex == -1) {
2298
2299                                                        if (_destRf.exists(true))
2300                                                                _destRf.forceDeleteFile(false);
2301
2302                                                        _destRf.setModifyTime(originModifyTime);
2303                                                        _destRf.createEmptyFile();
2304                                                } else {
2305                                                        _destRf.setModifyTime(originModifyTime);
2306                                                        _destRf.complete();
2307                                                }
2308                                        } catch (IOException e) {
2309                                                log.error(e.getMessage(), e);
2310                                        }
2311                                }
2312
2313                                @Override
2314                                protected void ended(int lastPartIndex, long fileSize) {
2315
2316                                }
2317
2318                        });
2319                }
2320                if (isDir()) {
2321                        List<Map<String, Object>> copyList = list(getPath(), "%");
2322                        int size = copyList.size();
2323                        if (size > 0) {
2324                                if (!destRf.exists(true)) {
2325                                        destRf.mkdirs();
2326                                }
2327                        }
2328                        String copyRootPath = getPath();
2329                        copyList = list(parsePath(getPath() + "/%"), "%");
2330                        if (copyList.size() > 0) {
2331                                if (!destRf.exists(true)) {
2332                                        destRf.mkdirs();
2333                                }
2334                        }
2335                        for (Map<String, Object> item : copyList) {
2336                                String itemParentPath = (String) item.get("file_parent_path");
2337                                String destParentPath = itemParentPath.replaceFirst(copyRootPath + "/", toPath);
2338                                RemoteFile sourceRf = new RemoteFile(
2339                                                String.format("%s/%s", item.get("file_parent_path"), item.get("file_name")));
2340                                String destRfPath = new File(String.format("%s/%s", destParentPath, item.get("file_name")))
2341                                                .getAbsolutePath();
2342                                if (sourceRf.isDir()) {
2343                                        RemoteFile tmpRf = new RemoteFile(destRfPath + "/");
2344                                        tmpRf.setModifyTime(sourceRf.getModifyTime());
2345                                        if (!tmpRf.exists(true)) {
2346                                                tmpRf.mkdirs();
2347                                        }
2348                                } else {
2349                                        sourceRf.copyTo(destRfPath);
2350                                }
2351                        }
2352                }
2353        }
2354
2355        @Override
2356        public void moveTo(String toPath) throws IOException {
2357                log.debug("MoveTo: {} -> {}", getPath(), toPath);
2358                RemoteFile destRf = new RemoteFile(toPath);
2359                copyTo(toPath);
2360                boolean exists = this.exists(true);
2361                if (exists && destRf.exists(true) && destRf.isCompleted()) {
2362                        forceDeleteAll(true);
2363                }
2364        }
2365
2366        private void startFullAsyncThreadForDelete(boolean subFolders, long timerMs, String replaceTo,
2367                        Runnable completedCallback) {
2368                RemoteFile the = this;
2369                try {
2370                        if (syncRoot == null) {
2371                                File replaceToFile = new File(replaceTo);
2372                                if (replaceToFile.getParent() == null) {
2373                                        throw new IOException("Cannot execute delete task.");
2374                                }
2375                                String replaceToParent = replacePath(replaceToFile.getParent());
2376                                syncRoot = new File(String.format("%s/%s", replaceToParent, replaceToFile.getName()));
2377                        }
2378
2379                        CacheArray rows = new CacheArray();
2380                        CacheArrayFilter filter = new CacheArrayFilter(CACHE_ARRAY_FILTER_TIMER) {
2381                                @Override
2382                                public void execute(Integer index, Object o) {
2383                                        Map<String, Object> item = (Map<String, Object>) o;
2384                                        String fileName = (String) item.get("file_name");
2385                                        String fullPath = String.format("%s/%s", item.get("file_parent_path"), fileName);
2386                                        Thread.currentThread()
2387                                                        .setName(String.format("RemoteFile-startFullAsyncThreadForDelete-%s", fileName));
2388                                        RemoteFile rf = new RemoteFile(fullPath);
2389                                        if(isDebug(rf)){
2390                                            debugLog("CheckUsage",rf,this);
2391                                        }                                       
2392                                        if (memoryUsage() < RemoteFile.MAX_MEMORY_USAGE) {
2393                                                rf.fileId = (String) item.get("file_id");
2394                                        String toDf = rf.getPath();
2395                                        if (!CommonTools.isBlank(replaceTo)) {
2396                                                if (subFolders) {
2397                                                        toDf = fullPath.replaceFirst(getPath() + "/", replaceTo + "/");
2398                                                } else {
2399                                                        toDf = fullPath.replaceFirst(getPath(), replaceTo + "/");
2400                                                }
2401                                        }
2402                                        DiskFile df = new DiskFile(toDf);
2403                                        DiskFile.copyAttrs(df, false, the.syncRoot);
2404                                        executeSyncRunnable(this, item, df, rf);            
2405                                        } else{
2406                                            log.warn("Over max memory usage 'MAX_MEMORY_USAGE={}'.",MAX_MEMORY_USAGE);
2407                                        }
2408                                }
2409
2410                                @Override
2411                                public void terminated() {
2412                                        log.warn("Terminated: {}-{}", Thread.currentThread().getName(), Thread.currentThread().getId());
2413                                }
2414
2415                                @Override
2416                                public void completed(Integer size) {
2417                                        Thread.currentThread().setName(String
2418                                                        .format("RemoteFile-startFullAsyncThreadForDelete-completed-%s", the.origin.getName()));
2419                                        if (isStopSync()) {
2420                                                LoggerFactory.getLogger(RemoteFile.class).mark("Stoped start delete task thread.");
2421                                        }
2422                                        if (timerMs <= 0) {
2423                                                LoggerFactory.getLogger(RemoteFile.class).mark("Completed start delete task thread.");
2424                                        }
2425                                        if (!isStopSync() && timerMs > 0) {
2426                                                try {
2427                                                        Thread.sleep(timerMs);
2428                                                        if (!isStopSync()) {
2429                                                                startFullAsyncThreadForDelete(!subFolders, timerMs, replaceTo, completedCallback);
2430                                                        }
2431                                                } catch (Exception e) {
2432                                                        log.error(e.getMessage(), e);
2433                                                }
2434                                        }
2435                                        if (completedCallback != null) {
2436                                                completedCallback.run();
2437                                        }
2438                                }
2439
2440                        };
2441                        rows.filter(filter);
2442                        listAllForDelete(getPath() + (subFolders ? "/%" : ""), "%", rows);
2443                } catch (Exception e) {
2444                        log.error(e.getMessage(), e);
2445                        if (timerMs > 0) {
2446                                try {
2447                                        Thread.sleep(timerMs);
2448                                        startFullAsyncThreadForDelete(subFolders, timerMs, replaceTo, completedCallback);
2449                                } catch (InterruptedException ee) {
2450                                        log.error(ee.getMessage(), ee);
2451                                }
2452                        }
2453                }
2454        }
2455
2456        private void startFullAsyncThread(boolean subFolders, long timerMs, String replaceTo, Runnable completedCallback) {
2457                RemoteFile the = this;
2458                try {
2459                        if (syncRoot == null) {
2460                                File replaceToFile = new File(replaceTo);
2461                                if (replaceToFile.getParent() == null) {
2462                                        throw new IOException("Cannot sync the root path.");
2463                                }
2464                                String replaceToParent = replacePath(replaceToFile.getParent());
2465                                syncRoot = new File(String.format("%s/%s", replaceToParent, replaceToFile.getName()));
2466                        }
2467
2468                        CacheArray rows = new CacheArray();
2469                        CacheArrayFilter filter = new CacheArrayFilter(CACHE_ARRAY_FILTER_TIMER) {
2470                                @Override
2471                                public void execute(Integer index, Object o) {
2472                                        Map<String, Object> item = (Map<String, Object>) o;
2473                                        String fileName = (String) item.get("file_name");
2474                                        Thread.currentThread().setName(String.format("RemoteFile-startFullAsyncThread-%s", fileName));
2475                                        String fullPath = String.format("%s/%s", item.get("file_parent_path"), fileName);
2476                                        RemoteFile rf = new RemoteFile(fullPath);
2477                                        if(isDebug(the)){
2478                                            debugLog("CheckUsage",the,this);
2479                            }
2480                                        if (memoryUsage() < RemoteFile.MAX_MEMORY_USAGE) {
2481                                                String toDf = rf.getPath();
2482                                                if (!CommonTools.isBlank(replaceTo)) {
2483                                                        if (subFolders) {
2484                                                                toDf = fullPath.replaceFirst(getPath() + "/", replaceTo + "/");
2485                                                        } else {
2486                                                                toDf = fullPath.replaceFirst(getPath(), replaceTo + "/");
2487                                                        }
2488                                                }
2489                                                DiskFile df = new DiskFile(toDf);
2490                                                DiskFile.copyAttrs(df, false, the.syncRoot);
2491                                                executeSyncRunnable(this, item, df, rf);
2492                                        } else{
2493                                             log.warn("Over max memory usage 'MAX_MEMORY_USAGE={}'.",MAX_MEMORY_USAGE);
2494                                        }
2495                                }
2496
2497                                @Override
2498                                public void terminated() {
2499                                        log.warn("Terminated: {}-{}", Thread.currentThread().getName(), Thread.currentThread().getId());
2500                                }
2501
2502                                @Override
2503                                public void completed(Integer size) {
2504                                        Thread.currentThread().setName(
2505                                                        String.format("RemoteFile-startFullAsyncThread-completed-%s", the.origin.getName()));
2506                                        if (isStopSync()) {
2507                                                LoggerFactory.getLogger(RemoteFile.class).mark("Stoped start full async thread.");
2508                                        }
2509                                        if (timerMs <= 0) {
2510                                                LoggerFactory.getLogger(RemoteFile.class).mark("Completed start full async thread.");
2511                                        }
2512                                        if (!isStopSync() && timerMs > 0) {
2513                                                try {
2514                                                        Thread.sleep(timerMs);
2515                                                        if (!isStopSync()) {
2516                                                                startFullAsyncThread(!subFolders, timerMs, replaceTo, completedCallback);
2517                                                        }
2518                                                } catch (Exception e) {
2519                                                        log.error(e.getMessage(), e);
2520                                                }
2521                                        }
2522                                        if (completedCallback != null) {
2523                                                completedCallback.run();
2524                                        }
2525                                }
2526
2527                        };
2528                        rows.filter(filter);
2529                        listAllForSync(getPath() + (subFolders ? "/%" : ""), "%", rows);
2530                } catch (Exception e) {
2531                        log.error(e.getMessage(), e);
2532                        if (timerMs > 0) {
2533                                try {
2534                                        Thread.sleep(timerMs);
2535                                        startFullAsyncThread(subFolders, timerMs, replaceTo, completedCallback);
2536                                } catch (InterruptedException ee) {
2537                                        log.error(ee.getMessage(), ee);
2538                                }
2539                        }
2540                }
2541        }
2542
2543        public void startFullAsync(long timerMs) throws IOException {
2544                startFullAsync(timerMs, getPath() + "/", null);
2545        }
2546
2547        public void startFullAsync(long timerMs, Runnable completedCallback) throws IOException {
2548                startFullAsync(timerMs, getPath() + "/", completedCallback);
2549        }
2550
2551        public void startFullAsync(long timerMs, String replaceTo) throws IOException {
2552                startFullAsync(timerMs, replaceTo, null);
2553        }
2554
2555        public void startFullAsync(long timerMs, String replaceTo, Runnable completedCallback) throws IOException {
2556
2557                stopSync = false;
2558
2559                RemoteFile the = this;
2560                Clock clock = new Clock();
2561                Timestamp toTime = new Timestamp(clock.getTime() - timerMs);
2562                Executors.newFixedThreadPool(1).execute(new Runnable() {
2563
2564                        @Override
2565                        public void run() {
2566                                startFullAsyncThread(false, timerMs, replaceTo, completedCallback);
2567                        }
2568
2569                });
2570
2571                Executors.newFixedThreadPool(1).execute(new Runnable() {
2572
2573                        @Override
2574                        public void run() {
2575                                startFullAsyncThreadForDelete(false, timerMs, replaceTo, completedCallback);
2576                        }
2577
2578                });
2579        }
2580
2581        public DriverDataSource getDataSourceReader() {
2582                return dataSourceReader;
2583        }
2584
2585        public DriverDataSource getDataSourceWriter() {
2586                return dataSourceWriter;
2587        }
2588
2589        public ConfigProperties getConfigProperties() {
2590                return configProperties;
2591        }
2592
2593        private byte[] encrypt(byte[] data) throws Exception {
2594                boolean useEncrypt = configProperties.getBoolean("EncryptEnable", false);
2595                if (useEncrypt) {
2596            String aesKey = configProperties.getString("EncryptAesKey");
2597            return CipherTools.AESEncrypt(aesKey,data);
2598                } else {
2599                        return data;
2600                }
2601        }
2602
2603        private byte[] decrypt(byte[] encData) throws Exception {
2604                boolean useEncrypt = configProperties.getBoolean("EncryptEnable", false);
2605                if (useEncrypt) {
2606            String aesKey = configProperties.getString("EncryptAesKey");
2607            return CipherTools.AESDecrypt(aesKey,encData);
2608                } else {
2609                        return encData;
2610                }
2611        }
2612
2613        public String getSyncedOnHostnames() throws IOException {
2614                Map<String, Object> record = getCompletedRecord();
2615                if (record != null) {
2616                        return (String) record.get("synced_on_hostnames");
2617                }
2618                return null;
2619        }
2620
2621        public String getDeletedOnHostnames() throws IOException {
2622                Map<String, Object> record = getCompletedRecord();
2623                if (record != null) {
2624                        return (String) record.get("deleted_on_hostnames");
2625                }
2626                return null;
2627        }
2628
2629        public String getSourceHostname() throws IOException {
2630                Map<String, Object> record = getCompletedRecord();
2631                if (record != null) {
2632                        return (String) record.get("source_hostname");
2633                }
2634                return null;
2635        }
2636
2637        protected String getLastSourceHostname() throws IOException {
2638                Map<String, Object> record = getLastUpdateRecord();
2639                if (record != null) {
2640                        return (String) record.get("source_hostname");
2641                }
2642                return null;
2643        }
2644
2645        protected Timestamp getLastOperateTime() throws IOException {
2646                Map<String, Object> record = getLastUpdateRecord();
2647                if (record != null) {
2648                        Timestamp updatedAt = (Timestamp) record.get("updated_at");
2649                        Timestamp deletedAt = (Timestamp) record.get("deleted_at");
2650
2651                        return deletedAt == null ? updatedAt : deletedAt;
2652                }
2653                return null;
2654        }
2655
2656        protected Timestamp getLastModifyTime() throws IOException {
2657                Map<String, Object> record = getLastUpdateRecord();
2658                if (record != null) {
2659                        return (Timestamp) record.get("file_modify_time");
2660                }
2661                return null;
2662        }
2663
2664        protected boolean isLastOperateDelete() throws IOException {
2665                Map<String, Object> record = getLastUpdateRecord();
2666                if (record != null) {
2667                        Timestamp deletedAt = (Timestamp) record.get("deleted_at");
2668
2669                        return deletedAt == null ? false : true;
2670                }
2671                return false;
2672        }
2673
2674        protected boolean isLastOperateRealDelete() throws IOException {
2675                Map<String, Object> record = getLastUpdateRecord();
2676                if (record != null) {
2677                        Object deletedValue = record.get("file_deleted");
2678                        if (deletedValue != null) {
2679                                if (deletedValue instanceof Boolean) {
2680                                        return (Boolean) deletedValue;
2681                                } else {
2682                                        return Integer.parseInt(deletedValue + "") == 1;
2683                                }
2684                        }
2685                }
2686                return false;
2687        }
2688        
2689        protected void listAllForScanDelete(Integer filePartDataTable, CacheArray rows) throws IOException {
2690                CacheDriverExecutor executor = null;
2691                Connection conn = null;
2692                try {
2693                    long deleteCutoffTimeMs = 0L;
2694                    
2695                    if(DELETE_CUTOFF_TIME != null) deleteCutoffTimeMs = DELETE_CUTOFF_TIME.getTime();
2696                    
2697                        Timestamp currentTime = new Clock().getSqlTimestamp();
2698                        conn = dataSourceReader.getConnection();
2699                        conn.setAutoCommit(true);
2700                        executor = new CacheDriverExecutor(conn);
2701                        Map<String, Object> dataRecord = new HashMap<String, Object>();
2702                        dataRecord.put("file_parent_path", getPath());
2703                        dataRecord.put("like_file_parent_path", parsePath(getPath() + "/%"));
2704                        dataRecord.put("deleted_on_hostnames", "%;" + CommonTools.getHostname() + ";%");
2705                        dataRecord.put("begin_time", new Timestamp(deleteCutoffTimeMs));
2706                        dataRecord.put("end_time", currentTime);
2707                        dataRecord.put("created_at", new Timestamp(currentTime.getTime() - BEFORE_THE_QUEUE_TIME));
2708                        dataRecord.put("file_part_data_table", filePartDataTable);
2709                        executor.find(formatSql(SQL_SELECT_FOR_LIST_ALL_SCAN_DELETE), dataRecord, rows);
2710                } catch (SQLException e) {
2711                        throw new IOException(e.getMessage(), e);
2712                } finally {
2713                        if (executor != null) {
2714                                try {
2715                                        executor.close();
2716                                } catch (Exception e) {
2717                                        throw new IOException(e.getMessage(), e);
2718                                }
2719                        }
2720                }
2721        }
2722
2723        protected boolean isSyncedOnHostname() throws IOException {
2724                Map<String, Object> record = getCompletedRecord();
2725                if (record != null) {
2726                        String syncedOnHostnames = (String) record.get("synced_on_hostnames");
2727                        return syncedOnHostnames.indexOf(";" + CommonTools.getHostname() + ";") > -1;
2728                }
2729                return false;
2730        }
2731
2732        protected boolean isSyncedOnHostname(Object id) throws IOException {
2733                Map<String, Object> record = getRecordById(id);
2734                if (record != null) {
2735                        String syncedOnHostnames = (String) record.get("synced_on_hostnames");
2736                        return syncedOnHostnames.indexOf(";" + CommonTools.getHostname() + ";") > -1;
2737                }
2738                return false;
2739        }
2740
2741        protected boolean isDeletedOnHostname(Object id) throws IOException {
2742                Map<String, Object> record = getRecordById(id);
2743                if (record != null) {
2744                        String deletedOnHostnames = (String) record.get("deleted_on_hostnames");
2745                        return deletedOnHostnames.indexOf(";" + CommonTools.getHostname() + ";") > -1;
2746                }
2747                return false;
2748        }
2749
2750        public RemoteFile getParentFile() {
2751                return new RemoteFile(getParent());
2752        }
2753
2754        private void executeSyncRunnable(final CacheArrayFilter filter, final Map<String, Object> item, final DiskFile df,
2755                        final RemoteFile rf) {
2756            try{
2757                if (!df.isLogicModify()) {
2758                        df.logicModify();
2759                        if (mergePool == null) {
2760                                LoggerFactory.getLogger(RemoteFile.class).mark("MAX_POOL_SIZE={}", MAX_POOL_SIZE);
2761                                mergePool = Executors.newFixedThreadPool(MAX_POOL_SIZE);
2762                        }
2763                        mergePool.execute(getSyncRunnable(filter, item, rf, df));
2764                }
2765            }catch(Exception e){
2766                LoggerFactory.getLogger(RemoteFile.class).error(e.getMessage(),e);
2767            }
2768        }
2769
2770        private Runnable getSyncRunnable(final CacheArrayFilter filter, final Map<String, Object> item, final RemoteFile rf,
2771                        final DiskFile df) {
2772                Runnable runnable = new Runnable() {
2773                        @Override
2774                        public void run() {
2775                                try {
2776                                        String fileName = (String) item.get("file_name");
2777                                        Thread.currentThread()
2778                                                        .setName(String.format("RemoteFile-getSyncRunnable-%s-%s", fileName, item.get("id")));
2779                                        boolean isDebug = isDebug(rf);
2780                                        boolean isDeleted = false;
2781                                        Object deletedValue = item.get("file_deleted");
2782
2783                                        if (deletedValue instanceof Boolean) {
2784                                                isDeleted = (Boolean) deletedValue;
2785                                        } else {
2786                                                isDeleted = Integer.parseInt(deletedValue + "") == 1;
2787                                        }
2788
2789                                        String clientHostname = CommonTools.getHostname();
2790                                        String lastSourceHostname = rf.getLastSourceHostname();
2791
2792                                        boolean isClientHostname = lastSourceHostname != null && !lastSourceHostname.equals(clientHostname);
2793                                        boolean rootExists = syncRoot != null && syncRoot.exists();
2794
2795                        if(!rootExists){
2796                            Logger.systemError(RemoteFile.class,"The root path '{}' does not exist.",syncRoot.getPath());
2797                        }
2798                        
2799                                        if (isDeleted && rootExists) {
2800                                                if (df.exists()) {
2801                                                        if (isDebug) {
2802                                                                LoggerFactory.getLogger(RemoteFile.class).mark("DeleteDiskFile - {}", df.getPath());
2803                                                        }
2804                                                        df.delete();
2805                                                        rf.deletedToDiskAndSub(item.get("id"));
2806                                                } else {
2807                                                        rf.deletedToDiskAndSub(item.get("id"));
2808                                                }
2809                                                df.removeLogicModify();
2810                                        }
2811
2812                                        String sourceHostname = rf.getSourceHostname();
2813                                        isClientHostname = sourceHostname != null && !sourceHostname.equals(clientHostname);
2814                                        boolean checkHaveParent = configProperties.getBoolean("CheckHaveParent", false);
2815                                        boolean haveParent = true;
2816                                        if (checkHaveParent) {
2817                                                haveParent = df.getOrigin().getParentFile().exists();
2818                                        }
2819                                        String fileType = (String) item.get("file_type");
2820
2821                                        if (!isDeleted && isClientHostname && haveParent && rootExists) {
2822                                                Timestamp rfModifyTime = rf.getModifyTime();
2823                                                df.setModifyTimeMsFromClock(rfModifyTime.getTime());
2824
2825                                                if (fileType.equals("L")) {
2826                                                        if (df.exists() && !df.isLink()) {
2827                                                                df.delete();
2828                                                        }
2829                                                        if (df.exists()) {
2830                                                                Timestamp dfModifyTime = df.getModifyTimeForClock();
2831                                                                if (rfModifyTime.getTime() > dfModifyTime.getTime()) {
2832                                                                        if (isDebug) {
2833                                                                                LoggerFactory.getLogger(RemoteFile.class).mark("ReCreateDiskLink - {}",
2834                                                                                                df.getPath());
2835                                                                        }
2836                                                                        df.delete();
2837                                                                        if (df.createLink(rf.readLink())) {
2838                                                                                if (df.getOrigin().getName().startsWith(".")) {
2839                                                                                        Files.setAttribute(Paths.get(df.getPath()), "dos:hidden", true);
2840                                                                                }
2841                                                                                rf.syncedToDisk();
2842                                                                        }
2843                                                                } else {
2844                                                                        rf.syncedToDisk();
2845                                                                }
2846                                                        } else {
2847                                                                if (isDebug) {
2848                                                                        LoggerFactory.getLogger(RemoteFile.class).mark("CreateDiskLink - {}", df.getPath());
2849                                                                }
2850                                                                if (df.createLink(rf.readLink())) {
2851                                                                        if (df.getOrigin().getName().startsWith(".")) {
2852                                                                                Files.setAttribute(Paths.get(df.getPath()), "dos:hidden", true);
2853                                                                        }
2854                                                                        rf.syncedToDisk();
2855                                                                }
2856                                                        }
2857                                                        df.removeLogicModify();
2858                                                }
2859                                                if (fileType.equals("F")) {
2860                                                        if (df.exists()) {
2861                                                                Timestamp dfModifyTime = df.getModifyTimeForClock();
2862                                                                boolean changed = rfModifyTime.getTime() > dfModifyTime.getTime();
2863                                                                if (changed) {
2864                                                                        DiskFile writingDf = new DiskFile(df.getPath() + TMP_WRITING_FILE);
2865                                                                        DiskFile.copyAttrs(writingDf, df.isCopyStructureOnly(), df.syncRoot);
2866                                                                        writingDf.setModifyTimeMsFromClock(rfModifyTime.getTime());
2867                                                                        if (!writingDf.isLogicModify()) {
2868                                                                                writingDf.logicModify();
2869                                                                                if (RemoteFile.addQueuePathMapping(rf.getPath())) {
2870                                                                                        if (isDebug(rf)) {
2871                                                                                                LoggerFactory.getLogger(RemoteFile.class).mark("DiskQueuing - {}",
2872                                                                                                                df.getPath());
2873                                                                                        }
2874                                                                                        rf.writeToDisk(writingDf);
2875                                                                                }
2876                                                                        }
2877                                                                } else {
2878                                                                        rf.syncedToDisk();
2879                                                                        df.removeLogicModify();
2880                                                                }
2881                                                        } else {
2882                                                                if (rf.exists()) {
2883                                                                        DiskFile writingDf = new DiskFile(df.getPath() + TMP_WRITING_FILE);
2884                                                                        DiskFile.copyAttrs(writingDf, df.isCopyStructureOnly(), df.syncRoot);
2885                                                                        writingDf.setModifyTimeMsFromClock(rfModifyTime.getTime());
2886                                                                        if (!writingDf.isLogicModify()) {
2887                                                                                writingDf.logicModify();
2888                                                                                if (RemoteFile.addQueuePathMapping(rf.getPath())) {
2889                                                                                        if (isDebug(rf)) {
2890                                                                                                LoggerFactory.getLogger(RemoteFile.class).mark("DiskQueuing - {}",
2891                                                                                                                df.getPath());
2892                                                                                        }
2893                                                                                        rf.writeToDisk(writingDf);
2894                                                                                }
2895                                                                        }
2896                                                                }
2897                                                        }
2898                                                }
2899                                                if (fileType.equals("D")) {
2900                                                        if (rf.exists()) {
2901                                                                if (isDebug) {
2902                                                                        LoggerFactory.getLogger(RemoteFile.class).mark("CreateDiskDir - {}", df.getPath());
2903                                                                }
2904                                                                if (df.exists()) {
2905                                                                        if (df.isDir()) {
2906                                                                                rf.syncedToDisk();
2907                                                                        } else {
2908                                                                                df.delete();
2909                                                                                if (df.mkdirs()) {
2910                                                                                        if (df.getOrigin().getName().startsWith(".")) {
2911                                                                                                Files.setAttribute(Paths.get(df.getPath()), "dos:hidden", true);
2912                                                                                        }
2913                                                                                        rf.syncedToDisk();
2914                                                                                }
2915                                                                        }
2916                                                                } else {
2917                                                                        if (df.mkdirs()) {
2918                                                                                if (df.getOrigin().getName().startsWith(".")) {
2919                                                                                        Files.setAttribute(Paths.get(df.getPath()), "dos:hidden", true);
2920                                                                                }
2921                                                                                rf.syncedToDisk();
2922                                                                        }
2923                                                                }
2924                                                        }
2925                                                        df.removeLogicModify();
2926                                                }
2927                                        }
2928                                } catch (Exception e) {
2929                                        log.error(e.getMessage(), e);
2930                                }
2931                        }
2932                };
2933                return runnable;
2934        }
2935
2936        public void startArchiveForFileDeleted(long timerMs, String archiveTime) throws IOException {
2937                long archiveMs = ConfigProperties.parseTimeToMs(archiveTime, 0L);
2938                startArchiveForFileDeleted(timerMs, archiveMs, null);
2939        }
2940
2941        public void startArchiveForFileDeleted(long timerMs, long archiveMs) throws IOException {
2942                startArchiveForFileDeleted(timerMs, archiveMs, null);
2943        }
2944
2945        public void startArchiveForFileDeleted(long timerMs, String archiveTime, Runnable completedCallback)
2946                        throws IOException {
2947                long archiveMs = ConfigProperties.parseTimeToMs(archiveTime, 0L);
2948                startArchiveForFileDeleted(timerMs, archiveMs, completedCallback);
2949        }
2950
2951        public void startArchiveForFileDeleted(long timerMs, long archiveMs, Runnable completedCallback)
2952                        throws IOException {
2953                final RemoteFile the = this;
2954                if (archiveMs <= 0) {
2955                        throw new IOException(String.format(
2956                                        "This parameter 'archiveMs' cannot be '%s', please use a number greater than 0.", archiveMs));
2957                }
2958
2959                stopSync = false;
2960        
2961                CacheDriverExecutor executor = null;
2962                Connection conn = null;
2963                try {
2964                    Thread.currentThread().setName(String.format("RemoteFile-archiveForFileDeleted-%s", getPath()));
2965                        conn = dataSourceReader.getConnection();
2966                        conn.setAutoCommit(true);
2967                        executor = new CacheDriverExecutor(conn);
2968                        Timestamp beforeTime = new Timestamp(new Clock().getTime() - archiveMs);
2969                        Map<String, Object> dataRecord = new HashMap<String, Object>();
2970                        dataRecord.put("file_parent_path", getPath());
2971                        dataRecord.put("like_file_parent_path", parsePath(getPath() + "/%"));
2972                        dataRecord.put("before_deleted_at", beforeTime);
2973                        CacheArray rows = new CacheArray();
2974                        CacheArrayFilter filter = new CacheArrayFilter(CACHE_ARRAY_FILTER_TIMER) {
2975
2976                                @Override
2977                                public void completed(Integer size) {
2978                                        if (isStopSync()) {
2979                                                LoggerFactory.getLogger(RemoteFile.class).mark("Stoped archive for file deleted.");
2980                                        }
2981                                        if (timerMs <= 0) {
2982                                                LoggerFactory.getLogger(RemoteFile.class).mark("Completed archive for file deleted.");
2983                                        }
2984                                        
2985                                        if (completedCallback != null) {
2986                                                completedCallback.run();
2987                                        }
2988                                        
2989                                        if (!isStopSync() && timerMs > 0) {
2990                                            while(true){
2991                                                
2992                                                if(isStopSync()) break;
2993                                                
2994                                                try {
2995                                                        Thread.sleep(timerMs);
2996                                                        if (!isStopSync()) {
2997                                                            boolean allowed = checkUsage(the);
2998                                                            if(allowed){
2999                                                                    startArchiveForFileDeleted(timerMs, archiveMs, completedCallback);
3000                                                                    break;
3001                                                            }
3002                                                        }
3003                                                } catch (Exception e) {
3004                                                        log.error(e.getMessage(), e);
3005                                                }
3006                                            }
3007                                        }
3008
3009                                }
3010
3011                                @Override
3012                                public void execute(Integer index, Object o) {
3013                                    Thread.currentThread().setName(String.format("RemoteFile-execute-archiveForFileDeleted-%s", index));
3014                                        if(isDebug(the)){
3015                                            debugLog("CheckUsage",the,this);
3016                                        }                                   
3017                                        if (memoryUsage() < RemoteFile.MAX_MEMORY_USAGE) {
3018                                                CacheDriverExecutor _executor = null;
3019                                                try {
3020                                                        Map<String, Object> item = (Map<String, Object>) o;
3021                                                        boolean notDir = !item.get("file_type").equals("D");
3022                                                        if(notDir){
3023                                                        calculateDataMappingDataSourceReader(the, item);
3024                                                        Object fileId = item.get("file_id");
3025                                                        Map<String, Object> dataRecord = new HashMap<String, Object>();
3026                                                        dataRecord.put("file_id", fileId);
3027                                                        dataRecord.put("deleted_at", new Clock().getSqlTimestamp());
3028                                                        Thread.currentThread().setName(String.format("RemoteFile-archiveForFileDeleted-%s", fileId));
3029                                            Connection _conn = dataMappingDataSourceReader.getConnection();
3030                                                        _conn.setAutoCommit(true);
3031                                                        _executor = new CacheDriverExecutor(_conn);
3032                                        _executor.execute(formatSqlForFilePartData(SQL_UPDATE_BY_FILE_ID_FOR_CLEAR_FILE_DATA), dataRecord);
3033                                                        }
3034                                                } catch (Exception e) {
3035                                                        log.error(e.getMessage(), e);
3036                                                } finally {
3037                                                        try {
3038                                                                if (_executor != null)
3039                                                                        _executor.close();
3040                                                        } catch (Exception e) {
3041                                                                log.error(e.getMessage(), e);
3042                                                        }
3043                                                }
3044                                        }else{
3045                                            log.warn("Over max memory usage 'MAX_MEMORY_USAGE={}'.",MAX_MEMORY_USAGE);
3046                                        }
3047                                }
3048                        };
3049                        rows.filter(filter);
3050                        final CacheDriverExecutor finalExecutor = executor;
3051                        Executors.newFixedThreadPool(1).execute(new Runnable(){
3052                            @Override
3053                            public void run(){
3054                                try{
3055                                     finalExecutor.find(formatSql(SQL_SELECT_FOR_ARCHIVE_FILE_DELETED), dataRecord, rows);
3056                                }catch(Exception e){
3057                                    log.error(e.getMessage(),e);
3058                                }
3059                            }
3060                        });
3061                } catch (SQLException e) {
3062                        throw new IOException(e.getMessage(), e);
3063                } finally {
3064                        if (executor != null) {
3065                                try {
3066                                        executor.close();
3067                                } catch (Exception e) {
3068                                        throw new IOException(e.getMessage(), e);
3069                                }
3070                        }
3071                }
3072        }
3073
3074        public void startArchiveForDeleted(long timerMs, String archiveTime) throws IOException {
3075                long archiveMs = ConfigProperties.parseTimeToMs(archiveTime, 0L);
3076                startArchiveForDeleted(timerMs, archiveMs, null);
3077        }
3078
3079        public void startArchiveForDeleted(long timerMs, long archiveMs) throws IOException {
3080                startArchiveForDeleted(timerMs, archiveMs, null);
3081        }
3082
3083        public void startArchiveForDeleted(long timerMs, String archiveTime, Runnable completedCallback)
3084                        throws IOException {
3085                long archiveMs = ConfigProperties.parseTimeToMs(archiveTime, 0L);
3086                startArchiveForDeleted(timerMs, archiveMs, completedCallback);
3087        }
3088
3089        public void startArchiveForDeleted(long timerMs, long archiveMs, Runnable completedCallback) throws IOException {
3090            final RemoteFile the = this;
3091                if (archiveMs <= 0) {
3092                        throw new IOException(String.format(
3093                                        "This parameter 'archiveMs' cannot be '%s', please use a number greater than 0.", archiveMs));
3094                }
3095
3096                stopSync = false;
3097
3098                CacheDriverExecutor executor = null;
3099                Connection conn = null;
3100                try {
3101                    Thread.currentThread().setName(String.format("RemoteFile-archiveForDeleted-%s", getPath()));
3102                        conn = dataSourceReader.getConnection();
3103                        conn.setAutoCommit(true);
3104                        executor = new CacheDriverExecutor(conn);
3105                        Timestamp beforeTime = new Timestamp(new Clock().getTime() - archiveMs);
3106                        Map<String, Object> dataRecord = new HashMap<String, Object>();
3107                        dataRecord.put("file_parent_path", getPath());
3108                        dataRecord.put("like_file_parent_path", parsePath(getPath() + "/%"));
3109                        dataRecord.put("before_deleted_at", beforeTime);
3110
3111                        CacheArray rows = new CacheArray();
3112                        CacheArrayFilter filter = new CacheArrayFilter(CACHE_ARRAY_FILTER_TIMER) {
3113
3114                                @Override
3115                                public void completed(Integer size) {
3116                                        if (isStopSync()) {
3117                                                LoggerFactory.getLogger(RemoteFile.class).mark("Stoped archive for deleted.");
3118                                        }
3119                                        if (timerMs <= 0) {
3120                                                LoggerFactory.getLogger(RemoteFile.class).mark("Completed archive for deleted.");
3121                                        }
3122                                        
3123                                        if (completedCallback != null) {
3124                                                completedCallback.run();
3125                                        }
3126                                        
3127                                        if (!isStopSync() && timerMs > 0) {
3128                                            while(true){
3129                                                
3130                                                if(isStopSync()) break;
3131                                                
3132                                                try {
3133                                                        Thread.sleep(timerMs);
3134                                                        if (!isStopSync()) {
3135                                                            boolean allowed = checkUsage(the);
3136                                                            if(allowed){
3137                                                                    startArchiveForDeleted(timerMs, archiveMs, completedCallback);
3138                                                                    break;
3139                                                            }
3140                                                        }
3141                                                } catch (Exception e) {
3142                                                        log.error(e.getMessage(), e);
3143                                                }
3144                                            }
3145                                        }
3146                                }
3147
3148                                @Override
3149                                public void execute(Integer index, Object o) {
3150                                    Thread.currentThread().setName(String.format("RemoteFile-execute-archiveForDeleted-%s", index));
3151                                        CacheDriverExecutor dmExecutor = null;
3152                                        CacheDriverExecutor dmDelExecutor = null;
3153                                        CacheDriverExecutor executor = null;
3154                                        if(isDebug(the)){
3155                                            debugLog("CheckUsage",the,this);
3156                                        }                                       
3157                                        if (memoryUsage() < RemoteFile.MAX_MEMORY_USAGE) {
3158                                                try {
3159                                                        Clock clock = new Clock();
3160                                                        Timestamp currentTime = clock.getSqlTimestamp();
3161                                                        Map<String, Object> item = (Map<String, Object>) o;
3162                                                        Object fileId = item.get("file_id");
3163                                                        Thread.currentThread().setName(String.format("RemoteFile-archiveForDeleted-%s", fileId));
3164                                                        calculateDataMappingDataSourceReader(the, item);
3165                                                        Map<String, Object> dataRecord = null;
3166                                                        boolean haveSubFiles = false;
3167                                                        /**Need use dataMappingDataSourceReader**/
3168                                                        if (item.get("file_type").equals("D")) {
3169                                                                int count = countAllSubFiles(item.get("file_parent_path") + "",
3170                                                                                item.get("file_name") + "");
3171                                                                haveSubFiles = count > 0;
3172                                                        }
3173                                                        if (!haveSubFiles) {
3174                                                                if (!item.get("file_type").equals("D")) {
3175                                                                        Connection dmConn = dataMappingDataSourceReader.getConnection();
3176                                                                        dmConn.setAutoCommit(true);
3177                                                                        dmExecutor = new CacheDriverExecutor(dmConn);
3178                                                                        dataRecord = new HashMap<String, Object>();
3179                                                                        dataRecord.put("file_id", item.get("file_id"));
3180                                                                        dataRecord.put("deleted_at", currentTime);
3181                                                                        dmExecutor.execute(
3182                                                                                        formatSqlForFilePartData(SQL_UPDATE_BY_FILE_ID_FOR_CLEAR_FILE_DATA),
3183                                                                                        dataRecord);
3184                                                                }
3185                                                                Connection conn = dataSourceWriter.getConnection();
3186                                                                conn.setAutoCommit(true);
3187                                                                executor = new CacheDriverExecutor(conn);
3188                                                                dataRecord = new HashMap<String, Object>();
3189                                                                dataRecord.put("file_id", item.get("file_id"));
3190                                                                executor.execute(formatSql(SQL_DELETE_BY_FILE_ID_FROM_FILE), dataRecord);
3191                                                        }
3192
3193                                                        Connection dmDelConn = dataMappingDataSourceReader.getConnection();
3194                                                        dmDelConn.setAutoCommit(true);
3195                                                        dmDelExecutor = new CacheDriverExecutor(dmDelConn);
3196                                                        dataRecord = new HashMap<String, Object>();
3197                                                        dataRecord.put("deleted_at", new Timestamp(currentTime.getTime() - archiveMs));
3198                                                        dmDelExecutor.execute(formatSqlForFilePartData(SQL_DELETE_BY_RF_ID_FROM_FILE_DATA),
3199                                                                        dataRecord);
3200                                                } catch (Exception e) {
3201                                                        log.error(e.getMessage(), e);
3202                                                } finally {
3203                                                        try {
3204                                                                if (dmExecutor != null)
3205                                                                        dmExecutor.close();
3206                                                        } catch (Exception e) {
3207                                                                log.error(e.getMessage(), e);
3208                                                        }
3209                                                        try {
3210                                                                if (dmDelExecutor != null)
3211                                                                        dmDelExecutor.close();
3212                                                        } catch (Exception e) {
3213                                                                log.error(e.getMessage(), e);
3214                                                        }
3215                                                        try {
3216                                                                if (executor != null)
3217                                                                        executor.close();
3218                                                        } catch (Exception e) {
3219                                                                log.error(e.getMessage(), e);
3220                                                        }
3221                                                }
3222
3223                                        }else{
3224                                            log.warn("Over max memory usage 'MAX_MEMORY_USAGE={}'.",MAX_MEMORY_USAGE);
3225                                        }
3226                                }
3227                        };
3228                        rows.filter(filter);
3229                        final CacheDriverExecutor finalExecutor = executor;
3230                        Executors.newFixedThreadPool(1).execute(new Runnable(){
3231                            @Override
3232                            public void run(){
3233                                try{
3234                                    finalExecutor.find(formatSql(SQL_SELECT_FOR_ARCHIVE_DELETED), dataRecord, rows);
3235                                }catch(Exception e){
3236                                    log.error(e.getMessage(),e);
3237                                }
3238                            }
3239                        });                     
3240                } catch (SQLException e) {
3241                        throw new IOException(e.getMessage(), e);
3242                } finally {
3243                        if (executor != null) {
3244                                try {
3245                                        executor.close();
3246                                } catch (Exception e) {
3247                                        throw new IOException(e.getMessage(), e);
3248                                }
3249                        }
3250                }
3251        }
3252        
3253        public void startArchiveForTimeout(long timerMs) throws IOException {
3254            startArchiveForTimeout(timerMs,null);
3255        }
3256        
3257        public void startArchiveForTimeout(long timerMs,Runnable completedCallback) throws IOException {
3258            final RemoteFile the = this;
3259            
3260                stopSync = false;
3261
3262                CacheDriverExecutor executor = null;
3263                Connection conn = null;
3264                try {
3265                    Thread.currentThread().setName(String.format("RemoteFile-archiveForTimeout-%s", getPath()));
3266                        conn = dataSourceReader.getConnection();
3267                        conn.setAutoCommit(true);
3268                        executor = new CacheDriverExecutor(conn);
3269                        Timestamp beforeTime = new Timestamp(new Clock().getTime() - (LOGIC_TIMEOUT_MS*BATCH_SIZE));
3270                        Map<String, Object> dataRecord = new HashMap<String, Object>();
3271                        dataRecord.put("file_parent_path", getPath());
3272                        dataRecord.put("like_file_parent_path", parsePath(getPath() + "/%"));
3273                        dataRecord.put("before_updated_at", beforeTime);
3274
3275                        CacheArray rows = new CacheArray();
3276                        CacheArrayFilter filter = new CacheArrayFilter(CACHE_ARRAY_FILTER_TIMER) {
3277
3278                                @Override
3279                                public void completed(Integer size) {
3280                                        if (isStopSync()) {
3281                                                LoggerFactory.getLogger(RemoteFile.class).mark("Stoped archive for timeout deleted.");
3282                                        }
3283                                        if (timerMs <= 0) {
3284                                                LoggerFactory.getLogger(RemoteFile.class).mark("Completed archive for timeout deleted.");
3285                                        }
3286                                        
3287                                        if (completedCallback != null) {
3288                                                completedCallback.run();
3289                                        }
3290                                        
3291                                        if (!isStopSync() && timerMs > 0) {
3292                                            while(true){
3293                                                
3294                                                if(isStopSync()) break;
3295                                                
3296                                                try {
3297                                                        Thread.sleep(timerMs);
3298                                                        if (!isStopSync()) {
3299                                                            boolean allowed = checkUsage(the);
3300                                                            if(allowed){
3301                                                                    startArchiveForTimeout(timerMs, completedCallback);
3302                                                                    break;
3303                                                            }
3304                                                        }
3305                                                } catch (Exception e) {
3306                                                        log.error(e.getMessage(), e);
3307                                                }
3308                                            }
3309                                        }
3310
3311                                }
3312
3313                                @Override
3314                                public void execute(Integer index, Object o) {
3315                                    Thread.currentThread().setName(String.format("RemoteFile-execute-archiveForTimeout-%s", index));
3316                                        if(isDebug(the)){
3317                                            debugLog("CheckUsage",the,this);
3318                                        }                                   
3319                                        CacheDriverExecutor executor = null;
3320                                        if (memoryUsage() < RemoteFile.MAX_MEMORY_USAGE) {
3321                                                try {
3322                                                        Clock clock = new Clock();
3323                                                        Timestamp currentTime = clock.getSqlTimestamp();
3324                                                        Map<String, Object> item = (Map<String, Object>) o;
3325                                                        Object fileId = item.get("file_id");
3326                                                        Thread.currentThread().setName(String.format("RemoteFile-archiveForTimeout-%s", fileId));
3327                                                        Connection conn = dataSourceWriter.getConnection();
3328                                                        conn.setAutoCommit(true);
3329                                                        executor = new CacheDriverExecutor(conn);
3330                                                        Map<String,Object> dataRecord = new HashMap<String, Object>();
3331                                                        dataRecord.put("file_id",fileId);
3332                                                        dataRecord.put("deleted_at",currentTime);
3333                                                        dataRecord.put("deleted_user_id",getDefaultModifyUserId());
3334                                                        executor.execute(formatSql(SQL_UPDATE_FOR_TIMEOUT_DELETE), dataRecord);
3335                                                } catch (Exception e) {
3336                                                        log.error(e.getMessage(), e);
3337                                                } finally {
3338                                                        try {
3339                                                                if (executor != null)
3340                                                                        executor.close();
3341                                                        } catch (Exception e) {
3342                                                                log.error(e.getMessage(), e);
3343                                                        }
3344                                                }
3345
3346                                        }else{
3347                                             log.warn("Over max memory usage 'MAX_MEMORY_USAGE={}'.",MAX_MEMORY_USAGE);
3348                                        }
3349                                }
3350                        };
3351                        rows.filter(filter);
3352                        final CacheDriverExecutor finalExecutor = executor;
3353                        Executors.newFixedThreadPool(1).execute(new Runnable(){
3354                            @Override
3355                            public void run(){
3356                                try{
3357                                    finalExecutor.find(formatSql(SQL_SELECT_FOR_ALL_TIMEOUT), dataRecord, rows);
3358                                }catch(Exception e){
3359                                    log.error(e.getMessage(),e);
3360                                }
3361                            }
3362                        });                             
3363                } catch (SQLException e) {
3364                        throw new IOException(e.getMessage(), e);
3365                } finally {
3366                        if (executor != null) {
3367                                try {
3368                                        executor.close();
3369                                } catch (Exception e) {
3370                                        throw new IOException(e.getMessage(), e);
3371                                }
3372                        }
3373                }
3374        }       
3375
3376        public static Map<String, DriverDataSource> getDriverDataSourceMap() {
3377                return Collections.synchronizedMap(DATASOURCE_REMOTE_FILE);
3378        }
3379
3380        private static synchronized String generateId(boolean identityCardEnable, String identityCardName,
3381                        String identityCardPrefix, int identityCardSuffix) throws SQLException {
3382                if (identityCardEnable) {
3383                        if (identityCard == null) {
3384                                identityCard = new IdentityCard(identityCardName);
3385                        }
3386                        return identityCard.generateId(16, identityCardPrefix, "", identityCardSuffix);
3387                } else {
3388                        return CommonTools.generateId(16);
3389                }
3390        }
3391
3392        private static synchronized String generateDataId(boolean identityCardEnable, String identityCardName,
3393                        int identityCardSuffix, RemoteFile theRf) throws SQLException {
3394                if (identityCardEnable) {
3395                        if (identityCard == null) {
3396                                identityCard = new IdentityCard(identityCardName + "Data");
3397                        }
3398                        return identityCard.generateId(16, String.format("%04d", theRf.dataMappingTable), "", identityCardSuffix);
3399                } else {
3400                        return CommonTools.generateId(16);
3401                }
3402        }
3403
3404        protected static synchronized boolean addQueuePathMapping(String path) {
3405                Long addTimeMs = null;
3406
3407                boolean allowed = checkQueuePathMapping();
3408
3409                if (!allowed)
3410                        return false;
3411
3412                addTimeMs = QUEUE_PATH_MAPPING.get(path);
3413
3414                if (addTimeMs == null) {
3415                        QUEUE_PATH_MAPPING.put(path, System.currentTimeMillis());
3416                        return true;
3417                }
3418
3419                return false;
3420        }
3421
3422        public static synchronized boolean checkQueuePathMapping() {
3423                Long addTimeMs = null;
3424                Map<String, Long> copyQueueMap = getQueuePathMapping();
3425                List<String> keyList = new ArrayList<String>(copyQueueMap.keySet());
3426                for (String key : keyList) {
3427                        addTimeMs = copyQueueMap.get(key);
3428                        if (addTimeMs != null) {
3429                                boolean timeout = (System.currentTimeMillis() - addTimeMs) >= LOGIC_TIMEOUT_MS;
3430
3431                                if (timeout) {
3432                                        QUEUE_PATH_MAPPING.remove(key);
3433                                }
3434                        }
3435                }
3436
3437                if (QUEUE_PATH_MAPPING.keySet().size() >= MAX_QUEUE_SIZE)
3438                        return false;
3439
3440                return true;
3441        }
3442
3443        protected static synchronized void removeQueuePathMapping(String path) {
3444                QUEUE_PATH_MAPPING.remove(path);
3445        }
3446
3447        public static synchronized Map<String, Long> getQueuePathMapping() {
3448                return Collections.synchronizedMap(QUEUE_PATH_MAPPING);
3449        }
3450
3451        protected static synchronized void loadDataMapping(RemoteFile rf) throws Exception {
3452                List<String> dmArray = rf.configProperties.getArray("DataMapping", new String[] {});
3453                int size = dmArray.size();
3454                List<String> dmList = DATA_MAPPING_LIST.get(rf.dsKey);
3455
3456                if (dmList == null)
3457                        dmList = new ArrayList<String>();
3458
3459                if (!DATA_MAPPING_INDEX.containsKey(rf.dsKey))
3460                        DATA_MAPPING_INDEX.put(rf.dsKey, 0);
3461
3462                for (int i = 0; i < size; i++) {
3463                        String dm = dmArray.get(i);
3464                        String originKey = String.format("DataMapping[%s]", i);
3465                        Map<String, Object> dmMap = rf.configProperties.getMap(originKey);
3466                        String shortKey = String.format("%s.%s", rf.dsKey, dm);
3467                        if (dmMap != null) {
3468                                String dmKey = String.format("%s.%s", rf.dsKey, originKey);
3469
3470                                if (!DATA_MAPPING.containsKey(shortKey)) {
3471                                        ConfigProperties dmCp = new ConfigProperties();
3472                                        dmCp.putAllAndReturnSlef(dmMap);
3473                                        int priority = dmCp.getInteger(String.format("%s.Priority", originKey), 0);
3474                                        if (i == 0 && priority < 1) {
3475                                                throw new Exception("First DataMapping priority cannot be less than 1.");
3476                                        }
3477                                        String dataSourceFileOriginValue = dmCp.getString(String.format("%s.DataSource", originKey));
3478                                        String dataSourceReaderFileOriginValue = dmCp
3479                                                        .getString(String.format("%s.DataSourceReader", originKey), dataSourceFileOriginValue);
3480                                        String dataSourceWriterFileOriginValue = dmCp
3481                                                        .getString(String.format("%s.DataSourceWriter", originKey), dataSourceFileOriginValue);
3482
3483                                        String dmDsReaderKey = String.format("%s.reader", CommonTools.md5(dataSourceReaderFileOriginValue));
3484                                        String dmDsWriterKey = String.format("%s.writer", CommonTools.md5(dataSourceWriterFileOriginValue));
3485                                        DriverDataSource dmDataSourceReader = DATASOURCE_REMOTE_FILE.get(dmDsReaderKey);
3486                                        DriverDataSource dmDataSourceWriter = DATASOURCE_REMOTE_FILE.get(dmDsWriterKey);
3487                                        if (dmDataSourceReader == null) {
3488                                                DATASOURCE_REMOTE_FILE.put(dmDsReaderKey,
3489                                                                new DriverDataSource(new File(dataSourceReaderFileOriginValue)));
3490                                        }
3491                                        if (dmDataSourceWriter == null) {
3492                                                DATASOURCE_REMOTE_FILE.put(dmDsWriterKey,
3493                                                                new DriverDataSource(new File(dataSourceWriterFileOriginValue)));
3494                                        }
3495                                        String tableRangOriginValue = dmCp.getString(String.format("%s.TableRange", originKey), "0");
3496                                        ConfigProperties shortCp = new ConfigProperties();
3497                                        List<Integer> tableRange = new ArrayList<Integer>();
3498                                        if (tableRangOriginValue.contains("-")) {
3499                                                String[] ranges = tableRangOriginValue.split("-");
3500                                                Integer begin = Integer.parseInt(ranges[0].trim());
3501                                                Integer end = Integer.parseInt(ranges[1].trim());
3502                                                for (int j = begin; j <= end; j++) {
3503                                                        tableRange.add(begin + j);
3504                                                }
3505                                        } else if (tableRangOriginValue.contains(",")) {
3506                                                String[] ranges = tableRangOriginValue.split(",");
3507                                                int rangesLen = ranges.length;
3508                                                for (int j = 0; j < rangesLen; j++) {
3509                                                        tableRange.add(Integer.parseInt(ranges[j].trim()));
3510                                                }
3511                                        } else {
3512                                                tableRange.add(Integer.parseInt(tableRangOriginValue.trim()));
3513                                        }
3514                                        shortCp.put("TableRange", tableRange);
3515                                        shortCp.putAndReturnSlef("DataSourceReader", dmDsReaderKey);
3516                                        shortCp.putAndReturnSlef("DataSourceWriter", dmDsWriterKey);
3517                                        shortCp.putAndReturnSlef("Priority", priority);
3518                                        shortCp.putAndReturnSlef("UsingCount", 0);
3519                                        shortCp.putAndReturnSlef("Name", dm);
3520                                        int randomIndex = ThreadLocalRandom.current().nextInt(0, tableRange.size());
3521                                        shortCp.putAndReturnSlef("Table", tableRange.get(randomIndex));
3522                                        DATA_MAPPING.put(shortKey, shortCp);
3523                                        dmList.add(dm);
3524                                        DATA_MAPPING_LIST.put(rf.dsKey, dmList);
3525                                }
3526                        }
3527                }
3528
3529        }
3530
3531        protected static synchronized void calculateDataMappingDataSourceReader(RemoteFile rf,
3532                        Map<String, Object> rfRecord) {
3533                rf.dataMappingDs = (String) rfRecord.get("file_part_data_ds");
3534                rf.dataMappingTable = (Integer) rfRecord.get("file_part_data_table");
3535                String shortKey = String.format("%s.%s", rf.dsKey, rf.dataMappingDs);
3536                ConfigProperties cp = DATA_MAPPING.get(shortKey);
3537                if (cp != null) {
3538                        String dmDsReaderKey = cp.getString("DataSourceReader");
3539                        rf.dataMappingDataSourceReader = DATASOURCE_REMOTE_FILE.get(dmDsReaderKey);
3540                }
3541        }
3542
3543        protected static synchronized void calculateDataMappingDataSourceWriter(RemoteFile rf) {
3544                if (DATA_MAPPING_INDEX.get(rf.dsKey) >= DATA_MAPPING_LIST.size() - 1) {
3545                        DATA_MAPPING_INDEX.put(rf.dsKey, 0);
3546                }
3547                ConfigProperties cp = getDataMapping(rf, DATA_MAPPING_INDEX.get(rf.dsKey));
3548                List<Integer> tableRange = (List<Integer>) cp.get("TableRange");
3549                Integer priority = cp.getInteger("Priority", 0);
3550                Integer usingCount = cp.getInteger("UsingCount", 0);
3551                String key = String.format("%s.%s", rf.dsKey, cp.getString("Name"));
3552                if (usingCount < priority) {
3553                        rf.dataMappingDataSourceReader = DATASOURCE_REMOTE_FILE.get(cp.getString("DataSourceReader"));
3554                        rf.dataMappingDataSourceWriter = DATASOURCE_REMOTE_FILE.get(cp.getString("DataSourceWriter"));
3555                        rf.dataMappingDs = cp.getString("Name");
3556                        rf.dataMappingTable = cp.getInteger("Table");
3557                        usingCount++;
3558                        if (usingCount >= priority) {
3559                                DATA_MAPPING_INDEX.put(rf.dsKey, DATA_MAPPING_INDEX.get(rf.dsKey) + 1);
3560                                cp.putAndReturnSlef("UsingCount", 0);
3561                                int randomIndex = ThreadLocalRandom.current().nextInt(0, tableRange.size());
3562                                cp.putAndReturnSlef("Table", tableRange.get(randomIndex));
3563                                DATA_MAPPING.put(key, cp);
3564                        } else {
3565                                cp.putAndReturnSlef("UsingCount", usingCount);
3566                                int randomIndex = ThreadLocalRandom.current().nextInt(0, tableRange.size());
3567                                cp.putAndReturnSlef("Table", tableRange.get(randomIndex));
3568                                DATA_MAPPING.put(key, cp);
3569                        }
3570                } else {
3571                        DATA_MAPPING_INDEX.put(rf.dsKey, DATA_MAPPING_INDEX.get(rf.dsKey) + 1);
3572                        calculateDataMappingDataSourceWriter(rf);
3573                }
3574        }
3575
3576        protected static synchronized ConfigProperties getDataMapping(RemoteFile rf, int index) {
3577                List<String> dmList = DATA_MAPPING_LIST.get(rf.dsKey);
3578                String dm = dmList.get(index);
3579                String key = String.format("%s.%s", rf.dsKey, dm);
3580                return DATA_MAPPING.get(key);
3581        }
3582
3583        protected String getDataMappingTableName() {
3584                return String.format("%s_data_%s", this.tableName, this.dataMappingTable);
3585        }
3586        
3587        private static void debugLog(String point,RemoteFile rf,CacheArrayFilter filter){
3588                debugLog(RemoteFile.class,point,rf,filter);
3589        }       
3590
3591}