001/******************************************************************************* 002The MIT License (MIT) 003 004Copyright (c) 2024 KILLCODING.COM 005 006Permission is hereby granted, free of charge, to any person obtaining a copy 007of this software and associated documentation files (the "Software"), to deal 008in the Software without restriction, including without limitation the rights 009to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 010copies of the Software, and to permit persons to whom the Software is 011furnished to do so, subject to the following conditions: 012 013The above copyright notice and this permission notice shall be included in 014all copies or substantial portions of the Software. 015 016THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 017IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 018FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 019AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 020LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 021OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 022THE SOFTWARE. 023*****************************************************************************/ 024package com.killcoding.file; 025 026import java.io.File; 027import com.killcoding.datasource.DriverDataSource; 028import com.killcoding.tool.ConfigProperties; 029import com.killcoding.tool.CommonTools; 030import com.killcoding.log.Logger; 031import com.killcoding.log.LoggerFactory; 032import java.io.FileInputStream; 033import java.io.IOException; 034import java.sql.Connection; 035import java.sql.SQLException; 036import com.killcoding.datasource.CacheDriverExecutor; 037import java.util.Map; 038import java.util.HashMap; 039import com.killcoding.datasource.Clock; 040import java.sql.Timestamp; 041import java.nio.file.Files; 042import java.util.concurrent.ConcurrentHashMap; 043import java.util.List; 044import java.util.ArrayList; 045import java.util.Arrays; 046import java.io.ByteArrayOutputStream; 047import java.sql.Blob; 048import java.nio.file.Paths; 049import java.nio.file.Path; 050import java.net.URI; 051import java.io.ByteArrayInputStream; 052import java.nio.ByteBuffer; 053import java.util.concurrent.ExecutorService; 054import java.util.concurrent.Executors; 055import java.util.concurrent.Future; 056import java.nio.file.LinkOption; 057import javax.crypto.Cipher; 058import javax.crypto.CipherInputStream; 059import javax.crypto.CipherOutputStream; 060import javax.crypto.KeyGenerator; 061import javax.crypto.SecretKey; 062import javax.crypto.SecretKeyFactory; 063import javax.crypto.spec.DESKeySpec; 064import javax.crypto.spec.IvParameterSpec; 065import javax.crypto.spec.SecretKeySpec; 066import com.killcoding.file.DiskFile; 067import java.util.Collection; 068import com.killcoding.cache.CacheArray; 069import com.killcoding.cache.CacheArrayFilter; 070import java.nio.file.attribute.BasicFileAttributes; 071import java.util.concurrent.Callable; 072import com.killcoding.datasource.IdentityCard; 073import java.util.Collections; 074import java.util.concurrent.ThreadLocalRandom; 075import com.killcoding.tool.CipherTools; 076import com.killcoding.cache.Cache; 077import java.util.Date; 078 079public class RemoteFile extends BaseFile implements RemoteFileSQL { 080 081 private static final Map<String, DriverDataSource> DATASOURCE_REMOTE_FILE = new ConcurrentHashMap<String, DriverDataSource>(); 082 083 private static final Map<String, ConfigProperties> CONFIG_PROPS_REMOTE_FILE = new ConcurrentHashMap<String, ConfigProperties>(); 084 085 private static final Map<String, Long> QUEUE_PATH_MAPPING = new ConcurrentHashMap<String, Long>(); 086 087 private static final Map<String, ConfigProperties> DATA_MAPPING = new ConcurrentHashMap<String, ConfigProperties>(); 088 089 private static final Map<String, List<String>> DATA_MAPPING_LIST = new ConcurrentHashMap<String, List<String>>(); 090 091 private static final Map<String, Integer> DATA_MAPPING_INDEX = new ConcurrentHashMap<String, Integer>(); 092 093 protected static boolean FIRST_LOADED = false; 094 095 public static Integer MAX_POOL_SIZE = 100; 096 public static Double MAX_CONNECT_USAGE = 0.5D; 097 public static Double MAX_MEMORY_USAGE = 0.8D; 098 public static Integer MAX_QUEUE_SIZE = 10; 099 public static Long MAX_FILE_SZIE = -1L; 100 public static boolean SHOW_LINK = false; 101 public static boolean SHOW_HIDDEN = false; 102 public static boolean COPY_STRUCTURE_ONLY = false; 103 public static Integer MAX_FILE_PARENT_PATH_LENGTH = 200; 104 public static Integer MAX_FILE_NAME_LENGTH = 200; 105 public static Integer CACHE_SECONDS = 5; 106 public static Date SYNC_CUTOFF_TIME = null; 107 public static Date DELETE_CUTOFF_TIME = null; 108 109 private static IdentityCard identityCard = null; 110 private static File appRemoteFile = null; 111 private DriverDataSource dataSourceReader = null; 112 private DriverDataSource dataSourceWriter = null; 113 private String dataSourceReaderPath = null; 114 private String dataSourceWriterPath = null; 115 private String modifyUserId = null; 116 private Integer filePartSize = null; 117 private Boolean identityCardEnable = false; 118 private String identityCardName = null; 119 private String identityCardPrefix = null; 120 private Integer identityCardSuffix = 0; 121 private Timestamp modifyTime = null; 122 123 protected ConfigProperties configProperties = null; 124 protected String fileId = null; 125 protected String tableName = null; 126 protected Integer dataMappingTable = null; 127 protected String dataMappingDs = null; 128 protected DriverDataSource dataMappingDataSourceReader = null; 129 protected DriverDataSource dataMappingDataSourceWriter = null; 130 protected String dsKey = null; 131 132 public static Long BEFORE_THE_QUEUE_TIME = 60000L; 133 public static Integer BATCH_SIZE = 10; 134 public static String DEFAULE_MODIFY_USER_ID = null; 135 136 private Logger log = null; 137 138 private static ExecutorService mergePool = null; 139 140 public RemoteFile(String path) { 141 super(path); 142 log = LoggerFactory.getLogger(getClass()); 143 fileId = CommonTools.generateId(16); 144 initGlobal(); 145 } 146 147 public static synchronized void initPool(int poolSize) { 148 if (mergePool == null) { 149 MAX_POOL_SIZE = poolSize; 150 LoggerFactory.getLogger(RemoteFile.class).mark("MAX_POOL_SIZE={}", MAX_POOL_SIZE); 151 mergePool = Executors.newFixedThreadPool(MAX_POOL_SIZE); 152 } 153 } 154 155 private synchronized void init() { 156 dsKey = CommonTools.md5(this.appRemoteFile.getAbsolutePath()); 157 dataSourceReader = DATASOURCE_REMOTE_FILE.get(String.format("%s.reader", dsKey)); 158 dataSourceWriter = DATASOURCE_REMOTE_FILE.get(String.format("%s.writer", dsKey)); 159 configProperties = CONFIG_PROPS_REMOTE_FILE.get(dsKey); 160 if (dataSourceReader == null && dataSourceWriter == null) { 161 try { 162 if (!appRemoteFile.exists()) 163 throw new IOException(String.format("Not exist '%s'.", appRemoteFile.getAbsolutePath())); 164 165 configProperties = new ConfigProperties(); 166 Runnable updatedRun = new Runnable() { 167 @Override 168 public void run() { 169 try { 170 reload(); 171 LoggerFactory.getLogger(RemoteFile.class).mark("RemoteFile Reloaded"); 172 } catch (Exception e) { 173 log.error(e.getMessage(), e); 174 } 175 } 176 }; 177 configProperties.load(appRemoteFile, updatedRun); 178 179 CONFIG_PROPS_REMOTE_FILE.put(dsKey, configProperties); 180 181 String dataSourcePath = configProperties.getString("DataSource"); 182 dataSourceReaderPath = configProperties.getString("DataSourceReader", dataSourcePath); 183 dataSourceWriterPath = configProperties.getString("DataSourceWriter", dataSourcePath); 184 reload(); 185 } catch (Exception e) { 186 log.error(e.getMessage(), e); 187 } 188 } 189 if (configProperties != null) { 190 String dataSourcePath = configProperties.getString("DataSource"); 191 dataSourceReaderPath = configProperties.getString("DataSourceReader", dataSourcePath); 192 dataSourceWriterPath = configProperties.getString("DataSourceWriter", dataSourcePath); 193 reload(); 194 } 195 calculateDataMappingDataSourceWriter(this); 196 } 197 198 private synchronized void initGlobal() { 199 if(this.appRemoteFile == null){ 200 this.appRemoteFile = CommonTools.getSystemProperties(RemoteFile.class, "APP_REMOTE_FILE", 201 "RemoteFile.properties"); 202 } 203 init(); 204 } 205 206 private synchronized void reload() { 207 try { 208 LOGIC_TIMEOUT_MS = configProperties.getMilliSeconds("LogicTimeout", 300000L); 209 LOGIC_CHECK_TIMEOUT_MS = configProperties.getMilliSeconds("LogicCheckTimeout", 60000L); 210 MAX_FILE_SZIE = configProperties.getFileSize("MaxFileSize", 1024 * 1024 * 1L); 211 SHOW_HIDDEN = configProperties.getBoolean("ShowHidden", false); 212 SHOW_LINK = configProperties.getBoolean("ShowLink", false); 213 COPY_STRUCTURE_ONLY = configProperties.getBoolean("CopyStructureOnly", false); 214 MAX_QUEUE_SIZE = configProperties.getInteger("MaxQueueSize", 10); 215 BATCH_SIZE = configProperties.getInteger("BatchSize", 10); 216 MAX_CONNECT_USAGE = configProperties.getDouble("MaxConnectUsage", 0.5D); 217 MAX_MEMORY_USAGE = configProperties.getDouble("MaxMemoryUsage", 0.8D); 218 BEFORE_THE_QUEUE_TIME = configProperties.getMilliSeconds("BeforeTheQueueTime", 15000L); 219 MAX_FILE_PARENT_PATH_LENGTH = configProperties.getInteger("MaxFileParentPathLength", 200); 220 MAX_FILE_NAME_LENGTH = configProperties.getInteger("MaxFileNameLength", 200); 221 CACHE_SECONDS = configProperties.getInteger("CacheSeconds", 5); 222 223 SYNC_CUTOFF_TIME = configProperties.getDateTime("SyncCutoffTime","0000-01-01 00:00:00.0000"); 224 DELETE_CUTOFF_TIME = configProperties.getDateTime("DeleteCutoffTime","0000-01-01 00:00:00.0000"); 225 226 if(BEFORE_THE_QUEUE_TIME < 0) BEFORE_THE_QUEUE_TIME = 15000L; 227 228 if(MAX_CONNECT_USAGE < 0) MAX_CONNECT_USAGE = 0D; 229 230 if(MAX_MEMORY_USAGE < 0) MAX_MEMORY_USAGE = 0D; 231 232 if(BATCH_SIZE <= 0) BATCH_SIZE = 10; 233 234 if(MAX_QUEUE_SIZE <= 0) MAX_QUEUE_SIZE = 10; 235 236 if(MAX_FILE_SZIE <= 0) MAX_FILE_SZIE = 1024 * 1024 * 1L; 237 238 if(LOGIC_TIMEOUT_MS <= 0) LOGIC_TIMEOUT_MS = 900000L; 239 240 if(LOGIC_CHECK_TIMEOUT_MS < 0) LOGIC_CHECK_TIMEOUT_MS = 60000L; 241 242 if(MAX_FILE_PARENT_PATH_LENGTH <= 0) MAX_FILE_PARENT_PATH_LENGTH = 200; 243 244 if(MAX_FILE_NAME_LENGTH <= 0) MAX_FILE_NAME_LENGTH = 200; 245 246 if(CACHE_SECONDS < 0) CACHE_SECONDS = 0; 247 248 log.debug("LOGIC_TIMEOUT_MS={}", LOGIC_TIMEOUT_MS); 249 log.debug("LOGIC_CHECK_TIMEOUT_MS={}", LOGIC_CHECK_TIMEOUT_MS); 250 log.debug("MAX_FILE_SZIE={}", MAX_FILE_SZIE); 251 log.debug("SHOW_HIDDEN={}", SHOW_HIDDEN); 252 log.debug("SHOW_LINK={}", SHOW_LINK); 253 log.debug("COPY_STRUCTURE_ONLY={}", COPY_STRUCTURE_ONLY); 254 log.debug("MAX_QUEUE_SIZE={}", MAX_QUEUE_SIZE); 255 log.debug("BATCH_SIZE={}", BATCH_SIZE); 256 log.debug("MAX_CONNECT_USAGE={}", MAX_CONNECT_USAGE); 257 log.debug("MAX_MEMORY_USAGE={}", MAX_MEMORY_USAGE); 258 log.debug("MAX_FILE_PARENT_PATH_LENGTH={}", MAX_FILE_PARENT_PATH_LENGTH); 259 log.debug("MAX_FILE_NAME_LENGTH={}", MAX_FILE_NAME_LENGTH); 260 log.debug("CACHE_SECONDS={}", CACHE_SECONDS); 261 262 String dataSourcePath = configProperties.getString("DataSource"); 263 String reloadDataSourceReaderPath = configProperties.getString("DataSourceReader", dataSourcePath); 264 String reloadDataSourceWriterPath = configProperties.getString("DataSourceWriter", dataSourcePath); 265 filePartSize = configProperties.getFileSize("FilePartSize", 1024 * 1024 * 1L).intValue(); 266 267 if(filePartSize <= 0) filePartSize = 1024 * 1024 * 1; 268 269 tableName = configProperties.getString("TableName", "remote_file"); 270 identityCardEnable = configProperties.getBoolean("IdentityCardEnable", false); 271 identityCardName = configProperties.getString("IdentityCardName", "RemoteFile"); 272 identityCardPrefix = configProperties.getString("IdentityCardPrefix", "RMF"); 273 identityCardSuffix = configProperties.getInteger("IdentityCardSuffix", 0); 274 275 String readerDsKey = String.format("%s.reader", dsKey); 276 String writerDsKey = String.format("%s.writer", dsKey); 277 /** 278 May be referenced when expansion is needed 279 if (dataSourceReader == null || !dataSourceReaderPath.equals(reloadDataSourceReaderPath)) { 280 DriverDataSource currentDs = DATASOURCE_REMOTE_FILE.get(readerDsKey); 281 if (currentDs != null) { 282 currentDs.closeAll(); 283 } 284 **/ 285 if (dataSourceReader == null || !dataSourceReaderPath.equals(reloadDataSourceReaderPath)) { 286 dataSourceReaderPath = reloadDataSourceReaderPath; 287 dataSourceReader = new DriverDataSource(new File(dataSourceReaderPath)); 288 DATASOURCE_REMOTE_FILE.put(readerDsKey, dataSourceReader); 289 } 290 /** 291 May be referenced when expansion is needed 292 if (dataSourceWriter == null || !dataSourceWriterPath.equals(reloadDataSourceWriterPath)) { 293 DriverDataSource currentDs = DATASOURCE_REMOTE_FILE.get(writerDsKey); 294 if (currentDs != null) { 295 currentDs.closeAll(); 296 } 297 **/ 298 if (dataSourceWriter == null || !dataSourceWriterPath.equals(reloadDataSourceWriterPath)) { 299 dataSourceWriterPath = reloadDataSourceWriterPath; 300 dataSourceWriter = new DriverDataSource(new File(dataSourceWriterPath)); 301 DATASOURCE_REMOTE_FILE.put(writerDsKey, dataSourceWriter); 302 } 303 304 loadDataMapping(this); 305 306 FIRST_LOADED = true; 307 } catch (Exception e) { 308 log.error(e.getMessage(), e); 309 } 310 } 311 312 public String formatSql(String sql) { 313 return String.format(sql, tableName); 314 } 315 316 public String formatSqlForFilePartData(String sql) { 317 return String.format(sql, getDataMappingTableName()); 318 } 319 320 public DiskFile writeToDisk(DiskFile diskFile) throws IOException { 321 if (!exists()) { 322 log.mark(String.format("The remote file '%s' does not exist.", this.getPath())); 323 } 324 RemoteFile the = this; 325 326 Timestamp rfModifyTime = the.getModifyTime(); 327 if (rfModifyTime != null) { 328 diskFile.setModifyTimeMsFromClock(rfModifyTime.getTime()); 329 } 330 if (isLink()) { 331 if (isDebug(the)) { 332 LoggerFactory.getLogger(RemoteFile.class).mark("CreateDiskLink - {}", diskFile.getPath()); 333 } 334 diskFile.createLink(new String(readAllBytes(),BaseFile.CHARSET)); 335 } else if (isFile()) { 336 merge(new FilePart() { 337 final List<byte[]> partDataList = new ArrayList<byte[]>(); 338 boolean isWritingDf = false; 339 DiskFile realDf = null; 340 341 @Override 342 protected void process(int partIndex, byte[] data) throws IOException { 343 Thread.currentThread() 344 .setName(String.format("RemoteFile-writeToDisk-merge-%s", diskFile.getOrigin().getName())); 345 if (syncRoot != null) { 346 if (!diskFile.isLogicModify()) { 347 throw new IOException( 348 String.format("The file '%s' missing modify logic lock.", the.getPath())); 349 } 350 } 351 if (diskFile != null) 352 diskFile.logicModify(); 353 try { 354 String fileName = diskFile.getOrigin().getName(); 355 isWritingDf = fileName.matches(TMP_WRITING_SWP); 356 if (isWritingDf) { 357 if (partIndex == 0) { 358 if (diskFile.exists()) { 359 diskFile.delete(); 360 } 361 if (isDebug(the)) { 362 LoggerFactory.getLogger(RemoteFile.class).mark("CreateDiskFile - {}", 363 diskFile.getPath()); 364 } 365 String realFileName = fileName.replaceFirst(BaseFile.TMP_WRITING_FOR_REPLACE, ""); 366 String realDfPath = String.format("%s/%s", diskFile.getParent(), realFileName); 367 realDf = new DiskFile(realDfPath); 368 realDf.copyAttrs(realDf, diskFile.isCopyStructureOnly(), diskFile.syncRoot); 369 if(!diskFile.manualOpen(true)){ 370 throw new IOException( 371 String.format("The disk file '%s' cannot open.", diskFile.getPath())); 372 } 373 } 374 partDataList.add(decrypt(data)); 375 if (partDataList.size() >= BATCH_SIZE) { 376 377 if (realDf != null) 378 realDf.logicModify(); 379 380 ByteArrayOutputStream ops = new ByteArrayOutputStream(); 381 int size = partDataList.size(); 382 for (int i = 0; i < size; i++) { 383 byte[] partData = partDataList.get(i); 384 ops.write(partData); 385 } 386 diskFile.manualWrite(ops.toByteArray()); 387 partDataList.clear(); 388 } 389 } else { 390 if (partIndex == 0) { 391 if (diskFile.exists()) { 392 throw new IOException( 393 String.format("The remote file '%s' already exist.", diskFile.getPath())); 394 } 395 if (isDebug(the)) { 396 LoggerFactory.getLogger(RemoteFile.class).mark("CreateDiskFile - {}", 397 diskFile.getPath()); 398 } 399 if(!diskFile.manualOpen(true)){ 400 throw new IOException( 401 String.format("The disk file '%s' cannot open.", diskFile.getPath())); 402 } 403 } 404 partDataList.add(decrypt(data)); 405 if (partDataList.size() >= BATCH_SIZE) { 406 ByteArrayOutputStream ops = new ByteArrayOutputStream(); 407 int size = partDataList.size(); 408 for (int i = 0; i < size; i++) { 409 byte[] partData = partDataList.get(i); 410 ops.write(partData); 411 } 412 diskFile.manualWrite(ops.toByteArray()); 413 partDataList.clear(); 414 } 415 } 416 417 } catch (Exception e) { 418 log.warn(e.getMessage(), e); 419 if (diskFile != null) { 420 diskFile.manualClose(); 421 diskFile.delete(); 422 } 423 throw new IOException(e.getMessage(), e); 424 } 425 } 426 427 @Override 428 protected void completed(int lastPartIndex, Timestamp modifyTime, long fileSize) throws IOException { 429 if (partDataList.size() > 0) { 430 ByteArrayOutputStream ops = new ByteArrayOutputStream(); 431 int size = partDataList.size(); 432 for (int i = 0; i < size; i++) { 433 byte[] partData = partDataList.get(i); 434 ops.write(partData); 435 } 436 diskFile.manualWrite(ops.toByteArray()); 437 partDataList.clear(); 438 } 439 diskFile.manualClose(); 440 if (syncRoot != null && !diskFile.isLogicModify()) { 441 diskFile.delete(); 442 throw new IOException( 443 String.format("The file '%s' missing modify logic lock.", diskFile.getPath())); 444 } 445 try { 446 if (isWritingDf) { 447 if (diskFile.exists()) { 448 diskFile.moveTo(realDf.getPath()); 449 if (isDebug(the)) { 450 LoggerFactory.getLogger(RemoteFile.class).mark("CompletedDiskFile - {}", 451 realDf.getPath()); 452 } 453 } 454 if (realDf.exists()) 455 the.syncedToDisk(); 456 } 457 if (realDf == null) { 458 if (diskFile != null) { 459 if (diskFile.getOrigin().getName().startsWith(".")) { 460 Files.setAttribute(Paths.get(diskFile.getPath()), "dos:hidden", true); 461 } 462 } 463 } else { 464 if (realDf.getOrigin().getName().startsWith(".")) { 465 Files.setAttribute(Paths.get(realDf.getPath()), "dos:hidden", true); 466 } 467 } 468 469 RemoteFile.removeQueuePathMapping(the.getPath()); 470 471 if (diskFile != null) 472 diskFile.removeLogicModify(); 473 474 if (realDf != null) 475 realDf.removeLogicModify(); 476 477 } catch (Exception e) { 478 log.error(e.getMessage(), e); 479 } 480 } 481 482 @Override 483 protected void ended(int lastPartIndex, long fileSize) { 484 try { 485 if (diskFile != null) { 486 diskFile.manualClose(); 487 } 488 } catch (Exception e) { 489 log.error(e.getMessage(), e); 490 } 491 } 492 }); 493 } else { 494 List<Map<String, Object>> writeList = list(getPath(), "%"); 495 writeList.addAll(list(parsePath(getPath() + "/%"), "%")); 496 497 int size = writeList.size(); 498 if (size > 0) { 499 if (!diskFile.exists()) { 500 if (isDebug(the)) { 501 log.mark("CreateDiskDir - {} ", diskFile.getPath()); 502 } 503 diskFile.mkdirs(); 504 } 505 } 506 507 for (Map<String, Object> item : writeList) { 508 RemoteFile rf = new RemoteFile( 509 String.format("%s/%s", item.get("file_parent_path"), item.get("file_name"))); 510 511 if (rf.getPath().equals(getPath())) 512 continue; 513 514 String dfPath = rf.getPath().replaceFirst(getPath(), diskFile.getPath()); 515 DiskFile df = new DiskFile(dfPath); 516 df.copyAttrs(df, false, this.syncRoot); 517 Timestamp mt = rf.getModifyTime(); 518 if (mt != null) { 519 df.setModifyTimeMsFromClock(mt.getTime()); 520 } 521 if (rf.isDir()) { 522 if (!df.exists()) { 523 if (isDebug(the)) { 524 log.mark("CreateDiskDir - {}", diskFile.getPath()); 525 } 526 df.mkdirs(); 527 } 528 } else { 529 rf.writeToDisk(df); 530 } 531 } 532 } 533 return diskFile; 534 } 535 536 public DiskFile writeToDisk() throws IOException { 537 DiskFile df = new DiskFile(this.origin.getAbsolutePath()); 538 df.copyAttrs(df, false, this.syncRoot); 539 return writeToDisk(df); 540 } 541 542 public DiskFile writeToDisk(String path) throws IOException { 543 DiskFile df = new DiskFile(path); 544 df.copyAttrs(df, false, this.syncRoot); 545 return writeToDisk(df); 546 } 547 548 public boolean isCompleted() throws IOException { 549 return getCompletedRecord() != null; 550 } 551 552 public Map<String, Object> getCompletedRecord() throws IOException { 553 String sql = formatSql(SQL_SELECT_FOR_COMPLETED); 554 Map dataRecord = new HashMap<String, Object>(); 555 dataRecord.put("file_name", origin.getName()); 556 dataRecord.put("file_parent_path", getParent()); 557 String cacheKey = String.format("first-%s-%s",sql,dataRecord); 558 Map<String,Object> record = (Map<String,Object>)Cache.get(cacheKey); 559 if(record == null){ 560 CacheDriverExecutor executor = null; 561 Connection conn = null; 562 try { 563 conn = dataSourceReader.getConnection(); 564 conn.setAutoCommit(true); 565 executor = new CacheDriverExecutor(conn); 566 record = executor.first(sql, dataRecord); 567 if(record != null){ 568 Cache.set(cacheKey,record,CACHE_SECONDS); 569 } 570 } catch (SQLException e) { 571 throw new IOException(e.getMessage(), e); 572 } finally { 573 try { 574 if (executor != null) 575 executor.close(); 576 } catch (Exception e) { 577 log.error(e.getMessage(), e); 578 } 579 } 580 } 581 return record; 582 } 583 584 public Map<String, Object> getRecordById(Object id) throws IOException { 585 String sql = formatSql(SQL_SELECT_GET_RECORD_BY_ID); 586 Map dataRecord = new HashMap<String, Object>(); 587 dataRecord.put("id", id); 588 Map<String, Object> record = null; 589 if (record == null) { 590 CacheDriverExecutor executor = null; 591 Connection conn = null; 592 try { 593 conn = dataSourceReader.getConnection(); 594 conn.setAutoCommit(true); 595 executor = new CacheDriverExecutor(conn); 596 record = executor.first(sql, dataRecord); 597 } catch (SQLException e) { 598 throw new IOException(e.getMessage(), e); 599 } finally { 600 try { 601 if (executor != null) 602 executor.close(); 603 } catch (Exception e) { 604 log.error(e.getMessage(), e); 605 } 606 } 607 } 608 return record; 609 } 610 611 public Map<String, Object> getRecordByFileId(Object fileId) throws IOException { 612 String sql = formatSql(SQL_SELECT_GET_RECORD_BY_FILE_ID); 613 Map dataRecord = new HashMap<String, Object>(); 614 dataRecord.put("file_id", fileId); 615 Map<String, Object> record = null; 616 if (record == null) { 617 CacheDriverExecutor executor = null; 618 Connection conn = null; 619 try { 620 conn = dataSourceReader.getConnection(); 621 conn.setAutoCommit(true); 622 executor = new CacheDriverExecutor(conn); 623 record = executor.first(sql, dataRecord); 624 } catch (SQLException e) { 625 throw new IOException(e.getMessage(), e); 626 } finally { 627 try { 628 if (executor != null) 629 executor.close(); 630 } catch (Exception e) { 631 log.error(e.getMessage(), e); 632 } 633 } 634 } 635 return record; 636 } 637 638 public Map<String, Object> getLastUpdateRecord() throws IOException { 639 String sql = formatSql(SQL_SELECT_FOR_LAST_UPDATE); 640 Map dataRecord = new HashMap<String, Object>(); 641 dataRecord.put("file_name", origin.getName()); 642 dataRecord.put("file_parent_path", getParent()); 643 Map<String, Object> record = null; 644 CacheDriverExecutor executor = null; 645 Connection conn = null; 646 try { 647 conn = dataSourceReader.getConnection(); 648 conn.setAutoCommit(true); 649 executor = new CacheDriverExecutor(conn); 650 record = executor.first(sql, dataRecord); 651 return record; 652 } catch (SQLException e) { 653 throw new IOException(e.getMessage(), e); 654 } finally { 655 try { 656 if (executor != null) 657 executor.close(); 658 } catch (Exception e) { 659 log.error(e.getMessage(), e); 660 } 661 } 662 } 663 664 protected Timestamp getFirstCreatedAt() throws IOException { 665 String sql = formatSql(SQL_SELECT_FOR_FIRST_CREATED_AT); 666 Map dataRecord = new HashMap<String, Object>(); 667 dataRecord.put("file_parent_path", getPath()); 668 dataRecord.put("like_file_parent_path", parsePath(getPath() + "/%")); 669 dataRecord.put("deleted_on_hostnames", "%;" + CommonTools.getHostname() + ";%"); 670 CacheDriverExecutor executor = null; 671 Connection conn = null; 672 try { 673 conn = dataSourceReader.getConnection(); 674 conn.setAutoCommit(true); 675 executor = new CacheDriverExecutor(conn); 676 Map<String, Object> record = executor.first(sql, dataRecord); 677 return record == null ? null : (Timestamp)record.get("created_at"); 678 } catch (SQLException e) { 679 throw new IOException(e.getMessage(), e); 680 } finally { 681 try { 682 if (executor != null) 683 executor.close(); 684 } catch (Exception e) { 685 log.error(e.getMessage(), e); 686 } 687 } 688 } 689 690 protected Integer getMaxFilePartDataTable() throws IOException { 691 String sql = formatSql(SQL_SELECT_FOR_MAX_DATA_TABLE); 692 Map dataRecord = new HashMap<String, Object>(); 693 dataRecord.put("file_parent_path", getPath()); 694 dataRecord.put("like_file_parent_path", parsePath(getPath() + "/%")); 695 CacheDriverExecutor executor = null; 696 Connection conn = null; 697 try { 698 conn = dataSourceReader.getConnection(); 699 conn.setAutoCommit(true); 700 executor = new CacheDriverExecutor(conn); 701 Map<String, Object> record = executor.first(sql, dataRecord); 702 return record == null ? 0 : Integer.parseInt(record.get("file_part_data_table")+""); 703 } catch (SQLException e) { 704 throw new IOException(e.getMessage(), e); 705 } finally { 706 try { 707 if (executor != null) 708 executor.close(); 709 } catch (Exception e) { 710 log.error(e.getMessage(), e); 711 } 712 } 713 } 714 715 private Map<String, Object> getSelectByPathForCheckExists() throws IOException { 716 String sql = formatSql(SQL_SELECT_BY_PATH_FOR_CHECK_EXISTS); 717 Map dataRecord = new HashMap<String, Object>(); 718 dataRecord.put("file_name", origin.getName()); 719 dataRecord.put("file_parent_path", getParent()); 720 Map<String, Object> record = null; 721 if (record == null) { 722 CacheDriverExecutor executor = null; 723 Connection conn = null; 724 try { 725 conn = dataSourceReader.getConnection(); 726 conn.setAutoCommit(true); 727 executor = new CacheDriverExecutor(conn); 728 record = executor.first(sql, dataRecord); 729 return record; 730 } catch (SQLException e) { 731 throw new IOException(e.getMessage(), e); 732 } finally { 733 try { 734 if (executor != null) 735 executor.close(); 736 } catch (Exception e) { 737 log.error(e.getMessage(), e); 738 } 739 } 740 } 741 return record; 742 } 743 744 private int countAllSubFiles(String fileParentPath, String fileName) throws IOException { 745 String sql = formatSql(SQL_SELECT_FOR_COUNT_ALL_SUB_FILES); 746 Map dataRecord = new HashMap<String, Object>(); 747 dataRecord.put("file_name", fileName); 748 dataRecord.put("file_parent_path", fileParentPath); 749 Map<String, Object> record = null; 750 if (record == null) { 751 CacheDriverExecutor executor = null; 752 Connection conn = null; 753 try { 754 conn = dataSourceReader.getConnection(); 755 conn.setAutoCommit(true); 756 executor = new CacheDriverExecutor(conn); 757 record = executor.first(sql, dataRecord); 758 return Integer.parseInt(record.get("as_count") + ""); 759 } catch (SQLException e) { 760 throw new IOException(e.getMessage(), e); 761 } finally { 762 try { 763 if (executor != null) 764 executor.close(); 765 } catch (Exception e) { 766 log.error(e.getMessage(), e); 767 } 768 } 769 } 770 return -1; 771 } 772 773 private Map<String, Object> getFirstFilePart() throws IOException { 774 String sql = formatSql(SQL_SELECT_FOR_FIRST_FILE); 775 Map dataRecord = new HashMap<String, Object>(); 776 dataRecord.put("file_name", origin.getName()); 777 dataRecord.put("file_parent_path", getParent()); 778 String cacheKey = String.format("first-%s-%s",sql,dataRecord); 779 Map<String, Object> record = (Map<String,Object>)Cache.get(cacheKey); 780 if(record == null){ 781 CacheDriverExecutor executor = null; 782 Connection conn = null; 783 try { 784 conn = dataSourceReader.getConnection(); 785 conn.setAutoCommit(true); 786 executor = new CacheDriverExecutor(conn); 787 record = executor.first(sql, dataRecord); 788 if(record != null){ 789 Cache.set(cacheKey,record,CACHE_SECONDS); 790 } 791 } catch (SQLException e) { 792 throw new IOException(e.getMessage(), e); 793 } finally { 794 try { 795 if (executor != null) 796 executor.close(); 797 } catch (Exception e) { 798 log.error(e.getMessage(), e); 799 } 800 } 801 } 802 return record; 803 } 804 805 private Map<String, Object> getFirstFilePartRefresh() throws IOException { 806 String sql = formatSql(SQL_SELECT_FOR_FIRST_FILE); 807 Map dataRecord = new HashMap<String, Object>(); 808 dataRecord.put("file_name", origin.getName()); 809 dataRecord.put("file_parent_path", getParent()); 810 String cacheKey = String.format("first-%s-%s",sql,dataRecord); 811 Cache.remove(cacheKey); 812 CacheDriverExecutor executor = null; 813 Connection conn = null; 814 try { 815 conn = dataSourceReader.getConnection(); 816 conn.setAutoCommit(true); 817 executor = new CacheDriverExecutor(conn); 818 Map<String,Object> record = executor.first(sql, dataRecord); 819 if(record != null){ 820 Cache.set(cacheKey,record,CACHE_SECONDS); 821 } 822 return record; 823 } catch (SQLException e) { 824 throw new IOException(e.getMessage(), e); 825 } finally { 826 try { 827 if (executor != null) 828 executor.close(); 829 } catch (Exception e) { 830 log.error(e.getMessage(), e); 831 } 832 } 833 } 834 835 private Map<String, Object> getLastIndexRecord() throws IOException { 836 String sql = formatSql(SQL_SELECT_FOR_LAST_FILE_PART); 837 Map dataRecord = new HashMap<String, Object>(); 838 dataRecord.put("file_name", origin.getName()); 839 dataRecord.put("file_parent_path", getParent()); 840 dataRecord.put("file_id", fileId); 841 Map<String, Object> record = null; 842 if (record == null) { 843 CacheDriverExecutor executor = null; 844 Connection conn = null; 845 try { 846 conn = dataSourceReader.getConnection(); 847 conn.setAutoCommit(true); 848 executor = new CacheDriverExecutor(conn); 849 record = executor.first(sql, dataRecord); 850 } catch (SQLException e) { 851 throw new IOException(e.getMessage(), e); 852 } finally { 853 try { 854 if (executor != null) 855 executor.close(); 856 } catch (Exception e) { 857 log.error(e.getMessage(), e); 858 } 859 } 860 } 861 return record; 862 } 863 864 public boolean isTimeout(long timeoutMs) throws IOException { 865 CacheDriverExecutor executor = null; 866 Connection conn = null; 867 try { 868 Clock clock = new Clock(); 869 conn = dataSourceReader.getConnection(); 870 conn.setAutoCommit(true); 871 executor = new CacheDriverExecutor(conn); 872 Map dataRecord = new HashMap<String, Object>(); 873 dataRecord.put("file_name", origin.getName()); 874 dataRecord.put("file_parent_path", getParent()); 875 Map<String, Object> completedRecord = getCompletedRecord(); 876 877 if (completedRecord != null) 878 return false; 879 880 Map<String, Object> record = executor.first(formatSql(SQL_SELECT_FOR_TIMEOUT), dataRecord); 881 882 if (record == null) 883 return false; 884 885 boolean completed = executor.first(formatSql(SQL_SELECT_CHECK_COMPLETED), record) != null; 886 if (!completed) { 887 boolean timeout = (clock.getTime() - ((Timestamp) record.get("updated_at")).getTime()) > timeoutMs; 888 889 return timeout; 890 } 891 return false; 892 } catch (SQLException e) { 893 throw new IOException(e.getMessage(), e); 894 } finally { 895 try { 896 if (executor != null) 897 executor.close(); 898 } catch (Exception e) { 899 log.error(e.getMessage(), e); 900 } 901 } 902 } 903 904 private Map<String, Object> readPart(Map<String, Object> rfRecord) throws IOException { 905 CacheDriverExecutor executor = null; 906 Connection conn = null; 907 try { 908 calculateDataMappingDataSourceReader(this, rfRecord); 909 if(this.dataMappingTable <= -1){ 910 return null; 911 } 912 Clock clock = new Clock(); 913 conn = dataMappingDataSourceReader.getConnection(); 914 conn.setAutoCommit(true); 915 executor = new CacheDriverExecutor(conn); 916 Map<String, Object> dataRecord = new HashMap<String, Object>(); 917 dataRecord.put("remote_file_id", rfRecord.get("id")); 918 return executor.first(formatSqlForFilePartData(SQL_SELECT_BY_RF_ID_FROM_FILE_DATA), dataRecord); 919 } catch (SQLException e) { 920 throw new IOException(e.getMessage(), e); 921 } finally { 922 try { 923 if (executor != null) 924 executor.close(); 925 } catch (Exception e) { 926 log.error(e.getMessage(), e); 927 } 928 } 929 } 930 931 public void merge(FilePart filePart) throws IOException { 932 if (isLink()) { 933 throw new IOException(String.format("The file '%s' is a link.", origin.getAbsolutePath())); 934 } 935 if (isDir()) { 936 throw new IOException(String.format("The file '%s' is a folder.", origin.getAbsolutePath())); 937 } 938 CacheDriverExecutor executor = null; 939 Connection conn = null; 940 try { 941 Clock clock = new Clock(); 942 conn = dataSourceReader.getConnection(); 943 conn.setAutoCommit(true); 944 executor = new CacheDriverExecutor(conn); 945 Map dataRecord = getCompletedRecord(); 946 if (dataRecord != null) { 947 CacheArray rows = new CacheArray(); 948 CacheArrayFilter filter = new CacheArrayFilter(CACHE_ARRAY_FILTER_TIMER) { 949 long fileSize = 0L; 950 Map<String, Object> fp = null; 951 Map<String, Object> item = null; 952 953 @Override 954 public void execute(Integer index, Object o) { 955 956 try { 957 item = (Map<String, Object>) o; 958 fp = readPart(item); 959 Object fpData = null; 960 961 if(fp == null){ 962 fpData = new byte[]{}; 963 }else{ 964 fpData = fp.get("file_part_data"); 965 } 966 Thread.currentThread() 967 .setName(String.format("RemoteFile-merge-%s", item.get("file_name"))); 968 byte[] partData = null; 969 if (fpData instanceof byte[]) { 970 partData = (byte[]) fpData; 971 } 972 fileSize += partData.length; 973 filePart.process(index, partData); 974 } catch (Exception e) { 975 log.error(e.getMessage(), e); 976 this.terminated = true; 977 } 978 } 979 980 @Override 981 public void completed(Integer size) { 982 try { 983 if (item != null) { 984 Thread.currentThread() 985 .setName(String.format("RemoteFile-merge-completed-%s", item.get("file_name"))); 986 } 987 filePart.completed(size, new Clock().getSqlTimestamp(), fileSize); 988 } catch (IOException e) { 989 log.error(e.getMessage(), e); 990 } 991 } 992 993 @Override 994 public void terminated() { 995 if (item != null) { 996 Thread.currentThread() 997 .setName(String.format("RemoteFile-merge-trminated-%s", item.get("file_name"))); 998 log.error("Merge terminated: {}/{}", item.get("file_parent_path"), item.get("file_name")); 999 } 1000 } 1001 }; 1002 rows.filter(filter); 1003 executor.find(formatSql(SQL_SELECT_FILE_HAVE_DATA), dataRecord, rows); 1004 } 1005 } catch (SQLException e) { 1006 throw new IOException(e.getMessage(), e); 1007 } finally { 1008 if (executor != null) { 1009 try { 1010 executor.close(); 1011 } catch (Exception e) { 1012 throw new IOException(e.getMessage(), e); 1013 } 1014 } 1015 } 1016 } 1017 1018 protected void copyFrom(boolean copyStructureOnly, DiskFile diskFile) throws IOException { 1019 var the = this; 1020 if (diskFile.isLink()) { 1021 the.setModifyTime(diskFile.getModifyTimeForClock()); 1022 the.createLink(diskFile.readLink()); 1023 } 1024 if (diskFile.isDir()) { 1025 the.setModifyTime(diskFile.getModifyTimeForClock()); 1026 the.mkdirs(); 1027 } 1028 if (diskFile.isFile()) { 1029 the.setModifyTime(diskFile.getModifyTimeForClock()); 1030 if (copyStructureOnly) { 1031 the.createStructureFile(diskFile.size()); 1032 diskFile.removeLogicAccess(); 1033 DiskFile.removeQueuePathMapping(diskFile.getPath()); 1034 } else { 1035 diskFile.split(filePartSize, new FilePart() { 1036 final List<byte[]> partDataList = new ArrayList<byte[]>(); 1037 int currentFilePartStartIndex = 0; 1038 1039 @Override 1040 protected void process(int partIndex, byte[] data) throws IOException { 1041 if (partIndex == 0) { 1042 if (diskFile.syncRoot != null) { 1043 if (diskFile.isLogicModify()) { 1044 throw new IOException( 1045 String.format("The file '%s' is already logic locked by another thread.", 1046 diskFile.getPath())); 1047 } 1048 if (!diskFile.isLogicAccess()) { 1049 throw new IOException(String.format("The file '%s' missing access logic lock.", 1050 diskFile.getPath())); 1051 } 1052 } 1053 if (the.exists(true)) { 1054 throw new IOException( 1055 String.format("The remote file '%s' already exist.", the.getPath())); 1056 } 1057 if (isDebug(the)) { 1058 LoggerFactory.getLogger(RemoteFile.class).mark("CreateRemoteFile - {}", the.getPath()); 1059 } 1060 } 1061 1062 partDataList.add(data); 1063 if (partDataList.size() >= BATCH_SIZE) { 1064 diskFile.logicAccess(); 1065 if(!the.writeToBatch(currentFilePartStartIndex, partDataList)){ 1066 throw new IOException(String.format("The remote file '%s' write failed.",the.getPath())); 1067 } 1068 partDataList.clear(); 1069 currentFilePartStartIndex += BATCH_SIZE; 1070 } 1071 } 1072 1073 @Override 1074 protected void completed(int lastPartIndex, Timestamp modifyTime, long fileSize) 1075 throws IOException { 1076 try { 1077 if (diskFile.syncRoot != null) { 1078 if (diskFile.isLogicModify()) { 1079 throw new IOException( 1080 String.format("The file '%s' is already logic locked by another thread.", 1081 diskFile.getPath())); 1082 } 1083 if (!diskFile.isLogicAccess()) { 1084 throw new IOException(String.format("The file '%s' missing access logic lock.", 1085 diskFile.getPath())); 1086 } 1087 } 1088 /**This is 0 bytes file**/ 1089 if (lastPartIndex == -1) { 1090 1091 if (the.exists(true)) { 1092 the.forceDeleteFile(false); 1093 } 1094 1095 the.setModifyTime(modifyTime); 1096 the.createEmptyFile(); 1097 } else { 1098 the.setModifyTime(modifyTime); 1099 if (partDataList.size() > 0) { 1100 if(!the.writeToBatch(currentFilePartStartIndex, partDataList)){ 1101 throw new IOException(String.format("The remote file '%s' write failed.",the.getPath())); 1102 } 1103 partDataList.clear(); 1104 } 1105 the.complete(lastPartIndex, fileSize); 1106 } 1107 if (isDebug(the)) { 1108 LoggerFactory.getLogger(RemoteFile.class).mark("CompletedRemoteFile - {}", 1109 the.getPath()); 1110 } 1111 DiskFile.removeQueuePathMapping(diskFile.getPath()); 1112 } catch (IOException e) { 1113 throw e; 1114 } 1115 } 1116 1117 @Override 1118 protected void ended(int lastPartIndex, long fileSize) { 1119 1120 } 1121 1122 }); 1123 } 1124 } 1125 } 1126 1127 private boolean createEmptyFile() throws IOException { 1128 boolean allowed = beforeWrite(); 1129 1130 if(!allowed) return false; 1131 1132 Clock clock = new Clock(); 1133 Timestamp currentTime = clock.getSqlTimestamp(); 1134 CacheDriverExecutor executor = null; 1135 Connection conn = null; 1136 try { 1137 calculateDataMappingDataSourceWriter(this); 1138 conn = dataSourceWriter.getConnection(); 1139 conn.setAutoCommit(true); 1140 executor = new CacheDriverExecutor(conn); 1141 Map dataRecord = new HashMap<String, Object>(); 1142 dataRecord.put("id", 1143 generateId(identityCardEnable, identityCardName, identityCardPrefix, identityCardSuffix)); 1144 dataRecord.put("file_name", origin.getName()); 1145 dataRecord.put("file_parent_path", getParent()); 1146 dataRecord.put("file_type", "F"); 1147 dataRecord.put("file_part_index", 0); 1148 dataRecord.put("file_part_size", 0); 1149 dataRecord.put("file_part_data_ds", "NONE"); 1150 dataRecord.put("file_part_data_table", this.dataMappingTable); 1151 dataRecord.put("file_part_last_index", 0); 1152 dataRecord.put("file_size", 0); 1153 dataRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime); 1154 dataRecord.put("file_deleted", 0); 1155 dataRecord.put("created_user_id", getDefaultModifyUserId()); 1156 dataRecord.put("updated_user_id", getDefaultModifyUserId()); 1157 dataRecord.put("created_at", currentTime); 1158 dataRecord.put("updated_at", currentTime); 1159 dataRecord.put("file_id", fileId); 1160 dataRecord.put("source_hostname", CommonTools.getHostname()); 1161 dataRecord.put("synced_on_hostnames", String.format(";%s;", CommonTools.getHostname())); 1162 dataRecord.put("deleted_on_hostnames", ";"); 1163 executor.execute(formatSql(SQL_INSERT_INTO_FILE), dataRecord); 1164 1165 afterWrite(); 1166 1167 return true; 1168 } catch (Exception e) { 1169 throw new IOException(e.getMessage(), e); 1170 } finally { 1171 try { 1172 if (executor != null) 1173 executor.close(); 1174 } catch (Exception e) { 1175 log.error(e.getMessage(), e); 1176 } 1177 } 1178 } 1179 1180 private boolean createStructureFile(long fileSize) throws IOException { 1181 boolean allowed = beforeWrite(); 1182 1183 if(!allowed) return false; 1184 1185 log.debug("CreateStructureFile: {}", getPath()); 1186 Clock clock = new Clock(); 1187 Timestamp currentTime = clock.getSqlTimestamp(); 1188 CacheDriverExecutor executor = null; 1189 Connection conn = null; 1190 try { 1191 calculateDataMappingDataSourceWriter(this); 1192 conn = dataSourceWriter.getConnection(); 1193 conn.setAutoCommit(true); 1194 executor = new CacheDriverExecutor(conn); 1195 Map dataRecord = new HashMap<String, Object>(); 1196 dataRecord.put("id", 1197 generateId(identityCardEnable, identityCardName, identityCardPrefix, identityCardSuffix)); 1198 dataRecord.put("file_name", origin.getName()); 1199 dataRecord.put("file_parent_path", getParent()); 1200 dataRecord.put("file_type", "F"); 1201 dataRecord.put("file_part_index", 0); 1202 dataRecord.put("file_part_size", fileSize); 1203 dataRecord.put("file_part_data_ds", "NONE"); 1204 dataRecord.put("file_part_data_table", this.dataMappingTable); 1205 dataRecord.put("file_part_last_index", 0); 1206 dataRecord.put("file_size", fileSize); 1207 dataRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime); 1208 dataRecord.put("file_deleted", 0); 1209 dataRecord.put("created_user_id", getDefaultModifyUserId()); 1210 dataRecord.put("updated_user_id", getDefaultModifyUserId()); 1211 dataRecord.put("created_at", currentTime); 1212 dataRecord.put("updated_at", currentTime); 1213 dataRecord.put("file_id", fileId); 1214 dataRecord.put("source_hostname", CommonTools.getHostname()); 1215 dataRecord.put("synced_on_hostnames", String.format(";%s;", CommonTools.getHostname())); 1216 dataRecord.put("deleted_on_hostnames", ";"); 1217 executor.execute(formatSql(SQL_INSERT_INTO_FILE), dataRecord); 1218 1219 afterWrite(); 1220 1221 return true; 1222 } catch (Exception e) { 1223 throw new IOException(e.getMessage(), e); 1224 } finally { 1225 try { 1226 if (executor != null) 1227 executor.close(); 1228 } catch (Exception e) { 1229 log.error(e.getMessage(), e); 1230 } 1231 } 1232 } 1233 1234 protected boolean syncedToDisk() throws IOException { 1235 Clock clock = new Clock(); 1236 CacheDriverExecutor executor = null; 1237 Connection conn = null; 1238 try { 1239 conn = dataSourceWriter.getConnection(); 1240 conn.setAutoCommit(true); 1241 executor = new CacheDriverExecutor(conn); 1242 Map<String, Object> record = getCompletedRecord(); 1243 if (record == null) { 1244 log.debug(String.format("Remmote file not found '%s'.", origin.getAbsolutePath())); 1245 } 1246 if (record != null) { 1247 String hostname = CommonTools.getHostname(); 1248 String syncedOnHostnames = (String) record.get("synced_on_hostnames"); 1249 syncedOnHostnames = syncedOnHostnames.replaceAll(String.format(";%s;",hostname),";"); 1250 String shn = String.format("%s%s;", syncedOnHostnames, hostname); 1251 Map<String, Object> dataRecord = new HashMap<String, Object>(); 1252 dataRecord.put("file_parent_path",record.get("file_parent_path")); 1253 dataRecord.put("file_name", record.get("file_name")); 1254 dataRecord.put("synced_on_hostnames",shn); 1255 dataRecord.put("updated_at", clock.getSqlTimestamp()); 1256 dataRecord.put("updated_user_id", getDefaultModifyUserId()); 1257 executor.execute(formatSql(SQL_UPDATE_FOR_SYNCED_TO_DISK), dataRecord); 1258 return true; 1259 } 1260 return false; 1261 } catch (SQLException e) { 1262 throw new IOException(e.getMessage(), e); 1263 } finally { 1264 try { 1265 if (executor != null) 1266 executor.close(); 1267 } catch (Exception e) { 1268 log.error(e.getMessage(), e); 1269 } 1270 } 1271 } 1272 1273 protected boolean deletedToDiskAndSub(Object id) throws IOException { 1274 Clock clock = new Clock(); 1275 CacheDriverExecutor executor = null; 1276 Connection conn = null; 1277 try { 1278 conn = dataSourceWriter.getConnection(); 1279 conn.setAutoCommit(true); 1280 executor = new CacheDriverExecutor(conn); 1281 Map<String, Object> record = getRecordById(id); 1282 if (record == null) { 1283 log.debug(String.format("Remmote file not found '%s'.", origin.getAbsolutePath())); 1284 } 1285 if (record != null) { 1286 String hostname = CommonTools.getHostname(); 1287 String deletedOnHostnames = (String) record.get("deleted_on_hostnames"); 1288 deletedOnHostnames = deletedOnHostnames.replaceAll(String.format(";%s;",hostname),";"); 1289 String dhn = String.format("%s%s;", deletedOnHostnames, hostname); 1290 Timestamp now = clock.getSqlTimestamp(); 1291 final Map<String, Object> dataRecord = new HashMap<String, Object>(); 1292 dataRecord.put("file_parent_path", record.get("file_parent_path")); 1293 dataRecord.put("file_name", record.get("file_name")); 1294 dataRecord.put("deleted_on_hostnames", dhn); 1295 dataRecord.put("updated_at", now); 1296 dataRecord.put("updated_user_id", getDefaultModifyUserId()); 1297 executor.execute(formatSql(SQL_UPDATE_FOR_DELETED_TO_DISK), dataRecord); 1298 if (record.get("file_type").equals("D")) { 1299 String dirPath = String.format("%s/%s", record.get("file_parent_path"), 1300 record.get("file_name")); 1301 dataRecord.put("file_parent_path", dirPath); 1302 dataRecord.put("like_file_parent_path", dirPath + "/%"); 1303 executor.execute(formatSql(SQL_UPDATE_FOR_DELETED_TO_DISK_SUB_FILES), dataRecord); 1304 } 1305 return true; 1306 } 1307 return false; 1308 } catch (SQLException e) { 1309 throw new IOException(e.getMessage(), e); 1310 } finally { 1311 try { 1312 if (executor != null) 1313 executor.close(); 1314 } catch (Exception e) { 1315 log.error(e.getMessage(), e); 1316 } 1317 } 1318 } 1319 1320 private boolean complete(int filePartLastIndex, long fileSize) throws IOException { 1321 Clock clock = new Clock(); 1322 CacheDriverExecutor executor = null; 1323 Timestamp currentTime = clock.getSqlTimestamp(); 1324 Connection conn = null; 1325 try { 1326 conn = dataSourceWriter.getConnection(); 1327 conn.setAutoCommit(true); 1328 executor = new CacheDriverExecutor(conn); 1329 Map dataRecord = new HashMap<String, Object>(); 1330 dataRecord.put("file_id", fileId); 1331 dataRecord.put("file_part_last_index", filePartLastIndex); 1332 dataRecord.put("file_size", fileSize); 1333 dataRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime); 1334 dataRecord.put("updated_at", currentTime); 1335 dataRecord.put("updated_user_id", getDefaultModifyUserId()); 1336 int rows = executor.execute(formatSql(SQL_UPDATE_FOR_COMPLETED), dataRecord); 1337 1338 afterWrite(); 1339 1340 return rows > 0; 1341 } catch (SQLException e) { 1342 throw new IOException(e.getMessage(), e); 1343 } finally { 1344 try { 1345 if (executor != null) 1346 executor.close(); 1347 } catch (Exception e) { 1348 log.error(e.getMessage(), e); 1349 } 1350 } 1351 } 1352 1353 @Override 1354 public boolean complete() throws IOException { 1355 Clock clock = new Clock(); 1356 Timestamp currentTime = clock.getSqlTimestamp(); 1357 CacheDriverExecutor executor = null; 1358 Connection conn = null; 1359 try { 1360 conn = dataSourceWriter.getConnection(); 1361 conn.setAutoCommit(true); 1362 executor = new CacheDriverExecutor(conn); 1363 Map dataRecord = new HashMap<String, Object>(); 1364 dataRecord.put("file_name", origin.getName()); 1365 dataRecord.put("file_parent_path", getParent()); 1366 dataRecord.put("file_id", fileId); 1367 1368 Map<String, Object> sumRecord = executor.first(formatSql(SQL_SELECT_FOR_SUM_FILE_PART_SIZE_INDEX), 1369 dataRecord); 1370 1371 if (sumRecord == null) { 1372 log.debug(String.format("Remmote file not found '%s'.", origin.getAbsolutePath())); 1373 } 1374 1375 if (sumRecord != null) { 1376 long fileSize = Long.parseLong(sumRecord.get("file_part_size") + ""); 1377 long filePartLastIndex = Integer.parseInt(sumRecord.get("file_part_index") + ""); 1378 1379 dataRecord.put("file_part_last_index", filePartLastIndex); 1380 dataRecord.put("file_size", fileSize); 1381 dataRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime); 1382 dataRecord.put("updated_at", currentTime); 1383 dataRecord.put("updated_user_id", getDefaultModifyUserId()); 1384 int rows = executor.execute(formatSql(SQL_UPDATE_FOR_COMPLETED), dataRecord); 1385 1386 afterWrite(); 1387 1388 return rows > 0; 1389 } 1390 return false; 1391 } catch (SQLException e) { 1392 throw new IOException(e.getMessage(), e); 1393 } finally { 1394 try { 1395 if (executor != null) 1396 executor.close(); 1397 } catch (Exception e) { 1398 log.error(e.getMessage(), e); 1399 } 1400 } 1401 } 1402 1403 public boolean forceDelete() throws IOException { 1404 return forceDeleteAll(true); 1405 } 1406 1407 public boolean forceDeleteDir(boolean realDeleted) throws IOException { 1408 return forceDelete(realDeleted, "D"); 1409 } 1410 1411 public boolean forceDeleteLink(boolean realDeleted) throws IOException { 1412 return forceDelete(realDeleted, "L"); 1413 } 1414 1415 public boolean forceDeleteFile(boolean realDeleted) throws IOException { 1416 return forceDelete(realDeleted, "F"); 1417 } 1418 1419 public boolean forceDelete(boolean realDeleted, String fileType) throws IOException { 1420 if (fileType.equalsIgnoreCase("D")) { 1421 return forceDeleteAll(realDeleted); 1422 } else { 1423 boolean allowed = beforeDelete(realDeleted); 1424 if (allowed) { 1425 CacheDriverExecutor executor = null; 1426 Connection conn = null; 1427 try { 1428 Clock clock = new Clock(); 1429 Timestamp currentTime = clock.getSqlTimestamp(); 1430 conn = dataSourceWriter.getConnection(); 1431 conn.setAutoCommit(true); 1432 executor = new CacheDriverExecutor(conn); 1433 Map dataRecord = new HashMap<String, Object>(); 1434 dataRecord.put("file_deleted", realDeleted ? 1 : 0); 1435 dataRecord.put("deleted", realDeleted ? 0 : 1); 1436 dataRecord.put("file_name", origin.getName()); 1437 dataRecord.put("file_type", fileType.toUpperCase()); 1438 dataRecord.put("file_parent_path", getParent()); 1439 dataRecord.put("updated_user_id", getDefaultModifyUserId()); 1440 dataRecord.put("deleted_user_id", getDefaultModifyUserId()); 1441 dataRecord.put("updated_at", currentTime); 1442 dataRecord.put("deleted_at", currentTime); 1443 dataRecord.put("root_folder", getPath()); 1444 dataRecord.put("like_subfolder", parsePath(getPath() + "/%")); 1445 int resultRows = executor.execute(formatSql(SQL_UPDATE_FOR_DELETE_FILE_LINK), dataRecord); 1446 log.debug("ForceDelete({}): {} -> {}", realDeleted, getPath(), resultRows); 1447 1448 afterDelete(realDeleted); 1449 1450 return resultRows > 0; 1451 } catch (SQLException e) { 1452 throw new IOException(e.getMessage(), e); 1453 } finally { 1454 try { 1455 if (executor != null) 1456 executor.close(); 1457 } catch (Exception e) { 1458 log.error(e.getMessage(), e); 1459 } 1460 } 1461 } 1462 } 1463 return false; 1464 } 1465 1466 public boolean forceDeleteAll(boolean realDeleted) throws IOException { 1467 boolean allowed = beforeDelete(realDeleted); 1468 if (allowed) { 1469 CacheDriverExecutor executor = null; 1470 Connection conn = null; 1471 try { 1472 Clock clock = new Clock(); 1473 Timestamp currentTime = clock.getSqlTimestamp(); 1474 conn = dataSourceWriter.getConnection(); 1475 conn.setAutoCommit(true); 1476 executor = new CacheDriverExecutor(conn); 1477 Map dataRecord = new HashMap<String, Object>(); 1478 dataRecord.put("file_deleted", realDeleted ? 1 : 0); 1479 dataRecord.put("deleted", realDeleted ? 0 : 1); 1480 dataRecord.put("file_name", origin.getName()); 1481 dataRecord.put("file_parent_path", getParent()); 1482 dataRecord.put("updated_user_id", getDefaultModifyUserId()); 1483 dataRecord.put("deleted_user_id", getDefaultModifyUserId()); 1484 dataRecord.put("updated_at", currentTime); 1485 dataRecord.put("deleted_at", currentTime); 1486 dataRecord.put("root_folder", getPath()); 1487 dataRecord.put("like_subfolder", parsePath(getPath() + "/%")); 1488 int resultRows = executor.execute(formatSql(SQL_UPDATE_FOR_DELETE_ALL), dataRecord); 1489 log.debug("ForceDelete({}): {} -> {}", realDeleted, getPath(), resultRows); 1490 1491 afterDelete(realDeleted); 1492 1493 return resultRows > 0; 1494 } catch (SQLException e) { 1495 throw new IOException(e.getMessage(), e); 1496 } finally { 1497 try { 1498 if (executor != null) 1499 executor.close(); 1500 } catch (Exception e) { 1501 log.error(e.getMessage(), e); 1502 } 1503 } 1504 } 1505 return false; 1506 } 1507 1508 @Override 1509 public boolean delete() throws IOException { 1510 return forceDeleteAll(true); 1511 } 1512 1513 public boolean delete(boolean realDeleted) throws IOException { 1514 return forceDeleteAll(realDeleted); 1515 } 1516 1517 @Override 1518 public boolean beforeDelete(boolean realDeleted) { 1519 return true; 1520 } 1521 1522 @Override 1523 public void afterDelete(boolean realDeleted) { 1524 1525 } 1526 1527 @Override 1528 public boolean beforeMkdirs() { 1529 return true; 1530 } 1531 1532 @Override 1533 public void afterMkdirs() { 1534 1535 } 1536 1537 @Override 1538 public boolean beforeCreateLink(String target) { 1539 return true; 1540 } 1541 1542 @Override 1543 public void afterCreateLink(String target) { 1544 1545 } 1546 1547 @Override 1548 public boolean beforeWrite(){ 1549 return true; 1550 } 1551 1552 @Override 1553 public void afterWrite(){ 1554 1555 } 1556 1557 @Override 1558 public boolean exists() throws IOException { 1559 return getFirstFilePart() != null; 1560 } 1561 1562 public boolean exists(boolean refresh) throws IOException { 1563 if(refresh){ 1564 return getFirstFilePartRefresh() != null; 1565 }else{ 1566 return exists(); 1567 } 1568 } 1569 1570 @Override 1571 public boolean mkdirs() throws IOException { 1572 return mkdirs(false); 1573 } 1574 1575 public boolean mkdirs(boolean checkSourceHostname) throws IOException { 1576 1577 boolean allowed = beforeMkdirs(); 1578 1579 if(!allowed) return false; 1580 1581 Clock clock = new Clock(); 1582 CacheDriverExecutor executor = null; 1583 Connection conn = null; 1584 try { 1585 if (getFirstFilePart() != null) { 1586 return false; 1587 } 1588 if (checkSourceHostname) { 1589 String lastSourceHostname = getLastSourceHostname(); 1590 if (lastSourceHostname != null) { 1591 String clientHostname = CommonTools.getHostname(); 1592 boolean sameHostname = clientHostname.equals(lastSourceHostname); 1593 if (!sameHostname) { 1594 return false; 1595 } 1596 } 1597 } 1598 log.debug("Mkidrs: {}", getPath()); 1599 calculateDataMappingDataSourceWriter(this); 1600 Timestamp currentTime = clock.getSqlTimestamp(); 1601 conn = dataSourceWriter.getConnection(); 1602 conn.setAutoCommit(true); 1603 executor = new CacheDriverExecutor(conn); 1604 Map dataRecord = new HashMap<String, Object>(); 1605 dataRecord.put("id", 1606 generateId(identityCardEnable, identityCardName, identityCardPrefix, identityCardSuffix)); 1607 dataRecord.put("file_name", origin.getName()); 1608 dataRecord.put("file_parent_path", getParent()); 1609 dataRecord.put("file_type", "D"); 1610 dataRecord.put("file_part_index", 0); 1611 dataRecord.put("file_part_size", 0); 1612 dataRecord.put("file_part_data_ds", "NONE"); 1613 dataRecord.put("file_part_data_table", this.dataMappingTable); 1614 dataRecord.put("file_part_last_index", 0); 1615 dataRecord.put("file_size", 0); 1616 dataRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime); 1617 dataRecord.put("file_deleted", 0); 1618 dataRecord.put("created_user_id", getDefaultModifyUserId()); 1619 dataRecord.put("updated_user_id", getDefaultModifyUserId()); 1620 dataRecord.put("created_at", currentTime); 1621 dataRecord.put("updated_at", currentTime); 1622 dataRecord.put("file_id", fileId); 1623 dataRecord.put("source_hostname", CommonTools.getHostname()); 1624 dataRecord.put("synced_on_hostnames", String.format(";%s;", CommonTools.getHostname())); 1625 dataRecord.put("deleted_on_hostnames", ";"); 1626 int rows = executor.execute(formatSql(SQL_INSERT_INTO_FILE), dataRecord); 1627 1628 afterMkdirs(); 1629 1630 return rows > 0; 1631 } catch (Exception e) { 1632 throw new IOException(e.getMessage(), e); 1633 } finally { 1634 try { 1635 if (executor != null) 1636 executor.close(); 1637 } catch (Exception e) { 1638 log.error(e.getMessage(), e); 1639 } 1640 } 1641 } 1642 1643 @Override 1644 public boolean write(byte[] data, boolean append) throws IOException { 1645 return writeTo(data, append); 1646 } 1647 1648 public synchronized boolean writeBlock(byte[] data, boolean append) throws IOException { 1649 return writeTo(data, append); 1650 } 1651 1652 private boolean writeToBatch(int currentFilePartStartIndex, List<byte[]> partDataList) throws IOException { 1653 1654 boolean allowed = beforeWrite(); 1655 1656 if(!allowed) return false; 1657 1658 CacheDriverExecutor executor = null; 1659 CacheDriverExecutor dmExecutor = null; 1660 List<Map<String, Object>> rfList = new ArrayList<Map<String, Object>>(); 1661 List<Map<String, Object>> dmList = new ArrayList<Map<String, Object>>(); 1662 try { 1663 Clock clock = new Clock(); 1664 log.debug("Write: {}", getPath()); 1665 int size = partDataList.size(); 1666 Timestamp currentTime = clock.getSqlTimestamp(); 1667 String hostname = CommonTools.getHostname(); 1668 calculateDataMappingDataSourceWriter(this); 1669 for (int filePartIndex = 0; filePartIndex < size; filePartIndex++) { 1670 byte[] data = partDataList.get(filePartIndex); 1671 String remoteFileId = generateId(identityCardEnable, identityCardName, identityCardPrefix, 1672 identityCardSuffix); 1673 Map rfRecord = new HashMap<String, Object>(); 1674 rfRecord.put("id", remoteFileId); 1675 rfRecord.put("file_name", origin.getName()); 1676 rfRecord.put("file_parent_path", getParent()); 1677 rfRecord.put("file_type", "F"); 1678 rfRecord.put("file_part_index", filePartIndex + currentFilePartStartIndex); 1679 rfRecord.put("file_part_size", data.length); 1680 rfRecord.put("file_part_data_ds", this.dataMappingDs); 1681 rfRecord.put("file_part_data_table", this.dataMappingTable); 1682 rfRecord.put("file_part_last_index", -1); 1683 rfRecord.put("file_size", 0); 1684 rfRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime); 1685 rfRecord.put("file_deleted", 0); 1686 rfRecord.put("created_user_id", getDefaultModifyUserId()); 1687 rfRecord.put("updated_user_id", getDefaultModifyUserId()); 1688 rfRecord.put("created_at", currentTime); 1689 rfRecord.put("updated_at", currentTime); 1690 rfRecord.put("file_id", fileId); 1691 rfRecord.put("source_hostname", hostname); 1692 rfRecord.put("synced_on_hostnames", String.format(";%s;", hostname)); 1693 rfRecord.put("deleted_on_hostnames", ";"); 1694 rfList.add(rfRecord); 1695 1696 Map<String, Object> dataRecord = new HashMap<String, Object>(); 1697 dataRecord.put("id", generateDataId(identityCardEnable, identityCardName, identityCardSuffix, this)); 1698 dataRecord.put("file_part_data", encrypt(data)); 1699 dataRecord.put("remote_file_id", remoteFileId); 1700 dataRecord.put("file_id", fileId); 1701 dataRecord.put("created_at", currentTime); 1702 dataRecord.put("updated_at", currentTime); 1703 dmList.add(dataRecord); 1704 } 1705 Connection conn = dataSourceWriter.getConnection(); 1706 conn.setAutoCommit(true); 1707 executor = new CacheDriverExecutor(conn); 1708 executor.executeBatch(formatSql(SQL_INSERT_INTO_FILE), rfList); 1709 1710 conn = dataMappingDataSourceWriter.getConnection(); 1711 conn.setAutoCommit(true); 1712 dmExecutor = new CacheDriverExecutor(conn); 1713 dmExecutor.executeBatch(formatSqlForFilePartData(SQL_INSERT_INTO_FILE_DATA), dmList); 1714 1715 return true; 1716 } catch (Exception e) { 1717 throw new IOException(e.getMessage(), e); 1718 } finally { 1719 try { 1720 if (executor != null) 1721 executor.close(); 1722 } catch (Exception e) { 1723 log.error(e.getMessage(), e); 1724 } 1725 try { 1726 if (dmExecutor != null) 1727 dmExecutor.close(); 1728 } catch (Exception e) { 1729 log.error(e.getMessage(), e); 1730 } 1731 } 1732 } 1733 1734 private boolean writeTo(byte[] data, boolean append) throws IOException { 1735 1736 boolean allowed = beforeWrite(); 1737 1738 if(!allowed) return false; 1739 1740 CacheDriverExecutor executor = null; 1741 CacheDriverExecutor dmExecutor = null; 1742 try { 1743 if (!append) { 1744 if (exists()) 1745 throw new IOException( 1746 String.format("The remote file '%s' already exists.", origin.getAbsolutePath())); 1747 } 1748 Clock clock = new Clock(); 1749 Timestamp currentTime = clock.getSqlTimestamp(); 1750 Map<String, Object> lastRecord = getLastIndexRecord(); 1751 int filePartIndex = 0; 1752 if (lastRecord != null) { 1753 if (lastRecord.get("file_type").equals("D")) { 1754 throw new IOException(String.format("This is a folder '%s'.", origin.getAbsolutePath())); 1755 } 1756 if (lastRecord.get("file_type").equals("L")) { 1757 throw new IOException(String.format("This is a link '%s'.", origin.getAbsolutePath())); 1758 } 1759 if (append) { 1760 filePartIndex = Integer.parseInt(lastRecord.get("file_part_index") + "") + 1; 1761 } 1762 boolean isDeleted = false; 1763 Object deletedValue = lastRecord.get("deleted"); 1764 if (deletedValue instanceof Boolean) { 1765 isDeleted = (Boolean) deletedValue; 1766 } else { 1767 isDeleted = Integer.parseInt(deletedValue + "") == 1; 1768 } 1769 if (isDeleted) { 1770 throw new IOException(String.format("The file '%s' is deleted.", origin.getAbsolutePath())); 1771 } 1772 } 1773 calculateDataMappingDataSourceWriter(this); 1774 log.debug("Write: {}", getPath()); 1775 String remoteFileId = generateId(identityCardEnable, identityCardName, identityCardPrefix, 1776 identityCardSuffix); 1777 Map rfRecord = new HashMap<String, Object>(); 1778 rfRecord.put("id", remoteFileId); 1779 rfRecord.put("file_name", origin.getName()); 1780 rfRecord.put("file_parent_path", getParent()); 1781 rfRecord.put("file_type", "F"); 1782 rfRecord.put("file_part_index", filePartIndex); 1783 rfRecord.put("file_part_size", data.length); 1784 rfRecord.put("file_part_data_ds", this.dataMappingDs); 1785 rfRecord.put("file_part_data_table", this.dataMappingTable); 1786 rfRecord.put("file_part_last_index", -1); 1787 rfRecord.put("file_size", 0); 1788 rfRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime); 1789 rfRecord.put("file_deleted", 0); 1790 rfRecord.put("created_user_id", getDefaultModifyUserId()); 1791 rfRecord.put("updated_user_id", getDefaultModifyUserId()); 1792 rfRecord.put("created_at", currentTime); 1793 rfRecord.put("updated_at", currentTime); 1794 rfRecord.put("file_id", fileId); 1795 rfRecord.put("source_hostname", CommonTools.getHostname()); 1796 rfRecord.put("synced_on_hostnames", String.format(";%s;", CommonTools.getHostname())); 1797 rfRecord.put("deleted_on_hostnames", ";"); 1798 1799 Connection conn = dataSourceWriter.getConnection(); 1800 conn.setAutoCommit(true); 1801 executor = new CacheDriverExecutor(conn); 1802 executor.execute(formatSql(SQL_INSERT_INTO_FILE), rfRecord); 1803 1804 Map dataRecord = new HashMap<String, Object>(); 1805 dataRecord.put("id", generateDataId(identityCardEnable, identityCardName, identityCardSuffix, this)); 1806 dataRecord.put("file_part_data", encrypt(data)); 1807 dataRecord.put("remote_file_id", remoteFileId); 1808 dataRecord.put("file_id", fileId); 1809 dataRecord.put("created_at", currentTime); 1810 dataRecord.put("updated_at", currentTime); 1811 1812 conn = dataMappingDataSourceWriter.getConnection(); 1813 conn.setAutoCommit(true); 1814 dmExecutor = new CacheDriverExecutor(conn); 1815 dmExecutor.execute(formatSqlForFilePartData(SQL_INSERT_INTO_FILE_DATA), dataRecord); 1816 1817 return true; 1818 } catch (Exception e) { 1819 throw new IOException(e.getMessage(), e); 1820 } finally { 1821 try { 1822 if (executor != null) 1823 executor.close(); 1824 } catch (Exception e) { 1825 log.error(e.getMessage(), e); 1826 } 1827 try { 1828 if (dmExecutor != null) 1829 dmExecutor.close(); 1830 } catch (Exception e) { 1831 log.error(e.getMessage(), e); 1832 } 1833 } 1834 } 1835 1836 @Override 1837 public boolean write(byte[] data) throws IOException { 1838 return write(data, false); 1839 } 1840 1841 @Override 1842 public boolean write(String data, boolean append) throws IOException { 1843 return write(data.getBytes(CHARSET), append); 1844 } 1845 1846 @Override 1847 public boolean write(String data) throws IOException { 1848 return write(data.getBytes(CHARSET)); 1849 } 1850 1851 @Override 1852 public boolean createLink(String target) throws IOException { 1853 boolean allowed = beforeCreateLink(target); 1854 1855 if(!allowed) return false; 1856 1857 Clock clock = new Clock(); 1858 CacheDriverExecutor executor = null; 1859 CacheDriverExecutor dmExecutor = null; 1860 try { 1861 if (getFirstFilePart() != null) { 1862 return false; 1863 } 1864 log.debug("CreateLink: {}", getPath()); 1865 Timestamp currentTime = clock.getSqlTimestamp(); 1866 calculateDataMappingDataSourceWriter(this); 1867 String remoteFileId = generateId(identityCardEnable, identityCardName, identityCardPrefix, 1868 identityCardSuffix); 1869 byte[] data = target.getBytes(CHARSET); 1870 Map rfRecord = new HashMap<String, Object>(); 1871 rfRecord.put("id", remoteFileId); 1872 rfRecord.put("file_name", origin.getName()); 1873 rfRecord.put("file_parent_path", getParent()); 1874 rfRecord.put("file_type", "L"); 1875 rfRecord.put("file_part_index", 0); 1876 rfRecord.put("file_part_size", data.length); 1877 rfRecord.put("file_part_data_ds", this.dataMappingDs); 1878 rfRecord.put("file_part_data_table", this.dataMappingTable); 1879 rfRecord.put("file_part_last_index", -1); 1880 rfRecord.put("file_size", 0); 1881 rfRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime); 1882 rfRecord.put("file_deleted", 0); 1883 rfRecord.put("created_user_id", getDefaultModifyUserId()); 1884 rfRecord.put("updated_user_id", getDefaultModifyUserId()); 1885 rfRecord.put("created_at", currentTime); 1886 rfRecord.put("updated_at", currentTime); 1887 rfRecord.put("file_id", fileId); 1888 rfRecord.put("source_hostname", CommonTools.getHostname()); 1889 rfRecord.put("synced_on_hostnames", String.format(";%s;", CommonTools.getHostname())); 1890 rfRecord.put("deleted_on_hostnames", ";"); 1891 1892 Connection conn = dataSourceWriter.getConnection(); 1893 conn.setAutoCommit(true); 1894 executor = new CacheDriverExecutor(conn); 1895 int rows = executor.execute(formatSql(SQL_INSERT_INTO_FILE), rfRecord); 1896 1897 Map dataRecord = new HashMap<String, Object>(); 1898 dataRecord.put("id", generateDataId(identityCardEnable, identityCardName, identityCardSuffix, this)); 1899 dataRecord.put("file_part_data", encrypt(data)); 1900 dataRecord.put("remote_file_id", remoteFileId); 1901 dataRecord.put("file_id", fileId); 1902 dataRecord.put("created_at", currentTime); 1903 dataRecord.put("updated_at", currentTime); 1904 1905 conn = dataMappingDataSourceWriter.getConnection(); 1906 conn.setAutoCommit(true); 1907 dmExecutor = new CacheDriverExecutor(conn); 1908 dmExecutor.execute(formatSqlForFilePartData(SQL_INSERT_INTO_FILE_DATA), dataRecord); 1909 1910 complete(); 1911 1912 afterCreateLink(target); 1913 1914 return rows > 0; 1915 } catch (Exception e) { 1916 throw new IOException(e.getMessage(), e); 1917 } finally { 1918 try { 1919 if (executor != null) 1920 executor.close(); 1921 } catch (Exception e) { 1922 log.error(e.getMessage(), e); 1923 } 1924 try { 1925 if (dmExecutor != null) 1926 dmExecutor.close(); 1927 } catch (Exception e) { 1928 log.error(e.getMessage(), e); 1929 } 1930 } 1931 } 1932 1933 @Override 1934 public String readLink() throws IOException { 1935 return new String(readAllBytes(),BaseFile.CHARSET); 1936 } 1937 1938 @Override 1939 public byte[] readAllBytes() throws IOException { 1940 return readAllBytesFrom(); 1941 } 1942 1943 private byte[] readAllBytesFrom() throws IOException { 1944 if (isDir()) { 1945 throw new IOException(String.format("The file '%s' is a folder.", origin.getAbsolutePath())); 1946 } 1947 if (!exists()) { 1948 throw new IOException(String.format("The file '%s' does not exist.", origin.getAbsolutePath())); 1949 } 1950 CacheDriverExecutor executor = null; 1951 Connection conn = null; 1952 ByteArrayOutputStream baos = null; 1953 try { 1954 conn = dataSourceReader.getConnection(); 1955 conn.setAutoCommit(true); 1956 executor = new CacheDriverExecutor(conn); 1957 Map dataRecord = getCompletedRecord(); 1958 List<Map<String, Object>> filePartList = executor.find(formatSql(SQL_SELECT_FILE), dataRecord); 1959 baos = new ByteArrayOutputStream(); 1960 for (Map<String, Object> fpItem : filePartList) { 1961 Map<String, Object> fp = readPart(fpItem); 1962 Object fpData = null; 1963 1964 if(fp == null){ 1965 fpData = new byte[]{}; 1966 }else{ 1967 fpData = fp.get("file_part_data"); 1968 } 1969 1970 if (fpData instanceof byte[]) { 1971 byte[] partData = (byte[]) fpData; 1972 baos.write(decrypt(partData)); 1973 baos.flush(); 1974 } 1975 } 1976 return baos == null ? null : baos.toByteArray(); 1977 } catch (Exception e) { 1978 throw new IOException(e.getMessage(), e); 1979 } finally { 1980 try { 1981 if (baos != null) 1982 baos.close(); 1983 } catch (Exception e) { 1984 log.error(e.getMessage(), e); 1985 } 1986 try { 1987 if (executor != null) 1988 executor.close(); 1989 } catch (Exception e) { 1990 log.error(e.getMessage(), e); 1991 } 1992 } 1993 } 1994 1995 @Override 1996 public boolean isFile() throws IOException { 1997 Map<String, Object> lastRecord = getFirstFilePart(); 1998 1999 if (lastRecord == null) { 2000 return false; 2001 } 2002 return lastRecord.get("file_type").equals("F"); 2003 } 2004 2005 @Override 2006 public boolean isDirectory() throws IOException { 2007 Map<String, Object> lastRecord = getFirstFilePart(); 2008 2009 if (lastRecord == null) { 2010 return false; 2011 } 2012 return lastRecord.get("file_type").equals("D"); 2013 } 2014 2015 @Override 2016 public boolean isLink() throws IOException { 2017 Map<String, Object> lastRecord = getFirstFilePart(); 2018 2019 if (lastRecord == null) { 2020 return false; 2021 } 2022 return lastRecord.get("file_type").equals("L"); 2023 } 2024 2025 @Override 2026 public String readAllString() throws IOException { 2027 ByteArrayOutputStream baos = null; 2028 try { 2029 byte[] bytes = readAllBytes(); 2030 2031 if(bytes == null) return null; 2032 2033 baos = new ByteArrayOutputStream(); 2034 baos.write(bytes); 2035 baos.flush(); 2036 return baos.toString(); 2037 } finally { 2038 if (baos != null) { 2039 try { 2040 baos.close(); 2041 } catch (IOException e) { 2042 log.error(e.getMessage(), e); 2043 } 2044 } 2045 } 2046 } 2047 2048 @Override 2049 public long size() throws IOException { 2050 Map<String, Object> record = getCompletedRecord(); 2051 if (record != null) { 2052 return Long.parseLong(record.get("file_size") + ""); 2053 } 2054 return 0L; 2055 } 2056 2057 public List<Map<String, Object>> list() throws IOException { 2058 return list(getPath(), "%"); 2059 } 2060 2061 public List<Map<String, Object>> list(String likeFileName) throws IOException { 2062 return list(getPath(), likeFileName); 2063 } 2064 2065 public List<Map<String, Object>> list(String likeFolderName, String likeFileName) throws IOException { 2066 CacheDriverExecutor executor = null; 2067 Connection conn = null; 2068 try { 2069 conn = dataSourceReader.getConnection(); 2070 conn.setAutoCommit(true); 2071 executor = new CacheDriverExecutor(conn); 2072 Map<String, Object> dataRecord = new HashMap<String, Object>(); 2073 if (CommonTools.isBlank(likeFileName)) { 2074 dataRecord.put("file_name", "%"); 2075 } else { 2076 dataRecord.put("file_name", likeFileName); 2077 } 2078 if (CommonTools.isBlank(likeFolderName)) { 2079 dataRecord.put("file_parent_path", "%"); 2080 } else { 2081 dataRecord.put("file_parent_path", likeFolderName); 2082 } 2083 return executor.find(formatSql(SQL_SELECT_FOR_LIST_ALL), dataRecord); 2084 } catch (SQLException e) { 2085 throw new IOException(e.getMessage(), e); 2086 } finally { 2087 if (executor != null) { 2088 try { 2089 executor.close(); 2090 } catch (Exception e) { 2091 throw new IOException(e.getMessage(), e); 2092 } 2093 } 2094 } 2095 } 2096 2097 public void listAll(String likeFileName, CacheArray rows) throws IOException { 2098 listAll(getPath(), likeFileName, rows); 2099 } 2100 2101 public void listAll(CacheArray rows) throws IOException { 2102 listAll(getPath(), "%", rows); 2103 } 2104 2105 public void listAll(String likeFolderName, String likeFileName, CacheArray rows) throws IOException { 2106 CacheDriverExecutor executor = null; 2107 Connection conn = null; 2108 try { 2109 conn = dataSourceReader.getConnection(); 2110 conn.setAutoCommit(true); 2111 executor = new CacheDriverExecutor(conn); 2112 Map<String, Object> dataRecord = new HashMap<String, Object>(); 2113 if (CommonTools.isBlank(likeFileName)) { 2114 dataRecord.put("file_name", "%"); 2115 } else { 2116 dataRecord.put("file_name", likeFileName); 2117 } 2118 if (CommonTools.isBlank(likeFolderName)) { 2119 dataRecord.put("file_parent_path", "%"); 2120 } else { 2121 dataRecord.put("file_parent_path", likeFolderName); 2122 } 2123 executor.find(formatSql(SQL_SELECT_FOR_LIST_ALL), dataRecord, rows); 2124 } catch (SQLException e) { 2125 throw new IOException(e.getMessage(), e); 2126 } finally { 2127 if (executor != null) { 2128 try { 2129 executor.close(); 2130 } catch (Exception e) { 2131 throw new IOException(e.getMessage(), e); 2132 } 2133 } 2134 } 2135 } 2136 2137 protected void listAllForDelete(String likeFolderName, String likeFileName, CacheArray rows) throws IOException { 2138 CacheDriverExecutor executor = null; 2139 Connection conn = null; 2140 try { 2141 long deleteCutoffTimeMs = 0L; 2142 2143 if(DELETE_CUTOFF_TIME != null) deleteCutoffTimeMs = DELETE_CUTOFF_TIME.getTime(); 2144 2145 Timestamp currentTime = new Clock().getSqlTimestamp(); 2146 2147 conn = dataSourceReader.getConnection(); 2148 conn.setAutoCommit(true); 2149 executor = new CacheDriverExecutor(conn); 2150 Map<String, Object> dataRecord = new HashMap<String, Object>(); 2151 if (CommonTools.isBlank(likeFileName)) { 2152 dataRecord.put("file_name", "%"); 2153 } else { 2154 dataRecord.put("file_name", likeFileName); 2155 } 2156 if (CommonTools.isBlank(likeFolderName)) { 2157 dataRecord.put("file_parent_path", "%"); 2158 } else { 2159 dataRecord.put("file_parent_path", likeFolderName); 2160 } 2161 dataRecord.put("deleted_on_hostnames", "%;" + CommonTools.getHostname() + ";%"); 2162 dataRecord.put("begin_time", new Timestamp(deleteCutoffTimeMs)); 2163 dataRecord.put("end_time", currentTime); 2164 dataRecord.put("created_at", new Timestamp(currentTime.getTime() - BEFORE_THE_QUEUE_TIME)); 2165 executor.find(0, MAX_QUEUE_SIZE,formatSql(SQL_SELECT_FOR_LIST_ALL_DELETE), dataRecord, rows); 2166 } catch (SQLException e) { 2167 throw new IOException(e.getMessage(), e); 2168 } finally { 2169 if (executor != null) { 2170 try { 2171 executor.close(); 2172 } catch (Exception e) { 2173 throw new IOException(e.getMessage(), e); 2174 } 2175 } 2176 } 2177 } 2178 2179 protected void listAllForSync(String likeFolderName, String likeFileName, CacheArray rows) throws IOException { 2180 CacheDriverExecutor executor = null; 2181 Connection conn = null; 2182 try { 2183 long deleteCutoffTimeMs = 0L; 2184 2185 if(DELETE_CUTOFF_TIME != null) deleteCutoffTimeMs = SYNC_CUTOFF_TIME.getTime(); 2186 2187 Timestamp currentTime = new Clock().getSqlTimestamp(); 2188 2189 conn = dataSourceReader.getConnection(); 2190 conn.setAutoCommit(true); 2191 executor = new CacheDriverExecutor(conn); 2192 Map<String, Object> dataRecord = new HashMap<String, Object>(); 2193 if (CommonTools.isBlank(likeFileName)) { 2194 dataRecord.put("file_name", "%"); 2195 } else { 2196 dataRecord.put("file_name", likeFileName); 2197 } 2198 if (CommonTools.isBlank(likeFolderName)) { 2199 dataRecord.put("file_parent_path", "%"); 2200 } else { 2201 dataRecord.put("file_parent_path", likeFolderName); 2202 } 2203 dataRecord.put("synced_on_hostnames", "%;" + CommonTools.getHostname() + ";%"); 2204 dataRecord.put("begin_time", new Timestamp(deleteCutoffTimeMs)); 2205 dataRecord.put("end_time", currentTime); 2206 dataRecord.put("created_at", new Timestamp(currentTime.getTime() - BEFORE_THE_QUEUE_TIME)); 2207 executor.find(0, MAX_QUEUE_SIZE, formatSql(SQL_SELECT_FOR_LIST_ALL_SYNC), dataRecord, rows); 2208 } catch (SQLException e) { 2209 throw new IOException(e.getMessage(), e); 2210 } finally { 2211 if (executor != null) { 2212 try { 2213 executor.close(); 2214 } catch (Exception e) { 2215 throw new IOException(e.getMessage(), e); 2216 } 2217 } 2218 } 2219 } 2220 2221 public void setModifyUserId(String modifyUserId) { 2222 this.modifyUserId = modifyUserId; 2223 } 2224 2225 private String getDefaultModifyUserId() { 2226 if (CommonTools.isBlank(modifyUserId)) { 2227 return DEFAULE_MODIFY_USER_ID; 2228 } else { 2229 return modifyUserId; 2230 } 2231 } 2232 2233 @Override 2234 public void setModifyTime(Timestamp modifyTime) throws IOException { 2235 this.modifyTime = modifyTime; 2236 } 2237 2238 @Override 2239 public Timestamp getModifyTime() throws IOException { 2240 Map<String, Object> record = getFirstFilePart(); 2241 if (record != null) { 2242 return (Timestamp) record.get("file_modify_time"); 2243 } 2244 return null; 2245 } 2246 2247 public boolean isTimeout() throws IOException { 2248 return isTimeout(LOGIC_TIMEOUT_MS); 2249 } 2250 2251 @Override 2252 public void copyTo(String toPath) throws IOException { 2253 log.debug("CopyTo: {} -> {}", getPath(), toPath); 2254 if (!exists()) { 2255 log.mark(String.format("The remote file '%s' does not exist.", this.getPath())); 2256 } 2257 RemoteFile destRf = new RemoteFile(toPath); 2258 final Timestamp originModifyTime = this.getModifyTime(); 2259 destRf.setModifyUserId(modifyUserId); 2260 destRf.setModifyTime(originModifyTime); 2261 if (isLink()) { 2262 if (destRf.exists(true)) { 2263 destRf.forceDeleteLink(false); 2264 } 2265 destRf.createLink(readLink()); 2266 } 2267 if (isFile()) { 2268 final RemoteFile _destRf = destRf; 2269 if (_destRf.exists(true)) { 2270 _destRf.forceDeleteFile(false); 2271 } 2272 merge(new FilePart() { 2273 @Override 2274 protected void process(int partIndex, byte[] data) { 2275 try { 2276 _destRf.write(decrypt(data), partIndex > 0); 2277 } catch (Exception e) { 2278 log.error(e.getMessage(), e); 2279 } 2280 } 2281 2282 @Override 2283 protected void completed(int lastPartIndex, Timestamp modifyTime, long fileSize) { 2284 try { 2285 /**This is 0 bytes file**/ 2286 if (lastPartIndex == -1) { 2287 2288 if (_destRf.exists(true)) 2289 _destRf.forceDeleteFile(false); 2290 2291 _destRf.setModifyTime(originModifyTime); 2292 _destRf.createEmptyFile(); 2293 } else { 2294 _destRf.setModifyTime(originModifyTime); 2295 _destRf.complete(); 2296 } 2297 } catch (IOException e) { 2298 log.error(e.getMessage(), e); 2299 } 2300 } 2301 2302 @Override 2303 protected void ended(int lastPartIndex, long fileSize) { 2304 2305 } 2306 2307 }); 2308 } 2309 if (isDir()) { 2310 List<Map<String, Object>> copyList = list(getPath(), "%"); 2311 int size = copyList.size(); 2312 if (size > 0) { 2313 if (!destRf.exists(true)) { 2314 destRf.mkdirs(); 2315 } 2316 } 2317 String copyRootPath = getPath(); 2318 copyList = list(parsePath(getPath() + "/%"), "%"); 2319 if (copyList.size() > 0) { 2320 if (!destRf.exists(true)) { 2321 destRf.mkdirs(); 2322 } 2323 } 2324 for (Map<String, Object> item : copyList) { 2325 String itemParentPath = (String) item.get("file_parent_path"); 2326 String destParentPath = itemParentPath.replaceFirst(copyRootPath + "/", toPath); 2327 RemoteFile sourceRf = new RemoteFile( 2328 String.format("%s/%s", item.get("file_parent_path"), item.get("file_name"))); 2329 String destRfPath = new File(String.format("%s/%s", destParentPath, item.get("file_name"))) 2330 .getAbsolutePath(); 2331 if (sourceRf.isDir()) { 2332 RemoteFile tmpRf = new RemoteFile(destRfPath + "/"); 2333 tmpRf.setModifyTime(sourceRf.getModifyTime()); 2334 if (!tmpRf.exists(true)) { 2335 tmpRf.mkdirs(); 2336 } 2337 } else { 2338 sourceRf.copyTo(destRfPath); 2339 } 2340 } 2341 } 2342 } 2343 2344 @Override 2345 public void moveTo(String toPath) throws IOException { 2346 log.debug("MoveTo: {} -> {}", getPath(), toPath); 2347 RemoteFile destRf = new RemoteFile(toPath); 2348 copyTo(toPath); 2349 boolean exists = this.exists(true); 2350 if (exists && destRf.exists(true) && destRf.isCompleted()) { 2351 forceDeleteAll(true); 2352 } 2353 } 2354 2355 private void startFullAsyncThreadForDelete(boolean subFolders, long timerMs, String replaceTo, 2356 Runnable completedCallback) { 2357 RemoteFile the = this; 2358 try { 2359 if (syncRoot == null) { 2360 File replaceToFile = new File(replaceTo); 2361 if (replaceToFile.getParent() == null) { 2362 throw new IOException("Cannot execute delete task."); 2363 } 2364 String replaceToParent = replacePath(replaceToFile.getParent()); 2365 syncRoot = new File(String.format("%s/%s", replaceToParent, replaceToFile.getName())); 2366 } 2367 2368 CacheArray rows = new CacheArray(); 2369 CacheArrayFilter filter = new CacheArrayFilter(CACHE_ARRAY_FILTER_TIMER) { 2370 @Override 2371 public void execute(Integer index, Object o) { 2372 Map<String, Object> item = (Map<String, Object>) o; 2373 String fileName = (String) item.get("file_name"); 2374 String fullPath = String.format("%s/%s", item.get("file_parent_path"), fileName); 2375 Thread.currentThread() 2376 .setName(String.format("RemoteFile-startFullAsyncThreadForDelete-%s", fileName)); 2377 RemoteFile rf = new RemoteFile(fullPath); 2378 if(isDebug(rf)){ 2379 debugLog("CheckUsage",rf,this); 2380 } 2381 if (memoryUsage() < RemoteFile.MAX_MEMORY_USAGE) { 2382 rf.fileId = (String) item.get("file_id"); 2383 String toDf = rf.getPath(); 2384 if (!CommonTools.isBlank(replaceTo)) { 2385 if (subFolders) { 2386 toDf = fullPath.replaceFirst(getPath() + "/", replaceTo + "/"); 2387 } else { 2388 toDf = fullPath.replaceFirst(getPath(), replaceTo + "/"); 2389 } 2390 } 2391 DiskFile df = new DiskFile(toDf); 2392 df.copyAttrs(df, false, the.syncRoot); 2393 executeSyncRunnable(this, item, df, rf); 2394 } else{ 2395 log.warn("Over max memory usage 'MAX_MEMORY_USAGE={}'.",MAX_MEMORY_USAGE); 2396 } 2397 } 2398 2399 @Override 2400 public void terminated() { 2401 log.warn("Terminated: {}-{}", Thread.currentThread().getName(), Thread.currentThread().getId()); 2402 } 2403 2404 @Override 2405 public void completed(Integer size) { 2406 Thread.currentThread().setName(String 2407 .format("RemoteFile-startFullAsyncThreadForDelete-completed-%s", the.origin.getName())); 2408 if (isStopSync()) { 2409 LoggerFactory.getLogger(RemoteFile.class).mark("Stoped start delete task thread."); 2410 } 2411 if (timerMs <= 0) { 2412 LoggerFactory.getLogger(RemoteFile.class).mark("Completed start delete task thread."); 2413 } 2414 if (!isStopSync() && timerMs > 0) { 2415 try { 2416 Thread.sleep(timerMs); 2417 if (!isStopSync()) { 2418 startFullAsyncThreadForDelete(!subFolders, timerMs, replaceTo, completedCallback); 2419 } 2420 } catch (Exception e) { 2421 log.error(e.getMessage(), e); 2422 } 2423 } 2424 if (completedCallback != null) { 2425 completedCallback.run(); 2426 } 2427 } 2428 2429 }; 2430 rows.filter(filter); 2431 listAllForDelete(getPath() + (subFolders ? "/%" : ""), "%", rows); 2432 } catch (Exception e) { 2433 log.error(e.getMessage(), e); 2434 if (timerMs > 0) { 2435 try { 2436 Thread.sleep(timerMs); 2437 startFullAsyncThreadForDelete(subFolders, timerMs, replaceTo, completedCallback); 2438 } catch (InterruptedException ee) { 2439 log.error(ee.getMessage(), ee); 2440 } 2441 } 2442 } 2443 } 2444 2445 private void startFullAsyncThread(boolean subFolders, long timerMs, String replaceTo, Runnable completedCallback) { 2446 RemoteFile the = this; 2447 try { 2448 if (syncRoot == null) { 2449 File replaceToFile = new File(replaceTo); 2450 if (replaceToFile.getParent() == null) { 2451 throw new IOException("Cannot sync the root path."); 2452 } 2453 String replaceToParent = replacePath(replaceToFile.getParent()); 2454 syncRoot = new File(String.format("%s/%s", replaceToParent, replaceToFile.getName())); 2455 } 2456 2457 CacheArray rows = new CacheArray(); 2458 CacheArrayFilter filter = new CacheArrayFilter(CACHE_ARRAY_FILTER_TIMER) { 2459 @Override 2460 public void execute(Integer index, Object o) { 2461 Map<String, Object> item = (Map<String, Object>) o; 2462 String fileName = (String) item.get("file_name"); 2463 Thread.currentThread().setName(String.format("RemoteFile-startFullAsyncThread-%s", fileName)); 2464 String fullPath = String.format("%s/%s", item.get("file_parent_path"), fileName); 2465 RemoteFile rf = new RemoteFile(fullPath); 2466 if(isDebug(the)){ 2467 debugLog("CheckUsage",the,this); 2468 } 2469 if (memoryUsage() < RemoteFile.MAX_MEMORY_USAGE) { 2470 String toDf = rf.getPath(); 2471 if (!CommonTools.isBlank(replaceTo)) { 2472 if (subFolders) { 2473 toDf = fullPath.replaceFirst(getPath() + "/", replaceTo + "/"); 2474 } else { 2475 toDf = fullPath.replaceFirst(getPath(), replaceTo + "/"); 2476 } 2477 } 2478 DiskFile df = new DiskFile(toDf); 2479 df.copyAttrs(df, false, the.syncRoot); 2480 executeSyncRunnable(this, item, df, rf); 2481 } else{ 2482 log.warn("Over max memory usage 'MAX_MEMORY_USAGE={}'.",MAX_MEMORY_USAGE); 2483 } 2484 } 2485 2486 @Override 2487 public void terminated() { 2488 log.warn("Terminated: {}-{}", Thread.currentThread().getName(), Thread.currentThread().getId()); 2489 } 2490 2491 @Override 2492 public void completed(Integer size) { 2493 Thread.currentThread().setName( 2494 String.format("RemoteFile-startFullAsyncThread-completed-%s", the.origin.getName())); 2495 if (isStopSync()) { 2496 LoggerFactory.getLogger(RemoteFile.class).mark("Stoped start full async thread."); 2497 } 2498 if (timerMs <= 0) { 2499 LoggerFactory.getLogger(RemoteFile.class).mark("Completed start full async thread."); 2500 } 2501 if (!isStopSync() && timerMs > 0) { 2502 try { 2503 Thread.sleep(timerMs); 2504 if (!isStopSync()) { 2505 startFullAsyncThread(!subFolders, timerMs, replaceTo, completedCallback); 2506 } 2507 } catch (Exception e) { 2508 log.error(e.getMessage(), e); 2509 } 2510 } 2511 if (completedCallback != null) { 2512 completedCallback.run(); 2513 } 2514 } 2515 2516 }; 2517 rows.filter(filter); 2518 listAllForSync(getPath() + (subFolders ? "/%" : ""), "%", rows); 2519 } catch (Exception e) { 2520 log.error(e.getMessage(), e); 2521 if (timerMs > 0) { 2522 try { 2523 Thread.sleep(timerMs); 2524 startFullAsyncThread(subFolders, timerMs, replaceTo, completedCallback); 2525 } catch (InterruptedException ee) { 2526 log.error(ee.getMessage(), ee); 2527 } 2528 } 2529 } 2530 } 2531 2532 public void startFullAsync(long timerMs) throws IOException { 2533 startFullAsync(timerMs, getPath() + "/", null); 2534 } 2535 2536 public void startFullAsync(long timerMs, Runnable completedCallback) throws IOException { 2537 startFullAsync(timerMs, getPath() + "/", completedCallback); 2538 } 2539 2540 public void startFullAsync(long timerMs, String replaceTo) throws IOException { 2541 startFullAsync(timerMs, replaceTo, null); 2542 } 2543 2544 public void startFullAsync(long timerMs, String replaceTo, Runnable completedCallback) throws IOException { 2545 2546 stopSync = false; 2547 2548 RemoteFile the = this; 2549 Clock clock = new Clock(); 2550 Timestamp toTime = new Timestamp(clock.getTime() - timerMs); 2551 Executors.newFixedThreadPool(1).execute(new Runnable() { 2552 2553 @Override 2554 public void run() { 2555 startFullAsyncThread(false, timerMs, replaceTo, completedCallback); 2556 } 2557 2558 }); 2559 2560 Executors.newFixedThreadPool(1).execute(new Runnable() { 2561 2562 @Override 2563 public void run() { 2564 startFullAsyncThreadForDelete(false, timerMs, replaceTo, completedCallback); 2565 } 2566 2567 }); 2568 } 2569 2570 public DriverDataSource getDataSourceReader() { 2571 return dataSourceReader; 2572 } 2573 2574 public DriverDataSource getDataSourceWriter() { 2575 return dataSourceWriter; 2576 } 2577 2578 public ConfigProperties getConfigProperties() { 2579 return configProperties; 2580 } 2581 2582 private byte[] encrypt(byte[] data) throws Exception { 2583 boolean useEncrypt = configProperties.getBoolean("EncryptEnable", false); 2584 if (useEncrypt) { 2585 String aesKey = configProperties.getString("EncryptAesKey"); 2586 return CipherTools.AESEncrypt(aesKey,data); 2587 } else { 2588 return data; 2589 } 2590 } 2591 2592 private byte[] decrypt(byte[] encData) throws Exception { 2593 boolean useEncrypt = configProperties.getBoolean("EncryptEnable", false); 2594 if (useEncrypt) { 2595 String aesKey = configProperties.getString("EncryptAesKey"); 2596 return CipherTools.AESDecrypt(aesKey,encData); 2597 } else { 2598 return encData; 2599 } 2600 } 2601 2602 public String getSyncedOnHostnames() throws IOException { 2603 Map<String, Object> record = getCompletedRecord(); 2604 if (record != null) { 2605 return (String) record.get("synced_on_hostnames"); 2606 } 2607 return null; 2608 } 2609 2610 public String getDeletedOnHostnames() throws IOException { 2611 Map<String, Object> record = getCompletedRecord(); 2612 if (record != null) { 2613 return (String) record.get("deleted_on_hostnames"); 2614 } 2615 return null; 2616 } 2617 2618 public String getSourceHostname() throws IOException { 2619 Map<String, Object> record = getCompletedRecord(); 2620 if (record != null) { 2621 return (String) record.get("source_hostname"); 2622 } 2623 return null; 2624 } 2625 2626 protected String getLastSourceHostname() throws IOException { 2627 Map<String, Object> record = getLastUpdateRecord(); 2628 if (record != null) { 2629 return (String) record.get("source_hostname"); 2630 } 2631 return null; 2632 } 2633 2634 protected Timestamp getLastOperateTime() throws IOException { 2635 Map<String, Object> record = getLastUpdateRecord(); 2636 if (record != null) { 2637 Timestamp updatedAt = (Timestamp) record.get("updated_at"); 2638 Timestamp deletedAt = (Timestamp) record.get("deleted_at"); 2639 2640 return deletedAt == null ? updatedAt : deletedAt; 2641 } 2642 return null; 2643 } 2644 2645 protected Timestamp getLastModifyTime() throws IOException { 2646 Map<String, Object> record = getLastUpdateRecord(); 2647 if (record != null) { 2648 return (Timestamp) record.get("file_modify_time"); 2649 } 2650 return null; 2651 } 2652 2653 protected boolean isLastOperateDelete() throws IOException { 2654 Map<String, Object> record = getLastUpdateRecord(); 2655 if (record != null) { 2656 Timestamp deletedAt = (Timestamp) record.get("deleted_at"); 2657 2658 return deletedAt == null ? false : true; 2659 } 2660 return false; 2661 } 2662 2663 protected boolean isLastOperateRealDelete() throws IOException { 2664 Map<String, Object> record = getLastUpdateRecord(); 2665 if (record != null) { 2666 Object deletedValue = record.get("file_deleted"); 2667 if (deletedValue != null) { 2668 if (deletedValue instanceof Boolean) { 2669 return (Boolean) deletedValue; 2670 } else { 2671 return Integer.parseInt(deletedValue + "") == 1; 2672 } 2673 } 2674 } 2675 return false; 2676 } 2677 2678 protected void listAllForScanDelete(Integer filePartDataTable, CacheArray rows) throws IOException { 2679 CacheDriverExecutor executor = null; 2680 Connection conn = null; 2681 try { 2682 long deleteCutoffTimeMs = 0L; 2683 2684 if(DELETE_CUTOFF_TIME != null) deleteCutoffTimeMs = DELETE_CUTOFF_TIME.getTime(); 2685 2686 Timestamp currentTime = new Clock().getSqlTimestamp(); 2687 conn = dataSourceReader.getConnection(); 2688 conn.setAutoCommit(true); 2689 executor = new CacheDriverExecutor(conn); 2690 Map<String, Object> dataRecord = new HashMap<String, Object>(); 2691 dataRecord.put("file_parent_path", getPath()); 2692 dataRecord.put("like_file_parent_path", parsePath(getPath() + "/%")); 2693 dataRecord.put("deleted_on_hostnames", "%;" + CommonTools.getHostname() + ";%"); 2694 dataRecord.put("begin_time", new Timestamp(deleteCutoffTimeMs)); 2695 dataRecord.put("end_time", currentTime); 2696 dataRecord.put("created_at", new Timestamp(currentTime.getTime() - BEFORE_THE_QUEUE_TIME)); 2697 dataRecord.put("file_part_data_table", filePartDataTable); 2698 executor.find(formatSql(SQL_SELECT_FOR_LIST_ALL_SCAN_DELETE), dataRecord, rows); 2699 } catch (SQLException e) { 2700 throw new IOException(e.getMessage(), e); 2701 } finally { 2702 if (executor != null) { 2703 try { 2704 executor.close(); 2705 } catch (Exception e) { 2706 throw new IOException(e.getMessage(), e); 2707 } 2708 } 2709 } 2710 } 2711 2712 protected boolean isSyncedOnHostname() throws IOException { 2713 Map<String, Object> record = getCompletedRecord(); 2714 if (record != null) { 2715 String syncedOnHostnames = (String) record.get("synced_on_hostnames"); 2716 return syncedOnHostnames.indexOf(";" + CommonTools.getHostname() + ";") > -1; 2717 } 2718 return false; 2719 } 2720 2721 protected boolean isSyncedOnHostname(Object id) throws IOException { 2722 Map<String, Object> record = getRecordById(id); 2723 if (record != null) { 2724 String syncedOnHostnames = (String) record.get("synced_on_hostnames"); 2725 return syncedOnHostnames.indexOf(";" + CommonTools.getHostname() + ";") > -1; 2726 } 2727 return false; 2728 } 2729 2730 protected boolean isDeletedOnHostname(Object id) throws IOException { 2731 Map<String, Object> record = getRecordById(id); 2732 if (record != null) { 2733 String deletedOnHostnames = (String) record.get("deleted_on_hostnames"); 2734 return deletedOnHostnames.indexOf(";" + CommonTools.getHostname() + ";") > -1; 2735 } 2736 return false; 2737 } 2738 2739 public RemoteFile getParentFile() { 2740 return new RemoteFile(getParent()); 2741 } 2742 2743 private void executeSyncRunnable(final CacheArrayFilter filter, final Map<String, Object> item, final DiskFile df, 2744 final RemoteFile rf) { 2745 if (!df.isLogicModify()) { 2746 df.logicModify(); 2747 if (mergePool == null) { 2748 LoggerFactory.getLogger(RemoteFile.class).mark("MAX_POOL_SIZE={}", MAX_POOL_SIZE); 2749 mergePool = Executors.newFixedThreadPool(MAX_POOL_SIZE); 2750 } 2751 mergePool.execute(getSyncRunnable(filter, item, rf, df)); 2752 } 2753 } 2754 2755 private Runnable getSyncRunnable(final CacheArrayFilter filter, final Map<String, Object> item, final RemoteFile rf, 2756 final DiskFile df) { 2757 Runnable runnable = new Runnable() { 2758 @Override 2759 public void run() { 2760 try { 2761 String fileName = (String) item.get("file_name"); 2762 Thread.currentThread() 2763 .setName(String.format("RemoteFile-getSyncRunnable-%s-%s", fileName, item.get("id"))); 2764 boolean isDebug = isDebug(rf); 2765 boolean isDeleted = false; 2766 Object deletedValue = item.get("file_deleted"); 2767 2768 if (deletedValue instanceof Boolean) { 2769 isDeleted = (Boolean) deletedValue; 2770 } else { 2771 isDeleted = Integer.parseInt(deletedValue + "") == 1; 2772 } 2773 2774 String clientHostname = CommonTools.getHostname(); 2775 String lastSourceHostname = rf.getLastSourceHostname(); 2776 2777 boolean isClientHostname = lastSourceHostname != null && !lastSourceHostname.equals(clientHostname); 2778 boolean rootExists = syncRoot != null && syncRoot.exists(); 2779 2780 if(!rootExists){ 2781 Logger.systemError(RemoteFile.class,"The root path '{}' does not exist.",syncRoot.getPath()); 2782 } 2783 2784 if (isDeleted && rootExists) { 2785 if (df.exists()) { 2786 if (isDebug) { 2787 LoggerFactory.getLogger(RemoteFile.class).mark("DeleteDiskFile - {}", df.getPath()); 2788 } 2789 df.delete(); 2790 rf.deletedToDiskAndSub(item.get("id")); 2791 } else { 2792 rf.deletedToDiskAndSub(item.get("id")); 2793 } 2794 df.removeLogicModify(); 2795 } 2796 2797 String sourceHostname = rf.getSourceHostname(); 2798 isClientHostname = sourceHostname != null && !sourceHostname.equals(clientHostname); 2799 boolean checkHaveParent = configProperties.getBoolean("CheckHaveParent", false); 2800 boolean haveParent = true; 2801 if (checkHaveParent) { 2802 haveParent = df.getOrigin().getParentFile().exists(); 2803 } 2804 String fileType = (String) item.get("file_type"); 2805 2806 if (!isDeleted && isClientHostname && haveParent && rootExists) { 2807 Timestamp rfModifyTime = rf.getModifyTime(); 2808 df.setModifyTimeMsFromClock(rfModifyTime.getTime()); 2809 2810 if (fileType.equals("L")) { 2811 if (df.exists() && !df.isLink()) { 2812 df.delete(); 2813 } 2814 if (df.exists()) { 2815 Timestamp dfModifyTime = df.getModifyTimeForClock(); 2816 if (rfModifyTime.getTime() > dfModifyTime.getTime()) { 2817 if (isDebug) { 2818 LoggerFactory.getLogger(RemoteFile.class).mark("ReCreateDiskLink - {}", 2819 df.getPath()); 2820 } 2821 df.delete(); 2822 if (df.createLink(rf.readLink())) { 2823 if (df.getOrigin().getName().startsWith(".")) { 2824 Files.setAttribute(Paths.get(df.getPath()), "dos:hidden", true); 2825 } 2826 rf.syncedToDisk(); 2827 } 2828 } else { 2829 rf.syncedToDisk(); 2830 } 2831 } else { 2832 if (isDebug) { 2833 LoggerFactory.getLogger(RemoteFile.class).mark("CreateDiskLink - {}", df.getPath()); 2834 } 2835 if (df.createLink(rf.readLink())) { 2836 if (df.getOrigin().getName().startsWith(".")) { 2837 Files.setAttribute(Paths.get(df.getPath()), "dos:hidden", true); 2838 } 2839 rf.syncedToDisk(); 2840 } 2841 } 2842 df.removeLogicModify(); 2843 } 2844 if (fileType.equals("F")) { 2845 if (df.exists()) { 2846 Timestamp dfModifyTime = df.getModifyTimeForClock(); 2847 boolean changed = rfModifyTime.getTime() > dfModifyTime.getTime(); 2848 if (changed) { 2849 DiskFile writingDf = new DiskFile(df.getPath() + TMP_WRITING_FILE); 2850 writingDf.copyAttrs(writingDf, df.isCopyStructureOnly(), df.syncRoot); 2851 writingDf.setModifyTimeMsFromClock(rfModifyTime.getTime()); 2852 if (!writingDf.isLogicModify()) { 2853 writingDf.logicModify(); 2854 if (RemoteFile.addQueuePathMapping(rf.getPath())) { 2855 if (isDebug(rf)) { 2856 LoggerFactory.getLogger(RemoteFile.class).mark("DiskQueuing - {}", 2857 df.getPath()); 2858 } 2859 rf.writeToDisk(writingDf); 2860 } 2861 } 2862 } else { 2863 rf.syncedToDisk(); 2864 df.removeLogicModify(); 2865 } 2866 } else { 2867 if (rf.exists()) { 2868 DiskFile writingDf = new DiskFile(df.getPath() + TMP_WRITING_FILE); 2869 writingDf.copyAttrs(writingDf, df.isCopyStructureOnly(), df.syncRoot); 2870 writingDf.setModifyTimeMsFromClock(rfModifyTime.getTime()); 2871 if (!writingDf.isLogicModify()) { 2872 writingDf.logicModify(); 2873 if (RemoteFile.addQueuePathMapping(rf.getPath())) { 2874 if (isDebug(rf)) { 2875 LoggerFactory.getLogger(RemoteFile.class).mark("DiskQueuing - {}", 2876 df.getPath()); 2877 } 2878 rf.writeToDisk(writingDf); 2879 } 2880 } 2881 } 2882 } 2883 } 2884 if (fileType.equals("D")) { 2885 if (rf.exists()) { 2886 if (isDebug) { 2887 LoggerFactory.getLogger(RemoteFile.class).mark("CreateDiskDir - {}", df.getPath()); 2888 } 2889 if (df.exists()) { 2890 if (df.isDir()) { 2891 rf.syncedToDisk(); 2892 } else { 2893 df.delete(); 2894 if (df.mkdirs()) { 2895 if (df.getOrigin().getName().startsWith(".")) { 2896 Files.setAttribute(Paths.get(df.getPath()), "dos:hidden", true); 2897 } 2898 rf.syncedToDisk(); 2899 } 2900 } 2901 } else { 2902 if (df.mkdirs()) { 2903 if (df.getOrigin().getName().startsWith(".")) { 2904 Files.setAttribute(Paths.get(df.getPath()), "dos:hidden", true); 2905 } 2906 rf.syncedToDisk(); 2907 } 2908 } 2909 } 2910 df.removeLogicModify(); 2911 } 2912 } 2913 } catch (Exception e) { 2914 log.error(e.getMessage(), e); 2915 } 2916 } 2917 }; 2918 return runnable; 2919 } 2920 2921 public void startArchiveForFileDeleted(long timerMs, String archiveTime) throws IOException { 2922 long archiveMs = ConfigProperties.parseTimeToMs(archiveTime, 0L); 2923 startArchiveForFileDeleted(timerMs, archiveMs, null); 2924 } 2925 2926 public void startArchiveForFileDeleted(long timerMs, long archiveMs) throws IOException { 2927 startArchiveForFileDeleted(timerMs, archiveMs, null); 2928 } 2929 2930 public void startArchiveForFileDeleted(long timerMs, String archiveTime, Runnable completedCallback) 2931 throws IOException { 2932 long archiveMs = ConfigProperties.parseTimeToMs(archiveTime, 0L); 2933 startArchiveForFileDeleted(timerMs, archiveMs, completedCallback); 2934 } 2935 2936 public void startArchiveForFileDeleted(long timerMs, long archiveMs, Runnable completedCallback) 2937 throws IOException { 2938 final RemoteFile the = this; 2939 if (archiveMs <= 0) { 2940 throw new IOException(String.format( 2941 "This parameter 'archiveMs' cannot be '%s', please use a number greater than 0.", archiveMs)); 2942 } 2943 2944 stopSync = false; 2945 2946 CacheDriverExecutor executor = null; 2947 Connection conn = null; 2948 try { 2949 Thread.currentThread().setName(String.format("RemoteFile-archiveForFileDeleted-%s", getPath())); 2950 conn = dataSourceReader.getConnection(); 2951 conn.setAutoCommit(true); 2952 executor = new CacheDriverExecutor(conn); 2953 Timestamp beforeTime = new Timestamp(new Clock().getTime() - archiveMs); 2954 Map<String, Object> dataRecord = new HashMap<String, Object>(); 2955 dataRecord.put("file_parent_path", getPath()); 2956 dataRecord.put("like_file_parent_path", parsePath(getPath() + "/%")); 2957 dataRecord.put("before_deleted_at", beforeTime); 2958 CacheArray rows = new CacheArray(); 2959 CacheArrayFilter filter = new CacheArrayFilter(CACHE_ARRAY_FILTER_TIMER) { 2960 2961 @Override 2962 public void completed(Integer size) { 2963 if (isStopSync()) { 2964 LoggerFactory.getLogger(RemoteFile.class).mark("Stoped archive for file deleted."); 2965 } 2966 if (timerMs <= 0) { 2967 LoggerFactory.getLogger(RemoteFile.class).mark("Completed archive for file deleted."); 2968 } 2969 2970 if (completedCallback != null) { 2971 completedCallback.run(); 2972 } 2973 2974 if (!isStopSync() && timerMs > 0) { 2975 while(true){ 2976 2977 if(isStopSync()) break; 2978 2979 try { 2980 Thread.sleep(timerMs); 2981 if (!isStopSync()) { 2982 boolean allowed = checkUsage(the); 2983 if(allowed){ 2984 startArchiveForFileDeleted(timerMs, archiveMs, completedCallback); 2985 break; 2986 } 2987 } 2988 } catch (Exception e) { 2989 log.error(e.getMessage(), e); 2990 } 2991 } 2992 } 2993 2994 } 2995 2996 @Override 2997 public void execute(Integer index, Object o) { 2998 Thread.currentThread().setName(String.format("RemoteFile-execute-archiveForFileDeleted-%s", index)); 2999 if(isDebug(the)){ 3000 debugLog("CheckUsage",the,this); 3001 } 3002 if (memoryUsage() < RemoteFile.MAX_MEMORY_USAGE) { 3003 CacheDriverExecutor _executor = null; 3004 try { 3005 Map<String, Object> item = (Map<String, Object>) o; 3006 boolean notDir = !item.get("file_type").equals("D"); 3007 if(notDir){ 3008 calculateDataMappingDataSourceReader(the, item); 3009 Object fileId = item.get("file_id"); 3010 Map<String, Object> dataRecord = new HashMap<String, Object>(); 3011 dataRecord.put("file_id", fileId); 3012 dataRecord.put("deleted_at", new Clock().getSqlTimestamp()); 3013 Thread.currentThread().setName(String.format("RemoteFile-archiveForFileDeleted-%s", fileId)); 3014 Connection _conn = dataMappingDataSourceReader.getConnection(); 3015 _conn.setAutoCommit(true); 3016 _executor = new CacheDriverExecutor(_conn); 3017 _executor.execute(formatSqlForFilePartData(SQL_UPDATE_BY_FILE_ID_FOR_CLEAR_FILE_DATA), dataRecord); 3018 } 3019 } catch (Exception e) { 3020 log.error(e.getMessage(), e); 3021 } finally { 3022 try { 3023 if (_executor != null) 3024 _executor.close(); 3025 } catch (Exception e) { 3026 log.error(e.getMessage(), e); 3027 } 3028 } 3029 }else{ 3030 log.warn("Over max memory usage 'MAX_MEMORY_USAGE={}'.",MAX_MEMORY_USAGE); 3031 } 3032 } 3033 }; 3034 rows.filter(filter); 3035 final CacheDriverExecutor finalExecutor = executor; 3036 Executors.newFixedThreadPool(1).execute(new Runnable(){ 3037 @Override 3038 public void run(){ 3039 try{ 3040 finalExecutor.find(formatSql(SQL_SELECT_FOR_ARCHIVE_FILE_DELETED), dataRecord, rows); 3041 }catch(Exception e){ 3042 log.error(e.getMessage(),e); 3043 } 3044 } 3045 }); 3046 } catch (SQLException e) { 3047 throw new IOException(e.getMessage(), e); 3048 } finally { 3049 if (executor != null) { 3050 try { 3051 executor.close(); 3052 } catch (Exception e) { 3053 throw new IOException(e.getMessage(), e); 3054 } 3055 } 3056 } 3057 } 3058 3059 public void startArchiveForDeleted(long timerMs, String archiveTime) throws IOException { 3060 long archiveMs = ConfigProperties.parseTimeToMs(archiveTime, 0L); 3061 startArchiveForDeleted(timerMs, archiveMs, null); 3062 } 3063 3064 public void startArchiveForDeleted(long timerMs, long archiveMs) throws IOException { 3065 startArchiveForDeleted(timerMs, archiveMs, null); 3066 } 3067 3068 public void startArchiveForDeleted(long timerMs, String archiveTime, Runnable completedCallback) 3069 throws IOException { 3070 long archiveMs = ConfigProperties.parseTimeToMs(archiveTime, 0L); 3071 startArchiveForDeleted(timerMs, archiveMs, completedCallback); 3072 } 3073 3074 public void startArchiveForDeleted(long timerMs, long archiveMs, Runnable completedCallback) throws IOException { 3075 final RemoteFile the = this; 3076 if (archiveMs <= 0) { 3077 throw new IOException(String.format( 3078 "This parameter 'archiveMs' cannot be '%s', please use a number greater than 0.", archiveMs)); 3079 } 3080 3081 stopSync = false; 3082 3083 CacheDriverExecutor executor = null; 3084 Connection conn = null; 3085 try { 3086 Thread.currentThread().setName(String.format("RemoteFile-archiveForDeleted-%s", getPath())); 3087 conn = dataSourceReader.getConnection(); 3088 conn.setAutoCommit(true); 3089 executor = new CacheDriverExecutor(conn); 3090 Timestamp beforeTime = new Timestamp(new Clock().getTime() - archiveMs); 3091 Map<String, Object> dataRecord = new HashMap<String, Object>(); 3092 dataRecord.put("file_parent_path", getPath()); 3093 dataRecord.put("like_file_parent_path", parsePath(getPath() + "/%")); 3094 dataRecord.put("before_deleted_at", beforeTime); 3095 3096 CacheArray rows = new CacheArray(); 3097 CacheArrayFilter filter = new CacheArrayFilter(CACHE_ARRAY_FILTER_TIMER) { 3098 3099 @Override 3100 public void completed(Integer size) { 3101 if (isStopSync()) { 3102 LoggerFactory.getLogger(RemoteFile.class).mark("Stoped archive for deleted."); 3103 } 3104 if (timerMs <= 0) { 3105 LoggerFactory.getLogger(RemoteFile.class).mark("Completed archive for deleted."); 3106 } 3107 3108 if (completedCallback != null) { 3109 completedCallback.run(); 3110 } 3111 3112 if (!isStopSync() && timerMs > 0) { 3113 while(true){ 3114 3115 if(isStopSync()) break; 3116 3117 try { 3118 Thread.sleep(timerMs); 3119 if (!isStopSync()) { 3120 boolean allowed = checkUsage(the); 3121 if(allowed){ 3122 startArchiveForDeleted(timerMs, archiveMs, completedCallback); 3123 break; 3124 } 3125 } 3126 } catch (Exception e) { 3127 log.error(e.getMessage(), e); 3128 } 3129 } 3130 } 3131 } 3132 3133 @Override 3134 public void execute(Integer index, Object o) { 3135 Thread.currentThread().setName(String.format("RemoteFile-execute-archiveForDeleted-%s", index)); 3136 CacheDriverExecutor dmExecutor = null; 3137 CacheDriverExecutor dmDelExecutor = null; 3138 CacheDriverExecutor executor = null; 3139 if(isDebug(the)){ 3140 debugLog("CheckUsage",the,this); 3141 } 3142 if (memoryUsage() < RemoteFile.MAX_MEMORY_USAGE) { 3143 try { 3144 Clock clock = new Clock(); 3145 Timestamp currentTime = clock.getSqlTimestamp(); 3146 Map<String, Object> item = (Map<String, Object>) o; 3147 Object fileId = item.get("file_id"); 3148 Thread.currentThread().setName(String.format("RemoteFile-archiveForDeleted-%s", fileId)); 3149 calculateDataMappingDataSourceReader(the, item); 3150 Map<String, Object> dataRecord = null; 3151 boolean haveSubFiles = false; 3152 /**Need use dataMappingDataSourceReader**/ 3153 if (item.get("file_type").equals("D")) { 3154 int count = countAllSubFiles(item.get("file_parent_path") + "", 3155 item.get("file_name") + ""); 3156 haveSubFiles = count > 0; 3157 } 3158 if (!haveSubFiles) { 3159 if (!item.get("file_type").equals("D")) { 3160 Connection dmConn = dataMappingDataSourceReader.getConnection(); 3161 dmConn.setAutoCommit(true); 3162 dmExecutor = new CacheDriverExecutor(dmConn); 3163 dataRecord = new HashMap<String, Object>(); 3164 dataRecord.put("file_id", item.get("file_id")); 3165 dataRecord.put("deleted_at", currentTime); 3166 dmExecutor.execute( 3167 formatSqlForFilePartData(SQL_UPDATE_BY_FILE_ID_FOR_CLEAR_FILE_DATA), 3168 dataRecord); 3169 } 3170 Connection conn = dataSourceWriter.getConnection(); 3171 conn.setAutoCommit(true); 3172 executor = new CacheDriverExecutor(conn); 3173 dataRecord = new HashMap<String, Object>(); 3174 dataRecord.put("file_id", item.get("file_id")); 3175 executor.execute(formatSql(SQL_DELETE_BY_FILE_ID_FROM_FILE), dataRecord); 3176 } 3177 3178 Connection dmDelConn = dataMappingDataSourceReader.getConnection(); 3179 dmDelConn.setAutoCommit(true); 3180 dmDelExecutor = new CacheDriverExecutor(dmDelConn); 3181 dataRecord = new HashMap<String, Object>(); 3182 dataRecord.put("deleted_at", new Timestamp(currentTime.getTime() - archiveMs)); 3183 dmDelExecutor.execute(formatSqlForFilePartData(SQL_DELETE_BY_RF_ID_FROM_FILE_DATA), 3184 dataRecord); 3185 } catch (Exception e) { 3186 log.error(e.getMessage(), e); 3187 } finally { 3188 try { 3189 if (dmExecutor != null) 3190 dmExecutor.close(); 3191 } catch (Exception e) { 3192 log.error(e.getMessage(), e); 3193 } 3194 try { 3195 if (dmDelExecutor != null) 3196 dmDelExecutor.close(); 3197 } catch (Exception e) { 3198 log.error(e.getMessage(), e); 3199 } 3200 try { 3201 if (executor != null) 3202 executor.close(); 3203 } catch (Exception e) { 3204 log.error(e.getMessage(), e); 3205 } 3206 } 3207 3208 }else{ 3209 log.warn("Over max memory usage 'MAX_MEMORY_USAGE={}'.",MAX_MEMORY_USAGE); 3210 } 3211 } 3212 }; 3213 rows.filter(filter); 3214 final CacheDriverExecutor finalExecutor = executor; 3215 Executors.newFixedThreadPool(1).execute(new Runnable(){ 3216 @Override 3217 public void run(){ 3218 try{ 3219 finalExecutor.find(formatSql(SQL_SELECT_FOR_ARCHIVE_DELETED), dataRecord, rows); 3220 }catch(Exception e){ 3221 log.error(e.getMessage(),e); 3222 } 3223 } 3224 }); 3225 } catch (SQLException e) { 3226 throw new IOException(e.getMessage(), e); 3227 } finally { 3228 if (executor != null) { 3229 try { 3230 executor.close(); 3231 } catch (Exception e) { 3232 throw new IOException(e.getMessage(), e); 3233 } 3234 } 3235 } 3236 } 3237 3238 public void startArchiveForTimeout(long timerMs) throws IOException { 3239 startArchiveForTimeout(timerMs,null); 3240 } 3241 3242 public void startArchiveForTimeout(long timerMs,Runnable completedCallback) throws IOException { 3243 final RemoteFile the = this; 3244 3245 stopSync = false; 3246 3247 CacheDriverExecutor executor = null; 3248 Connection conn = null; 3249 try { 3250 Thread.currentThread().setName(String.format("RemoteFile-archiveForTimeout-%s", getPath())); 3251 conn = dataSourceReader.getConnection(); 3252 conn.setAutoCommit(true); 3253 executor = new CacheDriverExecutor(conn); 3254 Timestamp beforeTime = new Timestamp(new Clock().getTime() - (LOGIC_TIMEOUT_MS*BATCH_SIZE)); 3255 Map<String, Object> dataRecord = new HashMap<String, Object>(); 3256 dataRecord.put("file_parent_path", getPath()); 3257 dataRecord.put("like_file_parent_path", parsePath(getPath() + "/%")); 3258 dataRecord.put("before_updated_at", beforeTime); 3259 3260 CacheArray rows = new CacheArray(); 3261 CacheArrayFilter filter = new CacheArrayFilter(CACHE_ARRAY_FILTER_TIMER) { 3262 3263 @Override 3264 public void completed(Integer size) { 3265 if (isStopSync()) { 3266 LoggerFactory.getLogger(RemoteFile.class).mark("Stoped archive for timeout deleted."); 3267 } 3268 if (timerMs <= 0) { 3269 LoggerFactory.getLogger(RemoteFile.class).mark("Completed archive for timeout deleted."); 3270 } 3271 3272 if (completedCallback != null) { 3273 completedCallback.run(); 3274 } 3275 3276 if (!isStopSync() && timerMs > 0) { 3277 while(true){ 3278 3279 if(isStopSync()) break; 3280 3281 try { 3282 Thread.sleep(timerMs); 3283 if (!isStopSync()) { 3284 boolean allowed = checkUsage(the); 3285 if(allowed){ 3286 startArchiveForTimeout(timerMs, completedCallback); 3287 break; 3288 } 3289 } 3290 } catch (Exception e) { 3291 log.error(e.getMessage(), e); 3292 } 3293 } 3294 } 3295 3296 } 3297 3298 @Override 3299 public void execute(Integer index, Object o) { 3300 Thread.currentThread().setName(String.format("RemoteFile-execute-archiveForTimeout-%s", index)); 3301 if(isDebug(the)){ 3302 debugLog("CheckUsage",the,this); 3303 } 3304 CacheDriverExecutor executor = null; 3305 if (memoryUsage() < RemoteFile.MAX_MEMORY_USAGE) { 3306 try { 3307 Clock clock = new Clock(); 3308 Timestamp currentTime = clock.getSqlTimestamp(); 3309 Map<String, Object> item = (Map<String, Object>) o; 3310 Object fileId = item.get("file_id"); 3311 Thread.currentThread().setName(String.format("RemoteFile-archiveForTimeout-%s", fileId)); 3312 Connection conn = dataSourceWriter.getConnection(); 3313 conn.setAutoCommit(true); 3314 executor = new CacheDriverExecutor(conn); 3315 Map<String,Object> dataRecord = new HashMap<String, Object>(); 3316 dataRecord.put("file_id",fileId); 3317 dataRecord.put("deleted_at",currentTime); 3318 dataRecord.put("deleted_user_id",getDefaultModifyUserId()); 3319 executor.execute(formatSql(SQL_UPDATE_FOR_TIMEOUT_DELETE), dataRecord); 3320 } catch (Exception e) { 3321 log.error(e.getMessage(), e); 3322 } finally { 3323 try { 3324 if (executor != null) 3325 executor.close(); 3326 } catch (Exception e) { 3327 log.error(e.getMessage(), e); 3328 } 3329 } 3330 3331 }else{ 3332 log.warn("Over max memory usage 'MAX_MEMORY_USAGE={}'.",MAX_MEMORY_USAGE); 3333 } 3334 } 3335 }; 3336 rows.filter(filter); 3337 final CacheDriverExecutor finalExecutor = executor; 3338 Executors.newFixedThreadPool(1).execute(new Runnable(){ 3339 @Override 3340 public void run(){ 3341 try{ 3342 finalExecutor.find(formatSql(SQL_SELECT_FOR_ALL_TIMEOUT), dataRecord, rows); 3343 }catch(Exception e){ 3344 log.error(e.getMessage(),e); 3345 } 3346 } 3347 }); 3348 } catch (SQLException e) { 3349 throw new IOException(e.getMessage(), e); 3350 } finally { 3351 if (executor != null) { 3352 try { 3353 executor.close(); 3354 } catch (Exception e) { 3355 throw new IOException(e.getMessage(), e); 3356 } 3357 } 3358 } 3359 } 3360 3361 public static Map<String, DriverDataSource> getDriverDataSourceMap() { 3362 return Collections.synchronizedMap(DATASOURCE_REMOTE_FILE); 3363 } 3364 3365 private static synchronized String generateId(boolean identityCardEnable, String identityCardName, 3366 String identityCardPrefix, int identityCardSuffix) throws SQLException { 3367 if (identityCardEnable) { 3368 if (identityCard == null) { 3369 identityCard = new IdentityCard(identityCardName); 3370 } 3371 return identityCard.generateId(16, identityCardPrefix, "", identityCardSuffix); 3372 } else { 3373 return CommonTools.generateId(16); 3374 } 3375 } 3376 3377 private static synchronized String generateDataId(boolean identityCardEnable, String identityCardName, 3378 int identityCardSuffix, RemoteFile theRf) throws SQLException { 3379 if (identityCardEnable) { 3380 if (identityCard == null) { 3381 identityCard = new IdentityCard(identityCardName + "Data"); 3382 } 3383 return identityCard.generateId(16, String.format("%04d", theRf.dataMappingTable), "", identityCardSuffix); 3384 } else { 3385 return CommonTools.generateId(16); 3386 } 3387 } 3388 3389 protected static synchronized boolean addQueuePathMapping(String path) { 3390 Long addTimeMs = null; 3391 3392 boolean allowed = checkQueuePathMapping(); 3393 3394 if (!allowed) 3395 return false; 3396 3397 addTimeMs = QUEUE_PATH_MAPPING.get(path); 3398 3399 if (addTimeMs == null) { 3400 QUEUE_PATH_MAPPING.put(path, System.currentTimeMillis()); 3401 return true; 3402 } 3403 3404 return false; 3405 } 3406 3407 public static synchronized boolean checkQueuePathMapping() { 3408 Long addTimeMs = null; 3409 Map<String, Long> copyQueueMap = getQueuePathMapping(); 3410 List<String> keyList = new ArrayList<String>(copyQueueMap.keySet()); 3411 for (String key : keyList) { 3412 addTimeMs = copyQueueMap.get(key); 3413 if (addTimeMs != null) { 3414 boolean timeout = (System.currentTimeMillis() - addTimeMs) >= LOGIC_TIMEOUT_MS; 3415 3416 if (timeout) { 3417 QUEUE_PATH_MAPPING.remove(key); 3418 } 3419 } 3420 } 3421 3422 if (QUEUE_PATH_MAPPING.keySet().size() >= MAX_QUEUE_SIZE) 3423 return false; 3424 3425 return true; 3426 } 3427 3428 protected static synchronized void removeQueuePathMapping(String path) { 3429 QUEUE_PATH_MAPPING.remove(path); 3430 } 3431 3432 public static synchronized Map<String, Long> getQueuePathMapping() { 3433 return Collections.synchronizedMap(QUEUE_PATH_MAPPING); 3434 } 3435 3436 protected static synchronized void loadDataMapping(RemoteFile rf) throws Exception { 3437 List<String> dmArray = rf.configProperties.getArray("DataMapping", new String[] {}); 3438 int size = dmArray.size(); 3439 List<String> dmList = DATA_MAPPING_LIST.get(rf.dsKey); 3440 3441 if (dmList == null) 3442 dmList = new ArrayList<String>(); 3443 3444 if (!DATA_MAPPING_INDEX.containsKey(rf.dsKey)) 3445 DATA_MAPPING_INDEX.put(rf.dsKey, 0); 3446 3447 for (int i = 0; i < size; i++) { 3448 String dm = dmArray.get(i); 3449 String originKey = String.format("DataMapping[%s]", i); 3450 Map<String, Object> dmMap = rf.configProperties.getMap(originKey); 3451 String shortKey = String.format("%s.%s", rf.dsKey, dm); 3452 if (dmMap != null) { 3453 String dmKey = String.format("%s.%s", rf.dsKey, originKey); 3454 3455 if (!DATA_MAPPING.containsKey(shortKey)) { 3456 ConfigProperties dmCp = new ConfigProperties(); 3457 dmCp.putAllAndReturnSlef(dmMap); 3458 int priority = dmCp.getInteger(String.format("%s.Priority", originKey), 0); 3459 if (i == 0 && priority < 1) { 3460 throw new Exception("First DataMapping priority cannot be less than 1."); 3461 } 3462 String dataSourceFileOriginValue = dmCp.getString(String.format("%s.DataSource", originKey)); 3463 String dataSourceReaderFileOriginValue = dmCp 3464 .getString(String.format("%s.DataSourceReader", originKey), dataSourceFileOriginValue); 3465 String dataSourceWriterFileOriginValue = dmCp 3466 .getString(String.format("%s.DataSourceWriter", originKey), dataSourceFileOriginValue); 3467 3468 String dmDsReaderKey = String.format("%s.reader", CommonTools.md5(dataSourceReaderFileOriginValue)); 3469 String dmDsWriterKey = String.format("%s.writer", CommonTools.md5(dataSourceWriterFileOriginValue)); 3470 DriverDataSource dmDataSourceReader = DATASOURCE_REMOTE_FILE.get(dmDsReaderKey); 3471 DriverDataSource dmDataSourceWriter = DATASOURCE_REMOTE_FILE.get(dmDsWriterKey); 3472 if (dmDataSourceReader == null) { 3473 DATASOURCE_REMOTE_FILE.put(dmDsReaderKey, 3474 new DriverDataSource(new File(dataSourceReaderFileOriginValue))); 3475 } 3476 if (dmDataSourceWriter == null) { 3477 DATASOURCE_REMOTE_FILE.put(dmDsWriterKey, 3478 new DriverDataSource(new File(dataSourceWriterFileOriginValue))); 3479 } 3480 String tableRangOriginValue = dmCp.getString(String.format("%s.TableRange", originKey), "0"); 3481 ConfigProperties shortCp = new ConfigProperties(); 3482 List<Integer> tableRange = new ArrayList<Integer>(); 3483 if (tableRangOriginValue.contains("-")) { 3484 String[] ranges = tableRangOriginValue.split("-"); 3485 Integer begin = Integer.parseInt(ranges[0].trim()); 3486 Integer end = Integer.parseInt(ranges[1].trim()); 3487 for (int j = begin; j <= end; j++) { 3488 tableRange.add(begin + j); 3489 } 3490 } else if (tableRangOriginValue.contains(",")) { 3491 String[] ranges = tableRangOriginValue.split(","); 3492 int rangesLen = ranges.length; 3493 for (int j = 0; j < rangesLen; j++) { 3494 tableRange.add(Integer.parseInt(ranges[j].trim())); 3495 } 3496 } else { 3497 tableRange.add(Integer.parseInt(tableRangOriginValue.trim())); 3498 } 3499 shortCp.put("TableRange", tableRange); 3500 shortCp.putAndReturnSlef("DataSourceReader", dmDsReaderKey); 3501 shortCp.putAndReturnSlef("DataSourceWriter", dmDsWriterKey); 3502 shortCp.putAndReturnSlef("Priority", priority); 3503 shortCp.putAndReturnSlef("UsingCount", 0); 3504 shortCp.putAndReturnSlef("Name", dm); 3505 int randomIndex = ThreadLocalRandom.current().nextInt(0, tableRange.size()); 3506 shortCp.putAndReturnSlef("Table", tableRange.get(randomIndex)); 3507 DATA_MAPPING.put(shortKey, shortCp); 3508 dmList.add(dm); 3509 DATA_MAPPING_LIST.put(rf.dsKey, dmList); 3510 } 3511 } 3512 } 3513 3514 } 3515 3516 protected static synchronized void calculateDataMappingDataSourceReader(RemoteFile rf, 3517 Map<String, Object> rfRecord) { 3518 rf.dataMappingDs = (String) rfRecord.get("file_part_data_ds"); 3519 rf.dataMappingTable = (Integer) rfRecord.get("file_part_data_table"); 3520 String shortKey = String.format("%s.%s", rf.dsKey, rf.dataMappingDs); 3521 ConfigProperties cp = DATA_MAPPING.get(shortKey); 3522 if (cp != null) { 3523 String dmDsReaderKey = cp.getString("DataSourceReader"); 3524 rf.dataMappingDataSourceReader = DATASOURCE_REMOTE_FILE.get(dmDsReaderKey); 3525 } 3526 } 3527 3528 protected static synchronized void calculateDataMappingDataSourceWriter(RemoteFile rf) { 3529 if (DATA_MAPPING_INDEX.get(rf.dsKey) >= DATA_MAPPING_LIST.size() - 1) { 3530 DATA_MAPPING_INDEX.put(rf.dsKey, 0); 3531 } 3532 ConfigProperties cp = getDataMapping(rf, DATA_MAPPING_INDEX.get(rf.dsKey)); 3533 List<Integer> tableRange = (List<Integer>) cp.get("TableRange"); 3534 Integer priority = cp.getInteger("Priority", 0); 3535 Integer usingCount = cp.getInteger("UsingCount", 0); 3536 String key = String.format("%s.%s", rf.dsKey, cp.getString("Name")); 3537 if (usingCount < priority) { 3538 rf.dataMappingDataSourceReader = DATASOURCE_REMOTE_FILE.get(cp.getString("DataSourceReader")); 3539 rf.dataMappingDataSourceWriter = DATASOURCE_REMOTE_FILE.get(cp.getString("DataSourceWriter")); 3540 rf.dataMappingDs = cp.getString("Name"); 3541 rf.dataMappingTable = cp.getInteger("Table"); 3542 usingCount++; 3543 if (usingCount >= priority) { 3544 DATA_MAPPING_INDEX.put(rf.dsKey, DATA_MAPPING_INDEX.get(rf.dsKey) + 1); 3545 cp.putAndReturnSlef("UsingCount", 0); 3546 int randomIndex = ThreadLocalRandom.current().nextInt(0, tableRange.size()); 3547 cp.putAndReturnSlef("Table", tableRange.get(randomIndex)); 3548 DATA_MAPPING.put(key, cp); 3549 } else { 3550 cp.putAndReturnSlef("UsingCount", usingCount); 3551 int randomIndex = ThreadLocalRandom.current().nextInt(0, tableRange.size()); 3552 cp.putAndReturnSlef("Table", tableRange.get(randomIndex)); 3553 DATA_MAPPING.put(key, cp); 3554 } 3555 } else { 3556 DATA_MAPPING_INDEX.put(rf.dsKey, DATA_MAPPING_INDEX.get(rf.dsKey) + 1); 3557 calculateDataMappingDataSourceWriter(rf); 3558 } 3559 } 3560 3561 protected static synchronized ConfigProperties getDataMapping(RemoteFile rf, int index) { 3562 List<String> dmList = DATA_MAPPING_LIST.get(rf.dsKey); 3563 String dm = dmList.get(index); 3564 String key = String.format("%s.%s", rf.dsKey, dm); 3565 return DATA_MAPPING.get(key); 3566 } 3567 3568 protected String getDataMappingTableName() { 3569 return String.format("%s_data_%s", this.tableName, this.dataMappingTable); 3570 } 3571 3572 private static void debugLog(String point,RemoteFile rf,CacheArrayFilter filter){ 3573 debugLog(RemoteFile.class,point,rf,filter); 3574 } 3575 3576}