001/******************************************************************************* 002The MIT License (MIT) 003 004Copyright (c) 2024 KILLCODING.COM 005 006Permission is hereby granted, free of charge, to any person obtaining a copy 007of this software and associated documentation files (the "Software"), to deal 008in the Software without restriction, including without limitation the rights 009to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 010copies of the Software, and to permit persons to whom the Software is 011furnished to do so, subject to the following conditions: 012 013The above copyright notice and this permission notice shall be included in 014all copies or substantial portions of the Software. 015 016THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 017IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 018FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 019AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 020LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 021OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 022THE SOFTWARE. 023*****************************************************************************/ 024package com.killcoding.datasource; 025 026import java.sql.Connection; 027import java.sql.PreparedStatement; 028import java.util.List; 029import java.sql.SQLException; 030import java.sql.Types; 031import java.util.Map; 032import java.sql.ResultSet; 033import java.sql.ResultSetMetaData; 034import java.util.HashMap; 035import java.util.ArrayList; 036import java.util.regex.Pattern; 037import java.util.regex.Matcher; 038import java.util.Arrays; 039import com.killcoding.tool.ResultMap; 040import java.sql.DatabaseMetaData; 041import com.killcoding.log.LoggerFactory; 042import com.killcoding.log.Logger; 043import java.util.concurrent.Executors; 044import com.killcoding.datasource.DriverConnection; 045import com.killcoding.datasource.DriverDataSource; 046import java.util.concurrent.ConcurrentHashMap; 047import com.killcoding.tool.CommonTools; 048import com.killcoding.tool.ConfigProperties; 049import java.io.File; 050import java.text.DateFormat; 051import java.text.SimpleDateFormat; 052import com.killcoding.tool.FileTools; 053import java.io.IOException; 054import java.nio.file.Files; 055import java.util.Date; 056import java.nio.file.Paths; 057import java.nio.file.Path; 058import java.nio.file.StandardCopyOption; 059import com.killcoding.cache.CacheArray; 060import com.killcoding.cache.CacheArrayFilter; 061import com.killcoding.tool.CodeEscape; 062import java.sql.Blob; 063import java.util.stream.Collectors; 064import java.net.URI; 065import java.util.Comparator; 066import java.sql.Timestamp; 067import java.sql.Clob; 068import java.io.InputStream; 069import java.sql.CallableStatement; 070 071/** 072 * This class is execute sql base class. 073 * Support database replication. 074 * Support database multi-activity. 075 * Support database CRUD 076 * */ 077public class DriverExecutor { 078 079 protected final static Map<Integer, List<DriverExecutor>> SYNC_EXECUTOR_MARK = new ConcurrentHashMap<Integer, List<DriverExecutor>>(); 080 private final static Map<Integer, Long> SYNC_CONN_ERROR_TIME = new ConcurrentHashMap<Integer, Long>(); 081 082 private final static Map<String,String> SQL_LOG_MSG_MAPPING = new ConcurrentHashMap<String,String>(); 083 private final static Map<String,Boolean> SQL_LOG_OVERSPEND_MAPPING = new ConcurrentHashMap<String,Boolean>(); 084 085 protected Logger log = null; 086 087 public final static String COLUMN_NAME_CASE_UPPER = "UPPER"; 088 public final static String COLUMN_NAME_CASE_LOWER = "LOWER"; 089 public final static String COLUMN_NAME_CASE_ORIGINAL = "ORIGINAL"; 090 public static String COLUMN_NAME_CASE_MODE = COLUMN_NAME_CASE_ORIGINAL; 091 092 protected boolean closed = true; 093 protected Connection connection = null; 094 private static CacheArray sqlLogCacheArray = null; 095 096 /** 097 * New a DriverExecutor object 098 * @param connection - JDBC connection 099 * */ 100 public DriverExecutor(Connection connection) { 101 super(); 102 log = LoggerFactory.getLogger(this.getClass()); 103 this.connection = connection; 104 writeSqlLog("open", 0, "open", ""); 105 closed = (this.connection == null); 106 } 107 108 /** 109 * Get current Connection 110 * @return Connection 111 * */ 112 public Connection getConnection() { 113 return connection; 114 } 115 116 /** 117 * Get column classes by table name 118 * @exception SQLException 119 * @return Map<String,Object> - column and java type class mapping 120 * @param tableName - table name 121 * */ 122 public Map<String, Object> getColumnClasses(String tableName) throws SQLException { 123 Map<String, Object> types = null; 124 ResultSet result = null; 125 PreparedStatement statement = null; 126 String sql = String.format("SELECT * FROM %s WHERE 1 = 0", tableName); 127 statement = connection.prepareStatement(sql); 128 result = statement.executeQuery(); 129 final ResultSetMetaData rsmd = result.getMetaData(); 130 for (int i = 0; i < rsmd.getColumnCount(); i++) { 131 if (types == null) { 132 types = new ResultMap<String, Object>(); 133 } 134 String cn = rsmd.getColumnClassName(i + 1); 135 types.put(converCase(rsmd.getColumnLabel(i + 1)), cn); 136 } 137 return types; 138 } 139 140 /** 141 * Get column db data types by table name 142 * @exception SQLException 143 * @return Map<String, Object> - column and db data type mapping 144 * @param tableName - Table name 145 * */ 146 public Map<String, Object> getColumnTypes(String tableName) throws SQLException { 147 ResultSet result = null; 148 PreparedStatement statement = null; 149 String sql = String.format("SELECT * FROM %s WHERE 1 = 0", tableName); 150 Map<String, Object> types = null; 151 statement = connection.prepareStatement(sql); 152 result = statement.executeQuery(); 153 final ResultSetMetaData rsmd = result.getMetaData(); 154 for (int i = 0; i < rsmd.getColumnCount(); i++) { 155 if (types == null) { 156 types = new ResultMap<String, Object>(); 157 } 158 types.put(converCase(rsmd.getColumnLabel(i + 1)), rsmd.getColumnTypeName(i + 1)); 159 } 160 return types; 161 } 162 163 /** 164 * Show column data type and java type mapping 165 * @exception SQLException 166 * @return List<Map<String, Object>> - Mapping list 167 * @param tableName - Table name 168 * */ 169 public List<Map<String, Object>> desc(String tableName) throws SQLException { 170 List<String> primaryKeys = getPrimaryKeys(tableName); 171 List<Map<String, Object>> results = new ArrayList<Map<String, Object>>(); 172 ResultSet result = null; 173 PreparedStatement statement = null; 174 String sql = String.format("SELECT * FROM %s WHERE 1 = 0", tableName); 175 statement = connection.prepareStatement(sql); 176 result = statement.executeQuery(); 177 final ResultSetMetaData rsmd = result.getMetaData(); 178 for (int i = 0; i < rsmd.getColumnCount(); i++) { 179 Map<String, Object> types = new ResultMap<String, Object>(); 180 int ci = i + 1; 181 boolean nullable = rsmd.isNullable(ci) == 1; 182 String name = converCase(rsmd.getColumnLabel(ci)); 183 String isPk = "UNKNOWN"; 184 if (primaryKeys != null) { 185 isPk = primaryKeys.contains(name) ? "Y" : "N"; 186 } 187 types.put("NAME", name); 188 types.put("PRIMARY_KEY", isPk); 189 types.put("DATA_TYPE", rsmd.getColumnTypeName(ci)); 190 types.put("JAVA_TYPE", rsmd.getColumnClassName(ci)); 191 types.put("PRECISION", rsmd.getPrecision(ci)); 192 types.put("ALLOW_NULLABLE", nullable ? "Y" : 'N'); 193 results.add(types); 194 } 195 return results; 196 } 197 198 /** 199 * Get primary Keys by table name 200 * @return List<String> - Primary Keys 201 * @param _tableName - Table name 202 * */ 203 public List<String> getPrimaryKeys(String _tableName) { 204 try { 205 List<String> pks = new ArrayList<String>(); 206 DatabaseMetaData meta = connection.getMetaData(); 207 ResultSet tables = meta.getTables(null, null, "%", new String[] { "TABLE" }); 208 while (tables.next()) { 209 String catalog = tables.getString("TABLE_CAT"); 210 String schema = tables.getString("TABLE_SCHEM"); 211 String tableName = tables.getString("TABLE_NAME"); 212 if (tableName.equalsIgnoreCase(_tableName)) { 213 ResultSet primaryKeys = meta.getPrimaryKeys(catalog, schema, tableName); 214 while (primaryKeys.next()) { 215 pks.add(primaryKeys.getString("COLUMN_NAME")); 216 } 217 break; 218 } 219 } 220 return pks; 221 } catch (Exception e) { 222 log.warn(e); 223 return null; 224 } 225 } 226 227 /** 228 * Get all tables 229 * @return List<Map<String, Object>> 230 * @exception SQLException 231 * */ 232 public List<Map<String, Object>> getAllTables() throws SQLException { 233 List<Map<String, Object>> tablesList = new ArrayList<Map<String, Object>>(); 234 DatabaseMetaData meta = connection.getMetaData(); 235 ResultSet tables = meta.getTables(null, null, "%", new String[] { "TABLE" }); 236 while (tables.next()) { 237 String catalog = tables.getString("TABLE_CAT"); 238 String schema = tables.getString("TABLE_SCHEM"); 239 String tableName = tables.getString("TABLE_NAME"); 240 Map<String, Object> t = new ResultMap<String, Object>(); 241 t.put("TABLE_SCHEMA", schema); 242 t.put("TABLE_NAME", tableName); 243 tablesList.add(t); 244 } 245 return tablesList; 246 } 247 248 /** 249 * Get all tables by schema 250 * @return List<Map<String, Object>> 251 * @exception SQLException 252 * */ 253 public List<Map<String, Object>> getAllTables(String _schema) throws SQLException { 254 List<Map<String, Object>> tablesList = new ArrayList<Map<String, Object>>(); 255 DatabaseMetaData meta = connection.getMetaData(); 256 ResultSet tables = meta.getTables(null, null, "%", new String[] { "TABLE" }); 257 while (tables.next()) { 258 String schema = tables.getString("TABLE_SCHEM"); 259 if (schema != null && schema.equalsIgnoreCase(_schema)) { 260 String catalog = tables.getString("TABLE_CAT"); 261 String tableName = tables.getString("TABLE_NAME"); 262 Map<String, Object> t = new ResultMap<String, Object>(); 263 t.put("TABLE_SCHEMA", schema); 264 t.put("TABLE_NAME", tableName); 265 tablesList.add(t); 266 } 267 } 268 return tablesList; 269 } 270 271 /** 272 * Query first record 273 * @exception SQLException 274 * @return Map<String,Object> - First result 275 * @param sql - Condition use format ':column_name' 276 * @param params 277 * */ 278 public Map<String, Object> first(String sql, Map<String, Object> params) throws SQLException { 279 List<Map<String, Object>> list = find(0, 1, sql, params); 280 if (list.size() > 0) { 281 return list.get(0); 282 } 283 return null; 284 } 285 286 /** 287 * Query first record 288 * @exception SQLException 289 * @return Map<String,Object> - First result 290 * @param sql 291 * */ 292 public Map<String, Object> first(String sql) throws SQLException { 293 return first(sql, Arrays.asList(new Object[] {})); 294 } 295 296 /** 297 * Query first record 298 * @exception SQLException 299 * @return Map<String,Object> - First result 300 * @param sql - Condition use format '?' 301 * @param params 302 * */ 303 public Map<String, Object> first(String sql, List<Object> params) throws SQLException { 304 List<Map<String, Object>> list = find(0, 1, sql, params); 305 if (list.size() > 0) { 306 return list.get(0); 307 } 308 return null; 309 } 310 311 /** 312 * Query all matched records 313 * @exception SQLException 314 * @return List<Map<String, Object>> 315 * @param sql 316 * @param params 317 * */ 318 public List<Map<String, Object>> find(String sql) throws SQLException { 319 return find(0, 0, sql, Arrays.asList(new Object[] {})); 320 } 321 322 /** 323 * Query all matched records 324 * @exception SQLException 325 * @return List<Map<String, Object>> 326 * @param cursorStart - JDBC result Cursor start index 327 * @param maxRows - JDBC result max rows (JDBC limited rows 50,000,000) 328 * @param sql 329 * @param params 330 * */ 331 public List<Map<String, Object>> find(int cursorStart, int maxRows, String sql) throws SQLException { 332 return find(cursorStart, maxRows, sql, Arrays.asList(new Object[] {})); 333 } 334 335 /** 336 * Query all matched records 337 * @exception SQLException 338 * @return List<Map<String, Object>> 339 * @param sql - Condition use format ':column_name' 340 * @param params 341 * */ 342 public List<Map<String, Object>> find(String sql, Map<String, Object> params) throws SQLException { 343 return find(0, 0, sql, params); 344 } 345 346 /** 347 * Query all matched records 348 * @exception SQLException 349 * @return List<Map<String, Object>> 350 * @param cursorStart - JDBC result Cursor start index 351 * @param maxRows - JDBC result max rows (JDBC limited rows 50,000,000) 352 * @param sql - Condition use format ':column_name' 353 * @param params 354 * */ 355 public List<Map<String, Object>> find(int cursorStart, int maxRows, String sql, Map<String, Object> params) 356 throws SQLException { 357 String csql = converSql(sql); 358 List<Object> cparams = converParams(sql, params); 359 return find(cursorStart, maxRows, csql, cparams); 360 } 361 362 /** 363 * Query all matched records 364 * @exception SQLException 365 * @return List<Map<String, Object>> 366 * @param cursorStart - JDBC result Cursor start index 367 * @param maxRows - JDBC result max rows (JDBC limited rows 50,000,000) 368 * @param sql - Condition use format '?' 369 * @param params 370 * */ 371 public List<Map<String, Object>> find(int cursorStart, int maxRows, String sql, List<Object> params) 372 throws SQLException { 373 long begin = System.currentTimeMillis(); 374 boolean allowedLog = writeSqlLog("find", begin, 375 String.format("%s [cursorStart=%s,maxRows=%s]", sql, cursorStart, maxRows), params); 376 377 PreparedStatement statement = null; 378 Map<String, Object> row = null; 379 ResultSet result = null; 380 final List<Map<String, Object>> rows = new ArrayList<Map<String, Object>>(); 381 try { 382 // ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_UPDATABLE 383 // ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY 384 statement = connection.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY); 385 if (params != null) { 386 int size = params.size(); 387 for (int i = 0; i < size; i++) { 388 int ci = i + 1; 389 Object param = params.get(i); 390 if (param == null) { 391 statement.setNull(ci, Types.VARCHAR); 392 } else { 393 statement.setObject(ci, param); 394 } 395 } 396 } 397 if (maxRows > 0) { 398 statement.setMaxRows(maxRows); 399 } 400 result = statement.executeQuery(); 401 result.absolute(cursorStart); 402 final ResultSetMetaData rsmd = result.getMetaData(); 403 final int c = rsmd.getColumnCount(); 404 while (result.next()) { 405 row = new ResultMap<String, Object>(); 406 for (int i = 0; i < c; i++) { 407 int ci = i + 1; 408 Object value = null; 409 Object originValue = result.getObject(ci); 410 if (originValue == null) { 411 value = originValue; 412 } else if (originValue instanceof Blob) { 413 Blob blobValue = (Blob) originValue; 414 InputStream is = null; 415 try { 416 is = blobValue.getBinaryStream(); 417 if(is != null) value = is.readAllBytes(); 418 } catch (IOException e) { 419 throw new SQLException(e.getMessage(), e); 420 } finally { 421 if (blobValue != null) { 422 try { 423 blobValue.free(); 424 } catch (SQLException e) { 425 throw e; 426 } 427 } 428 if (is != null) { 429 try { 430 is.close(); 431 } catch (IOException e) { 432 throw new SQLException(e.getMessage(), e); 433 } 434 } 435 } 436 } else if (originValue instanceof Clob) { 437 Clob clobValue = (Clob) originValue; 438 InputStream is = null; 439 try { 440 is = clobValue.getAsciiStream(); 441 if(is != null) value = is.readAllBytes(); 442 } catch (IOException e) { 443 throw new SQLException(e.getMessage(), e); 444 } finally { 445 if (clobValue != null) { 446 try { 447 clobValue.free(); 448 } catch (SQLException e) { 449 throw e; 450 } 451 } 452 if (is != null) { 453 try { 454 is.close(); 455 } catch (IOException e) { 456 throw new SQLException(e.getMessage(), e); 457 } 458 } 459 } 460 } else { 461 value = originValue; 462 } 463 row.put(converCase(rsmd.getColumnLabel(ci)), value); 464 } 465 rows.add(row); 466 } 467 if (allowedLog) { 468 writeSqlLog("rows", begin, "rows", rows.size()); 469 long spend = System.currentTimeMillis() - begin; 470 writeSqlLog("spend", begin, "spend", spend); 471 } 472 return rows; 473 } catch (SQLException e) { 474 if (allowedLog) 475 writeSqlLog("error", begin, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage())); 476 477 throw e; 478 } finally { 479 if (result != null) 480 result.close(); 481 482 if (statement != null) 483 statement.close(); 484 } 485 } 486 487 /** 488 * Execute stored proc (and return boolean) 489 * @param sql - Query sql 490 * @exception SQLException 491 * @return boolean 492 * */ 493 public boolean callAndReturnBoolean(String sql) throws SQLException { 494 return callAndReturnBoolean(sql, Arrays.asList(new Object[] {})); 495 } 496 497 /** 498 * Execute stored proc (and return boolean) 499 * @param sql - Query sql, condition use format ':column_name' 500 * @param params 501 * @exception SQLException 502 * @return boolean 503 * */ 504 public boolean callAndReturnBoolean(String sql, Map<String, Object> params) throws SQLException { 505 String csql = converSql(sql); 506 List<Object> cparams = converParams(sql, params); 507 return callAndReturnBoolean(sql, params); 508 } 509 510 /** 511 * Execute stored proc (and return boolean) 512 * @param sql - Query sql,condition use format '?' 513 * @param params 514 * @exception SQLException 515 * @return boolean 516 * */ 517 public boolean callAndReturnBoolean(String sql, List<Object> params) throws SQLException { 518 519 if (!checkSqlAvailable(sql)) 520 return false; 521 522 long begin = System.currentTimeMillis(); 523 boolean allowedLog = writeSqlLog("call", begin, sql, params); 524 525 CallableStatement statement = null; 526 try { 527 statement = connection.prepareCall("{" + sql + "}"); 528 if (params != null) { 529 int size = params.size(); 530 for (int i = 0; i < size; i++) { 531 Object param = params.get(i); 532 if (param == null) { 533 statement.setNull(i + 1, Types.VARCHAR); 534 } else { 535 statement.setObject(i + 1, param); 536 } 537 } 538 } 539 540 statement.execute(); 541 542 boolean returnResult = true; //true is not exception 543 544 if (allowedLog) { 545 writeSqlLog("return", begin, "return", returnResult); 546 long spend = System.currentTimeMillis() - begin; 547 writeSqlLog("spend", begin, "spend", spend); 548 } 549 550 callAndReturnBooleanSync(connection, begin, sql, params, returnResult); 551 552 return returnResult; 553 } catch (SQLException e) { 554 if (allowedLog) 555 writeSqlLog("error", begin, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage())); 556 557 throw e; 558 } finally { 559 if (statement != null) 560 statement.close(); 561 } 562 } 563 564 /** 565 * Execute stored proc (and return rows) 566 * @param sql - Query sql 567 * @exception SQLException 568 * @return int 569 * */ 570 public int callAndReturnRows(String sql) throws SQLException { 571 return callAndReturnRows(sql, Arrays.asList(new Object[] {})); 572 } 573 574 /** 575 * Execute stored proc (and return rows) 576 * @param sql - Query sql,condition use format ':column_name' 577 * @param params 578 * @exception SQLException 579 * @return int 580 * */ 581 public int callAndReturnRows(String sql, Map<String, Object> params) throws SQLException { 582 String csql = converSql(sql); 583 List<Object> cparams = converParams(sql, params); 584 return callAndReturnRows(sql, params); 585 } 586 587 /** 588 * Execute stored proc (and return rows) 589 * @param sql - Query sql,condition use format '?' 590 * @param params 591 * @exception SQLException 592 * @return int 593 * */ 594 public int callAndReturnRows(String sql, List<Object> params) throws SQLException { 595 596 if (!checkSqlAvailable(sql)) 597 return -1; 598 599 long begin = System.currentTimeMillis(); 600 boolean allowedLog = writeSqlLog("call", begin, sql, params); 601 602 CallableStatement statement = null; 603 try { 604 statement = connection.prepareCall("{?=" + sql + "}"); 605 statement.registerOutParameter(1,Types.INTEGER); 606 if (params != null) { 607 int size = params.size(); 608 for (int i = 0; i < size; i++) { 609 Object param = params.get(i); 610 if (param == null) { 611 statement.setNull(i + 1, Types.VARCHAR); 612 } else { 613 statement.setObject(i + 1, param); 614 } 615 } 616 } 617 boolean hasResult = statement.execute(); 618 619 int row = statement.getInt(1); 620 621 if (allowedLog) { 622 writeSqlLog("return", begin, "return", row); 623 long spend = System.currentTimeMillis() - begin; 624 writeSqlLog("spend", begin, "spend", spend); 625 } 626 627 callAndReturnRowsSync(connection, begin, sql, params, row); 628 629 return row; 630 } catch (SQLException e) { 631 if (allowedLog) 632 writeSqlLog("error", begin, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage())); 633 634 throw e; 635 } finally { 636 if (statement != null) 637 statement.close(); 638 } 639 } 640 641 /** 642 * Execute stored proc (and return List) 643 * @param sql - Query sql 644 * @exception SQLException 645 * @return List<Map<String, Object>> 646 * */ 647 public List<Map<String, Object>> callAndReturnList(String sql) throws SQLException { 648 return callAndReturnList(0, 0, sql, Arrays.asList(new Object[] {})); 649 } 650 651 /** 652 * Execute stored proc (and return List) 653 * @param cursorStart - JDBC result Cursor start index 654 * @param maxRows - JDBC result max rows (JDBC limited rows 50,000,000) 655 * @param sql - Query sql 656 * @exception SQLException 657 * @return List<Map<String, Object>> 658 * */ 659 public List<Map<String, Object>> callAndReturnList(int cursorStart, int maxRows, String sql) throws SQLException { 660 return callAndReturnList(cursorStart, maxRows, sql, Arrays.asList(new Object[] {})); 661 } 662 663 /** 664 * Execute stored proc (and return List) 665 * @param cursorStart - JDBC result Cursor start index 666 * @param maxRows - JDBC result max rows (JDBC limited rows 50,000,000) 667 * @param sql - Query sql, condition use format ':column_name' 668 * @exception SQLException 669 * @return List<Map<String, Object>> 670 * */ 671 public List<Map<String, Object>> callAndReturnList(int cursorStart, int maxRows, String sql, 672 Map<String, Object> params) throws SQLException { 673 String csql = converSql(sql); 674 List<Object> cparams = converParams(sql, params); 675 return callAndReturnList(cursorStart, maxRows, sql, params); 676 } 677 678 /** 679 * Execute stored proc (and return List) 680 * @param cursorStart - JDBC result Cursor start index 681 * @param maxRows - JDBC result max rows (JDBC limited rows 50,000,000) 682 * @param sql - Query sql, condition use format '?' 683 * @exception SQLException 684 * @return List<Map<String, Object>> 685 * */ 686 public List<Map<String, Object>> callAndReturnList(int cursorStart, int maxRows, String sql, List<Object> params) 687 throws SQLException { 688 689 if (!checkSqlAvailable(sql)) 690 return null; 691 692 long begin = System.currentTimeMillis(); 693 boolean allowedLog = writeSqlLog("call", begin, 694 String.format("%s [cursorStart=%s,maxRows=%s]", sql, cursorStart, maxRows), params); 695 696 CallableStatement statement = null; 697 Map<String, Object> row = null; 698 ResultSet result = null; 699 final List<Map<String, Object>> rows = new ArrayList<Map<String, Object>>(); 700 try { 701 statement = connection.prepareCall("{" + sql + "}"); 702 if (params != null) { 703 int size = params.size(); 704 for (int i = 0; i < size; i++) { 705 int ci = i + 1; 706 Object param = params.get(i); 707 if (param == null) { 708 statement.setNull(ci, Types.VARCHAR); 709 } else { 710 statement.setObject(ci, param); 711 } 712 } 713 } 714 if (maxRows > 0) { 715 statement.setMaxRows(maxRows); 716 } 717 result = statement.executeQuery(); 718 final ResultSetMetaData rsmd = result.getMetaData(); 719 final int c = rsmd.getColumnCount(); 720 int rowIndex = 0; 721 while (result.next()) { 722 if (rowIndex >= cursorStart) { 723 row = new ResultMap<String, Object>(); 724 for (int i = 0; i < c; i++) { 725 int ci = i + 1; 726 Object value = null; 727 Object originValue = result.getObject(ci); 728 if (originValue == null) { 729 value = originValue; 730 } else if (originValue instanceof Blob) { 731 Blob blobValue = (Blob) originValue; 732 InputStream is = null; 733 try { 734 is = blobValue.getBinaryStream(); 735 if(is != null) value = is.readAllBytes(); 736 } catch (IOException e) { 737 throw new SQLException(e.getMessage(), e); 738 } finally { 739 if (blobValue != null) { 740 try { 741 blobValue.free(); 742 } catch (SQLException e) { 743 throw e; 744 } 745 } 746 if (is != null) { 747 try { 748 is.close(); 749 } catch (IOException e) { 750 throw new SQLException(e.getMessage(), e); 751 } 752 } 753 } 754 } else if (originValue instanceof Clob) { 755 Clob clobValue = (Clob) originValue; 756 InputStream is = null; 757 try { 758 is = clobValue.getAsciiStream(); 759 if(is != null) value = is.readAllBytes(); 760 } catch (IOException e) { 761 throw new SQLException(e.getMessage(), e); 762 } finally { 763 if (clobValue != null) { 764 try { 765 clobValue.free(); 766 } catch (SQLException e) { 767 throw e; 768 } 769 } 770 if (is != null) { 771 try { 772 is.close(); 773 } catch (IOException e) { 774 throw new SQLException(e.getMessage(), e); 775 } 776 } 777 } 778 } else { 779 value = originValue; 780 } 781 row.put(converCase(rsmd.getColumnLabel(ci)), value); 782 } 783 rows.add(row); 784 } 785 rowIndex++; 786 } 787 788 if (allowedLog) { 789 writeSqlLog("rows", begin, "rows", rows); 790 long spend = System.currentTimeMillis() - begin; 791 writeSqlLog("spend", begin, "spend", spend); 792 } 793 794 callAndReturnListSync(connection, cursorStart, maxRows, sql, params); 795 796 return rows; 797 } catch (SQLException e) { 798 if (allowedLog) 799 writeSqlLog("error", begin, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage())); 800 throw e; 801 } finally { 802 if (result != null) 803 result.close(); 804 805 if (statement != null) 806 statement.close(); 807 } 808 } 809 810 /** 811 * Execute sql 812 * @exception SQLException 813 * @param sql 814 * @return int 815 * */ 816 public int execute(String sql) throws SQLException { 817 return execute(sql, Arrays.asList(new Object[] {})); 818 } 819 820 /** 821 * Execute sql 822 * @exception SQLException 823 * @param sql - Condition use format ':column_name' 824 * @param params 825 * @return int 826 * */ 827 public int execute(String sql, Map<String, Object> params) throws SQLException { 828 String csql = converSql(sql); 829 List<Object> cparams = converParams(sql, params); 830 return execute(csql, cparams); 831 } 832 833 /** 834 * Execute sql 835 * @exception SQLException 836 * @param sql - Condition use format '?' 837 * @param params 838 * @return int 839 * */ 840 public int execute(String sql, List<Object> params) throws SQLException { 841 if (!checkSqlAvailable(sql)) 842 return -1; 843 844 long begin = System.currentTimeMillis(); 845 boolean allowedLog = writeSqlLog("execute", begin, sql, params); 846 847 PreparedStatement statement = null; 848 try { 849 statement = connection.prepareStatement(sql); 850 if (params != null) { 851 int size = params.size(); 852 for (int i = 0; i < size; i++) { 853 Object param = params.get(i); 854 if (param == null) { 855 statement.setNull(i + 1, Types.VARCHAR); 856 } else { 857 statement.setObject(i + 1, param); 858 } 859 } 860 } 861 int row = statement.executeUpdate(); 862 863 if (allowedLog) { 864 writeSqlLog("return", begin, "return", row); 865 long spend = System.currentTimeMillis() - begin; 866 writeSqlLog("spend", begin, "spend", spend); 867 } 868 869 executeSync(connection, begin, sql, params, row); 870 871 return row; 872 } catch (SQLException e) { 873 if (allowedLog) 874 writeSqlLog("error", begin, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage())); 875 throw e; 876 } finally { 877 if (statement != null) 878 statement.close(); 879 } 880 } 881 882 /** 883 * Execute batch 884 * @exception SQLException 885 * @param sql - Condition use format ':column_name' 886 * @param records 887 * @return int - Return rows 888 * */ 889 public int executeBatch(String sql, List<Map<String, Object>> records) throws SQLException { 890 String csql = converSql(sql); 891 List<List<Object>> crecords = new ArrayList<List<Object>>(); 892 for (Map<String, Object> record : records) { 893 List<Object> crecord = converParams(sql, record); 894 crecords.add(crecord); 895 } 896 return executeBatchList(csql, crecords); 897 } 898 899 /** 900 * Execute batch 901 * @exception SQLException 902 * @param sql - Condition use format '?' 903 * @param records 904 * @return int - Return rows 905 * */ 906 public int executeBatchList(String sql, List<List<Object>> records) throws SQLException { 907 boolean allowedLog = false; 908 if (!checkSqlAvailable(sql)) 909 return -1; 910 911 long begin = System.currentTimeMillis(); 912 if (records != null) { 913 allowedLog = writeSqlLog("batch", begin, sql, String.format("[batchSize=%s]", records.size())); 914 } 915 916 boolean first = true; 917 PreparedStatement statement = null; 918 try { 919 for (List<Object> params : records) { 920 if (first) { 921 statement = connection.prepareStatement(sql); 922 first = false; 923 } 924 925 int size = params.size(); 926 for (int i = 0; i < size; i++) { 927 Object param = params.get(i); 928 if (param == null) { 929 statement.setNull(i + 1, Types.VARCHAR); 930 } else { 931 statement.setObject(i + 1, param); 932 } 933 } 934 935 statement.addBatch(); 936 } 937 938 int sumRow = 0; 939 if (records.size() > 0) { 940 int[] rows = statement.executeBatch(); 941 942 for (int r : rows) { 943 sumRow += r; 944 } 945 946 if (allowedLog) { 947 long spend = System.currentTimeMillis() - begin; 948 writeSqlLog("return", begin, "return", sumRow); 949 writeSqlLog("spend", begin, "spend", spend); 950 } 951 952 executeBatchListSync(connection, begin, sql, records, sumRow); 953 } 954 955 return sumRow; 956 } catch (SQLException e) { 957 if (allowedLog) 958 writeSqlLog("error", begin, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage())); 959 960 throw e; 961 } finally { 962 if (statement != null) 963 statement.close(); 964 } 965 } 966 967 /** 968 * Check connection is closed 969 * @return boolean 970 * */ 971 public boolean isClosed() { 972 try { 973 if(closed) return closed; 974 975 return (connection == null || connection.isClosed()); 976 } catch (Exception e) { 977 log.warn(e); 978 return true; 979 } 980 } 981 982 /** 983 * Abort connection 984 * @exception SQLException 985 * */ 986 public void abort() throws SQLException { 987 writeSqlLog("aborting", 0, "aborting", ""); 988 if (connection != null) { 989 try { 990 if (connection instanceof DriverConnection) { 991 abortSyncConnection(connection); 992 connection.abort(null); 993 } else { 994 connection.abort(Executors.newFixedThreadPool(1)); 995 } 996 closed = true; 997 } catch (SQLException e) { 998 writeSqlLog("error", 0, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage())); 999 throw e; 1000 } 1001 } 1002 writeSqlLog("aborted", 0, "aborted", ""); 1003 } 1004 1005 /** 1006 * Close connection 1007 * @exception SQLException 1008 * */ 1009 public void close() throws SQLException { 1010 writeSqlLog("closing", 0, "closing", ""); 1011 if (connection != null) { 1012 try { 1013 closeSyncConnection(connection); 1014 connection.close(); 1015 closed = true; 1016 } catch (SQLException e) { 1017 writeSqlLog("error", 0, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage())); 1018 throw e; 1019 } 1020 } 1021 writeSqlLog("closed", 0, "closed", ""); 1022 } 1023 1024 /** 1025 * Commit connection 1026 * @exception SQLException 1027 * */ 1028 public void commit() throws SQLException { 1029 writeSqlLog("committing", 0, "committing", ""); 1030 if (connection != null) { 1031 try { 1032 commitSyncConnection(connection); 1033 connection.commit(); 1034 } catch (SQLException e) { 1035 writeSqlLog("error", 0, "", String.format("%s,%s", e.getErrorCode(), e.getMessage())); 1036 throw e; 1037 } 1038 } 1039 writeSqlLog("committed", 0, "committed", ""); 1040 } 1041 1042 /** 1043 * Rollback connection 1044 * @exception SQLException 1045 * */ 1046 public void rollback() throws SQLException { 1047 writeSqlLog("rollbacking", 0, "rollbacking", ""); 1048 if (connection != null) { 1049 try { 1050 rollbackSyncConnection(connection); 1051 connection.rollback(); 1052 } catch (SQLException e) { 1053 writeSqlLog("error", 0, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage())); 1054 throw e; 1055 } 1056 } 1057 writeSqlLog("rollbacked", 0, "rollbacked", ""); 1058 } 1059 1060 /** 1061 * Get origin connection hash code 1062 * @return Integer - hash code 1063 * */ 1064 private Integer getOriginConnectionHashCode() { 1065 if (connection instanceof DriverConnection) { 1066 return ((DriverConnection) connection).getOriginConnectionHashCode(); 1067 } else { 1068 return connection.hashCode(); 1069 } 1070 } 1071 1072 /** 1073 * Get data source thread name 1074 * @return String - DataSource thread name 1075 * */ 1076 private String getDataSourceName() { 1077 if (connection instanceof DriverConnection) { 1078 return ((DriverConnection) connection).getDriverDataSource().getName(); 1079 } else { 1080 return null; 1081 } 1082 } 1083 1084 /** 1085 * Conver sql from ':column_name' to '?' 1086 * @return String 1087 * */ 1088 protected String converSql(String sql) { 1089 return sql.replaceAll(":[a-zA-Z0-9_]+", "?"); 1090 } 1091 1092 /** 1093 * Conver param from map to list 1094 * @param sql 1095 * @param map 1096 * @return List<Object> 1097 * */ 1098 protected List<Object> converParams(String sql, Map<String, Object> map) { 1099 List<String> paramKeys = new ArrayList<String>(); 1100 Pattern pattern = Pattern.compile(":[a-zA-Z0-9_]+"); 1101 Matcher matcher = pattern.matcher(sql); 1102 while (matcher.find()) { 1103 String key = matcher.group().replaceFirst(":", ""); 1104 paramKeys.add(key); 1105 } 1106 1107 List<Object> params = new ArrayList<Object>(); 1108 for (String pk : paramKeys) { 1109 Object pv = map.get(pk); 1110 if (pv == null) { 1111 pv = map.get(pk.toLowerCase()); 1112 } 1113 if (pv == null) { 1114 pv = map.get(pk.toUpperCase()); 1115 } 1116 if (pv instanceof java.util.Date) { 1117 java.util.Date utilDate = (java.util.Date) pv; 1118 java.sql.Timestamp sqlDate = new java.sql.Timestamp(utilDate.getTime()); 1119 params.add(sqlDate); 1120 } else { 1121 params.add(pv); 1122 } 1123 } 1124 1125 return params; 1126 } 1127 1128 /** 1129 * Covner to upper or lower 1130 * @param s - Column name or table name 1131 * @return String 1132 * */ 1133 protected String converCase(String s) { 1134 if (COLUMN_NAME_CASE_MODE.equals(COLUMN_NAME_CASE_UPPER)) { 1135 return s.toUpperCase(); 1136 } else if (COLUMN_NAME_CASE_MODE.equals(COLUMN_NAME_CASE_LOWER)) { 1137 return s.toLowerCase(); 1138 } else { 1139 return s; 1140 } 1141 } 1142 1143 /** 1144 * Check available sql to write to log 1145 * @return boolean 1146 * @param connection - Connection 1147 * @param sql 1148 * */ 1149 protected static boolean checkSqlLogAvailable(Connection connection, String sql) { 1150 if (connection instanceof DriverConnection) { 1151 DriverConnection dc = (DriverConnection) connection; 1152 DriverDataSource dds = dc.getDriverDataSource(); 1153 ConfigProperties configProperties = dds.getConfigProperties(); 1154 List<String> sqlAllowed = configProperties.getArray("SqlLogAllowed"); 1155 List<String> sqlIgnored = configProperties.getArray("SqlLogIgnored"); 1156 return checkSqlAvailable(sql, sqlAllowed, sqlIgnored); 1157 } 1158 return false; 1159 } 1160 1161 /** 1162 * Check available sql to execute 1163 * @return boolean 1164 * @param sql 1165 * */ 1166 protected boolean checkSqlAvailable(String sql) { 1167 return checkSqlAvailable(connection, sql); 1168 } 1169 1170 /** 1171 * Check available sql to execute 1172 * @return boolean 1173 * @param connection 1174 * @param sql 1175 * */ 1176 protected static boolean checkSqlAvailable(Connection connection, String sql) { 1177 if (connection instanceof DriverConnection) { 1178 DriverConnection dc = (DriverConnection) connection; 1179 DriverDataSource dds = dc.getDriverDataSource(); 1180 ConfigProperties configProperties = dds.getConfigProperties(); 1181 List<String> sqlAllowed = configProperties.getArray("SqlExecuteAllowed"); 1182 List<String> sqlIgnored = configProperties.getArray("SqlExecuteIgnored"); 1183 return checkSqlAvailable(sql, sqlAllowed, sqlIgnored); 1184 } 1185 return true; 1186 } 1187 1188 /** 1189 * Check available sql to execute 1190 * @return boolean 1191 * @param sql 1192 * @param sqlAllowed - From DataSources.properties 1193 * @param sqlIgnored - From DataSources.properties 1194 * */ 1195 private static boolean checkSqlAvailable(String sql, List<String> sqlAllowed, List<String> sqlIgnored) { 1196 boolean matchedAllowed = true; 1197 boolean matchedIgnored = false; 1198 if (sqlAllowed != null) { 1199 for (String regex : sqlAllowed) { 1200 if (!CommonTools.isBlank(regex)) { 1201 matchedAllowed = sql.matches(regex); 1202 1203 if (matchedAllowed) 1204 break; 1205 } 1206 } 1207 } 1208 if (matchedAllowed && sqlIgnored != null) { 1209 for (String regex : sqlIgnored) { 1210 if (!CommonTools.isBlank(regex)) { 1211 matchedIgnored = sql.matches(regex); 1212 1213 if (matchedIgnored) 1214 break; 1215 } 1216 } 1217 } 1218 1219 boolean b = matchedAllowed && !matchedIgnored; 1220 if (!b) { 1221 LoggerFactory.getLogger(DriverExecutor.class).debug("Not available - '{}'", sql); 1222 } 1223 return b; 1224 } 1225 1226 /** 1227 * Write sql log 1228 * @return boolean 1229 * @param type 1230 * @param seq 1231 * @param sql 1232 * @param params 1233 * */ 1234 protected synchronized boolean writeSqlLog(String type, long seq, String sql, Object params) { 1235 return writeSqlLog(this.hashCode(),connection, type, seq, sql, params); 1236 } 1237 1238 /** 1239 * Write sql log 1240 * @return boolean 1241 * @param connection 1242 * @param type 1243 * @param seq 1244 * @param sql 1245 * @param params 1246 * */ 1247 protected synchronized static boolean writeSqlLog(int deHashCode,Connection connection, String type, long seq, String sql, 1248 Object params) { 1249 if (connection instanceof DriverConnection) { 1250 if (!checkSqlLogAvailable(connection, sql)) 1251 return false; 1252 1253 final String header = String.format( 1254 "LOG_DATE,LOG_HOUR,LOG_MI,LOG_SEC,LOG_MS,LOG_HOST,LOG_THREAD,LOG_DS,LOG_CONN_ID,LOG_EXEC_ID,LOG_TYPE,LOG_SEQ,LOG_SQL,LOG_PARAMS%s", 1255 System.lineSeparator()); 1256 final DateFormat df = new SimpleDateFormat("yyyyMMdd"); 1257 final DateFormat dtf = new SimpleDateFormat("yyyyMMdd,HH,mm,ss,SSS"); 1258 DriverConnection dc = (DriverConnection) connection; 1259 Integer connHashCode = dc.getOriginConnectionHashCode(); 1260 DriverDataSource dds = dc.getDriverDataSource(); 1261 ConfigProperties configProperties = dds.getConfigProperties(); 1262 1263 boolean logEnable = configProperties.getBoolean("SqlLogEnable", false); 1264 long overspend = configProperties.getMilliSeconds("SqlLogOverspend",0L); 1265 1266 if (!logEnable) 1267 return false; 1268 1269 final String defaultLogFolderPath = String.format("%s/SqlLog/", 1270 CommonTools.getJarPath(DriverExecutor.class)); 1271 final String logFolderPath = configProperties.getString("SqlLogFolder", defaultLogFolderPath); 1272 final long maxFileSize = configProperties.getFileSize("SqlLogMaxFileSize", 1024 * 1024 * 10L); 1273 final int archiveDays = configProperties.getInteger("SqlLogArchiveDays", 31); 1274 final int logParamMaxLength = configProperties.getInteger("SqlLogParamMaxLength", 20); 1275 1276 if (sqlLogCacheArray == null) { 1277 1278 sqlLogCacheArray = new CacheArray(); 1279 long sqlLogFilterTimer = configProperties.getLong("SqlLogTimer", 1000L); 1280 sqlLogCacheArray.filter(new CacheArrayFilter(0L,sqlLogFilterTimer) { 1281 @Override 1282 public void executeBatch(Integer index, List batch) { 1283 final StringBuffer sbf = new StringBuffer(); 1284 for(Object item : batch){ 1285 sbf.append(item); 1286 } 1287 try { 1288 String msg = sbf.toString(); 1289 String dateStr = df.format(new Timestamp(System.currentTimeMillis())); 1290 File sqlLogFile = new File(String.format("%s/%s.csv", logFolderPath, dateStr)); 1291 File sqlLogFolder = new File(sqlLogFile.getParent()); 1292 1293 if (sqlLogFolder.exists()) { 1294 if (!sqlLogFolder.canWrite()) 1295 throw new IOException(String.format("Can not write to log folder '%s'", 1296 sqlLogFolder.getAbsolutePath())); 1297 } else { 1298 sqlLogFolder.mkdirs(); 1299 } 1300 1301 if (sqlLogFile.exists()) { 1302 if (!sqlLogFile.canWrite()) 1303 throw new IOException(String.format("Can not write to log file '%s'", 1304 sqlLogFile.getAbsolutePath())); 1305 } 1306 1307 int suffixIndex = sqlLogFile.getName().lastIndexOf("."); 1308 String logFileNamePrefix = sqlLogFile.getName().substring(0, suffixIndex); 1309 long logSize = FileTools.size(sqlLogFile); 1310 if (!sqlLogFile.exists()) { 1311 FileTools.write(sqlLogFile, header, false); 1312 } 1313 if (logSize < maxFileSize) { 1314 FileTools.write(sqlLogFile, msg, true); 1315 } else { 1316 int logIndex = getLogFileIndex(configProperties, sqlLogFolder, logFileNamePrefix, 1317 "csv"); 1318 if (logIndex == 0) { 1319 FileTools.write(sqlLogFile, String.format("%s%s", header, msg), false); 1320 } else { 1321 backupLog(sqlLogFolder, sqlLogFile, logFileNamePrefix, logIndex, header, msg); 1322 } 1323 } 1324 archiveLog(archiveDays, sqlLogFolder); 1325 } catch (Exception e) { 1326 LoggerFactory.getLogger(DriverExecutor.class).warn(e.getMessage(), e); 1327 } 1328 } 1329 }); 1330 } 1331 1332 if (logEnable) { 1333 String hostname = CommonTools.getHostname(); 1334 String dateTimeStr = dtf.format(new Timestamp(System.currentTimeMillis())); 1335 boolean isOverspend = false; 1336 boolean isSpendSeq = sql.equals("spend"); 1337 if(isSpendSeq && params != null){ 1338 long spendValue = Long.parseLong(params + ""); 1339 isOverspend = (spendValue >= overspend); 1340 } 1341 1342 String threadId = Thread.currentThread().getName() + "-" + Thread.currentThread().getId(); 1343 String msg = String.format("%s,%s,%s,%s,%s,%s,%s,%s,\"%s\",\"%s\"%s", dateTimeStr, hostname, 1344 threadId, dds.getName(), connHashCode,deHashCode, type, seq, 1345 replaceToSigleLine(sql), handleParams(logParamMaxLength, params), System.lineSeparator()); 1346 1347 String key = String.format("%s_%s",deHashCode,connHashCode); 1348 if(seq <= 0 || overspend <= 0){ 1349 sqlLogCacheArray.add(msg); 1350 SQL_LOG_MSG_MAPPING.remove(key); 1351 SQL_LOG_OVERSPEND_MAPPING.remove(key); 1352 } 1353 if(seq > 0 && overspend > 0){ 1354 SQL_LOG_OVERSPEND_MAPPING.put(key,isOverspend); 1355 String existsMsg = SQL_LOG_MSG_MAPPING.get(key); 1356 existsMsg = existsMsg == null ? msg : (existsMsg + msg); 1357 SQL_LOG_MSG_MAPPING.put(key, existsMsg); 1358 1359 boolean seqOverspend = SQL_LOG_OVERSPEND_MAPPING.get(key); 1360 if(isSpendSeq && seqOverspend){ 1361 sqlLogCacheArray.add(SQL_LOG_MSG_MAPPING.get(key) + ""); 1362 SQL_LOG_MSG_MAPPING.remove(key); 1363 SQL_LOG_OVERSPEND_MAPPING.remove(key); 1364 } 1365 if(isSpendSeq && !seqOverspend){ 1366 SQL_LOG_MSG_MAPPING.remove(key); 1367 SQL_LOG_OVERSPEND_MAPPING.remove(key); 1368 } 1369 } 1370 return true; 1371 } 1372 1373 } 1374 return false; 1375 } 1376 1377 /** 1378 * Backup log file 1379 * */ 1380 private static void backupLog(File logFolder, File logFile, String logFileNamePrefix, int logIndex, String header, 1381 String msg) throws Exception { 1382 String backupLogFileName = String.format("%s/%s.%s.csv", logFolder.getAbsolutePath(), logFileNamePrefix, 1383 logIndex); 1384 Path source = Paths.get(logFile.getAbsolutePath()); 1385 Path target = Paths.get(backupLogFileName); 1386 Files.move(source, target, StandardCopyOption.REPLACE_EXISTING); 1387 FileTools.write(logFile, String.format("%s%s", header, msg), false); 1388 } 1389 1390 /** 1391 * Get log file index 1392 * For backup log file use 1393 * */ 1394 private static int getLogFileIndex(ConfigProperties configProperties, File folder, String prefix, String suffix) { 1395 Integer maxBackupIndex = configProperties.getInteger("SqlLogMaxBackupIndex", 10); 1396 for (int i = 1; i <= maxBackupIndex; i++) { 1397 String logFileName = String.format("%s/%s.%s.%s", folder.getAbsolutePath(), prefix, i, suffix); 1398 File logFile = new File(logFileName); 1399 if (!logFile.exists()) 1400 return i; 1401 } 1402 return 0; 1403 } 1404 1405 /** 1406 * Archive Log 1407 * */ 1408 private static void archiveLog(int archiveDays, File sqlLogFolder) { 1409 if (archiveDays > 0) { 1410 try { 1411 long archiveDaysMs = new Date().getTime() - (archiveDays * 24 * 3600000L); 1412 deleteFilesOlderThan(sqlLogFolder, archiveDaysMs); 1413 } catch (Exception e) { 1414 LoggerFactory.getLogger(DriverExecutor.class).warn(e); 1415 } 1416 } 1417 } 1418 1419 /** 1420 * Delete old archive logs 1421 * */ 1422 private static void deleteFilesOlderThan(File directory, long archiveDaysMs) throws IOException { 1423 if (directory.isDirectory()) { 1424 File[] files = directory.listFiles(); 1425 if (files != null) { 1426 for (File file : files) { 1427 if (file.isFile()) { 1428 boolean isLogFile = file.getName().toLowerCase().endsWith(".csv"); 1429 if (isLogFile) { 1430 boolean canWrite = file.canWrite(); 1431 if (canWrite) { 1432 long lastModified = file.lastModified(); 1433 if (lastModified < archiveDaysMs) { 1434 Files.deleteIfExists(Paths.get(file.toURI())); 1435 } 1436 } 1437 } 1438 } 1439 } 1440 } 1441 } 1442 } 1443 1444 /** 1445 * Replace to sigle line 1446 * For write csv log 1447 * */ 1448 private static String replaceToSigleLine(String msg) { 1449 return CodeEscape.escapeToSingleLineForCsv(msg); 1450 } 1451 1452 /** 1453 * Hahdle Params 1454 * For write csv log 1455 * */ 1456 private static String handleParams(int paramMaxLength, Object params) { 1457 1458 if (params == null) 1459 return "null"; 1460 1461 StringBuffer paramSbf = new StringBuffer(""); 1462 if (params instanceof List) { 1463 List<Object> listParams = (List<Object>) params; 1464 int size = listParams.size(); 1465 for (int i = 0; i < size; i++) { 1466 Object param = listParams.get(i); 1467 String str = param + ""; 1468 if (str.length() > paramMaxLength) { 1469 paramSbf.append(str.substring(0, paramMaxLength) + "..."); 1470 } else { 1471 paramSbf.append(str); 1472 } 1473 if (i < size - 1) { 1474 paramSbf.append(";"); 1475 } 1476 } 1477 } else { 1478 paramSbf.append(params); 1479 } 1480 return replaceToSigleLine(paramSbf.toString()); 1481 } 1482 1483 /**For Sync DataSource**/ 1484 1485 /** 1486 * For database replication 1487 * */ 1488 protected static void callAndReturnBooleanSync(Connection masterConn, long seq, String sql, List<Object> params, 1489 boolean returnResult) throws SQLException { 1490 if (masterConn instanceof DriverConnection) { 1491 openSyncConnection(masterConn); 1492 1493 DriverConnection masterDriverConn = (DriverConnection) masterConn; 1494 Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode(); 1495 ConfigProperties ddscp = masterDriverConn.getDriverDataSource().getConfigProperties(); 1496 boolean connCheck = true; 1497 boolean returnCheck = true; 1498 if (ddscp != null) { 1499 connCheck = ddscp.getBoolean("SyncConnectionCheck", true); 1500 returnCheck = ddscp.getBoolean("SyncReturnCheck", true); 1501 } 1502 List<DriverExecutor> deList = SYNC_EXECUTOR_MARK.get(masterConnHashCode); 1503 if (deList != null) { 1504 for (DriverExecutor de : deList) { 1505 try { 1506 boolean resultSync = de.callAndReturnBoolean(sql, params); 1507 if (returnCheck) { 1508 if (resultSync != returnResult) { 1509 writeSqlLog(de.hashCode(),masterDriverConn, "diffed", seq, "diffed", String.format("%s-%s", 1510 de.getDataSourceName(), de.getOriginConnectionHashCode())); 1511 int errorCode = 99906; 1512 String errorMsg = String.format("The returned results are inconsistent '%s-%s'.", 1513 de.getDataSourceName(), de.getOriginConnectionHashCode()); 1514 throw new SQLException(errorMsg) { 1515 @Override 1516 public int getErrorCode() { 1517 return errorCode; 1518 } 1519 }; 1520 } 1521 } 1522 } catch (SQLException e) { 1523 LoggerFactory.getLogger(DriverExecutor.class) 1524 .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage())); 1525 if (connCheck) 1526 throw e; 1527 } 1528 } 1529 } 1530 } 1531 } 1532 1533 /** 1534 * For database replication 1535 * */ 1536 protected static void callAndReturnListSync(Connection masterConn, int cursorStart, int maxRows, String sql, 1537 List<Object> params) throws SQLException { 1538 if (masterConn instanceof DriverConnection) { 1539 openSyncConnection(masterConn); 1540 1541 DriverConnection masterDriverConn = (DriverConnection) masterConn; 1542 Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode(); 1543 1544 List<DriverExecutor> deList = SYNC_EXECUTOR_MARK.get(masterConnHashCode); 1545 if (deList != null) { 1546 for (DriverExecutor de : deList) { 1547 try { 1548 de.callAndReturnBoolean(sql, params); 1549 } catch (SQLException e) { 1550 LoggerFactory.getLogger(DriverExecutor.class) 1551 .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage())); 1552 ConfigProperties ddscp = masterDriverConn.getDriverDataSource().getConfigProperties(); 1553 boolean connCheck = true; 1554 if (ddscp != null) { 1555 connCheck = ddscp.getBoolean("SyncConnectionCheck", true); 1556 } 1557 if (connCheck) 1558 throw e; 1559 } 1560 } 1561 } 1562 } 1563 } 1564 1565 /** 1566 * For database replication 1567 * */ 1568 protected static void callAndReturnRowsSync(Connection masterConn, long seq, String sql, List<Object> params, 1569 int returnRows) throws SQLException { 1570 if (masterConn instanceof DriverConnection) { 1571 openSyncConnection(masterConn); 1572 1573 DriverConnection masterDriverConn = (DriverConnection) masterConn; 1574 Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode(); 1575 ConfigProperties ddscp = masterDriverConn.getDriverDataSource().getConfigProperties(); 1576 boolean connCheck = true; 1577 boolean returnCheck = true; 1578 if (ddscp != null) { 1579 connCheck = ddscp.getBoolean("SyncConnectionCheck", true); 1580 returnCheck = ddscp.getBoolean("SyncReturnCheck", true); 1581 } 1582 List<DriverExecutor> deList = SYNC_EXECUTOR_MARK.get(masterConnHashCode); 1583 if (deList != null) { 1584 for (DriverExecutor de : deList) { 1585 try { 1586 int rowSync = de.callAndReturnRows(sql, params); 1587 if (returnCheck) { 1588 if (rowSync != returnRows) { 1589 writeSqlLog(de.hashCode(),masterDriverConn, "diffed", seq, "diffed", String.format("%s-%s", 1590 de.getDataSourceName(), de.getOriginConnectionHashCode())); 1591 int errorCode = 99906; 1592 String errorMsg = String.format("The returned results are inconsistent '%s-%s'.", 1593 de.getDataSourceName(), de.getOriginConnectionHashCode()); 1594 throw new SQLException(errorMsg) { 1595 @Override 1596 public int getErrorCode() { 1597 return errorCode; 1598 } 1599 }; 1600 } 1601 } 1602 } catch (SQLException e) { 1603 LoggerFactory.getLogger(DriverExecutor.class) 1604 .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage())); 1605 if (connCheck) 1606 throw e; 1607 } 1608 } 1609 } 1610 } 1611 } 1612 1613 /** 1614 * For database replication 1615 * */ 1616 protected static void executeBatchListSync(Connection masterConn, long seq, String sql, List<List<Object>> records, 1617 int returnRows) throws SQLException { 1618 if (masterConn instanceof DriverConnection) { 1619 openSyncConnection(masterConn); 1620 1621 DriverConnection masterDriverConn = (DriverConnection) masterConn; 1622 Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode(); 1623 ConfigProperties ddscp = masterDriverConn.getDriverDataSource().getConfigProperties(); 1624 boolean connCheck = true; 1625 boolean returnCheck = true; 1626 if (ddscp != null) { 1627 connCheck = ddscp.getBoolean("SyncConnectionCheck", true); 1628 returnCheck = ddscp.getBoolean("SyncReturnCheck", true); 1629 } 1630 List<DriverExecutor> deList = SYNC_EXECUTOR_MARK.get(masterConnHashCode); 1631 if (deList != null) { 1632 for (DriverExecutor de : deList) { 1633 try { 1634 int rowSync = de.executeBatchList(sql, records); 1635 if (returnCheck) { 1636 if (rowSync != returnRows) { 1637 writeSqlLog(de.hashCode(),masterDriverConn, "diffed", seq, "diffed", String.format("%s-%s", 1638 de.getDataSourceName(), de.getOriginConnectionHashCode())); 1639 int errorCode = 99906; 1640 String errorMsg = String.format("The returned results are inconsistent '%s-%s'.", 1641 de.getDataSourceName(), de.getOriginConnectionHashCode()); 1642 throw new SQLException(errorMsg) { 1643 @Override 1644 public int getErrorCode() { 1645 return errorCode; 1646 } 1647 }; 1648 } 1649 } 1650 } catch (SQLException e) { 1651 LoggerFactory.getLogger(DriverExecutor.class) 1652 .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage())); 1653 if (connCheck) 1654 throw e; 1655 } 1656 } 1657 } 1658 } 1659 } 1660 1661 /** 1662 * For database replication 1663 * */ 1664 protected static void executeSync(Connection masterConn, long seq, String sql, List<Object> params, int returnRows) 1665 throws SQLException { 1666 if (masterConn instanceof DriverConnection) { 1667 openSyncConnection(masterConn); 1668 1669 DriverConnection masterDriverConn = (DriverConnection) masterConn; 1670 Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode(); 1671 ConfigProperties ddscp = masterDriverConn.getDriverDataSource().getConfigProperties(); 1672 boolean connCheck = true; 1673 boolean returnCheck = true; 1674 if (ddscp != null) { 1675 connCheck = ddscp.getBoolean("SyncConnectionCheck", true); 1676 returnCheck = ddscp.getBoolean("SyncReturnCheck", true); 1677 } 1678 List<DriverExecutor> deList = SYNC_EXECUTOR_MARK.get(masterConnHashCode); 1679 if (deList != null) { 1680 for (DriverExecutor de : deList) { 1681 try { 1682 int rowSync = de.execute(sql, params); 1683 if (returnCheck) { 1684 if (rowSync != returnRows) { 1685 writeSqlLog(de.hashCode(),masterDriverConn, "diffed", seq, "diffed", String.format("%s-%s", 1686 de.getDataSourceName(), de.getOriginConnectionHashCode())); 1687 int errorCode = 99906; 1688 String errorMsg = String.format("The returned results are inconsistent '%s-%s'.", 1689 de.getDataSourceName(), de.getOriginConnectionHashCode()); 1690 throw new SQLException(errorMsg) { 1691 @Override 1692 public int getErrorCode() { 1693 return errorCode; 1694 } 1695 }; 1696 } 1697 } 1698 } catch (SQLException e) { 1699 LoggerFactory.getLogger(DriverExecutor.class) 1700 .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage())); 1701 if (connCheck) 1702 throw e; 1703 } 1704 } 1705 } 1706 } 1707 } 1708 1709 /** 1710 * For database replication 1711 * */ 1712 protected synchronized static boolean openSyncConnection(Connection masterConn) throws SQLException { 1713 if (masterConn instanceof DriverConnection) { 1714 1715 DriverConnection masterDriverConn = (DriverConnection) masterConn; 1716 Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode(); 1717 1718 boolean isExist = SYNC_EXECUTOR_MARK.containsKey(masterConnHashCode); 1719 1720 if (isExist) { 1721 return false; 1722 } else { 1723 SYNC_EXECUTOR_MARK.put(masterConnHashCode, new ArrayList<DriverExecutor>()); 1724 } 1725 1726 DriverDataSource dds = masterDriverConn.getDriverDataSource(); 1727 if (dds != null) { 1728 List<DriverDataSource> sdsl = dds.getSyncDataSourceList(dds.getName()); 1729 if (sdsl != null) { 1730 1731 if (sdsl.isEmpty()) 1732 return false; 1733 1734 for (DriverDataSource sds : sdsl) { 1735 1736 Integer sdsHashCode = sds.hashCode(); 1737 if (!SYNC_CONN_ERROR_TIME.containsKey(sdsHashCode)) { 1738 SYNC_CONN_ERROR_TIME.put(sdsHashCode, 0L); 1739 } 1740 1741 Long syncConnErrorTime = SYNC_CONN_ERROR_TIME.get(sdsHashCode); 1742 if (syncConnErrorTime > 0) { 1743 ConfigProperties ddscp = dds.getConfigProperties(); 1744 long connCheckMs = 10000L; 1745 if (ddscp != null) { 1746 connCheckMs = ddscp.getLong("SyncConnectionCheckTime", 10000L); 1747 } 1748 boolean isSkipConn = syncConnErrorTime > 0 1749 && (System.currentTimeMillis() - syncConnErrorTime) <= connCheckMs; 1750 if (isSkipConn) { 1751 continue; 1752 } 1753 } 1754 1755 String masterFingerprint = dds.getFingerprint(); 1756 String syncFingerprint = sds.getFingerprint(); 1757 if (masterFingerprint.equalsIgnoreCase(syncFingerprint)) { 1758 LoggerFactory.getLogger(DriverExecutor.class) 1759 .warn("Skip sync reason 'same connection fingerprint'."); 1760 continue; 1761 } 1762 1763 try { 1764 final Connection conn = sds.getConnection(); 1765 if (conn == null) { 1766 int errorCode = 99904; 1767 String errorMsg = "Connection is null."; 1768 throw new SQLException(errorMsg) { 1769 @Override 1770 public int getErrorCode() { 1771 return errorCode; 1772 } 1773 }; 1774 } 1775 conn.setAutoCommit(masterConn.getAutoCommit()); 1776 SYNC_CONN_ERROR_TIME.put(sdsHashCode, 0L); 1777 SYNC_EXECUTOR_MARK.get(masterConnHashCode).add(new DriverExecutor(conn)); 1778 } catch (SQLException e) { 1779 SYNC_CONN_ERROR_TIME.put(sdsHashCode, System.currentTimeMillis()); 1780 LoggerFactory.getLogger(DriverExecutor.class) 1781 .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage())); 1782 ConfigProperties ddscp = dds.getConfigProperties(); 1783 boolean connCheck = true; 1784 if (ddscp != null) { 1785 connCheck = ddscp.getBoolean("SyncConnectionCheck", true); 1786 } 1787 if (connCheck) 1788 throw e; 1789 } 1790 1791 } 1792 1793 return true; 1794 } 1795 } 1796 } 1797 return false; 1798 } 1799 1800 /** 1801 * For database replication 1802 * */ 1803 protected static void closeSyncConnection(Connection masterConn) throws SQLException { 1804 if (masterConn instanceof DriverConnection) { 1805 DriverConnection masterDriverConn = (DriverConnection) masterConn; 1806 Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode(); 1807 1808 List<DriverExecutor> deList = SYNC_EXECUTOR_MARK.get(masterConnHashCode); 1809 if (deList != null) { 1810 for (DriverExecutor de : deList) { 1811 try { 1812 de.close(); 1813 } catch (SQLException e) { 1814 LoggerFactory.getLogger(DriverExecutor.class) 1815 .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage())); 1816 ConfigProperties ddscp = masterDriverConn.getDriverDataSource().getConfigProperties(); 1817 boolean connCheck = true; 1818 if (ddscp != null) { 1819 connCheck = ddscp.getBoolean("SyncConnectionCheck", true); 1820 } 1821 if (connCheck) 1822 throw e; 1823 } 1824 } 1825 SYNC_EXECUTOR_MARK.remove(masterConnHashCode); 1826 LoggerFactory.getLogger(DriverExecutor.class).debug("CloseSyncConnection - masterConn '{}' finished.", 1827 masterConnHashCode); 1828 } 1829 } 1830 } 1831 1832 /** 1833 * For database replication 1834 * */ 1835 protected static void commitSyncConnection(Connection masterConn) throws SQLException { 1836 if (masterConn instanceof DriverConnection) { 1837 DriverConnection masterDriverConn = (DriverConnection) masterConn; 1838 Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode(); 1839 1840 List<DriverExecutor> deList = SYNC_EXECUTOR_MARK.get(masterConnHashCode); 1841 if (deList != null) { 1842 for (DriverExecutor de : deList) { 1843 try { 1844 de.commit(); 1845 } catch (SQLException e) { 1846 LoggerFactory.getLogger(DriverExecutor.class) 1847 .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage())); 1848 ConfigProperties ddscp = masterDriverConn.getDriverDataSource().getConfigProperties(); 1849 boolean connCheck = true; 1850 if (ddscp != null) { 1851 connCheck = ddscp.getBoolean("SyncConnectionCheck", true); 1852 } 1853 if (connCheck) 1854 throw e; 1855 } 1856 } 1857 LoggerFactory.getLogger(DriverExecutor.class).debug("CommitSyncConnection - masterConn '{}' finished.", 1858 masterConnHashCode); 1859 } 1860 } 1861 } 1862 1863 /** 1864 * For database replication 1865 * */ 1866 protected static void rollbackSyncConnection(Connection masterConn) throws SQLException { 1867 if (masterConn instanceof DriverConnection) { 1868 DriverConnection masterDriverConn = (DriverConnection) masterConn; 1869 Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode(); 1870 1871 List<DriverExecutor> deList = SYNC_EXECUTOR_MARK.get(masterConnHashCode); 1872 if (deList != null) { 1873 for (DriverExecutor de : deList) { 1874 try { 1875 de.rollback(); 1876 } catch (SQLException e) { 1877 LoggerFactory.getLogger(DriverExecutor.class) 1878 .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage())); 1879 ConfigProperties ddscp = masterDriverConn.getDriverDataSource().getConfigProperties(); 1880 boolean connCheck = true; 1881 if (ddscp != null) { 1882 connCheck = ddscp.getBoolean("SyncConnectionCheck", true); 1883 } 1884 if (connCheck) 1885 throw e; 1886 } 1887 } 1888 LoggerFactory.getLogger(DriverExecutor.class) 1889 .debug("RollbackSyncConnection - masterConn '{}' finished.", masterConnHashCode); 1890 } 1891 } 1892 } 1893 1894 /** 1895 * For database replication 1896 * */ 1897 protected static void abortSyncConnection(Connection masterConn) throws SQLException { 1898 if (masterConn instanceof DriverConnection) { 1899 DriverConnection masterDriverConn = (DriverConnection) masterConn; 1900 Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode(); 1901 1902 List<DriverExecutor> deList = SYNC_EXECUTOR_MARK.get(masterConnHashCode); 1903 if (deList != null) { 1904 for (DriverExecutor de : deList) { 1905 try { 1906 de.abort(); 1907 } catch (SQLException e) { 1908 LoggerFactory.getLogger(DriverExecutor.class) 1909 .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage())); 1910 ConfigProperties ddscp = masterDriverConn.getDriverDataSource().getConfigProperties(); 1911 boolean connCheck = true; 1912 if (ddscp != null) { 1913 connCheck = ddscp.getBoolean("SyncConnectionCheck", true); 1914 } 1915 if (connCheck) 1916 throw e; 1917 } 1918 } 1919 LoggerFactory.getLogger(DriverExecutor.class).debug("AbortSyncConnection - masterConn '{}' finished.", 1920 masterConnHashCode); 1921 } 1922 } 1923 } 1924 1925}