001/******************************************************************************* 002The MIT License (MIT) 003 004Copyright (c) 2024 KILLCODING.COM 005 006Permission is hereby granted, free of charge, to any person obtaining a copy 007of this software and associated documentation files (the "Software"), to deal 008in the Software without restriction, including without limitation the rights 009to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 010copies of the Software, and to permit persons to whom the Software is 011furnished to do so, subject to the following conditions: 012 013The above copyright notice and this permission notice shall be included in 014all copies or substantial portions of the Software. 015 016THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 017IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 018FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 019AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 020LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 021OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 022THE SOFTWARE. 023*****************************************************************************/ 024package com.killcoding.file; 025 026import java.io.File; 027import com.killcoding.datasource.DriverDataSource; 028import com.killcoding.tool.ConfigProperties; 029import com.killcoding.tool.CommonTools; 030import com.killcoding.log.Logger; 031import com.killcoding.log.LoggerFactory; 032import java.io.FileInputStream; 033import java.io.IOException; 034import java.sql.Connection; 035import java.sql.SQLException; 036import com.killcoding.datasource.CacheDriverExecutor; 037import java.util.Map; 038import java.util.HashMap; 039import com.killcoding.datasource.Clock; 040import java.sql.Timestamp; 041import java.nio.file.Files; 042import java.util.concurrent.ConcurrentHashMap; 043import java.util.List; 044import java.util.ArrayList; 045import java.util.Arrays; 046import java.io.ByteArrayOutputStream; 047import java.sql.Blob; 048import java.nio.file.Paths; 049import java.nio.file.Path; 050import java.net.URI; 051import java.io.ByteArrayInputStream; 052import java.nio.ByteBuffer; 053import java.util.concurrent.ExecutorService; 054import java.util.concurrent.Executors; 055import java.util.concurrent.Future; 056import java.nio.file.LinkOption; 057import javax.crypto.Cipher; 058import javax.crypto.CipherInputStream; 059import javax.crypto.CipherOutputStream; 060import javax.crypto.KeyGenerator; 061import javax.crypto.SecretKey; 062import javax.crypto.SecretKeyFactory; 063import javax.crypto.spec.DESKeySpec; 064import javax.crypto.spec.IvParameterSpec; 065import javax.crypto.spec.SecretKeySpec; 066import com.killcoding.file.DiskFile; 067import java.util.Collection; 068import com.killcoding.cache.CacheArray; 069import com.killcoding.cache.CacheArrayFilter; 070import java.nio.file.attribute.BasicFileAttributes; 071import java.util.concurrent.Callable; 072import com.killcoding.datasource.IdentityCard; 073import java.util.Collections; 074import java.util.concurrent.ThreadLocalRandom; 075import com.killcoding.tool.CipherTools; 076import com.killcoding.cache.Cache; 077import java.util.Date; 078 079public class RemoteFile extends BaseFile implements RemoteFileSQL { 080 081 private static final Map<String, DriverDataSource> DATASOURCE_REMOTE_FILE = new ConcurrentHashMap<String, DriverDataSource>(); 082 083 private static final Map<String, ConfigProperties> CONFIG_PROPS_REMOTE_FILE = new ConcurrentHashMap<String, ConfigProperties>(); 084 085 private static final Map<String, Long> QUEUE_PATH_MAPPING = new ConcurrentHashMap<String, Long>(); 086 087 private static final Map<String, ConfigProperties> DATA_MAPPING = new ConcurrentHashMap<String, ConfigProperties>(); 088 089 private static final Map<String, List<String>> DATA_MAPPING_LIST = new ConcurrentHashMap<String, List<String>>(); 090 091 private static final Map<String, Integer> DATA_MAPPING_INDEX = new ConcurrentHashMap<String, Integer>(); 092 093 protected static boolean FIRST_LOADED = false; 094 095 public static Integer MAX_POOL_SIZE = 100; 096 public static Double MAX_CONNECT_USAGE = 0.5D; 097 public static Double MAX_MEMORY_USAGE = 0.8D; 098 public static Integer MAX_QUEUE_SIZE = 10; 099 public static Long MAX_FILE_SZIE = -1L; 100 public static boolean SHOW_LINK = false; 101 public static boolean SHOW_HIDDEN = false; 102 public static boolean COPY_STRUCTURE_ONLY = false; 103 public static Integer MAX_FILE_PARENT_PATH_LENGTH = 200; 104 public static Integer MAX_FILE_NAME_LENGTH = 200; 105 public static Integer CACHE_SECONDS = 5; 106 public static Date SYNC_CUTOFF_TIME = null; 107 public static Date DELETE_CUTOFF_TIME = null; 108 109 private static IdentityCard identityCard = null; 110 private static File appRemoteFile = null; 111 private DriverDataSource dataSourceReader = null; 112 private DriverDataSource dataSourceWriter = null; 113 private String dataSourceReaderPath = null; 114 private String dataSourceWriterPath = null; 115 private String modifyUserId = null; 116 private Integer filePartSize = null; 117 private Boolean identityCardEnable = false; 118 private String identityCardName = null; 119 private String identityCardPrefix = null; 120 private Integer identityCardSuffix = 0; 121 private Timestamp modifyTime = null; 122 123 protected ConfigProperties configProperties = null; 124 protected String fileId = null; 125 protected String tableName = null; 126 protected Integer dataMappingTable = null; 127 protected String dataMappingDs = null; 128 protected DriverDataSource dataMappingDataSourceReader = null; 129 protected DriverDataSource dataMappingDataSourceWriter = null; 130 protected String dsKey = null; 131 132 public static Long BEFORE_THE_QUEUE_TIME = 60000L; 133 public static Integer BATCH_SIZE = 10; 134 public static String DEFAULE_MODIFY_USER_ID = null; 135 136 private Logger log = null; 137 138 private static ExecutorService mergePool = null; 139 140 public RemoteFile(String path) { 141 super(path); 142 log = LoggerFactory.getLogger(getClass()); 143 fileId = CommonTools.generateId(16); 144 initGlobal(); 145 } 146 147 public RemoteFile(File origin) { 148 super(origin); 149 log = LoggerFactory.getLogger(getClass()); 150 fileId = CommonTools.generateId(16); 151 initGlobal(); 152 } 153 154 public static synchronized void initPool(int poolSize) { 155 if (mergePool == null) { 156 MAX_POOL_SIZE = poolSize; 157 LoggerFactory.getLogger(RemoteFile.class).mark("MAX_POOL_SIZE={}", MAX_POOL_SIZE); 158 mergePool = Executors.newFixedThreadPool(MAX_POOL_SIZE); 159 } 160 } 161 162 private synchronized void init() { 163 dsKey = CommonTools.md5(this.appRemoteFile.getAbsolutePath()); 164 dataSourceReader = DATASOURCE_REMOTE_FILE.get(String.format("%s.reader", dsKey)); 165 dataSourceWriter = DATASOURCE_REMOTE_FILE.get(String.format("%s.writer", dsKey)); 166 configProperties = CONFIG_PROPS_REMOTE_FILE.get(dsKey); 167 if (dataSourceReader == null && dataSourceWriter == null) { 168 try { 169 if (!appRemoteFile.exists()) 170 throw new IOException(String.format("Not exist '%s'.", appRemoteFile.getAbsolutePath())); 171 172 configProperties = new ConfigProperties(); 173 Runnable updatedRun = new Runnable() { 174 @Override 175 public void run() { 176 try { 177 reload(); 178 LoggerFactory.getLogger(RemoteFile.class).mark("RemoteFile Reloaded"); 179 } catch (Exception e) { 180 log.error(e.getMessage(), e); 181 } 182 } 183 }; 184 configProperties.load(appRemoteFile, updatedRun); 185 186 CONFIG_PROPS_REMOTE_FILE.put(dsKey, configProperties); 187 188 String dataSourcePath = configProperties.getString("DataSource"); 189 dataSourceReaderPath = configProperties.getString("DataSourceReader", dataSourcePath); 190 dataSourceWriterPath = configProperties.getString("DataSourceWriter", dataSourcePath); 191 reload(); 192 } catch (Exception e) { 193 log.error(e.getMessage(), e); 194 } 195 } 196 if (configProperties != null) { 197 String dataSourcePath = configProperties.getString("DataSource"); 198 dataSourceReaderPath = configProperties.getString("DataSourceReader", dataSourcePath); 199 dataSourceWriterPath = configProperties.getString("DataSourceWriter", dataSourcePath); 200 reload(); 201 } 202 calculateDataMappingDataSourceWriter(this); 203 } 204 205 private synchronized void initGlobal() { 206 if(this.appRemoteFile == null){ 207 this.appRemoteFile = CommonTools.getSystemProperties(RemoteFile.class, "APP_REMOTE_FILE", 208 "RemoteFile.properties"); 209 } 210 init(); 211 } 212 213 private synchronized void reload() { 214 try { 215 LOGIC_TIMEOUT_MS = configProperties.getMilliSeconds("LogicTimeout", 300000L); 216 LOGIC_ACCESS_TIMEOUT_MS = configProperties.getMilliSeconds("LogicAccessTimeout", 60000L); 217 LOGIC_CHECK_TIMEOUT_MS = configProperties.getMilliSeconds("LogicCheckTimeout", 60000L); 218 MAX_FILE_SZIE = configProperties.getFileSize("MaxFileSize", 1024 * 1024 * 1L); 219 SHOW_HIDDEN = configProperties.getBoolean("ShowHidden", false); 220 SHOW_LINK = configProperties.getBoolean("ShowLink", false); 221 COPY_STRUCTURE_ONLY = configProperties.getBoolean("CopyStructureOnly", false); 222 MAX_QUEUE_SIZE = configProperties.getInteger("MaxQueueSize", 10); 223 BATCH_SIZE = configProperties.getInteger("BatchSize", 10); 224 MAX_CONNECT_USAGE = configProperties.getDouble("MaxConnectUsage", 0.5D); 225 MAX_MEMORY_USAGE = configProperties.getDouble("MaxMemoryUsage", 0.8D); 226 BEFORE_THE_QUEUE_TIME = configProperties.getMilliSeconds("BeforeTheQueueTime", 15000L); 227 MAX_FILE_PARENT_PATH_LENGTH = configProperties.getInteger("MaxFileParentPathLength", 200); 228 MAX_FILE_NAME_LENGTH = configProperties.getInteger("MaxFileNameLength", 200); 229 CACHE_SECONDS = configProperties.getInteger("CacheSeconds", 5); 230 231 SYNC_CUTOFF_TIME = configProperties.getDateTime("SyncCutoffTime","0000-01-01 00:00:00.0000"); 232 DELETE_CUTOFF_TIME = configProperties.getDateTime("DeleteCutoffTime","0000-01-01 00:00:00.0000"); 233 234 if(BEFORE_THE_QUEUE_TIME < 0) BEFORE_THE_QUEUE_TIME = 15000L; 235 236 if(MAX_CONNECT_USAGE < 0) MAX_CONNECT_USAGE = 0D; 237 238 if(MAX_MEMORY_USAGE < 0) MAX_MEMORY_USAGE = 0D; 239 240 if(BATCH_SIZE <= 0) BATCH_SIZE = 10; 241 242 if(MAX_QUEUE_SIZE <= 0) MAX_QUEUE_SIZE = 10; 243 244 if(MAX_FILE_SZIE <= 0) MAX_FILE_SZIE = 1024 * 1024 * 1L; 245 246 if(LOGIC_TIMEOUT_MS <= 0) LOGIC_TIMEOUT_MS = 900000L; 247 248 if(LOGIC_ACCESS_TIMEOUT_MS < 0) LOGIC_ACCESS_TIMEOUT_MS = 60000L; 249 250 if(LOGIC_CHECK_TIMEOUT_MS < 0) LOGIC_CHECK_TIMEOUT_MS = 60000L; 251 252 if(MAX_FILE_PARENT_PATH_LENGTH <= 0) MAX_FILE_PARENT_PATH_LENGTH = 200; 253 254 if(MAX_FILE_NAME_LENGTH <= 0) MAX_FILE_NAME_LENGTH = 200; 255 256 if(CACHE_SECONDS < 0) CACHE_SECONDS = 0; 257 258 log.debug("LOGIC_TIMEOUT_MS={}", LOGIC_TIMEOUT_MS); 259 log.debug("LOGIC_ACCESS_TIMEOUT_MS={}", LOGIC_ACCESS_TIMEOUT_MS); 260 log.debug("LOGIC_CHECK_TIMEOUT_MS={}", LOGIC_CHECK_TIMEOUT_MS); 261 log.debug("MAX_FILE_SZIE={}", MAX_FILE_SZIE); 262 log.debug("SHOW_HIDDEN={}", SHOW_HIDDEN); 263 log.debug("SHOW_LINK={}", SHOW_LINK); 264 log.debug("COPY_STRUCTURE_ONLY={}", COPY_STRUCTURE_ONLY); 265 log.debug("MAX_QUEUE_SIZE={}", MAX_QUEUE_SIZE); 266 log.debug("BATCH_SIZE={}", BATCH_SIZE); 267 log.debug("MAX_CONNECT_USAGE={}", MAX_CONNECT_USAGE); 268 log.debug("MAX_MEMORY_USAGE={}", MAX_MEMORY_USAGE); 269 log.debug("MAX_FILE_PARENT_PATH_LENGTH={}", MAX_FILE_PARENT_PATH_LENGTH); 270 log.debug("MAX_FILE_NAME_LENGTH={}", MAX_FILE_NAME_LENGTH); 271 log.debug("CACHE_SECONDS={}", CACHE_SECONDS); 272 273 String dataSourcePath = configProperties.getString("DataSource"); 274 String reloadDataSourceReaderPath = configProperties.getString("DataSourceReader", dataSourcePath); 275 String reloadDataSourceWriterPath = configProperties.getString("DataSourceWriter", dataSourcePath); 276 filePartSize = configProperties.getFileSize("FilePartSize", 1024 * 1024 * 1L).intValue(); 277 278 if(filePartSize <= 0) filePartSize = 1024 * 1024 * 1; 279 280 tableName = configProperties.getString("TableName", "remote_file"); 281 identityCardEnable = configProperties.getBoolean("IdentityCardEnable", false); 282 identityCardName = configProperties.getString("IdentityCardName", "RemoteFile"); 283 identityCardPrefix = configProperties.getString("IdentityCardPrefix", "RMF"); 284 identityCardSuffix = configProperties.getInteger("IdentityCardSuffix", 0); 285 286 String readerDsKey = String.format("%s.reader", dsKey); 287 String writerDsKey = String.format("%s.writer", dsKey); 288 /** 289 May be referenced when expansion is needed 290 if (dataSourceReader == null || !dataSourceReaderPath.equals(reloadDataSourceReaderPath)) { 291 DriverDataSource currentDs = DATASOURCE_REMOTE_FILE.get(readerDsKey); 292 if (currentDs != null) { 293 currentDs.closeAll(); 294 } 295 **/ 296 if (dataSourceReader == null || !dataSourceReaderPath.equals(reloadDataSourceReaderPath)) { 297 dataSourceReaderPath = reloadDataSourceReaderPath; 298 dataSourceReader = new DriverDataSource(new File(dataSourceReaderPath)); 299 DATASOURCE_REMOTE_FILE.put(readerDsKey, dataSourceReader); 300 } 301 /** 302 May be referenced when expansion is needed 303 if (dataSourceWriter == null || !dataSourceWriterPath.equals(reloadDataSourceWriterPath)) { 304 DriverDataSource currentDs = DATASOURCE_REMOTE_FILE.get(writerDsKey); 305 if (currentDs != null) { 306 currentDs.closeAll(); 307 } 308 **/ 309 if (dataSourceWriter == null || !dataSourceWriterPath.equals(reloadDataSourceWriterPath)) { 310 dataSourceWriterPath = reloadDataSourceWriterPath; 311 dataSourceWriter = new DriverDataSource(new File(dataSourceWriterPath)); 312 DATASOURCE_REMOTE_FILE.put(writerDsKey, dataSourceWriter); 313 } 314 315 loadDataMapping(this); 316 317 FIRST_LOADED = true; 318 } catch (Exception e) { 319 log.error(e.getMessage(), e); 320 } 321 } 322 323 public String formatSql(String sql) { 324 return String.format(sql, tableName); 325 } 326 327 public String formatSqlForFilePartData(String sql) { 328 return String.format(sql, getDataMappingTableName()); 329 } 330 331 public DiskFile writeToDisk(DiskFile diskFile) throws IOException { 332 if (!exists()) { 333 log.mark(String.format("The remote file '%s' does not exist.", this.getPath())); 334 } 335 RemoteFile the = this; 336 337 Timestamp rfModifyTime = the.getModifyTime(); 338 if (rfModifyTime != null) { 339 diskFile.setModifyTimeMsFromClock(rfModifyTime.getTime()); 340 } 341 if (isLink()) { 342 if (isDebug(the)) { 343 LoggerFactory.getLogger(RemoteFile.class).mark("CreateDiskLink - {}", diskFile.getPath()); 344 } 345 diskFile.createLink(new String(readAllBytes(),BaseFile.CHARSET)); 346 } else if (isFile()) { 347 merge(new FilePart() { 348 final List<byte[]> partDataList = new ArrayList<byte[]>(); 349 boolean isWritingDf = false; 350 DiskFile realDf = null; 351 352 @Override 353 protected void process(int partIndex, byte[] data) throws IOException { 354 Thread.currentThread() 355 .setName(String.format("RemoteFile-writeToDisk-merge-%s", diskFile.getOrigin().getName())); 356 357 try { 358 359 if (syncRoot != null) { 360 if (!diskFile.isLogicModify()) { 361 throw new IOException( 362 String.format("The file '%s' missing modify logic lock.", the.getPath())); 363 } 364 } 365 366 if (diskFile != null) 367 diskFile.logicModify(); 368 369 String fileName = diskFile.getOrigin().getName(); 370 isWritingDf = fileName.matches(TMP_WRITING_SWP); 371 if (isWritingDf) { 372 if (partIndex == 0) { 373 if (diskFile.exists()) { 374 diskFile.delete(); 375 } 376 if (isDebug(the)) { 377 LoggerFactory.getLogger(RemoteFile.class).mark("CreateDiskFile - {}", 378 diskFile.getPath()); 379 } 380 String realFileName = fileName.replaceFirst(BaseFile.TMP_WRITING_FOR_REPLACE, ""); 381 String realDfPath = String.format("%s/%s", diskFile.getParent(), realFileName); 382 realDf = new DiskFile(realDfPath); 383 DiskFile.copyAttrs(realDf, diskFile.isCopyStructureOnly(), diskFile.syncRoot); 384 if(!diskFile.manualOpen(true)){ 385 throw new IOException( 386 String.format("The disk file '%s' cannot open.", diskFile.getPath())); 387 } 388 } 389 partDataList.add(decrypt(data)); 390 if (partDataList.size() >= BATCH_SIZE) { 391 392 if (realDf != null) 393 realDf.logicModify(); 394 395 ByteArrayOutputStream ops = new ByteArrayOutputStream(); 396 int size = partDataList.size(); 397 for (int i = 0; i < size; i++) { 398 byte[] partData = partDataList.get(i); 399 ops.write(partData); 400 } 401 diskFile.manualWrite(ops.toByteArray()); 402 partDataList.clear(); 403 } 404 } else { 405 if (partIndex == 0) { 406 if (diskFile.exists()) { 407 throw new IOException( 408 String.format("The remote file '%s' already exist.", diskFile.getPath())); 409 } 410 if (isDebug(the)) { 411 LoggerFactory.getLogger(RemoteFile.class).mark("CreateDiskFile - {}", 412 diskFile.getPath()); 413 } 414 if(!diskFile.manualOpen(true)){ 415 throw new IOException( 416 String.format("The disk file '%s' cannot open.", diskFile.getPath())); 417 } 418 } 419 partDataList.add(decrypt(data)); 420 if (partDataList.size() >= BATCH_SIZE) { 421 ByteArrayOutputStream ops = new ByteArrayOutputStream(); 422 int size = partDataList.size(); 423 for (int i = 0; i < size; i++) { 424 byte[] partData = partDataList.get(i); 425 ops.write(partData); 426 } 427 diskFile.manualWrite(ops.toByteArray()); 428 partDataList.clear(); 429 } 430 } 431 432 } catch (Exception e) { 433 log.warn(e.getMessage(), e); 434 if (diskFile != null) { 435 diskFile.manualClose(); 436 diskFile.delete(); 437 } 438 throw new IOException(e.getMessage(), e); 439 } 440 } 441 442 @Override 443 protected void completed(int lastPartIndex, Timestamp modifyTime, long fileSize) throws IOException { 444 if (partDataList.size() > 0) { 445 ByteArrayOutputStream ops = new ByteArrayOutputStream(); 446 int size = partDataList.size(); 447 for (int i = 0; i < size; i++) { 448 byte[] partData = partDataList.get(i); 449 ops.write(partData); 450 } 451 diskFile.manualWrite(ops.toByteArray()); 452 partDataList.clear(); 453 } 454 diskFile.manualClose(); 455 try { 456 if (syncRoot != null && !diskFile.isLogicModify()) { 457 diskFile.delete(); 458 throw new IOException( 459 String.format("The file '%s' missing modify logic lock.", diskFile.getPath())); 460 } 461 if (isWritingDf) { 462 if (diskFile.exists()) { 463 diskFile.moveTo(realDf.getPath()); 464 if (isDebug(the)) { 465 LoggerFactory.getLogger(RemoteFile.class).mark("CompletedDiskFile - {}", 466 realDf.getPath()); 467 } 468 } 469 if (realDf.exists()) 470 the.syncedToDisk(); 471 } 472 if (realDf == null) { 473 if (diskFile != null) { 474 if (diskFile.getOrigin().getName().startsWith(".")) { 475 Files.setAttribute(Paths.get(diskFile.getPath()), "dos:hidden", true); 476 } 477 } 478 } else { 479 if (realDf.getOrigin().getName().startsWith(".")) { 480 Files.setAttribute(Paths.get(realDf.getPath()), "dos:hidden", true); 481 } 482 } 483 484 RemoteFile.removeQueuePathMapping(the.getPath()); 485 486 if (diskFile != null) 487 diskFile.removeLogicModify(); 488 489 if (realDf != null) 490 realDf.removeLogicModify(); 491 492 } catch (Exception e) { 493 log.error(e.getMessage(), e); 494 } 495 } 496 497 @Override 498 protected void ended(int lastPartIndex, long fileSize) { 499 try { 500 if (diskFile != null) { 501 diskFile.manualClose(); 502 } 503 } catch (Exception e) { 504 log.error(e.getMessage(), e); 505 } 506 } 507 }); 508 } else { 509 List<Map<String, Object>> writeList = list(getPath(), "%"); 510 writeList.addAll(list(parsePath(getPath() + "/%"), "%")); 511 512 int size = writeList.size(); 513 if (size > 0) { 514 if (!diskFile.exists()) { 515 if (isDebug(the)) { 516 log.mark("CreateDiskDir - {} ", diskFile.getPath()); 517 } 518 diskFile.mkdirs(); 519 } 520 } 521 522 for (Map<String, Object> item : writeList) { 523 RemoteFile rf = new RemoteFile( 524 String.format("%s/%s", item.get("file_parent_path"), item.get("file_name"))); 525 526 if (rf.getPath().equals(getPath())) 527 continue; 528 529 String dfPath = rf.getPath().replaceFirst(getPath(), diskFile.getPath()); 530 DiskFile df = new DiskFile(dfPath); 531 DiskFile.copyAttrs(df, false, this.syncRoot); 532 Timestamp mt = rf.getModifyTime(); 533 if (mt != null) { 534 df.setModifyTimeMsFromClock(mt.getTime()); 535 } 536 if (rf.isDir()) { 537 if (!df.exists()) { 538 if (isDebug(the)) { 539 log.mark("CreateDiskDir - {}", diskFile.getPath()); 540 } 541 df.mkdirs(); 542 } 543 } else { 544 rf.writeToDisk(df); 545 } 546 } 547 } 548 return diskFile; 549 } 550 551 public DiskFile writeToDisk() throws IOException { 552 DiskFile df = new DiskFile(this.origin.getAbsolutePath()); 553 DiskFile.copyAttrs(df, false, this.syncRoot); 554 return writeToDisk(df); 555 } 556 557 public DiskFile writeToDisk(String path) throws IOException { 558 DiskFile df = new DiskFile(path); 559 DiskFile.copyAttrs(df, false, this.syncRoot); 560 return writeToDisk(df); 561 } 562 563 public boolean isCompleted() throws IOException { 564 return getCompletedRecord() != null; 565 } 566 567 public Map<String, Object> getCompletedRecord() throws IOException { 568 String sql = formatSql(SQL_SELECT_FOR_COMPLETED); 569 Map dataRecord = new HashMap<String, Object>(); 570 dataRecord.put("file_name", origin.getName()); 571 dataRecord.put("file_parent_path", getParent()); 572 String cacheKey = String.format("first-%s-%s",sql,dataRecord); 573 Map<String,Object> record = (Map<String,Object>)Cache.get(cacheKey); 574 if(record == null){ 575 CacheDriverExecutor executor = null; 576 Connection conn = null; 577 try { 578 conn = dataSourceReader.getConnection(); 579 conn.setAutoCommit(true); 580 executor = new CacheDriverExecutor(conn); 581 record = executor.first(sql, dataRecord); 582 if(record != null){ 583 Cache.set(cacheKey,record,CACHE_SECONDS); 584 } 585 } catch (SQLException e) { 586 throw new IOException(e.getMessage(), e); 587 } finally { 588 try { 589 if (executor != null) 590 executor.close(); 591 } catch (Exception e) { 592 log.error(e.getMessage(), e); 593 } 594 } 595 } 596 return record; 597 } 598 599 public Map<String, Object> getRecordById(Object id) throws IOException { 600 String sql = formatSql(SQL_SELECT_GET_RECORD_BY_ID); 601 Map dataRecord = new HashMap<String, Object>(); 602 dataRecord.put("id", id); 603 Map<String, Object> record = null; 604 if (record == null) { 605 CacheDriverExecutor executor = null; 606 Connection conn = null; 607 try { 608 conn = dataSourceReader.getConnection(); 609 conn.setAutoCommit(true); 610 executor = new CacheDriverExecutor(conn); 611 record = executor.first(sql, dataRecord); 612 } catch (SQLException e) { 613 throw new IOException(e.getMessage(), e); 614 } finally { 615 try { 616 if (executor != null) 617 executor.close(); 618 } catch (Exception e) { 619 log.error(e.getMessage(), e); 620 } 621 } 622 } 623 return record; 624 } 625 626 public Map<String, Object> getRecordByFileId(Object fileId) throws IOException { 627 String sql = formatSql(SQL_SELECT_GET_RECORD_BY_FILE_ID); 628 Map dataRecord = new HashMap<String, Object>(); 629 dataRecord.put("file_id", fileId); 630 Map<String, Object> record = null; 631 if (record == null) { 632 CacheDriverExecutor executor = null; 633 Connection conn = null; 634 try { 635 conn = dataSourceReader.getConnection(); 636 conn.setAutoCommit(true); 637 executor = new CacheDriverExecutor(conn); 638 record = executor.first(sql, dataRecord); 639 } catch (SQLException e) { 640 throw new IOException(e.getMessage(), e); 641 } finally { 642 try { 643 if (executor != null) 644 executor.close(); 645 } catch (Exception e) { 646 log.error(e.getMessage(), e); 647 } 648 } 649 } 650 return record; 651 } 652 653 public Map<String, Object> getLastUpdateRecord() throws IOException { 654 String sql = formatSql(SQL_SELECT_FOR_LAST_UPDATE); 655 Map dataRecord = new HashMap<String, Object>(); 656 dataRecord.put("file_name", origin.getName()); 657 dataRecord.put("file_parent_path", getParent()); 658 Map<String, Object> record = null; 659 CacheDriverExecutor executor = null; 660 Connection conn = null; 661 try { 662 conn = dataSourceReader.getConnection(); 663 conn.setAutoCommit(true); 664 executor = new CacheDriverExecutor(conn); 665 record = executor.first(sql, dataRecord); 666 return record; 667 } catch (SQLException e) { 668 throw new IOException(e.getMessage(), e); 669 } finally { 670 try { 671 if (executor != null) 672 executor.close(); 673 } catch (Exception e) { 674 log.error(e.getMessage(), e); 675 } 676 } 677 } 678 679 protected Timestamp getFirstCreatedAt() throws IOException { 680 String sql = formatSql(SQL_SELECT_FOR_FIRST_CREATED_AT); 681 Map dataRecord = new HashMap<String, Object>(); 682 dataRecord.put("file_parent_path", getPath()); 683 dataRecord.put("like_file_parent_path", parsePath(getPath() + "/%")); 684 dataRecord.put("deleted_on_hostnames", "%;" + CommonTools.getHostname() + ";%"); 685 CacheDriverExecutor executor = null; 686 Connection conn = null; 687 try { 688 conn = dataSourceReader.getConnection(); 689 conn.setAutoCommit(true); 690 executor = new CacheDriverExecutor(conn); 691 Map<String, Object> record = executor.first(sql, dataRecord); 692 return record == null ? null : (Timestamp)record.get("created_at"); 693 } catch (SQLException e) { 694 throw new IOException(e.getMessage(), e); 695 } finally { 696 try { 697 if (executor != null) 698 executor.close(); 699 } catch (Exception e) { 700 log.error(e.getMessage(), e); 701 } 702 } 703 } 704 705 protected Integer getMaxFilePartDataTable() throws IOException { 706 String sql = formatSql(SQL_SELECT_FOR_MAX_DATA_TABLE); 707 Map dataRecord = new HashMap<String, Object>(); 708 dataRecord.put("file_parent_path", getPath()); 709 dataRecord.put("like_file_parent_path", parsePath(getPath() + "/%")); 710 CacheDriverExecutor executor = null; 711 Connection conn = null; 712 try { 713 conn = dataSourceReader.getConnection(); 714 conn.setAutoCommit(true); 715 executor = new CacheDriverExecutor(conn); 716 Map<String, Object> record = executor.first(sql, dataRecord); 717 return record == null ? 0 : Integer.parseInt(record.get("file_part_data_table")+""); 718 } catch (SQLException e) { 719 throw new IOException(e.getMessage(), e); 720 } finally { 721 try { 722 if (executor != null) 723 executor.close(); 724 } catch (Exception e) { 725 log.error(e.getMessage(), e); 726 } 727 } 728 } 729 730 private Map<String, Object> getSelectByPathForCheckExists() throws IOException { 731 String sql = formatSql(SQL_SELECT_BY_PATH_FOR_CHECK_EXISTS); 732 Map dataRecord = new HashMap<String, Object>(); 733 dataRecord.put("file_name", origin.getName()); 734 dataRecord.put("file_parent_path", getParent()); 735 Map<String, Object> record = null; 736 if (record == null) { 737 CacheDriverExecutor executor = null; 738 Connection conn = null; 739 try { 740 conn = dataSourceReader.getConnection(); 741 conn.setAutoCommit(true); 742 executor = new CacheDriverExecutor(conn); 743 record = executor.first(sql, dataRecord); 744 return record; 745 } catch (SQLException e) { 746 throw new IOException(e.getMessage(), e); 747 } finally { 748 try { 749 if (executor != null) 750 executor.close(); 751 } catch (Exception e) { 752 log.error(e.getMessage(), e); 753 } 754 } 755 } 756 return record; 757 } 758 759 private int countAllSubFiles(String fileParentPath, String fileName) throws IOException { 760 String sql = formatSql(SQL_SELECT_FOR_COUNT_ALL_SUB_FILES); 761 Map dataRecord = new HashMap<String, Object>(); 762 dataRecord.put("file_name", fileName); 763 dataRecord.put("file_parent_path", fileParentPath); 764 Map<String, Object> record = null; 765 if (record == null) { 766 CacheDriverExecutor executor = null; 767 Connection conn = null; 768 try { 769 conn = dataSourceReader.getConnection(); 770 conn.setAutoCommit(true); 771 executor = new CacheDriverExecutor(conn); 772 record = executor.first(sql, dataRecord); 773 return Integer.parseInt(record.get("as_count") + ""); 774 } catch (SQLException e) { 775 throw new IOException(e.getMessage(), e); 776 } finally { 777 try { 778 if (executor != null) 779 executor.close(); 780 } catch (Exception e) { 781 log.error(e.getMessage(), e); 782 } 783 } 784 } 785 return -1; 786 } 787 788 private Map<String, Object> getFirstFilePart() throws IOException { 789 String sql = formatSql(SQL_SELECT_FOR_FIRST_FILE); 790 Map dataRecord = new HashMap<String, Object>(); 791 dataRecord.put("file_name", origin.getName()); 792 dataRecord.put("file_parent_path", getParent()); 793 String cacheKey = String.format("first-%s-%s",sql,dataRecord); 794 Map<String, Object> record = (Map<String,Object>)Cache.get(cacheKey); 795 if(record == null){ 796 CacheDriverExecutor executor = null; 797 Connection conn = null; 798 try { 799 conn = dataSourceReader.getConnection(); 800 conn.setAutoCommit(true); 801 executor = new CacheDriverExecutor(conn); 802 record = executor.first(sql, dataRecord); 803 if(record != null){ 804 Cache.set(cacheKey,record,CACHE_SECONDS); 805 } 806 } catch (SQLException e) { 807 throw new IOException(e.getMessage(), e); 808 } finally { 809 try { 810 if (executor != null) 811 executor.close(); 812 } catch (Exception e) { 813 log.error(e.getMessage(), e); 814 } 815 } 816 } 817 return record; 818 } 819 820 private Map<String, Object> getFirstFilePartRefresh() throws IOException { 821 String sql = formatSql(SQL_SELECT_FOR_FIRST_FILE); 822 Map dataRecord = new HashMap<String, Object>(); 823 dataRecord.put("file_name", origin.getName()); 824 dataRecord.put("file_parent_path", getParent()); 825 String cacheKey = String.format("first-%s-%s",sql,dataRecord); 826 Cache.remove(cacheKey); 827 CacheDriverExecutor executor = null; 828 Connection conn = null; 829 try { 830 conn = dataSourceReader.getConnection(); 831 conn.setAutoCommit(true); 832 executor = new CacheDriverExecutor(conn); 833 Map<String,Object> record = executor.first(sql, dataRecord); 834 if(record != null){ 835 Cache.set(cacheKey,record,CACHE_SECONDS); 836 } 837 return record; 838 } catch (SQLException e) { 839 throw new IOException(e.getMessage(), e); 840 } finally { 841 try { 842 if (executor != null) 843 executor.close(); 844 } catch (Exception e) { 845 log.error(e.getMessage(), e); 846 } 847 } 848 } 849 850 private Map<String, Object> getLastIndexRecord() throws IOException { 851 String sql = formatSql(SQL_SELECT_FOR_LAST_FILE_PART); 852 Map dataRecord = new HashMap<String, Object>(); 853 dataRecord.put("file_name", origin.getName()); 854 dataRecord.put("file_parent_path", getParent()); 855 dataRecord.put("file_id", fileId); 856 Map<String, Object> record = null; 857 if (record == null) { 858 CacheDriverExecutor executor = null; 859 Connection conn = null; 860 try { 861 conn = dataSourceReader.getConnection(); 862 conn.setAutoCommit(true); 863 executor = new CacheDriverExecutor(conn); 864 record = executor.first(sql, dataRecord); 865 } catch (SQLException e) { 866 throw new IOException(e.getMessage(), e); 867 } finally { 868 try { 869 if (executor != null) 870 executor.close(); 871 } catch (Exception e) { 872 log.error(e.getMessage(), e); 873 } 874 } 875 } 876 return record; 877 } 878 879 public boolean isTimeout(long timeoutMs) throws IOException { 880 CacheDriverExecutor executor = null; 881 Connection conn = null; 882 try { 883 Clock clock = new Clock(); 884 conn = dataSourceReader.getConnection(); 885 conn.setAutoCommit(true); 886 executor = new CacheDriverExecutor(conn); 887 Map dataRecord = new HashMap<String, Object>(); 888 dataRecord.put("file_name", origin.getName()); 889 dataRecord.put("file_parent_path", getParent()); 890 Map<String, Object> completedRecord = getCompletedRecord(); 891 892 if (completedRecord != null) 893 return false; 894 895 Map<String, Object> record = executor.first(formatSql(SQL_SELECT_FOR_TIMEOUT), dataRecord); 896 897 if (record == null) 898 return false; 899 900 boolean completed = executor.first(formatSql(SQL_SELECT_CHECK_COMPLETED), record) != null; 901 if (!completed) { 902 boolean timeout = (clock.getTime() - ((Timestamp) record.get("updated_at")).getTime()) > timeoutMs; 903 904 return timeout; 905 } 906 return false; 907 } catch (SQLException e) { 908 throw new IOException(e.getMessage(), e); 909 } finally { 910 try { 911 if (executor != null) 912 executor.close(); 913 } catch (Exception e) { 914 log.error(e.getMessage(), e); 915 } 916 } 917 } 918 919 private Map<String, Object> readPart(Map<String, Object> rfRecord) throws IOException { 920 CacheDriverExecutor executor = null; 921 Connection conn = null; 922 try { 923 calculateDataMappingDataSourceReader(this, rfRecord); 924 if(this.dataMappingTable <= -1){ 925 return null; 926 } 927 Clock clock = new Clock(); 928 conn = dataMappingDataSourceReader.getConnection(); 929 conn.setAutoCommit(true); 930 executor = new CacheDriverExecutor(conn); 931 Map<String, Object> dataRecord = new HashMap<String, Object>(); 932 dataRecord.put("remote_file_id", rfRecord.get("id")); 933 return executor.first(formatSqlForFilePartData(SQL_SELECT_BY_RF_ID_FROM_FILE_DATA), dataRecord); 934 } catch (SQLException e) { 935 throw new IOException(e.getMessage(), e); 936 } finally { 937 try { 938 if (executor != null) 939 executor.close(); 940 } catch (Exception e) { 941 log.error(e.getMessage(), e); 942 } 943 } 944 } 945 946 public void merge(FilePart filePart) throws IOException { 947 if (isLink()) { 948 throw new IOException(String.format("The file '%s' is a link.", origin.getAbsolutePath())); 949 } 950 if (isDir()) { 951 throw new IOException(String.format("The file '%s' is a folder.", origin.getAbsolutePath())); 952 } 953 CacheDriverExecutor executor = null; 954 Connection conn = null; 955 try { 956 Clock clock = new Clock(); 957 conn = dataSourceReader.getConnection(); 958 conn.setAutoCommit(true); 959 executor = new CacheDriverExecutor(conn); 960 Map dataRecord = getCompletedRecord(); 961 if (dataRecord != null) { 962 CacheArray rows = new CacheArray(); 963 CacheArrayFilter filter = new CacheArrayFilter(CACHE_ARRAY_FILTER_TIMER) { 964 long fileSize = 0L; 965 Map<String, Object> fp = null; 966 Map<String, Object> item = null; 967 968 @Override 969 public void execute(Integer index, Object o) { 970 971 try { 972 item = (Map<String, Object>) o; 973 fp = readPart(item); 974 Object fpData = null; 975 976 if(fp == null){ 977 fpData = new byte[]{}; 978 }else{ 979 fpData = fp.get("file_part_data"); 980 } 981 Thread.currentThread() 982 .setName(String.format("RemoteFile-merge-%s", item.get("file_name"))); 983 byte[] partData = null; 984 if (fpData instanceof byte[]) { 985 partData = (byte[]) fpData; 986 } 987 fileSize += partData.length; 988 filePart.process(index, partData); 989 } catch (Exception e) { 990 log.error(e.getMessage(), e); 991 this.terminated = true; 992 } 993 } 994 995 @Override 996 public void completed(Integer size) { 997 try { 998 if (item != null) { 999 Thread.currentThread() 1000 .setName(String.format("RemoteFile-merge-completed-%s", item.get("file_name"))); 1001 } 1002 filePart.completed(size, new Clock().getSqlTimestamp(), fileSize); 1003 } catch (IOException e) { 1004 log.error(e.getMessage(), e); 1005 } 1006 } 1007 1008 @Override 1009 public void terminated() { 1010 if (item != null) { 1011 Thread.currentThread() 1012 .setName(String.format("RemoteFile-merge-trminated-%s", item.get("file_name"))); 1013 log.error("Merge terminated: {}/{}", item.get("file_parent_path"), item.get("file_name")); 1014 } 1015 } 1016 }; 1017 rows.filter(filter); 1018 executor.find(formatSql(SQL_SELECT_FILE_HAVE_DATA), dataRecord, rows); 1019 } 1020 } catch (SQLException e) { 1021 throw new IOException(e.getMessage(), e); 1022 } finally { 1023 if (executor != null) { 1024 try { 1025 executor.close(); 1026 } catch (Exception e) { 1027 throw new IOException(e.getMessage(), e); 1028 } 1029 } 1030 } 1031 } 1032 1033 protected void copyFrom(boolean copyStructureOnly, DiskFile diskFile) throws IOException { 1034 var the = this; 1035 if (diskFile.isLink()) { 1036 the.setModifyTime(diskFile.getModifyTimeForClock()); 1037 the.createLink(diskFile.readLink()); 1038 } 1039 if (diskFile.isDir()) { 1040 the.setModifyTime(diskFile.getModifyTimeForClock()); 1041 the.mkdirs(); 1042 } 1043 if (diskFile.isFile()) { 1044 the.setModifyTime(diskFile.getModifyTimeForClock()); 1045 if (copyStructureOnly) { 1046 the.createStructureFile(diskFile.size()); 1047 diskFile.removeLogicAccess(); 1048 DiskFile.removeQueuePathMapping(diskFile.getPath()); 1049 } else { 1050 diskFile.split(filePartSize, new FilePart() { 1051 final List<byte[]> partDataList = new ArrayList<byte[]>(); 1052 int currentFilePartStartIndex = 0; 1053 1054 @Override 1055 protected void process(int partIndex, byte[] data) throws IOException { 1056 try{ 1057 if (partIndex == 0) { 1058 if (diskFile.syncRoot != null) { 1059 if (diskFile.isLogicModify()) { 1060 throw new IOException( 1061 String.format("The file '%s' is already logic locked by another thread.", 1062 diskFile.getPath())); 1063 } 1064 } 1065 if (the.exists(true)) { 1066 throw new IOException( 1067 String.format("The remote file '%s' already exist.", the.getPath())); 1068 } 1069 if (isDebug(the)) { 1070 LoggerFactory.getLogger(RemoteFile.class).mark("CreateRemoteFile - {}", the.getPath()); 1071 } 1072 } 1073 1074 partDataList.add(data); 1075 if (partDataList.size() >= BATCH_SIZE) { 1076 diskFile.logicAccess(); 1077 if(!the.writeToBatch(currentFilePartStartIndex, partDataList)){ 1078 throw new IOException(String.format("The remote file '%s' write failed.",the.getPath())); 1079 } 1080 partDataList.clear(); 1081 currentFilePartStartIndex += BATCH_SIZE; 1082 } 1083 }catch(Exception e){ 1084 throw new IOException(e.getMessage(),e); 1085 } 1086 } 1087 1088 @Override 1089 protected void completed(int lastPartIndex, Timestamp modifyTime, long fileSize) 1090 throws IOException { 1091 try { 1092 if (diskFile.syncRoot != null) { 1093 if (diskFile.isLogicModify()) { 1094 throw new IOException( 1095 String.format("The file '%s' is already logic locked by another thread.", 1096 diskFile.getPath())); 1097 } 1098 } 1099 /**This is 0 bytes file**/ 1100 if (lastPartIndex == -1) { 1101 1102 if (the.exists(true)) { 1103 the.forceDeleteFile(false); 1104 } 1105 1106 the.setModifyTime(modifyTime); 1107 the.createEmptyFile(); 1108 } else { 1109 the.setModifyTime(modifyTime); 1110 if (partDataList.size() > 0) { 1111 if(!the.writeToBatch(currentFilePartStartIndex, partDataList)){ 1112 throw new IOException(String.format("The remote file '%s' write failed.",the.getPath())); 1113 } 1114 partDataList.clear(); 1115 } 1116 the.complete(lastPartIndex, fileSize); 1117 } 1118 if (isDebug(the)) { 1119 LoggerFactory.getLogger(RemoteFile.class).mark("CompletedRemoteFile - {}", 1120 the.getPath()); 1121 } 1122 DiskFile.removeQueuePathMapping(diskFile.getPath()); 1123 } catch (Exception e) { 1124 throw new IOException(e.getMessage(),e); 1125 } 1126 } 1127 1128 @Override 1129 protected void ended(int lastPartIndex, long fileSize) { 1130 1131 } 1132 1133 }); 1134 } 1135 } 1136 } 1137 1138 private boolean createEmptyFile() throws IOException { 1139 boolean allowed = beforeWrite(); 1140 1141 if(!allowed) return false; 1142 1143 Clock clock = new Clock(); 1144 Timestamp currentTime = clock.getSqlTimestamp(); 1145 CacheDriverExecutor executor = null; 1146 Connection conn = null; 1147 try { 1148 calculateDataMappingDataSourceWriter(this); 1149 conn = dataSourceWriter.getConnection(); 1150 conn.setAutoCommit(true); 1151 executor = new CacheDriverExecutor(conn); 1152 Map dataRecord = new HashMap<String, Object>(); 1153 dataRecord.put("id", 1154 generateId(identityCardEnable, identityCardName, identityCardPrefix, identityCardSuffix)); 1155 dataRecord.put("file_name", origin.getName()); 1156 dataRecord.put("file_parent_path", getParent()); 1157 dataRecord.put("file_type", "F"); 1158 dataRecord.put("file_part_index", 0); 1159 dataRecord.put("file_part_size", 0); 1160 dataRecord.put("file_part_data_ds", "NONE"); 1161 dataRecord.put("file_part_data_table", this.dataMappingTable); 1162 dataRecord.put("file_part_last_index", 0); 1163 dataRecord.put("file_size", 0); 1164 dataRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime); 1165 dataRecord.put("file_deleted", 0); 1166 dataRecord.put("created_user_id", getDefaultModifyUserId()); 1167 dataRecord.put("updated_user_id", getDefaultModifyUserId()); 1168 dataRecord.put("created_at", currentTime); 1169 dataRecord.put("updated_at", currentTime); 1170 dataRecord.put("file_id", fileId); 1171 dataRecord.put("source_hostname", CommonTools.getHostname()); 1172 dataRecord.put("synced_on_hostnames", String.format(";%s;", CommonTools.getHostname())); 1173 dataRecord.put("deleted_on_hostnames", ";"); 1174 executor.execute(formatSql(SQL_INSERT_INTO_FILE), dataRecord); 1175 1176 afterWrite(); 1177 1178 return true; 1179 } catch (Exception e) { 1180 throw new IOException(e.getMessage(), e); 1181 } finally { 1182 try { 1183 if (executor != null) 1184 executor.close(); 1185 } catch (Exception e) { 1186 log.error(e.getMessage(), e); 1187 } 1188 } 1189 } 1190 1191 private boolean createStructureFile(long fileSize) throws IOException { 1192 boolean allowed = beforeWrite(); 1193 1194 if(!allowed) return false; 1195 1196 log.debug("CreateStructureFile: {}", getPath()); 1197 Clock clock = new Clock(); 1198 Timestamp currentTime = clock.getSqlTimestamp(); 1199 CacheDriverExecutor executor = null; 1200 Connection conn = null; 1201 try { 1202 calculateDataMappingDataSourceWriter(this); 1203 conn = dataSourceWriter.getConnection(); 1204 conn.setAutoCommit(true); 1205 executor = new CacheDriverExecutor(conn); 1206 Map dataRecord = new HashMap<String, Object>(); 1207 dataRecord.put("id", 1208 generateId(identityCardEnable, identityCardName, identityCardPrefix, identityCardSuffix)); 1209 dataRecord.put("file_name", origin.getName()); 1210 dataRecord.put("file_parent_path", getParent()); 1211 dataRecord.put("file_type", "F"); 1212 dataRecord.put("file_part_index", 0); 1213 dataRecord.put("file_part_size", fileSize); 1214 dataRecord.put("file_part_data_ds", "NONE"); 1215 dataRecord.put("file_part_data_table", this.dataMappingTable); 1216 dataRecord.put("file_part_last_index", 0); 1217 dataRecord.put("file_size", fileSize); 1218 dataRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime); 1219 dataRecord.put("file_deleted", 0); 1220 dataRecord.put("created_user_id", getDefaultModifyUserId()); 1221 dataRecord.put("updated_user_id", getDefaultModifyUserId()); 1222 dataRecord.put("created_at", currentTime); 1223 dataRecord.put("updated_at", currentTime); 1224 dataRecord.put("file_id", fileId); 1225 dataRecord.put("source_hostname", CommonTools.getHostname()); 1226 dataRecord.put("synced_on_hostnames", String.format(";%s;", CommonTools.getHostname())); 1227 dataRecord.put("deleted_on_hostnames", ";"); 1228 executor.execute(formatSql(SQL_INSERT_INTO_FILE), dataRecord); 1229 1230 afterWrite(); 1231 1232 return true; 1233 } catch (Exception e) { 1234 throw new IOException(e.getMessage(), e); 1235 } finally { 1236 try { 1237 if (executor != null) 1238 executor.close(); 1239 } catch (Exception e) { 1240 log.error(e.getMessage(), e); 1241 } 1242 } 1243 } 1244 1245 protected boolean syncedToDisk() throws IOException { 1246 Clock clock = new Clock(); 1247 CacheDriverExecutor executor = null; 1248 Connection conn = null; 1249 try { 1250 conn = dataSourceWriter.getConnection(); 1251 conn.setAutoCommit(true); 1252 executor = new CacheDriverExecutor(conn); 1253 Map<String, Object> record = getCompletedRecord(); 1254 if (record == null) { 1255 log.debug(String.format("Remmote file not found '%s'.", origin.getAbsolutePath())); 1256 } 1257 if (record != null) { 1258 String hostname = CommonTools.getHostname(); 1259 String syncedOnHostnames = (String) record.get("synced_on_hostnames"); 1260 syncedOnHostnames = syncedOnHostnames.replaceAll(String.format(";%s;",hostname),";"); 1261 String shn = String.format("%s%s;", syncedOnHostnames, hostname); 1262 Map<String, Object> dataRecord = new HashMap<String, Object>(); 1263 dataRecord.put("file_parent_path",record.get("file_parent_path")); 1264 dataRecord.put("file_name", record.get("file_name")); 1265 dataRecord.put("synced_on_hostnames",shn); 1266 dataRecord.put("updated_at", clock.getSqlTimestamp()); 1267 dataRecord.put("updated_user_id", getDefaultModifyUserId()); 1268 executor.execute(formatSql(SQL_UPDATE_FOR_SYNCED_TO_DISK), dataRecord); 1269 return true; 1270 } 1271 return false; 1272 } catch (SQLException e) { 1273 throw new IOException(e.getMessage(), e); 1274 } finally { 1275 try { 1276 if (executor != null) 1277 executor.close(); 1278 } catch (Exception e) { 1279 log.error(e.getMessage(), e); 1280 } 1281 } 1282 } 1283 1284 protected boolean deletedToDiskAndSub(Object id) throws IOException { 1285 Clock clock = new Clock(); 1286 CacheDriverExecutor executor = null; 1287 Connection conn = null; 1288 try { 1289 conn = dataSourceWriter.getConnection(); 1290 conn.setAutoCommit(true); 1291 executor = new CacheDriverExecutor(conn); 1292 Map<String, Object> record = getRecordById(id); 1293 if (record == null) { 1294 log.debug(String.format("Remmote file not found '%s'.", origin.getAbsolutePath())); 1295 } 1296 if (record != null) { 1297 String hostname = CommonTools.getHostname(); 1298 String deletedOnHostnames = (String) record.get("deleted_on_hostnames"); 1299 deletedOnHostnames = deletedOnHostnames.replaceAll(String.format(";%s;",hostname),";"); 1300 String dhn = String.format("%s%s;", deletedOnHostnames, hostname); 1301 Timestamp now = clock.getSqlTimestamp(); 1302 final Map<String, Object> dataRecord = new HashMap<String, Object>(); 1303 dataRecord.put("file_parent_path", record.get("file_parent_path")); 1304 dataRecord.put("file_name", record.get("file_name")); 1305 dataRecord.put("deleted_on_hostnames", dhn); 1306 dataRecord.put("updated_at", now); 1307 dataRecord.put("updated_user_id", getDefaultModifyUserId()); 1308 executor.execute(formatSql(SQL_UPDATE_FOR_DELETED_TO_DISK), dataRecord); 1309 if (record.get("file_type").equals("D")) { 1310 String dirPath = String.format("%s/%s", record.get("file_parent_path"), 1311 record.get("file_name")); 1312 dataRecord.put("file_parent_path", dirPath); 1313 dataRecord.put("like_file_parent_path", dirPath + "/%"); 1314 executor.execute(formatSql(SQL_UPDATE_FOR_DELETED_TO_DISK_SUB_FILES), dataRecord); 1315 } 1316 return true; 1317 } 1318 return false; 1319 } catch (SQLException e) { 1320 throw new IOException(e.getMessage(), e); 1321 } finally { 1322 try { 1323 if (executor != null) 1324 executor.close(); 1325 } catch (Exception e) { 1326 log.error(e.getMessage(), e); 1327 } 1328 } 1329 } 1330 1331 private boolean complete(int filePartLastIndex, long fileSize) throws IOException { 1332 Clock clock = new Clock(); 1333 CacheDriverExecutor executor = null; 1334 Timestamp currentTime = clock.getSqlTimestamp(); 1335 Connection conn = null; 1336 try { 1337 conn = dataSourceWriter.getConnection(); 1338 conn.setAutoCommit(true); 1339 executor = new CacheDriverExecutor(conn); 1340 Map dataRecord = new HashMap<String, Object>(); 1341 dataRecord.put("file_id", fileId); 1342 dataRecord.put("file_part_last_index", filePartLastIndex); 1343 dataRecord.put("file_size", fileSize); 1344 dataRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime); 1345 dataRecord.put("updated_at", currentTime); 1346 dataRecord.put("updated_user_id", getDefaultModifyUserId()); 1347 int rows = executor.execute(formatSql(SQL_UPDATE_FOR_COMPLETED), dataRecord); 1348 1349 afterWrite(); 1350 1351 return rows > 0; 1352 } catch (SQLException e) { 1353 throw new IOException(e.getMessage(), e); 1354 } finally { 1355 try { 1356 if (executor != null) 1357 executor.close(); 1358 } catch (Exception e) { 1359 log.error(e.getMessage(), e); 1360 } 1361 } 1362 } 1363 1364 @Override 1365 public boolean complete() throws IOException { 1366 Clock clock = new Clock(); 1367 Timestamp currentTime = clock.getSqlTimestamp(); 1368 CacheDriverExecutor executor = null; 1369 Connection conn = null; 1370 try { 1371 conn = dataSourceWriter.getConnection(); 1372 conn.setAutoCommit(true); 1373 executor = new CacheDriverExecutor(conn); 1374 Map dataRecord = new HashMap<String, Object>(); 1375 dataRecord.put("file_name", origin.getName()); 1376 dataRecord.put("file_parent_path", getParent()); 1377 dataRecord.put("file_id", fileId); 1378 1379 Map<String, Object> sumRecord = executor.first(formatSql(SQL_SELECT_FOR_SUM_FILE_PART_SIZE_INDEX), 1380 dataRecord); 1381 1382 if (sumRecord == null) { 1383 log.debug(String.format("Remmote file not found '%s'.", origin.getAbsolutePath())); 1384 } 1385 1386 if (sumRecord != null) { 1387 long fileSize = Long.parseLong(sumRecord.get("file_part_size") + ""); 1388 long filePartLastIndex = Integer.parseInt(sumRecord.get("file_part_index") + ""); 1389 1390 dataRecord.put("file_part_last_index", filePartLastIndex); 1391 dataRecord.put("file_size", fileSize); 1392 dataRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime); 1393 dataRecord.put("updated_at", currentTime); 1394 dataRecord.put("updated_user_id", getDefaultModifyUserId()); 1395 int rows = executor.execute(formatSql(SQL_UPDATE_FOR_COMPLETED), dataRecord); 1396 1397 afterWrite(); 1398 1399 return rows > 0; 1400 } 1401 return false; 1402 } catch (SQLException e) { 1403 throw new IOException(e.getMessage(), e); 1404 } finally { 1405 try { 1406 if (executor != null) 1407 executor.close(); 1408 } catch (Exception e) { 1409 log.error(e.getMessage(), e); 1410 } 1411 } 1412 } 1413 1414 public boolean forceDelete() throws IOException { 1415 return forceDeleteAll(true); 1416 } 1417 1418 public boolean forceDeleteDir(boolean realDeleted) throws IOException { 1419 return forceDelete(realDeleted, "D"); 1420 } 1421 1422 public boolean forceDeleteLink(boolean realDeleted) throws IOException { 1423 return forceDelete(realDeleted, "L"); 1424 } 1425 1426 public boolean forceDeleteFile(boolean realDeleted) throws IOException { 1427 return forceDelete(realDeleted, "F"); 1428 } 1429 1430 public boolean forceDelete(boolean realDeleted, String fileType) throws IOException { 1431 if (fileType.equalsIgnoreCase("D")) { 1432 return forceDeleteAll(realDeleted); 1433 } else { 1434 boolean allowed = beforeDelete(realDeleted); 1435 if (allowed) { 1436 CacheDriverExecutor executor = null; 1437 Connection conn = null; 1438 try { 1439 Clock clock = new Clock(); 1440 Timestamp currentTime = clock.getSqlTimestamp(); 1441 conn = dataSourceWriter.getConnection(); 1442 conn.setAutoCommit(true); 1443 executor = new CacheDriverExecutor(conn); 1444 Map dataRecord = new HashMap<String, Object>(); 1445 dataRecord.put("file_deleted", realDeleted ? 1 : 0); 1446 dataRecord.put("deleted", realDeleted ? 0 : 1); 1447 dataRecord.put("file_name", origin.getName()); 1448 dataRecord.put("file_type", fileType.toUpperCase()); 1449 dataRecord.put("file_parent_path", getParent()); 1450 dataRecord.put("updated_user_id", getDefaultModifyUserId()); 1451 dataRecord.put("deleted_user_id", getDefaultModifyUserId()); 1452 dataRecord.put("updated_at", currentTime); 1453 dataRecord.put("deleted_at", currentTime); 1454 dataRecord.put("root_folder", getPath()); 1455 dataRecord.put("like_subfolder", parsePath(getPath() + "/%")); 1456 int resultRows = executor.execute(formatSql(SQL_UPDATE_FOR_DELETE_FILE_LINK), dataRecord); 1457 log.debug("ForceDelete({}): {} -> {}", realDeleted, getPath(), resultRows); 1458 1459 afterDelete(realDeleted); 1460 1461 return resultRows > 0; 1462 } catch (SQLException e) { 1463 throw new IOException(e.getMessage(), e); 1464 } finally { 1465 try { 1466 if (executor != null) 1467 executor.close(); 1468 } catch (Exception e) { 1469 log.error(e.getMessage(), e); 1470 } 1471 } 1472 } 1473 } 1474 return false; 1475 } 1476 1477 public boolean forceDeleteAll(boolean realDeleted) throws IOException { 1478 boolean allowed = beforeDelete(realDeleted); 1479 if (allowed) { 1480 CacheDriverExecutor executor = null; 1481 Connection conn = null; 1482 try { 1483 Clock clock = new Clock(); 1484 Timestamp currentTime = clock.getSqlTimestamp(); 1485 conn = dataSourceWriter.getConnection(); 1486 conn.setAutoCommit(true); 1487 executor = new CacheDriverExecutor(conn); 1488 Map dataRecord = new HashMap<String, Object>(); 1489 dataRecord.put("file_deleted", realDeleted ? 1 : 0); 1490 dataRecord.put("deleted", realDeleted ? 0 : 1); 1491 dataRecord.put("file_name", origin.getName()); 1492 dataRecord.put("file_parent_path", getParent()); 1493 dataRecord.put("updated_user_id", getDefaultModifyUserId()); 1494 dataRecord.put("deleted_user_id", getDefaultModifyUserId()); 1495 dataRecord.put("updated_at", currentTime); 1496 dataRecord.put("deleted_at", currentTime); 1497 dataRecord.put("root_folder", getPath()); 1498 dataRecord.put("like_subfolder", parsePath(getPath() + "/%")); 1499 int resultRows = executor.execute(formatSql(SQL_UPDATE_FOR_DELETE_ALL), dataRecord); 1500 log.debug("ForceDelete({}): {} -> {}", realDeleted, getPath(), resultRows); 1501 1502 afterDelete(realDeleted); 1503 1504 return resultRows > 0; 1505 } catch (SQLException e) { 1506 throw new IOException(e.getMessage(), e); 1507 } finally { 1508 try { 1509 if (executor != null) 1510 executor.close(); 1511 } catch (Exception e) { 1512 log.error(e.getMessage(), e); 1513 } 1514 } 1515 } 1516 return false; 1517 } 1518 1519 @Override 1520 public boolean delete() throws IOException { 1521 return forceDeleteAll(true); 1522 } 1523 1524 public boolean delete(boolean realDeleted) throws IOException { 1525 return forceDeleteAll(realDeleted); 1526 } 1527 1528 @Override 1529 public boolean beforeDelete(boolean realDeleted) { 1530 return true; 1531 } 1532 1533 @Override 1534 public void afterDelete(boolean realDeleted) { 1535 1536 } 1537 1538 @Override 1539 public boolean beforeMkdirs() { 1540 return true; 1541 } 1542 1543 @Override 1544 public void afterMkdirs() { 1545 1546 } 1547 1548 @Override 1549 public boolean beforeCreateLink(String target) { 1550 return true; 1551 } 1552 1553 @Override 1554 public void afterCreateLink(String target) { 1555 1556 } 1557 1558 @Override 1559 public boolean beforeWrite(){ 1560 return true; 1561 } 1562 1563 @Override 1564 public void afterWrite(){ 1565 1566 } 1567 1568 @Override 1569 public boolean exists() throws IOException { 1570 return getFirstFilePart() != null; 1571 } 1572 1573 public boolean exists(boolean refresh) throws IOException { 1574 if(refresh){ 1575 return getFirstFilePartRefresh() != null; 1576 }else{ 1577 return exists(); 1578 } 1579 } 1580 1581 @Override 1582 public boolean mkdirs() throws IOException { 1583 return mkdirs(false); 1584 } 1585 1586 public boolean mkdirs(boolean checkSourceHostname) throws IOException { 1587 1588 boolean allowed = beforeMkdirs(); 1589 1590 if(!allowed) return false; 1591 1592 Clock clock = new Clock(); 1593 CacheDriverExecutor executor = null; 1594 Connection conn = null; 1595 try { 1596 if (getFirstFilePart() != null) { 1597 return false; 1598 } 1599 if (checkSourceHostname) { 1600 String lastSourceHostname = getLastSourceHostname(); 1601 if (lastSourceHostname != null) { 1602 String clientHostname = CommonTools.getHostname(); 1603 boolean sameHostname = clientHostname.equals(lastSourceHostname); 1604 if (!sameHostname) { 1605 return false; 1606 } 1607 } 1608 } 1609 log.debug("Mkidrs: {}", getPath()); 1610 calculateDataMappingDataSourceWriter(this); 1611 Timestamp currentTime = clock.getSqlTimestamp(); 1612 conn = dataSourceWriter.getConnection(); 1613 conn.setAutoCommit(true); 1614 executor = new CacheDriverExecutor(conn); 1615 Map dataRecord = new HashMap<String, Object>(); 1616 dataRecord.put("id", 1617 generateId(identityCardEnable, identityCardName, identityCardPrefix, identityCardSuffix)); 1618 dataRecord.put("file_name", origin.getName()); 1619 dataRecord.put("file_parent_path", getParent()); 1620 dataRecord.put("file_type", "D"); 1621 dataRecord.put("file_part_index", 0); 1622 dataRecord.put("file_part_size", 0); 1623 dataRecord.put("file_part_data_ds", "NONE"); 1624 dataRecord.put("file_part_data_table", this.dataMappingTable); 1625 dataRecord.put("file_part_last_index", 0); 1626 dataRecord.put("file_size", 0); 1627 dataRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime); 1628 dataRecord.put("file_deleted", 0); 1629 dataRecord.put("created_user_id", getDefaultModifyUserId()); 1630 dataRecord.put("updated_user_id", getDefaultModifyUserId()); 1631 dataRecord.put("created_at", currentTime); 1632 dataRecord.put("updated_at", currentTime); 1633 dataRecord.put("file_id", fileId); 1634 dataRecord.put("source_hostname", CommonTools.getHostname()); 1635 dataRecord.put("synced_on_hostnames", String.format(";%s;", CommonTools.getHostname())); 1636 dataRecord.put("deleted_on_hostnames", ";"); 1637 int rows = executor.execute(formatSql(SQL_INSERT_INTO_FILE), dataRecord); 1638 1639 afterMkdirs(); 1640 1641 return rows > 0; 1642 } catch (Exception e) { 1643 throw new IOException(e.getMessage(), e); 1644 } finally { 1645 try { 1646 if (executor != null) 1647 executor.close(); 1648 } catch (Exception e) { 1649 log.error(e.getMessage(), e); 1650 } 1651 } 1652 } 1653 1654 @Override 1655 public boolean write(byte[] data, boolean append) throws IOException { 1656 return writeTo(data, append); 1657 } 1658 1659 public synchronized boolean writeBlock(byte[] data, boolean append) throws IOException { 1660 return writeTo(data, append); 1661 } 1662 1663 private boolean writeToBatch(int currentFilePartStartIndex, List<byte[]> partDataList) throws IOException { 1664 1665 boolean allowed = beforeWrite(); 1666 1667 if(!allowed) return false; 1668 1669 CacheDriverExecutor executor = null; 1670 CacheDriverExecutor dmExecutor = null; 1671 List<Map<String, Object>> rfList = new ArrayList<Map<String, Object>>(); 1672 List<Map<String, Object>> dmList = new ArrayList<Map<String, Object>>(); 1673 try { 1674 Clock clock = new Clock(); 1675 log.debug("Write: {}", getPath()); 1676 int size = partDataList.size(); 1677 Timestamp currentTime = clock.getSqlTimestamp(); 1678 String hostname = CommonTools.getHostname(); 1679 calculateDataMappingDataSourceWriter(this); 1680 for (int filePartIndex = 0; filePartIndex < size; filePartIndex++) { 1681 byte[] data = partDataList.get(filePartIndex); 1682 String remoteFileId = generateId(identityCardEnable, identityCardName, identityCardPrefix, 1683 identityCardSuffix); 1684 Map rfRecord = new HashMap<String, Object>(); 1685 rfRecord.put("id", remoteFileId); 1686 rfRecord.put("file_name", origin.getName()); 1687 rfRecord.put("file_parent_path", getParent()); 1688 rfRecord.put("file_type", "F"); 1689 rfRecord.put("file_part_index", filePartIndex + currentFilePartStartIndex); 1690 rfRecord.put("file_part_size", data.length); 1691 rfRecord.put("file_part_data_ds", this.dataMappingDs); 1692 rfRecord.put("file_part_data_table", this.dataMappingTable); 1693 rfRecord.put("file_part_last_index", -1); 1694 rfRecord.put("file_size", 0); 1695 rfRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime); 1696 rfRecord.put("file_deleted", 0); 1697 rfRecord.put("created_user_id", getDefaultModifyUserId()); 1698 rfRecord.put("updated_user_id", getDefaultModifyUserId()); 1699 rfRecord.put("created_at", currentTime); 1700 rfRecord.put("updated_at", currentTime); 1701 rfRecord.put("file_id", fileId); 1702 rfRecord.put("source_hostname", hostname); 1703 rfRecord.put("synced_on_hostnames", String.format(";%s;", hostname)); 1704 rfRecord.put("deleted_on_hostnames", ";"); 1705 rfList.add(rfRecord); 1706 1707 Map<String, Object> dataRecord = new HashMap<String, Object>(); 1708 dataRecord.put("id", generateDataId(identityCardEnable, identityCardName, identityCardSuffix, this)); 1709 dataRecord.put("file_part_data", encrypt(data)); 1710 dataRecord.put("remote_file_id", remoteFileId); 1711 dataRecord.put("file_id", fileId); 1712 dataRecord.put("created_at", currentTime); 1713 dataRecord.put("updated_at", currentTime); 1714 dmList.add(dataRecord); 1715 } 1716 Connection conn = dataSourceWriter.getConnection(); 1717 conn.setAutoCommit(true); 1718 executor = new CacheDriverExecutor(conn); 1719 executor.executeBatch(formatSql(SQL_INSERT_INTO_FILE), rfList); 1720 1721 conn = dataMappingDataSourceWriter.getConnection(); 1722 conn.setAutoCommit(true); 1723 dmExecutor = new CacheDriverExecutor(conn); 1724 dmExecutor.executeBatch(formatSqlForFilePartData(SQL_INSERT_INTO_FILE_DATA), dmList); 1725 1726 return true; 1727 } catch (Exception e) { 1728 throw new IOException(e.getMessage(), e); 1729 } finally { 1730 try { 1731 if (executor != null) 1732 executor.close(); 1733 } catch (Exception e) { 1734 log.error(e.getMessage(), e); 1735 } 1736 try { 1737 if (dmExecutor != null) 1738 dmExecutor.close(); 1739 } catch (Exception e) { 1740 log.error(e.getMessage(), e); 1741 } 1742 } 1743 } 1744 1745 private boolean writeTo(byte[] data, boolean append) throws IOException { 1746 1747 boolean allowed = beforeWrite(); 1748 1749 if(!allowed) return false; 1750 1751 CacheDriverExecutor executor = null; 1752 CacheDriverExecutor dmExecutor = null; 1753 try { 1754 if (!append) { 1755 if (exists()) 1756 throw new IOException( 1757 String.format("The remote file '%s' already exists.", origin.getAbsolutePath())); 1758 } 1759 Clock clock = new Clock(); 1760 Timestamp currentTime = clock.getSqlTimestamp(); 1761 Map<String, Object> lastRecord = getLastIndexRecord(); 1762 int filePartIndex = 0; 1763 if (lastRecord != null) { 1764 if (lastRecord.get("file_type").equals("D")) { 1765 throw new IOException(String.format("This is a folder '%s'.", origin.getAbsolutePath())); 1766 } 1767 if (lastRecord.get("file_type").equals("L")) { 1768 throw new IOException(String.format("This is a link '%s'.", origin.getAbsolutePath())); 1769 } 1770 if (append) { 1771 filePartIndex = Integer.parseInt(lastRecord.get("file_part_index") + "") + 1; 1772 } 1773 boolean isDeleted = false; 1774 Object deletedValue = lastRecord.get("deleted"); 1775 if (deletedValue instanceof Boolean) { 1776 isDeleted = (Boolean) deletedValue; 1777 } else { 1778 isDeleted = Integer.parseInt(deletedValue + "") == 1; 1779 } 1780 if (isDeleted) { 1781 throw new IOException(String.format("The file '%s' is deleted.", origin.getAbsolutePath())); 1782 } 1783 } 1784 calculateDataMappingDataSourceWriter(this); 1785 log.debug("Write: {}", getPath()); 1786 String remoteFileId = generateId(identityCardEnable, identityCardName, identityCardPrefix, 1787 identityCardSuffix); 1788 Map rfRecord = new HashMap<String, Object>(); 1789 rfRecord.put("id", remoteFileId); 1790 rfRecord.put("file_name", origin.getName()); 1791 rfRecord.put("file_parent_path", getParent()); 1792 rfRecord.put("file_type", "F"); 1793 rfRecord.put("file_part_index", filePartIndex); 1794 rfRecord.put("file_part_size", data.length); 1795 rfRecord.put("file_part_data_ds", this.dataMappingDs); 1796 rfRecord.put("file_part_data_table", this.dataMappingTable); 1797 rfRecord.put("file_part_last_index", -1); 1798 rfRecord.put("file_size", 0); 1799 rfRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime); 1800 rfRecord.put("file_deleted", 0); 1801 rfRecord.put("created_user_id", getDefaultModifyUserId()); 1802 rfRecord.put("updated_user_id", getDefaultModifyUserId()); 1803 rfRecord.put("created_at", currentTime); 1804 rfRecord.put("updated_at", currentTime); 1805 rfRecord.put("file_id", fileId); 1806 rfRecord.put("source_hostname", CommonTools.getHostname()); 1807 rfRecord.put("synced_on_hostnames", String.format(";%s;", CommonTools.getHostname())); 1808 rfRecord.put("deleted_on_hostnames", ";"); 1809 1810 Connection conn = dataSourceWriter.getConnection(); 1811 conn.setAutoCommit(true); 1812 executor = new CacheDriverExecutor(conn); 1813 executor.execute(formatSql(SQL_INSERT_INTO_FILE), rfRecord); 1814 1815 Map dataRecord = new HashMap<String, Object>(); 1816 dataRecord.put("id", generateDataId(identityCardEnable, identityCardName, identityCardSuffix, this)); 1817 dataRecord.put("file_part_data", encrypt(data)); 1818 dataRecord.put("remote_file_id", remoteFileId); 1819 dataRecord.put("file_id", fileId); 1820 dataRecord.put("created_at", currentTime); 1821 dataRecord.put("updated_at", currentTime); 1822 1823 conn = dataMappingDataSourceWriter.getConnection(); 1824 conn.setAutoCommit(true); 1825 dmExecutor = new CacheDriverExecutor(conn); 1826 dmExecutor.execute(formatSqlForFilePartData(SQL_INSERT_INTO_FILE_DATA), dataRecord); 1827 1828 return true; 1829 } catch (Exception e) { 1830 throw new IOException(e.getMessage(), e); 1831 } finally { 1832 try { 1833 if (executor != null) 1834 executor.close(); 1835 } catch (Exception e) { 1836 log.error(e.getMessage(), e); 1837 } 1838 try { 1839 if (dmExecutor != null) 1840 dmExecutor.close(); 1841 } catch (Exception e) { 1842 log.error(e.getMessage(), e); 1843 } 1844 } 1845 } 1846 1847 @Override 1848 public boolean write(byte[] data) throws IOException { 1849 return write(data, false); 1850 } 1851 1852 @Override 1853 public boolean write(String data, boolean append) throws IOException { 1854 return write(data.getBytes(CHARSET), append); 1855 } 1856 1857 @Override 1858 public boolean write(String data) throws IOException { 1859 return write(data.getBytes(CHARSET)); 1860 } 1861 1862 @Override 1863 public boolean createLink(String target) throws IOException { 1864 boolean allowed = beforeCreateLink(target); 1865 1866 if(!allowed) return false; 1867 1868 Clock clock = new Clock(); 1869 CacheDriverExecutor executor = null; 1870 CacheDriverExecutor dmExecutor = null; 1871 try { 1872 if (getFirstFilePart() != null) { 1873 return false; 1874 } 1875 log.debug("CreateLink: {}", getPath()); 1876 Timestamp currentTime = clock.getSqlTimestamp(); 1877 calculateDataMappingDataSourceWriter(this); 1878 String remoteFileId = generateId(identityCardEnable, identityCardName, identityCardPrefix, 1879 identityCardSuffix); 1880 byte[] data = target.getBytes(CHARSET); 1881 Map rfRecord = new HashMap<String, Object>(); 1882 rfRecord.put("id", remoteFileId); 1883 rfRecord.put("file_name", origin.getName()); 1884 rfRecord.put("file_parent_path", getParent()); 1885 rfRecord.put("file_type", "L"); 1886 rfRecord.put("file_part_index", 0); 1887 rfRecord.put("file_part_size", data.length); 1888 rfRecord.put("file_part_data_ds", this.dataMappingDs); 1889 rfRecord.put("file_part_data_table", this.dataMappingTable); 1890 rfRecord.put("file_part_last_index", -1); 1891 rfRecord.put("file_size", 0); 1892 rfRecord.put("file_modify_time", modifyTime == null ? currentTime : modifyTime); 1893 rfRecord.put("file_deleted", 0); 1894 rfRecord.put("created_user_id", getDefaultModifyUserId()); 1895 rfRecord.put("updated_user_id", getDefaultModifyUserId()); 1896 rfRecord.put("created_at", currentTime); 1897 rfRecord.put("updated_at", currentTime); 1898 rfRecord.put("file_id", fileId); 1899 rfRecord.put("source_hostname", CommonTools.getHostname()); 1900 rfRecord.put("synced_on_hostnames", String.format(";%s;", CommonTools.getHostname())); 1901 rfRecord.put("deleted_on_hostnames", ";"); 1902 1903 Connection conn = dataSourceWriter.getConnection(); 1904 conn.setAutoCommit(true); 1905 executor = new CacheDriverExecutor(conn); 1906 int rows = executor.execute(formatSql(SQL_INSERT_INTO_FILE), rfRecord); 1907 1908 Map dataRecord = new HashMap<String, Object>(); 1909 dataRecord.put("id", generateDataId(identityCardEnable, identityCardName, identityCardSuffix, this)); 1910 dataRecord.put("file_part_data", encrypt(data)); 1911 dataRecord.put("remote_file_id", remoteFileId); 1912 dataRecord.put("file_id", fileId); 1913 dataRecord.put("created_at", currentTime); 1914 dataRecord.put("updated_at", currentTime); 1915 1916 conn = dataMappingDataSourceWriter.getConnection(); 1917 conn.setAutoCommit(true); 1918 dmExecutor = new CacheDriverExecutor(conn); 1919 dmExecutor.execute(formatSqlForFilePartData(SQL_INSERT_INTO_FILE_DATA), dataRecord); 1920 1921 complete(); 1922 1923 afterCreateLink(target); 1924 1925 return rows > 0; 1926 } catch (Exception e) { 1927 throw new IOException(e.getMessage(), e); 1928 } finally { 1929 try { 1930 if (executor != null) 1931 executor.close(); 1932 } catch (Exception e) { 1933 log.error(e.getMessage(), e); 1934 } 1935 try { 1936 if (dmExecutor != null) 1937 dmExecutor.close(); 1938 } catch (Exception e) { 1939 log.error(e.getMessage(), e); 1940 } 1941 } 1942 } 1943 1944 @Override 1945 public String readLink() throws IOException { 1946 return new String(readAllBytes(),BaseFile.CHARSET); 1947 } 1948 1949 @Override 1950 public byte[] readAllBytes() throws IOException { 1951 return readAllBytesFrom(); 1952 } 1953 1954 private byte[] readAllBytesFrom() throws IOException { 1955 if (isDir()) { 1956 throw new IOException(String.format("The file '%s' is a folder.", origin.getAbsolutePath())); 1957 } 1958 if (!exists()) { 1959 throw new IOException(String.format("The file '%s' does not exist.", origin.getAbsolutePath())); 1960 } 1961 CacheDriverExecutor executor = null; 1962 Connection conn = null; 1963 ByteArrayOutputStream baos = null; 1964 try { 1965 conn = dataSourceReader.getConnection(); 1966 conn.setAutoCommit(true); 1967 executor = new CacheDriverExecutor(conn); 1968 Map dataRecord = getCompletedRecord(); 1969 List<Map<String, Object>> filePartList = executor.find(formatSql(SQL_SELECT_FILE), dataRecord); 1970 baos = new ByteArrayOutputStream(); 1971 for (Map<String, Object> fpItem : filePartList) { 1972 Map<String, Object> fp = readPart(fpItem); 1973 Object fpData = null; 1974 1975 if(fp == null){ 1976 fpData = new byte[]{}; 1977 }else{ 1978 fpData = fp.get("file_part_data"); 1979 } 1980 1981 if (fpData instanceof byte[]) { 1982 byte[] partData = (byte[]) fpData; 1983 baos.write(decrypt(partData)); 1984 baos.flush(); 1985 } 1986 } 1987 return baos == null ? null : baos.toByteArray(); 1988 } catch (Exception e) { 1989 throw new IOException(e.getMessage(), e); 1990 } finally { 1991 try { 1992 if (baos != null) 1993 baos.close(); 1994 } catch (Exception e) { 1995 log.error(e.getMessage(), e); 1996 } 1997 try { 1998 if (executor != null) 1999 executor.close(); 2000 } catch (Exception e) { 2001 log.error(e.getMessage(), e); 2002 } 2003 } 2004 } 2005 2006 @Override 2007 public boolean isFile() throws IOException { 2008 Map<String, Object> lastRecord = getFirstFilePart(); 2009 2010 if (lastRecord == null) { 2011 return false; 2012 } 2013 return lastRecord.get("file_type").equals("F"); 2014 } 2015 2016 @Override 2017 public boolean isDirectory() throws IOException { 2018 Map<String, Object> lastRecord = getFirstFilePart(); 2019 2020 if (lastRecord == null) { 2021 return false; 2022 } 2023 return lastRecord.get("file_type").equals("D"); 2024 } 2025 2026 @Override 2027 public boolean isLink() throws IOException { 2028 Map<String, Object> lastRecord = getFirstFilePart(); 2029 2030 if (lastRecord == null) { 2031 return false; 2032 } 2033 return lastRecord.get("file_type").equals("L"); 2034 } 2035 2036 @Override 2037 public String readAllString() throws IOException { 2038 ByteArrayOutputStream baos = null; 2039 try { 2040 byte[] bytes = readAllBytes(); 2041 2042 if(bytes == null) return null; 2043 2044 baos = new ByteArrayOutputStream(); 2045 baos.write(bytes); 2046 baos.flush(); 2047 return baos.toString(); 2048 } finally { 2049 if (baos != null) { 2050 try { 2051 baos.close(); 2052 } catch (IOException e) { 2053 log.error(e.getMessage(), e); 2054 } 2055 } 2056 } 2057 } 2058 2059 @Override 2060 public long size() throws IOException { 2061 Map<String, Object> record = getCompletedRecord(); 2062 if (record != null) { 2063 return Long.parseLong(record.get("file_size") + ""); 2064 } 2065 return 0L; 2066 } 2067 2068 public List<Map<String, Object>> list() throws IOException { 2069 return list(getPath(), "%"); 2070 } 2071 2072 public List<Map<String, Object>> list(String likeFileName) throws IOException { 2073 return list(getPath(), likeFileName); 2074 } 2075 2076 public List<Map<String, Object>> list(String likeFolderName, String likeFileName) throws IOException { 2077 CacheDriverExecutor executor = null; 2078 Connection conn = null; 2079 try { 2080 conn = dataSourceReader.getConnection(); 2081 conn.setAutoCommit(true); 2082 executor = new CacheDriverExecutor(conn); 2083 Map<String, Object> dataRecord = new HashMap<String, Object>(); 2084 if (CommonTools.isBlank(likeFileName)) { 2085 dataRecord.put("file_name", "%"); 2086 } else { 2087 dataRecord.put("file_name", likeFileName); 2088 } 2089 if (CommonTools.isBlank(likeFolderName)) { 2090 dataRecord.put("file_parent_path", "%"); 2091 } else { 2092 dataRecord.put("file_parent_path", likeFolderName); 2093 } 2094 return executor.find(formatSql(SQL_SELECT_FOR_LIST_ALL), dataRecord); 2095 } catch (SQLException e) { 2096 throw new IOException(e.getMessage(), e); 2097 } finally { 2098 if (executor != null) { 2099 try { 2100 executor.close(); 2101 } catch (Exception e) { 2102 throw new IOException(e.getMessage(), e); 2103 } 2104 } 2105 } 2106 } 2107 2108 public void listAll(String likeFileName, CacheArray rows) throws IOException { 2109 listAll(getPath(), likeFileName, rows); 2110 } 2111 2112 public void listAll(CacheArray rows) throws IOException { 2113 listAll(getPath(), "%", rows); 2114 } 2115 2116 public void listAll(String likeFolderName, String likeFileName, CacheArray rows) throws IOException { 2117 CacheDriverExecutor executor = null; 2118 Connection conn = null; 2119 try { 2120 conn = dataSourceReader.getConnection(); 2121 conn.setAutoCommit(true); 2122 executor = new CacheDriverExecutor(conn); 2123 Map<String, Object> dataRecord = new HashMap<String, Object>(); 2124 if (CommonTools.isBlank(likeFileName)) { 2125 dataRecord.put("file_name", "%"); 2126 } else { 2127 dataRecord.put("file_name", likeFileName); 2128 } 2129 if (CommonTools.isBlank(likeFolderName)) { 2130 dataRecord.put("file_parent_path", "%"); 2131 } else { 2132 dataRecord.put("file_parent_path", likeFolderName); 2133 } 2134 executor.find(formatSql(SQL_SELECT_FOR_LIST_ALL), dataRecord, rows); 2135 } catch (SQLException e) { 2136 throw new IOException(e.getMessage(), e); 2137 } finally { 2138 if (executor != null) { 2139 try { 2140 executor.close(); 2141 } catch (Exception e) { 2142 throw new IOException(e.getMessage(), e); 2143 } 2144 } 2145 } 2146 } 2147 2148 protected void listAllForDelete(String likeFolderName, String likeFileName, CacheArray rows) throws IOException { 2149 CacheDriverExecutor executor = null; 2150 Connection conn = null; 2151 try { 2152 long deleteCutoffTimeMs = 0L; 2153 2154 if(DELETE_CUTOFF_TIME != null) deleteCutoffTimeMs = DELETE_CUTOFF_TIME.getTime(); 2155 2156 Timestamp currentTime = new Clock().getSqlTimestamp(); 2157 2158 conn = dataSourceReader.getConnection(); 2159 conn.setAutoCommit(true); 2160 executor = new CacheDriverExecutor(conn); 2161 Map<String, Object> dataRecord = new HashMap<String, Object>(); 2162 if (CommonTools.isBlank(likeFileName)) { 2163 dataRecord.put("file_name", "%"); 2164 } else { 2165 dataRecord.put("file_name", likeFileName); 2166 } 2167 if (CommonTools.isBlank(likeFolderName)) { 2168 dataRecord.put("file_parent_path", "%"); 2169 } else { 2170 dataRecord.put("file_parent_path", likeFolderName); 2171 } 2172 dataRecord.put("deleted_on_hostnames", "%;" + CommonTools.getHostname() + ";%"); 2173 dataRecord.put("begin_time", new Timestamp(deleteCutoffTimeMs)); 2174 dataRecord.put("end_time", currentTime); 2175 dataRecord.put("created_at", new Timestamp(currentTime.getTime() - BEFORE_THE_QUEUE_TIME)); 2176 executor.find(0, MAX_QUEUE_SIZE,formatSql(SQL_SELECT_FOR_LIST_ALL_DELETE), dataRecord, rows); 2177 } catch (SQLException e) { 2178 throw new IOException(e.getMessage(), e); 2179 } finally { 2180 if (executor != null) { 2181 try { 2182 executor.close(); 2183 } catch (Exception e) { 2184 throw new IOException(e.getMessage(), e); 2185 } 2186 } 2187 } 2188 } 2189 2190 protected void listAllForSync(String likeFolderName, String likeFileName, CacheArray rows) throws IOException { 2191 CacheDriverExecutor executor = null; 2192 Connection conn = null; 2193 try { 2194 long deleteCutoffTimeMs = 0L; 2195 2196 if(DELETE_CUTOFF_TIME != null) deleteCutoffTimeMs = SYNC_CUTOFF_TIME.getTime(); 2197 2198 Timestamp currentTime = new Clock().getSqlTimestamp(); 2199 2200 conn = dataSourceReader.getConnection(); 2201 conn.setAutoCommit(true); 2202 executor = new CacheDriverExecutor(conn); 2203 Map<String, Object> dataRecord = new HashMap<String, Object>(); 2204 if (CommonTools.isBlank(likeFileName)) { 2205 dataRecord.put("file_name", "%"); 2206 } else { 2207 dataRecord.put("file_name", likeFileName); 2208 } 2209 if (CommonTools.isBlank(likeFolderName)) { 2210 dataRecord.put("file_parent_path", "%"); 2211 } else { 2212 dataRecord.put("file_parent_path", likeFolderName); 2213 } 2214 dataRecord.put("synced_on_hostnames", "%;" + CommonTools.getHostname() + ";%"); 2215 dataRecord.put("begin_time", new Timestamp(deleteCutoffTimeMs)); 2216 dataRecord.put("end_time", currentTime); 2217 dataRecord.put("created_at", new Timestamp(currentTime.getTime() - BEFORE_THE_QUEUE_TIME)); 2218 executor.find(0, MAX_QUEUE_SIZE, formatSql(SQL_SELECT_FOR_LIST_ALL_SYNC), dataRecord, rows); 2219 } catch (SQLException e) { 2220 throw new IOException(e.getMessage(), e); 2221 } finally { 2222 if (executor != null) { 2223 try { 2224 executor.close(); 2225 } catch (Exception e) { 2226 throw new IOException(e.getMessage(), e); 2227 } 2228 } 2229 } 2230 } 2231 2232 public void setModifyUserId(String modifyUserId) { 2233 this.modifyUserId = modifyUserId; 2234 } 2235 2236 private String getDefaultModifyUserId() { 2237 if (CommonTools.isBlank(modifyUserId)) { 2238 return DEFAULE_MODIFY_USER_ID; 2239 } else { 2240 return modifyUserId; 2241 } 2242 } 2243 2244 @Override 2245 public void setModifyTime(Timestamp modifyTime) throws IOException { 2246 this.modifyTime = modifyTime; 2247 } 2248 2249 @Override 2250 public Timestamp getModifyTime() throws IOException { 2251 Map<String, Object> record = getFirstFilePart(); 2252 if (record != null) { 2253 return (Timestamp) record.get("file_modify_time"); 2254 } 2255 return null; 2256 } 2257 2258 public boolean isTimeout() throws IOException { 2259 return isTimeout(LOGIC_TIMEOUT_MS); 2260 } 2261 2262 @Override 2263 public void copyTo(String toPath) throws IOException { 2264 log.debug("CopyTo: {} -> {}", getPath(), toPath); 2265 if (!exists()) { 2266 log.mark(String.format("The remote file '%s' does not exist.", this.getPath())); 2267 } 2268 RemoteFile destRf = new RemoteFile(toPath); 2269 final Timestamp originModifyTime = this.getModifyTime(); 2270 destRf.setModifyUserId(modifyUserId); 2271 destRf.setModifyTime(originModifyTime); 2272 if (isLink()) { 2273 if (destRf.exists(true)) { 2274 destRf.forceDeleteLink(false); 2275 } 2276 destRf.createLink(readLink()); 2277 } 2278 if (isFile()) { 2279 final RemoteFile _destRf = destRf; 2280 if (_destRf.exists(true)) { 2281 _destRf.forceDeleteFile(false); 2282 } 2283 merge(new FilePart() { 2284 @Override 2285 protected void process(int partIndex, byte[] data) { 2286 try { 2287 _destRf.write(decrypt(data), partIndex > 0); 2288 } catch (Exception e) { 2289 log.error(e.getMessage(), e); 2290 } 2291 } 2292 2293 @Override 2294 protected void completed(int lastPartIndex, Timestamp modifyTime, long fileSize) { 2295 try { 2296 /**This is 0 bytes file**/ 2297 if (lastPartIndex == -1) { 2298 2299 if (_destRf.exists(true)) 2300 _destRf.forceDeleteFile(false); 2301 2302 _destRf.setModifyTime(originModifyTime); 2303 _destRf.createEmptyFile(); 2304 } else { 2305 _destRf.setModifyTime(originModifyTime); 2306 _destRf.complete(); 2307 } 2308 } catch (IOException e) { 2309 log.error(e.getMessage(), e); 2310 } 2311 } 2312 2313 @Override 2314 protected void ended(int lastPartIndex, long fileSize) { 2315 2316 } 2317 2318 }); 2319 } 2320 if (isDir()) { 2321 List<Map<String, Object>> copyList = list(getPath(), "%"); 2322 int size = copyList.size(); 2323 if (size > 0) { 2324 if (!destRf.exists(true)) { 2325 destRf.mkdirs(); 2326 } 2327 } 2328 String copyRootPath = getPath(); 2329 copyList = list(parsePath(getPath() + "/%"), "%"); 2330 if (copyList.size() > 0) { 2331 if (!destRf.exists(true)) { 2332 destRf.mkdirs(); 2333 } 2334 } 2335 for (Map<String, Object> item : copyList) { 2336 String itemParentPath = (String) item.get("file_parent_path"); 2337 String destParentPath = itemParentPath.replaceFirst(copyRootPath + "/", toPath); 2338 RemoteFile sourceRf = new RemoteFile( 2339 String.format("%s/%s", item.get("file_parent_path"), item.get("file_name"))); 2340 String destRfPath = new File(String.format("%s/%s", destParentPath, item.get("file_name"))) 2341 .getAbsolutePath(); 2342 if (sourceRf.isDir()) { 2343 RemoteFile tmpRf = new RemoteFile(destRfPath + "/"); 2344 tmpRf.setModifyTime(sourceRf.getModifyTime()); 2345 if (!tmpRf.exists(true)) { 2346 tmpRf.mkdirs(); 2347 } 2348 } else { 2349 sourceRf.copyTo(destRfPath); 2350 } 2351 } 2352 } 2353 } 2354 2355 @Override 2356 public void moveTo(String toPath) throws IOException { 2357 log.debug("MoveTo: {} -> {}", getPath(), toPath); 2358 RemoteFile destRf = new RemoteFile(toPath); 2359 copyTo(toPath); 2360 boolean exists = this.exists(true); 2361 if (exists && destRf.exists(true) && destRf.isCompleted()) { 2362 forceDeleteAll(true); 2363 } 2364 } 2365 2366 private void startFullAsyncThreadForDelete(boolean subFolders, long timerMs, String replaceTo, 2367 Runnable completedCallback) { 2368 RemoteFile the = this; 2369 try { 2370 if (syncRoot == null) { 2371 File replaceToFile = new File(replaceTo); 2372 if (replaceToFile.getParent() == null) { 2373 throw new IOException("Cannot execute delete task."); 2374 } 2375 String replaceToParent = replacePath(replaceToFile.getParent()); 2376 syncRoot = new File(String.format("%s/%s", replaceToParent, replaceToFile.getName())); 2377 } 2378 2379 CacheArray rows = new CacheArray(); 2380 CacheArrayFilter filter = new CacheArrayFilter(CACHE_ARRAY_FILTER_TIMER) { 2381 @Override 2382 public void execute(Integer index, Object o) { 2383 Map<String, Object> item = (Map<String, Object>) o; 2384 String fileName = (String) item.get("file_name"); 2385 String fullPath = String.format("%s/%s", item.get("file_parent_path"), fileName); 2386 Thread.currentThread() 2387 .setName(String.format("RemoteFile-startFullAsyncThreadForDelete-%s", fileName)); 2388 RemoteFile rf = new RemoteFile(fullPath); 2389 if(isDebug(rf)){ 2390 debugLog("CheckUsage",rf,this); 2391 } 2392 if (memoryUsage() < RemoteFile.MAX_MEMORY_USAGE) { 2393 rf.fileId = (String) item.get("file_id"); 2394 String toDf = rf.getPath(); 2395 if (!CommonTools.isBlank(replaceTo)) { 2396 if (subFolders) { 2397 toDf = fullPath.replaceFirst(getPath() + "/", replaceTo + "/"); 2398 } else { 2399 toDf = fullPath.replaceFirst(getPath(), replaceTo + "/"); 2400 } 2401 } 2402 DiskFile df = new DiskFile(toDf); 2403 DiskFile.copyAttrs(df, false, the.syncRoot); 2404 executeSyncRunnable(this, item, df, rf); 2405 } else{ 2406 log.warn("Over max memory usage 'MAX_MEMORY_USAGE={}'.",MAX_MEMORY_USAGE); 2407 } 2408 } 2409 2410 @Override 2411 public void terminated() { 2412 log.warn("Terminated: {}-{}", Thread.currentThread().getName(), Thread.currentThread().getId()); 2413 } 2414 2415 @Override 2416 public void completed(Integer size) { 2417 Thread.currentThread().setName(String 2418 .format("RemoteFile-startFullAsyncThreadForDelete-completed-%s", the.origin.getName())); 2419 if (isStopSync()) { 2420 LoggerFactory.getLogger(RemoteFile.class).mark("Stoped start delete task thread."); 2421 } 2422 if (timerMs <= 0) { 2423 LoggerFactory.getLogger(RemoteFile.class).mark("Completed start delete task thread."); 2424 } 2425 if (!isStopSync() && timerMs > 0) { 2426 try { 2427 Thread.sleep(timerMs); 2428 if (!isStopSync()) { 2429 startFullAsyncThreadForDelete(!subFolders, timerMs, replaceTo, completedCallback); 2430 } 2431 } catch (Exception e) { 2432 log.error(e.getMessage(), e); 2433 } 2434 } 2435 if (completedCallback != null) { 2436 completedCallback.run(); 2437 } 2438 } 2439 2440 }; 2441 rows.filter(filter); 2442 listAllForDelete(getPath() + (subFolders ? "/%" : ""), "%", rows); 2443 } catch (Exception e) { 2444 log.error(e.getMessage(), e); 2445 if (timerMs > 0) { 2446 try { 2447 Thread.sleep(timerMs); 2448 startFullAsyncThreadForDelete(subFolders, timerMs, replaceTo, completedCallback); 2449 } catch (InterruptedException ee) { 2450 log.error(ee.getMessage(), ee); 2451 } 2452 } 2453 } 2454 } 2455 2456 private void startFullAsyncThread(boolean subFolders, long timerMs, String replaceTo, Runnable completedCallback) { 2457 RemoteFile the = this; 2458 try { 2459 if (syncRoot == null) { 2460 File replaceToFile = new File(replaceTo); 2461 if (replaceToFile.getParent() == null) { 2462 throw new IOException("Cannot sync the root path."); 2463 } 2464 String replaceToParent = replacePath(replaceToFile.getParent()); 2465 syncRoot = new File(String.format("%s/%s", replaceToParent, replaceToFile.getName())); 2466 } 2467 2468 CacheArray rows = new CacheArray(); 2469 CacheArrayFilter filter = new CacheArrayFilter(CACHE_ARRAY_FILTER_TIMER) { 2470 @Override 2471 public void execute(Integer index, Object o) { 2472 Map<String, Object> item = (Map<String, Object>) o; 2473 String fileName = (String) item.get("file_name"); 2474 Thread.currentThread().setName(String.format("RemoteFile-startFullAsyncThread-%s", fileName)); 2475 String fullPath = String.format("%s/%s", item.get("file_parent_path"), fileName); 2476 RemoteFile rf = new RemoteFile(fullPath); 2477 if(isDebug(the)){ 2478 debugLog("CheckUsage",the,this); 2479 } 2480 if (memoryUsage() < RemoteFile.MAX_MEMORY_USAGE) { 2481 String toDf = rf.getPath(); 2482 if (!CommonTools.isBlank(replaceTo)) { 2483 if (subFolders) { 2484 toDf = fullPath.replaceFirst(getPath() + "/", replaceTo + "/"); 2485 } else { 2486 toDf = fullPath.replaceFirst(getPath(), replaceTo + "/"); 2487 } 2488 } 2489 DiskFile df = new DiskFile(toDf); 2490 DiskFile.copyAttrs(df, false, the.syncRoot); 2491 executeSyncRunnable(this, item, df, rf); 2492 } else{ 2493 log.warn("Over max memory usage 'MAX_MEMORY_USAGE={}'.",MAX_MEMORY_USAGE); 2494 } 2495 } 2496 2497 @Override 2498 public void terminated() { 2499 log.warn("Terminated: {}-{}", Thread.currentThread().getName(), Thread.currentThread().getId()); 2500 } 2501 2502 @Override 2503 public void completed(Integer size) { 2504 Thread.currentThread().setName( 2505 String.format("RemoteFile-startFullAsyncThread-completed-%s", the.origin.getName())); 2506 if (isStopSync()) { 2507 LoggerFactory.getLogger(RemoteFile.class).mark("Stoped start full async thread."); 2508 } 2509 if (timerMs <= 0) { 2510 LoggerFactory.getLogger(RemoteFile.class).mark("Completed start full async thread."); 2511 } 2512 if (!isStopSync() && timerMs > 0) { 2513 try { 2514 Thread.sleep(timerMs); 2515 if (!isStopSync()) { 2516 startFullAsyncThread(!subFolders, timerMs, replaceTo, completedCallback); 2517 } 2518 } catch (Exception e) { 2519 log.error(e.getMessage(), e); 2520 } 2521 } 2522 if (completedCallback != null) { 2523 completedCallback.run(); 2524 } 2525 } 2526 2527 }; 2528 rows.filter(filter); 2529 listAllForSync(getPath() + (subFolders ? "/%" : ""), "%", rows); 2530 } catch (Exception e) { 2531 log.error(e.getMessage(), e); 2532 if (timerMs > 0) { 2533 try { 2534 Thread.sleep(timerMs); 2535 startFullAsyncThread(subFolders, timerMs, replaceTo, completedCallback); 2536 } catch (InterruptedException ee) { 2537 log.error(ee.getMessage(), ee); 2538 } 2539 } 2540 } 2541 } 2542 2543 public void startFullAsync(long timerMs) throws IOException { 2544 startFullAsync(timerMs, getPath() + "/", null); 2545 } 2546 2547 public void startFullAsync(long timerMs, Runnable completedCallback) throws IOException { 2548 startFullAsync(timerMs, getPath() + "/", completedCallback); 2549 } 2550 2551 public void startFullAsync(long timerMs, String replaceTo) throws IOException { 2552 startFullAsync(timerMs, replaceTo, null); 2553 } 2554 2555 public void startFullAsync(long timerMs, String replaceTo, Runnable completedCallback) throws IOException { 2556 2557 stopSync = false; 2558 2559 RemoteFile the = this; 2560 Clock clock = new Clock(); 2561 Timestamp toTime = new Timestamp(clock.getTime() - timerMs); 2562 Executors.newFixedThreadPool(1).execute(new Runnable() { 2563 2564 @Override 2565 public void run() { 2566 startFullAsyncThread(false, timerMs, replaceTo, completedCallback); 2567 } 2568 2569 }); 2570 2571 Executors.newFixedThreadPool(1).execute(new Runnable() { 2572 2573 @Override 2574 public void run() { 2575 startFullAsyncThreadForDelete(false, timerMs, replaceTo, completedCallback); 2576 } 2577 2578 }); 2579 } 2580 2581 public DriverDataSource getDataSourceReader() { 2582 return dataSourceReader; 2583 } 2584 2585 public DriverDataSource getDataSourceWriter() { 2586 return dataSourceWriter; 2587 } 2588 2589 public ConfigProperties getConfigProperties() { 2590 return configProperties; 2591 } 2592 2593 private byte[] encrypt(byte[] data) throws Exception { 2594 boolean useEncrypt = configProperties.getBoolean("EncryptEnable", false); 2595 if (useEncrypt) { 2596 String aesKey = configProperties.getString("EncryptAesKey"); 2597 return CipherTools.AESEncrypt(aesKey,data); 2598 } else { 2599 return data; 2600 } 2601 } 2602 2603 private byte[] decrypt(byte[] encData) throws Exception { 2604 boolean useEncrypt = configProperties.getBoolean("EncryptEnable", false); 2605 if (useEncrypt) { 2606 String aesKey = configProperties.getString("EncryptAesKey"); 2607 return CipherTools.AESDecrypt(aesKey,encData); 2608 } else { 2609 return encData; 2610 } 2611 } 2612 2613 public String getSyncedOnHostnames() throws IOException { 2614 Map<String, Object> record = getCompletedRecord(); 2615 if (record != null) { 2616 return (String) record.get("synced_on_hostnames"); 2617 } 2618 return null; 2619 } 2620 2621 public String getDeletedOnHostnames() throws IOException { 2622 Map<String, Object> record = getCompletedRecord(); 2623 if (record != null) { 2624 return (String) record.get("deleted_on_hostnames"); 2625 } 2626 return null; 2627 } 2628 2629 public String getSourceHostname() throws IOException { 2630 Map<String, Object> record = getCompletedRecord(); 2631 if (record != null) { 2632 return (String) record.get("source_hostname"); 2633 } 2634 return null; 2635 } 2636 2637 protected String getLastSourceHostname() throws IOException { 2638 Map<String, Object> record = getLastUpdateRecord(); 2639 if (record != null) { 2640 return (String) record.get("source_hostname"); 2641 } 2642 return null; 2643 } 2644 2645 protected Timestamp getLastOperateTime() throws IOException { 2646 Map<String, Object> record = getLastUpdateRecord(); 2647 if (record != null) { 2648 Timestamp updatedAt = (Timestamp) record.get("updated_at"); 2649 Timestamp deletedAt = (Timestamp) record.get("deleted_at"); 2650 2651 return deletedAt == null ? updatedAt : deletedAt; 2652 } 2653 return null; 2654 } 2655 2656 protected Timestamp getLastModifyTime() throws IOException { 2657 Map<String, Object> record = getLastUpdateRecord(); 2658 if (record != null) { 2659 return (Timestamp) record.get("file_modify_time"); 2660 } 2661 return null; 2662 } 2663 2664 protected boolean isLastOperateDelete() throws IOException { 2665 Map<String, Object> record = getLastUpdateRecord(); 2666 if (record != null) { 2667 Timestamp deletedAt = (Timestamp) record.get("deleted_at"); 2668 2669 return deletedAt == null ? false : true; 2670 } 2671 return false; 2672 } 2673 2674 protected boolean isLastOperateRealDelete() throws IOException { 2675 Map<String, Object> record = getLastUpdateRecord(); 2676 if (record != null) { 2677 Object deletedValue = record.get("file_deleted"); 2678 if (deletedValue != null) { 2679 if (deletedValue instanceof Boolean) { 2680 return (Boolean) deletedValue; 2681 } else { 2682 return Integer.parseInt(deletedValue + "") == 1; 2683 } 2684 } 2685 } 2686 return false; 2687 } 2688 2689 protected void listAllForScanDelete(Integer filePartDataTable, CacheArray rows) throws IOException { 2690 CacheDriverExecutor executor = null; 2691 Connection conn = null; 2692 try { 2693 long deleteCutoffTimeMs = 0L; 2694 2695 if(DELETE_CUTOFF_TIME != null) deleteCutoffTimeMs = DELETE_CUTOFF_TIME.getTime(); 2696 2697 Timestamp currentTime = new Clock().getSqlTimestamp(); 2698 conn = dataSourceReader.getConnection(); 2699 conn.setAutoCommit(true); 2700 executor = new CacheDriverExecutor(conn); 2701 Map<String, Object> dataRecord = new HashMap<String, Object>(); 2702 dataRecord.put("file_parent_path", getPath()); 2703 dataRecord.put("like_file_parent_path", parsePath(getPath() + "/%")); 2704 dataRecord.put("deleted_on_hostnames", "%;" + CommonTools.getHostname() + ";%"); 2705 dataRecord.put("begin_time", new Timestamp(deleteCutoffTimeMs)); 2706 dataRecord.put("end_time", currentTime); 2707 dataRecord.put("created_at", new Timestamp(currentTime.getTime() - BEFORE_THE_QUEUE_TIME)); 2708 dataRecord.put("file_part_data_table", filePartDataTable); 2709 executor.find(formatSql(SQL_SELECT_FOR_LIST_ALL_SCAN_DELETE), dataRecord, rows); 2710 } catch (SQLException e) { 2711 throw new IOException(e.getMessage(), e); 2712 } finally { 2713 if (executor != null) { 2714 try { 2715 executor.close(); 2716 } catch (Exception e) { 2717 throw new IOException(e.getMessage(), e); 2718 } 2719 } 2720 } 2721 } 2722 2723 protected boolean isSyncedOnHostname() throws IOException { 2724 Map<String, Object> record = getCompletedRecord(); 2725 if (record != null) { 2726 String syncedOnHostnames = (String) record.get("synced_on_hostnames"); 2727 return syncedOnHostnames.indexOf(";" + CommonTools.getHostname() + ";") > -1; 2728 } 2729 return false; 2730 } 2731 2732 protected boolean isSyncedOnHostname(Object id) throws IOException { 2733 Map<String, Object> record = getRecordById(id); 2734 if (record != null) { 2735 String syncedOnHostnames = (String) record.get("synced_on_hostnames"); 2736 return syncedOnHostnames.indexOf(";" + CommonTools.getHostname() + ";") > -1; 2737 } 2738 return false; 2739 } 2740 2741 protected boolean isDeletedOnHostname(Object id) throws IOException { 2742 Map<String, Object> record = getRecordById(id); 2743 if (record != null) { 2744 String deletedOnHostnames = (String) record.get("deleted_on_hostnames"); 2745 return deletedOnHostnames.indexOf(";" + CommonTools.getHostname() + ";") > -1; 2746 } 2747 return false; 2748 } 2749 2750 public RemoteFile getParentFile() { 2751 return new RemoteFile(getParent()); 2752 } 2753 2754 private void executeSyncRunnable(final CacheArrayFilter filter, final Map<String, Object> item, final DiskFile df, 2755 final RemoteFile rf) { 2756 try{ 2757 if (!df.isLogicModify()) { 2758 df.logicModify(); 2759 if (mergePool == null) { 2760 LoggerFactory.getLogger(RemoteFile.class).mark("MAX_POOL_SIZE={}", MAX_POOL_SIZE); 2761 mergePool = Executors.newFixedThreadPool(MAX_POOL_SIZE); 2762 } 2763 mergePool.execute(getSyncRunnable(filter, item, rf, df)); 2764 } 2765 }catch(Exception e){ 2766 LoggerFactory.getLogger(RemoteFile.class).error(e.getMessage(),e); 2767 } 2768 } 2769 2770 private Runnable getSyncRunnable(final CacheArrayFilter filter, final Map<String, Object> item, final RemoteFile rf, 2771 final DiskFile df) { 2772 Runnable runnable = new Runnable() { 2773 @Override 2774 public void run() { 2775 try { 2776 String fileName = (String) item.get("file_name"); 2777 Thread.currentThread() 2778 .setName(String.format("RemoteFile-getSyncRunnable-%s-%s", fileName, item.get("id"))); 2779 boolean isDebug = isDebug(rf); 2780 boolean isDeleted = false; 2781 Object deletedValue = item.get("file_deleted"); 2782 2783 if (deletedValue instanceof Boolean) { 2784 isDeleted = (Boolean) deletedValue; 2785 } else { 2786 isDeleted = Integer.parseInt(deletedValue + "") == 1; 2787 } 2788 2789 String clientHostname = CommonTools.getHostname(); 2790 String lastSourceHostname = rf.getLastSourceHostname(); 2791 2792 boolean isClientHostname = lastSourceHostname != null && !lastSourceHostname.equals(clientHostname); 2793 boolean rootExists = syncRoot != null && syncRoot.exists(); 2794 2795 if(!rootExists){ 2796 Logger.systemError(RemoteFile.class,"The root path '{}' does not exist.",syncRoot.getPath()); 2797 } 2798 2799 if (isDeleted && rootExists) { 2800 if (df.exists()) { 2801 if (isDebug) { 2802 LoggerFactory.getLogger(RemoteFile.class).mark("DeleteDiskFile - {}", df.getPath()); 2803 } 2804 df.delete(); 2805 rf.deletedToDiskAndSub(item.get("id")); 2806 } else { 2807 rf.deletedToDiskAndSub(item.get("id")); 2808 } 2809 df.removeLogicModify(); 2810 } 2811 2812 String sourceHostname = rf.getSourceHostname(); 2813 isClientHostname = sourceHostname != null && !sourceHostname.equals(clientHostname); 2814 boolean checkHaveParent = configProperties.getBoolean("CheckHaveParent", false); 2815 boolean haveParent = true; 2816 if (checkHaveParent) { 2817 haveParent = df.getOrigin().getParentFile().exists(); 2818 } 2819 String fileType = (String) item.get("file_type"); 2820 2821 if (!isDeleted && isClientHostname && haveParent && rootExists) { 2822 Timestamp rfModifyTime = rf.getModifyTime(); 2823 df.setModifyTimeMsFromClock(rfModifyTime.getTime()); 2824 2825 if (fileType.equals("L")) { 2826 if (df.exists() && !df.isLink()) { 2827 df.delete(); 2828 } 2829 if (df.exists()) { 2830 Timestamp dfModifyTime = df.getModifyTimeForClock(); 2831 if (rfModifyTime.getTime() > dfModifyTime.getTime()) { 2832 if (isDebug) { 2833 LoggerFactory.getLogger(RemoteFile.class).mark("ReCreateDiskLink - {}", 2834 df.getPath()); 2835 } 2836 df.delete(); 2837 if (df.createLink(rf.readLink())) { 2838 if (df.getOrigin().getName().startsWith(".")) { 2839 Files.setAttribute(Paths.get(df.getPath()), "dos:hidden", true); 2840 } 2841 rf.syncedToDisk(); 2842 } 2843 } else { 2844 rf.syncedToDisk(); 2845 } 2846 } else { 2847 if (isDebug) { 2848 LoggerFactory.getLogger(RemoteFile.class).mark("CreateDiskLink - {}", df.getPath()); 2849 } 2850 if (df.createLink(rf.readLink())) { 2851 if (df.getOrigin().getName().startsWith(".")) { 2852 Files.setAttribute(Paths.get(df.getPath()), "dos:hidden", true); 2853 } 2854 rf.syncedToDisk(); 2855 } 2856 } 2857 df.removeLogicModify(); 2858 } 2859 if (fileType.equals("F")) { 2860 if (df.exists()) { 2861 Timestamp dfModifyTime = df.getModifyTimeForClock(); 2862 boolean changed = rfModifyTime.getTime() > dfModifyTime.getTime(); 2863 if (changed) { 2864 DiskFile writingDf = new DiskFile(df.getPath() + TMP_WRITING_FILE); 2865 DiskFile.copyAttrs(writingDf, df.isCopyStructureOnly(), df.syncRoot); 2866 writingDf.setModifyTimeMsFromClock(rfModifyTime.getTime()); 2867 if (!writingDf.isLogicModify()) { 2868 writingDf.logicModify(); 2869 if (RemoteFile.addQueuePathMapping(rf.getPath())) { 2870 if (isDebug(rf)) { 2871 LoggerFactory.getLogger(RemoteFile.class).mark("DiskQueuing - {}", 2872 df.getPath()); 2873 } 2874 rf.writeToDisk(writingDf); 2875 } 2876 } 2877 } else { 2878 rf.syncedToDisk(); 2879 df.removeLogicModify(); 2880 } 2881 } else { 2882 if (rf.exists()) { 2883 DiskFile writingDf = new DiskFile(df.getPath() + TMP_WRITING_FILE); 2884 DiskFile.copyAttrs(writingDf, df.isCopyStructureOnly(), df.syncRoot); 2885 writingDf.setModifyTimeMsFromClock(rfModifyTime.getTime()); 2886 if (!writingDf.isLogicModify()) { 2887 writingDf.logicModify(); 2888 if (RemoteFile.addQueuePathMapping(rf.getPath())) { 2889 if (isDebug(rf)) { 2890 LoggerFactory.getLogger(RemoteFile.class).mark("DiskQueuing - {}", 2891 df.getPath()); 2892 } 2893 rf.writeToDisk(writingDf); 2894 } 2895 } 2896 } 2897 } 2898 } 2899 if (fileType.equals("D")) { 2900 if (rf.exists()) { 2901 if (isDebug) { 2902 LoggerFactory.getLogger(RemoteFile.class).mark("CreateDiskDir - {}", df.getPath()); 2903 } 2904 if (df.exists()) { 2905 if (df.isDir()) { 2906 rf.syncedToDisk(); 2907 } else { 2908 df.delete(); 2909 if (df.mkdirs()) { 2910 if (df.getOrigin().getName().startsWith(".")) { 2911 Files.setAttribute(Paths.get(df.getPath()), "dos:hidden", true); 2912 } 2913 rf.syncedToDisk(); 2914 } 2915 } 2916 } else { 2917 if (df.mkdirs()) { 2918 if (df.getOrigin().getName().startsWith(".")) { 2919 Files.setAttribute(Paths.get(df.getPath()), "dos:hidden", true); 2920 } 2921 rf.syncedToDisk(); 2922 } 2923 } 2924 } 2925 df.removeLogicModify(); 2926 } 2927 } 2928 } catch (Exception e) { 2929 log.error(e.getMessage(), e); 2930 } 2931 } 2932 }; 2933 return runnable; 2934 } 2935 2936 public void startArchiveForFileDeleted(long timerMs, String archiveTime) throws IOException { 2937 long archiveMs = ConfigProperties.parseTimeToMs(archiveTime, 0L); 2938 startArchiveForFileDeleted(timerMs, archiveMs, null); 2939 } 2940 2941 public void startArchiveForFileDeleted(long timerMs, long archiveMs) throws IOException { 2942 startArchiveForFileDeleted(timerMs, archiveMs, null); 2943 } 2944 2945 public void startArchiveForFileDeleted(long timerMs, String archiveTime, Runnable completedCallback) 2946 throws IOException { 2947 long archiveMs = ConfigProperties.parseTimeToMs(archiveTime, 0L); 2948 startArchiveForFileDeleted(timerMs, archiveMs, completedCallback); 2949 } 2950 2951 public void startArchiveForFileDeleted(long timerMs, long archiveMs, Runnable completedCallback) 2952 throws IOException { 2953 final RemoteFile the = this; 2954 if (archiveMs <= 0) { 2955 throw new IOException(String.format( 2956 "This parameter 'archiveMs' cannot be '%s', please use a number greater than 0.", archiveMs)); 2957 } 2958 2959 stopSync = false; 2960 2961 CacheDriverExecutor executor = null; 2962 Connection conn = null; 2963 try { 2964 Thread.currentThread().setName(String.format("RemoteFile-archiveForFileDeleted-%s", getPath())); 2965 conn = dataSourceReader.getConnection(); 2966 conn.setAutoCommit(true); 2967 executor = new CacheDriverExecutor(conn); 2968 Timestamp beforeTime = new Timestamp(new Clock().getTime() - archiveMs); 2969 Map<String, Object> dataRecord = new HashMap<String, Object>(); 2970 dataRecord.put("file_parent_path", getPath()); 2971 dataRecord.put("like_file_parent_path", parsePath(getPath() + "/%")); 2972 dataRecord.put("before_deleted_at", beforeTime); 2973 CacheArray rows = new CacheArray(); 2974 CacheArrayFilter filter = new CacheArrayFilter(CACHE_ARRAY_FILTER_TIMER) { 2975 2976 @Override 2977 public void completed(Integer size) { 2978 if (isStopSync()) { 2979 LoggerFactory.getLogger(RemoteFile.class).mark("Stoped archive for file deleted."); 2980 } 2981 if (timerMs <= 0) { 2982 LoggerFactory.getLogger(RemoteFile.class).mark("Completed archive for file deleted."); 2983 } 2984 2985 if (completedCallback != null) { 2986 completedCallback.run(); 2987 } 2988 2989 if (!isStopSync() && timerMs > 0) { 2990 while(true){ 2991 2992 if(isStopSync()) break; 2993 2994 try { 2995 Thread.sleep(timerMs); 2996 if (!isStopSync()) { 2997 boolean allowed = checkUsage(the); 2998 if(allowed){ 2999 startArchiveForFileDeleted(timerMs, archiveMs, completedCallback); 3000 break; 3001 } 3002 } 3003 } catch (Exception e) { 3004 log.error(e.getMessage(), e); 3005 } 3006 } 3007 } 3008 3009 } 3010 3011 @Override 3012 public void execute(Integer index, Object o) { 3013 Thread.currentThread().setName(String.format("RemoteFile-execute-archiveForFileDeleted-%s", index)); 3014 if(isDebug(the)){ 3015 debugLog("CheckUsage",the,this); 3016 } 3017 if (memoryUsage() < RemoteFile.MAX_MEMORY_USAGE) { 3018 CacheDriverExecutor _executor = null; 3019 try { 3020 Map<String, Object> item = (Map<String, Object>) o; 3021 boolean notDir = !item.get("file_type").equals("D"); 3022 if(notDir){ 3023 calculateDataMappingDataSourceReader(the, item); 3024 Object fileId = item.get("file_id"); 3025 Map<String, Object> dataRecord = new HashMap<String, Object>(); 3026 dataRecord.put("file_id", fileId); 3027 dataRecord.put("deleted_at", new Clock().getSqlTimestamp()); 3028 Thread.currentThread().setName(String.format("RemoteFile-archiveForFileDeleted-%s", fileId)); 3029 Connection _conn = dataMappingDataSourceReader.getConnection(); 3030 _conn.setAutoCommit(true); 3031 _executor = new CacheDriverExecutor(_conn); 3032 _executor.execute(formatSqlForFilePartData(SQL_UPDATE_BY_FILE_ID_FOR_CLEAR_FILE_DATA), dataRecord); 3033 } 3034 } catch (Exception e) { 3035 log.error(e.getMessage(), e); 3036 } finally { 3037 try { 3038 if (_executor != null) 3039 _executor.close(); 3040 } catch (Exception e) { 3041 log.error(e.getMessage(), e); 3042 } 3043 } 3044 }else{ 3045 log.warn("Over max memory usage 'MAX_MEMORY_USAGE={}'.",MAX_MEMORY_USAGE); 3046 } 3047 } 3048 }; 3049 rows.filter(filter); 3050 final CacheDriverExecutor finalExecutor = executor; 3051 Executors.newFixedThreadPool(1).execute(new Runnable(){ 3052 @Override 3053 public void run(){ 3054 try{ 3055 finalExecutor.find(formatSql(SQL_SELECT_FOR_ARCHIVE_FILE_DELETED), dataRecord, rows); 3056 }catch(Exception e){ 3057 log.error(e.getMessage(),e); 3058 } 3059 } 3060 }); 3061 } catch (SQLException e) { 3062 throw new IOException(e.getMessage(), e); 3063 } finally { 3064 if (executor != null) { 3065 try { 3066 executor.close(); 3067 } catch (Exception e) { 3068 throw new IOException(e.getMessage(), e); 3069 } 3070 } 3071 } 3072 } 3073 3074 public void startArchiveForDeleted(long timerMs, String archiveTime) throws IOException { 3075 long archiveMs = ConfigProperties.parseTimeToMs(archiveTime, 0L); 3076 startArchiveForDeleted(timerMs, archiveMs, null); 3077 } 3078 3079 public void startArchiveForDeleted(long timerMs, long archiveMs) throws IOException { 3080 startArchiveForDeleted(timerMs, archiveMs, null); 3081 } 3082 3083 public void startArchiveForDeleted(long timerMs, String archiveTime, Runnable completedCallback) 3084 throws IOException { 3085 long archiveMs = ConfigProperties.parseTimeToMs(archiveTime, 0L); 3086 startArchiveForDeleted(timerMs, archiveMs, completedCallback); 3087 } 3088 3089 public void startArchiveForDeleted(long timerMs, long archiveMs, Runnable completedCallback) throws IOException { 3090 final RemoteFile the = this; 3091 if (archiveMs <= 0) { 3092 throw new IOException(String.format( 3093 "This parameter 'archiveMs' cannot be '%s', please use a number greater than 0.", archiveMs)); 3094 } 3095 3096 stopSync = false; 3097 3098 CacheDriverExecutor executor = null; 3099 Connection conn = null; 3100 try { 3101 Thread.currentThread().setName(String.format("RemoteFile-archiveForDeleted-%s", getPath())); 3102 conn = dataSourceReader.getConnection(); 3103 conn.setAutoCommit(true); 3104 executor = new CacheDriverExecutor(conn); 3105 Timestamp beforeTime = new Timestamp(new Clock().getTime() - archiveMs); 3106 Map<String, Object> dataRecord = new HashMap<String, Object>(); 3107 dataRecord.put("file_parent_path", getPath()); 3108 dataRecord.put("like_file_parent_path", parsePath(getPath() + "/%")); 3109 dataRecord.put("before_deleted_at", beforeTime); 3110 3111 CacheArray rows = new CacheArray(); 3112 CacheArrayFilter filter = new CacheArrayFilter(CACHE_ARRAY_FILTER_TIMER) { 3113 3114 @Override 3115 public void completed(Integer size) { 3116 if (isStopSync()) { 3117 LoggerFactory.getLogger(RemoteFile.class).mark("Stoped archive for deleted."); 3118 } 3119 if (timerMs <= 0) { 3120 LoggerFactory.getLogger(RemoteFile.class).mark("Completed archive for deleted."); 3121 } 3122 3123 if (completedCallback != null) { 3124 completedCallback.run(); 3125 } 3126 3127 if (!isStopSync() && timerMs > 0) { 3128 while(true){ 3129 3130 if(isStopSync()) break; 3131 3132 try { 3133 Thread.sleep(timerMs); 3134 if (!isStopSync()) { 3135 boolean allowed = checkUsage(the); 3136 if(allowed){ 3137 startArchiveForDeleted(timerMs, archiveMs, completedCallback); 3138 break; 3139 } 3140 } 3141 } catch (Exception e) { 3142 log.error(e.getMessage(), e); 3143 } 3144 } 3145 } 3146 } 3147 3148 @Override 3149 public void execute(Integer index, Object o) { 3150 Thread.currentThread().setName(String.format("RemoteFile-execute-archiveForDeleted-%s", index)); 3151 CacheDriverExecutor dmExecutor = null; 3152 CacheDriverExecutor dmDelExecutor = null; 3153 CacheDriverExecutor executor = null; 3154 if(isDebug(the)){ 3155 debugLog("CheckUsage",the,this); 3156 } 3157 if (memoryUsage() < RemoteFile.MAX_MEMORY_USAGE) { 3158 try { 3159 Clock clock = new Clock(); 3160 Timestamp currentTime = clock.getSqlTimestamp(); 3161 Map<String, Object> item = (Map<String, Object>) o; 3162 Object fileId = item.get("file_id"); 3163 Thread.currentThread().setName(String.format("RemoteFile-archiveForDeleted-%s", fileId)); 3164 calculateDataMappingDataSourceReader(the, item); 3165 Map<String, Object> dataRecord = null; 3166 boolean haveSubFiles = false; 3167 /**Need use dataMappingDataSourceReader**/ 3168 if (item.get("file_type").equals("D")) { 3169 int count = countAllSubFiles(item.get("file_parent_path") + "", 3170 item.get("file_name") + ""); 3171 haveSubFiles = count > 0; 3172 } 3173 if (!haveSubFiles) { 3174 if (!item.get("file_type").equals("D")) { 3175 Connection dmConn = dataMappingDataSourceReader.getConnection(); 3176 dmConn.setAutoCommit(true); 3177 dmExecutor = new CacheDriverExecutor(dmConn); 3178 dataRecord = new HashMap<String, Object>(); 3179 dataRecord.put("file_id", item.get("file_id")); 3180 dataRecord.put("deleted_at", currentTime); 3181 dmExecutor.execute( 3182 formatSqlForFilePartData(SQL_UPDATE_BY_FILE_ID_FOR_CLEAR_FILE_DATA), 3183 dataRecord); 3184 } 3185 Connection conn = dataSourceWriter.getConnection(); 3186 conn.setAutoCommit(true); 3187 executor = new CacheDriverExecutor(conn); 3188 dataRecord = new HashMap<String, Object>(); 3189 dataRecord.put("file_id", item.get("file_id")); 3190 executor.execute(formatSql(SQL_DELETE_BY_FILE_ID_FROM_FILE), dataRecord); 3191 } 3192 3193 Connection dmDelConn = dataMappingDataSourceReader.getConnection(); 3194 dmDelConn.setAutoCommit(true); 3195 dmDelExecutor = new CacheDriverExecutor(dmDelConn); 3196 dataRecord = new HashMap<String, Object>(); 3197 dataRecord.put("deleted_at", new Timestamp(currentTime.getTime() - archiveMs)); 3198 dmDelExecutor.execute(formatSqlForFilePartData(SQL_DELETE_BY_RF_ID_FROM_FILE_DATA), 3199 dataRecord); 3200 } catch (Exception e) { 3201 log.error(e.getMessage(), e); 3202 } finally { 3203 try { 3204 if (dmExecutor != null) 3205 dmExecutor.close(); 3206 } catch (Exception e) { 3207 log.error(e.getMessage(), e); 3208 } 3209 try { 3210 if (dmDelExecutor != null) 3211 dmDelExecutor.close(); 3212 } catch (Exception e) { 3213 log.error(e.getMessage(), e); 3214 } 3215 try { 3216 if (executor != null) 3217 executor.close(); 3218 } catch (Exception e) { 3219 log.error(e.getMessage(), e); 3220 } 3221 } 3222 3223 }else{ 3224 log.warn("Over max memory usage 'MAX_MEMORY_USAGE={}'.",MAX_MEMORY_USAGE); 3225 } 3226 } 3227 }; 3228 rows.filter(filter); 3229 final CacheDriverExecutor finalExecutor = executor; 3230 Executors.newFixedThreadPool(1).execute(new Runnable(){ 3231 @Override 3232 public void run(){ 3233 try{ 3234 finalExecutor.find(formatSql(SQL_SELECT_FOR_ARCHIVE_DELETED), dataRecord, rows); 3235 }catch(Exception e){ 3236 log.error(e.getMessage(),e); 3237 } 3238 } 3239 }); 3240 } catch (SQLException e) { 3241 throw new IOException(e.getMessage(), e); 3242 } finally { 3243 if (executor != null) { 3244 try { 3245 executor.close(); 3246 } catch (Exception e) { 3247 throw new IOException(e.getMessage(), e); 3248 } 3249 } 3250 } 3251 } 3252 3253 public void startArchiveForTimeout(long timerMs) throws IOException { 3254 startArchiveForTimeout(timerMs,null); 3255 } 3256 3257 public void startArchiveForTimeout(long timerMs,Runnable completedCallback) throws IOException { 3258 final RemoteFile the = this; 3259 3260 stopSync = false; 3261 3262 CacheDriverExecutor executor = null; 3263 Connection conn = null; 3264 try { 3265 Thread.currentThread().setName(String.format("RemoteFile-archiveForTimeout-%s", getPath())); 3266 conn = dataSourceReader.getConnection(); 3267 conn.setAutoCommit(true); 3268 executor = new CacheDriverExecutor(conn); 3269 Timestamp beforeTime = new Timestamp(new Clock().getTime() - (LOGIC_TIMEOUT_MS*BATCH_SIZE)); 3270 Map<String, Object> dataRecord = new HashMap<String, Object>(); 3271 dataRecord.put("file_parent_path", getPath()); 3272 dataRecord.put("like_file_parent_path", parsePath(getPath() + "/%")); 3273 dataRecord.put("before_updated_at", beforeTime); 3274 3275 CacheArray rows = new CacheArray(); 3276 CacheArrayFilter filter = new CacheArrayFilter(CACHE_ARRAY_FILTER_TIMER) { 3277 3278 @Override 3279 public void completed(Integer size) { 3280 if (isStopSync()) { 3281 LoggerFactory.getLogger(RemoteFile.class).mark("Stoped archive for timeout deleted."); 3282 } 3283 if (timerMs <= 0) { 3284 LoggerFactory.getLogger(RemoteFile.class).mark("Completed archive for timeout deleted."); 3285 } 3286 3287 if (completedCallback != null) { 3288 completedCallback.run(); 3289 } 3290 3291 if (!isStopSync() && timerMs > 0) { 3292 while(true){ 3293 3294 if(isStopSync()) break; 3295 3296 try { 3297 Thread.sleep(timerMs); 3298 if (!isStopSync()) { 3299 boolean allowed = checkUsage(the); 3300 if(allowed){ 3301 startArchiveForTimeout(timerMs, completedCallback); 3302 break; 3303 } 3304 } 3305 } catch (Exception e) { 3306 log.error(e.getMessage(), e); 3307 } 3308 } 3309 } 3310 3311 } 3312 3313 @Override 3314 public void execute(Integer index, Object o) { 3315 Thread.currentThread().setName(String.format("RemoteFile-execute-archiveForTimeout-%s", index)); 3316 if(isDebug(the)){ 3317 debugLog("CheckUsage",the,this); 3318 } 3319 CacheDriverExecutor executor = null; 3320 if (memoryUsage() < RemoteFile.MAX_MEMORY_USAGE) { 3321 try { 3322 Clock clock = new Clock(); 3323 Timestamp currentTime = clock.getSqlTimestamp(); 3324 Map<String, Object> item = (Map<String, Object>) o; 3325 Object fileId = item.get("file_id"); 3326 Thread.currentThread().setName(String.format("RemoteFile-archiveForTimeout-%s", fileId)); 3327 Connection conn = dataSourceWriter.getConnection(); 3328 conn.setAutoCommit(true); 3329 executor = new CacheDriverExecutor(conn); 3330 Map<String,Object> dataRecord = new HashMap<String, Object>(); 3331 dataRecord.put("file_id",fileId); 3332 dataRecord.put("deleted_at",currentTime); 3333 dataRecord.put("deleted_user_id",getDefaultModifyUserId()); 3334 executor.execute(formatSql(SQL_UPDATE_FOR_TIMEOUT_DELETE), dataRecord); 3335 } catch (Exception e) { 3336 log.error(e.getMessage(), e); 3337 } finally { 3338 try { 3339 if (executor != null) 3340 executor.close(); 3341 } catch (Exception e) { 3342 log.error(e.getMessage(), e); 3343 } 3344 } 3345 3346 }else{ 3347 log.warn("Over max memory usage 'MAX_MEMORY_USAGE={}'.",MAX_MEMORY_USAGE); 3348 } 3349 } 3350 }; 3351 rows.filter(filter); 3352 final CacheDriverExecutor finalExecutor = executor; 3353 Executors.newFixedThreadPool(1).execute(new Runnable(){ 3354 @Override 3355 public void run(){ 3356 try{ 3357 finalExecutor.find(formatSql(SQL_SELECT_FOR_ALL_TIMEOUT), dataRecord, rows); 3358 }catch(Exception e){ 3359 log.error(e.getMessage(),e); 3360 } 3361 } 3362 }); 3363 } catch (SQLException e) { 3364 throw new IOException(e.getMessage(), e); 3365 } finally { 3366 if (executor != null) { 3367 try { 3368 executor.close(); 3369 } catch (Exception e) { 3370 throw new IOException(e.getMessage(), e); 3371 } 3372 } 3373 } 3374 } 3375 3376 public static Map<String, DriverDataSource> getDriverDataSourceMap() { 3377 return Collections.synchronizedMap(DATASOURCE_REMOTE_FILE); 3378 } 3379 3380 private static synchronized String generateId(boolean identityCardEnable, String identityCardName, 3381 String identityCardPrefix, int identityCardSuffix) throws SQLException { 3382 if (identityCardEnable) { 3383 if (identityCard == null) { 3384 identityCard = new IdentityCard(identityCardName); 3385 } 3386 return identityCard.generateId(16, identityCardPrefix, "", identityCardSuffix); 3387 } else { 3388 return CommonTools.generateId(16); 3389 } 3390 } 3391 3392 private static synchronized String generateDataId(boolean identityCardEnable, String identityCardName, 3393 int identityCardSuffix, RemoteFile theRf) throws SQLException { 3394 if (identityCardEnable) { 3395 if (identityCard == null) { 3396 identityCard = new IdentityCard(identityCardName + "Data"); 3397 } 3398 return identityCard.generateId(16, String.format("%04d", theRf.dataMappingTable), "", identityCardSuffix); 3399 } else { 3400 return CommonTools.generateId(16); 3401 } 3402 } 3403 3404 protected static synchronized boolean addQueuePathMapping(String path) { 3405 Long addTimeMs = null; 3406 3407 boolean allowed = checkQueuePathMapping(); 3408 3409 if (!allowed) 3410 return false; 3411 3412 addTimeMs = QUEUE_PATH_MAPPING.get(path); 3413 3414 if (addTimeMs == null) { 3415 QUEUE_PATH_MAPPING.put(path, System.currentTimeMillis()); 3416 return true; 3417 } 3418 3419 return false; 3420 } 3421 3422 public static synchronized boolean checkQueuePathMapping() { 3423 Long addTimeMs = null; 3424 Map<String, Long> copyQueueMap = getQueuePathMapping(); 3425 List<String> keyList = new ArrayList<String>(copyQueueMap.keySet()); 3426 for (String key : keyList) { 3427 addTimeMs = copyQueueMap.get(key); 3428 if (addTimeMs != null) { 3429 boolean timeout = (System.currentTimeMillis() - addTimeMs) >= LOGIC_TIMEOUT_MS; 3430 3431 if (timeout) { 3432 QUEUE_PATH_MAPPING.remove(key); 3433 } 3434 } 3435 } 3436 3437 if (QUEUE_PATH_MAPPING.keySet().size() >= MAX_QUEUE_SIZE) 3438 return false; 3439 3440 return true; 3441 } 3442 3443 protected static synchronized void removeQueuePathMapping(String path) { 3444 QUEUE_PATH_MAPPING.remove(path); 3445 } 3446 3447 public static synchronized Map<String, Long> getQueuePathMapping() { 3448 return Collections.synchronizedMap(QUEUE_PATH_MAPPING); 3449 } 3450 3451 protected static synchronized void loadDataMapping(RemoteFile rf) throws Exception { 3452 List<String> dmArray = rf.configProperties.getArray("DataMapping", new String[] {}); 3453 int size = dmArray.size(); 3454 List<String> dmList = DATA_MAPPING_LIST.get(rf.dsKey); 3455 3456 if (dmList == null) 3457 dmList = new ArrayList<String>(); 3458 3459 if (!DATA_MAPPING_INDEX.containsKey(rf.dsKey)) 3460 DATA_MAPPING_INDEX.put(rf.dsKey, 0); 3461 3462 for (int i = 0; i < size; i++) { 3463 String dm = dmArray.get(i); 3464 String originKey = String.format("DataMapping[%s]", i); 3465 Map<String, Object> dmMap = rf.configProperties.getMap(originKey); 3466 String shortKey = String.format("%s.%s", rf.dsKey, dm); 3467 if (dmMap != null) { 3468 String dmKey = String.format("%s.%s", rf.dsKey, originKey); 3469 3470 if (!DATA_MAPPING.containsKey(shortKey)) { 3471 ConfigProperties dmCp = new ConfigProperties(); 3472 dmCp.putAllAndReturnSlef(dmMap); 3473 int priority = dmCp.getInteger(String.format("%s.Priority", originKey), 0); 3474 if (i == 0 && priority < 1) { 3475 throw new Exception("First DataMapping priority cannot be less than 1."); 3476 } 3477 String dataSourceFileOriginValue = dmCp.getString(String.format("%s.DataSource", originKey)); 3478 String dataSourceReaderFileOriginValue = dmCp 3479 .getString(String.format("%s.DataSourceReader", originKey), dataSourceFileOriginValue); 3480 String dataSourceWriterFileOriginValue = dmCp 3481 .getString(String.format("%s.DataSourceWriter", originKey), dataSourceFileOriginValue); 3482 3483 String dmDsReaderKey = String.format("%s.reader", CommonTools.md5(dataSourceReaderFileOriginValue)); 3484 String dmDsWriterKey = String.format("%s.writer", CommonTools.md5(dataSourceWriterFileOriginValue)); 3485 DriverDataSource dmDataSourceReader = DATASOURCE_REMOTE_FILE.get(dmDsReaderKey); 3486 DriverDataSource dmDataSourceWriter = DATASOURCE_REMOTE_FILE.get(dmDsWriterKey); 3487 if (dmDataSourceReader == null) { 3488 DATASOURCE_REMOTE_FILE.put(dmDsReaderKey, 3489 new DriverDataSource(new File(dataSourceReaderFileOriginValue))); 3490 } 3491 if (dmDataSourceWriter == null) { 3492 DATASOURCE_REMOTE_FILE.put(dmDsWriterKey, 3493 new DriverDataSource(new File(dataSourceWriterFileOriginValue))); 3494 } 3495 String tableRangOriginValue = dmCp.getString(String.format("%s.TableRange", originKey), "0"); 3496 ConfigProperties shortCp = new ConfigProperties(); 3497 List<Integer> tableRange = new ArrayList<Integer>(); 3498 if (tableRangOriginValue.contains("-")) { 3499 String[] ranges = tableRangOriginValue.split("-"); 3500 Integer begin = Integer.parseInt(ranges[0].trim()); 3501 Integer end = Integer.parseInt(ranges[1].trim()); 3502 for (int j = begin; j <= end; j++) { 3503 tableRange.add(begin + j); 3504 } 3505 } else if (tableRangOriginValue.contains(",")) { 3506 String[] ranges = tableRangOriginValue.split(","); 3507 int rangesLen = ranges.length; 3508 for (int j = 0; j < rangesLen; j++) { 3509 tableRange.add(Integer.parseInt(ranges[j].trim())); 3510 } 3511 } else { 3512 tableRange.add(Integer.parseInt(tableRangOriginValue.trim())); 3513 } 3514 shortCp.put("TableRange", tableRange); 3515 shortCp.putAndReturnSlef("DataSourceReader", dmDsReaderKey); 3516 shortCp.putAndReturnSlef("DataSourceWriter", dmDsWriterKey); 3517 shortCp.putAndReturnSlef("Priority", priority); 3518 shortCp.putAndReturnSlef("UsingCount", 0); 3519 shortCp.putAndReturnSlef("Name", dm); 3520 int randomIndex = ThreadLocalRandom.current().nextInt(0, tableRange.size()); 3521 shortCp.putAndReturnSlef("Table", tableRange.get(randomIndex)); 3522 DATA_MAPPING.put(shortKey, shortCp); 3523 dmList.add(dm); 3524 DATA_MAPPING_LIST.put(rf.dsKey, dmList); 3525 } 3526 } 3527 } 3528 3529 } 3530 3531 protected static synchronized void calculateDataMappingDataSourceReader(RemoteFile rf, 3532 Map<String, Object> rfRecord) { 3533 rf.dataMappingDs = (String) rfRecord.get("file_part_data_ds"); 3534 rf.dataMappingTable = (Integer) rfRecord.get("file_part_data_table"); 3535 String shortKey = String.format("%s.%s", rf.dsKey, rf.dataMappingDs); 3536 ConfigProperties cp = DATA_MAPPING.get(shortKey); 3537 if (cp != null) { 3538 String dmDsReaderKey = cp.getString("DataSourceReader"); 3539 rf.dataMappingDataSourceReader = DATASOURCE_REMOTE_FILE.get(dmDsReaderKey); 3540 } 3541 } 3542 3543 protected static synchronized void calculateDataMappingDataSourceWriter(RemoteFile rf) { 3544 if (DATA_MAPPING_INDEX.get(rf.dsKey) >= DATA_MAPPING_LIST.size() - 1) { 3545 DATA_MAPPING_INDEX.put(rf.dsKey, 0); 3546 } 3547 ConfigProperties cp = getDataMapping(rf, DATA_MAPPING_INDEX.get(rf.dsKey)); 3548 List<Integer> tableRange = (List<Integer>) cp.get("TableRange"); 3549 Integer priority = cp.getInteger("Priority", 0); 3550 Integer usingCount = cp.getInteger("UsingCount", 0); 3551 String key = String.format("%s.%s", rf.dsKey, cp.getString("Name")); 3552 if (usingCount < priority) { 3553 rf.dataMappingDataSourceReader = DATASOURCE_REMOTE_FILE.get(cp.getString("DataSourceReader")); 3554 rf.dataMappingDataSourceWriter = DATASOURCE_REMOTE_FILE.get(cp.getString("DataSourceWriter")); 3555 rf.dataMappingDs = cp.getString("Name"); 3556 rf.dataMappingTable = cp.getInteger("Table"); 3557 usingCount++; 3558 if (usingCount >= priority) { 3559 DATA_MAPPING_INDEX.put(rf.dsKey, DATA_MAPPING_INDEX.get(rf.dsKey) + 1); 3560 cp.putAndReturnSlef("UsingCount", 0); 3561 int randomIndex = ThreadLocalRandom.current().nextInt(0, tableRange.size()); 3562 cp.putAndReturnSlef("Table", tableRange.get(randomIndex)); 3563 DATA_MAPPING.put(key, cp); 3564 } else { 3565 cp.putAndReturnSlef("UsingCount", usingCount); 3566 int randomIndex = ThreadLocalRandom.current().nextInt(0, tableRange.size()); 3567 cp.putAndReturnSlef("Table", tableRange.get(randomIndex)); 3568 DATA_MAPPING.put(key, cp); 3569 } 3570 } else { 3571 DATA_MAPPING_INDEX.put(rf.dsKey, DATA_MAPPING_INDEX.get(rf.dsKey) + 1); 3572 calculateDataMappingDataSourceWriter(rf); 3573 } 3574 } 3575 3576 protected static synchronized ConfigProperties getDataMapping(RemoteFile rf, int index) { 3577 List<String> dmList = DATA_MAPPING_LIST.get(rf.dsKey); 3578 String dm = dmList.get(index); 3579 String key = String.format("%s.%s", rf.dsKey, dm); 3580 return DATA_MAPPING.get(key); 3581 } 3582 3583 protected String getDataMappingTableName() { 3584 return String.format("%s_data_%s", this.tableName, this.dataMappingTable); 3585 } 3586 3587 private static void debugLog(String point,RemoteFile rf,CacheArrayFilter filter){ 3588 debugLog(RemoteFile.class,point,rf,filter); 3589 } 3590 3591}