001/******************************************************************************* 002The MIT License (MIT) 003 004Copyright (c) 2024 KILLCODING.COM 005 006Permission is hereby granted, free of charge, to any person obtaining a copy 007of this software and associated documentation files (the "Software"), to deal 008in the Software without restriction, including without limitation the rights 009to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 010copies of the Software, and to permit persons to whom the Software is 011furnished to do so, subject to the following conditions: 012 013The above copyright notice and this permission notice shall be included in 014all copies or substantial portions of the Software. 015 016THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 017IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 018FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 019AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 020LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 021OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 022THE SOFTWARE. 023*****************************************************************************/ 024package com.killcoding.file; 025 026import java.io.File; 027import com.killcoding.datasource.DriverDataSource; 028import com.killcoding.tool.ConfigProperties; 029import com.killcoding.tool.CommonTools; 030import com.killcoding.log.Logger; 031import com.killcoding.log.LoggerFactory; 032import java.io.FileInputStream; 033import java.io.IOException; 034import java.sql.Connection; 035import java.sql.SQLException; 036import com.killcoding.datasource.CacheDriverExecutor; 037import java.util.Map; 038import java.util.HashMap; 039import com.killcoding.datasource.Clock; 040import java.sql.Timestamp; 041import java.nio.file.Files; 042import java.util.concurrent.ConcurrentHashMap; 043import java.util.List; 044import java.util.ArrayList; 045import java.util.Arrays; 046import java.io.ByteArrayOutputStream; 047import java.sql.Blob; 048import java.nio.file.Paths; 049import java.nio.file.Path; 050import java.net.URI; 051import java.io.ByteArrayInputStream; 052import java.nio.ByteBuffer; 053import java.util.concurrent.ExecutorService; 054import java.util.concurrent.Executors; 055import java.util.concurrent.Future; 056import java.nio.file.LinkOption; 057import javax.crypto.Cipher; 058import javax.crypto.CipherInputStream; 059import javax.crypto.CipherOutputStream; 060import javax.crypto.KeyGenerator; 061import javax.crypto.SecretKey; 062import javax.crypto.SecretKeyFactory; 063import javax.crypto.spec.DESKeySpec; 064import javax.crypto.spec.IvParameterSpec; 065import javax.crypto.spec.SecretKeySpec; 066import com.killcoding.file.DiskFile; 067import java.util.Collection; 068import com.killcoding.cache.CacheArray; 069import com.killcoding.cache.CacheArrayFilter; 070import java.nio.file.attribute.BasicFileAttributes; 071import java.util.concurrent.Callable; 072import com.killcoding.datasource.IdentityCard; 073import java.util.Collections; 074import java.util.concurrent.ThreadLocalRandom; 075import com.killcoding.tool.CipherTools; 076import com.killcoding.cache.Cache; 077import java.util.Date; 078 079public class RemoteFile extends BaseFile implements RemoteFileSQL { 080 081 private static final Map<String, DriverDataSource> DATASOURCE_REMOTE_FILE = new ConcurrentHashMap<String, DriverDataSource>(); 082 083 private static final Map<String, ConfigProperties> CONFIG_PROPS_REMOTE_FILE = new ConcurrentHashMap<String, ConfigProperties>(); 084 085 private static final Map<String, Long> QUEUE_PATH_MAPPING = new ConcurrentHashMap<String, Long>(); 086 087 private static final Map<String, ConfigProperties> DATA_MAPPING = new ConcurrentHashMap<String, ConfigProperties>(); 088 089 private static final Map<String, List<String>> DATA_MAPPING_LIST = new ConcurrentHashMap<String, List<String>>(); 090 091 private static final Map<String, Integer> DATA_MAPPING_INDEX = new ConcurrentHashMap<String, Integer>(); 092 093 protected static boolean FIRST_LOADED = false; 094 095 public static Integer MAX_POOL_SIZE = 100; 096 public static Double MAX_CONNECT_USAGE = 0.5D; 097 public static Double MAX_MEMORY_USAGE = 0.8D; 098 public static Integer MAX_QUEUE_SIZE = 10; 099 public static Long MAX_FILE_SZIE = -1L; 100 public static boolean SHOW_LINK = false; 101 public static boolean SHOW_HIDDEN = false; 102 public static boolean COPY_STRUCTURE_ONLY = false; 103 public static Integer MAX_FILE_PARENT_PATH_LENGTH = 200; 104 public static Integer MAX_FILE_NAME_LENGTH = 200; 105 public static Integer CACHE_SECONDS = 5; 106 public static Date SYNC_CUTOFF_TIME = null; 107 public static Date DELETE_CUTOFF_TIME = null; 108 109 private static IdentityCard identityCard = null; 110 private static File appRemoteFile = null; 111 private DriverDataSource dataSourceReader = null; 112 private DriverDataSource dataSourceWriter = null; 113 private String dataSourceReaderPath = null; 114 private String dataSourceWriterPath = null; 115 private String modifyUserId = null; 116 private Integer filePartSize = null; 117 private Boolean identityCardEnable = false; 118 private String identityCardName = null; 119 private String identityCardPrefix = null; 120 private Integer identityCardSuffix = 0; 121 private Timestamp modifyTime = null; 122 123 protected ConfigProperties configProperties = null; 124 protected String fileId = null; 125 protected String tableName = null; 126 protected Integer dataMappingTable = null; 127 protected String dataMappingDs = null; 128 protected DriverDataSource dataMappingDataSourceReader = null; 129 protected DriverDataSource dataMappingDataSourceWriter = null; 130 protected String dsKey = null; 131 132 public static Long BEFORE_THE_QUEUE_TIME = 60000L; 133 public static Integer BATCH_SIZE = 10; 134 public static String DEFAULE_MODIFY_USER_ID = null; 135 136 private Logger log = null; 137 138 private static ExecutorService mergePool = null; 139 140 public RemoteFile(String path) { 141 super(path); 142 log = LoggerFactory.getLogger(getClass()); 143 fileId = CommonTools.generateId(16); 144 initGlobal(); 145 } 146 147 public static synchronized void initPool(int poolSize) { 148 if (mergePool == null) { 149 MAX_POOL_SIZE = poolSize; 150 LoggerFactory.getLogger(RemoteFile.class).mark("MAX_POOL_SIZE={}", MAX_POOL_SIZE); 151 mergePool = Executors.newFixedThreadPool(MAX_POOL_SIZE); 152 } 153 } 154 155 private synchronized void init() { 156 dsKey = CommonTools.md5(this.appRemoteFile.getAbsolutePath()); 157 dataSourceReader = DATASOURCE_REMOTE_FILE.get(String.format("%s.reader", dsKey)); 158 dataSourceWriter = DATASOURCE_REMOTE_FILE.get(String.format("%s.writer", dsKey)); 159 configProperties = CONFIG_PROPS_REMOTE_FILE.get(dsKey); 160 if (dataSourceReader == null && dataSourceWriter == null) { 161 try { 162 if (!appRemoteFile.exists()) 163 throw new IOException(String.format("Not exist '%s'.", appRemoteFile.getAbsolutePath())); 164 165 configProperties = new ConfigProperties(); 166 Runnable updatedRun = new Runnable() { 167 @Override 168 public void run() { 169 try { 170 reload(); 171 LoggerFactory.getLogger(RemoteFile.class).mark("RemoteFile Reloaded"); 172 } catch (Exception e) { 173 log.error(e.getMessage(), e); 174 } 175 } 176 }; 177 configProperties.load(appRemoteFile, updatedRun); 178 179 CONFIG_PROPS_REMOTE_FILE.put(dsKey, configProperties); 180 181 String dataSourcePath = configProperties.getString("DataSource"); 182 dataSourceReaderPath = configProperties.getString("DataSourceReader", dataSourcePath); 183 dataSourceWriterPath = configProperties.getString("DataSourceWriter", dataSourcePath); 184 reload(); 185 } catch (Exception e) { 186 log.error(e.getMessage(), e); 187 } 188 } 189 if (configProperties != null) { 190 String dataSourcePath = configProperties.getString("DataSource"); 191 dataSourceReaderPath = configProperties.getString("DataSourceReader", dataSourcePath); 192 dataSourceWriterPath = configProperties.getString("DataSourceWriter", dataSourcePath); 193 reload(); 194 } 195 calculateDataMappingDataSourceWriter(this); 196 } 197 198 private synchronized void initGlobal() { 199 if(this.appRemoteFile == null){ 200 this.appRemoteFile = CommonTools.getSystemProperties(RemoteFile.class, "APP_REMOTE_FILE", 201 "RemoteFile.properties"); 202 } 203 init(); 204 } 205 206 private synchronized void reload() { 207 try { 208 LOGIC_TIMEOUT_MS = configProperties.getMilliSeconds("LogicTimeout", 300000L); 209 LOGIC_CHECK_TIMEOUT_MS = configProperties.getMilliSeconds("LogicCheckTimeout", 60000L); 210 MAX_FILE_SZIE = configProperties.getFileSize("MaxFileSize", 1024 * 1024 * 1L); 211 SHOW_HIDDEN = configProperties.getBoolean("ShowHidden", false); 212 SHOW_LINK = configProperties.getBoolean("ShowLink", false); 213 COPY_STRUCTURE_ONLY = configProperties.getBoolean("CopyStructureOnly", false); 214 MAX_QUEUE_SIZE = configProperties.getInteger("MaxQueueSize", 10); 215 BATCH_SIZE = configProperties.getInteger("BatchSize", 10); 216 MAX_CONNECT_USAGE = configProperties.getDouble("MaxConnectUsage", 0.5D); 217 MAX_MEMORY_USAGE = configProperties.getDouble("MaxMemoryUsage", 0.8D); 218 BEFORE_THE_QUEUE_TIME = configProperties.getMilliSeconds("BeforeTheQueueTime", 15000L); 219 MAX_FILE_PARENT_PATH_LENGTH = configProperties.getInteger("MaxFileParentPathLength", 200); 220 MAX_FILE_NAME_LENGTH = configProperties.getInteger("MaxFileNameLength", 200); 221 CACHE_SECONDS = configProperties.getInteger("CacheSeconds", 5); 222 223 SYNC_CUTOFF_TIME = configProperties.getDateTime("SyncCutoffTime","0000-01-01 00:00:00.0000"); 224 DELETE_CUTOFF_TIME = configProperties.getDateTime("DeleteCutoffTime","0000-01-01 00:00:00.0000"); 225 226 if(BEFORE_THE_QUEUE_TIME < 0) BEFORE_THE_QUEUE_TIME = 15000L; 227 228 if(MAX_CONNECT_USAGE < 0) MAX_CONNECT_USAGE = 0D; 229 230 if(MAX_MEMORY_USAGE < 0) MAX_MEMORY_USAGE = 0D; 231 232 if(BATCH_SIZE <= 0) BATCH_SIZE = 10; 233 234 if(MAX_QUEUE_SIZE <= 0) MAX_QUEUE_SIZE = 10; 235 236 if(MAX_FILE_SZIE <= 0) MAX_FILE_SZIE = 1024 * 1024 * 1L; 237 238 if(LOGIC_TIMEOUT_MS <= 0) LOGIC_TIMEOUT_MS = 900000L; 239 240 if(LOGIC_CHECK_TIMEOUT_MS < 0) LOGIC_CHECK_TIMEOUT_MS = 60000L; 241 242 if(MAX_FILE_PARENT_PATH_LENGTH <= 0) MAX_FILE_PARENT_PATH_LENGTH = 200; 243 244 if(MAX_FILE_NAME_LENGTH <= 0) MAX_FILE_NAME_LENGTH = 200; 245 246 if(CACHE_SECONDS < 0) CACHE_SECONDS = 0; 247 248 log.debug("LOGIC_TIMEOUT_MS={}", LOGIC_TIMEOUT_MS); 249 log.debug("LOGIC_CHECK_TIMEOUT_MS={}", LOGIC_CHECK_TIMEOUT_MS); 250 log.debug("MAX_FILE_SZIE={}", MAX_FILE_SZIE); 251 log.debug("SHOW_HIDDEN={}", SHOW_HIDDEN); 252 log.debug("SHOW_LINK={}", SHOW_LINK); 253 log.debug("COPY_STRUCTURE_ONLY={}", COPY_STRUCTURE_ONLY); 254 log.debug("MAX_QUEUE_SIZE={}", MAX_QUEUE_SIZE); 255 log.debug("BATCH_SIZE={}", BATCH_SIZE); 256 log.debug("MAX_CONNECT_USAGE={}", MAX_CONNECT_USAGE); 257 log.debug("MAX_MEMORY_USAGE={}", MAX_MEMORY_USAGE); 258 log.debug("MAX_FILE_PARENT_PATH_LENGTH={}", MAX_FILE_PARENT_PATH_LENGTH); 259 log.debug("MAX_FILE_NAME_LENGTH={}", MAX_FILE_NAME_LENGTH); 260 log.debug("CACHE_SECONDS={}", CACHE_SECONDS); 261 262 String dataSourcePath = configProperties.getString("DataSource"); 263 String reloadDataSourceReaderPath = configProperties.getString("DataSourceReader", dataSourcePath); 264 String reloadDataSourceWriterPath = configProperties.getString("DataSourceWriter", dataSourcePath); 265 filePartSize = configProperties.getFileSize("FilePartSize", 1024 * 1024 * 1L).intValue(); 266 267 if(filePartSize <= 0) filePartSize = 1024 * 1024 * 1; 268 269 tableName = configProperties.getString("TableName", "remote_file"); 270 identityCardEnable = configProperties.getBoolean("IdentityCardEnable", false); 271 identityCardName = configProperties.getString("IdentityCardName", "RemoteFile"); 272 identityCardPrefix = configProperties.getString("IdentityCardPrefix", "RMF"); 273 identityCardSuffix = configProperties.getInteger("IdentityCardSuffix", 0); 274 275 String readerDsKey = String.format("%s.reader", dsKey); 276 String writerDsKey = String.format("%s.writer", dsKey); 277 /** 278 May be referenced when expansion is needed 279 if (dataSourceReader == null || !dataSourceReaderPath.equals(reloadDataSourceReaderPath)) { 280 DriverDataSource currentDs = DATASOURCE_REMOTE_FILE.get(readerDsKey); 281 if (currentDs != null) { 282 currentDs.closeAll(); 283 } 284 **/ 285 if (dataSourceReader == null || !dataSourceReaderPath.equals(reloadDataSourceReaderPath)) { 286 dataSourceReaderPath = reloadDataSourceReaderPath; 287 dataSourceReader = new DriverDataSource(new File(dataSourceReaderPath)); 288 DATASOURCE_REMOTE_FILE.put(readerDsKey, dataSourceReader); 289 } 290 /** 291 May be referenced when expansion is needed 292 if (dataSourceWriter == null || !dataSourceWriterPath.equals(reloadDataSourceWriterPath)) { 293 DriverDataSource currentDs = DATASOURCE_REMOTE_FILE.get(writerDsKey); 294 if (currentDs != null) { 295 currentDs.closeAll(); 296 } 297 **/ 298 if (dataSourceWriter == null || !dataSourceWriterPath.equals(reloadDataSourceWriterPath)) { 299 dataSourceWriterPath = reloadDataSourceWriterPath; 300 dataSourceWriter = new DriverDataSource(new File(dataSourceWriterPath)); 301 DATASOURCE_REMOTE_FILE.put(writerDsKey, dataSourceWriter); 302 } 303 304 loadDataMapping(this); 305 306 FIRST_LOADED = true; 307 } catch (Exception e) { 308 log.error(e.getMessage(), e); 309 } 310 } 311 312 public String formatSql(String sql) { 313 return String.format(sql, tableName); 314 } 315 316 public String formatSqlForFilePartData(String sql) { 317 return String.format(sql, getDataMappingTableName()); 318 } 319 320 public DiskFile writeToDisk(DiskFile diskFile) throws IOException { 321 if (!exists()) { 322 log.mark(String.format("The remote file '%s' does not exist.", this.getPath())); 323 } 324 RemoteFile the = this; 325 326 Timestamp rfModifyTime = the.getModifyTime(); 327 if (rfModifyTime != null) { 328 diskFile.setModifyTimeMsFromClock(rfModifyTime.getTime()); 329 } 330 if (isLink()) { 331 if (isDebug(the)) { 332 LoggerFactory.getLogger(RemoteFile.class).mark("CreateDiskLink - {}", diskFile.getPath()); 333 } 334 diskFile.createLink(new String(readAllBytes(),BaseFile.CHARSET)); 335 } else if (isFile()) { 336 merge(new FilePart() { 337 final List<byte[]> partDataList = new ArrayList<byte[]>(); 338 boolean isWritingDf = false; 339 DiskFile realDf = null; 340 341 @Override 342 protected void process(int partIndex, byte[] data) throws IOException { 343 Thread.currentThread() 344 .setName(String.format("RemoteFile-writeToDisk-merge-%s", diskFile.getOrigin().getName())); 345 if (syncRoot != null) { 346 if (!diskFile.isLogicModify()) { 347 throw new IOException( 348 String.format("The file '%s' missing modify logic lock.", the.getPath())); 349 } 350 } 351 if (diskFile != null) 352 diskFile.logicModify(); 353 try { 354 String fileName = diskFile.getOrigin().getName(); 355 isWritingDf = fileName.matches(TMP_WRITING_SWP); 356 if (isWritingDf) { 357 if (partIndex == 0) { 358 if (diskFile.exists()) { 359 diskFile.delete(); 360 } 361 if (isDebug(the)) { 362 LoggerFactory.getLogger(RemoteFile.class).mark("CreateDiskFile - {}", 363 diskFile.getPath()); 364 } 365 String realFileName = fileName.replaceFirst(BaseFile.TMP_WRITING_FOR_REPLACE, ""); 366 String realDfPath = String.format("%s/%s", diskFile.getParent(), realFileName); 367 realDf = new DiskFile(realDfPath); 368 realDf.copyAttrs(realDf, diskFile.isCopyStructureOnly(), diskFile.syncRoot); 369 if(!diskFile.manualOpen(true)){ 370 throw new IOException( 371 String.format("The disk file '%s' cannot open.", diskFile.getPath())); 372 } 373 } 374 partDataList.add(decrypt(data)); 375 if (partDataList.size() >= BATCH_SIZE) { 376 377 if (realDf != null) 378 realDf.logicModify(); 379 380 ByteArrayOutputStream ops = new ByteArrayOutputStream(); 381 int size = partDataList.size(); 382 for (int i = 0; i < size; i++) { 383 byte[] partData = partDataList.get(i); 384 ops.write(partData); 385 } 386 diskFile.manualWrite(ops.toByteArray()); 387 partDataList.clear(); 388 } 389 } else { 390 if (partIndex == 0) { 391 if (diskFile.exists()) { 392 throw new IOException( 393 String.format("The remote file '%s' already exist.", diskFile.getPath())); 394 } 395 if (isDebug(the)) { 396 LoggerFactory.getLogger(RemoteFile.class).mark("CreateDiskFile - {}", 397 diskFile.getPath()); 398 } 399 if(!diskFile.manualOpen(true)){ 400 throw new IOException( 401 String.format("The disk file '%s' cannot open.", diskFile.getPath())); 402 } 403 } 404 partDataList.add(decrypt(data)); 405 if (partDataList.size() >= BATCH_SIZE) { 406 ByteArrayOutputStream ops = new ByteArrayOutputStream(); 407 int size = partDataList.size(); 408 for (int i = 0; i < size; i++) { 409 byte[] partData = partDataList.get(i); 410 ops.write(partData); 411 } 412 diskFile.manualWrite(ops.toByteArray()); 413 partDataList.clear(); 414 } 415 } 416 417 } catch (Exception e) { 418 log.warn(e.getMessage(), e); 419 if (diskFile != null) { 420 diskFile.manualClose(); 421 diskFile.delete(); 422 } 423 throw new IOException(e.getMessage(), e); 424 } 425 } 426 427 @Override 428 protected void completed(int lastPartIndex, Timestamp modifyTime, long fileSize) throws IOException { 429 if (partDataList.size() > 0) { 430 ByteArrayOutputStream ops = new ByteArrayOutputStream(); 431 int size = partDataList.size(); 432 for (int i = 0; i < size; i++) { 433 byte[] partData = partDataList.get(i); 434 ops.write(partData); 435 } 436 diskFile.manualWrite(ops.toByteArray()); 437 partDataList.clear(); 438 } 439 diskFile.manualClose(); 440 if (syncRoot != null && !diskFile.isLogicModify()) { 441 diskFile.delete(); 442 throw new IOException( 443 String.format("The file '%s' missing modify logic lock.", diskFile.getPath())); 444 } 445 try { 446 if (isWritingDf) { 447 if (diskFile.exists()) { 448 diskFile.moveTo(realDf.getPath()); 449 if (isDebug(the)) { 450 LoggerFactory.getLogger(RemoteFile.class).mark("CompletedDiskFile - {}", 451 realDf.getPath()); 452 } 453 } 454 if (realDf.exists()) 455 the.syncedToDisk(); 456 } 457 if (realDf == null) { 458 if (diskFile != null) { 459 if (diskFile.getOrigin().getName().startsWith(".")) { 460 Files.setAttribute(Paths.get(diskFile.getPath()), "dos:hidden", true); 461 } 462 } 463 } else { 464 if (realDf.getOrigin().getName().startsWith(".")) { 465 Files.setAttribute(Paths.get(realDf.getPath()), "dos:hidden", true); 466 } 467 } 468 469 RemoteFile.removeQueuePathMapping(the.getPath()); 470 471 if (diskFile != null) 472 diskFile.removeLogicModify(); 473 474 if (realDf != null) 475 realDf.removeLogicModify(); 476 477 } catch (Exception e) { 478 log.error(e.getMessage(), e); 479 } 480 } 481 482 @Override 483 protected void ended(int lastPartIndex, long fileSize) { 484 try { 485 if (diskFile != null) { 486 diskFile.manualClose(); 487 } 488 } catch (Exception e) { 489 log.error(e.getMessage(), e); 490 } 491 } 492 }); 493 } else { 494 List<Map<String, Object>> writeList = list(getPath(), "%"); 495 writeList.addAll(list(parsePath(getPath() + "/%"), "%")); 496 497 int size = writeList.size(); 498 if (size > 0) { 499 if (!diskFile.exists()) { 500 if (isDebug(the)) { 501 log.mark("CreateDiskDir - {} ", diskFile.getPath()); 502 } 503 diskFile.mkdirs(); 504 } 505 } 506 507 for (Map<String, Object> item : writeList) { 508 RemoteFile rf = new RemoteFile( 509 String.format("%s/%s", item.get("file_parent_path"), item.get("file_name"))); 510 511 if (rf.getPath().equals(getPath())) 512 continue; 513 514 String dfPath = rf.getPath().replaceFirst(getPath(), diskFile.getPath()); 515 DiskFile df = new DiskFile(dfPath); 516 df.copyAttrs(df, false, this.syncRoot); 517 Timestamp mt = rf.getModifyTime(); 518 if (mt != null) { 519 df.setModifyTimeMsFromClock(mt.getTime()); 520 } 521 if (rf.isDir()) { 522 if (!df.exists()) { 523 if (isDebug(the)) { 524 log.mark("CreateDiskDir - {}", diskFile.getPath()); 525 } 526 df.mkdirs(); 527 } 528 } else { 529 rf.writeToDisk(df); 530 } 531 } 532 } 533 return diskFile; 534 } 535 536 public DiskFile writeToDisk() throws IOException { 537 DiskFile df = new DiskFile(this.origin.getAbsolutePath()); 538 df.copyAttrs(df, false, this.syncRoot); 539 return writeToDisk(df); 540 } 541 542 public DiskFile writeToDisk(String path) throws IOException { 543 DiskFile df = new DiskFile(path); 544 df.copyAttrs(df, false, this.syncRoot); 545 return writeToDisk(df); 546 } 547 548 public boolean isCompleted() throws IOException { 549 return getCompletedRecord() != null; 550 } 551 552 public Map<String, Object> getCompletedRecord() throws IOException { 553 String sql = formatSql(SQL_SELECT_FOR_COMPLETED); 554 Map dataRecord = new HashMap<String, Object>(); 555 dataRecord.put("file_name", origin.getName()); 556 dataRecord.put("file_parent_path", getParent()); 557 String cacheKey = String.format("first-%s-%s",sql,dataRecord); 558 Map<String,Object> record = (Map<String,Object>)Cache.get(cacheKey); 559 if(record == null){ 560 CacheDriverExecutor executor = null; 561 Connection conn = null; 562 try { 563 conn = dataSourceReader.getConnection(); 564 conn.setAutoCommit(true); 565 executor = new CacheDriverExecutor(conn); 566 record = executor.first(sql, dataRecord); 567 if(record != null){ 568 Cache.set(cacheKey,record,CACHE_SECONDS); 569 } 570 } catch (SQLException e) { 571 throw new IOException(e.getMessage(), e); 572 } finally { 573 try { 574 if (executor != null) 575 executor.close(); 576 } catch (Exception e) { 577 log.error(e.getMessage(), e); 578 } 579 } 580 } 581 return record; 582 } 583 584 public Map<String, Object> getRecordById(Object id) throws IOException { 585 String sql = formatSql(SQL_SELECT_GET_RECORD_BY_ID); 586 Map dataRecord = new HashMap<String, Object>(); 587 dataRecord.put("id", id); 588 Map<String, Object> record = null; 589 if (record == null) { 590 CacheDriverExecutor executor = null; 591 Connection conn = null; 592 try { 593 conn = dataSourceReader.getConnection(); 594 conn.setAutoCommit(true); 595 executor = new CacheDriverExecutor(conn); 596 record = executor.first(sql, dataRecord); 597 } catch (SQLException e) { 598 throw new IOException(e.getMessage(), e); 599 } finally { 600 try { 601 if (executor != null) 602 executor.close(); 603 } catch (Exception e) { 604 log.error(e.getMessage(), e); 605 } 606 } 607 } 608 return record; 609 } 610 611 public Map<String, Object> getRecordByFileId(Object fileId) throws IOException { 612 String sql = formatSql(SQL_SELECT_GET_RECORD_BY_FILE_ID); 613 Map dataRecord = new HashMap<String, Object>(); 614 dataRecord.put("file_id", fileId); 615 Map<String, Object> record = null; 616 if (record == null) { 617 CacheDriverExecutor executor = null; 618 Connection conn = null; 619 try { 620 conn = dataSourceReader.getConnection(); 621 conn.setAutoCommit(true); 622 executor = new CacheDriverExecutor(conn); 623 record = executor.first(sql, dataRecord); 624 } catch (SQLException e) { 625 throw new IOException(e.getMessage(), e); 626 } finally { 627 try { 628 if (executor != null) 629 executor.close(); 630 } catch (Exception e) { 631 log.error(e.getMessage(), e); 632 } 633 } 634 } 635 return record; 636 } 637 638 public Map<String, Object> getLastUpdateRecord() throws IOException { 639 String sql = formatSql(SQL_SELECT_FOR_LAST_UPDATE); 640 Map dataRecord = new HashMap<String, Object>(); 641 dataRecord.put("file_name", origin.getName()); 642 dataRecord.put("file_parent_path", getParent()); 643 Map<String, Object> record = null; 644 CacheDriverExecutor executor = null; 645 Connection conn = null; 646 try { 647 conn = dataSourceReader.getConnection(); 648 conn.setAutoCommit(true); 649 executor = new CacheDriverExecutor(conn); 650 record = executor.first(sql, dataRecord); 651 return record; 652 } catch (SQLException e) { 653 throw new IOException(e.getMessage(), e); 654 } finally { 655 try { 656 if (executor != null) 657 executor.close(); 658 } catch (Exception e) { 659 log.error(e.getMessage(), e); 660 } 661 } 662 } 663 664 protected Timestamp getFirstCreatedAt() throws IOException { 665 String sql = formatSql(SQL_SELECT_FOR_FIRST_CREATED_AT); 666 Map dataRecord = new HashMap<String, Object>(); 667 dataRecord.put("file_parent_path", getPath()); 668 dataRecord.put("like_file_parent_path", parsePath(getPath() + "/%")); 669 dataRecord.put("deleted_on_hostnames", "%;" + CommonTools.getHostname() + ";%"); 670 CacheDriverExecutor executor = null; 671 Connection conn = null; 672 try { 673 conn = dataSourceReader.getConnection(); 674 conn.setAutoCommit(true); 675 executor = new CacheDriverExecutor(conn); 676 Map<String, Object> record = executor.first(sql, dataRecord); 677 return record == null ? null : (Timestamp)record.get("created_at"); 678 } catch (SQLException e) { 679 throw new IOException(e.getMessage(), e); 680 } finally { 681 try { 682 if (executor != null) 683 executor.close(); 684 } catch (Exception e) { 685 log.error(e.getMessage(), e); 686 } 687 } 688 } 689 690 protected Integer getMaxFilePartDataTable() throws IOException { 691 String sql = formatSql(SQL_SELECT_FOR_MAX_DATA_TABLE); 692 Map dataRecord = new HashMap<String, Object>(); 693 dataRecord.put("file_parent_path", getPath()); 694 dataRecord.put("like_file_parent_path", parsePath(getPath() + "/%")); 695 CacheDriverExecutor executor = null; 696 Connection conn = null; 697 try { 698 conn = dataSourceReader.getConnection(); 699 conn.setAutoCommit(true); 700 executor = new CacheDriverExecutor(conn); 701 Map<String, Object> record = executor.first(sql, dataRecord); 702 return record == null ? 0 : Integer.parseInt(record.get("file_part_data_table")+""); 703 } catch (SQLException e) { 704 throw new IOException(e.getMessage(), e); 705 } finally { 706 try { 707 if (executor != null) 708 executor.close(); 709 } catch (Exception e) { 710 log.error(e.getMessage(), e); 711 } 712 } 713 } 714 715 private Map<String, Object> getSelectByPathForCheckExists() throws IOException { 716 String sql = formatSql(SQL_SELECT_BY_PATH_FOR_CHECK_EXISTS); 717 Map dataRecord = new HashMap<String, Object>(); 718 dataRecord.put("file_name", origin.getName()); 719 dataRecord.put("file_parent_path", getParent()); 720 Map<String, Object> record = null; 721 if (record == null) { 722 CacheDriverExecutor executor = null; 723 Connection conn = null; 724 try { 725 conn = dataSourceReader.getConnection(); 726 conn.setAutoCommit(true); 727 executor = new CacheDriverExecutor(conn); 728 record = executor.first(sql, dataRecord); 729 return record; 730 } catch (SQLException e) { 731 throw new IOException(e.getMessage(), e); 732 } finally { 733 try { 734 if (executor != null) 735 executor.close(); 736 } catch (Exception e) { 737 log.error(e.getMessage(), e); 738 } 739 } 740 } 741 return record; 742 } 743 744 private int countAllSubFiles(String fileParentPath, String fileName) throws IOException { 745 String sql = formatSql(SQL_SELECT_FOR_COUNT_ALL_SUB_FILES); 746 Map dataRecord = new HashMap<String, Object>(); 747 dataRecord.put("file_name", fileName); 748 dataRecord.put("file_parent_path", fileParentPath); 749 Map<String, Object> record = null; 750 if (record == null) { 751 CacheDriverExecutor executor = null; 752 Connection conn = null; 753 try { 754 conn = dataSourceReader.getConnection(); 755 conn.setAutoCommit(true); 756 executor = new CacheDriverExecutor(conn); 757 record = executor.first(sql, dataRecord); 758 return Integer.parseInt(record.get("as_count") + ""); 759 } catch (SQLException e) { 760 throw new IOException(e.getMessage(), e); 761 } finally { 762 try { 763 if (executor != null) 764 executor.close(); 765 } catch (Exception e) { 766 log.error(e.getMessage(), e); 767 } 768 } 769 } 770 return -1; 771 } 772 773 private Map<String, Object> getFirstFilePart() throws IOException { 774 String sql = formatSql(SQL_SELECT_FOR_FIRST_FILE); 775 Map dataRecord = new HashMap<String, Object>(); 776 dataRecord.put("file_name", origin.getName()); 777 dataRecord.put("file_parent_path", getParent()); 778 String cacheKey = String.format("first-%s-%s",sql,dataRecord); 779 Map<String, Object> record = (Map<String,Object>)Cache.get(cacheKey); 780 if(record == null){ 781 CacheDriverExecutor executor = null; 782 Connection conn = null; 783 try { 784 conn = dataSourceReader.getConnection(); 785 conn.setAutoCommit(true); 786 executor = new CacheDriverExecutor(conn); 787 record = executor.first(sql, dataRecord); 788 if(record != null){ 789 Cache.set(cacheKey,record,CACHE_SECONDS); 790 } 791 } catch (SQLException e) { 792 throw new IOException(e.getMessage(), e); 793 } finally { 794 try { 795 if (executor != null) 796 executor.close(); 797 } catch (Exception e) { 798 log.error(e.getMessage(), e); 799 } 800 } 801 } 802 return record; 803 } 804 805 private Map<String, Object> getFirstFilePartRefresh() throws IOException { 806 String sql = formatSql(SQL_SELECT_FOR_FIRST_FILE); 807 Map dataRecord = new HashMap<String, Object>(); 808 dataRecord.put("file_name", origin.getName()); 809 dataRecord.put("file_parent_path", getParent()); 810 String cacheKey = String.format("first-%s-%s",sql,dataRecord); 811 Cache.remove(cacheKey); 812 CacheDriverExecutor executor = null; 813 Connection conn = null; 814 try { 815 conn = dataSourceReader.getConnection(); 816 conn.setAutoCommit(true); 817 executor = new CacheDriverExecutor(conn); 818 Map<String,Object> record = executor.first(sql, dataRecord); 819 if(record != null){ 820 Cache.set(cacheKey,record,CACHE_SECONDS); 821 } 822 return record; 823 } catch (SQLException e) { 824 throw new IOException(e.getMessage(), e); 825 } finally { 826 try { 827 if (executor != null) 828 executor.close(); 829 } catch (Exception e) { 830 log.error(e.getMessage(), e); 831 } 832 } 833 } 834 835 private Map<String, Object> getLastIndexRecord() throws IOException { 836 String sql = formatSql(SQL_SELECT_FOR_LAST_FILE_PART); 837 Map dataRecord = new HashMap<String, Object>(); 838 dataRecord.put("file_name", origin.getName()); 839 dataRecord.put("file_parent_path", getParent()); 840 dataRecord.put("file_id", fileId); 841 Map<String, Object> record = null; 842 if (record == null) { 843 CacheDriverExecutor executor = null; 844 Connection conn = null; 845 try { 846 conn = dataSourceReader.getConnection(); 847 conn.setAutoCommit(true); 848 executor = new CacheDriverExecutor(conn); 849 record = executor.first(sql, dataRecord); 850 } catch (SQLException e) { 851 throw new IOException(e.getMessage(), e); 852 } finally { 853 try { 854 if (executor != null) 855 executor.close(); 856 } catch (Exception e) { 857 log.error(e.getMessage(), e); 858 } 859 } 860 } 861 return record; 862 } 863 864 public boolean isTimeout(long timeoutMs) throws IOException { 865 CacheDriverExecutor executor = null; 866 Connection conn = null; 867 try { 868 Clock clock = new Clock(); 869 conn = dataSourceReader.getConnection(); 870 conn.setAutoCommit(true); 871 executor = new CacheDriverExecutor(conn); 872 Map dataRecord = new HashMap<String, Object>(); 873 dataRecord.put("file_name", origin.getName()); 874 dataRecord.put("file_parent_path", getParent()); 875 Map<String, Object> completedRecord = getCompletedRecord(); 876 877 if (completedRecord != null) 878 return false; 879 880 Map<String, Object> record = executor.first(formatSql(SQL_SELECT_FOR_TIMEOUT), dataRecord); 881 882 if (record == null) 883 return false; 884 885 boolean completed = executor.first(formatSql(SQL_SELECT_CHECK_COMPLETED), record) != null; 886 if (!completed) { 887 boolean timeout = (clock.getTime() - ((Timestamp) record.get("updated_at")).getTime()) > timeoutMs; 888 889 return timeout; 890 } 891 return false; 892 } catch (SQLException e) { 893 throw new IOException(e.getMessage(), e); 894 } finally { 895 try { 896 if (executor != null) 897 executor.close(); 898 } catch (Exception e) { 899 log.error(e.getMessage(), e); 900 } 901 } 902 } 903 904 private Map<String, Object> readPart(Map<String, Object> rfRecord) throws IOException { 905 CacheDriverExecutor executor = null; 906 Connection conn = null; 907 try { 908 calculateDataMappingDataSourceReader(this, rfRecord); 909 if(this.dataMappingTable <= -1){ 910 return null; 911 } 912 Clock clock = new Clock(); 913 conn = dataMappingDataSourceReader.getConnection(); 914 conn.setAutoCommit(true); 915 executor = new CacheDriverExecutor(conn); 916 Map<String, Object> dataRecord = new HashMap<String, Object>(); 917 dataRecord.put("remote_file_id", rfRecord.get("id")); 918 return executor.first(formatSqlForFilePartData(SQL_SELECT_BY_RF_ID_FROM_FILE_DATA), dataRecord); 919 } catch (SQLException e) { 920 throw new IOException(e.getMessage(), e); 921 } finally { 922 try { 923 if (executor != null) 924 executor.close(); 925 } catch (Exception e) { 926 log.error(e.getMessage(), e); 927 } 928 } 929 } 930 931 public void merge(FilePart filePart) throws IOException { 932 if (isLink()) { 933 throw new IOException(String.format("The file '%s' is a link.", origin.getAbsolutePath())); 934 } 935 if (isDir()) { 936 throw new IOException(String.format("The file '%s' is a folder.", origin.getAbsolutePath())); 937 } 938 CacheDriverExecutor executor = null; 939 Connection conn = null; 940 try { 941 Clock clock = new Clock(); 942 conn = dataSourceReader.getConnection(); 943 conn.setAutoCommit(true); 944 executor = new CacheDriverExecutor(conn); 945 Map dataRecord = getCompletedRecord(); 946 if (dataRecord != null) { 947 CacheArray rows = new CacheArray(); 948 CacheArrayFilter filter = new CacheArrayFilter(CACHE_ARRAY_FILTER_TIMER) { 949 long fileSize = 0L; 950 Map<String, Object> fp = null; 951 Map<String, Object> item = null; 952 953 @Override 954 public void execute(Integer index, Object o) { 955 956 try { 957 item = (Map<String, Object>) o; 958 fp = readPart(item); 959 Object fpData = null; 960 961 if(fp == null){ 962 fpData = new byte[]{}; 963 }else{ 964 fpData = fp.get("file_part_data"); 965 } 966 Thread.currentThread() 967 .setName(String.format("RemoteFile-merge-%s", item.get("file_name"))); 968 byte[] partData = null; 969 if (fpData instanceof byte[]) { 970 partData = (byte[]) fpData; 971 } 972 fileSize += partData.length; 973 filePart.process(index, partData); 974 } catch (Exception e) { 975 log.error(e.getMessage(), e); 976 this.terminated = true; 977 } 978 } 979 980 @Override 981 public void completed(Integer size) { 982 try { 983 if (item != null) { 984 Thread.currentThread() 985 .setName(String.format("RemoteFile-merge-completed-%s", item.get("file_name"))); 986 } 987 filePart.completed(size, new Clock().getSqlTimestamp(), fileSize); 988 } catch (IOException e) { 989 log.error(e.getMessage(), e); 990 } 991 } 992 993 @Override 994 public void terminated() { 995 if (item != null) { 996 Thread.currentThread() 997 .setName(String.format("RemoteFile-merge-trminated-%s", item.get("file_name"))); 998 log.error("Merge terminated: {}/{}", item.get("file_parent_path"), item.get("file_name")); 999 } 1000 } 1001 }; 1002 rows.filter(filter); 1003 executor.find(formatSql(SQL_SELECT_FILE_HAVE_DATA), dataRecord, rows); 1004 } 1005 } catch (SQLException e) { 1006 throw new IOException(e.getMessage(), e); 1007 } finally { 1008 if (executor != null) { 1009 try { 1010 executor.close(); 1011 } catch (Exception e) { 1012 throw new IOException(e.getMessage(), e); 1013 } 1014 } 1015 } 1016 } 1017 1018 protected void copyFrom(boolean copyStructureOnly, DiskFile diskFile) throws IOException { 1019 var the = this; 1020 if (diskFile.isLink()) { 1021 the.setModifyTime(diskFile.getModifyTimeForClock()); 1022 the.createLink(diskFile.readLink()); 1023 } 1024 if (diskFile.isDir()) { 1025 the.setModifyTime(diskFile.getModifyTimeForClock()); 1026 the.mkdirs(); 1027 } 1028 if (diskFile.isFile()) { 1029 the.setModifyTime(diskFile.getModifyTimeForClock()); 1030 if (copyStructureOnly) { 1031 the.createStructureFile(diskFile.size()); 1032 diskFile.removeLogicAccess(); 1033 DiskFile.removeQueuePathMapping(diskFile.getPath()); 1034 } else { 1035 diskFile.split(filePartSize, new FilePart() { 1036 final List<byte[]> partDataList = new ArrayList<byte[]>(); 1037 int currentFilePartStartIndex = 0; 1038 1039 @Override 1040 protected void process(int partIndex, byte[] data) throws IOException { 1041 if (partIndex == 0) { 1042 if (diskFile.syncRoot != null) { 1043 if (diskFile.isLogicModify()) { 1044 throw new IOException( 1045 String.format("The file '%s' is already logic locked by another thread.", 1046 diskFile.getPath())); 1047 } 1048 } 1049 if (the.exists(true)) { 1050 throw new IOException( 1051 String.format("The remote file '%s' already exist.", the.getPath())); 1052 } 1053 if (isDebug(the)) { 1054 LoggerFactory.getLogger(RemoteFile.class).mark("CreateRemoteFile - {}", the.getPath()); 1055 } 1056 } 1057 1058 partDataList.add(data); 1059 if (partDataList.size() >= BATCH_SIZE) { 1060 diskFile.logicAccess(); 1061 if(!the.writeToBatch(currentFilePartStartIndex, partDataList)){ 1062 throw new IOException(String.format("The remote file '%s' write failed.",the.getPath())); 1063 } 1064 partDataList.clear(); 1065 currentFilePartStartIndex += BATCH_SIZE; 1066 } 1067 } 1068 1069 @Override 1070 protected void completed(int lastPartIndex, Timestamp modifyTime, long fileSize) 1071 throws IOException { 1072 try { 1073 if (diskFile.syncRoot != null) { 1074 if (diskFile.isLogicModify()) { 1075 throw new IOException( 1076 String.format("The file '%s' is already logic locked by another thread.", 1077 diskFile.getPath())); 1078 } 1079 } 1080 /**This is 0 bytes file**/ 1081 if (lastPartIndex == -1) { 1082 1083 if (the.exists(true)) { 1084 the.forceDeleteFile(false); 1085 } 1086 1087 the.setModifyTime(modifyTime); 1088 the.createEmptyFile(); 1089 } else { 1090 the.setModifyTime(modifyTime); 1091 if (partDataList.size() > 0) { 1092 if(!the.writeToBatch(currentFilePartStartIndex, partDataList)){ 1093 throw new IOException(String.format("The remote file '%s' write failed.",the.getPath())); 1094 } 1095 partDataList.clear(); 1096 } 1097 the.complete(lastPartIndex, fileSize); 1098 } 1099 if (isDebug(the)) { 1100 LoggerFactory.getLogger(RemoteFile.class).mark("CompletedRemoteFile - {}", 1101 the.getPath()); 1102 } 1103 DiskFile.removeQueuePathMapping(diskFile.getPath()); 1104 } catch (IOException e) { 1105 throw e; 1106 } 1107 } 1108 1109 @Override 1110 protected void ended(int lastPartIndex, long fileSize) { 1111 1112 } 1113 1114 }); 1115 } 1116 } 1117 } 1118 1119 private boolean createEmptyFile() throws IOException { 1120 boolean allowed = beforeWrite(); 1121 1122 if(!allowed) return false; 1123 1124 Clock clock = new Clock(); 1125 Timestamp currentTime = clock.getSqlTimestamp(); 1126 CacheDriverExecutor executor = null; 1127 Connection conn = null; 1128 try { 1129 calculateDataMappingDataSourceWriter(this); 1130 conn = dataSourceWriter.getConnection(); 1131 conn.setAutoCommit(true); 1132 executor = new CacheDriverExecutor(conn); 1133 Map dataRecord = new HashMap<String, Object>(); 1134 dataRecord.put("id", 1135 generateId(identityCardEnable, identityCardName, identityCardPrefix, identityCardSuffix)); 1136 dataRecord.put("file_name", origin.getName()); 1137 dataRecord.put("file_parent_path", getParent()); 1138 dataRecord.put("file_type", "F"); 1139 dataRecord.put("file_part_index", 0); 1140 dataRecord.put("file_part_size", 0); 1141 dataRecord.put("file_part_data_ds", "NONE"); 1142 dataRecord.put("file_part_data_table", this.dataMappingTable); 1143 dataRecord.put("file_part_last_index", 0); 1144 dataRecord.put("file_size", 0); 1145 dataRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime); 1146 dataRecord.put("file_deleted", 0); 1147 dataRecord.put("created_user_id", getDefaultModifyUserId()); 1148 dataRecord.put("updated_user_id", getDefaultModifyUserId()); 1149 dataRecord.put("created_at", currentTime); 1150 dataRecord.put("updated_at", currentTime); 1151 dataRecord.put("file_id", fileId); 1152 dataRecord.put("source_hostname", CommonTools.getHostname()); 1153 dataRecord.put("synced_on_hostnames", String.format(";%s;", CommonTools.getHostname())); 1154 dataRecord.put("deleted_on_hostnames", ";"); 1155 executor.execute(formatSql(SQL_INSERT_INTO_FILE), dataRecord); 1156 1157 afterWrite(); 1158 1159 return true; 1160 } catch (Exception e) { 1161 throw new IOException(e.getMessage(), e); 1162 } finally { 1163 try { 1164 if (executor != null) 1165 executor.close(); 1166 } catch (Exception e) { 1167 log.error(e.getMessage(), e); 1168 } 1169 } 1170 } 1171 1172 private boolean createStructureFile(long fileSize) throws IOException { 1173 boolean allowed = beforeWrite(); 1174 1175 if(!allowed) return false; 1176 1177 log.debug("CreateStructureFile: {}", getPath()); 1178 Clock clock = new Clock(); 1179 Timestamp currentTime = clock.getSqlTimestamp(); 1180 CacheDriverExecutor executor = null; 1181 Connection conn = null; 1182 try { 1183 calculateDataMappingDataSourceWriter(this); 1184 conn = dataSourceWriter.getConnection(); 1185 conn.setAutoCommit(true); 1186 executor = new CacheDriverExecutor(conn); 1187 Map dataRecord = new HashMap<String, Object>(); 1188 dataRecord.put("id", 1189 generateId(identityCardEnable, identityCardName, identityCardPrefix, identityCardSuffix)); 1190 dataRecord.put("file_name", origin.getName()); 1191 dataRecord.put("file_parent_path", getParent()); 1192 dataRecord.put("file_type", "F"); 1193 dataRecord.put("file_part_index", 0); 1194 dataRecord.put("file_part_size", fileSize); 1195 dataRecord.put("file_part_data_ds", "NONE"); 1196 dataRecord.put("file_part_data_table", this.dataMappingTable); 1197 dataRecord.put("file_part_last_index", 0); 1198 dataRecord.put("file_size", fileSize); 1199 dataRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime); 1200 dataRecord.put("file_deleted", 0); 1201 dataRecord.put("created_user_id", getDefaultModifyUserId()); 1202 dataRecord.put("updated_user_id", getDefaultModifyUserId()); 1203 dataRecord.put("created_at", currentTime); 1204 dataRecord.put("updated_at", currentTime); 1205 dataRecord.put("file_id", fileId); 1206 dataRecord.put("source_hostname", CommonTools.getHostname()); 1207 dataRecord.put("synced_on_hostnames", String.format(";%s;", CommonTools.getHostname())); 1208 dataRecord.put("deleted_on_hostnames", ";"); 1209 executor.execute(formatSql(SQL_INSERT_INTO_FILE), dataRecord); 1210 1211 afterWrite(); 1212 1213 return true; 1214 } catch (Exception e) { 1215 throw new IOException(e.getMessage(), e); 1216 } finally { 1217 try { 1218 if (executor != null) 1219 executor.close(); 1220 } catch (Exception e) { 1221 log.error(e.getMessage(), e); 1222 } 1223 } 1224 } 1225 1226 protected boolean syncedToDisk() throws IOException { 1227 Clock clock = new Clock(); 1228 CacheDriverExecutor executor = null; 1229 Connection conn = null; 1230 try { 1231 conn = dataSourceWriter.getConnection(); 1232 conn.setAutoCommit(true); 1233 executor = new CacheDriverExecutor(conn); 1234 Map<String, Object> record = getCompletedRecord(); 1235 if (record == null) { 1236 log.debug(String.format("Remmote file not found '%s'.", origin.getAbsolutePath())); 1237 } 1238 if (record != null) { 1239 String hostname = CommonTools.getHostname(); 1240 String syncedOnHostnames = (String) record.get("synced_on_hostnames"); 1241 syncedOnHostnames = syncedOnHostnames.replaceAll(String.format(";%s;",hostname),";"); 1242 String shn = String.format("%s%s;", syncedOnHostnames, hostname); 1243 Map<String, Object> dataRecord = new HashMap<String, Object>(); 1244 dataRecord.put("file_parent_path",record.get("file_parent_path")); 1245 dataRecord.put("file_name", record.get("file_name")); 1246 dataRecord.put("synced_on_hostnames",shn); 1247 dataRecord.put("updated_at", clock.getSqlTimestamp()); 1248 dataRecord.put("updated_user_id", getDefaultModifyUserId()); 1249 executor.execute(formatSql(SQL_UPDATE_FOR_SYNCED_TO_DISK), dataRecord); 1250 return true; 1251 } 1252 return false; 1253 } catch (SQLException e) { 1254 throw new IOException(e.getMessage(), e); 1255 } finally { 1256 try { 1257 if (executor != null) 1258 executor.close(); 1259 } catch (Exception e) { 1260 log.error(e.getMessage(), e); 1261 } 1262 } 1263 } 1264 1265 protected boolean deletedToDiskAndSub(Object id) throws IOException { 1266 Clock clock = new Clock(); 1267 CacheDriverExecutor executor = null; 1268 Connection conn = null; 1269 try { 1270 conn = dataSourceWriter.getConnection(); 1271 conn.setAutoCommit(true); 1272 executor = new CacheDriverExecutor(conn); 1273 Map<String, Object> record = getRecordById(id); 1274 if (record == null) { 1275 log.debug(String.format("Remmote file not found '%s'.", origin.getAbsolutePath())); 1276 } 1277 if (record != null) { 1278 String hostname = CommonTools.getHostname(); 1279 String deletedOnHostnames = (String) record.get("deleted_on_hostnames"); 1280 deletedOnHostnames = deletedOnHostnames.replaceAll(String.format(";%s;",hostname),";"); 1281 String dhn = String.format("%s%s;", deletedOnHostnames, hostname); 1282 Timestamp now = clock.getSqlTimestamp(); 1283 final Map<String, Object> dataRecord = new HashMap<String, Object>(); 1284 dataRecord.put("file_parent_path", record.get("file_parent_path")); 1285 dataRecord.put("file_name", record.get("file_name")); 1286 dataRecord.put("deleted_on_hostnames", dhn); 1287 dataRecord.put("updated_at", now); 1288 dataRecord.put("updated_user_id", getDefaultModifyUserId()); 1289 executor.execute(formatSql(SQL_UPDATE_FOR_DELETED_TO_DISK), dataRecord); 1290 if (record.get("file_type").equals("D")) { 1291 String dirPath = String.format("%s/%s", record.get("file_parent_path"), 1292 record.get("file_name")); 1293 dataRecord.put("file_parent_path", dirPath); 1294 dataRecord.put("like_file_parent_path", dirPath + "/%"); 1295 executor.execute(formatSql(SQL_UPDATE_FOR_DELETED_TO_DISK_SUB_FILES), dataRecord); 1296 } 1297 return true; 1298 } 1299 return false; 1300 } catch (SQLException e) { 1301 throw new IOException(e.getMessage(), e); 1302 } finally { 1303 try { 1304 if (executor != null) 1305 executor.close(); 1306 } catch (Exception e) { 1307 log.error(e.getMessage(), e); 1308 } 1309 } 1310 } 1311 1312 private boolean complete(int filePartLastIndex, long fileSize) throws IOException { 1313 Clock clock = new Clock(); 1314 CacheDriverExecutor executor = null; 1315 Timestamp currentTime = clock.getSqlTimestamp(); 1316 Connection conn = null; 1317 try { 1318 conn = dataSourceWriter.getConnection(); 1319 conn.setAutoCommit(true); 1320 executor = new CacheDriverExecutor(conn); 1321 Map dataRecord = new HashMap<String, Object>(); 1322 dataRecord.put("file_id", fileId); 1323 dataRecord.put("file_part_last_index", filePartLastIndex); 1324 dataRecord.put("file_size", fileSize); 1325 dataRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime); 1326 dataRecord.put("updated_at", currentTime); 1327 dataRecord.put("updated_user_id", getDefaultModifyUserId()); 1328 int rows = executor.execute(formatSql(SQL_UPDATE_FOR_COMPLETED), dataRecord); 1329 1330 afterWrite(); 1331 1332 return rows > 0; 1333 } catch (SQLException e) { 1334 throw new IOException(e.getMessage(), e); 1335 } finally { 1336 try { 1337 if (executor != null) 1338 executor.close(); 1339 } catch (Exception e) { 1340 log.error(e.getMessage(), e); 1341 } 1342 } 1343 } 1344 1345 @Override 1346 public boolean complete() throws IOException { 1347 Clock clock = new Clock(); 1348 Timestamp currentTime = clock.getSqlTimestamp(); 1349 CacheDriverExecutor executor = null; 1350 Connection conn = null; 1351 try { 1352 conn = dataSourceWriter.getConnection(); 1353 conn.setAutoCommit(true); 1354 executor = new CacheDriverExecutor(conn); 1355 Map dataRecord = new HashMap<String, Object>(); 1356 dataRecord.put("file_name", origin.getName()); 1357 dataRecord.put("file_parent_path", getParent()); 1358 dataRecord.put("file_id", fileId); 1359 1360 Map<String, Object> sumRecord = executor.first(formatSql(SQL_SELECT_FOR_SUM_FILE_PART_SIZE_INDEX), 1361 dataRecord); 1362 1363 if (sumRecord == null) { 1364 log.debug(String.format("Remmote file not found '%s'.", origin.getAbsolutePath())); 1365 } 1366 1367 if (sumRecord != null) { 1368 long fileSize = Long.parseLong(sumRecord.get("file_part_size") + ""); 1369 long filePartLastIndex = Integer.parseInt(sumRecord.get("file_part_index") + ""); 1370 1371 dataRecord.put("file_part_last_index", filePartLastIndex); 1372 dataRecord.put("file_size", fileSize); 1373 dataRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime); 1374 dataRecord.put("updated_at", currentTime); 1375 dataRecord.put("updated_user_id", getDefaultModifyUserId()); 1376 int rows = executor.execute(formatSql(SQL_UPDATE_FOR_COMPLETED), dataRecord); 1377 1378 afterWrite(); 1379 1380 return rows > 0; 1381 } 1382 return false; 1383 } catch (SQLException e) { 1384 throw new IOException(e.getMessage(), e); 1385 } finally { 1386 try { 1387 if (executor != null) 1388 executor.close(); 1389 } catch (Exception e) { 1390 log.error(e.getMessage(), e); 1391 } 1392 } 1393 } 1394 1395 public boolean forceDelete() throws IOException { 1396 return forceDeleteAll(true); 1397 } 1398 1399 public boolean forceDeleteDir(boolean realDeleted) throws IOException { 1400 return forceDelete(realDeleted, "D"); 1401 } 1402 1403 public boolean forceDeleteLink(boolean realDeleted) throws IOException { 1404 return forceDelete(realDeleted, "L"); 1405 } 1406 1407 public boolean forceDeleteFile(boolean realDeleted) throws IOException { 1408 return forceDelete(realDeleted, "F"); 1409 } 1410 1411 public boolean forceDelete(boolean realDeleted, String fileType) throws IOException { 1412 if (fileType.equalsIgnoreCase("D")) { 1413 return forceDeleteAll(realDeleted); 1414 } else { 1415 boolean allowed = beforeDelete(realDeleted); 1416 if (allowed) { 1417 CacheDriverExecutor executor = null; 1418 Connection conn = null; 1419 try { 1420 Clock clock = new Clock(); 1421 Timestamp currentTime = clock.getSqlTimestamp(); 1422 conn = dataSourceWriter.getConnection(); 1423 conn.setAutoCommit(true); 1424 executor = new CacheDriverExecutor(conn); 1425 Map dataRecord = new HashMap<String, Object>(); 1426 dataRecord.put("file_deleted", realDeleted ? 1 : 0); 1427 dataRecord.put("deleted", realDeleted ? 0 : 1); 1428 dataRecord.put("file_name", origin.getName()); 1429 dataRecord.put("file_type", fileType.toUpperCase()); 1430 dataRecord.put("file_parent_path", getParent()); 1431 dataRecord.put("updated_user_id", getDefaultModifyUserId()); 1432 dataRecord.put("deleted_user_id", getDefaultModifyUserId()); 1433 dataRecord.put("updated_at", currentTime); 1434 dataRecord.put("deleted_at", currentTime); 1435 dataRecord.put("root_folder", getPath()); 1436 dataRecord.put("like_subfolder", parsePath(getPath() + "/%")); 1437 int resultRows = executor.execute(formatSql(SQL_UPDATE_FOR_DELETE_FILE_LINK), dataRecord); 1438 log.debug("ForceDelete({}): {} -> {}", realDeleted, getPath(), resultRows); 1439 1440 afterDelete(realDeleted); 1441 1442 return resultRows > 0; 1443 } catch (SQLException e) { 1444 throw new IOException(e.getMessage(), e); 1445 } finally { 1446 try { 1447 if (executor != null) 1448 executor.close(); 1449 } catch (Exception e) { 1450 log.error(e.getMessage(), e); 1451 } 1452 } 1453 } 1454 } 1455 return false; 1456 } 1457 1458 public boolean forceDeleteAll(boolean realDeleted) throws IOException { 1459 boolean allowed = beforeDelete(realDeleted); 1460 if (allowed) { 1461 CacheDriverExecutor executor = null; 1462 Connection conn = null; 1463 try { 1464 Clock clock = new Clock(); 1465 Timestamp currentTime = clock.getSqlTimestamp(); 1466 conn = dataSourceWriter.getConnection(); 1467 conn.setAutoCommit(true); 1468 executor = new CacheDriverExecutor(conn); 1469 Map dataRecord = new HashMap<String, Object>(); 1470 dataRecord.put("file_deleted", realDeleted ? 1 : 0); 1471 dataRecord.put("deleted", realDeleted ? 0 : 1); 1472 dataRecord.put("file_name", origin.getName()); 1473 dataRecord.put("file_parent_path", getParent()); 1474 dataRecord.put("updated_user_id", getDefaultModifyUserId()); 1475 dataRecord.put("deleted_user_id", getDefaultModifyUserId()); 1476 dataRecord.put("updated_at", currentTime); 1477 dataRecord.put("deleted_at", currentTime); 1478 dataRecord.put("root_folder", getPath()); 1479 dataRecord.put("like_subfolder", parsePath(getPath() + "/%")); 1480 int resultRows = executor.execute(formatSql(SQL_UPDATE_FOR_DELETE_ALL), dataRecord); 1481 log.debug("ForceDelete({}): {} -> {}", realDeleted, getPath(), resultRows); 1482 1483 afterDelete(realDeleted); 1484 1485 return resultRows > 0; 1486 } catch (SQLException e) { 1487 throw new IOException(e.getMessage(), e); 1488 } finally { 1489 try { 1490 if (executor != null) 1491 executor.close(); 1492 } catch (Exception e) { 1493 log.error(e.getMessage(), e); 1494 } 1495 } 1496 } 1497 return false; 1498 } 1499 1500 @Override 1501 public boolean delete() throws IOException { 1502 return forceDeleteAll(true); 1503 } 1504 1505 public boolean delete(boolean realDeleted) throws IOException { 1506 return forceDeleteAll(realDeleted); 1507 } 1508 1509 @Override 1510 public boolean beforeDelete(boolean realDeleted) { 1511 return true; 1512 } 1513 1514 @Override 1515 public void afterDelete(boolean realDeleted) { 1516 1517 } 1518 1519 @Override 1520 public boolean beforeMkdirs() { 1521 return true; 1522 } 1523 1524 @Override 1525 public void afterMkdirs() { 1526 1527 } 1528 1529 @Override 1530 public boolean beforeCreateLink(String target) { 1531 return true; 1532 } 1533 1534 @Override 1535 public void afterCreateLink(String target) { 1536 1537 } 1538 1539 @Override 1540 public boolean beforeWrite(){ 1541 return true; 1542 } 1543 1544 @Override 1545 public void afterWrite(){ 1546 1547 } 1548 1549 @Override 1550 public boolean exists() throws IOException { 1551 return getFirstFilePart() != null; 1552 } 1553 1554 public boolean exists(boolean refresh) throws IOException { 1555 if(refresh){ 1556 return getFirstFilePartRefresh() != null; 1557 }else{ 1558 return exists(); 1559 } 1560 } 1561 1562 @Override 1563 public boolean mkdirs() throws IOException { 1564 return mkdirs(false); 1565 } 1566 1567 public boolean mkdirs(boolean checkSourceHostname) throws IOException { 1568 1569 boolean allowed = beforeMkdirs(); 1570 1571 if(!allowed) return false; 1572 1573 Clock clock = new Clock(); 1574 CacheDriverExecutor executor = null; 1575 Connection conn = null; 1576 try { 1577 if (getFirstFilePart() != null) { 1578 return false; 1579 } 1580 if (checkSourceHostname) { 1581 String lastSourceHostname = getLastSourceHostname(); 1582 if (lastSourceHostname != null) { 1583 String clientHostname = CommonTools.getHostname(); 1584 boolean sameHostname = clientHostname.equals(lastSourceHostname); 1585 if (!sameHostname) { 1586 return false; 1587 } 1588 } 1589 } 1590 log.debug("Mkidrs: {}", getPath()); 1591 calculateDataMappingDataSourceWriter(this); 1592 Timestamp currentTime = clock.getSqlTimestamp(); 1593 conn = dataSourceWriter.getConnection(); 1594 conn.setAutoCommit(true); 1595 executor = new CacheDriverExecutor(conn); 1596 Map dataRecord = new HashMap<String, Object>(); 1597 dataRecord.put("id", 1598 generateId(identityCardEnable, identityCardName, identityCardPrefix, identityCardSuffix)); 1599 dataRecord.put("file_name", origin.getName()); 1600 dataRecord.put("file_parent_path", getParent()); 1601 dataRecord.put("file_type", "D"); 1602 dataRecord.put("file_part_index", 0); 1603 dataRecord.put("file_part_size", 0); 1604 dataRecord.put("file_part_data_ds", "NONE"); 1605 dataRecord.put("file_part_data_table", this.dataMappingTable); 1606 dataRecord.put("file_part_last_index", 0); 1607 dataRecord.put("file_size", 0); 1608 dataRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime); 1609 dataRecord.put("file_deleted", 0); 1610 dataRecord.put("created_user_id", getDefaultModifyUserId()); 1611 dataRecord.put("updated_user_id", getDefaultModifyUserId()); 1612 dataRecord.put("created_at", currentTime); 1613 dataRecord.put("updated_at", currentTime); 1614 dataRecord.put("file_id", fileId); 1615 dataRecord.put("source_hostname", CommonTools.getHostname()); 1616 dataRecord.put("synced_on_hostnames", String.format(";%s;", CommonTools.getHostname())); 1617 dataRecord.put("deleted_on_hostnames", ";"); 1618 int rows = executor.execute(formatSql(SQL_INSERT_INTO_FILE), dataRecord); 1619 1620 afterMkdirs(); 1621 1622 return rows > 0; 1623 } catch (Exception e) { 1624 throw new IOException(e.getMessage(), e); 1625 } finally { 1626 try { 1627 if (executor != null) 1628 executor.close(); 1629 } catch (Exception e) { 1630 log.error(e.getMessage(), e); 1631 } 1632 } 1633 } 1634 1635 @Override 1636 public boolean write(byte[] data, boolean append) throws IOException { 1637 return writeTo(data, append); 1638 } 1639 1640 public synchronized boolean writeBlock(byte[] data, boolean append) throws IOException { 1641 return writeTo(data, append); 1642 } 1643 1644 private boolean writeToBatch(int currentFilePartStartIndex, List<byte[]> partDataList) throws IOException { 1645 1646 boolean allowed = beforeWrite(); 1647 1648 if(!allowed) return false; 1649 1650 CacheDriverExecutor executor = null; 1651 CacheDriverExecutor dmExecutor = null; 1652 List<Map<String, Object>> rfList = new ArrayList<Map<String, Object>>(); 1653 List<Map<String, Object>> dmList = new ArrayList<Map<String, Object>>(); 1654 try { 1655 Clock clock = new Clock(); 1656 log.debug("Write: {}", getPath()); 1657 int size = partDataList.size(); 1658 Timestamp currentTime = clock.getSqlTimestamp(); 1659 String hostname = CommonTools.getHostname(); 1660 calculateDataMappingDataSourceWriter(this); 1661 for (int filePartIndex = 0; filePartIndex < size; filePartIndex++) { 1662 byte[] data = partDataList.get(filePartIndex); 1663 String remoteFileId = generateId(identityCardEnable, identityCardName, identityCardPrefix, 1664 identityCardSuffix); 1665 Map rfRecord = new HashMap<String, Object>(); 1666 rfRecord.put("id", remoteFileId); 1667 rfRecord.put("file_name", origin.getName()); 1668 rfRecord.put("file_parent_path", getParent()); 1669 rfRecord.put("file_type", "F"); 1670 rfRecord.put("file_part_index", filePartIndex + currentFilePartStartIndex); 1671 rfRecord.put("file_part_size", data.length); 1672 rfRecord.put("file_part_data_ds", this.dataMappingDs); 1673 rfRecord.put("file_part_data_table", this.dataMappingTable); 1674 rfRecord.put("file_part_last_index", -1); 1675 rfRecord.put("file_size", 0); 1676 rfRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime); 1677 rfRecord.put("file_deleted", 0); 1678 rfRecord.put("created_user_id", getDefaultModifyUserId()); 1679 rfRecord.put("updated_user_id", getDefaultModifyUserId()); 1680 rfRecord.put("created_at", currentTime); 1681 rfRecord.put("updated_at", currentTime); 1682 rfRecord.put("file_id", fileId); 1683 rfRecord.put("source_hostname", hostname); 1684 rfRecord.put("synced_on_hostnames", String.format(";%s;", hostname)); 1685 rfRecord.put("deleted_on_hostnames", ";"); 1686 rfList.add(rfRecord); 1687 1688 Map<String, Object> dataRecord = new HashMap<String, Object>(); 1689 dataRecord.put("id", generateDataId(identityCardEnable, identityCardName, identityCardSuffix, this)); 1690 dataRecord.put("file_part_data", encrypt(data)); 1691 dataRecord.put("remote_file_id", remoteFileId); 1692 dataRecord.put("file_id", fileId); 1693 dataRecord.put("created_at", currentTime); 1694 dataRecord.put("updated_at", currentTime); 1695 dmList.add(dataRecord); 1696 } 1697 Connection conn = dataSourceWriter.getConnection(); 1698 conn.setAutoCommit(true); 1699 executor = new CacheDriverExecutor(conn); 1700 executor.executeBatch(formatSql(SQL_INSERT_INTO_FILE), rfList); 1701 1702 conn = dataMappingDataSourceWriter.getConnection(); 1703 conn.setAutoCommit(true); 1704 dmExecutor = new CacheDriverExecutor(conn); 1705 dmExecutor.executeBatch(formatSqlForFilePartData(SQL_INSERT_INTO_FILE_DATA), dmList); 1706 1707 return true; 1708 } catch (Exception e) { 1709 throw new IOException(e.getMessage(), e); 1710 } finally { 1711 try { 1712 if (executor != null) 1713 executor.close(); 1714 } catch (Exception e) { 1715 log.error(e.getMessage(), e); 1716 } 1717 try { 1718 if (dmExecutor != null) 1719 dmExecutor.close(); 1720 } catch (Exception e) { 1721 log.error(e.getMessage(), e); 1722 } 1723 } 1724 } 1725 1726 private boolean writeTo(byte[] data, boolean append) throws IOException { 1727 1728 boolean allowed = beforeWrite(); 1729 1730 if(!allowed) return false; 1731 1732 CacheDriverExecutor executor = null; 1733 CacheDriverExecutor dmExecutor = null; 1734 try { 1735 if (!append) { 1736 if (exists()) 1737 throw new IOException( 1738 String.format("The remote file '%s' already exists.", origin.getAbsolutePath())); 1739 } 1740 Clock clock = new Clock(); 1741 Timestamp currentTime = clock.getSqlTimestamp(); 1742 Map<String, Object> lastRecord = getLastIndexRecord(); 1743 int filePartIndex = 0; 1744 if (lastRecord != null) { 1745 if (lastRecord.get("file_type").equals("D")) { 1746 throw new IOException(String.format("This is a folder '%s'.", origin.getAbsolutePath())); 1747 } 1748 if (lastRecord.get("file_type").equals("L")) { 1749 throw new IOException(String.format("This is a link '%s'.", origin.getAbsolutePath())); 1750 } 1751 if (append) { 1752 filePartIndex = Integer.parseInt(lastRecord.get("file_part_index") + "") + 1; 1753 } 1754 boolean isDeleted = false; 1755 Object deletedValue = lastRecord.get("deleted"); 1756 if (deletedValue instanceof Boolean) { 1757 isDeleted = (Boolean) deletedValue; 1758 } else { 1759 isDeleted = Integer.parseInt(deletedValue + "") == 1; 1760 } 1761 if (isDeleted) { 1762 throw new IOException(String.format("The file '%s' is deleted.", origin.getAbsolutePath())); 1763 } 1764 } 1765 calculateDataMappingDataSourceWriter(this); 1766 log.debug("Write: {}", getPath()); 1767 String remoteFileId = generateId(identityCardEnable, identityCardName, identityCardPrefix, 1768 identityCardSuffix); 1769 Map rfRecord = new HashMap<String, Object>(); 1770 rfRecord.put("id", remoteFileId); 1771 rfRecord.put("file_name", origin.getName()); 1772 rfRecord.put("file_parent_path", getParent()); 1773 rfRecord.put("file_type", "F"); 1774 rfRecord.put("file_part_index", filePartIndex); 1775 rfRecord.put("file_part_size", data.length); 1776 rfRecord.put("file_part_data_ds", this.dataMappingDs); 1777 rfRecord.put("file_part_data_table", this.dataMappingTable); 1778 rfRecord.put("file_part_last_index", -1); 1779 rfRecord.put("file_size", 0); 1780 rfRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime); 1781 rfRecord.put("file_deleted", 0); 1782 rfRecord.put("created_user_id", getDefaultModifyUserId()); 1783 rfRecord.put("updated_user_id", getDefaultModifyUserId()); 1784 rfRecord.put("created_at", currentTime); 1785 rfRecord.put("updated_at", currentTime); 1786 rfRecord.put("file_id", fileId); 1787 rfRecord.put("source_hostname", CommonTools.getHostname()); 1788 rfRecord.put("synced_on_hostnames", String.format(";%s;", CommonTools.getHostname())); 1789 rfRecord.put("deleted_on_hostnames", ";"); 1790 1791 Connection conn = dataSourceWriter.getConnection(); 1792 conn.setAutoCommit(true); 1793 executor = new CacheDriverExecutor(conn); 1794 executor.execute(formatSql(SQL_INSERT_INTO_FILE), rfRecord); 1795 1796 Map dataRecord = new HashMap<String, Object>(); 1797 dataRecord.put("id", generateDataId(identityCardEnable, identityCardName, identityCardSuffix, this)); 1798 dataRecord.put("file_part_data", encrypt(data)); 1799 dataRecord.put("remote_file_id", remoteFileId); 1800 dataRecord.put("file_id", fileId); 1801 dataRecord.put("created_at", currentTime); 1802 dataRecord.put("updated_at", currentTime); 1803 1804 conn = dataMappingDataSourceWriter.getConnection(); 1805 conn.setAutoCommit(true); 1806 dmExecutor = new CacheDriverExecutor(conn); 1807 dmExecutor.execute(formatSqlForFilePartData(SQL_INSERT_INTO_FILE_DATA), dataRecord); 1808 1809 return true; 1810 } catch (Exception e) { 1811 throw new IOException(e.getMessage(), e); 1812 } finally { 1813 try { 1814 if (executor != null) 1815 executor.close(); 1816 } catch (Exception e) { 1817 log.error(e.getMessage(), e); 1818 } 1819 try { 1820 if (dmExecutor != null) 1821 dmExecutor.close(); 1822 } catch (Exception e) { 1823 log.error(e.getMessage(), e); 1824 } 1825 } 1826 } 1827 1828 @Override 1829 public boolean write(byte[] data) throws IOException { 1830 return write(data, false); 1831 } 1832 1833 @Override 1834 public boolean write(String data, boolean append) throws IOException { 1835 return write(data.getBytes(CHARSET), append); 1836 } 1837 1838 @Override 1839 public boolean write(String data) throws IOException { 1840 return write(data.getBytes(CHARSET)); 1841 } 1842 1843 @Override 1844 public boolean createLink(String target) throws IOException { 1845 boolean allowed = beforeCreateLink(target); 1846 1847 if(!allowed) return false; 1848 1849 Clock clock = new Clock(); 1850 CacheDriverExecutor executor = null; 1851 CacheDriverExecutor dmExecutor = null; 1852 try { 1853 if (getFirstFilePart() != null) { 1854 return false; 1855 } 1856 log.debug("CreateLink: {}", getPath()); 1857 Timestamp currentTime = clock.getSqlTimestamp(); 1858 calculateDataMappingDataSourceWriter(this); 1859 String remoteFileId = generateId(identityCardEnable, identityCardName, identityCardPrefix, 1860 identityCardSuffix); 1861 byte[] data = target.getBytes(CHARSET); 1862 Map rfRecord = new HashMap<String, Object>(); 1863 rfRecord.put("id", remoteFileId); 1864 rfRecord.put("file_name", origin.getName()); 1865 rfRecord.put("file_parent_path", getParent()); 1866 rfRecord.put("file_type", "L"); 1867 rfRecord.put("file_part_index", 0); 1868 rfRecord.put("file_part_size", data.length); 1869 rfRecord.put("file_part_data_ds", this.dataMappingDs); 1870 rfRecord.put("file_part_data_table", this.dataMappingTable); 1871 rfRecord.put("file_part_last_index", -1); 1872 rfRecord.put("file_size", 0); 1873 rfRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime); 1874 rfRecord.put("file_deleted", 0); 1875 rfRecord.put("created_user_id", getDefaultModifyUserId()); 1876 rfRecord.put("updated_user_id", getDefaultModifyUserId()); 1877 rfRecord.put("created_at", currentTime); 1878 rfRecord.put("updated_at", currentTime); 1879 rfRecord.put("file_id", fileId); 1880 rfRecord.put("source_hostname", CommonTools.getHostname()); 1881 rfRecord.put("synced_on_hostnames", String.format(";%s;", CommonTools.getHostname())); 1882 rfRecord.put("deleted_on_hostnames", ";"); 1883 1884 Connection conn = dataSourceWriter.getConnection(); 1885 conn.setAutoCommit(true); 1886 executor = new CacheDriverExecutor(conn); 1887 int rows = executor.execute(formatSql(SQL_INSERT_INTO_FILE), rfRecord); 1888 1889 Map dataRecord = new HashMap<String, Object>(); 1890 dataRecord.put("id", generateDataId(identityCardEnable, identityCardName, identityCardSuffix, this)); 1891 dataRecord.put("file_part_data", encrypt(data)); 1892 dataRecord.put("remote_file_id", remoteFileId); 1893 dataRecord.put("file_id", fileId); 1894 dataRecord.put("created_at", currentTime); 1895 dataRecord.put("updated_at", currentTime); 1896 1897 conn = dataMappingDataSourceWriter.getConnection(); 1898 conn.setAutoCommit(true); 1899 dmExecutor = new CacheDriverExecutor(conn); 1900 dmExecutor.execute(formatSqlForFilePartData(SQL_INSERT_INTO_FILE_DATA), dataRecord); 1901 1902 complete(); 1903 1904 afterCreateLink(target); 1905 1906 return rows > 0; 1907 } catch (Exception e) { 1908 throw new IOException(e.getMessage(), e); 1909 } finally { 1910 try { 1911 if (executor != null) 1912 executor.close(); 1913 } catch (Exception e) { 1914 log.error(e.getMessage(), e); 1915 } 1916 try { 1917 if (dmExecutor != null) 1918 dmExecutor.close(); 1919 } catch (Exception e) { 1920 log.error(e.getMessage(), e); 1921 } 1922 } 1923 } 1924 1925 @Override 1926 public String readLink() throws IOException { 1927 return new String(readAllBytes(),BaseFile.CHARSET); 1928 } 1929 1930 @Override 1931 public byte[] readAllBytes() throws IOException { 1932 return readAllBytesFrom(); 1933 } 1934 1935 private byte[] readAllBytesFrom() throws IOException { 1936 if (isDir()) { 1937 throw new IOException(String.format("The file '%s' is a folder.", origin.getAbsolutePath())); 1938 } 1939 if (!exists()) { 1940 throw new IOException(String.format("The file '%s' does not exist.", origin.getAbsolutePath())); 1941 } 1942 CacheDriverExecutor executor = null; 1943 Connection conn = null; 1944 ByteArrayOutputStream baos = null; 1945 try { 1946 conn = dataSourceReader.getConnection(); 1947 conn.setAutoCommit(true); 1948 executor = new CacheDriverExecutor(conn); 1949 Map dataRecord = getCompletedRecord(); 1950 List<Map<String, Object>> filePartList = executor.find(formatSql(SQL_SELECT_FILE), dataRecord); 1951 baos = new ByteArrayOutputStream(); 1952 for (Map<String, Object> fpItem : filePartList) { 1953 Map<String, Object> fp = readPart(fpItem); 1954 Object fpData = null; 1955 1956 if(fp == null){ 1957 fpData = new byte[]{}; 1958 }else{ 1959 fpData = fp.get("file_part_data"); 1960 } 1961 1962 if (fpData instanceof byte[]) { 1963 byte[] partData = (byte[]) fpData; 1964 baos.write(decrypt(partData)); 1965 baos.flush(); 1966 } 1967 } 1968 return baos == null ? null : baos.toByteArray(); 1969 } catch (Exception e) { 1970 throw new IOException(e.getMessage(), e); 1971 } finally { 1972 try { 1973 if (baos != null) 1974 baos.close(); 1975 } catch (Exception e) { 1976 log.error(e.getMessage(), e); 1977 } 1978 try { 1979 if (executor != null) 1980 executor.close(); 1981 } catch (Exception e) { 1982 log.error(e.getMessage(), e); 1983 } 1984 } 1985 } 1986 1987 @Override 1988 public boolean isFile() throws IOException { 1989 Map<String, Object> lastRecord = getFirstFilePart(); 1990 1991 if (lastRecord == null) { 1992 return false; 1993 } 1994 return lastRecord.get("file_type").equals("F"); 1995 } 1996 1997 @Override 1998 public boolean isDirectory() throws IOException { 1999 Map<String, Object> lastRecord = getFirstFilePart(); 2000 2001 if (lastRecord == null) { 2002 return false; 2003 } 2004 return lastRecord.get("file_type").equals("D"); 2005 } 2006 2007 @Override 2008 public boolean isLink() throws IOException { 2009 Map<String, Object> lastRecord = getFirstFilePart(); 2010 2011 if (lastRecord == null) { 2012 return false; 2013 } 2014 return lastRecord.get("file_type").equals("L"); 2015 } 2016 2017 @Override 2018 public String readAllString() throws IOException { 2019 ByteArrayOutputStream baos = null; 2020 try { 2021 byte[] bytes = readAllBytes(); 2022 2023 if(bytes == null) return null; 2024 2025 baos = new ByteArrayOutputStream(); 2026 baos.write(bytes); 2027 baos.flush(); 2028 return baos.toString(); 2029 } finally { 2030 if (baos != null) { 2031 try { 2032 baos.close(); 2033 } catch (IOException e) { 2034 log.error(e.getMessage(), e); 2035 } 2036 } 2037 } 2038 } 2039 2040 @Override 2041 public long size() throws IOException { 2042 Map<String, Object> record = getCompletedRecord(); 2043 if (record != null) { 2044 return Long.parseLong(record.get("file_size") + ""); 2045 } 2046 return 0L; 2047 } 2048 2049 public List<Map<String, Object>> list() throws IOException { 2050 return list(getPath(), "%"); 2051 } 2052 2053 public List<Map<String, Object>> list(String likeFileName) throws IOException { 2054 return list(getPath(), likeFileName); 2055 } 2056 2057 public List<Map<String, Object>> list(String likeFolderName, String likeFileName) throws IOException { 2058 CacheDriverExecutor executor = null; 2059 Connection conn = null; 2060 try { 2061 conn = dataSourceReader.getConnection(); 2062 conn.setAutoCommit(true); 2063 executor = new CacheDriverExecutor(conn); 2064 Map<String, Object> dataRecord = new HashMap<String, Object>(); 2065 if (CommonTools.isBlank(likeFileName)) { 2066 dataRecord.put("file_name", "%"); 2067 } else { 2068 dataRecord.put("file_name", likeFileName); 2069 } 2070 if (CommonTools.isBlank(likeFolderName)) { 2071 dataRecord.put("file_parent_path", "%"); 2072 } else { 2073 dataRecord.put("file_parent_path", likeFolderName); 2074 } 2075 return executor.find(formatSql(SQL_SELECT_FOR_LIST_ALL), dataRecord); 2076 } catch (SQLException e) { 2077 throw new IOException(e.getMessage(), e); 2078 } finally { 2079 if (executor != null) { 2080 try { 2081 executor.close(); 2082 } catch (Exception e) { 2083 throw new IOException(e.getMessage(), e); 2084 } 2085 } 2086 } 2087 } 2088 2089 public void listAll(String likeFileName, CacheArray rows) throws IOException { 2090 listAll(getPath(), likeFileName, rows); 2091 } 2092 2093 public void listAll(CacheArray rows) throws IOException { 2094 listAll(getPath(), "%", rows); 2095 } 2096 2097 public void listAll(String likeFolderName, String likeFileName, CacheArray rows) throws IOException { 2098 CacheDriverExecutor executor = null; 2099 Connection conn = null; 2100 try { 2101 conn = dataSourceReader.getConnection(); 2102 conn.setAutoCommit(true); 2103 executor = new CacheDriverExecutor(conn); 2104 Map<String, Object> dataRecord = new HashMap<String, Object>(); 2105 if (CommonTools.isBlank(likeFileName)) { 2106 dataRecord.put("file_name", "%"); 2107 } else { 2108 dataRecord.put("file_name", likeFileName); 2109 } 2110 if (CommonTools.isBlank(likeFolderName)) { 2111 dataRecord.put("file_parent_path", "%"); 2112 } else { 2113 dataRecord.put("file_parent_path", likeFolderName); 2114 } 2115 executor.find(formatSql(SQL_SELECT_FOR_LIST_ALL), dataRecord, rows); 2116 } catch (SQLException e) { 2117 throw new IOException(e.getMessage(), e); 2118 } finally { 2119 if (executor != null) { 2120 try { 2121 executor.close(); 2122 } catch (Exception e) { 2123 throw new IOException(e.getMessage(), e); 2124 } 2125 } 2126 } 2127 } 2128 2129 protected void listAllForDelete(String likeFolderName, String likeFileName, CacheArray rows) throws IOException { 2130 CacheDriverExecutor executor = null; 2131 Connection conn = null; 2132 try { 2133 long deleteCutoffTimeMs = 0L; 2134 2135 if(DELETE_CUTOFF_TIME != null) deleteCutoffTimeMs = DELETE_CUTOFF_TIME.getTime(); 2136 2137 Timestamp currentTime = new Clock().getSqlTimestamp(); 2138 2139 conn = dataSourceReader.getConnection(); 2140 conn.setAutoCommit(true); 2141 executor = new CacheDriverExecutor(conn); 2142 Map<String, Object> dataRecord = new HashMap<String, Object>(); 2143 if (CommonTools.isBlank(likeFileName)) { 2144 dataRecord.put("file_name", "%"); 2145 } else { 2146 dataRecord.put("file_name", likeFileName); 2147 } 2148 if (CommonTools.isBlank(likeFolderName)) { 2149 dataRecord.put("file_parent_path", "%"); 2150 } else { 2151 dataRecord.put("file_parent_path", likeFolderName); 2152 } 2153 dataRecord.put("deleted_on_hostnames", "%;" + CommonTools.getHostname() + ";%"); 2154 dataRecord.put("begin_time", new Timestamp(deleteCutoffTimeMs)); 2155 dataRecord.put("end_time", currentTime); 2156 dataRecord.put("created_at", new Timestamp(currentTime.getTime() - BEFORE_THE_QUEUE_TIME)); 2157 executor.find(0, MAX_QUEUE_SIZE,formatSql(SQL_SELECT_FOR_LIST_ALL_DELETE), dataRecord, rows); 2158 } catch (SQLException e) { 2159 throw new IOException(e.getMessage(), e); 2160 } finally { 2161 if (executor != null) { 2162 try { 2163 executor.close(); 2164 } catch (Exception e) { 2165 throw new IOException(e.getMessage(), e); 2166 } 2167 } 2168 } 2169 } 2170 2171 protected void listAllForSync(String likeFolderName, String likeFileName, CacheArray rows) throws IOException { 2172 CacheDriverExecutor executor = null; 2173 Connection conn = null; 2174 try { 2175 long deleteCutoffTimeMs = 0L; 2176 2177 if(DELETE_CUTOFF_TIME != null) deleteCutoffTimeMs = SYNC_CUTOFF_TIME.getTime(); 2178 2179 Timestamp currentTime = new Clock().getSqlTimestamp(); 2180 2181 conn = dataSourceReader.getConnection(); 2182 conn.setAutoCommit(true); 2183 executor = new CacheDriverExecutor(conn); 2184 Map<String, Object> dataRecord = new HashMap<String, Object>(); 2185 if (CommonTools.isBlank(likeFileName)) { 2186 dataRecord.put("file_name", "%"); 2187 } else { 2188 dataRecord.put("file_name", likeFileName); 2189 } 2190 if (CommonTools.isBlank(likeFolderName)) { 2191 dataRecord.put("file_parent_path", "%"); 2192 } else { 2193 dataRecord.put("file_parent_path", likeFolderName); 2194 } 2195 dataRecord.put("synced_on_hostnames", "%;" + CommonTools.getHostname() + ";%"); 2196 dataRecord.put("begin_time", new Timestamp(deleteCutoffTimeMs)); 2197 dataRecord.put("end_time", currentTime); 2198 dataRecord.put("created_at", new Timestamp(currentTime.getTime() - BEFORE_THE_QUEUE_TIME)); 2199 executor.find(0, MAX_QUEUE_SIZE, formatSql(SQL_SELECT_FOR_LIST_ALL_SYNC), dataRecord, rows); 2200 } catch (SQLException e) { 2201 throw new IOException(e.getMessage(), e); 2202 } finally { 2203 if (executor != null) { 2204 try { 2205 executor.close(); 2206 } catch (Exception e) { 2207 throw new IOException(e.getMessage(), e); 2208 } 2209 } 2210 } 2211 } 2212 2213 public void setModifyUserId(String modifyUserId) { 2214 this.modifyUserId = modifyUserId; 2215 } 2216 2217 private String getDefaultModifyUserId() { 2218 if (CommonTools.isBlank(modifyUserId)) { 2219 return DEFAULE_MODIFY_USER_ID; 2220 } else { 2221 return modifyUserId; 2222 } 2223 } 2224 2225 @Override 2226 public void setModifyTime(Timestamp modifyTime) throws IOException { 2227 this.modifyTime = modifyTime; 2228 } 2229 2230 @Override 2231 public Timestamp getModifyTime() throws IOException { 2232 Map<String, Object> record = getFirstFilePart(); 2233 if (record != null) { 2234 return (Timestamp) record.get("file_modify_time"); 2235 } 2236 return null; 2237 } 2238 2239 public boolean isTimeout() throws IOException { 2240 return isTimeout(LOGIC_TIMEOUT_MS); 2241 } 2242 2243 @Override 2244 public void copyTo(String toPath) throws IOException { 2245 log.debug("CopyTo: {} -> {}", getPath(), toPath); 2246 if (!exists()) { 2247 log.mark(String.format("The remote file '%s' does not exist.", this.getPath())); 2248 } 2249 RemoteFile destRf = new RemoteFile(toPath); 2250 final Timestamp originModifyTime = this.getModifyTime(); 2251 destRf.setModifyUserId(modifyUserId); 2252 destRf.setModifyTime(originModifyTime); 2253 if (isLink()) { 2254 if (destRf.exists(true)) { 2255 destRf.forceDeleteLink(false); 2256 } 2257 destRf.createLink(readLink()); 2258 } 2259 if (isFile()) { 2260 final RemoteFile _destRf = destRf; 2261 if (_destRf.exists(true)) { 2262 _destRf.forceDeleteFile(false); 2263 } 2264 merge(new FilePart() { 2265 @Override 2266 protected void process(int partIndex, byte[] data) { 2267 try { 2268 _destRf.write(decrypt(data), partIndex > 0); 2269 } catch (Exception e) { 2270 log.error(e.getMessage(), e); 2271 } 2272 } 2273 2274 @Override 2275 protected void completed(int lastPartIndex, Timestamp modifyTime, long fileSize) { 2276 try { 2277 /**This is 0 bytes file**/ 2278 if (lastPartIndex == -1) { 2279 2280 if (_destRf.exists(true)) 2281 _destRf.forceDeleteFile(false); 2282 2283 _destRf.setModifyTime(originModifyTime); 2284 _destRf.createEmptyFile(); 2285 } else { 2286 _destRf.setModifyTime(originModifyTime); 2287 _destRf.complete(); 2288 } 2289 } catch (IOException e) { 2290 log.error(e.getMessage(), e); 2291 } 2292 } 2293 2294 @Override 2295 protected void ended(int lastPartIndex, long fileSize) { 2296 2297 } 2298 2299 }); 2300 } 2301 if (isDir()) { 2302 List<Map<String, Object>> copyList = list(getPath(), "%"); 2303 int size = copyList.size(); 2304 if (size > 0) { 2305 if (!destRf.exists(true)) { 2306 destRf.mkdirs(); 2307 } 2308 } 2309 String copyRootPath = getPath(); 2310 copyList = list(parsePath(getPath() + "/%"), "%"); 2311 if (copyList.size() > 0) { 2312 if (!destRf.exists(true)) { 2313 destRf.mkdirs(); 2314 } 2315 } 2316 for (Map<String, Object> item : copyList) { 2317 String itemParentPath = (String) item.get("file_parent_path"); 2318 String destParentPath = itemParentPath.replaceFirst(copyRootPath + "/", toPath); 2319 RemoteFile sourceRf = new RemoteFile( 2320 String.format("%s/%s", item.get("file_parent_path"), item.get("file_name"))); 2321 String destRfPath = new File(String.format("%s/%s", destParentPath, item.get("file_name"))) 2322 .getAbsolutePath(); 2323 if (sourceRf.isDir()) { 2324 RemoteFile tmpRf = new RemoteFile(destRfPath + "/"); 2325 tmpRf.setModifyTime(sourceRf.getModifyTime()); 2326 if (!tmpRf.exists(true)) { 2327 tmpRf.mkdirs(); 2328 } 2329 } else { 2330 sourceRf.copyTo(destRfPath); 2331 } 2332 } 2333 } 2334 } 2335 2336 @Override 2337 public void moveTo(String toPath) throws IOException { 2338 log.debug("MoveTo: {} -> {}", getPath(), toPath); 2339 RemoteFile destRf = new RemoteFile(toPath); 2340 copyTo(toPath); 2341 boolean exists = this.exists(true); 2342 if (exists && destRf.exists(true) && destRf.isCompleted()) { 2343 forceDeleteAll(true); 2344 } 2345 } 2346 2347 private void startFullAsyncThreadForDelete(boolean subFolders, long timerMs, String replaceTo, 2348 Runnable completedCallback) { 2349 RemoteFile the = this; 2350 try { 2351 if (syncRoot == null) { 2352 File replaceToFile = new File(replaceTo); 2353 if (replaceToFile.getParent() == null) { 2354 throw new IOException("Cannot execute delete task."); 2355 } 2356 String replaceToParent = replacePath(replaceToFile.getParent()); 2357 syncRoot = new File(String.format("%s/%s", replaceToParent, replaceToFile.getName())); 2358 } 2359 2360 CacheArray rows = new CacheArray(); 2361 CacheArrayFilter filter = new CacheArrayFilter(CACHE_ARRAY_FILTER_TIMER) { 2362 @Override 2363 public void execute(Integer index, Object o) { 2364 Map<String, Object> item = (Map<String, Object>) o; 2365 String fileName = (String) item.get("file_name"); 2366 String fullPath = String.format("%s/%s", item.get("file_parent_path"), fileName); 2367 Thread.currentThread() 2368 .setName(String.format("RemoteFile-startFullAsyncThreadForDelete-%s", fileName)); 2369 RemoteFile rf = new RemoteFile(fullPath); 2370 if(isDebug(rf)){ 2371 debugLog("CheckUsage",rf,this); 2372 } 2373 if (memoryUsage() < RemoteFile.MAX_MEMORY_USAGE) { 2374 rf.fileId = (String) item.get("file_id"); 2375 String toDf = rf.getPath(); 2376 if (!CommonTools.isBlank(replaceTo)) { 2377 if (subFolders) { 2378 toDf = fullPath.replaceFirst(getPath() + "/", replaceTo + "/"); 2379 } else { 2380 toDf = fullPath.replaceFirst(getPath(), replaceTo + "/"); 2381 } 2382 } 2383 DiskFile df = new DiskFile(toDf); 2384 df.copyAttrs(df, false, the.syncRoot); 2385 executeSyncRunnable(this, item, df, rf); 2386 } else{ 2387 log.warn("Over max memory usage 'MAX_MEMORY_USAGE={}'.",MAX_MEMORY_USAGE); 2388 } 2389 } 2390 2391 @Override 2392 public void terminated() { 2393 log.warn("Terminated: {}-{}", Thread.currentThread().getName(), Thread.currentThread().getId()); 2394 } 2395 2396 @Override 2397 public void completed(Integer size) { 2398 Thread.currentThread().setName(String 2399 .format("RemoteFile-startFullAsyncThreadForDelete-completed-%s", the.origin.getName())); 2400 if (isStopSync()) { 2401 LoggerFactory.getLogger(RemoteFile.class).mark("Stoped start delete task thread."); 2402 } 2403 if (timerMs <= 0) { 2404 LoggerFactory.getLogger(RemoteFile.class).mark("Completed start delete task thread."); 2405 } 2406 if (!isStopSync() && timerMs > 0) { 2407 try { 2408 Thread.sleep(timerMs); 2409 if (!isStopSync()) { 2410 startFullAsyncThreadForDelete(!subFolders, timerMs, replaceTo, completedCallback); 2411 } 2412 } catch (Exception e) { 2413 log.error(e.getMessage(), e); 2414 } 2415 } 2416 if (completedCallback != null) { 2417 completedCallback.run(); 2418 } 2419 } 2420 2421 }; 2422 rows.filter(filter); 2423 listAllForDelete(getPath() + (subFolders ? "/%" : ""), "%", rows); 2424 } catch (Exception e) { 2425 log.error(e.getMessage(), e); 2426 if (timerMs > 0) { 2427 try { 2428 Thread.sleep(timerMs); 2429 startFullAsyncThreadForDelete(subFolders, timerMs, replaceTo, completedCallback); 2430 } catch (InterruptedException ee) { 2431 log.error(ee.getMessage(), ee); 2432 } 2433 } 2434 } 2435 } 2436 2437 private void startFullAsyncThread(boolean subFolders, long timerMs, String replaceTo, Runnable completedCallback) { 2438 RemoteFile the = this; 2439 try { 2440 if (syncRoot == null) { 2441 File replaceToFile = new File(replaceTo); 2442 if (replaceToFile.getParent() == null) { 2443 throw new IOException("Cannot sync the root path."); 2444 } 2445 String replaceToParent = replacePath(replaceToFile.getParent()); 2446 syncRoot = new File(String.format("%s/%s", replaceToParent, replaceToFile.getName())); 2447 } 2448 2449 CacheArray rows = new CacheArray(); 2450 CacheArrayFilter filter = new CacheArrayFilter(CACHE_ARRAY_FILTER_TIMER) { 2451 @Override 2452 public void execute(Integer index, Object o) { 2453 Map<String, Object> item = (Map<String, Object>) o; 2454 String fileName = (String) item.get("file_name"); 2455 Thread.currentThread().setName(String.format("RemoteFile-startFullAsyncThread-%s", fileName)); 2456 String fullPath = String.format("%s/%s", item.get("file_parent_path"), fileName); 2457 RemoteFile rf = new RemoteFile(fullPath); 2458 if(isDebug(the)){ 2459 debugLog("CheckUsage",the,this); 2460 } 2461 if (memoryUsage() < RemoteFile.MAX_MEMORY_USAGE) { 2462 String toDf = rf.getPath(); 2463 if (!CommonTools.isBlank(replaceTo)) { 2464 if (subFolders) { 2465 toDf = fullPath.replaceFirst(getPath() + "/", replaceTo + "/"); 2466 } else { 2467 toDf = fullPath.replaceFirst(getPath(), replaceTo + "/"); 2468 } 2469 } 2470 DiskFile df = new DiskFile(toDf); 2471 df.copyAttrs(df, false, the.syncRoot); 2472 executeSyncRunnable(this, item, df, rf); 2473 } else{ 2474 log.warn("Over max memory usage 'MAX_MEMORY_USAGE={}'.",MAX_MEMORY_USAGE); 2475 } 2476 } 2477 2478 @Override 2479 public void terminated() { 2480 log.warn("Terminated: {}-{}", Thread.currentThread().getName(), Thread.currentThread().getId()); 2481 } 2482 2483 @Override 2484 public void completed(Integer size) { 2485 Thread.currentThread().setName( 2486 String.format("RemoteFile-startFullAsyncThread-completed-%s", the.origin.getName())); 2487 if (isStopSync()) { 2488 LoggerFactory.getLogger(RemoteFile.class).mark("Stoped start full async thread."); 2489 } 2490 if (timerMs <= 0) { 2491 LoggerFactory.getLogger(RemoteFile.class).mark("Completed start full async thread."); 2492 } 2493 if (!isStopSync() && timerMs > 0) { 2494 try { 2495 Thread.sleep(timerMs); 2496 if (!isStopSync()) { 2497 startFullAsyncThread(!subFolders, timerMs, replaceTo, completedCallback); 2498 } 2499 } catch (Exception e) { 2500 log.error(e.getMessage(), e); 2501 } 2502 } 2503 if (completedCallback != null) { 2504 completedCallback.run(); 2505 } 2506 } 2507 2508 }; 2509 rows.filter(filter); 2510 listAllForSync(getPath() + (subFolders ? "/%" : ""), "%", rows); 2511 } catch (Exception e) { 2512 log.error(e.getMessage(), e); 2513 if (timerMs > 0) { 2514 try { 2515 Thread.sleep(timerMs); 2516 startFullAsyncThread(subFolders, timerMs, replaceTo, completedCallback); 2517 } catch (InterruptedException ee) { 2518 log.error(ee.getMessage(), ee); 2519 } 2520 } 2521 } 2522 } 2523 2524 public void startFullAsync(long timerMs) throws IOException { 2525 startFullAsync(timerMs, getPath() + "/", null); 2526 } 2527 2528 public void startFullAsync(long timerMs, Runnable completedCallback) throws IOException { 2529 startFullAsync(timerMs, getPath() + "/", completedCallback); 2530 } 2531 2532 public void startFullAsync(long timerMs, String replaceTo) throws IOException { 2533 startFullAsync(timerMs, replaceTo, null); 2534 } 2535 2536 public void startFullAsync(long timerMs, String replaceTo, Runnable completedCallback) throws IOException { 2537 2538 stopSync = false; 2539 2540 RemoteFile the = this; 2541 Clock clock = new Clock(); 2542 Timestamp toTime = new Timestamp(clock.getTime() - timerMs); 2543 Executors.newFixedThreadPool(1).execute(new Runnable() { 2544 2545 @Override 2546 public void run() { 2547 startFullAsyncThread(false, timerMs, replaceTo, completedCallback); 2548 } 2549 2550 }); 2551 2552 Executors.newFixedThreadPool(1).execute(new Runnable() { 2553 2554 @Override 2555 public void run() { 2556 startFullAsyncThreadForDelete(false, timerMs, replaceTo, completedCallback); 2557 } 2558 2559 }); 2560 } 2561 2562 public DriverDataSource getDataSourceReader() { 2563 return dataSourceReader; 2564 } 2565 2566 public DriverDataSource getDataSourceWriter() { 2567 return dataSourceWriter; 2568 } 2569 2570 public ConfigProperties getConfigProperties() { 2571 return configProperties; 2572 } 2573 2574 private byte[] encrypt(byte[] data) throws Exception { 2575 boolean useEncrypt = configProperties.getBoolean("EncryptEnable", false); 2576 if (useEncrypt) { 2577 String aesKey = configProperties.getString("EncryptAesKey"); 2578 return CipherTools.AESEncrypt(aesKey,data); 2579 } else { 2580 return data; 2581 } 2582 } 2583 2584 private byte[] decrypt(byte[] encData) throws Exception { 2585 boolean useEncrypt = configProperties.getBoolean("EncryptEnable", false); 2586 if (useEncrypt) { 2587 String aesKey = configProperties.getString("EncryptAesKey"); 2588 return CipherTools.AESDecrypt(aesKey,encData); 2589 } else { 2590 return encData; 2591 } 2592 } 2593 2594 public String getSyncedOnHostnames() throws IOException { 2595 Map<String, Object> record = getCompletedRecord(); 2596 if (record != null) { 2597 return (String) record.get("synced_on_hostnames"); 2598 } 2599 return null; 2600 } 2601 2602 public String getDeletedOnHostnames() throws IOException { 2603 Map<String, Object> record = getCompletedRecord(); 2604 if (record != null) { 2605 return (String) record.get("deleted_on_hostnames"); 2606 } 2607 return null; 2608 } 2609 2610 public String getSourceHostname() throws IOException { 2611 Map<String, Object> record = getCompletedRecord(); 2612 if (record != null) { 2613 return (String) record.get("source_hostname"); 2614 } 2615 return null; 2616 } 2617 2618 protected String getLastSourceHostname() throws IOException { 2619 Map<String, Object> record = getLastUpdateRecord(); 2620 if (record != null) { 2621 return (String) record.get("source_hostname"); 2622 } 2623 return null; 2624 } 2625 2626 protected Timestamp getLastOperateTime() throws IOException { 2627 Map<String, Object> record = getLastUpdateRecord(); 2628 if (record != null) { 2629 Timestamp updatedAt = (Timestamp) record.get("updated_at"); 2630 Timestamp deletedAt = (Timestamp) record.get("deleted_at"); 2631 2632 return deletedAt == null ? updatedAt : deletedAt; 2633 } 2634 return null; 2635 } 2636 2637 protected Timestamp getLastModifyTime() throws IOException { 2638 Map<String, Object> record = getLastUpdateRecord(); 2639 if (record != null) { 2640 return (Timestamp) record.get("file_modify_time"); 2641 } 2642 return null; 2643 } 2644 2645 protected boolean isLastOperateDelete() throws IOException { 2646 Map<String, Object> record = getLastUpdateRecord(); 2647 if (record != null) { 2648 Timestamp deletedAt = (Timestamp) record.get("deleted_at"); 2649 2650 return deletedAt == null ? false : true; 2651 } 2652 return false; 2653 } 2654 2655 protected boolean isLastOperateRealDelete() throws IOException { 2656 Map<String, Object> record = getLastUpdateRecord(); 2657 if (record != null) { 2658 Object deletedValue = record.get("file_deleted"); 2659 if (deletedValue != null) { 2660 if (deletedValue instanceof Boolean) { 2661 return (Boolean) deletedValue; 2662 } else { 2663 return Integer.parseInt(deletedValue + "") == 1; 2664 } 2665 } 2666 } 2667 return false; 2668 } 2669 2670 protected void listAllForScanDelete(Integer filePartDataTable, CacheArray rows) throws IOException { 2671 CacheDriverExecutor executor = null; 2672 Connection conn = null; 2673 try { 2674 long deleteCutoffTimeMs = 0L; 2675 2676 if(DELETE_CUTOFF_TIME != null) deleteCutoffTimeMs = DELETE_CUTOFF_TIME.getTime(); 2677 2678 Timestamp currentTime = new Clock().getSqlTimestamp(); 2679 conn = dataSourceReader.getConnection(); 2680 conn.setAutoCommit(true); 2681 executor = new CacheDriverExecutor(conn); 2682 Map<String, Object> dataRecord = new HashMap<String, Object>(); 2683 dataRecord.put("file_parent_path", getPath()); 2684 dataRecord.put("like_file_parent_path", parsePath(getPath() + "/%")); 2685 dataRecord.put("deleted_on_hostnames", "%;" + CommonTools.getHostname() + ";%"); 2686 dataRecord.put("begin_time", new Timestamp(deleteCutoffTimeMs)); 2687 dataRecord.put("end_time", currentTime); 2688 dataRecord.put("created_at", new Timestamp(currentTime.getTime() - BEFORE_THE_QUEUE_TIME)); 2689 dataRecord.put("file_part_data_table", filePartDataTable); 2690 executor.find(formatSql(SQL_SELECT_FOR_LIST_ALL_SCAN_DELETE), dataRecord, rows); 2691 } catch (SQLException e) { 2692 throw new IOException(e.getMessage(), e); 2693 } finally { 2694 if (executor != null) { 2695 try { 2696 executor.close(); 2697 } catch (Exception e) { 2698 throw new IOException(e.getMessage(), e); 2699 } 2700 } 2701 } 2702 } 2703 2704 protected boolean isSyncedOnHostname() throws IOException { 2705 Map<String, Object> record = getCompletedRecord(); 2706 if (record != null) { 2707 String syncedOnHostnames = (String) record.get("synced_on_hostnames"); 2708 return syncedOnHostnames.indexOf(";" + CommonTools.getHostname() + ";") > -1; 2709 } 2710 return false; 2711 } 2712 2713 protected boolean isSyncedOnHostname(Object id) throws IOException { 2714 Map<String, Object> record = getRecordById(id); 2715 if (record != null) { 2716 String syncedOnHostnames = (String) record.get("synced_on_hostnames"); 2717 return syncedOnHostnames.indexOf(";" + CommonTools.getHostname() + ";") > -1; 2718 } 2719 return false; 2720 } 2721 2722 protected boolean isDeletedOnHostname(Object id) throws IOException { 2723 Map<String, Object> record = getRecordById(id); 2724 if (record != null) { 2725 String deletedOnHostnames = (String) record.get("deleted_on_hostnames"); 2726 return deletedOnHostnames.indexOf(";" + CommonTools.getHostname() + ";") > -1; 2727 } 2728 return false; 2729 } 2730 2731 public RemoteFile getParentFile() { 2732 return new RemoteFile(getParent()); 2733 } 2734 2735 private void executeSyncRunnable(final CacheArrayFilter filter, final Map<String, Object> item, final DiskFile df, 2736 final RemoteFile rf) { 2737 if (!df.isLogicModify()) { 2738 df.logicModify(); 2739 if (mergePool == null) { 2740 LoggerFactory.getLogger(RemoteFile.class).mark("MAX_POOL_SIZE={}", MAX_POOL_SIZE); 2741 mergePool = Executors.newFixedThreadPool(MAX_POOL_SIZE); 2742 } 2743 mergePool.execute(getSyncRunnable(filter, item, rf, df)); 2744 } 2745 } 2746 2747 private Runnable getSyncRunnable(final CacheArrayFilter filter, final Map<String, Object> item, final RemoteFile rf, 2748 final DiskFile df) { 2749 Runnable runnable = new Runnable() { 2750 @Override 2751 public void run() { 2752 try { 2753 String fileName = (String) item.get("file_name"); 2754 Thread.currentThread() 2755 .setName(String.format("RemoteFile-getSyncRunnable-%s-%s", fileName, item.get("id"))); 2756 boolean isDebug = isDebug(rf); 2757 boolean isDeleted = false; 2758 Object deletedValue = item.get("file_deleted"); 2759 2760 if (deletedValue instanceof Boolean) { 2761 isDeleted = (Boolean) deletedValue; 2762 } else { 2763 isDeleted = Integer.parseInt(deletedValue + "") == 1; 2764 } 2765 2766 String clientHostname = CommonTools.getHostname(); 2767 String lastSourceHostname = rf.getLastSourceHostname(); 2768 2769 boolean isClientHostname = lastSourceHostname != null && !lastSourceHostname.equals(clientHostname); 2770 boolean rootExists = syncRoot != null && syncRoot.exists(); 2771 2772 if(!rootExists){ 2773 Logger.systemError(RemoteFile.class,"The root path '{}' does not exist.",syncRoot.getPath()); 2774 } 2775 2776 if (isDeleted && rootExists) { 2777 if (df.exists()) { 2778 if (isDebug) { 2779 LoggerFactory.getLogger(RemoteFile.class).mark("DeleteDiskFile - {}", df.getPath()); 2780 } 2781 df.delete(); 2782 rf.deletedToDiskAndSub(item.get("id")); 2783 } else { 2784 rf.deletedToDiskAndSub(item.get("id")); 2785 } 2786 df.removeLogicModify(); 2787 } 2788 2789 String sourceHostname = rf.getSourceHostname(); 2790 isClientHostname = sourceHostname != null && !sourceHostname.equals(clientHostname); 2791 boolean checkHaveParent = configProperties.getBoolean("CheckHaveParent", false); 2792 boolean haveParent = true; 2793 if (checkHaveParent) { 2794 haveParent = df.getOrigin().getParentFile().exists(); 2795 } 2796 String fileType = (String) item.get("file_type"); 2797 2798 if (!isDeleted && isClientHostname && haveParent && rootExists) { 2799 Timestamp rfModifyTime = rf.getModifyTime(); 2800 df.setModifyTimeMsFromClock(rfModifyTime.getTime()); 2801 2802 if (fileType.equals("L")) { 2803 if (df.exists() && !df.isLink()) { 2804 df.delete(); 2805 } 2806 if (df.exists()) { 2807 Timestamp dfModifyTime = df.getModifyTimeForClock(); 2808 if (rfModifyTime.getTime() > dfModifyTime.getTime()) { 2809 if (isDebug) { 2810 LoggerFactory.getLogger(RemoteFile.class).mark("ReCreateDiskLink - {}", 2811 df.getPath()); 2812 } 2813 df.delete(); 2814 if (df.createLink(rf.readLink())) { 2815 if (df.getOrigin().getName().startsWith(".")) { 2816 Files.setAttribute(Paths.get(df.getPath()), "dos:hidden", true); 2817 } 2818 rf.syncedToDisk(); 2819 } 2820 } else { 2821 rf.syncedToDisk(); 2822 } 2823 } else { 2824 if (isDebug) { 2825 LoggerFactory.getLogger(RemoteFile.class).mark("CreateDiskLink - {}", df.getPath()); 2826 } 2827 if (df.createLink(rf.readLink())) { 2828 if (df.getOrigin().getName().startsWith(".")) { 2829 Files.setAttribute(Paths.get(df.getPath()), "dos:hidden", true); 2830 } 2831 rf.syncedToDisk(); 2832 } 2833 } 2834 df.removeLogicModify(); 2835 } 2836 if (fileType.equals("F")) { 2837 if (df.exists()) { 2838 Timestamp dfModifyTime = df.getModifyTimeForClock(); 2839 boolean changed = rfModifyTime.getTime() > dfModifyTime.getTime(); 2840 if (changed) { 2841 DiskFile writingDf = new DiskFile(df.getPath() + TMP_WRITING_FILE); 2842 writingDf.copyAttrs(writingDf, df.isCopyStructureOnly(), df.syncRoot); 2843 writingDf.setModifyTimeMsFromClock(rfModifyTime.getTime()); 2844 if (!writingDf.isLogicModify()) { 2845 writingDf.logicModify(); 2846 if (RemoteFile.addQueuePathMapping(rf.getPath())) { 2847 if (isDebug(rf)) { 2848 LoggerFactory.getLogger(RemoteFile.class).mark("DiskQueuing - {}", 2849 df.getPath()); 2850 } 2851 rf.writeToDisk(writingDf); 2852 } 2853 } 2854 } else { 2855 rf.syncedToDisk(); 2856 df.removeLogicModify(); 2857 } 2858 } else { 2859 if (rf.exists()) { 2860 DiskFile writingDf = new DiskFile(df.getPath() + TMP_WRITING_FILE); 2861 writingDf.copyAttrs(writingDf, df.isCopyStructureOnly(), df.syncRoot); 2862 writingDf.setModifyTimeMsFromClock(rfModifyTime.getTime()); 2863 if (!writingDf.isLogicModify()) { 2864 writingDf.logicModify(); 2865 if (RemoteFile.addQueuePathMapping(rf.getPath())) { 2866 if (isDebug(rf)) { 2867 LoggerFactory.getLogger(RemoteFile.class).mark("DiskQueuing - {}", 2868 df.getPath()); 2869 } 2870 rf.writeToDisk(writingDf); 2871 } 2872 } 2873 } 2874 } 2875 } 2876 if (fileType.equals("D")) { 2877 if (rf.exists()) { 2878 if (isDebug) { 2879 LoggerFactory.getLogger(RemoteFile.class).mark("CreateDiskDir - {}", df.getPath()); 2880 } 2881 if (df.exists()) { 2882 if (df.isDir()) { 2883 rf.syncedToDisk(); 2884 } else { 2885 df.delete(); 2886 if (df.mkdirs()) { 2887 if (df.getOrigin().getName().startsWith(".")) { 2888 Files.setAttribute(Paths.get(df.getPath()), "dos:hidden", true); 2889 } 2890 rf.syncedToDisk(); 2891 } 2892 } 2893 } else { 2894 if (df.mkdirs()) { 2895 if (df.getOrigin().getName().startsWith(".")) { 2896 Files.setAttribute(Paths.get(df.getPath()), "dos:hidden", true); 2897 } 2898 rf.syncedToDisk(); 2899 } 2900 } 2901 } 2902 df.removeLogicModify(); 2903 } 2904 } 2905 } catch (Exception e) { 2906 log.error(e.getMessage(), e); 2907 } 2908 } 2909 }; 2910 return runnable; 2911 } 2912 2913 public void startArchiveForFileDeleted(long timerMs, String archiveTime) throws IOException { 2914 long archiveMs = ConfigProperties.parseTimeToMs(archiveTime, 0L); 2915 startArchiveForFileDeleted(timerMs, archiveMs, null); 2916 } 2917 2918 public void startArchiveForFileDeleted(long timerMs, long archiveMs) throws IOException { 2919 startArchiveForFileDeleted(timerMs, archiveMs, null); 2920 } 2921 2922 public void startArchiveForFileDeleted(long timerMs, String archiveTime, Runnable completedCallback) 2923 throws IOException { 2924 long archiveMs = ConfigProperties.parseTimeToMs(archiveTime, 0L); 2925 startArchiveForFileDeleted(timerMs, archiveMs, completedCallback); 2926 } 2927 2928 public void startArchiveForFileDeleted(long timerMs, long archiveMs, Runnable completedCallback) 2929 throws IOException { 2930 final RemoteFile the = this; 2931 if (archiveMs <= 0) { 2932 throw new IOException(String.format( 2933 "This parameter 'archiveMs' cannot be '%s', please use a number greater than 0.", archiveMs)); 2934 } 2935 2936 stopSync = false; 2937 2938 CacheDriverExecutor executor = null; 2939 Connection conn = null; 2940 try { 2941 Thread.currentThread().setName(String.format("RemoteFile-archiveForFileDeleted-%s", getPath())); 2942 conn = dataSourceReader.getConnection(); 2943 conn.setAutoCommit(true); 2944 executor = new CacheDriverExecutor(conn); 2945 Timestamp beforeTime = new Timestamp(new Clock().getTime() - archiveMs); 2946 Map<String, Object> dataRecord = new HashMap<String, Object>(); 2947 dataRecord.put("file_parent_path", getPath()); 2948 dataRecord.put("like_file_parent_path", parsePath(getPath() + "/%")); 2949 dataRecord.put("before_deleted_at", beforeTime); 2950 CacheArray rows = new CacheArray(); 2951 CacheArrayFilter filter = new CacheArrayFilter(CACHE_ARRAY_FILTER_TIMER) { 2952 2953 @Override 2954 public void completed(Integer size) { 2955 if (isStopSync()) { 2956 LoggerFactory.getLogger(RemoteFile.class).mark("Stoped archive for file deleted."); 2957 } 2958 if (timerMs <= 0) { 2959 LoggerFactory.getLogger(RemoteFile.class).mark("Completed archive for file deleted."); 2960 } 2961 2962 if (completedCallback != null) { 2963 completedCallback.run(); 2964 } 2965 2966 if (!isStopSync() && timerMs > 0) { 2967 while(true){ 2968 2969 if(isStopSync()) break; 2970 2971 try { 2972 Thread.sleep(timerMs); 2973 if (!isStopSync()) { 2974 boolean allowed = checkUsage(the); 2975 if(allowed){ 2976 startArchiveForFileDeleted(timerMs, archiveMs, completedCallback); 2977 break; 2978 } 2979 } 2980 } catch (Exception e) { 2981 log.error(e.getMessage(), e); 2982 } 2983 } 2984 } 2985 2986 } 2987 2988 @Override 2989 public void execute(Integer index, Object o) { 2990 Thread.currentThread().setName(String.format("RemoteFile-execute-archiveForFileDeleted-%s", index)); 2991 if(isDebug(the)){ 2992 debugLog("CheckUsage",the,this); 2993 } 2994 if (memoryUsage() < RemoteFile.MAX_MEMORY_USAGE) { 2995 CacheDriverExecutor _executor = null; 2996 try { 2997 Map<String, Object> item = (Map<String, Object>) o; 2998 boolean notDir = !item.get("file_type").equals("D"); 2999 if(notDir){ 3000 calculateDataMappingDataSourceReader(the, item); 3001 Object fileId = item.get("file_id"); 3002 Map<String, Object> dataRecord = new HashMap<String, Object>(); 3003 dataRecord.put("file_id", fileId); 3004 dataRecord.put("deleted_at", new Clock().getSqlTimestamp()); 3005 Thread.currentThread().setName(String.format("RemoteFile-archiveForFileDeleted-%s", fileId)); 3006 Connection _conn = dataMappingDataSourceReader.getConnection(); 3007 _conn.setAutoCommit(true); 3008 _executor = new CacheDriverExecutor(_conn); 3009 _executor.execute(formatSqlForFilePartData(SQL_UPDATE_BY_FILE_ID_FOR_CLEAR_FILE_DATA), dataRecord); 3010 } 3011 } catch (Exception e) { 3012 log.error(e.getMessage(), e); 3013 } finally { 3014 try { 3015 if (_executor != null) 3016 _executor.close(); 3017 } catch (Exception e) { 3018 log.error(e.getMessage(), e); 3019 } 3020 } 3021 }else{ 3022 log.warn("Over max memory usage 'MAX_MEMORY_USAGE={}'.",MAX_MEMORY_USAGE); 3023 } 3024 } 3025 }; 3026 rows.filter(filter); 3027 final CacheDriverExecutor finalExecutor = executor; 3028 Executors.newFixedThreadPool(1).execute(new Runnable(){ 3029 @Override 3030 public void run(){ 3031 try{ 3032 finalExecutor.find(formatSql(SQL_SELECT_FOR_ARCHIVE_FILE_DELETED), dataRecord, rows); 3033 }catch(Exception e){ 3034 log.error(e.getMessage(),e); 3035 } 3036 } 3037 }); 3038 } catch (SQLException e) { 3039 throw new IOException(e.getMessage(), e); 3040 } finally { 3041 if (executor != null) { 3042 try { 3043 executor.close(); 3044 } catch (Exception e) { 3045 throw new IOException(e.getMessage(), e); 3046 } 3047 } 3048 } 3049 } 3050 3051 public void startArchiveForDeleted(long timerMs, String archiveTime) throws IOException { 3052 long archiveMs = ConfigProperties.parseTimeToMs(archiveTime, 0L); 3053 startArchiveForDeleted(timerMs, archiveMs, null); 3054 } 3055 3056 public void startArchiveForDeleted(long timerMs, long archiveMs) throws IOException { 3057 startArchiveForDeleted(timerMs, archiveMs, null); 3058 } 3059 3060 public void startArchiveForDeleted(long timerMs, String archiveTime, Runnable completedCallback) 3061 throws IOException { 3062 long archiveMs = ConfigProperties.parseTimeToMs(archiveTime, 0L); 3063 startArchiveForDeleted(timerMs, archiveMs, completedCallback); 3064 } 3065 3066 public void startArchiveForDeleted(long timerMs, long archiveMs, Runnable completedCallback) throws IOException { 3067 final RemoteFile the = this; 3068 if (archiveMs <= 0) { 3069 throw new IOException(String.format( 3070 "This parameter 'archiveMs' cannot be '%s', please use a number greater than 0.", archiveMs)); 3071 } 3072 3073 stopSync = false; 3074 3075 CacheDriverExecutor executor = null; 3076 Connection conn = null; 3077 try { 3078 Thread.currentThread().setName(String.format("RemoteFile-archiveForDeleted-%s", getPath())); 3079 conn = dataSourceReader.getConnection(); 3080 conn.setAutoCommit(true); 3081 executor = new CacheDriverExecutor(conn); 3082 Timestamp beforeTime = new Timestamp(new Clock().getTime() - archiveMs); 3083 Map<String, Object> dataRecord = new HashMap<String, Object>(); 3084 dataRecord.put("file_parent_path", getPath()); 3085 dataRecord.put("like_file_parent_path", parsePath(getPath() + "/%")); 3086 dataRecord.put("before_deleted_at", beforeTime); 3087 3088 CacheArray rows = new CacheArray(); 3089 CacheArrayFilter filter = new CacheArrayFilter(CACHE_ARRAY_FILTER_TIMER) { 3090 3091 @Override 3092 public void completed(Integer size) { 3093 if (isStopSync()) { 3094 LoggerFactory.getLogger(RemoteFile.class).mark("Stoped archive for deleted."); 3095 } 3096 if (timerMs <= 0) { 3097 LoggerFactory.getLogger(RemoteFile.class).mark("Completed archive for deleted."); 3098 } 3099 3100 if (completedCallback != null) { 3101 completedCallback.run(); 3102 } 3103 3104 if (!isStopSync() && timerMs > 0) { 3105 while(true){ 3106 3107 if(isStopSync()) break; 3108 3109 try { 3110 Thread.sleep(timerMs); 3111 if (!isStopSync()) { 3112 boolean allowed = checkUsage(the); 3113 if(allowed){ 3114 startArchiveForDeleted(timerMs, archiveMs, completedCallback); 3115 break; 3116 } 3117 } 3118 } catch (Exception e) { 3119 log.error(e.getMessage(), e); 3120 } 3121 } 3122 } 3123 } 3124 3125 @Override 3126 public void execute(Integer index, Object o) { 3127 Thread.currentThread().setName(String.format("RemoteFile-execute-archiveForDeleted-%s", index)); 3128 CacheDriverExecutor dmExecutor = null; 3129 CacheDriverExecutor dmDelExecutor = null; 3130 CacheDriverExecutor executor = null; 3131 if(isDebug(the)){ 3132 debugLog("CheckUsage",the,this); 3133 } 3134 if (memoryUsage() < RemoteFile.MAX_MEMORY_USAGE) { 3135 try { 3136 Clock clock = new Clock(); 3137 Timestamp currentTime = clock.getSqlTimestamp(); 3138 Map<String, Object> item = (Map<String, Object>) o; 3139 Object fileId = item.get("file_id"); 3140 Thread.currentThread().setName(String.format("RemoteFile-archiveForDeleted-%s", fileId)); 3141 calculateDataMappingDataSourceReader(the, item); 3142 Map<String, Object> dataRecord = null; 3143 boolean haveSubFiles = false; 3144 /**Need use dataMappingDataSourceReader**/ 3145 if (item.get("file_type").equals("D")) { 3146 int count = countAllSubFiles(item.get("file_parent_path") + "", 3147 item.get("file_name") + ""); 3148 haveSubFiles = count > 0; 3149 } 3150 if (!haveSubFiles) { 3151 if (!item.get("file_type").equals("D")) { 3152 Connection dmConn = dataMappingDataSourceReader.getConnection(); 3153 dmConn.setAutoCommit(true); 3154 dmExecutor = new CacheDriverExecutor(dmConn); 3155 dataRecord = new HashMap<String, Object>(); 3156 dataRecord.put("file_id", item.get("file_id")); 3157 dataRecord.put("deleted_at", currentTime); 3158 dmExecutor.execute( 3159 formatSqlForFilePartData(SQL_UPDATE_BY_FILE_ID_FOR_CLEAR_FILE_DATA), 3160 dataRecord); 3161 } 3162 Connection conn = dataSourceWriter.getConnection(); 3163 conn.setAutoCommit(true); 3164 executor = new CacheDriverExecutor(conn); 3165 dataRecord = new HashMap<String, Object>(); 3166 dataRecord.put("file_id", item.get("file_id")); 3167 executor.execute(formatSql(SQL_DELETE_BY_FILE_ID_FROM_FILE), dataRecord); 3168 } 3169 3170 Connection dmDelConn = dataMappingDataSourceReader.getConnection(); 3171 dmDelConn.setAutoCommit(true); 3172 dmDelExecutor = new CacheDriverExecutor(dmDelConn); 3173 dataRecord = new HashMap<String, Object>(); 3174 dataRecord.put("deleted_at", new Timestamp(currentTime.getTime() - archiveMs)); 3175 dmDelExecutor.execute(formatSqlForFilePartData(SQL_DELETE_BY_RF_ID_FROM_FILE_DATA), 3176 dataRecord); 3177 } catch (Exception e) { 3178 log.error(e.getMessage(), e); 3179 } finally { 3180 try { 3181 if (dmExecutor != null) 3182 dmExecutor.close(); 3183 } catch (Exception e) { 3184 log.error(e.getMessage(), e); 3185 } 3186 try { 3187 if (dmDelExecutor != null) 3188 dmDelExecutor.close(); 3189 } catch (Exception e) { 3190 log.error(e.getMessage(), e); 3191 } 3192 try { 3193 if (executor != null) 3194 executor.close(); 3195 } catch (Exception e) { 3196 log.error(e.getMessage(), e); 3197 } 3198 } 3199 3200 }else{ 3201 log.warn("Over max memory usage 'MAX_MEMORY_USAGE={}'.",MAX_MEMORY_USAGE); 3202 } 3203 } 3204 }; 3205 rows.filter(filter); 3206 final CacheDriverExecutor finalExecutor = executor; 3207 Executors.newFixedThreadPool(1).execute(new Runnable(){ 3208 @Override 3209 public void run(){ 3210 try{ 3211 finalExecutor.find(formatSql(SQL_SELECT_FOR_ARCHIVE_DELETED), dataRecord, rows); 3212 }catch(Exception e){ 3213 log.error(e.getMessage(),e); 3214 } 3215 } 3216 }); 3217 } catch (SQLException e) { 3218 throw new IOException(e.getMessage(), e); 3219 } finally { 3220 if (executor != null) { 3221 try { 3222 executor.close(); 3223 } catch (Exception e) { 3224 throw new IOException(e.getMessage(), e); 3225 } 3226 } 3227 } 3228 } 3229 3230 public void startArchiveForTimeout(long timerMs) throws IOException { 3231 startArchiveForTimeout(timerMs,null); 3232 } 3233 3234 public void startArchiveForTimeout(long timerMs,Runnable completedCallback) throws IOException { 3235 final RemoteFile the = this; 3236 3237 stopSync = false; 3238 3239 CacheDriverExecutor executor = null; 3240 Connection conn = null; 3241 try { 3242 Thread.currentThread().setName(String.format("RemoteFile-archiveForTimeout-%s", getPath())); 3243 conn = dataSourceReader.getConnection(); 3244 conn.setAutoCommit(true); 3245 executor = new CacheDriverExecutor(conn); 3246 Timestamp beforeTime = new Timestamp(new Clock().getTime() - (LOGIC_TIMEOUT_MS*BATCH_SIZE)); 3247 Map<String, Object> dataRecord = new HashMap<String, Object>(); 3248 dataRecord.put("file_parent_path", getPath()); 3249 dataRecord.put("like_file_parent_path", parsePath(getPath() + "/%")); 3250 dataRecord.put("before_updated_at", beforeTime); 3251 3252 CacheArray rows = new CacheArray(); 3253 CacheArrayFilter filter = new CacheArrayFilter(CACHE_ARRAY_FILTER_TIMER) { 3254 3255 @Override 3256 public void completed(Integer size) { 3257 if (isStopSync()) { 3258 LoggerFactory.getLogger(RemoteFile.class).mark("Stoped archive for timeout deleted."); 3259 } 3260 if (timerMs <= 0) { 3261 LoggerFactory.getLogger(RemoteFile.class).mark("Completed archive for timeout deleted."); 3262 } 3263 3264 if (completedCallback != null) { 3265 completedCallback.run(); 3266 } 3267 3268 if (!isStopSync() && timerMs > 0) { 3269 while(true){ 3270 3271 if(isStopSync()) break; 3272 3273 try { 3274 Thread.sleep(timerMs); 3275 if (!isStopSync()) { 3276 boolean allowed = checkUsage(the); 3277 if(allowed){ 3278 startArchiveForTimeout(timerMs, completedCallback); 3279 break; 3280 } 3281 } 3282 } catch (Exception e) { 3283 log.error(e.getMessage(), e); 3284 } 3285 } 3286 } 3287 3288 } 3289 3290 @Override 3291 public void execute(Integer index, Object o) { 3292 Thread.currentThread().setName(String.format("RemoteFile-execute-archiveForTimeout-%s", index)); 3293 if(isDebug(the)){ 3294 debugLog("CheckUsage",the,this); 3295 } 3296 CacheDriverExecutor executor = null; 3297 if (memoryUsage() < RemoteFile.MAX_MEMORY_USAGE) { 3298 try { 3299 Clock clock = new Clock(); 3300 Timestamp currentTime = clock.getSqlTimestamp(); 3301 Map<String, Object> item = (Map<String, Object>) o; 3302 Object fileId = item.get("file_id"); 3303 Thread.currentThread().setName(String.format("RemoteFile-archiveForTimeout-%s", fileId)); 3304 Connection conn = dataSourceWriter.getConnection(); 3305 conn.setAutoCommit(true); 3306 executor = new CacheDriverExecutor(conn); 3307 Map<String,Object> dataRecord = new HashMap<String, Object>(); 3308 dataRecord.put("file_id",fileId); 3309 dataRecord.put("deleted_at",currentTime); 3310 dataRecord.put("deleted_user_id",getDefaultModifyUserId()); 3311 executor.execute(formatSql(SQL_UPDATE_FOR_TIMEOUT_DELETE), dataRecord); 3312 } catch (Exception e) { 3313 log.error(e.getMessage(), e); 3314 } finally { 3315 try { 3316 if (executor != null) 3317 executor.close(); 3318 } catch (Exception e) { 3319 log.error(e.getMessage(), e); 3320 } 3321 } 3322 3323 }else{ 3324 log.warn("Over max memory usage 'MAX_MEMORY_USAGE={}'.",MAX_MEMORY_USAGE); 3325 } 3326 } 3327 }; 3328 rows.filter(filter); 3329 final CacheDriverExecutor finalExecutor = executor; 3330 Executors.newFixedThreadPool(1).execute(new Runnable(){ 3331 @Override 3332 public void run(){ 3333 try{ 3334 finalExecutor.find(formatSql(SQL_SELECT_FOR_ALL_TIMEOUT), dataRecord, rows); 3335 }catch(Exception e){ 3336 log.error(e.getMessage(),e); 3337 } 3338 } 3339 }); 3340 } catch (SQLException e) { 3341 throw new IOException(e.getMessage(), e); 3342 } finally { 3343 if (executor != null) { 3344 try { 3345 executor.close(); 3346 } catch (Exception e) { 3347 throw new IOException(e.getMessage(), e); 3348 } 3349 } 3350 } 3351 } 3352 3353 public static Map<String, DriverDataSource> getDriverDataSourceMap() { 3354 return Collections.synchronizedMap(DATASOURCE_REMOTE_FILE); 3355 } 3356 3357 private static synchronized String generateId(boolean identityCardEnable, String identityCardName, 3358 String identityCardPrefix, int identityCardSuffix) throws SQLException { 3359 if (identityCardEnable) { 3360 if (identityCard == null) { 3361 identityCard = new IdentityCard(identityCardName); 3362 } 3363 return identityCard.generateId(16, identityCardPrefix, "", identityCardSuffix); 3364 } else { 3365 return CommonTools.generateId(16); 3366 } 3367 } 3368 3369 private static synchronized String generateDataId(boolean identityCardEnable, String identityCardName, 3370 int identityCardSuffix, RemoteFile theRf) throws SQLException { 3371 if (identityCardEnable) { 3372 if (identityCard == null) { 3373 identityCard = new IdentityCard(identityCardName + "Data"); 3374 } 3375 return identityCard.generateId(16, String.format("%04d", theRf.dataMappingTable), "", identityCardSuffix); 3376 } else { 3377 return CommonTools.generateId(16); 3378 } 3379 } 3380 3381 protected static synchronized boolean addQueuePathMapping(String path) { 3382 Long addTimeMs = null; 3383 3384 boolean allowed = checkQueuePathMapping(); 3385 3386 if (!allowed) 3387 return false; 3388 3389 addTimeMs = QUEUE_PATH_MAPPING.get(path); 3390 3391 if (addTimeMs == null) { 3392 QUEUE_PATH_MAPPING.put(path, System.currentTimeMillis()); 3393 return true; 3394 } 3395 3396 return false; 3397 } 3398 3399 public static synchronized boolean checkQueuePathMapping() { 3400 Long addTimeMs = null; 3401 Map<String, Long> copyQueueMap = getQueuePathMapping(); 3402 List<String> keyList = new ArrayList<String>(copyQueueMap.keySet()); 3403 for (String key : keyList) { 3404 addTimeMs = copyQueueMap.get(key); 3405 if (addTimeMs != null) { 3406 boolean timeout = (System.currentTimeMillis() - addTimeMs) >= LOGIC_TIMEOUT_MS; 3407 3408 if (timeout) { 3409 QUEUE_PATH_MAPPING.remove(key); 3410 } 3411 } 3412 } 3413 3414 if (QUEUE_PATH_MAPPING.keySet().size() >= MAX_QUEUE_SIZE) 3415 return false; 3416 3417 return true; 3418 } 3419 3420 protected static synchronized void removeQueuePathMapping(String path) { 3421 QUEUE_PATH_MAPPING.remove(path); 3422 } 3423 3424 public static synchronized Map<String, Long> getQueuePathMapping() { 3425 return Collections.synchronizedMap(QUEUE_PATH_MAPPING); 3426 } 3427 3428 protected static synchronized void loadDataMapping(RemoteFile rf) throws Exception { 3429 List<String> dmArray = rf.configProperties.getArray("DataMapping", new String[] {}); 3430 int size = dmArray.size(); 3431 List<String> dmList = DATA_MAPPING_LIST.get(rf.dsKey); 3432 3433 if (dmList == null) 3434 dmList = new ArrayList<String>(); 3435 3436 if (!DATA_MAPPING_INDEX.containsKey(rf.dsKey)) 3437 DATA_MAPPING_INDEX.put(rf.dsKey, 0); 3438 3439 for (int i = 0; i < size; i++) { 3440 String dm = dmArray.get(i); 3441 String originKey = String.format("DataMapping[%s]", i); 3442 Map<String, Object> dmMap = rf.configProperties.getMap(originKey); 3443 String shortKey = String.format("%s.%s", rf.dsKey, dm); 3444 if (dmMap != null) { 3445 String dmKey = String.format("%s.%s", rf.dsKey, originKey); 3446 3447 if (!DATA_MAPPING.containsKey(shortKey)) { 3448 ConfigProperties dmCp = new ConfigProperties(); 3449 dmCp.putAllAndReturnSlef(dmMap); 3450 int priority = dmCp.getInteger(String.format("%s.Priority", originKey), 0); 3451 if (i == 0 && priority < 1) { 3452 throw new Exception("First DataMapping priority cannot be less than 1."); 3453 } 3454 String dataSourceFileOriginValue = dmCp.getString(String.format("%s.DataSource", originKey)); 3455 String dataSourceReaderFileOriginValue = dmCp 3456 .getString(String.format("%s.DataSourceReader", originKey), dataSourceFileOriginValue); 3457 String dataSourceWriterFileOriginValue = dmCp 3458 .getString(String.format("%s.DataSourceWriter", originKey), dataSourceFileOriginValue); 3459 3460 String dmDsReaderKey = String.format("%s.reader", CommonTools.md5(dataSourceReaderFileOriginValue)); 3461 String dmDsWriterKey = String.format("%s.writer", CommonTools.md5(dataSourceWriterFileOriginValue)); 3462 DriverDataSource dmDataSourceReader = DATASOURCE_REMOTE_FILE.get(dmDsReaderKey); 3463 DriverDataSource dmDataSourceWriter = DATASOURCE_REMOTE_FILE.get(dmDsWriterKey); 3464 if (dmDataSourceReader == null) { 3465 DATASOURCE_REMOTE_FILE.put(dmDsReaderKey, 3466 new DriverDataSource(new File(dataSourceReaderFileOriginValue))); 3467 } 3468 if (dmDataSourceWriter == null) { 3469 DATASOURCE_REMOTE_FILE.put(dmDsWriterKey, 3470 new DriverDataSource(new File(dataSourceWriterFileOriginValue))); 3471 } 3472 String tableRangOriginValue = dmCp.getString(String.format("%s.TableRange", originKey), "0"); 3473 ConfigProperties shortCp = new ConfigProperties(); 3474 List<Integer> tableRange = new ArrayList<Integer>(); 3475 if (tableRangOriginValue.contains("-")) { 3476 String[] ranges = tableRangOriginValue.split("-"); 3477 Integer begin = Integer.parseInt(ranges[0].trim()); 3478 Integer end = Integer.parseInt(ranges[1].trim()); 3479 for (int j = begin; j <= end; j++) { 3480 tableRange.add(begin + j); 3481 } 3482 } else if (tableRangOriginValue.contains(",")) { 3483 String[] ranges = tableRangOriginValue.split(","); 3484 int rangesLen = ranges.length; 3485 for (int j = 0; j < rangesLen; j++) { 3486 tableRange.add(Integer.parseInt(ranges[j].trim())); 3487 } 3488 } else { 3489 tableRange.add(Integer.parseInt(tableRangOriginValue.trim())); 3490 } 3491 shortCp.put("TableRange", tableRange); 3492 shortCp.putAndReturnSlef("DataSourceReader", dmDsReaderKey); 3493 shortCp.putAndReturnSlef("DataSourceWriter", dmDsWriterKey); 3494 shortCp.putAndReturnSlef("Priority", priority); 3495 shortCp.putAndReturnSlef("UsingCount", 0); 3496 shortCp.putAndReturnSlef("Name", dm); 3497 int randomIndex = ThreadLocalRandom.current().nextInt(0, tableRange.size()); 3498 shortCp.putAndReturnSlef("Table", tableRange.get(randomIndex)); 3499 DATA_MAPPING.put(shortKey, shortCp); 3500 dmList.add(dm); 3501 DATA_MAPPING_LIST.put(rf.dsKey, dmList); 3502 } 3503 } 3504 } 3505 3506 } 3507 3508 protected static synchronized void calculateDataMappingDataSourceReader(RemoteFile rf, 3509 Map<String, Object> rfRecord) { 3510 rf.dataMappingDs = (String) rfRecord.get("file_part_data_ds"); 3511 rf.dataMappingTable = (Integer) rfRecord.get("file_part_data_table"); 3512 String shortKey = String.format("%s.%s", rf.dsKey, rf.dataMappingDs); 3513 ConfigProperties cp = DATA_MAPPING.get(shortKey); 3514 if (cp != null) { 3515 String dmDsReaderKey = cp.getString("DataSourceReader"); 3516 rf.dataMappingDataSourceReader = DATASOURCE_REMOTE_FILE.get(dmDsReaderKey); 3517 } 3518 } 3519 3520 protected static synchronized void calculateDataMappingDataSourceWriter(RemoteFile rf) { 3521 if (DATA_MAPPING_INDEX.get(rf.dsKey) >= DATA_MAPPING_LIST.size() - 1) { 3522 DATA_MAPPING_INDEX.put(rf.dsKey, 0); 3523 } 3524 ConfigProperties cp = getDataMapping(rf, DATA_MAPPING_INDEX.get(rf.dsKey)); 3525 List<Integer> tableRange = (List<Integer>) cp.get("TableRange"); 3526 Integer priority = cp.getInteger("Priority", 0); 3527 Integer usingCount = cp.getInteger("UsingCount", 0); 3528 String key = String.format("%s.%s", rf.dsKey, cp.getString("Name")); 3529 if (usingCount < priority) { 3530 rf.dataMappingDataSourceReader = DATASOURCE_REMOTE_FILE.get(cp.getString("DataSourceReader")); 3531 rf.dataMappingDataSourceWriter = DATASOURCE_REMOTE_FILE.get(cp.getString("DataSourceWriter")); 3532 rf.dataMappingDs = cp.getString("Name"); 3533 rf.dataMappingTable = cp.getInteger("Table"); 3534 usingCount++; 3535 if (usingCount >= priority) { 3536 DATA_MAPPING_INDEX.put(rf.dsKey, DATA_MAPPING_INDEX.get(rf.dsKey) + 1); 3537 cp.putAndReturnSlef("UsingCount", 0); 3538 int randomIndex = ThreadLocalRandom.current().nextInt(0, tableRange.size()); 3539 cp.putAndReturnSlef("Table", tableRange.get(randomIndex)); 3540 DATA_MAPPING.put(key, cp); 3541 } else { 3542 cp.putAndReturnSlef("UsingCount", usingCount); 3543 int randomIndex = ThreadLocalRandom.current().nextInt(0, tableRange.size()); 3544 cp.putAndReturnSlef("Table", tableRange.get(randomIndex)); 3545 DATA_MAPPING.put(key, cp); 3546 } 3547 } else { 3548 DATA_MAPPING_INDEX.put(rf.dsKey, DATA_MAPPING_INDEX.get(rf.dsKey) + 1); 3549 calculateDataMappingDataSourceWriter(rf); 3550 } 3551 } 3552 3553 protected static synchronized ConfigProperties getDataMapping(RemoteFile rf, int index) { 3554 List<String> dmList = DATA_MAPPING_LIST.get(rf.dsKey); 3555 String dm = dmList.get(index); 3556 String key = String.format("%s.%s", rf.dsKey, dm); 3557 return DATA_MAPPING.get(key); 3558 } 3559 3560 protected String getDataMappingTableName() { 3561 return String.format("%s_data_%s", this.tableName, this.dataMappingTable); 3562 } 3563 3564 private static void debugLog(String point,RemoteFile rf,CacheArrayFilter filter){ 3565 debugLog(RemoteFile.class,point,rf,filter); 3566 } 3567 3568}