001/******************************************************************************* 002The MIT License (MIT) 003 004Copyright (c) 2024 KILLCODING.COM 005 006Permission is hereby granted, free of charge, to any person obtaining a copy 007of this software and associated documentation files (the "Software"), to deal 008in the Software without restriction, including without limitation the rights 009to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 010copies of the Software, and to permit persons to whom the Software is 011furnished to do so, subject to the following conditions: 012 013The above copyright notice and this permission notice shall be included in 014all copies or substantial portions of the Software. 015 016THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 017IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 018FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 019AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 020LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 021OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 022THE SOFTWARE. 
023*****************************************************************************/ 024package com.killcoding.file; 025 026import java.io.File; 027import com.killcoding.datasource.DriverDataSource; 028import com.killcoding.tool.ConfigProperties; 029import com.killcoding.tool.CommonTools; 030import com.killcoding.log.Logger; 031import com.killcoding.log.LoggerFactory; 032import java.io.FileInputStream; 033import java.io.IOException; 034import java.sql.Connection; 035import java.sql.SQLException; 036import com.killcoding.datasource.CacheDriverExecutor; 037import java.util.Map; 038import java.util.HashMap; 039import com.killcoding.datasource.Clock; 040import java.sql.Timestamp; 041import java.nio.file.Files; 042import java.util.concurrent.ConcurrentHashMap; 043import java.util.List; 044import java.util.ArrayList; 045import java.util.Arrays; 046import java.io.ByteArrayOutputStream; 047import java.sql.Blob; 048import java.nio.file.Paths; 049import java.nio.file.Path; 050import java.net.URI; 051import java.io.ByteArrayInputStream; 052import java.nio.ByteBuffer; 053import java.util.concurrent.ExecutorService; 054import java.util.concurrent.Executors; 055import java.util.concurrent.Future; 056import java.nio.file.LinkOption; 057import javax.crypto.Cipher; 058import javax.crypto.CipherInputStream; 059import javax.crypto.CipherOutputStream; 060import javax.crypto.KeyGenerator; 061import javax.crypto.SecretKey; 062import javax.crypto.SecretKeyFactory; 063import javax.crypto.spec.DESKeySpec; 064import javax.crypto.spec.IvParameterSpec; 065import javax.crypto.spec.SecretKeySpec; 066import com.killcoding.file.DiskFile; 067import java.util.Collection; 068import com.killcoding.cache.CacheArray; 069import com.killcoding.cache.CacheArrayFilter; 070import java.nio.file.attribute.BasicFileAttributes; 071import java.util.concurrent.Callable; 072import com.killcoding.datasource.IdentityCard; 073import java.util.Collections; 074import java.util.concurrent.ThreadLocalRandom; 075import 
com.killcoding.tool.CipherTools; 076import com.killcoding.cache.Cache; 077import java.util.Date; 078 079public class RemoteFile extends BaseFile implements RemoteFileSQL { 080 081 private static final Map<String, DriverDataSource> DATASOURCE_REMOTE_FILE = new ConcurrentHashMap<String, DriverDataSource>(); 082 083 private static final Map<String, ConfigProperties> CONFIG_PROPS_REMOTE_FILE = new ConcurrentHashMap<String, ConfigProperties>(); 084 085 private static final Map<String, Long> QUEUE_PATH_MAPPING = new ConcurrentHashMap<String, Long>(); 086 087 private static final Map<String, ConfigProperties> DATA_MAPPING = new ConcurrentHashMap<String, ConfigProperties>(); 088 089 private static final Map<String, List<String>> DATA_MAPPING_LIST = new ConcurrentHashMap<String, List<String>>(); 090 091 private static final Map<String, Integer> DATA_MAPPING_INDEX = new ConcurrentHashMap<String, Integer>(); 092 093 protected static boolean FIRST_LOADED = false; 094 095 public static Integer MAX_POOL_SIZE = 100; 096 public static Double MAX_CONNECT_USAGE = 0.5D; 097 public static Double MAX_MEMORY_USAGE = 0.8D; 098 public static Integer MAX_QUEUE_SIZE = 10; 099 public static Long MAX_FILE_SZIE = -1L; 100 public static boolean SHOW_LINK = false; 101 public static boolean SHOW_HIDDEN = false; 102 public static boolean COPY_STRUCTURE_ONLY = false; 103 public static Integer MAX_FILE_PARENT_PATH_LENGTH = 200; 104 public static Integer MAX_FILE_NAME_LENGTH = 200; 105 public static Integer CACHE_SECONDS = 5; 106 public static Date SYNC_CUTOFF_TIME = null; 107 public static Date DELETE_CUTOFF_TIME = null; 108 109 private static IdentityCard identityCard = null; 110 private static File appRemoteFile = null; 111 private DriverDataSource dataSourceReader = null; 112 private DriverDataSource dataSourceWriter = null; 113 private String dataSourceReaderPath = null; 114 private String dataSourceWriterPath = null; 115 private String modifyUserId = null; 116 private Integer filePartSize = 
null; 117 private Boolean identityCardEnable = false; 118 private String identityCardName = null; 119 private String identityCardPrefix = null; 120 private Integer identityCardSuffix = 0; 121 private Timestamp modifyTime = null; 122 123 protected ConfigProperties configProperties = null; 124 protected String fileId = null; 125 protected String tableName = null; 126 protected Integer dataMappingTable = null; 127 protected String dataMappingDs = null; 128 protected DriverDataSource dataMappingDataSourceReader = null; 129 protected DriverDataSource dataMappingDataSourceWriter = null; 130 protected String dsKey = null; 131 132 public static Long BEFORE_THE_QUEUE_TIME = 60000L; 133 public static Integer BATCH_SIZE = 10; 134 public static String DEFAULE_MODIFY_USER_ID = null; 135 136 private Logger log = null; 137 138 private static ExecutorService mergePool = null; 139 140 public RemoteFile(String path) { 141 super(path); 142 log = LoggerFactory.getLogger(getClass()); 143 fileId = CommonTools.generateId(16); 144 initGlobal(); 145 } 146 147 public static synchronized void initPool(int poolSize) { 148 if (mergePool == null) { 149 MAX_POOL_SIZE = poolSize; 150 LoggerFactory.getLogger(RemoteFile.class).mark("MAX_POOL_SIZE={}", MAX_POOL_SIZE); 151 mergePool = Executors.newFixedThreadPool(MAX_POOL_SIZE); 152 } 153 } 154 155 private synchronized void init() { 156 dsKey = CommonTools.md5(this.appRemoteFile.getAbsolutePath()); 157 dataSourceReader = DATASOURCE_REMOTE_FILE.get(String.format("%s.reader", dsKey)); 158 dataSourceWriter = DATASOURCE_REMOTE_FILE.get(String.format("%s.writer", dsKey)); 159 configProperties = CONFIG_PROPS_REMOTE_FILE.get(dsKey); 160 if (dataSourceReader == null && dataSourceWriter == null) { 161 try { 162 if (!appRemoteFile.exists()) 163 throw new IOException(String.format("Not exist '%s'.", appRemoteFile.getAbsolutePath())); 164 165 configProperties = new ConfigProperties(); 166 Runnable updatedRun = new Runnable() { 167 @Override 168 public void run() 
{
                        try {
                            reload();
                            LoggerFactory.getLogger(RemoteFile.class).mark("RemoteFile Reloaded");
                        } catch (Exception e) {
                            log.error(e.getMessage(), e);
                        }
                    }
                };
                configProperties.load(appRemoteFile, updatedRun);

                CONFIG_PROPS_REMOTE_FILE.put(dsKey, configProperties);
                // The data source paths are resolved and reload() is invoked exactly once
                // by the shared branch below; doing it here as well only repeated the work.
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            }
        }
        if (configProperties != null) {
            String dataSourcePath = configProperties.getString("DataSource");
            dataSourceReaderPath = configProperties.getString("DataSourceReader", dataSourcePath);
            dataSourceWriterPath = configProperties.getString("DataSourceWriter", dataSourcePath);
            reload();
        }
        calculateDataMappingDataSourceWriter(this);
    }

    /**
     * Resolves the properties file location on first use and then initializes
     * this instance from it.
     */
    private synchronized void initGlobal() {
        // NOTE(review): appRemoteFile is static while this method locks on the instance,
        // so two instances constructed concurrently can both resolve it - confirm this is acceptable.
        if (appRemoteFile == null) {
            appRemoteFile = CommonTools.getSystemProperties(RemoteFile.class, "APP_REMOTE_FILE",
                    "RemoteFile.properties");
        }
        init();
    }

    /**
     * Re-reads every setting from {@code configProperties}, replaces out-of-range
     * values with defaults and (re)creates the reader/writer data sources when
     * their configured path changed. Any failure is logged, not propagated.
     */
    private synchronized void reload() {
        try {
            LOGIC_TIMEOUT_MS = configProperties.getMilliSeconds("LogicTimeout", 300000L);
            LOGIC_ACCESS_TIMEOUT_MS = configProperties.getMilliSeconds("LogicAccessTimeout", 60000L);
            LOGIC_CHECK_TIMEOUT_MS = configProperties.getMilliSeconds("LogicCheckTimeout", 60000L);
            MAX_FILE_SZIE = configProperties.getFileSize("MaxFileSize", 1024 * 1024 * 1L);
            SHOW_HIDDEN = configProperties.getBoolean("ShowHidden", false);
            SHOW_LINK = configProperties.getBoolean("ShowLink", false);
            COPY_STRUCTURE_ONLY = configProperties.getBoolean("CopyStructureOnly", false);
            MAX_QUEUE_SIZE = configProperties.getInteger("MaxQueueSize", 10);
            BATCH_SIZE = configProperties.getInteger("BatchSize", 10);
            MAX_CONNECT_USAGE = configProperties.getDouble("MaxConnectUsage", 0.5D);
            MAX_MEMORY_USAGE = configProperties.getDouble("MaxMemoryUsage", 0.8D);
            BEFORE_THE_QUEUE_TIME = configProperties.getMilliSeconds("BeforeTheQueueTime", 15000L);
            MAX_FILE_PARENT_PATH_LENGTH = configProperties.getInteger("MaxFileParentPathLength", 200);
            MAX_FILE_NAME_LENGTH = configProperties.getInteger("MaxFileNameLength", 200);
            CACHE_SECONDS = configProperties.getInteger("CacheSeconds", 5);

            SYNC_CUTOFF_TIME = configProperties.getDateTime("SyncCutoffTime", "0000-01-01 00:00:00.0000");
            DELETE_CUTOFF_TIME = configProperties.getDateTime("DeleteCutoffTime", "0000-01-01 00:00:00.0000");

            // Replace out-of-range values with safe defaults.
            if (BEFORE_THE_QUEUE_TIME < 0) {
                BEFORE_THE_QUEUE_TIME = 15000L;
            }
            if (MAX_CONNECT_USAGE < 0) {
                MAX_CONNECT_USAGE = 0D;
            }
            if (MAX_MEMORY_USAGE < 0) {
                MAX_MEMORY_USAGE = 0D;
            }
            if (BATCH_SIZE <= 0) {
                BATCH_SIZE = 10;
            }
            if (MAX_QUEUE_SIZE <= 0) {
                MAX_QUEUE_SIZE = 10;
            }
            if (MAX_FILE_SZIE <= 0) {
                MAX_FILE_SZIE = 1024 * 1024 * 1L;
            }
            // NOTE(review): this fallback (900000) differs from the "LogicTimeout"
            // default used above (300000) - confirm which one is intended.
            if (LOGIC_TIMEOUT_MS <= 0) {
                LOGIC_TIMEOUT_MS = 900000L;
            }
            if (LOGIC_ACCESS_TIMEOUT_MS < 0) {
                LOGIC_ACCESS_TIMEOUT_MS = 60000L;
            }
            if (LOGIC_CHECK_TIMEOUT_MS < 0) {
                LOGIC_CHECK_TIMEOUT_MS = 60000L;
            }
            if (MAX_FILE_PARENT_PATH_LENGTH <= 0) {
                MAX_FILE_PARENT_PATH_LENGTH = 200;
            }
            if (MAX_FILE_NAME_LENGTH <= 0) {
                MAX_FILE_NAME_LENGTH = 200;
            }
            if (CACHE_SECONDS < 0) {
                CACHE_SECONDS = 0;
            }

            log.debug("LOGIC_TIMEOUT_MS={}", LOGIC_TIMEOUT_MS);
            log.debug("LOGIC_ACCESS_TIMEOUT_MS={}", LOGIC_ACCESS_TIMEOUT_MS);
            log.debug("LOGIC_CHECK_TIMEOUT_MS={}", LOGIC_CHECK_TIMEOUT_MS);
            log.debug("MAX_FILE_SZIE={}", MAX_FILE_SZIE);
            log.debug("SHOW_HIDDEN={}", SHOW_HIDDEN);
            log.debug("SHOW_LINK={}", SHOW_LINK);
            log.debug("COPY_STRUCTURE_ONLY={}", COPY_STRUCTURE_ONLY);
            log.debug("MAX_QUEUE_SIZE={}", MAX_QUEUE_SIZE);
            log.debug("BATCH_SIZE={}", BATCH_SIZE);
            log.debug("MAX_CONNECT_USAGE={}", MAX_CONNECT_USAGE);
log.debug("MAX_MEMORY_USAGE={}", MAX_MEMORY_USAGE); 262 log.debug("MAX_FILE_PARENT_PATH_LENGTH={}", MAX_FILE_PARENT_PATH_LENGTH); 263 log.debug("MAX_FILE_NAME_LENGTH={}", MAX_FILE_NAME_LENGTH); 264 log.debug("CACHE_SECONDS={}", CACHE_SECONDS); 265 266 String dataSourcePath = configProperties.getString("DataSource"); 267 String reloadDataSourceReaderPath = configProperties.getString("DataSourceReader", dataSourcePath); 268 String reloadDataSourceWriterPath = configProperties.getString("DataSourceWriter", dataSourcePath); 269 filePartSize = configProperties.getFileSize("FilePartSize", 1024 * 1024 * 1L).intValue(); 270 271 if(filePartSize <= 0) filePartSize = 1024 * 1024 * 1; 272 273 tableName = configProperties.getString("TableName", "remote_file"); 274 identityCardEnable = configProperties.getBoolean("IdentityCardEnable", false); 275 identityCardName = configProperties.getString("IdentityCardName", "RemoteFile"); 276 identityCardPrefix = configProperties.getString("IdentityCardPrefix", "RMF"); 277 identityCardSuffix = configProperties.getInteger("IdentityCardSuffix", 0); 278 279 String readerDsKey = String.format("%s.reader", dsKey); 280 String writerDsKey = String.format("%s.writer", dsKey); 281 /** 282 May be referenced when expansion is needed 283 if (dataSourceReader == null || !dataSourceReaderPath.equals(reloadDataSourceReaderPath)) { 284 DriverDataSource currentDs = DATASOURCE_REMOTE_FILE.get(readerDsKey); 285 if (currentDs != null) { 286 currentDs.closeAll(); 287 } 288 **/ 289 if (dataSourceReader == null || !dataSourceReaderPath.equals(reloadDataSourceReaderPath)) { 290 dataSourceReaderPath = reloadDataSourceReaderPath; 291 dataSourceReader = new DriverDataSource(new File(dataSourceReaderPath)); 292 DATASOURCE_REMOTE_FILE.put(readerDsKey, dataSourceReader); 293 } 294 /** 295 May be referenced when expansion is needed 296 if (dataSourceWriter == null || !dataSourceWriterPath.equals(reloadDataSourceWriterPath)) { 297 DriverDataSource currentDs = 
DATASOURCE_REMOTE_FILE.get(writerDsKey); 298 if (currentDs != null) { 299 currentDs.closeAll(); 300 } 301 **/ 302 if (dataSourceWriter == null || !dataSourceWriterPath.equals(reloadDataSourceWriterPath)) { 303 dataSourceWriterPath = reloadDataSourceWriterPath; 304 dataSourceWriter = new DriverDataSource(new File(dataSourceWriterPath)); 305 DATASOURCE_REMOTE_FILE.put(writerDsKey, dataSourceWriter); 306 } 307 308 loadDataMapping(this); 309 310 FIRST_LOADED = true; 311 } catch (Exception e) { 312 log.error(e.getMessage(), e); 313 } 314 } 315 316 public String formatSql(String sql) { 317 return String.format(sql, tableName); 318 } 319 320 public String formatSqlForFilePartData(String sql) { 321 return String.format(sql, getDataMappingTableName()); 322 } 323 324 public DiskFile writeToDisk(DiskFile diskFile) throws IOException { 325 if (!exists()) { 326 log.mark(String.format("The remote file '%s' does not exist.", this.getPath())); 327 } 328 RemoteFile the = this; 329 330 Timestamp rfModifyTime = the.getModifyTime(); 331 if (rfModifyTime != null) { 332 diskFile.setModifyTimeMsFromClock(rfModifyTime.getTime()); 333 } 334 if (isLink()) { 335 if (isDebug(the)) { 336 LoggerFactory.getLogger(RemoteFile.class).mark("CreateDiskLink - {}", diskFile.getPath()); 337 } 338 diskFile.createLink(new String(readAllBytes(),BaseFile.CHARSET)); 339 } else if (isFile()) { 340 merge(new FilePart() { 341 final List<byte[]> partDataList = new ArrayList<byte[]>(); 342 boolean isWritingDf = false; 343 DiskFile realDf = null; 344 345 @Override 346 protected void process(int partIndex, byte[] data) throws IOException { 347 Thread.currentThread() 348 .setName(String.format("RemoteFile-writeToDisk-merge-%s", diskFile.getOrigin().getName())); 349 350 try { 351 352 if (syncRoot != null) { 353 if (!diskFile.isLogicModify()) { 354 throw new IOException( 355 String.format("The file '%s' missing modify logic lock.", the.getPath())); 356 } 357 } 358 359 if (diskFile != null) 360 diskFile.logicModify(); 
361 362 String fileName = diskFile.getOrigin().getName(); 363 isWritingDf = fileName.matches(TMP_WRITING_SWP); 364 if (isWritingDf) { 365 if (partIndex == 0) { 366 if (diskFile.exists()) { 367 diskFile.delete(); 368 } 369 if (isDebug(the)) { 370 LoggerFactory.getLogger(RemoteFile.class).mark("CreateDiskFile - {}", 371 diskFile.getPath()); 372 } 373 String realFileName = fileName.replaceFirst(BaseFile.TMP_WRITING_FOR_REPLACE, ""); 374 String realDfPath = String.format("%s/%s", diskFile.getParent(), realFileName); 375 realDf = new DiskFile(realDfPath); 376 DiskFile.copyAttrs(realDf, diskFile.isCopyStructureOnly(), diskFile.syncRoot); 377 if(!diskFile.manualOpen(true)){ 378 throw new IOException( 379 String.format("The disk file '%s' cannot open.", diskFile.getPath())); 380 } 381 } 382 partDataList.add(decrypt(data)); 383 if (partDataList.size() >= BATCH_SIZE) { 384 385 if (realDf != null) 386 realDf.logicModify(); 387 388 ByteArrayOutputStream ops = new ByteArrayOutputStream(); 389 int size = partDataList.size(); 390 for (int i = 0; i < size; i++) { 391 byte[] partData = partDataList.get(i); 392 ops.write(partData); 393 } 394 diskFile.manualWrite(ops.toByteArray()); 395 partDataList.clear(); 396 } 397 } else { 398 if (partIndex == 0) { 399 if (diskFile.exists()) { 400 throw new IOException( 401 String.format("The remote file '%s' already exist.", diskFile.getPath())); 402 } 403 if (isDebug(the)) { 404 LoggerFactory.getLogger(RemoteFile.class).mark("CreateDiskFile - {}", 405 diskFile.getPath()); 406 } 407 if(!diskFile.manualOpen(true)){ 408 throw new IOException( 409 String.format("The disk file '%s' cannot open.", diskFile.getPath())); 410 } 411 } 412 partDataList.add(decrypt(data)); 413 if (partDataList.size() >= BATCH_SIZE) { 414 ByteArrayOutputStream ops = new ByteArrayOutputStream(); 415 int size = partDataList.size(); 416 for (int i = 0; i < size; i++) { 417 byte[] partData = partDataList.get(i); 418 ops.write(partData); 419 } 420 
diskFile.manualWrite(ops.toByteArray()); 421 partDataList.clear(); 422 } 423 } 424 425 } catch (Exception e) { 426 log.warn(e.getMessage(), e); 427 if (diskFile != null) { 428 diskFile.manualClose(); 429 diskFile.delete(); 430 } 431 throw new IOException(e.getMessage(), e); 432 } 433 } 434 435 @Override 436 protected void completed(int lastPartIndex, Timestamp modifyTime, long fileSize) throws IOException { 437 if (partDataList.size() > 0) { 438 ByteArrayOutputStream ops = new ByteArrayOutputStream(); 439 int size = partDataList.size(); 440 for (int i = 0; i < size; i++) { 441 byte[] partData = partDataList.get(i); 442 ops.write(partData); 443 } 444 diskFile.manualWrite(ops.toByteArray()); 445 partDataList.clear(); 446 } 447 diskFile.manualClose(); 448 try { 449 if (syncRoot != null && !diskFile.isLogicModify()) { 450 diskFile.delete(); 451 throw new IOException( 452 String.format("The file '%s' missing modify logic lock.", diskFile.getPath())); 453 } 454 if (isWritingDf) { 455 if (diskFile.exists()) { 456 diskFile.moveTo(realDf.getPath()); 457 if (isDebug(the)) { 458 LoggerFactory.getLogger(RemoteFile.class).mark("CompletedDiskFile - {}", 459 realDf.getPath()); 460 } 461 } 462 if (realDf.exists()) 463 the.syncedToDisk(); 464 } 465 if (realDf == null) { 466 if (diskFile != null) { 467 if (diskFile.getOrigin().getName().startsWith(".")) { 468 Files.setAttribute(Paths.get(diskFile.getPath()), "dos:hidden", true); 469 } 470 } 471 } else { 472 if (realDf.getOrigin().getName().startsWith(".")) { 473 Files.setAttribute(Paths.get(realDf.getPath()), "dos:hidden", true); 474 } 475 } 476 477 RemoteFile.removeQueuePathMapping(the.getPath()); 478 479 if (diskFile != null) 480 diskFile.removeLogicModify(); 481 482 if (realDf != null) 483 realDf.removeLogicModify(); 484 485 } catch (Exception e) { 486 log.error(e.getMessage(), e); 487 } 488 } 489 490 @Override 491 protected void ended(int lastPartIndex, long fileSize) { 492 try { 493 if (diskFile != null) { 494 
diskFile.manualClose(); 495 } 496 } catch (Exception e) { 497 log.error(e.getMessage(), e); 498 } 499 } 500 }); 501 } else { 502 List<Map<String, Object>> writeList = list(getPath(), "%"); 503 writeList.addAll(list(parsePath(getPath() + "/%"), "%")); 504 505 int size = writeList.size(); 506 if (size > 0) { 507 if (!diskFile.exists()) { 508 if (isDebug(the)) { 509 log.mark("CreateDiskDir - {} ", diskFile.getPath()); 510 } 511 diskFile.mkdirs(); 512 } 513 } 514 515 for (Map<String, Object> item : writeList) { 516 RemoteFile rf = new RemoteFile( 517 String.format("%s/%s", item.get("file_parent_path"), item.get("file_name"))); 518 519 if (rf.getPath().equals(getPath())) 520 continue; 521 522 String dfPath = rf.getPath().replaceFirst(getPath(), diskFile.getPath()); 523 DiskFile df = new DiskFile(dfPath); 524 DiskFile.copyAttrs(df, false, this.syncRoot); 525 Timestamp mt = rf.getModifyTime(); 526 if (mt != null) { 527 df.setModifyTimeMsFromClock(mt.getTime()); 528 } 529 if (rf.isDir()) { 530 if (!df.exists()) { 531 if (isDebug(the)) { 532 log.mark("CreateDiskDir - {}", diskFile.getPath()); 533 } 534 df.mkdirs(); 535 } 536 } else { 537 rf.writeToDisk(df); 538 } 539 } 540 } 541 return diskFile; 542 } 543 544 public DiskFile writeToDisk() throws IOException { 545 DiskFile df = new DiskFile(this.origin.getAbsolutePath()); 546 DiskFile.copyAttrs(df, false, this.syncRoot); 547 return writeToDisk(df); 548 } 549 550 public DiskFile writeToDisk(String path) throws IOException { 551 DiskFile df = new DiskFile(path); 552 DiskFile.copyAttrs(df, false, this.syncRoot); 553 return writeToDisk(df); 554 } 555 556 public boolean isCompleted() throws IOException { 557 return getCompletedRecord() != null; 558 } 559 560 public Map<String, Object> getCompletedRecord() throws IOException { 561 String sql = formatSql(SQL_SELECT_FOR_COMPLETED); 562 Map dataRecord = new HashMap<String, Object>(); 563 dataRecord.put("file_name", origin.getName()); 564 dataRecord.put("file_parent_path", 
getParent()); 565 String cacheKey = String.format("first-%s-%s",sql,dataRecord); 566 Map<String,Object> record = (Map<String,Object>)Cache.get(cacheKey); 567 if(record == null){ 568 CacheDriverExecutor executor = null; 569 Connection conn = null; 570 try { 571 conn = dataSourceReader.getConnection(); 572 conn.setAutoCommit(true); 573 executor = new CacheDriverExecutor(conn); 574 record = executor.first(sql, dataRecord); 575 if(record != null){ 576 Cache.set(cacheKey,record,CACHE_SECONDS); 577 } 578 } catch (SQLException e) { 579 throw new IOException(e.getMessage(), e); 580 } finally { 581 try { 582 if (executor != null) 583 executor.close(); 584 } catch (Exception e) { 585 log.error(e.getMessage(), e); 586 } 587 } 588 } 589 return record; 590 } 591 592 public Map<String, Object> getRecordById(Object id) throws IOException { 593 String sql = formatSql(SQL_SELECT_GET_RECORD_BY_ID); 594 Map dataRecord = new HashMap<String, Object>(); 595 dataRecord.put("id", id); 596 Map<String, Object> record = null; 597 if (record == null) { 598 CacheDriverExecutor executor = null; 599 Connection conn = null; 600 try { 601 conn = dataSourceReader.getConnection(); 602 conn.setAutoCommit(true); 603 executor = new CacheDriverExecutor(conn); 604 record = executor.first(sql, dataRecord); 605 } catch (SQLException e) { 606 throw new IOException(e.getMessage(), e); 607 } finally { 608 try { 609 if (executor != null) 610 executor.close(); 611 } catch (Exception e) { 612 log.error(e.getMessage(), e); 613 } 614 } 615 } 616 return record; 617 } 618 619 public Map<String, Object> getRecordByFileId(Object fileId) throws IOException { 620 String sql = formatSql(SQL_SELECT_GET_RECORD_BY_FILE_ID); 621 Map dataRecord = new HashMap<String, Object>(); 622 dataRecord.put("file_id", fileId); 623 Map<String, Object> record = null; 624 if (record == null) { 625 CacheDriverExecutor executor = null; 626 Connection conn = null; 627 try { 628 conn = dataSourceReader.getConnection(); 629 
conn.setAutoCommit(true); 630 executor = new CacheDriverExecutor(conn); 631 record = executor.first(sql, dataRecord); 632 } catch (SQLException e) { 633 throw new IOException(e.getMessage(), e); 634 } finally { 635 try { 636 if (executor != null) 637 executor.close(); 638 } catch (Exception e) { 639 log.error(e.getMessage(), e); 640 } 641 } 642 } 643 return record; 644 } 645 646 public Map<String, Object> getLastUpdateRecord() throws IOException { 647 String sql = formatSql(SQL_SELECT_FOR_LAST_UPDATE); 648 Map dataRecord = new HashMap<String, Object>(); 649 dataRecord.put("file_name", origin.getName()); 650 dataRecord.put("file_parent_path", getParent()); 651 Map<String, Object> record = null; 652 CacheDriverExecutor executor = null; 653 Connection conn = null; 654 try { 655 conn = dataSourceReader.getConnection(); 656 conn.setAutoCommit(true); 657 executor = new CacheDriverExecutor(conn); 658 record = executor.first(sql, dataRecord); 659 return record; 660 } catch (SQLException e) { 661 throw new IOException(e.getMessage(), e); 662 } finally { 663 try { 664 if (executor != null) 665 executor.close(); 666 } catch (Exception e) { 667 log.error(e.getMessage(), e); 668 } 669 } 670 } 671 672 protected Timestamp getFirstCreatedAt() throws IOException { 673 String sql = formatSql(SQL_SELECT_FOR_FIRST_CREATED_AT); 674 Map dataRecord = new HashMap<String, Object>(); 675 dataRecord.put("file_parent_path", getPath()); 676 dataRecord.put("like_file_parent_path", parsePath(getPath() + "/%")); 677 dataRecord.put("deleted_on_hostnames", "%;" + CommonTools.getHostname() + ";%"); 678 CacheDriverExecutor executor = null; 679 Connection conn = null; 680 try { 681 conn = dataSourceReader.getConnection(); 682 conn.setAutoCommit(true); 683 executor = new CacheDriverExecutor(conn); 684 Map<String, Object> record = executor.first(sql, dataRecord); 685 return record == null ? 
null : (Timestamp)record.get("created_at"); 686 } catch (SQLException e) { 687 throw new IOException(e.getMessage(), e); 688 } finally { 689 try { 690 if (executor != null) 691 executor.close(); 692 } catch (Exception e) { 693 log.error(e.getMessage(), e); 694 } 695 } 696 } 697 698 protected Integer getMaxFilePartDataTable() throws IOException { 699 String sql = formatSql(SQL_SELECT_FOR_MAX_DATA_TABLE); 700 Map dataRecord = new HashMap<String, Object>(); 701 dataRecord.put("file_parent_path", getPath()); 702 dataRecord.put("like_file_parent_path", parsePath(getPath() + "/%")); 703 CacheDriverExecutor executor = null; 704 Connection conn = null; 705 try { 706 conn = dataSourceReader.getConnection(); 707 conn.setAutoCommit(true); 708 executor = new CacheDriverExecutor(conn); 709 Map<String, Object> record = executor.first(sql, dataRecord); 710 return record == null ? 0 : Integer.parseInt(record.get("file_part_data_table")+""); 711 } catch (SQLException e) { 712 throw new IOException(e.getMessage(), e); 713 } finally { 714 try { 715 if (executor != null) 716 executor.close(); 717 } catch (Exception e) { 718 log.error(e.getMessage(), e); 719 } 720 } 721 } 722 723 private Map<String, Object> getSelectByPathForCheckExists() throws IOException { 724 String sql = formatSql(SQL_SELECT_BY_PATH_FOR_CHECK_EXISTS); 725 Map dataRecord = new HashMap<String, Object>(); 726 dataRecord.put("file_name", origin.getName()); 727 dataRecord.put("file_parent_path", getParent()); 728 Map<String, Object> record = null; 729 if (record == null) { 730 CacheDriverExecutor executor = null; 731 Connection conn = null; 732 try { 733 conn = dataSourceReader.getConnection(); 734 conn.setAutoCommit(true); 735 executor = new CacheDriverExecutor(conn); 736 record = executor.first(sql, dataRecord); 737 return record; 738 } catch (SQLException e) { 739 throw new IOException(e.getMessage(), e); 740 } finally { 741 try { 742 if (executor != null) 743 executor.close(); 744 } catch (Exception e) { 745 
log.error(e.getMessage(), e); 746 } 747 } 748 } 749 return record; 750 } 751 752 private int countAllSubFiles(String fileParentPath, String fileName) throws IOException { 753 String sql = formatSql(SQL_SELECT_FOR_COUNT_ALL_SUB_FILES); 754 Map dataRecord = new HashMap<String, Object>(); 755 dataRecord.put("file_name", fileName); 756 dataRecord.put("file_parent_path", fileParentPath); 757 Map<String, Object> record = null; 758 if (record == null) { 759 CacheDriverExecutor executor = null; 760 Connection conn = null; 761 try { 762 conn = dataSourceReader.getConnection(); 763 conn.setAutoCommit(true); 764 executor = new CacheDriverExecutor(conn); 765 record = executor.first(sql, dataRecord); 766 return Integer.parseInt(record.get("as_count") + ""); 767 } catch (SQLException e) { 768 throw new IOException(e.getMessage(), e); 769 } finally { 770 try { 771 if (executor != null) 772 executor.close(); 773 } catch (Exception e) { 774 log.error(e.getMessage(), e); 775 } 776 } 777 } 778 return -1; 779 } 780 781 private Map<String, Object> getFirstFilePart() throws IOException { 782 String sql = formatSql(SQL_SELECT_FOR_FIRST_FILE); 783 Map dataRecord = new HashMap<String, Object>(); 784 dataRecord.put("file_name", origin.getName()); 785 dataRecord.put("file_parent_path", getParent()); 786 String cacheKey = String.format("first-%s-%s",sql,dataRecord); 787 Map<String, Object> record = (Map<String,Object>)Cache.get(cacheKey); 788 if(record == null){ 789 CacheDriverExecutor executor = null; 790 Connection conn = null; 791 try { 792 conn = dataSourceReader.getConnection(); 793 conn.setAutoCommit(true); 794 executor = new CacheDriverExecutor(conn); 795 record = executor.first(sql, dataRecord); 796 if(record != null){ 797 Cache.set(cacheKey,record,CACHE_SECONDS); 798 } 799 } catch (SQLException e) { 800 throw new IOException(e.getMessage(), e); 801 } finally { 802 try { 803 if (executor != null) 804 executor.close(); 805 } catch (Exception e) { 806 log.error(e.getMessage(), e); 807 } 
808 } 809 } 810 return record; 811 } 812 813 private Map<String, Object> getFirstFilePartRefresh() throws IOException { 814 String sql = formatSql(SQL_SELECT_FOR_FIRST_FILE); 815 Map dataRecord = new HashMap<String, Object>(); 816 dataRecord.put("file_name", origin.getName()); 817 dataRecord.put("file_parent_path", getParent()); 818 String cacheKey = String.format("first-%s-%s",sql,dataRecord); 819 Cache.remove(cacheKey); 820 CacheDriverExecutor executor = null; 821 Connection conn = null; 822 try { 823 conn = dataSourceReader.getConnection(); 824 conn.setAutoCommit(true); 825 executor = new CacheDriverExecutor(conn); 826 Map<String,Object> record = executor.first(sql, dataRecord); 827 if(record != null){ 828 Cache.set(cacheKey,record,CACHE_SECONDS); 829 } 830 return record; 831 } catch (SQLException e) { 832 throw new IOException(e.getMessage(), e); 833 } finally { 834 try { 835 if (executor != null) 836 executor.close(); 837 } catch (Exception e) { 838 log.error(e.getMessage(), e); 839 } 840 } 841 } 842 843 private Map<String, Object> getLastIndexRecord() throws IOException { 844 String sql = formatSql(SQL_SELECT_FOR_LAST_FILE_PART); 845 Map dataRecord = new HashMap<String, Object>(); 846 dataRecord.put("file_name", origin.getName()); 847 dataRecord.put("file_parent_path", getParent()); 848 dataRecord.put("file_id", fileId); 849 Map<String, Object> record = null; 850 if (record == null) { 851 CacheDriverExecutor executor = null; 852 Connection conn = null; 853 try { 854 conn = dataSourceReader.getConnection(); 855 conn.setAutoCommit(true); 856 executor = new CacheDriverExecutor(conn); 857 record = executor.first(sql, dataRecord); 858 } catch (SQLException e) { 859 throw new IOException(e.getMessage(), e); 860 } finally { 861 try { 862 if (executor != null) 863 executor.close(); 864 } catch (Exception e) { 865 log.error(e.getMessage(), e); 866 } 867 } 868 } 869 return record; 870 } 871 872 public boolean isTimeout(long timeoutMs) throws IOException { 873 
CacheDriverExecutor executor = null; 874 Connection conn = null; 875 try { 876 Clock clock = new Clock(); 877 conn = dataSourceReader.getConnection(); 878 conn.setAutoCommit(true); 879 executor = new CacheDriverExecutor(conn); 880 Map dataRecord = new HashMap<String, Object>(); 881 dataRecord.put("file_name", origin.getName()); 882 dataRecord.put("file_parent_path", getParent()); 883 Map<String, Object> completedRecord = getCompletedRecord(); 884 885 if (completedRecord != null) 886 return false; 887 888 Map<String, Object> record = executor.first(formatSql(SQL_SELECT_FOR_TIMEOUT), dataRecord); 889 890 if (record == null) 891 return false; 892 893 boolean completed = executor.first(formatSql(SQL_SELECT_CHECK_COMPLETED), record) != null; 894 if (!completed) { 895 boolean timeout = (clock.getTime() - ((Timestamp) record.get("updated_at")).getTime()) > timeoutMs; 896 897 return timeout; 898 } 899 return false; 900 } catch (SQLException e) { 901 throw new IOException(e.getMessage(), e); 902 } finally { 903 try { 904 if (executor != null) 905 executor.close(); 906 } catch (Exception e) { 907 log.error(e.getMessage(), e); 908 } 909 } 910 } 911 912 private Map<String, Object> readPart(Map<String, Object> rfRecord) throws IOException { 913 CacheDriverExecutor executor = null; 914 Connection conn = null; 915 try { 916 calculateDataMappingDataSourceReader(this, rfRecord); 917 if(this.dataMappingTable <= -1){ 918 return null; 919 } 920 Clock clock = new Clock(); 921 conn = dataMappingDataSourceReader.getConnection(); 922 conn.setAutoCommit(true); 923 executor = new CacheDriverExecutor(conn); 924 Map<String, Object> dataRecord = new HashMap<String, Object>(); 925 dataRecord.put("remote_file_id", rfRecord.get("id")); 926 return executor.first(formatSqlForFilePartData(SQL_SELECT_BY_RF_ID_FROM_FILE_DATA), dataRecord); 927 } catch (SQLException e) { 928 throw new IOException(e.getMessage(), e); 929 } finally { 930 try { 931 if (executor != null) 932 executor.close(); 933 } catch 
(Exception e) { 934 log.error(e.getMessage(), e); 935 } 936 } 937 } 938 939 public void merge(FilePart filePart) throws IOException { 940 if (isLink()) { 941 throw new IOException(String.format("The file '%s' is a link.", origin.getAbsolutePath())); 942 } 943 if (isDir()) { 944 throw new IOException(String.format("The file '%s' is a folder.", origin.getAbsolutePath())); 945 } 946 CacheDriverExecutor executor = null; 947 Connection conn = null; 948 try { 949 Clock clock = new Clock(); 950 conn = dataSourceReader.getConnection(); 951 conn.setAutoCommit(true); 952 executor = new CacheDriverExecutor(conn); 953 Map dataRecord = getCompletedRecord(); 954 if (dataRecord != null) { 955 CacheArray rows = new CacheArray(); 956 CacheArrayFilter filter = new CacheArrayFilter(CACHE_ARRAY_FILTER_TIMER) { 957 long fileSize = 0L; 958 Map<String, Object> fp = null; 959 Map<String, Object> item = null; 960 961 @Override 962 public void execute(Integer index, Object o) { 963 964 try { 965 item = (Map<String, Object>) o; 966 fp = readPart(item); 967 Object fpData = null; 968 969 if(fp == null){ 970 fpData = new byte[]{}; 971 }else{ 972 fpData = fp.get("file_part_data"); 973 } 974 Thread.currentThread() 975 .setName(String.format("RemoteFile-merge-%s", item.get("file_name"))); 976 byte[] partData = null; 977 if (fpData instanceof byte[]) { 978 partData = (byte[]) fpData; 979 } 980 fileSize += partData.length; 981 filePart.process(index, partData); 982 } catch (Exception e) { 983 log.error(e.getMessage(), e); 984 this.terminated = true; 985 } 986 } 987 988 @Override 989 public void completed(Integer size) { 990 try { 991 if (item != null) { 992 Thread.currentThread() 993 .setName(String.format("RemoteFile-merge-completed-%s", item.get("file_name"))); 994 } 995 filePart.completed(size, new Clock().getSqlTimestamp(), fileSize); 996 } catch (IOException e) { 997 log.error(e.getMessage(), e); 998 } 999 } 1000 1001 @Override 1002 public void terminated() { 1003 if (item != null) { 1004 
Thread.currentThread() 1005 .setName(String.format("RemoteFile-merge-trminated-%s", item.get("file_name"))); 1006 log.error("Merge terminated: {}/{}", item.get("file_parent_path"), item.get("file_name")); 1007 } 1008 } 1009 }; 1010 rows.filter(filter); 1011 executor.find(formatSql(SQL_SELECT_FILE_HAVE_DATA), dataRecord, rows); 1012 } 1013 } catch (SQLException e) { 1014 throw new IOException(e.getMessage(), e); 1015 } finally { 1016 if (executor != null) { 1017 try { 1018 executor.close(); 1019 } catch (Exception e) { 1020 throw new IOException(e.getMessage(), e); 1021 } 1022 } 1023 } 1024 } 1025 1026 protected void copyFrom(boolean copyStructureOnly, DiskFile diskFile) throws IOException { 1027 var the = this; 1028 if (diskFile.isLink()) { 1029 the.setModifyTime(diskFile.getModifyTimeForClock()); 1030 the.createLink(diskFile.readLink()); 1031 } 1032 if (diskFile.isDir()) { 1033 the.setModifyTime(diskFile.getModifyTimeForClock()); 1034 the.mkdirs(); 1035 } 1036 if (diskFile.isFile()) { 1037 the.setModifyTime(diskFile.getModifyTimeForClock()); 1038 if (copyStructureOnly) { 1039 the.createStructureFile(diskFile.size()); 1040 diskFile.removeLogicAccess(); 1041 DiskFile.removeQueuePathMapping(diskFile.getPath()); 1042 } else { 1043 diskFile.split(filePartSize, new FilePart() { 1044 final List<byte[]> partDataList = new ArrayList<byte[]>(); 1045 int currentFilePartStartIndex = 0; 1046 1047 @Override 1048 protected void process(int partIndex, byte[] data) throws IOException { 1049 try{ 1050 if (partIndex == 0) { 1051 if (diskFile.syncRoot != null) { 1052 if (diskFile.isLogicModify()) { 1053 throw new IOException( 1054 String.format("The file '%s' is already logic locked by another thread.", 1055 diskFile.getPath())); 1056 } 1057 } 1058 if (the.exists(true)) { 1059 throw new IOException( 1060 String.format("The remote file '%s' already exist.", the.getPath())); 1061 } 1062 if (isDebug(the)) { 1063 LoggerFactory.getLogger(RemoteFile.class).mark("CreateRemoteFile - {}", 
the.getPath()); 1064 } 1065 } 1066 1067 partDataList.add(data); 1068 if (partDataList.size() >= BATCH_SIZE) { 1069 diskFile.logicAccess(); 1070 if(!the.writeToBatch(currentFilePartStartIndex, partDataList)){ 1071 throw new IOException(String.format("The remote file '%s' write failed.",the.getPath())); 1072 } 1073 partDataList.clear(); 1074 currentFilePartStartIndex += BATCH_SIZE; 1075 } 1076 }catch(Exception e){ 1077 throw new IOException(e.getMessage(),e); 1078 } 1079 } 1080 1081 @Override 1082 protected void completed(int lastPartIndex, Timestamp modifyTime, long fileSize) 1083 throws IOException { 1084 try { 1085 if (diskFile.syncRoot != null) { 1086 if (diskFile.isLogicModify()) { 1087 throw new IOException( 1088 String.format("The file '%s' is already logic locked by another thread.", 1089 diskFile.getPath())); 1090 } 1091 } 1092 /**This is 0 bytes file**/ 1093 if (lastPartIndex == -1) { 1094 1095 if (the.exists(true)) { 1096 the.forceDeleteFile(false); 1097 } 1098 1099 the.setModifyTime(modifyTime); 1100 the.createEmptyFile(); 1101 } else { 1102 the.setModifyTime(modifyTime); 1103 if (partDataList.size() > 0) { 1104 if(!the.writeToBatch(currentFilePartStartIndex, partDataList)){ 1105 throw new IOException(String.format("The remote file '%s' write failed.",the.getPath())); 1106 } 1107 partDataList.clear(); 1108 } 1109 the.complete(lastPartIndex, fileSize); 1110 } 1111 if (isDebug(the)) { 1112 LoggerFactory.getLogger(RemoteFile.class).mark("CompletedRemoteFile - {}", 1113 the.getPath()); 1114 } 1115 DiskFile.removeQueuePathMapping(diskFile.getPath()); 1116 } catch (Exception e) { 1117 throw new IOException(e.getMessage(),e); 1118 } 1119 } 1120 1121 @Override 1122 protected void ended(int lastPartIndex, long fileSize) { 1123 1124 } 1125 1126 }); 1127 } 1128 } 1129 } 1130 1131 private boolean createEmptyFile() throws IOException { 1132 boolean allowed = beforeWrite(); 1133 1134 if(!allowed) return false; 1135 1136 Clock clock = new Clock(); 1137 Timestamp 
currentTime = clock.getSqlTimestamp(); 1138 CacheDriverExecutor executor = null; 1139 Connection conn = null; 1140 try { 1141 calculateDataMappingDataSourceWriter(this); 1142 conn = dataSourceWriter.getConnection(); 1143 conn.setAutoCommit(true); 1144 executor = new CacheDriverExecutor(conn); 1145 Map dataRecord = new HashMap<String, Object>(); 1146 dataRecord.put("id", 1147 generateId(identityCardEnable, identityCardName, identityCardPrefix, identityCardSuffix)); 1148 dataRecord.put("file_name", origin.getName()); 1149 dataRecord.put("file_parent_path", getParent()); 1150 dataRecord.put("file_type", "F"); 1151 dataRecord.put("file_part_index", 0); 1152 dataRecord.put("file_part_size", 0); 1153 dataRecord.put("file_part_data_ds", "NONE"); 1154 dataRecord.put("file_part_data_table", this.dataMappingTable); 1155 dataRecord.put("file_part_last_index", 0); 1156 dataRecord.put("file_size", 0); 1157 dataRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime); 1158 dataRecord.put("file_deleted", 0); 1159 dataRecord.put("created_user_id", getDefaultModifyUserId()); 1160 dataRecord.put("updated_user_id", getDefaultModifyUserId()); 1161 dataRecord.put("created_at", currentTime); 1162 dataRecord.put("updated_at", currentTime); 1163 dataRecord.put("file_id", fileId); 1164 dataRecord.put("source_hostname", CommonTools.getHostname()); 1165 dataRecord.put("synced_on_hostnames", String.format(";%s;", CommonTools.getHostname())); 1166 dataRecord.put("deleted_on_hostnames", ";"); 1167 executor.execute(formatSql(SQL_INSERT_INTO_FILE), dataRecord); 1168 1169 afterWrite(); 1170 1171 return true; 1172 } catch (Exception e) { 1173 throw new IOException(e.getMessage(), e); 1174 } finally { 1175 try { 1176 if (executor != null) 1177 executor.close(); 1178 } catch (Exception e) { 1179 log.error(e.getMessage(), e); 1180 } 1181 } 1182 } 1183 1184 private boolean createStructureFile(long fileSize) throws IOException { 1185 boolean allowed = beforeWrite(); 1186 1187 
if(!allowed) return false; 1188 1189 log.debug("CreateStructureFile: {}", getPath()); 1190 Clock clock = new Clock(); 1191 Timestamp currentTime = clock.getSqlTimestamp(); 1192 CacheDriverExecutor executor = null; 1193 Connection conn = null; 1194 try { 1195 calculateDataMappingDataSourceWriter(this); 1196 conn = dataSourceWriter.getConnection(); 1197 conn.setAutoCommit(true); 1198 executor = new CacheDriverExecutor(conn); 1199 Map dataRecord = new HashMap<String, Object>(); 1200 dataRecord.put("id", 1201 generateId(identityCardEnable, identityCardName, identityCardPrefix, identityCardSuffix)); 1202 dataRecord.put("file_name", origin.getName()); 1203 dataRecord.put("file_parent_path", getParent()); 1204 dataRecord.put("file_type", "F"); 1205 dataRecord.put("file_part_index", 0); 1206 dataRecord.put("file_part_size", fileSize); 1207 dataRecord.put("file_part_data_ds", "NONE"); 1208 dataRecord.put("file_part_data_table", this.dataMappingTable); 1209 dataRecord.put("file_part_last_index", 0); 1210 dataRecord.put("file_size", fileSize); 1211 dataRecord.put("file_modify_time", modifyTime == null ? 
currentTime : modifyTime); 1212 dataRecord.put("file_deleted", 0); 1213 dataRecord.put("created_user_id", getDefaultModifyUserId()); 1214 dataRecord.put("updated_user_id", getDefaultModifyUserId()); 1215 dataRecord.put("created_at", currentTime); 1216 dataRecord.put("updated_at", currentTime); 1217 dataRecord.put("file_id", fileId); 1218 dataRecord.put("source_hostname", CommonTools.getHostname()); 1219 dataRecord.put("synced_on_hostnames", String.format(";%s;", CommonTools.getHostname())); 1220 dataRecord.put("deleted_on_hostnames", ";"); 1221 executor.execute(formatSql(SQL_INSERT_INTO_FILE), dataRecord); 1222 1223 afterWrite(); 1224 1225 return true; 1226 } catch (Exception e) { 1227 throw new IOException(e.getMessage(), e); 1228 } finally { 1229 try { 1230 if (executor != null) 1231 executor.close(); 1232 } catch (Exception e) { 1233 log.error(e.getMessage(), e); 1234 } 1235 } 1236 } 1237 1238 protected boolean syncedToDisk() throws IOException { 1239 Clock clock = new Clock(); 1240 CacheDriverExecutor executor = null; 1241 Connection conn = null; 1242 try { 1243 conn = dataSourceWriter.getConnection(); 1244 conn.setAutoCommit(true); 1245 executor = new CacheDriverExecutor(conn); 1246 Map<String, Object> record = getCompletedRecord(); 1247 if (record == null) { 1248 log.debug(String.format("Remmote file not found '%s'.", origin.getAbsolutePath())); 1249 } 1250 if (record != null) { 1251 String hostname = CommonTools.getHostname(); 1252 String syncedOnHostnames = (String) record.get("synced_on_hostnames"); 1253 syncedOnHostnames = syncedOnHostnames.replaceAll(String.format(";%s;",hostname),";"); 1254 String shn = String.format("%s%s;", syncedOnHostnames, hostname); 1255 Map<String, Object> dataRecord = new HashMap<String, Object>(); 1256 dataRecord.put("file_parent_path",record.get("file_parent_path")); 1257 dataRecord.put("file_name", record.get("file_name")); 1258 dataRecord.put("synced_on_hostnames",shn); 1259 dataRecord.put("updated_at", 
clock.getSqlTimestamp()); 1260 dataRecord.put("updated_user_id", getDefaultModifyUserId()); 1261 executor.execute(formatSql(SQL_UPDATE_FOR_SYNCED_TO_DISK), dataRecord); 1262 return true; 1263 } 1264 return false; 1265 } catch (SQLException e) { 1266 throw new IOException(e.getMessage(), e); 1267 } finally { 1268 try { 1269 if (executor != null) 1270 executor.close(); 1271 } catch (Exception e) { 1272 log.error(e.getMessage(), e); 1273 } 1274 } 1275 } 1276 1277 protected boolean deletedToDiskAndSub(Object id) throws IOException { 1278 Clock clock = new Clock(); 1279 CacheDriverExecutor executor = null; 1280 Connection conn = null; 1281 try { 1282 conn = dataSourceWriter.getConnection(); 1283 conn.setAutoCommit(true); 1284 executor = new CacheDriverExecutor(conn); 1285 Map<String, Object> record = getRecordById(id); 1286 if (record == null) { 1287 log.debug(String.format("Remmote file not found '%s'.", origin.getAbsolutePath())); 1288 } 1289 if (record != null) { 1290 String hostname = CommonTools.getHostname(); 1291 String deletedOnHostnames = (String) record.get("deleted_on_hostnames"); 1292 deletedOnHostnames = deletedOnHostnames.replaceAll(String.format(";%s;",hostname),";"); 1293 String dhn = String.format("%s%s;", deletedOnHostnames, hostname); 1294 Timestamp now = clock.getSqlTimestamp(); 1295 final Map<String, Object> dataRecord = new HashMap<String, Object>(); 1296 dataRecord.put("file_parent_path", record.get("file_parent_path")); 1297 dataRecord.put("file_name", record.get("file_name")); 1298 dataRecord.put("deleted_on_hostnames", dhn); 1299 dataRecord.put("updated_at", now); 1300 dataRecord.put("updated_user_id", getDefaultModifyUserId()); 1301 executor.execute(formatSql(SQL_UPDATE_FOR_DELETED_TO_DISK), dataRecord); 1302 if (record.get("file_type").equals("D")) { 1303 String dirPath = String.format("%s/%s", record.get("file_parent_path"), 1304 record.get("file_name")); 1305 dataRecord.put("file_parent_path", dirPath); 1306 
dataRecord.put("like_file_parent_path", dirPath + "/%"); 1307 executor.execute(formatSql(SQL_UPDATE_FOR_DELETED_TO_DISK_SUB_FILES), dataRecord); 1308 } 1309 return true; 1310 } 1311 return false; 1312 } catch (SQLException e) { 1313 throw new IOException(e.getMessage(), e); 1314 } finally { 1315 try { 1316 if (executor != null) 1317 executor.close(); 1318 } catch (Exception e) { 1319 log.error(e.getMessage(), e); 1320 } 1321 } 1322 } 1323 1324 private boolean complete(int filePartLastIndex, long fileSize) throws IOException { 1325 Clock clock = new Clock(); 1326 CacheDriverExecutor executor = null; 1327 Timestamp currentTime = clock.getSqlTimestamp(); 1328 Connection conn = null; 1329 try { 1330 conn = dataSourceWriter.getConnection(); 1331 conn.setAutoCommit(true); 1332 executor = new CacheDriverExecutor(conn); 1333 Map dataRecord = new HashMap<String, Object>(); 1334 dataRecord.put("file_id", fileId); 1335 dataRecord.put("file_part_last_index", filePartLastIndex); 1336 dataRecord.put("file_size", fileSize); 1337 dataRecord.put("file_modify_time", modifyTime == null ? 
currentTime : modifyTime); 1338 dataRecord.put("updated_at", currentTime); 1339 dataRecord.put("updated_user_id", getDefaultModifyUserId()); 1340 int rows = executor.execute(formatSql(SQL_UPDATE_FOR_COMPLETED), dataRecord); 1341 1342 afterWrite(); 1343 1344 return rows > 0; 1345 } catch (SQLException e) { 1346 throw new IOException(e.getMessage(), e); 1347 } finally { 1348 try { 1349 if (executor != null) 1350 executor.close(); 1351 } catch (Exception e) { 1352 log.error(e.getMessage(), e); 1353 } 1354 } 1355 } 1356 1357 @Override 1358 public boolean complete() throws IOException { 1359 Clock clock = new Clock(); 1360 Timestamp currentTime = clock.getSqlTimestamp(); 1361 CacheDriverExecutor executor = null; 1362 Connection conn = null; 1363 try { 1364 conn = dataSourceWriter.getConnection(); 1365 conn.setAutoCommit(true); 1366 executor = new CacheDriverExecutor(conn); 1367 Map dataRecord = new HashMap<String, Object>(); 1368 dataRecord.put("file_name", origin.getName()); 1369 dataRecord.put("file_parent_path", getParent()); 1370 dataRecord.put("file_id", fileId); 1371 1372 Map<String, Object> sumRecord = executor.first(formatSql(SQL_SELECT_FOR_SUM_FILE_PART_SIZE_INDEX), 1373 dataRecord); 1374 1375 if (sumRecord == null) { 1376 log.debug(String.format("Remmote file not found '%s'.", origin.getAbsolutePath())); 1377 } 1378 1379 if (sumRecord != null) { 1380 long fileSize = Long.parseLong(sumRecord.get("file_part_size") + ""); 1381 long filePartLastIndex = Integer.parseInt(sumRecord.get("file_part_index") + ""); 1382 1383 dataRecord.put("file_part_last_index", filePartLastIndex); 1384 dataRecord.put("file_size", fileSize); 1385 dataRecord.put("file_modify_time", modifyTime == null ? 
currentTime : modifyTime); 1386 dataRecord.put("updated_at", currentTime); 1387 dataRecord.put("updated_user_id", getDefaultModifyUserId()); 1388 int rows = executor.execute(formatSql(SQL_UPDATE_FOR_COMPLETED), dataRecord); 1389 1390 afterWrite(); 1391 1392 return rows > 0; 1393 } 1394 return false; 1395 } catch (SQLException e) { 1396 throw new IOException(e.getMessage(), e); 1397 } finally { 1398 try { 1399 if (executor != null) 1400 executor.close(); 1401 } catch (Exception e) { 1402 log.error(e.getMessage(), e); 1403 } 1404 } 1405 } 1406 1407 public boolean forceDelete() throws IOException { 1408 return forceDeleteAll(true); 1409 } 1410 1411 public boolean forceDeleteDir(boolean realDeleted) throws IOException { 1412 return forceDelete(realDeleted, "D"); 1413 } 1414 1415 public boolean forceDeleteLink(boolean realDeleted) throws IOException { 1416 return forceDelete(realDeleted, "L"); 1417 } 1418 1419 public boolean forceDeleteFile(boolean realDeleted) throws IOException { 1420 return forceDelete(realDeleted, "F"); 1421 } 1422 1423 public boolean forceDelete(boolean realDeleted, String fileType) throws IOException { 1424 if (fileType.equalsIgnoreCase("D")) { 1425 return forceDeleteAll(realDeleted); 1426 } else { 1427 boolean allowed = beforeDelete(realDeleted); 1428 if (allowed) { 1429 CacheDriverExecutor executor = null; 1430 Connection conn = null; 1431 try { 1432 Clock clock = new Clock(); 1433 Timestamp currentTime = clock.getSqlTimestamp(); 1434 conn = dataSourceWriter.getConnection(); 1435 conn.setAutoCommit(true); 1436 executor = new CacheDriverExecutor(conn); 1437 Map dataRecord = new HashMap<String, Object>(); 1438 dataRecord.put("file_deleted", realDeleted ? 1 : 0); 1439 dataRecord.put("deleted", realDeleted ? 
0 : 1); 1440 dataRecord.put("file_name", origin.getName()); 1441 dataRecord.put("file_type", fileType.toUpperCase()); 1442 dataRecord.put("file_parent_path", getParent()); 1443 dataRecord.put("updated_user_id", getDefaultModifyUserId()); 1444 dataRecord.put("deleted_user_id", getDefaultModifyUserId()); 1445 dataRecord.put("updated_at", currentTime); 1446 dataRecord.put("deleted_at", currentTime); 1447 dataRecord.put("root_folder", getPath()); 1448 dataRecord.put("like_subfolder", parsePath(getPath() + "/%")); 1449 int resultRows = executor.execute(formatSql(SQL_UPDATE_FOR_DELETE_FILE_LINK), dataRecord); 1450 log.debug("ForceDelete({}): {} -> {}", realDeleted, getPath(), resultRows); 1451 1452 afterDelete(realDeleted); 1453 1454 return resultRows > 0; 1455 } catch (SQLException e) { 1456 throw new IOException(e.getMessage(), e); 1457 } finally { 1458 try { 1459 if (executor != null) 1460 executor.close(); 1461 } catch (Exception e) { 1462 log.error(e.getMessage(), e); 1463 } 1464 } 1465 } 1466 } 1467 return false; 1468 } 1469 1470 public boolean forceDeleteAll(boolean realDeleted) throws IOException { 1471 boolean allowed = beforeDelete(realDeleted); 1472 if (allowed) { 1473 CacheDriverExecutor executor = null; 1474 Connection conn = null; 1475 try { 1476 Clock clock = new Clock(); 1477 Timestamp currentTime = clock.getSqlTimestamp(); 1478 conn = dataSourceWriter.getConnection(); 1479 conn.setAutoCommit(true); 1480 executor = new CacheDriverExecutor(conn); 1481 Map dataRecord = new HashMap<String, Object>(); 1482 dataRecord.put("file_deleted", realDeleted ? 1 : 0); 1483 dataRecord.put("deleted", realDeleted ? 
0 : 1); 1484 dataRecord.put("file_name", origin.getName()); 1485 dataRecord.put("file_parent_path", getParent()); 1486 dataRecord.put("updated_user_id", getDefaultModifyUserId()); 1487 dataRecord.put("deleted_user_id", getDefaultModifyUserId()); 1488 dataRecord.put("updated_at", currentTime); 1489 dataRecord.put("deleted_at", currentTime); 1490 dataRecord.put("root_folder", getPath()); 1491 dataRecord.put("like_subfolder", parsePath(getPath() + "/%")); 1492 int resultRows = executor.execute(formatSql(SQL_UPDATE_FOR_DELETE_ALL), dataRecord); 1493 log.debug("ForceDelete({}): {} -> {}", realDeleted, getPath(), resultRows); 1494 1495 afterDelete(realDeleted); 1496 1497 return resultRows > 0; 1498 } catch (SQLException e) { 1499 throw new IOException(e.getMessage(), e); 1500 } finally { 1501 try { 1502 if (executor != null) 1503 executor.close(); 1504 } catch (Exception e) { 1505 log.error(e.getMessage(), e); 1506 } 1507 } 1508 } 1509 return false; 1510 } 1511 1512 @Override 1513 public boolean delete() throws IOException { 1514 return forceDeleteAll(true); 1515 } 1516 1517 public boolean delete(boolean realDeleted) throws IOException { 1518 return forceDeleteAll(realDeleted); 1519 } 1520 1521 @Override 1522 public boolean beforeDelete(boolean realDeleted) { 1523 return true; 1524 } 1525 1526 @Override 1527 public void afterDelete(boolean realDeleted) { 1528 1529 } 1530 1531 @Override 1532 public boolean beforeMkdirs() { 1533 return true; 1534 } 1535 1536 @Override 1537 public void afterMkdirs() { 1538 1539 } 1540 1541 @Override 1542 public boolean beforeCreateLink(String target) { 1543 return true; 1544 } 1545 1546 @Override 1547 public void afterCreateLink(String target) { 1548 1549 } 1550 1551 @Override 1552 public boolean beforeWrite(){ 1553 return true; 1554 } 1555 1556 @Override 1557 public void afterWrite(){ 1558 1559 } 1560 1561 @Override 1562 public boolean exists() throws IOException { 1563 return getFirstFilePart() != null; 1564 } 1565 1566 public boolean 
exists(boolean refresh) throws IOException { 1567 if(refresh){ 1568 return getFirstFilePartRefresh() != null; 1569 }else{ 1570 return exists(); 1571 } 1572 } 1573 1574 @Override 1575 public boolean mkdirs() throws IOException { 1576 return mkdirs(false); 1577 } 1578 1579 public boolean mkdirs(boolean checkSourceHostname) throws IOException { 1580 1581 boolean allowed = beforeMkdirs(); 1582 1583 if(!allowed) return false; 1584 1585 Clock clock = new Clock(); 1586 CacheDriverExecutor executor = null; 1587 Connection conn = null; 1588 try { 1589 if (getFirstFilePart() != null) { 1590 return false; 1591 } 1592 if (checkSourceHostname) { 1593 String lastSourceHostname = getLastSourceHostname(); 1594 if (lastSourceHostname != null) { 1595 String clientHostname = CommonTools.getHostname(); 1596 boolean sameHostname = clientHostname.equals(lastSourceHostname); 1597 if (!sameHostname) { 1598 return false; 1599 } 1600 } 1601 } 1602 log.debug("Mkidrs: {}", getPath()); 1603 calculateDataMappingDataSourceWriter(this); 1604 Timestamp currentTime = clock.getSqlTimestamp(); 1605 conn = dataSourceWriter.getConnection(); 1606 conn.setAutoCommit(true); 1607 executor = new CacheDriverExecutor(conn); 1608 Map dataRecord = new HashMap<String, Object>(); 1609 dataRecord.put("id", 1610 generateId(identityCardEnable, identityCardName, identityCardPrefix, identityCardSuffix)); 1611 dataRecord.put("file_name", origin.getName()); 1612 dataRecord.put("file_parent_path", getParent()); 1613 dataRecord.put("file_type", "D"); 1614 dataRecord.put("file_part_index", 0); 1615 dataRecord.put("file_part_size", 0); 1616 dataRecord.put("file_part_data_ds", "NONE"); 1617 dataRecord.put("file_part_data_table", this.dataMappingTable); 1618 dataRecord.put("file_part_last_index", 0); 1619 dataRecord.put("file_size", 0); 1620 dataRecord.put("file_modify_time", modifyTime == null ? 
currentTime : modifyTime); 1621 dataRecord.put("file_deleted", 0); 1622 dataRecord.put("created_user_id", getDefaultModifyUserId()); 1623 dataRecord.put("updated_user_id", getDefaultModifyUserId()); 1624 dataRecord.put("created_at", currentTime); 1625 dataRecord.put("updated_at", currentTime); 1626 dataRecord.put("file_id", fileId); 1627 dataRecord.put("source_hostname", CommonTools.getHostname()); 1628 dataRecord.put("synced_on_hostnames", String.format(";%s;", CommonTools.getHostname())); 1629 dataRecord.put("deleted_on_hostnames", ";"); 1630 int rows = executor.execute(formatSql(SQL_INSERT_INTO_FILE), dataRecord); 1631 1632 afterMkdirs(); 1633 1634 return rows > 0; 1635 } catch (Exception e) { 1636 throw new IOException(e.getMessage(), e); 1637 } finally { 1638 try { 1639 if (executor != null) 1640 executor.close(); 1641 } catch (Exception e) { 1642 log.error(e.getMessage(), e); 1643 } 1644 } 1645 } 1646 1647 @Override 1648 public boolean write(byte[] data, boolean append) throws IOException { 1649 return writeTo(data, append); 1650 } 1651 1652 public synchronized boolean writeBlock(byte[] data, boolean append) throws IOException { 1653 return writeTo(data, append); 1654 } 1655 1656 private boolean writeToBatch(int currentFilePartStartIndex, List<byte[]> partDataList) throws IOException { 1657 1658 boolean allowed = beforeWrite(); 1659 1660 if(!allowed) return false; 1661 1662 CacheDriverExecutor executor = null; 1663 CacheDriverExecutor dmExecutor = null; 1664 List<Map<String, Object>> rfList = new ArrayList<Map<String, Object>>(); 1665 List<Map<String, Object>> dmList = new ArrayList<Map<String, Object>>(); 1666 try { 1667 Clock clock = new Clock(); 1668 log.debug("Write: {}", getPath()); 1669 int size = partDataList.size(); 1670 Timestamp currentTime = clock.getSqlTimestamp(); 1671 String hostname = CommonTools.getHostname(); 1672 calculateDataMappingDataSourceWriter(this); 1673 for (int filePartIndex = 0; filePartIndex < size; filePartIndex++) { 1674 byte[] 
data = partDataList.get(filePartIndex); 1675 String remoteFileId = generateId(identityCardEnable, identityCardName, identityCardPrefix, 1676 identityCardSuffix); 1677 Map rfRecord = new HashMap<String, Object>(); 1678 rfRecord.put("id", remoteFileId); 1679 rfRecord.put("file_name", origin.getName()); 1680 rfRecord.put("file_parent_path", getParent()); 1681 rfRecord.put("file_type", "F"); 1682 rfRecord.put("file_part_index", filePartIndex + currentFilePartStartIndex); 1683 rfRecord.put("file_part_size", data.length); 1684 rfRecord.put("file_part_data_ds", this.dataMappingDs); 1685 rfRecord.put("file_part_data_table", this.dataMappingTable); 1686 rfRecord.put("file_part_last_index", -1); 1687 rfRecord.put("file_size", 0); 1688 rfRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime); 1689 rfRecord.put("file_deleted", 0); 1690 rfRecord.put("created_user_id", getDefaultModifyUserId()); 1691 rfRecord.put("updated_user_id", getDefaultModifyUserId()); 1692 rfRecord.put("created_at", currentTime); 1693 rfRecord.put("updated_at", currentTime); 1694 rfRecord.put("file_id", fileId); 1695 rfRecord.put("source_hostname", hostname); 1696 rfRecord.put("synced_on_hostnames", String.format(";%s;", hostname)); 1697 rfRecord.put("deleted_on_hostnames", ";"); 1698 rfList.add(rfRecord); 1699 1700 Map<String, Object> dataRecord = new HashMap<String, Object>(); 1701 dataRecord.put("id", generateDataId(identityCardEnable, identityCardName, identityCardSuffix, this)); 1702 dataRecord.put("file_part_data", encrypt(data)); 1703 dataRecord.put("remote_file_id", remoteFileId); 1704 dataRecord.put("file_id", fileId); 1705 dataRecord.put("created_at", currentTime); 1706 dataRecord.put("updated_at", currentTime); 1707 dmList.add(dataRecord); 1708 } 1709 Connection conn = dataSourceWriter.getConnection(); 1710 conn.setAutoCommit(true); 1711 executor = new CacheDriverExecutor(conn); 1712 executor.executeBatch(formatSql(SQL_INSERT_INTO_FILE), rfList); 1713 1714 conn = 
dataMappingDataSourceWriter.getConnection(); 1715 conn.setAutoCommit(true); 1716 dmExecutor = new CacheDriverExecutor(conn); 1717 dmExecutor.executeBatch(formatSqlForFilePartData(SQL_INSERT_INTO_FILE_DATA), dmList); 1718 1719 return true; 1720 } catch (Exception e) { 1721 throw new IOException(e.getMessage(), e); 1722 } finally { 1723 try { 1724 if (executor != null) 1725 executor.close(); 1726 } catch (Exception e) { 1727 log.error(e.getMessage(), e); 1728 } 1729 try { 1730 if (dmExecutor != null) 1731 dmExecutor.close(); 1732 } catch (Exception e) { 1733 log.error(e.getMessage(), e); 1734 } 1735 } 1736 } 1737 1738 private boolean writeTo(byte[] data, boolean append) throws IOException { 1739 1740 boolean allowed = beforeWrite(); 1741 1742 if(!allowed) return false; 1743 1744 CacheDriverExecutor executor = null; 1745 CacheDriverExecutor dmExecutor = null; 1746 try { 1747 if (!append) { 1748 if (exists()) 1749 throw new IOException( 1750 String.format("The remote file '%s' already exists.", origin.getAbsolutePath())); 1751 } 1752 Clock clock = new Clock(); 1753 Timestamp currentTime = clock.getSqlTimestamp(); 1754 Map<String, Object> lastRecord = getLastIndexRecord(); 1755 int filePartIndex = 0; 1756 if (lastRecord != null) { 1757 if (lastRecord.get("file_type").equals("D")) { 1758 throw new IOException(String.format("This is a folder '%s'.", origin.getAbsolutePath())); 1759 } 1760 if (lastRecord.get("file_type").equals("L")) { 1761 throw new IOException(String.format("This is a link '%s'.", origin.getAbsolutePath())); 1762 } 1763 if (append) { 1764 filePartIndex = Integer.parseInt(lastRecord.get("file_part_index") + "") + 1; 1765 } 1766 boolean isDeleted = false; 1767 Object deletedValue = lastRecord.get("deleted"); 1768 if (deletedValue instanceof Boolean) { 1769 isDeleted = (Boolean) deletedValue; 1770 } else { 1771 isDeleted = Integer.parseInt(deletedValue + "") == 1; 1772 } 1773 if (isDeleted) { 1774 throw new IOException(String.format("The file '%s' is 
deleted.", origin.getAbsolutePath())); 1775 } 1776 } 1777 calculateDataMappingDataSourceWriter(this); 1778 log.debug("Write: {}", getPath()); 1779 String remoteFileId = generateId(identityCardEnable, identityCardName, identityCardPrefix, 1780 identityCardSuffix); 1781 Map rfRecord = new HashMap<String, Object>(); 1782 rfRecord.put("id", remoteFileId); 1783 rfRecord.put("file_name", origin.getName()); 1784 rfRecord.put("file_parent_path", getParent()); 1785 rfRecord.put("file_type", "F"); 1786 rfRecord.put("file_part_index", filePartIndex); 1787 rfRecord.put("file_part_size", data.length); 1788 rfRecord.put("file_part_data_ds", this.dataMappingDs); 1789 rfRecord.put("file_part_data_table", this.dataMappingTable); 1790 rfRecord.put("file_part_last_index", -1); 1791 rfRecord.put("file_size", 0); 1792 rfRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime); 1793 rfRecord.put("file_deleted", 0); 1794 rfRecord.put("created_user_id", getDefaultModifyUserId()); 1795 rfRecord.put("updated_user_id", getDefaultModifyUserId()); 1796 rfRecord.put("created_at", currentTime); 1797 rfRecord.put("updated_at", currentTime); 1798 rfRecord.put("file_id", fileId); 1799 rfRecord.put("source_hostname", CommonTools.getHostname()); 1800 rfRecord.put("synced_on_hostnames", String.format(";%s;", CommonTools.getHostname())); 1801 rfRecord.put("deleted_on_hostnames", ";"); 1802 1803 Connection conn = dataSourceWriter.getConnection(); 1804 conn.setAutoCommit(true); 1805 executor = new CacheDriverExecutor(conn); 1806 executor.execute(formatSql(SQL_INSERT_INTO_FILE), rfRecord); 1807 1808 Map dataRecord = new HashMap<String, Object>(); 1809 dataRecord.put("id", generateDataId(identityCardEnable, identityCardName, identityCardSuffix, this)); 1810 dataRecord.put("file_part_data", encrypt(data)); 1811 dataRecord.put("remote_file_id", remoteFileId); 1812 dataRecord.put("file_id", fileId); 1813 dataRecord.put("created_at", currentTime); 1814 dataRecord.put("updated_at", 
currentTime); 1815 1816 conn = dataMappingDataSourceWriter.getConnection(); 1817 conn.setAutoCommit(true); 1818 dmExecutor = new CacheDriverExecutor(conn); 1819 dmExecutor.execute(formatSqlForFilePartData(SQL_INSERT_INTO_FILE_DATA), dataRecord); 1820 1821 return true; 1822 } catch (Exception e) { 1823 throw new IOException(e.getMessage(), e); 1824 } finally { 1825 try { 1826 if (executor != null) 1827 executor.close(); 1828 } catch (Exception e) { 1829 log.error(e.getMessage(), e); 1830 } 1831 try { 1832 if (dmExecutor != null) 1833 dmExecutor.close(); 1834 } catch (Exception e) { 1835 log.error(e.getMessage(), e); 1836 } 1837 } 1838 } 1839 1840 @Override 1841 public boolean write(byte[] data) throws IOException { 1842 return write(data, false); 1843 } 1844 1845 @Override 1846 public boolean write(String data, boolean append) throws IOException { 1847 return write(data.getBytes(CHARSET), append); 1848 } 1849 1850 @Override 1851 public boolean write(String data) throws IOException { 1852 return write(data.getBytes(CHARSET)); 1853 } 1854 1855 @Override 1856 public boolean createLink(String target) throws IOException { 1857 boolean allowed = beforeCreateLink(target); 1858 1859 if(!allowed) return false; 1860 1861 Clock clock = new Clock(); 1862 CacheDriverExecutor executor = null; 1863 CacheDriverExecutor dmExecutor = null; 1864 try { 1865 if (getFirstFilePart() != null) { 1866 return false; 1867 } 1868 log.debug("CreateLink: {}", getPath()); 1869 Timestamp currentTime = clock.getSqlTimestamp(); 1870 calculateDataMappingDataSourceWriter(this); 1871 String remoteFileId = generateId(identityCardEnable, identityCardName, identityCardPrefix, 1872 identityCardSuffix); 1873 byte[] data = target.getBytes(CHARSET); 1874 Map rfRecord = new HashMap<String, Object>(); 1875 rfRecord.put("id", remoteFileId); 1876 rfRecord.put("file_name", origin.getName()); 1877 rfRecord.put("file_parent_path", getParent()); 1878 rfRecord.put("file_type", "L"); 1879 rfRecord.put("file_part_index", 
0); 1880 rfRecord.put("file_part_size", data.length); 1881 rfRecord.put("file_part_data_ds", this.dataMappingDs); 1882 rfRecord.put("file_part_data_table", this.dataMappingTable); 1883 rfRecord.put("file_part_last_index", -1); 1884 rfRecord.put("file_size", 0); 1885 rfRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime); 1886 rfRecord.put("file_deleted", 0); 1887 rfRecord.put("created_user_id", getDefaultModifyUserId()); 1888 rfRecord.put("updated_user_id", getDefaultModifyUserId()); 1889 rfRecord.put("created_at", currentTime); 1890 rfRecord.put("updated_at", currentTime); 1891 rfRecord.put("file_id", fileId); 1892 rfRecord.put("source_hostname", CommonTools.getHostname()); 1893 rfRecord.put("synced_on_hostnames", String.format(";%s;", CommonTools.getHostname())); 1894 rfRecord.put("deleted_on_hostnames", ";"); 1895 1896 Connection conn = dataSourceWriter.getConnection(); 1897 conn.setAutoCommit(true); 1898 executor = new CacheDriverExecutor(conn); 1899 int rows = executor.execute(formatSql(SQL_INSERT_INTO_FILE), rfRecord); 1900 1901 Map dataRecord = new HashMap<String, Object>(); 1902 dataRecord.put("id", generateDataId(identityCardEnable, identityCardName, identityCardSuffix, this)); 1903 dataRecord.put("file_part_data", encrypt(data)); 1904 dataRecord.put("remote_file_id", remoteFileId); 1905 dataRecord.put("file_id", fileId); 1906 dataRecord.put("created_at", currentTime); 1907 dataRecord.put("updated_at", currentTime); 1908 1909 conn = dataMappingDataSourceWriter.getConnection(); 1910 conn.setAutoCommit(true); 1911 dmExecutor = new CacheDriverExecutor(conn); 1912 dmExecutor.execute(formatSqlForFilePartData(SQL_INSERT_INTO_FILE_DATA), dataRecord); 1913 1914 complete(); 1915 1916 afterCreateLink(target); 1917 1918 return rows > 0; 1919 } catch (Exception e) { 1920 throw new IOException(e.getMessage(), e); 1921 } finally { 1922 try { 1923 if (executor != null) 1924 executor.close(); 1925 } catch (Exception e) { 1926 
log.error(e.getMessage(), e); 1927 } 1928 try { 1929 if (dmExecutor != null) 1930 dmExecutor.close(); 1931 } catch (Exception e) { 1932 log.error(e.getMessage(), e); 1933 } 1934 } 1935 } 1936 1937 @Override 1938 public String readLink() throws IOException { 1939 return new String(readAllBytes(),BaseFile.CHARSET); 1940 } 1941 1942 @Override 1943 public byte[] readAllBytes() throws IOException { 1944 return readAllBytesFrom(); 1945 } 1946 1947 private byte[] readAllBytesFrom() throws IOException { 1948 if (isDir()) { 1949 throw new IOException(String.format("The file '%s' is a folder.", origin.getAbsolutePath())); 1950 } 1951 if (!exists()) { 1952 throw new IOException(String.format("The file '%s' does not exist.", origin.getAbsolutePath())); 1953 } 1954 CacheDriverExecutor executor = null; 1955 Connection conn = null; 1956 ByteArrayOutputStream baos = null; 1957 try { 1958 conn = dataSourceReader.getConnection(); 1959 conn.setAutoCommit(true); 1960 executor = new CacheDriverExecutor(conn); 1961 Map dataRecord = getCompletedRecord(); 1962 List<Map<String, Object>> filePartList = executor.find(formatSql(SQL_SELECT_FILE), dataRecord); 1963 baos = new ByteArrayOutputStream(); 1964 for (Map<String, Object> fpItem : filePartList) { 1965 Map<String, Object> fp = readPart(fpItem); 1966 Object fpData = null; 1967 1968 if(fp == null){ 1969 fpData = new byte[]{}; 1970 }else{ 1971 fpData = fp.get("file_part_data"); 1972 } 1973 1974 if (fpData instanceof byte[]) { 1975 byte[] partData = (byte[]) fpData; 1976 baos.write(decrypt(partData)); 1977 baos.flush(); 1978 } 1979 } 1980 return baos == null ? 
null : baos.toByteArray(); 1981 } catch (Exception e) { 1982 throw new IOException(e.getMessage(), e); 1983 } finally { 1984 try { 1985 if (baos != null) 1986 baos.close(); 1987 } catch (Exception e) { 1988 log.error(e.getMessage(), e); 1989 } 1990 try { 1991 if (executor != null) 1992 executor.close(); 1993 } catch (Exception e) { 1994 log.error(e.getMessage(), e); 1995 } 1996 } 1997 } 1998 1999 @Override 2000 public boolean isFile() throws IOException { 2001 Map<String, Object> lastRecord = getFirstFilePart(); 2002 2003 if (lastRecord == null) { 2004 return false; 2005 } 2006 return lastRecord.get("file_type").equals("F"); 2007 } 2008 2009 @Override 2010 public boolean isDirectory() throws IOException { 2011 Map<String, Object> lastRecord = getFirstFilePart(); 2012 2013 if (lastRecord == null) { 2014 return false; 2015 } 2016 return lastRecord.get("file_type").equals("D"); 2017 } 2018 2019 @Override 2020 public boolean isLink() throws IOException { 2021 Map<String, Object> lastRecord = getFirstFilePart(); 2022 2023 if (lastRecord == null) { 2024 return false; 2025 } 2026 return lastRecord.get("file_type").equals("L"); 2027 } 2028 2029 @Override 2030 public String readAllString() throws IOException { 2031 ByteArrayOutputStream baos = null; 2032 try { 2033 byte[] bytes = readAllBytes(); 2034 2035 if(bytes == null) return null; 2036 2037 baos = new ByteArrayOutputStream(); 2038 baos.write(bytes); 2039 baos.flush(); 2040 return baos.toString(); 2041 } finally { 2042 if (baos != null) { 2043 try { 2044 baos.close(); 2045 } catch (IOException e) { 2046 log.error(e.getMessage(), e); 2047 } 2048 } 2049 } 2050 } 2051 2052 @Override 2053 public long size() throws IOException { 2054 Map<String, Object> record = getCompletedRecord(); 2055 if (record != null) { 2056 return Long.parseLong(record.get("file_size") + ""); 2057 } 2058 return 0L; 2059 } 2060 2061 public List<Map<String, Object>> list() throws IOException { 2062 return list(getPath(), "%"); 2063 } 2064 2065 public 
List<Map<String, Object>> list(String likeFileName) throws IOException { 2066 return list(getPath(), likeFileName); 2067 } 2068 2069 public List<Map<String, Object>> list(String likeFolderName, String likeFileName) throws IOException { 2070 CacheDriverExecutor executor = null; 2071 Connection conn = null; 2072 try { 2073 conn = dataSourceReader.getConnection(); 2074 conn.setAutoCommit(true); 2075 executor = new CacheDriverExecutor(conn); 2076 Map<String, Object> dataRecord = new HashMap<String, Object>(); 2077 if (CommonTools.isBlank(likeFileName)) { 2078 dataRecord.put("file_name", "%"); 2079 } else { 2080 dataRecord.put("file_name", likeFileName); 2081 } 2082 if (CommonTools.isBlank(likeFolderName)) { 2083 dataRecord.put("file_parent_path", "%"); 2084 } else { 2085 dataRecord.put("file_parent_path", likeFolderName); 2086 } 2087 return executor.find(formatSql(SQL_SELECT_FOR_LIST_ALL), dataRecord); 2088 } catch (SQLException e) { 2089 throw new IOException(e.getMessage(), e); 2090 } finally { 2091 if (executor != null) { 2092 try { 2093 executor.close(); 2094 } catch (Exception e) { 2095 throw new IOException(e.getMessage(), e); 2096 } 2097 } 2098 } 2099 } 2100 2101 public void listAll(String likeFileName, CacheArray rows) throws IOException { 2102 listAll(getPath(), likeFileName, rows); 2103 } 2104 2105 public void listAll(CacheArray rows) throws IOException { 2106 listAll(getPath(), "%", rows); 2107 } 2108 2109 public void listAll(String likeFolderName, String likeFileName, CacheArray rows) throws IOException { 2110 CacheDriverExecutor executor = null; 2111 Connection conn = null; 2112 try { 2113 conn = dataSourceReader.getConnection(); 2114 conn.setAutoCommit(true); 2115 executor = new CacheDriverExecutor(conn); 2116 Map<String, Object> dataRecord = new HashMap<String, Object>(); 2117 if (CommonTools.isBlank(likeFileName)) { 2118 dataRecord.put("file_name", "%"); 2119 } else { 2120 dataRecord.put("file_name", likeFileName); 2121 } 2122 if 
(CommonTools.isBlank(likeFolderName)) { 2123 dataRecord.put("file_parent_path", "%"); 2124 } else { 2125 dataRecord.put("file_parent_path", likeFolderName); 2126 } 2127 executor.find(formatSql(SQL_SELECT_FOR_LIST_ALL), dataRecord, rows); 2128 } catch (SQLException e) { 2129 throw new IOException(e.getMessage(), e); 2130 } finally { 2131 if (executor != null) { 2132 try { 2133 executor.close(); 2134 } catch (Exception e) { 2135 throw new IOException(e.getMessage(), e); 2136 } 2137 } 2138 } 2139 } 2140 2141 protected void listAllForDelete(String likeFolderName, String likeFileName, CacheArray rows) throws IOException { 2142 CacheDriverExecutor executor = null; 2143 Connection conn = null; 2144 try { 2145 long deleteCutoffTimeMs = 0L; 2146 2147 if(DELETE_CUTOFF_TIME != null) deleteCutoffTimeMs = DELETE_CUTOFF_TIME.getTime(); 2148 2149 Timestamp currentTime = new Clock().getSqlTimestamp(); 2150 2151 conn = dataSourceReader.getConnection(); 2152 conn.setAutoCommit(true); 2153 executor = new CacheDriverExecutor(conn); 2154 Map<String, Object> dataRecord = new HashMap<String, Object>(); 2155 if (CommonTools.isBlank(likeFileName)) { 2156 dataRecord.put("file_name", "%"); 2157 } else { 2158 dataRecord.put("file_name", likeFileName); 2159 } 2160 if (CommonTools.isBlank(likeFolderName)) { 2161 dataRecord.put("file_parent_path", "%"); 2162 } else { 2163 dataRecord.put("file_parent_path", likeFolderName); 2164 } 2165 dataRecord.put("deleted_on_hostnames", "%;" + CommonTools.getHostname() + ";%"); 2166 dataRecord.put("begin_time", new Timestamp(deleteCutoffTimeMs)); 2167 dataRecord.put("end_time", currentTime); 2168 dataRecord.put("created_at", new Timestamp(currentTime.getTime() - BEFORE_THE_QUEUE_TIME)); 2169 executor.find(0, MAX_QUEUE_SIZE,formatSql(SQL_SELECT_FOR_LIST_ALL_DELETE), dataRecord, rows); 2170 } catch (SQLException e) { 2171 throw new IOException(e.getMessage(), e); 2172 } finally { 2173 if (executor != null) { 2174 try { 2175 executor.close(); 2176 } catch 
(Exception e) { 2177 throw new IOException(e.getMessage(), e); 2178 } 2179 } 2180 } 2181 } 2182 2183 protected void listAllForSync(String likeFolderName, String likeFileName, CacheArray rows) throws IOException { 2184 CacheDriverExecutor executor = null; 2185 Connection conn = null; 2186 try { 2187 long deleteCutoffTimeMs = 0L; 2188 2189 if(DELETE_CUTOFF_TIME != null) deleteCutoffTimeMs = SYNC_CUTOFF_TIME.getTime(); 2190 2191 Timestamp currentTime = new Clock().getSqlTimestamp(); 2192 2193 conn = dataSourceReader.getConnection(); 2194 conn.setAutoCommit(true); 2195 executor = new CacheDriverExecutor(conn); 2196 Map<String, Object> dataRecord = new HashMap<String, Object>(); 2197 if (CommonTools.isBlank(likeFileName)) { 2198 dataRecord.put("file_name", "%"); 2199 } else { 2200 dataRecord.put("file_name", likeFileName); 2201 } 2202 if (CommonTools.isBlank(likeFolderName)) { 2203 dataRecord.put("file_parent_path", "%"); 2204 } else { 2205 dataRecord.put("file_parent_path", likeFolderName); 2206 } 2207 dataRecord.put("synced_on_hostnames", "%;" + CommonTools.getHostname() + ";%"); 2208 dataRecord.put("begin_time", new Timestamp(deleteCutoffTimeMs)); 2209 dataRecord.put("end_time", currentTime); 2210 dataRecord.put("created_at", new Timestamp(currentTime.getTime() - BEFORE_THE_QUEUE_TIME)); 2211 executor.find(0, MAX_QUEUE_SIZE, formatSql(SQL_SELECT_FOR_LIST_ALL_SYNC), dataRecord, rows); 2212 } catch (SQLException e) { 2213 throw new IOException(e.getMessage(), e); 2214 } finally { 2215 if (executor != null) { 2216 try { 2217 executor.close(); 2218 } catch (Exception e) { 2219 throw new IOException(e.getMessage(), e); 2220 } 2221 } 2222 } 2223 } 2224 2225 public void setModifyUserId(String modifyUserId) { 2226 this.modifyUserId = modifyUserId; 2227 } 2228 2229 private String getDefaultModifyUserId() { 2230 if (CommonTools.isBlank(modifyUserId)) { 2231 return DEFAULE_MODIFY_USER_ID; 2232 } else { 2233 return modifyUserId; 2234 } 2235 } 2236 2237 @Override 2238 public void 
setModifyTime(Timestamp modifyTime) throws IOException { 2239 this.modifyTime = modifyTime; 2240 } 2241 2242 @Override 2243 public Timestamp getModifyTime() throws IOException { 2244 Map<String, Object> record = getFirstFilePart(); 2245 if (record != null) { 2246 return (Timestamp) record.get("file_modify_time"); 2247 } 2248 return null; 2249 } 2250 2251 public boolean isTimeout() throws IOException { 2252 return isTimeout(LOGIC_TIMEOUT_MS); 2253 } 2254 2255 @Override 2256 public void copyTo(String toPath) throws IOException { 2257 log.debug("CopyTo: {} -> {}", getPath(), toPath); 2258 if (!exists()) { 2259 log.mark(String.format("The remote file '%s' does not exist.", this.getPath())); 2260 } 2261 RemoteFile destRf = new RemoteFile(toPath); 2262 final Timestamp originModifyTime = this.getModifyTime(); 2263 destRf.setModifyUserId(modifyUserId); 2264 destRf.setModifyTime(originModifyTime); 2265 if (isLink()) { 2266 if (destRf.exists(true)) { 2267 destRf.forceDeleteLink(false); 2268 } 2269 destRf.createLink(readLink()); 2270 } 2271 if (isFile()) { 2272 final RemoteFile _destRf = destRf; 2273 if (_destRf.exists(true)) { 2274 _destRf.forceDeleteFile(false); 2275 } 2276 merge(new FilePart() { 2277 @Override 2278 protected void process(int partIndex, byte[] data) { 2279 try { 2280 _destRf.write(decrypt(data), partIndex > 0); 2281 } catch (Exception e) { 2282 log.error(e.getMessage(), e); 2283 } 2284 } 2285 2286 @Override 2287 protected void completed(int lastPartIndex, Timestamp modifyTime, long fileSize) { 2288 try { 2289 /**This is 0 bytes file**/ 2290 if (lastPartIndex == -1) { 2291 2292 if (_destRf.exists(true)) 2293 _destRf.forceDeleteFile(false); 2294 2295 _destRf.setModifyTime(originModifyTime); 2296 _destRf.createEmptyFile(); 2297 } else { 2298 _destRf.setModifyTime(originModifyTime); 2299 _destRf.complete(); 2300 } 2301 } catch (IOException e) { 2302 log.error(e.getMessage(), e); 2303 } 2304 } 2305 2306 @Override 2307 protected void ended(int lastPartIndex, long 
fileSize) { 2308 2309 } 2310 2311 }); 2312 } 2313 if (isDir()) { 2314 List<Map<String, Object>> copyList = list(getPath(), "%"); 2315 int size = copyList.size(); 2316 if (size > 0) { 2317 if (!destRf.exists(true)) { 2318 destRf.mkdirs(); 2319 } 2320 } 2321 String copyRootPath = getPath(); 2322 copyList = list(parsePath(getPath() + "/%"), "%"); 2323 if (copyList.size() > 0) { 2324 if (!destRf.exists(true)) { 2325 destRf.mkdirs(); 2326 } 2327 } 2328 for (Map<String, Object> item : copyList) { 2329 String itemParentPath = (String) item.get("file_parent_path"); 2330 String destParentPath = itemParentPath.replaceFirst(copyRootPath + "/", toPath); 2331 RemoteFile sourceRf = new RemoteFile( 2332 String.format("%s/%s", item.get("file_parent_path"), item.get("file_name"))); 2333 String destRfPath = new File(String.format("%s/%s", destParentPath, item.get("file_name"))) 2334 .getAbsolutePath(); 2335 if (sourceRf.isDir()) { 2336 RemoteFile tmpRf = new RemoteFile(destRfPath + "/"); 2337 tmpRf.setModifyTime(sourceRf.getModifyTime()); 2338 if (!tmpRf.exists(true)) { 2339 tmpRf.mkdirs(); 2340 } 2341 } else { 2342 sourceRf.copyTo(destRfPath); 2343 } 2344 } 2345 } 2346 } 2347 2348 @Override 2349 public void moveTo(String toPath) throws IOException { 2350 log.debug("MoveTo: {} -> {}", getPath(), toPath); 2351 RemoteFile destRf = new RemoteFile(toPath); 2352 copyTo(toPath); 2353 boolean exists = this.exists(true); 2354 if (exists && destRf.exists(true) && destRf.isCompleted()) { 2355 forceDeleteAll(true); 2356 } 2357 } 2358 2359 private void startFullAsyncThreadForDelete(boolean subFolders, long timerMs, String replaceTo, 2360 Runnable completedCallback) { 2361 RemoteFile the = this; 2362 try { 2363 if (syncRoot == null) { 2364 File replaceToFile = new File(replaceTo); 2365 if (replaceToFile.getParent() == null) { 2366 throw new IOException("Cannot execute delete task."); 2367 } 2368 String replaceToParent = replacePath(replaceToFile.getParent()); 2369 syncRoot = new 
File(String.format("%s/%s", replaceToParent, replaceToFile.getName())); 2370 } 2371 2372 CacheArray rows = new CacheArray(); 2373 CacheArrayFilter filter = new CacheArrayFilter(CACHE_ARRAY_FILTER_TIMER) { 2374 @Override 2375 public void execute(Integer index, Object o) { 2376 Map<String, Object> item = (Map<String, Object>) o; 2377 String fileName = (String) item.get("file_name"); 2378 String fullPath = String.format("%s/%s", item.get("file_parent_path"), fileName); 2379 Thread.currentThread() 2380 .setName(String.format("RemoteFile-startFullAsyncThreadForDelete-%s", fileName)); 2381 RemoteFile rf = new RemoteFile(fullPath); 2382 if(isDebug(rf)){ 2383 debugLog("CheckUsage",rf,this); 2384 } 2385 if (memoryUsage() < RemoteFile.MAX_MEMORY_USAGE) { 2386 rf.fileId = (String) item.get("file_id"); 2387 String toDf = rf.getPath(); 2388 if (!CommonTools.isBlank(replaceTo)) { 2389 if (subFolders) { 2390 toDf = fullPath.replaceFirst(getPath() + "/", replaceTo + "/"); 2391 } else { 2392 toDf = fullPath.replaceFirst(getPath(), replaceTo + "/"); 2393 } 2394 } 2395 DiskFile df = new DiskFile(toDf); 2396 DiskFile.copyAttrs(df, false, the.syncRoot); 2397 executeSyncRunnable(this, item, df, rf); 2398 } else{ 2399 log.warn("Over max memory usage 'MAX_MEMORY_USAGE={}'.",MAX_MEMORY_USAGE); 2400 } 2401 } 2402 2403 @Override 2404 public void terminated() { 2405 log.warn("Terminated: {}-{}", Thread.currentThread().getName(), Thread.currentThread().getId()); 2406 } 2407 2408 @Override 2409 public void completed(Integer size) { 2410 Thread.currentThread().setName(String 2411 .format("RemoteFile-startFullAsyncThreadForDelete-completed-%s", the.origin.getName())); 2412 if (isStopSync()) { 2413 LoggerFactory.getLogger(RemoteFile.class).mark("Stoped start delete task thread."); 2414 } 2415 if (timerMs <= 0) { 2416 LoggerFactory.getLogger(RemoteFile.class).mark("Completed start delete task thread."); 2417 } 2418 if (!isStopSync() && timerMs > 0) { 2419 try { 2420 Thread.sleep(timerMs); 2421 if 
(!isStopSync()) { 2422 startFullAsyncThreadForDelete(!subFolders, timerMs, replaceTo, completedCallback); 2423 } 2424 } catch (Exception e) { 2425 log.error(e.getMessage(), e); 2426 } 2427 } 2428 if (completedCallback != null) { 2429 completedCallback.run(); 2430 } 2431 } 2432 2433 }; 2434 rows.filter(filter); 2435 listAllForDelete(getPath() + (subFolders ? "/%" : ""), "%", rows); 2436 } catch (Exception e) { 2437 log.error(e.getMessage(), e); 2438 if (timerMs > 0) { 2439 try { 2440 Thread.sleep(timerMs); 2441 startFullAsyncThreadForDelete(subFolders, timerMs, replaceTo, completedCallback); 2442 } catch (InterruptedException ee) { 2443 log.error(ee.getMessage(), ee); 2444 } 2445 } 2446 } 2447 } 2448 2449 private void startFullAsyncThread(boolean subFolders, long timerMs, String replaceTo, Runnable completedCallback) { 2450 RemoteFile the = this; 2451 try { 2452 if (syncRoot == null) { 2453 File replaceToFile = new File(replaceTo); 2454 if (replaceToFile.getParent() == null) { 2455 throw new IOException("Cannot sync the root path."); 2456 } 2457 String replaceToParent = replacePath(replaceToFile.getParent()); 2458 syncRoot = new File(String.format("%s/%s", replaceToParent, replaceToFile.getName())); 2459 } 2460 2461 CacheArray rows = new CacheArray(); 2462 CacheArrayFilter filter = new CacheArrayFilter(CACHE_ARRAY_FILTER_TIMER) { 2463 @Override 2464 public void execute(Integer index, Object o) { 2465 Map<String, Object> item = (Map<String, Object>) o; 2466 String fileName = (String) item.get("file_name"); 2467 Thread.currentThread().setName(String.format("RemoteFile-startFullAsyncThread-%s", fileName)); 2468 String fullPath = String.format("%s/%s", item.get("file_parent_path"), fileName); 2469 RemoteFile rf = new RemoteFile(fullPath); 2470 if(isDebug(the)){ 2471 debugLog("CheckUsage",the,this); 2472 } 2473 if (memoryUsage() < RemoteFile.MAX_MEMORY_USAGE) { 2474 String toDf = rf.getPath(); 2475 if (!CommonTools.isBlank(replaceTo)) { 2476 if (subFolders) { 2477 toDf = 
fullPath.replaceFirst(getPath() + "/", replaceTo + "/"); 2478 } else { 2479 toDf = fullPath.replaceFirst(getPath(), replaceTo + "/"); 2480 } 2481 } 2482 DiskFile df = new DiskFile(toDf); 2483 DiskFile.copyAttrs(df, false, the.syncRoot); 2484 executeSyncRunnable(this, item, df, rf); 2485 } else{ 2486 log.warn("Over max memory usage 'MAX_MEMORY_USAGE={}'.",MAX_MEMORY_USAGE); 2487 } 2488 } 2489 2490 @Override 2491 public void terminated() { 2492 log.warn("Terminated: {}-{}", Thread.currentThread().getName(), Thread.currentThread().getId()); 2493 } 2494 2495 @Override 2496 public void completed(Integer size) { 2497 Thread.currentThread().setName( 2498 String.format("RemoteFile-startFullAsyncThread-completed-%s", the.origin.getName())); 2499 if (isStopSync()) { 2500 LoggerFactory.getLogger(RemoteFile.class).mark("Stoped start full async thread."); 2501 } 2502 if (timerMs <= 0) { 2503 LoggerFactory.getLogger(RemoteFile.class).mark("Completed start full async thread."); 2504 } 2505 if (!isStopSync() && timerMs > 0) { 2506 try { 2507 Thread.sleep(timerMs); 2508 if (!isStopSync()) { 2509 startFullAsyncThread(!subFolders, timerMs, replaceTo, completedCallback); 2510 } 2511 } catch (Exception e) { 2512 log.error(e.getMessage(), e); 2513 } 2514 } 2515 if (completedCallback != null) { 2516 completedCallback.run(); 2517 } 2518 } 2519 2520 }; 2521 rows.filter(filter); 2522 listAllForSync(getPath() + (subFolders ? 
"/%" : ""), "%", rows); 2523 } catch (Exception e) { 2524 log.error(e.getMessage(), e); 2525 if (timerMs > 0) { 2526 try { 2527 Thread.sleep(timerMs); 2528 startFullAsyncThread(subFolders, timerMs, replaceTo, completedCallback); 2529 } catch (InterruptedException ee) { 2530 log.error(ee.getMessage(), ee); 2531 } 2532 } 2533 } 2534 } 2535 2536 public void startFullAsync(long timerMs) throws IOException { 2537 startFullAsync(timerMs, getPath() + "/", null); 2538 } 2539 2540 public void startFullAsync(long timerMs, Runnable completedCallback) throws IOException { 2541 startFullAsync(timerMs, getPath() + "/", completedCallback); 2542 } 2543 2544 public void startFullAsync(long timerMs, String replaceTo) throws IOException { 2545 startFullAsync(timerMs, replaceTo, null); 2546 } 2547 2548 public void startFullAsync(long timerMs, String replaceTo, Runnable completedCallback) throws IOException { 2549 2550 stopSync = false; 2551 2552 RemoteFile the = this; 2553 Clock clock = new Clock(); 2554 Timestamp toTime = new Timestamp(clock.getTime() - timerMs); 2555 Executors.newFixedThreadPool(1).execute(new Runnable() { 2556 2557 @Override 2558 public void run() { 2559 startFullAsyncThread(false, timerMs, replaceTo, completedCallback); 2560 } 2561 2562 }); 2563 2564 Executors.newFixedThreadPool(1).execute(new Runnable() { 2565 2566 @Override 2567 public void run() { 2568 startFullAsyncThreadForDelete(false, timerMs, replaceTo, completedCallback); 2569 } 2570 2571 }); 2572 } 2573 2574 public DriverDataSource getDataSourceReader() { 2575 return dataSourceReader; 2576 } 2577 2578 public DriverDataSource getDataSourceWriter() { 2579 return dataSourceWriter; 2580 } 2581 2582 public ConfigProperties getConfigProperties() { 2583 return configProperties; 2584 } 2585 2586 private byte[] encrypt(byte[] data) throws Exception { 2587 boolean useEncrypt = configProperties.getBoolean("EncryptEnable", false); 2588 if (useEncrypt) { 2589 String aesKey = 
configProperties.getString("EncryptAesKey"); 2590 return CipherTools.AESEncrypt(aesKey,data); 2591 } else { 2592 return data; 2593 } 2594 } 2595 2596 private byte[] decrypt(byte[] encData) throws Exception { 2597 boolean useEncrypt = configProperties.getBoolean("EncryptEnable", false); 2598 if (useEncrypt) { 2599 String aesKey = configProperties.getString("EncryptAesKey"); 2600 return CipherTools.AESDecrypt(aesKey,encData); 2601 } else { 2602 return encData; 2603 } 2604 } 2605 2606 public String getSyncedOnHostnames() throws IOException { 2607 Map<String, Object> record = getCompletedRecord(); 2608 if (record != null) { 2609 return (String) record.get("synced_on_hostnames"); 2610 } 2611 return null; 2612 } 2613 2614 public String getDeletedOnHostnames() throws IOException { 2615 Map<String, Object> record = getCompletedRecord(); 2616 if (record != null) { 2617 return (String) record.get("deleted_on_hostnames"); 2618 } 2619 return null; 2620 } 2621 2622 public String getSourceHostname() throws IOException { 2623 Map<String, Object> record = getCompletedRecord(); 2624 if (record != null) { 2625 return (String) record.get("source_hostname"); 2626 } 2627 return null; 2628 } 2629 2630 protected String getLastSourceHostname() throws IOException { 2631 Map<String, Object> record = getLastUpdateRecord(); 2632 if (record != null) { 2633 return (String) record.get("source_hostname"); 2634 } 2635 return null; 2636 } 2637 2638 protected Timestamp getLastOperateTime() throws IOException { 2639 Map<String, Object> record = getLastUpdateRecord(); 2640 if (record != null) { 2641 Timestamp updatedAt = (Timestamp) record.get("updated_at"); 2642 Timestamp deletedAt = (Timestamp) record.get("deleted_at"); 2643 2644 return deletedAt == null ? 
updatedAt : deletedAt; 2645 } 2646 return null; 2647 } 2648 2649 protected Timestamp getLastModifyTime() throws IOException { 2650 Map<String, Object> record = getLastUpdateRecord(); 2651 if (record != null) { 2652 return (Timestamp) record.get("file_modify_time"); 2653 } 2654 return null; 2655 } 2656 2657 protected boolean isLastOperateDelete() throws IOException { 2658 Map<String, Object> record = getLastUpdateRecord(); 2659 if (record != null) { 2660 Timestamp deletedAt = (Timestamp) record.get("deleted_at"); 2661 2662 return deletedAt == null ? false : true; 2663 } 2664 return false; 2665 } 2666 2667 protected boolean isLastOperateRealDelete() throws IOException { 2668 Map<String, Object> record = getLastUpdateRecord(); 2669 if (record != null) { 2670 Object deletedValue = record.get("file_deleted"); 2671 if (deletedValue != null) { 2672 if (deletedValue instanceof Boolean) { 2673 return (Boolean) deletedValue; 2674 } else { 2675 return Integer.parseInt(deletedValue + "") == 1; 2676 } 2677 } 2678 } 2679 return false; 2680 } 2681 2682 protected void listAllForScanDelete(Integer filePartDataTable, CacheArray rows) throws IOException { 2683 CacheDriverExecutor executor = null; 2684 Connection conn = null; 2685 try { 2686 long deleteCutoffTimeMs = 0L; 2687 2688 if(DELETE_CUTOFF_TIME != null) deleteCutoffTimeMs = DELETE_CUTOFF_TIME.getTime(); 2689 2690 Timestamp currentTime = new Clock().getSqlTimestamp(); 2691 conn = dataSourceReader.getConnection(); 2692 conn.setAutoCommit(true); 2693 executor = new CacheDriverExecutor(conn); 2694 Map<String, Object> dataRecord = new HashMap<String, Object>(); 2695 dataRecord.put("file_parent_path", getPath()); 2696 dataRecord.put("like_file_parent_path", parsePath(getPath() + "/%")); 2697 dataRecord.put("deleted_on_hostnames", "%;" + CommonTools.getHostname() + ";%"); 2698 dataRecord.put("begin_time", new Timestamp(deleteCutoffTimeMs)); 2699 dataRecord.put("end_time", currentTime); 2700 dataRecord.put("created_at", new 
Timestamp(currentTime.getTime() - BEFORE_THE_QUEUE_TIME)); 2701 dataRecord.put("file_part_data_table", filePartDataTable); 2702 executor.find(formatSql(SQL_SELECT_FOR_LIST_ALL_SCAN_DELETE), dataRecord, rows); 2703 } catch (SQLException e) { 2704 throw new IOException(e.getMessage(), e); 2705 } finally { 2706 if (executor != null) { 2707 try { 2708 executor.close(); 2709 } catch (Exception e) { 2710 throw new IOException(e.getMessage(), e); 2711 } 2712 } 2713 } 2714 } 2715 2716 protected boolean isSyncedOnHostname() throws IOException { 2717 Map<String, Object> record = getCompletedRecord(); 2718 if (record != null) { 2719 String syncedOnHostnames = (String) record.get("synced_on_hostnames"); 2720 return syncedOnHostnames.indexOf(";" + CommonTools.getHostname() + ";") > -1; 2721 } 2722 return false; 2723 } 2724 2725 protected boolean isSyncedOnHostname(Object id) throws IOException { 2726 Map<String, Object> record = getRecordById(id); 2727 if (record != null) { 2728 String syncedOnHostnames = (String) record.get("synced_on_hostnames"); 2729 return syncedOnHostnames.indexOf(";" + CommonTools.getHostname() + ";") > -1; 2730 } 2731 return false; 2732 } 2733 2734 protected boolean isDeletedOnHostname(Object id) throws IOException { 2735 Map<String, Object> record = getRecordById(id); 2736 if (record != null) { 2737 String deletedOnHostnames = (String) record.get("deleted_on_hostnames"); 2738 return deletedOnHostnames.indexOf(";" + CommonTools.getHostname() + ";") > -1; 2739 } 2740 return false; 2741 } 2742 2743 public RemoteFile getParentFile() { 2744 return new RemoteFile(getParent()); 2745 } 2746 2747 private void executeSyncRunnable(final CacheArrayFilter filter, final Map<String, Object> item, final DiskFile df, 2748 final RemoteFile rf) { 2749 try{ 2750 if (!df.isLogicModify()) { 2751 df.logicModify(); 2752 if (mergePool == null) { 2753 LoggerFactory.getLogger(RemoteFile.class).mark("MAX_POOL_SIZE={}", MAX_POOL_SIZE); 2754 mergePool = 
Executors.newFixedThreadPool(MAX_POOL_SIZE); 2755 } 2756 mergePool.execute(getSyncRunnable(filter, item, rf, df)); 2757 } 2758 }catch(Exception e){ 2759 LoggerFactory.getLogger(RemoteFile.class).error(e.getMessage(),e); 2760 } 2761 } 2762 2763 private Runnable getSyncRunnable(final CacheArrayFilter filter, final Map<String, Object> item, final RemoteFile rf, 2764 final DiskFile df) { 2765 Runnable runnable = new Runnable() { 2766 @Override 2767 public void run() { 2768 try { 2769 String fileName = (String) item.get("file_name"); 2770 Thread.currentThread() 2771 .setName(String.format("RemoteFile-getSyncRunnable-%s-%s", fileName, item.get("id"))); 2772 boolean isDebug = isDebug(rf); 2773 boolean isDeleted = false; 2774 Object deletedValue = item.get("file_deleted"); 2775 2776 if (deletedValue instanceof Boolean) { 2777 isDeleted = (Boolean) deletedValue; 2778 } else { 2779 isDeleted = Integer.parseInt(deletedValue + "") == 1; 2780 } 2781 2782 String clientHostname = CommonTools.getHostname(); 2783 String lastSourceHostname = rf.getLastSourceHostname(); 2784 2785 boolean isClientHostname = lastSourceHostname != null && !lastSourceHostname.equals(clientHostname); 2786 boolean rootExists = syncRoot != null && syncRoot.exists(); 2787 2788 if(!rootExists){ 2789 Logger.systemError(RemoteFile.class,"The root path '{}' does not exist.",syncRoot.getPath()); 2790 } 2791 2792 if (isDeleted && rootExists) { 2793 if (df.exists()) { 2794 if (isDebug) { 2795 LoggerFactory.getLogger(RemoteFile.class).mark("DeleteDiskFile - {}", df.getPath()); 2796 } 2797 df.delete(); 2798 rf.deletedToDiskAndSub(item.get("id")); 2799 } else { 2800 rf.deletedToDiskAndSub(item.get("id")); 2801 } 2802 df.removeLogicModify(); 2803 } 2804 2805 String sourceHostname = rf.getSourceHostname(); 2806 isClientHostname = sourceHostname != null && !sourceHostname.equals(clientHostname); 2807 boolean checkHaveParent = configProperties.getBoolean("CheckHaveParent", false); 2808 boolean haveParent = true; 2809 if 
(checkHaveParent) { 2810 haveParent = df.getOrigin().getParentFile().exists(); 2811 } 2812 String fileType = (String) item.get("file_type"); 2813 2814 if (!isDeleted && isClientHostname && haveParent && rootExists) { 2815 Timestamp rfModifyTime = rf.getModifyTime(); 2816 df.setModifyTimeMsFromClock(rfModifyTime.getTime()); 2817 2818 if (fileType.equals("L")) { 2819 if (df.exists() && !df.isLink()) { 2820 df.delete(); 2821 } 2822 if (df.exists()) { 2823 Timestamp dfModifyTime = df.getModifyTimeForClock(); 2824 if (rfModifyTime.getTime() > dfModifyTime.getTime()) { 2825 if (isDebug) { 2826 LoggerFactory.getLogger(RemoteFile.class).mark("ReCreateDiskLink - {}", 2827 df.getPath()); 2828 } 2829 df.delete(); 2830 if (df.createLink(rf.readLink())) { 2831 if (df.getOrigin().getName().startsWith(".")) { 2832 Files.setAttribute(Paths.get(df.getPath()), "dos:hidden", true); 2833 } 2834 rf.syncedToDisk(); 2835 } 2836 } else { 2837 rf.syncedToDisk(); 2838 } 2839 } else { 2840 if (isDebug) { 2841 LoggerFactory.getLogger(RemoteFile.class).mark("CreateDiskLink - {}", df.getPath()); 2842 } 2843 if (df.createLink(rf.readLink())) { 2844 if (df.getOrigin().getName().startsWith(".")) { 2845 Files.setAttribute(Paths.get(df.getPath()), "dos:hidden", true); 2846 } 2847 rf.syncedToDisk(); 2848 } 2849 } 2850 df.removeLogicModify(); 2851 } 2852 if (fileType.equals("F")) { 2853 if (df.exists()) { 2854 Timestamp dfModifyTime = df.getModifyTimeForClock(); 2855 boolean changed = rfModifyTime.getTime() > dfModifyTime.getTime(); 2856 if (changed) { 2857 DiskFile writingDf = new DiskFile(df.getPath() + TMP_WRITING_FILE); 2858 DiskFile.copyAttrs(writingDf, df.isCopyStructureOnly(), df.syncRoot); 2859 writingDf.setModifyTimeMsFromClock(rfModifyTime.getTime()); 2860 if (!writingDf.isLogicModify()) { 2861 writingDf.logicModify(); 2862 if (RemoteFile.addQueuePathMapping(rf.getPath())) { 2863 if (isDebug(rf)) { 2864 LoggerFactory.getLogger(RemoteFile.class).mark("DiskQueuing - {}", 2865 df.getPath()); 
2866 } 2867 rf.writeToDisk(writingDf); 2868 } 2869 } 2870 } else { 2871 rf.syncedToDisk(); 2872 df.removeLogicModify(); 2873 } 2874 } else { 2875 if (rf.exists()) { 2876 DiskFile writingDf = new DiskFile(df.getPath() + TMP_WRITING_FILE); 2877 DiskFile.copyAttrs(writingDf, df.isCopyStructureOnly(), df.syncRoot); 2878 writingDf.setModifyTimeMsFromClock(rfModifyTime.getTime()); 2879 if (!writingDf.isLogicModify()) { 2880 writingDf.logicModify(); 2881 if (RemoteFile.addQueuePathMapping(rf.getPath())) { 2882 if (isDebug(rf)) { 2883 LoggerFactory.getLogger(RemoteFile.class).mark("DiskQueuing - {}", 2884 df.getPath()); 2885 } 2886 rf.writeToDisk(writingDf); 2887 } 2888 } 2889 } 2890 } 2891 } 2892 if (fileType.equals("D")) { 2893 if (rf.exists()) { 2894 if (isDebug) { 2895 LoggerFactory.getLogger(RemoteFile.class).mark("CreateDiskDir - {}", df.getPath()); 2896 } 2897 if (df.exists()) { 2898 if (df.isDir()) { 2899 rf.syncedToDisk(); 2900 } else { 2901 df.delete(); 2902 if (df.mkdirs()) { 2903 if (df.getOrigin().getName().startsWith(".")) { 2904 Files.setAttribute(Paths.get(df.getPath()), "dos:hidden", true); 2905 } 2906 rf.syncedToDisk(); 2907 } 2908 } 2909 } else { 2910 if (df.mkdirs()) { 2911 if (df.getOrigin().getName().startsWith(".")) { 2912 Files.setAttribute(Paths.get(df.getPath()), "dos:hidden", true); 2913 } 2914 rf.syncedToDisk(); 2915 } 2916 } 2917 } 2918 df.removeLogicModify(); 2919 } 2920 } 2921 } catch (Exception e) { 2922 log.error(e.getMessage(), e); 2923 } 2924 } 2925 }; 2926 return runnable; 2927 } 2928 2929 public void startArchiveForFileDeleted(long timerMs, String archiveTime) throws IOException { 2930 long archiveMs = ConfigProperties.parseTimeToMs(archiveTime, 0L); 2931 startArchiveForFileDeleted(timerMs, archiveMs, null); 2932 } 2933 2934 public void startArchiveForFileDeleted(long timerMs, long archiveMs) throws IOException { 2935 startArchiveForFileDeleted(timerMs, archiveMs, null); 2936 } 2937 2938 public void startArchiveForFileDeleted(long 
timerMs, String archiveTime, Runnable completedCallback) 2939 throws IOException { 2940 long archiveMs = ConfigProperties.parseTimeToMs(archiveTime, 0L); 2941 startArchiveForFileDeleted(timerMs, archiveMs, completedCallback); 2942 } 2943 2944 public void startArchiveForFileDeleted(long timerMs, long archiveMs, Runnable completedCallback) 2945 throws IOException { 2946 final RemoteFile the = this; 2947 if (archiveMs <= 0) { 2948 throw new IOException(String.format( 2949 "This parameter 'archiveMs' cannot be '%s', please use a number greater than 0.", archiveMs)); 2950 } 2951 2952 stopSync = false; 2953 2954 CacheDriverExecutor executor = null; 2955 Connection conn = null; 2956 try { 2957 Thread.currentThread().setName(String.format("RemoteFile-archiveForFileDeleted-%s", getPath())); 2958 conn = dataSourceReader.getConnection(); 2959 conn.setAutoCommit(true); 2960 executor = new CacheDriverExecutor(conn); 2961 Timestamp beforeTime = new Timestamp(new Clock().getTime() - archiveMs); 2962 Map<String, Object> dataRecord = new HashMap<String, Object>(); 2963 dataRecord.put("file_parent_path", getPath()); 2964 dataRecord.put("like_file_parent_path", parsePath(getPath() + "/%")); 2965 dataRecord.put("before_deleted_at", beforeTime); 2966 CacheArray rows = new CacheArray(); 2967 CacheArrayFilter filter = new CacheArrayFilter(CACHE_ARRAY_FILTER_TIMER) { 2968 2969 @Override 2970 public void completed(Integer size) { 2971 if (isStopSync()) { 2972 LoggerFactory.getLogger(RemoteFile.class).mark("Stoped archive for file deleted."); 2973 } 2974 if (timerMs <= 0) { 2975 LoggerFactory.getLogger(RemoteFile.class).mark("Completed archive for file deleted."); 2976 } 2977 2978 if (completedCallback != null) { 2979 completedCallback.run(); 2980 } 2981 2982 if (!isStopSync() && timerMs > 0) { 2983 while(true){ 2984 2985 if(isStopSync()) break; 2986 2987 try { 2988 Thread.sleep(timerMs); 2989 if (!isStopSync()) { 2990 boolean allowed = checkUsage(the); 2991 if(allowed){ 2992 
startArchiveForFileDeleted(timerMs, archiveMs, completedCallback); 2993 break; 2994 } 2995 } 2996 } catch (Exception e) { 2997 log.error(e.getMessage(), e); 2998 } 2999 } 3000 } 3001 3002 } 3003 3004 @Override 3005 public void execute(Integer index, Object o) { 3006 Thread.currentThread().setName(String.format("RemoteFile-execute-archiveForFileDeleted-%s", index)); 3007 if(isDebug(the)){ 3008 debugLog("CheckUsage",the,this); 3009 } 3010 if (memoryUsage() < RemoteFile.MAX_MEMORY_USAGE) { 3011 CacheDriverExecutor _executor = null; 3012 try { 3013 Map<String, Object> item = (Map<String, Object>) o; 3014 boolean notDir = !item.get("file_type").equals("D"); 3015 if(notDir){ 3016 calculateDataMappingDataSourceReader(the, item); 3017 Object fileId = item.get("file_id"); 3018 Map<String, Object> dataRecord = new HashMap<String, Object>(); 3019 dataRecord.put("file_id", fileId); 3020 dataRecord.put("deleted_at", new Clock().getSqlTimestamp()); 3021 Thread.currentThread().setName(String.format("RemoteFile-archiveForFileDeleted-%s", fileId)); 3022 Connection _conn = dataMappingDataSourceReader.getConnection(); 3023 _conn.setAutoCommit(true); 3024 _executor = new CacheDriverExecutor(_conn); 3025 _executor.execute(formatSqlForFilePartData(SQL_UPDATE_BY_FILE_ID_FOR_CLEAR_FILE_DATA), dataRecord); 3026 } 3027 } catch (Exception e) { 3028 log.error(e.getMessage(), e); 3029 } finally { 3030 try { 3031 if (_executor != null) 3032 _executor.close(); 3033 } catch (Exception e) { 3034 log.error(e.getMessage(), e); 3035 } 3036 } 3037 }else{ 3038 log.warn("Over max memory usage 'MAX_MEMORY_USAGE={}'.",MAX_MEMORY_USAGE); 3039 } 3040 } 3041 }; 3042 rows.filter(filter); 3043 final CacheDriverExecutor finalExecutor = executor; 3044 Executors.newFixedThreadPool(1).execute(new Runnable(){ 3045 @Override 3046 public void run(){ 3047 try{ 3048 finalExecutor.find(formatSql(SQL_SELECT_FOR_ARCHIVE_FILE_DELETED), dataRecord, rows); 3049 }catch(Exception e){ 3050 log.error(e.getMessage(),e); 3051 } 
3052 } 3053 }); 3054 } catch (SQLException e) { 3055 throw new IOException(e.getMessage(), e); 3056 } finally { 3057 if (executor != null) { 3058 try { 3059 executor.close(); 3060 } catch (Exception e) { 3061 throw new IOException(e.getMessage(), e); 3062 } 3063 } 3064 } 3065 } 3066 3067 public void startArchiveForDeleted(long timerMs, String archiveTime) throws IOException { 3068 long archiveMs = ConfigProperties.parseTimeToMs(archiveTime, 0L); 3069 startArchiveForDeleted(timerMs, archiveMs, null); 3070 } 3071 3072 public void startArchiveForDeleted(long timerMs, long archiveMs) throws IOException { 3073 startArchiveForDeleted(timerMs, archiveMs, null); 3074 } 3075 3076 public void startArchiveForDeleted(long timerMs, String archiveTime, Runnable completedCallback) 3077 throws IOException { 3078 long archiveMs = ConfigProperties.parseTimeToMs(archiveTime, 0L); 3079 startArchiveForDeleted(timerMs, archiveMs, completedCallback); 3080 } 3081 3082 public void startArchiveForDeleted(long timerMs, long archiveMs, Runnable completedCallback) throws IOException { 3083 final RemoteFile the = this; 3084 if (archiveMs <= 0) { 3085 throw new IOException(String.format( 3086 "This parameter 'archiveMs' cannot be '%s', please use a number greater than 0.", archiveMs)); 3087 } 3088 3089 stopSync = false; 3090 3091 CacheDriverExecutor executor = null; 3092 Connection conn = null; 3093 try { 3094 Thread.currentThread().setName(String.format("RemoteFile-archiveForDeleted-%s", getPath())); 3095 conn = dataSourceReader.getConnection(); 3096 conn.setAutoCommit(true); 3097 executor = new CacheDriverExecutor(conn); 3098 Timestamp beforeTime = new Timestamp(new Clock().getTime() - archiveMs); 3099 Map<String, Object> dataRecord = new HashMap<String, Object>(); 3100 dataRecord.put("file_parent_path", getPath()); 3101 dataRecord.put("like_file_parent_path", parsePath(getPath() + "/%")); 3102 dataRecord.put("before_deleted_at", beforeTime); 3103 3104 CacheArray rows = new CacheArray(); 3105 
CacheArrayFilter filter = new CacheArrayFilter(CACHE_ARRAY_FILTER_TIMER) { 3106 3107 @Override 3108 public void completed(Integer size) { 3109 if (isStopSync()) { 3110 LoggerFactory.getLogger(RemoteFile.class).mark("Stoped archive for deleted."); 3111 } 3112 if (timerMs <= 0) { 3113 LoggerFactory.getLogger(RemoteFile.class).mark("Completed archive for deleted."); 3114 } 3115 3116 if (completedCallback != null) { 3117 completedCallback.run(); 3118 } 3119 3120 if (!isStopSync() && timerMs > 0) { 3121 while(true){ 3122 3123 if(isStopSync()) break; 3124 3125 try { 3126 Thread.sleep(timerMs); 3127 if (!isStopSync()) { 3128 boolean allowed = checkUsage(the); 3129 if(allowed){ 3130 startArchiveForDeleted(timerMs, archiveMs, completedCallback); 3131 break; 3132 } 3133 } 3134 } catch (Exception e) { 3135 log.error(e.getMessage(), e); 3136 } 3137 } 3138 } 3139 } 3140 3141 @Override 3142 public void execute(Integer index, Object o) { 3143 Thread.currentThread().setName(String.format("RemoteFile-execute-archiveForDeleted-%s", index)); 3144 CacheDriverExecutor dmExecutor = null; 3145 CacheDriverExecutor dmDelExecutor = null; 3146 CacheDriverExecutor executor = null; 3147 if(isDebug(the)){ 3148 debugLog("CheckUsage",the,this); 3149 } 3150 if (memoryUsage() < RemoteFile.MAX_MEMORY_USAGE) { 3151 try { 3152 Clock clock = new Clock(); 3153 Timestamp currentTime = clock.getSqlTimestamp(); 3154 Map<String, Object> item = (Map<String, Object>) o; 3155 Object fileId = item.get("file_id"); 3156 Thread.currentThread().setName(String.format("RemoteFile-archiveForDeleted-%s", fileId)); 3157 calculateDataMappingDataSourceReader(the, item); 3158 Map<String, Object> dataRecord = null; 3159 boolean haveSubFiles = false; 3160 /**Need use dataMappingDataSourceReader**/ 3161 if (item.get("file_type").equals("D")) { 3162 int count = countAllSubFiles(item.get("file_parent_path") + "", 3163 item.get("file_name") + ""); 3164 haveSubFiles = count > 0; 3165 } 3166 if (!haveSubFiles) { 3167 if 
(!item.get("file_type").equals("D")) { 3168 Connection dmConn = dataMappingDataSourceReader.getConnection(); 3169 dmConn.setAutoCommit(true); 3170 dmExecutor = new CacheDriverExecutor(dmConn); 3171 dataRecord = new HashMap<String, Object>(); 3172 dataRecord.put("file_id", item.get("file_id")); 3173 dataRecord.put("deleted_at", currentTime); 3174 dmExecutor.execute( 3175 formatSqlForFilePartData(SQL_UPDATE_BY_FILE_ID_FOR_CLEAR_FILE_DATA), 3176 dataRecord); 3177 } 3178 Connection conn = dataSourceWriter.getConnection(); 3179 conn.setAutoCommit(true); 3180 executor = new CacheDriverExecutor(conn); 3181 dataRecord = new HashMap<String, Object>(); 3182 dataRecord.put("file_id", item.get("file_id")); 3183 executor.execute(formatSql(SQL_DELETE_BY_FILE_ID_FROM_FILE), dataRecord); 3184 } 3185 3186 Connection dmDelConn = dataMappingDataSourceReader.getConnection(); 3187 dmDelConn.setAutoCommit(true); 3188 dmDelExecutor = new CacheDriverExecutor(dmDelConn); 3189 dataRecord = new HashMap<String, Object>(); 3190 dataRecord.put("deleted_at", new Timestamp(currentTime.getTime() - archiveMs)); 3191 dmDelExecutor.execute(formatSqlForFilePartData(SQL_DELETE_BY_RF_ID_FROM_FILE_DATA), 3192 dataRecord); 3193 } catch (Exception e) { 3194 log.error(e.getMessage(), e); 3195 } finally { 3196 try { 3197 if (dmExecutor != null) 3198 dmExecutor.close(); 3199 } catch (Exception e) { 3200 log.error(e.getMessage(), e); 3201 } 3202 try { 3203 if (dmDelExecutor != null) 3204 dmDelExecutor.close(); 3205 } catch (Exception e) { 3206 log.error(e.getMessage(), e); 3207 } 3208 try { 3209 if (executor != null) 3210 executor.close(); 3211 } catch (Exception e) { 3212 log.error(e.getMessage(), e); 3213 } 3214 } 3215 3216 }else{ 3217 log.warn("Over max memory usage 'MAX_MEMORY_USAGE={}'.",MAX_MEMORY_USAGE); 3218 } 3219 } 3220 }; 3221 rows.filter(filter); 3222 final CacheDriverExecutor finalExecutor = executor; 3223 Executors.newFixedThreadPool(1).execute(new Runnable(){ 3224 @Override 3225 public void 
run(){ 3226 try{ 3227 finalExecutor.find(formatSql(SQL_SELECT_FOR_ARCHIVE_DELETED), dataRecord, rows); 3228 }catch(Exception e){ 3229 log.error(e.getMessage(),e); 3230 } 3231 } 3232 }); 3233 } catch (SQLException e) { 3234 throw new IOException(e.getMessage(), e); 3235 } finally { 3236 if (executor != null) { 3237 try { 3238 executor.close(); 3239 } catch (Exception e) { 3240 throw new IOException(e.getMessage(), e); 3241 } 3242 } 3243 } 3244 } 3245 3246 public void startArchiveForTimeout(long timerMs) throws IOException { 3247 startArchiveForTimeout(timerMs,null); 3248 } 3249 3250 public void startArchiveForTimeout(long timerMs,Runnable completedCallback) throws IOException { 3251 final RemoteFile the = this; 3252 3253 stopSync = false; 3254 3255 CacheDriverExecutor executor = null; 3256 Connection conn = null; 3257 try { 3258 Thread.currentThread().setName(String.format("RemoteFile-archiveForTimeout-%s", getPath())); 3259 conn = dataSourceReader.getConnection(); 3260 conn.setAutoCommit(true); 3261 executor = new CacheDriverExecutor(conn); 3262 Timestamp beforeTime = new Timestamp(new Clock().getTime() - (LOGIC_TIMEOUT_MS*BATCH_SIZE)); 3263 Map<String, Object> dataRecord = new HashMap<String, Object>(); 3264 dataRecord.put("file_parent_path", getPath()); 3265 dataRecord.put("like_file_parent_path", parsePath(getPath() + "/%")); 3266 dataRecord.put("before_updated_at", beforeTime); 3267 3268 CacheArray rows = new CacheArray(); 3269 CacheArrayFilter filter = new CacheArrayFilter(CACHE_ARRAY_FILTER_TIMER) { 3270 3271 @Override 3272 public void completed(Integer size) { 3273 if (isStopSync()) { 3274 LoggerFactory.getLogger(RemoteFile.class).mark("Stoped archive for timeout deleted."); 3275 } 3276 if (timerMs <= 0) { 3277 LoggerFactory.getLogger(RemoteFile.class).mark("Completed archive for timeout deleted."); 3278 } 3279 3280 if (completedCallback != null) { 3281 completedCallback.run(); 3282 } 3283 3284 if (!isStopSync() && timerMs > 0) { 3285 while(true){ 3286 3287 
if(isStopSync()) break; 3288 3289 try { 3290 Thread.sleep(timerMs); 3291 if (!isStopSync()) { 3292 boolean allowed = checkUsage(the); 3293 if(allowed){ 3294 startArchiveForTimeout(timerMs, completedCallback); 3295 break; 3296 } 3297 } 3298 } catch (Exception e) { 3299 log.error(e.getMessage(), e); 3300 } 3301 } 3302 } 3303 3304 } 3305 3306 @Override 3307 public void execute(Integer index, Object o) { 3308 Thread.currentThread().setName(String.format("RemoteFile-execute-archiveForTimeout-%s", index)); 3309 if(isDebug(the)){ 3310 debugLog("CheckUsage",the,this); 3311 } 3312 CacheDriverExecutor executor = null; 3313 if (memoryUsage() < RemoteFile.MAX_MEMORY_USAGE) { 3314 try { 3315 Clock clock = new Clock(); 3316 Timestamp currentTime = clock.getSqlTimestamp(); 3317 Map<String, Object> item = (Map<String, Object>) o; 3318 Object fileId = item.get("file_id"); 3319 Thread.currentThread().setName(String.format("RemoteFile-archiveForTimeout-%s", fileId)); 3320 Connection conn = dataSourceWriter.getConnection(); 3321 conn.setAutoCommit(true); 3322 executor = new CacheDriverExecutor(conn); 3323 Map<String,Object> dataRecord = new HashMap<String, Object>(); 3324 dataRecord.put("file_id",fileId); 3325 dataRecord.put("deleted_at",currentTime); 3326 dataRecord.put("deleted_user_id",getDefaultModifyUserId()); 3327 executor.execute(formatSql(SQL_UPDATE_FOR_TIMEOUT_DELETE), dataRecord); 3328 } catch (Exception e) { 3329 log.error(e.getMessage(), e); 3330 } finally { 3331 try { 3332 if (executor != null) 3333 executor.close(); 3334 } catch (Exception e) { 3335 log.error(e.getMessage(), e); 3336 } 3337 } 3338 3339 }else{ 3340 log.warn("Over max memory usage 'MAX_MEMORY_USAGE={}'.",MAX_MEMORY_USAGE); 3341 } 3342 } 3343 }; 3344 rows.filter(filter); 3345 final CacheDriverExecutor finalExecutor = executor; 3346 Executors.newFixedThreadPool(1).execute(new Runnable(){ 3347 @Override 3348 public void run(){ 3349 try{ 3350 finalExecutor.find(formatSql(SQL_SELECT_FOR_ALL_TIMEOUT), 
dataRecord, rows); 3351 }catch(Exception e){ 3352 log.error(e.getMessage(),e); 3353 } 3354 } 3355 }); 3356 } catch (SQLException e) { 3357 throw new IOException(e.getMessage(), e); 3358 } finally { 3359 if (executor != null) { 3360 try { 3361 executor.close(); 3362 } catch (Exception e) { 3363 throw new IOException(e.getMessage(), e); 3364 } 3365 } 3366 } 3367 } 3368 3369 public static Map<String, DriverDataSource> getDriverDataSourceMap() { 3370 return Collections.synchronizedMap(DATASOURCE_REMOTE_FILE); 3371 } 3372 3373 private static synchronized String generateId(boolean identityCardEnable, String identityCardName, 3374 String identityCardPrefix, int identityCardSuffix) throws SQLException { 3375 if (identityCardEnable) { 3376 if (identityCard == null) { 3377 identityCard = new IdentityCard(identityCardName); 3378 } 3379 return identityCard.generateId(16, identityCardPrefix, "", identityCardSuffix); 3380 } else { 3381 return CommonTools.generateId(16); 3382 } 3383 } 3384 3385 private static synchronized String generateDataId(boolean identityCardEnable, String identityCardName, 3386 int identityCardSuffix, RemoteFile theRf) throws SQLException { 3387 if (identityCardEnable) { 3388 if (identityCard == null) { 3389 identityCard = new IdentityCard(identityCardName + "Data"); 3390 } 3391 return identityCard.generateId(16, String.format("%04d", theRf.dataMappingTable), "", identityCardSuffix); 3392 } else { 3393 return CommonTools.generateId(16); 3394 } 3395 } 3396 3397 protected static synchronized boolean addQueuePathMapping(String path) { 3398 Long addTimeMs = null; 3399 3400 boolean allowed = checkQueuePathMapping(); 3401 3402 if (!allowed) 3403 return false; 3404 3405 addTimeMs = QUEUE_PATH_MAPPING.get(path); 3406 3407 if (addTimeMs == null) { 3408 QUEUE_PATH_MAPPING.put(path, System.currentTimeMillis()); 3409 return true; 3410 } 3411 3412 return false; 3413 } 3414 3415 public static synchronized boolean checkQueuePathMapping() { 3416 Long addTimeMs = null; 3417 
Map<String, Long> copyQueueMap = getQueuePathMapping(); 3418 List<String> keyList = new ArrayList<String>(copyQueueMap.keySet()); 3419 for (String key : keyList) { 3420 addTimeMs = copyQueueMap.get(key); 3421 if (addTimeMs != null) { 3422 boolean timeout = (System.currentTimeMillis() - addTimeMs) >= LOGIC_TIMEOUT_MS; 3423 3424 if (timeout) { 3425 QUEUE_PATH_MAPPING.remove(key); 3426 } 3427 } 3428 } 3429 3430 if (QUEUE_PATH_MAPPING.keySet().size() >= MAX_QUEUE_SIZE) 3431 return false; 3432 3433 return true; 3434 } 3435 3436 protected static synchronized void removeQueuePathMapping(String path) { 3437 QUEUE_PATH_MAPPING.remove(path); 3438 } 3439 3440 public static synchronized Map<String, Long> getQueuePathMapping() { 3441 return Collections.synchronizedMap(QUEUE_PATH_MAPPING); 3442 } 3443 3444 protected static synchronized void loadDataMapping(RemoteFile rf) throws Exception { 3445 List<String> dmArray = rf.configProperties.getArray("DataMapping", new String[] {}); 3446 int size = dmArray.size(); 3447 List<String> dmList = DATA_MAPPING_LIST.get(rf.dsKey); 3448 3449 if (dmList == null) 3450 dmList = new ArrayList<String>(); 3451 3452 if (!DATA_MAPPING_INDEX.containsKey(rf.dsKey)) 3453 DATA_MAPPING_INDEX.put(rf.dsKey, 0); 3454 3455 for (int i = 0; i < size; i++) { 3456 String dm = dmArray.get(i); 3457 String originKey = String.format("DataMapping[%s]", i); 3458 Map<String, Object> dmMap = rf.configProperties.getMap(originKey); 3459 String shortKey = String.format("%s.%s", rf.dsKey, dm); 3460 if (dmMap != null) { 3461 String dmKey = String.format("%s.%s", rf.dsKey, originKey); 3462 3463 if (!DATA_MAPPING.containsKey(shortKey)) { 3464 ConfigProperties dmCp = new ConfigProperties(); 3465 dmCp.putAllAndReturnSlef(dmMap); 3466 int priority = dmCp.getInteger(String.format("%s.Priority", originKey), 0); 3467 if (i == 0 && priority < 1) { 3468 throw new Exception("First DataMapping priority cannot be less than 1."); 3469 } 3470 String dataSourceFileOriginValue = 
dmCp.getString(String.format("%s.DataSource", originKey)); 3471 String dataSourceReaderFileOriginValue = dmCp 3472 .getString(String.format("%s.DataSourceReader", originKey), dataSourceFileOriginValue); 3473 String dataSourceWriterFileOriginValue = dmCp 3474 .getString(String.format("%s.DataSourceWriter", originKey), dataSourceFileOriginValue); 3475 3476 String dmDsReaderKey = String.format("%s.reader", CommonTools.md5(dataSourceReaderFileOriginValue)); 3477 String dmDsWriterKey = String.format("%s.writer", CommonTools.md5(dataSourceWriterFileOriginValue)); 3478 DriverDataSource dmDataSourceReader = DATASOURCE_REMOTE_FILE.get(dmDsReaderKey); 3479 DriverDataSource dmDataSourceWriter = DATASOURCE_REMOTE_FILE.get(dmDsWriterKey); 3480 if (dmDataSourceReader == null) { 3481 DATASOURCE_REMOTE_FILE.put(dmDsReaderKey, 3482 new DriverDataSource(new File(dataSourceReaderFileOriginValue))); 3483 } 3484 if (dmDataSourceWriter == null) { 3485 DATASOURCE_REMOTE_FILE.put(dmDsWriterKey, 3486 new DriverDataSource(new File(dataSourceWriterFileOriginValue))); 3487 } 3488 String tableRangOriginValue = dmCp.getString(String.format("%s.TableRange", originKey), "0"); 3489 ConfigProperties shortCp = new ConfigProperties(); 3490 List<Integer> tableRange = new ArrayList<Integer>(); 3491 if (tableRangOriginValue.contains("-")) { 3492 String[] ranges = tableRangOriginValue.split("-"); 3493 Integer begin = Integer.parseInt(ranges[0].trim()); 3494 Integer end = Integer.parseInt(ranges[1].trim()); 3495 for (int j = begin; j <= end; j++) { 3496 tableRange.add(begin + j); 3497 } 3498 } else if (tableRangOriginValue.contains(",")) { 3499 String[] ranges = tableRangOriginValue.split(","); 3500 int rangesLen = ranges.length; 3501 for (int j = 0; j < rangesLen; j++) { 3502 tableRange.add(Integer.parseInt(ranges[j].trim())); 3503 } 3504 } else { 3505 tableRange.add(Integer.parseInt(tableRangOriginValue.trim())); 3506 } 3507 shortCp.put("TableRange", tableRange); 3508 
shortCp.putAndReturnSlef("DataSourceReader", dmDsReaderKey); 3509 shortCp.putAndReturnSlef("DataSourceWriter", dmDsWriterKey); 3510 shortCp.putAndReturnSlef("Priority", priority); 3511 shortCp.putAndReturnSlef("UsingCount", 0); 3512 shortCp.putAndReturnSlef("Name", dm); 3513 int randomIndex = ThreadLocalRandom.current().nextInt(0, tableRange.size()); 3514 shortCp.putAndReturnSlef("Table", tableRange.get(randomIndex)); 3515 DATA_MAPPING.put(shortKey, shortCp); 3516 dmList.add(dm); 3517 DATA_MAPPING_LIST.put(rf.dsKey, dmList); 3518 } 3519 } 3520 } 3521 3522 } 3523 3524 protected static synchronized void calculateDataMappingDataSourceReader(RemoteFile rf, 3525 Map<String, Object> rfRecord) { 3526 rf.dataMappingDs = (String) rfRecord.get("file_part_data_ds"); 3527 rf.dataMappingTable = (Integer) rfRecord.get("file_part_data_table"); 3528 String shortKey = String.format("%s.%s", rf.dsKey, rf.dataMappingDs); 3529 ConfigProperties cp = DATA_MAPPING.get(shortKey); 3530 if (cp != null) { 3531 String dmDsReaderKey = cp.getString("DataSourceReader"); 3532 rf.dataMappingDataSourceReader = DATASOURCE_REMOTE_FILE.get(dmDsReaderKey); 3533 } 3534 } 3535 3536 protected static synchronized void calculateDataMappingDataSourceWriter(RemoteFile rf) { 3537 if (DATA_MAPPING_INDEX.get(rf.dsKey) >= DATA_MAPPING_LIST.size() - 1) { 3538 DATA_MAPPING_INDEX.put(rf.dsKey, 0); 3539 } 3540 ConfigProperties cp = getDataMapping(rf, DATA_MAPPING_INDEX.get(rf.dsKey)); 3541 List<Integer> tableRange = (List<Integer>) cp.get("TableRange"); 3542 Integer priority = cp.getInteger("Priority", 0); 3543 Integer usingCount = cp.getInteger("UsingCount", 0); 3544 String key = String.format("%s.%s", rf.dsKey, cp.getString("Name")); 3545 if (usingCount < priority) { 3546 rf.dataMappingDataSourceReader = DATASOURCE_REMOTE_FILE.get(cp.getString("DataSourceReader")); 3547 rf.dataMappingDataSourceWriter = DATASOURCE_REMOTE_FILE.get(cp.getString("DataSourceWriter")); 3548 rf.dataMappingDs = cp.getString("Name"); 3549 
rf.dataMappingTable = cp.getInteger("Table"); 3550 usingCount++; 3551 if (usingCount >= priority) { 3552 DATA_MAPPING_INDEX.put(rf.dsKey, DATA_MAPPING_INDEX.get(rf.dsKey) + 1); 3553 cp.putAndReturnSlef("UsingCount", 0); 3554 int randomIndex = ThreadLocalRandom.current().nextInt(0, tableRange.size()); 3555 cp.putAndReturnSlef("Table", tableRange.get(randomIndex)); 3556 DATA_MAPPING.put(key, cp); 3557 } else { 3558 cp.putAndReturnSlef("UsingCount", usingCount); 3559 int randomIndex = ThreadLocalRandom.current().nextInt(0, tableRange.size()); 3560 cp.putAndReturnSlef("Table", tableRange.get(randomIndex)); 3561 DATA_MAPPING.put(key, cp); 3562 } 3563 } else { 3564 DATA_MAPPING_INDEX.put(rf.dsKey, DATA_MAPPING_INDEX.get(rf.dsKey) + 1); 3565 calculateDataMappingDataSourceWriter(rf); 3566 } 3567 } 3568 3569 protected static synchronized ConfigProperties getDataMapping(RemoteFile rf, int index) { 3570 List<String> dmList = DATA_MAPPING_LIST.get(rf.dsKey); 3571 String dm = dmList.get(index); 3572 String key = String.format("%s.%s", rf.dsKey, dm); 3573 return DATA_MAPPING.get(key); 3574 } 3575 3576 protected String getDataMappingTableName() { 3577 return String.format("%s_data_%s", this.tableName, this.dataMappingTable); 3578 } 3579 3580 private static void debugLog(String point,RemoteFile rf,CacheArrayFilter filter){ 3581 debugLog(RemoteFile.class,point,rf,filter); 3582 } 3583 3584}