001/******************************************************************************* 002The MIT License (MIT) 003 004Copyright (c) 2024 KILLCODING.COM 005 006Permission is hereby granted, free of charge, to any person obtaining a copy 007of this software and associated documentation files (the "Software"), to deal 008in the Software without restriction, including without limitation the rights 009to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 010copies of the Software, and to permit persons to whom the Software is 011furnished to do so, subject to the following conditions: 012 013The above copyright notice and this permission notice shall be included in 014all copies or substantial portions of the Software. 015 016THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 017IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 018FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 019AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 020LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 021OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 022THE SOFTWARE. 023*****************************************************************************/ 024package com.killcoding.datasource; 025 026import java.sql.Connection; 027import java.sql.PreparedStatement; 028import java.util.List; 029import java.sql.SQLException; 030import java.sql.Types; 031import java.util.Map; 032import java.sql.ResultSet; 033import java.sql.ResultSetMetaData; 034import java.util.HashMap; 035import java.util.ArrayList; 036import java.util.regex.Pattern; 037import java.util.regex.Matcher; 038import java.util.Arrays; 039import com.killcoding.tool.ResultMap; 040import java.sql.DatabaseMetaData; 041import com.killcoding.log.LoggerFactory; 042import com.killcoding.log.Logger; 043import java.util.concurrent.Executors; 044import com.killcoding.datasource.DriverConnection; 045import com.killcoding.datasource.DriverDataSource; 046import java.util.concurrent.ConcurrentHashMap; 047import com.killcoding.tool.CommonTools; 048import com.killcoding.tool.ConfigProperties; 049import java.io.File; 050import java.text.DateFormat; 051import java.text.SimpleDateFormat; 052import com.killcoding.tool.FileTools; 053import java.io.IOException; 054import java.nio.file.Files; 055import java.util.Date; 056import java.nio.file.Paths; 057import java.nio.file.Path; 058import java.nio.file.StandardCopyOption; 059import com.killcoding.cache.CacheArray; 060import com.killcoding.cache.CacheArrayFilter; 061import com.killcoding.tool.CodeEscape; 062import java.sql.Blob; 063import java.util.stream.Collectors; 064import java.net.URI; 065import java.util.Comparator; 066import java.sql.Timestamp; 067import java.sql.Clob; 068import java.io.InputStream; 069import java.sql.CallableStatement; 070 071/** 072 * This class is execute sql base class. 073 * Support database replication. 074 * Support database multi-activity. 075 * Support database CRUD 076 * */ 077public class DriverExecutor { 078 079 protected final static Map<Integer, List<DriverExecutor>> SYNC_EXECUTOR_MARK = new ConcurrentHashMap<Integer, List<DriverExecutor>>(); 080 private final static Map<Integer, Long> SYNC_CONN_ERROR_TIME = new ConcurrentHashMap<Integer, Long>(); 081 082 private final static Map<String,String> SQL_LOG_MSG_MAPPING = new ConcurrentHashMap<String,String>(); 083 private final static Map<String,Boolean> SQL_LOG_OVERSPEND_MAPPING = new ConcurrentHashMap<String,Boolean>(); 084 085 protected Logger log = null; 086 087 public final static String COLUMN_NAME_CASE_UPPER = "UPPER"; 088 public final static String COLUMN_NAME_CASE_LOWER = "LOWER"; 089 public final static String COLUMN_NAME_CASE_ORIGINAL = "ORIGINAL"; 090 public static String COLUMN_NAME_CASE_MODE = COLUMN_NAME_CASE_ORIGINAL; 091 092 protected boolean closed = true; 093 protected Connection connection = null; 094 private static CacheArray sqlLogCacheArray = null; 095 096 /** 097 * New a DriverExecutor object 098 * @param connection - JDBC connection 099 * */ 100 public DriverExecutor(Connection connection) { 101 super(); 102 log = LoggerFactory.getLogger(this.getClass()); 103 this.connection = connection; 104 writeSqlLog("open", 0, "open", ""); 105 closed = (this.connection == null); 106 } 107 108 /** 109 * Get current Connection 110 * @return Connection 111 * */ 112 public Connection getConnection() { 113 return connection; 114 } 115 116 /** 117 * Get column classes by table name 118 * @exception SQLException 119 * @return Map<String,Object> - column and java type class mapping 120 * @param tableName - table name 121 * */ 122 public Map<String, Object> getColumnClasses(String tableName) throws SQLException { 123 Map<String, Object> types = null; 124 ResultSet result = null; 125 PreparedStatement statement = null; 126 String sql = String.format("SELECT * FROM %s WHERE 1 = 0", tableName); 127 statement = connection.prepareStatement(sql); 128 result = statement.executeQuery(); 129 final ResultSetMetaData rsmd = result.getMetaData(); 130 for (int i = 0; i < rsmd.getColumnCount(); i++) { 131 if (types == null) { 132 types = new ResultMap<String, Object>(); 133 } 134 String cn = rsmd.getColumnClassName(i + 1); 135 types.put(converCase(rsmd.getColumnLabel(i + 1)), cn); 136 } 137 return types; 138 } 139 140 /** 141 * Get column db data types by table name 142 * @exception SQLException 143 * @return Map<String, Object> - column and db data type mapping 144 * @param tableName - Table name 145 * */ 146 public Map<String, Object> getColumnTypes(String tableName) throws SQLException { 147 ResultSet result = null; 148 PreparedStatement statement = null; 149 String sql = String.format("SELECT * FROM %s WHERE 1 = 0", tableName); 150 Map<String, Object> types = null; 151 statement = connection.prepareStatement(sql); 152 result = statement.executeQuery(); 153 final ResultSetMetaData rsmd = result.getMetaData(); 154 for (int i = 0; i < rsmd.getColumnCount(); i++) { 155 if (types == null) { 156 types = new ResultMap<String, Object>(); 157 } 158 types.put(converCase(rsmd.getColumnLabel(i + 1)), rsmd.getColumnTypeName(i + 1)); 159 } 160 return types; 161 } 162 163 /** 164 * Show column data type and java type mapping 165 * @exception SQLException 166 * @return List<Map<String, Object>> - Mapping list 167 * @param tableName - Table name 168 * */ 169 public List<Map<String, Object>> desc(String tableName) throws SQLException { 170 List<String> primaryKeys = getPrimaryKeys(tableName); 171 List<Map<String, Object>> results = new ArrayList<Map<String, Object>>(); 172 ResultSet result = null; 173 PreparedStatement statement = null; 174 String sql = String.format("SELECT * FROM %s WHERE 1 = 0", tableName); 175 statement = connection.prepareStatement(sql); 176 result = statement.executeQuery(); 177 final ResultSetMetaData rsmd = result.getMetaData(); 178 for (int i = 0; i < rsmd.getColumnCount(); i++) { 179 Map<String, Object> types = new ResultMap<String, Object>(); 180 int ci = i + 1; 181 boolean nullable = rsmd.isNullable(ci) == 1; 182 String name = converCase(rsmd.getColumnLabel(ci)); 183 String isPk = "UNKNOWN"; 184 if (primaryKeys != null) { 185 isPk = primaryKeys.contains(name) ? "Y" : "N"; 186 } 187 types.put("NAME", name); 188 types.put("PRIMARY_KEY", isPk); 189 types.put("DATA_TYPE", rsmd.getColumnTypeName(ci)); 190 types.put("JAVA_TYPE", rsmd.getColumnClassName(ci)); 191 types.put("PRECISION", rsmd.getPrecision(ci)); 192 types.put("ALLOW_NULLABLE", nullable ? "Y" : 'N'); 193 results.add(types); 194 } 195 return results; 196 } 197 198 /** 199 * Get primary Keys by table name 200 * @return List<String> - Primary Keys 201 * @param _tableName - Table name 202 * */ 203 public List<String> getPrimaryKeys(String _tableName) { 204 try { 205 List<String> pks = new ArrayList<String>(); 206 DatabaseMetaData meta = connection.getMetaData(); 207 ResultSet tables = meta.getTables(null, null, "%", new String[] { "TABLE" }); 208 while (tables.next()) { 209 String catalog = tables.getString("TABLE_CAT"); 210 String schema = tables.getString("TABLE_SCHEM"); 211 String tableName = tables.getString("TABLE_NAME"); 212 if (tableName.equalsIgnoreCase(_tableName)) { 213 ResultSet primaryKeys = meta.getPrimaryKeys(catalog, schema, tableName); 214 while (primaryKeys.next()) { 215 pks.add(primaryKeys.getString("COLUMN_NAME")); 216 } 217 break; 218 } 219 } 220 return pks; 221 } catch (Exception e) { 222 log.warn(e); 223 return null; 224 } 225 } 226 227 /** 228 * Get all tables 229 * @return List<Map<String, Object>> 230 * @exception SQLException 231 * */ 232 public List<Map<String, Object>> getAllTables() throws SQLException { 233 List<Map<String, Object>> tablesList = new ArrayList<Map<String, Object>>(); 234 DatabaseMetaData meta = connection.getMetaData(); 235 ResultSet tables = meta.getTables(null, null, "%", new String[] { "TABLE" }); 236 while (tables.next()) { 237 String catalog = tables.getString("TABLE_CAT"); 238 String schema = tables.getString("TABLE_SCHEM"); 239 String tableName = tables.getString("TABLE_NAME"); 240 Map<String, Object> t = new ResultMap<String, Object>(); 241 t.put("TABLE_SCHEMA", schema); 242 t.put("TABLE_NAME", tableName); 243 tablesList.add(t); 244 } 245 return tablesList; 246 } 247 248 /** 249 * Get all tables by schema 250 * @return List<Map<String, Object>> 251 * @exception SQLException 252 * */ 253 public List<Map<String, Object>> getAllTables(String _schema) throws SQLException { 254 List<Map<String, Object>> tablesList = new ArrayList<Map<String, Object>>(); 255 DatabaseMetaData meta = connection.getMetaData(); 256 ResultSet tables = meta.getTables(null, null, "%", new String[] { "TABLE" }); 257 while (tables.next()) { 258 String schema = tables.getString("TABLE_SCHEM"); 259 if (schema != null && schema.equalsIgnoreCase(_schema)) { 260 String catalog = tables.getString("TABLE_CAT"); 261 String tableName = tables.getString("TABLE_NAME"); 262 Map<String, Object> t = new ResultMap<String, Object>(); 263 t.put("TABLE_SCHEMA", schema); 264 t.put("TABLE_NAME", tableName); 265 tablesList.add(t); 266 } 267 } 268 return tablesList; 269 } 270 271 /** 272 * Query first record 273 * @exception SQLException 274 * @return Map<String,Object> - First result 275 * @param sql - Condition use format ':column_name' 276 * @param params 277 * */ 278 public Map<String, Object> first(String sql, Map<String, Object> params) throws SQLException { 279 List<Map<String, Object>> list = find(0, 1, sql, params); 280 if (list.size() > 0) { 281 return list.get(0); 282 } 283 return null; 284 } 285 286 /** 287 * Query first record 288 * @exception SQLException 289 * @return Map<String,Object> - First result 290 * @param sql 291 * */ 292 public Map<String, Object> first(String sql) throws SQLException { 293 return first(sql, Arrays.asList(new Object[] {})); 294 } 295 296 /** 297 * Query first record 298 * @exception SQLException 299 * @return Map<String,Object> - First result 300 * @param sql - Condition use format '?' 301 * @param params 302 * */ 303 public Map<String, Object> first(String sql, List<Object> params) throws SQLException { 304 List<Map<String, Object>> list = find(0, 1, sql, params); 305 if (list.size() > 0) { 306 return list.get(0); 307 } 308 return null; 309 } 310 311 /** 312 * Query all matched records 313 * @exception SQLException 314 * @return List<Map<String, Object>> 315 * @param sql 316 * @param params 317 * */ 318 public List<Map<String, Object>> find(String sql) throws SQLException { 319 return find(0, 0, sql, Arrays.asList(new Object[] {})); 320 } 321 322 /** 323 * Query all matched records 324 * @exception SQLException 325 * @return List<Map<String, Object>> 326 * @param cursorStart - JDBC result Cursor start index 327 * @param maxRows - JDBC result max rows (JDBC limited rows 50,000,000) 328 * @param sql 329 * @param params 330 * */ 331 public List<Map<String, Object>> find(int cursorStart, int maxRows, String sql) throws SQLException { 332 return find(cursorStart, maxRows, sql, Arrays.asList(new Object[] {})); 333 } 334 335 /** 336 * Query all matched records 337 * @exception SQLException 338 * @return List<Map<String, Object>> 339 * @param sql - Condition use format ':column_name' 340 * @param params 341 * */ 342 public List<Map<String, Object>> find(String sql, Map<String, Object> params) throws SQLException { 343 return find(0, 0, sql, params); 344 } 345 346 /** 347 * Query all matched records 348 * @exception SQLException 349 * @return List<Map<String, Object>> 350 * @param cursorStart - JDBC result Cursor start index 351 * @param maxRows - JDBC result max rows (JDBC limited rows 50,000,000) 352 * @param sql - Condition use format ':column_name' 353 * @param params 354 * */ 355 public List<Map<String, Object>> find(int cursorStart, int maxRows, String sql, Map<String, Object> params) 356 throws SQLException { 357 String csql = converSql(sql); 358 List<Object> cparams = converParams(sql, params); 359 return find(cursorStart, maxRows, csql, cparams); 360 } 361 362 /** 363 * Query all matched records 364 * @exception SQLException 365 * @return List<Map<String, Object>> 366 * @param cursorStart - JDBC result Cursor start index 367 * @param maxRows - JDBC result max rows (JDBC limited rows 50,000,000) 368 * @param sql - Condition use format '?' 369 * @param params 370 * */ 371 public List<Map<String, Object>> find(int cursorStart, int maxRows, String sql, List<Object> params) 372 throws SQLException { 373 long begin = System.currentTimeMillis(); 374 boolean allowedLog = writeSqlLog("find", begin, 375 String.format("%s [cursorStart=%s,maxRows=%s]", sql, cursorStart, maxRows), params); 376 377 PreparedStatement statement = null; 378 Map<String, Object> row = null; 379 ResultSet result = null; 380 final List<Map<String, Object>> rows = new ArrayList<Map<String, Object>>(); 381 try { 382 // ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_UPDATABLE 383 // ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY 384 statement = connection.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY); 385 if (params != null) { 386 int size = params.size(); 387 for (int i = 0; i < size; i++) { 388 int ci = i + 1; 389 Object param = params.get(i); 390 if (param == null) { 391 statement.setNull(ci, Types.VARCHAR); 392 } else { 393 statement.setObject(ci, param); 394 } 395 } 396 } 397 if (maxRows > 0) { 398 statement.setMaxRows(maxRows); 399 } 400 result = statement.executeQuery(); 401 result.absolute(cursorStart); 402 final ResultSetMetaData rsmd = result.getMetaData(); 403 final int c = rsmd.getColumnCount(); 404 while (result.next()) { 405 row = new ResultMap<String, Object>(); 406 for (int i = 0; i < c; i++) { 407 int ci = i + 1; 408 Object value = null; 409 Object originValue = result.getObject(ci); 410 if (originValue == null) { 411 value = originValue; 412 } else if (originValue instanceof Blob) { 413 Blob blobValue = (Blob) originValue; 414 InputStream is = null; 415 try { 416 is = blobValue.getBinaryStream(); 417 if(is != null) value = is.readAllBytes(); 418 } catch (IOException e) { 419 throw new SQLException(e.getMessage(), e); 420 } finally { 421 if (blobValue != null) { 422 try { 423 blobValue.free(); 424 } catch (SQLException e) { 425 throw e; 426 } 427 } 428 if (is != null) { 429 try { 430 is.close(); 431 } catch (IOException e) { 432 throw new SQLException(e.getMessage(), e); 433 } 434 } 435 } 436 } else if (originValue instanceof Clob) { 437 Clob clobValue = (Clob) originValue; 438 InputStream is = null; 439 try { 440 is = clobValue.getAsciiStream(); 441 if(is != null) value = is.readAllBytes(); 442 } catch (IOException e) { 443 throw new SQLException(e.getMessage(), e); 444 } finally { 445 if (clobValue != null) { 446 try { 447 clobValue.free(); 448 } catch (SQLException e) { 449 throw e; 450 } 451 } 452 if (is != null) { 453 try { 454 is.close(); 455 } catch (IOException e) { 456 throw new SQLException(e.getMessage(), e); 457 } 458 } 459 } 460 } else { 461 value = originValue; 462 } 463 row.put(converCase(rsmd.getColumnLabel(ci)), value); 464 } 465 rows.add(row); 466 } 467 if (allowedLog) { 468 writeSqlLog("rows", begin, "rows", rows.size()); 469 long spend = System.currentTimeMillis() - begin; 470 writeSqlLog("spend", begin, "spend", spend); 471 } 472 return rows; 473 } catch (SQLException e) { 474 if (allowedLog) 475 writeSqlLog("error", begin, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage())); 476 477 throw e; 478 } finally { 479 if (result != null) 480 result.close(); 481 482 if (statement != null) 483 statement.close(); 484 } 485 } 486 487 /** 488 * Execute stored proc (and return boolean) 489 * @param sql - Query sql 490 * @exception SQLException 491 * @return boolean 492 * */ 493 public boolean callAndReturnBoolean(String sql) throws SQLException { 494 return callAndReturnBoolean(sql, Arrays.asList(new Object[] {})); 495 } 496 497 /** 498 * Execute stored proc (and return boolean) 499 * @param sql - Query sql, condition use format ':column_name' 500 * @param params 501 * @exception SQLException 502 * @return boolean 503 * */ 504 public boolean callAndReturnBoolean(String sql, Map<String, Object> params) throws SQLException { 505 String csql = converSql(sql); 506 List<Object> cparams = converParams(sql, params); 507 return callAndReturnBoolean(sql, params); 508 } 509 510 /** 511 * Execute stored proc (and return boolean) 512 * @param sql - Query sql,condition use format '?' 513 * @param params 514 * @exception SQLException 515 * @return boolean 516 * */ 517 public boolean callAndReturnBoolean(String sql, List<Object> params) throws SQLException { 518 519 if (!checkSqlAvailable(sql)) 520 return false; 521 522 long begin = System.currentTimeMillis(); 523 boolean allowedLog = writeSqlLog("call", begin, sql, params); 524 525 CallableStatement statement = null; 526 try { 527 statement = connection.prepareCall("{" + sql + "}"); 528 if (params != null) { 529 int size = params.size(); 530 for (int i = 0; i < size; i++) { 531 Object param = params.get(i); 532 if (param == null) { 533 statement.setNull(i + 1, Types.VARCHAR); 534 } else { 535 statement.setObject(i + 1, param); 536 } 537 } 538 } 539 540 statement.execute(); 541 542 boolean returnResult = true; //true is not exception 543 544 if (allowedLog) { 545 writeSqlLog("return", begin, "return", returnResult); 546 long spend = System.currentTimeMillis() - begin; 547 writeSqlLog("spend", begin, "spend", spend); 548 } 549 550 callAndReturnBooleanSync(connection, begin, sql, params, returnResult); 551 552 return returnResult; 553 } catch (SQLException e) { 554 if (allowedLog) 555 writeSqlLog("error", begin, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage())); 556 557 throw e; 558 } finally { 559 if (statement != null) 560 statement.close(); 561 } 562 } 563 564 /** 565 * Execute stored proc (and return rows) 566 * @param sql - Query sql 567 * @exception SQLException 568 * @return int 569 * */ 570 public int callAndReturnRows(String sql) throws SQLException { 571 return callAndReturnRows(sql, Arrays.asList(new Object[] {})); 572 } 573 574 /** 575 * Execute stored proc (and return rows) 576 * @param sql - Query sql,condition use format ':column_name' 577 * @param params 578 * @exception SQLException 579 * @return int 580 * */ 581 public int callAndReturnRows(String sql, Map<String, Object> params) throws SQLException { 582 String csql = converSql(sql); 583 List<Object> cparams = converParams(sql, params); 584 return callAndReturnRows(sql, params); 585 } 586 587 public int callAndReturnRows(String sql, List<Object> params) throws SQLException { 588 return (int)callAndReturnRows(Types.INTEGER,sql,params); 589 } 590 591 /** 592 * Execute stored proc (and return rows) 593 * @param sqltypes - e.g Types.INTEGER 594 * @param sql - Query sql,condition use format '?' 595 * @param params 596 * @exception SQLException 597 * @return Object 598 * */ 599 public Object callAndReturnRows(int sqlTypes,String sql, List<Object> params) throws SQLException { 600 601 if (!checkSqlAvailable(sql)) 602 return -1; 603 604 long begin = System.currentTimeMillis(); 605 boolean allowedLog = writeSqlLog("call", begin, sql, params); 606 607 CallableStatement statement = null; 608 try { 609 statement = connection.prepareCall("{?=" + sql + "}"); 610 statement.registerOutParameter(1,sqlTypes); 611 if (params != null) { 612 int size = params.size(); 613 for (int i = 0; i < size; i++) { 614 Object param = params.get(i); 615 if (param == null) { 616 statement.setNull(i + 1, Types.VARCHAR); 617 } else { 618 statement.setObject(i + 1, param); 619 } 620 } 621 } 622 boolean hasResult = statement.execute(); 623 624 Object row = statement.getObject(1); 625 626 if (allowedLog) { 627 writeSqlLog("return", begin, "return", row); 628 long spend = System.currentTimeMillis() - begin; 629 writeSqlLog("spend", begin, "spend", spend); 630 } 631 632 callAndReturnRowsSync(connection, begin, sql, params, row); 633 634 return row; 635 } catch (SQLException e) { 636 if (allowedLog) 637 writeSqlLog("error", begin, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage())); 638 639 throw e; 640 } finally { 641 if (statement != null) 642 statement.close(); 643 } 644 } 645 646 /** 647 * Execute stored proc (and return List) 648 * @param sql - Query sql 649 * @exception SQLException 650 * @return List<Map<String, Object>> 651 * */ 652 public List<Map<String, Object>> callAndReturnList(String sql) throws SQLException { 653 return callAndReturnList(0, 0, sql, Arrays.asList(new Object[] {})); 654 } 655 656 /** 657 * Execute stored proc (and return List) 658 * @param cursorStart - JDBC result Cursor start index 659 * @param maxRows - JDBC result max rows (JDBC limited rows 50,000,000) 660 * @param sql - Query sql 661 * @exception SQLException 662 * @return List<Map<String, Object>> 663 * */ 664 public List<Map<String, Object>> callAndReturnList(int cursorStart, int maxRows, String sql) throws SQLException { 665 return callAndReturnList(cursorStart, maxRows, sql, Arrays.asList(new Object[] {})); 666 } 667 668 /** 669 * Execute stored proc (and return List) 670 * @param cursorStart - JDBC result Cursor start index 671 * @param maxRows - JDBC result max rows (JDBC limited rows 50,000,000) 672 * @param sql - Query sql, condition use format ':column_name' 673 * @exception SQLException 674 * @return List<Map<String, Object>> 675 * */ 676 public List<Map<String, Object>> callAndReturnList(int cursorStart, int maxRows, String sql, 677 Map<String, Object> params) throws SQLException { 678 String csql = converSql(sql); 679 List<Object> cparams = converParams(sql, params); 680 return callAndReturnList(cursorStart, maxRows, sql, params); 681 } 682 683 /** 684 * Execute stored proc (and return List) 685 * @param cursorStart - JDBC result Cursor start index 686 * @param maxRows - JDBC result max rows (JDBC limited rows 50,000,000) 687 * @param sql - Query sql, condition use format '?' 688 * @exception SQLException 689 * @return List<Map<String, Object>> 690 * */ 691 public List<Map<String, Object>> callAndReturnList(int cursorStart, int maxRows, String sql, List<Object> params) 692 throws SQLException { 693 694 if (!checkSqlAvailable(sql)) 695 return null; 696 697 long begin = System.currentTimeMillis(); 698 boolean allowedLog = writeSqlLog("call", begin, 699 String.format("%s [cursorStart=%s,maxRows=%s]", sql, cursorStart, maxRows), params); 700 701 CallableStatement statement = null; 702 Map<String, Object> row = null; 703 ResultSet result = null; 704 final List<Map<String, Object>> rows = new ArrayList<Map<String, Object>>(); 705 try { 706 statement = connection.prepareCall("{" + sql + "}"); 707 if (params != null) { 708 int size = params.size(); 709 for (int i = 0; i < size; i++) { 710 int ci = i + 1; 711 Object param = params.get(i); 712 if (param == null) { 713 statement.setNull(ci, Types.VARCHAR); 714 } else { 715 statement.setObject(ci, param); 716 } 717 } 718 } 719 if (maxRows > 0) { 720 statement.setMaxRows(maxRows); 721 } 722 result = statement.executeQuery(); 723 final ResultSetMetaData rsmd = result.getMetaData(); 724 final int c = rsmd.getColumnCount(); 725 int rowIndex = 0; 726 while (result.next()) { 727 if (rowIndex >= cursorStart) { 728 row = new ResultMap<String, Object>(); 729 for (int i = 0; i < c; i++) { 730 int ci = i + 1; 731 Object value = null; 732 Object originValue = result.getObject(ci); 733 if (originValue == null) { 734 value = originValue; 735 } else if (originValue instanceof Blob) { 736 Blob blobValue = (Blob) originValue; 737 InputStream is = null; 738 try { 739 is = blobValue.getBinaryStream(); 740 if(is != null) value = is.readAllBytes(); 741 } catch (IOException e) { 742 throw new SQLException(e.getMessage(), e); 743 } finally { 744 if (blobValue != null) { 745 try { 746 blobValue.free(); 747 } catch (SQLException e) { 748 throw e; 749 } 750 } 751 if (is != null) { 752 try { 753 is.close(); 754 } catch (IOException e) { 755 throw new SQLException(e.getMessage(), e); 756 } 757 } 758 } 759 } else if (originValue instanceof Clob) { 760 Clob clobValue = (Clob) originValue; 761 InputStream is = null; 762 try { 763 is = clobValue.getAsciiStream(); 764 if(is != null) value = is.readAllBytes(); 765 } catch (IOException e) { 766 throw new SQLException(e.getMessage(), e); 767 } finally { 768 if (clobValue != null) { 769 try { 770 clobValue.free(); 771 } catch (SQLException e) { 772 throw e; 773 } 774 } 775 if (is != null) { 776 try { 777 is.close(); 778 } catch (IOException e) { 779 throw new SQLException(e.getMessage(), e); 780 } 781 } 782 } 783 } else { 784 value = originValue; 785 } 786 row.put(converCase(rsmd.getColumnLabel(ci)), value); 787 } 788 rows.add(row); 789 } 790 rowIndex++; 791 } 792 793 if (allowedLog) { 794 writeSqlLog("rows", begin, "rows", rows); 795 long spend = System.currentTimeMillis() - begin; 796 writeSqlLog("spend", begin, "spend", spend); 797 } 798 799 callAndReturnListSync(connection, cursorStart, maxRows, sql, params); 800 801 return rows; 802 } catch (SQLException e) { 803 if (allowedLog) 804 writeSqlLog("error", begin, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage())); 805 throw e; 806 } finally { 807 if (result != null) 808 result.close(); 809 810 if (statement != null) 811 statement.close(); 812 } 813 } 814 815 /** 816 * Execute sql 817 * @exception SQLException 818 * @param sql 819 * @return int 820 * */ 821 public int execute(String sql) throws SQLException { 822 return execute(sql, Arrays.asList(new Object[] {})); 823 } 824 825 /** 826 * Execute sql 827 * @exception SQLException 828 * @param sql - Condition use format ':column_name' 829 * @param params 830 * @return int 831 * */ 832 public int execute(String sql, Map<String, Object> params) throws SQLException { 833 String csql = converSql(sql); 834 List<Object> cparams = converParams(sql, params); 835 return execute(csql, cparams); 836 } 837 838 /** 839 * Execute sql 840 * @exception SQLException 841 * @param sql - Condition use format '?' 842 * @param params 843 * @return int 844 * */ 845 public int execute(String sql, List<Object> params) throws SQLException { 846 if (!checkSqlAvailable(sql)) 847 return -1; 848 849 long begin = System.currentTimeMillis(); 850 boolean allowedLog = writeSqlLog("execute", begin, sql, params); 851 852 PreparedStatement statement = null; 853 try { 854 statement = connection.prepareStatement(sql); 855 if (params != null) { 856 int size = params.size(); 857 for (int i = 0; i < size; i++) { 858 Object param = params.get(i); 859 if (param == null) { 860 statement.setNull(i + 1, Types.VARCHAR); 861 } else { 862 statement.setObject(i + 1, param); 863 } 864 } 865 } 866 int row = statement.executeUpdate(); 867 868 if (allowedLog) { 869 writeSqlLog("return", begin, "return", row); 870 long spend = System.currentTimeMillis() - begin; 871 writeSqlLog("spend", begin, "spend", spend); 872 } 873 874 executeSync(connection, begin, sql, params, row); 875 876 return row; 877 } catch (SQLException e) { 878 if (allowedLog) 879 writeSqlLog("error", begin, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage())); 880 throw e; 881 } finally { 882 if (statement != null) 883 statement.close(); 884 } 885 } 886 887 /** 888 * Execute batch 889 * @exception SQLException 890 * @param sql - Condition use format ':column_name' 891 * @param records 892 * @return int - Return rows 893 * */ 894 public int executeBatch(String sql, List<Map<String, Object>> records) throws SQLException { 895 String csql = converSql(sql); 896 List<List<Object>> crecords = new ArrayList<List<Object>>(); 897 for (Map<String, Object> record : records) { 898 List<Object> crecord = converParams(sql, record); 899 crecords.add(crecord); 900 } 901 return executeBatchList(csql, crecords); 902 } 903 904 /** 905 * Execute batch 906 * @exception SQLException 907 * @param sql - Condition use format '?' 908 * @param records 909 * @return int - Return rows 910 * */ 911 public int executeBatchList(String sql, List<List<Object>> records) throws SQLException { 912 boolean allowedLog = false; 913 if (!checkSqlAvailable(sql)) 914 return -1; 915 916 long begin = System.currentTimeMillis(); 917 if (records != null) { 918 allowedLog = writeSqlLog("batch", begin, sql, String.format("[batchSize=%s]", records.size())); 919 } 920 921 boolean first = true; 922 PreparedStatement statement = null; 923 try { 924 for (List<Object> params : records) { 925 if (first) { 926 statement = connection.prepareStatement(sql); 927 first = false; 928 } 929 930 int size = params.size(); 931 for (int i = 0; i < size; i++) { 932 Object param = params.get(i); 933 if (param == null) { 934 statement.setNull(i + 1, Types.VARCHAR); 935 } else { 936 statement.setObject(i + 1, param); 937 } 938 } 939 940 statement.addBatch(); 941 } 942 943 int sumRow = 0; 944 if (records.size() > 0) { 945 int[] rows = statement.executeBatch(); 946 947 for (int r : rows) { 948 sumRow += r; 949 } 950 951 if (allowedLog) { 952 long spend = System.currentTimeMillis() - begin; 953 writeSqlLog("return", begin, "return", sumRow); 954 writeSqlLog("spend", begin, "spend", spend); 955 } 956 957 executeBatchListSync(connection, begin, sql, records, sumRow); 958 } 959 960 return sumRow; 961 } catch (SQLException e) { 962 if (allowedLog) 963 writeSqlLog("error", begin, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage())); 964 965 throw e; 966 } finally { 967 if (statement != null) 968 statement.close(); 969 } 970 } 971 972 /** 973 * Check connection is closed 974 * @return boolean 975 * */ 976 public boolean isClosed() { 977 try { 978 if(closed) return closed; 979 980 return (connection == null || connection.isClosed()); 981 } catch (Exception e) { 982 log.warn(e); 983 return true; 984 } 985 } 986 987 /** 988 * Abort connection 989 * @exception SQLException 990 * */ 991 public void abort() throws SQLException { 992 writeSqlLog("aborting", 0, "aborting", ""); 993 if (connection != null) { 994 try { 995 if (connection instanceof DriverConnection) { 996 abortSyncConnection(connection); 997 connection.abort(null); 998 } else { 999 connection.abort(Executors.newFixedThreadPool(1)); 1000 } 1001 closed = true; 1002 } catch (SQLException e) { 1003 writeSqlLog("error", 0, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage())); 1004 throw e; 1005 } 1006 } 1007 writeSqlLog("aborted", 0, "aborted", ""); 1008 } 1009 1010 /** 1011 * Close connection 1012 * @exception SQLException 1013 * */ 1014 public void close() throws SQLException { 1015 writeSqlLog("closing", 0, "closing", ""); 1016 if (connection != null) { 1017 try { 1018 closeSyncConnection(connection); 1019 connection.close(); 1020 closed = true; 1021 } catch (SQLException e) { 1022 writeSqlLog("error", 0, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage())); 1023 throw e; 1024 } 1025 } 1026 writeSqlLog("closed", 0, "closed", ""); 1027 } 1028 1029 /** 1030 * Commit connection 1031 * @exception SQLException 1032 * */ 1033 public void commit() throws SQLException { 1034 writeSqlLog("committing", 0, "committing", ""); 1035 if (connection != null) { 1036 try { 1037 commitSyncConnection(connection); 1038 connection.commit(); 1039 } catch (SQLException e) { 1040 writeSqlLog("error", 0, "", String.format("%s,%s", e.getErrorCode(), e.getMessage())); 1041 throw e; 1042 } 1043 } 1044 writeSqlLog("committed", 0, "committed", ""); 1045 } 1046 1047 /** 1048 * Rollback connection 1049 * @exception SQLException 1050 * */ 1051 public void rollback() throws SQLException { 1052 writeSqlLog("rollbacking", 0, "rollbacking", ""); 1053 if (connection != null) { 1054 try { 1055 rollbackSyncConnection(connection); 1056 connection.rollback(); 1057 } catch (SQLException e) { 1058 writeSqlLog("error", 0, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage())); 1059 throw e; 1060 } 1061 } 1062 writeSqlLog("rollbacked", 0, "rollbacked", ""); 1063 } 1064 1065 /** 1066 * Get origin connection hash code 1067 * @return Integer - hash code 1068 * */ 1069 private Integer getOriginConnectionHashCode() { 1070 if (connection instanceof DriverConnection) { 1071 return ((DriverConnection) connection).getOriginConnectionHashCode(); 1072 } else { 1073 return connection.hashCode(); 1074 } 1075 } 1076 1077 /** 1078 * Get data source thread name 1079 * @return String - DataSource thread name 1080 * */ 1081 private String getDataSourceName() { 1082 if (connection instanceof DriverConnection) { 1083 return ((DriverConnection) connection).getDriverDataSource().getName(); 1084 } else { 1085 return null; 1086 } 1087 } 1088 1089 /** 1090 * Conver sql from ':column_name' to '?' 1091 * @return String 1092 * */ 1093 protected String converSql(String sql) { 1094 return sql.replaceAll(":[a-zA-Z0-9_]+", "?"); 1095 } 1096 1097 /** 1098 * Conver param from map to list 1099 * @param sql 1100 * @param map 1101 * @return List<Object> 1102 * */ 1103 protected List<Object> converParams(String sql, Map<String, Object> map) { 1104 List<String> paramKeys = new ArrayList<String>(); 1105 Pattern pattern = Pattern.compile(":[a-zA-Z0-9_]+"); 1106 Matcher matcher = pattern.matcher(sql); 1107 while (matcher.find()) { 1108 String key = matcher.group().replaceFirst(":", ""); 1109 paramKeys.add(key); 1110 } 1111 1112 List<Object> params = new ArrayList<Object>(); 1113 for (String pk : paramKeys) { 1114 Object pv = map.get(pk); 1115 if (pv == null) { 1116 pv = map.get(pk.toLowerCase()); 1117 } 1118 if (pv == null) { 1119 pv = map.get(pk.toUpperCase()); 1120 } 1121 if (pv instanceof java.util.Date) { 1122 java.util.Date utilDate = (java.util.Date) pv; 1123 java.sql.Timestamp sqlDate = new java.sql.Timestamp(utilDate.getTime()); 1124 params.add(sqlDate); 1125 } else { 1126 params.add(pv); 1127 } 1128 } 1129 1130 return params; 1131 } 1132 1133 /** 1134 * Covner to upper or lower 1135 * @param s - Column name or table name 1136 * @return String 1137 * */ 1138 protected String converCase(String s) { 1139 if (COLUMN_NAME_CASE_MODE.equals(COLUMN_NAME_CASE_UPPER)) { 1140 return s.toUpperCase(); 1141 } else if (COLUMN_NAME_CASE_MODE.equals(COLUMN_NAME_CASE_LOWER)) { 1142 return s.toLowerCase(); 1143 } else { 1144 return s; 1145 } 1146 } 1147 1148 /** 1149 * Check available sql to write to log 1150 * @return boolean 1151 * @param connection - Connection 1152 * @param sql 1153 * */ 1154 protected static boolean checkSqlLogAvailable(Connection connection, String sql) { 1155 if (connection instanceof DriverConnection) { 1156 DriverConnection dc = (DriverConnection) connection; 1157 DriverDataSource dds = dc.getDriverDataSource(); 1158 ConfigProperties configProperties = dds.getConfigProperties(); 1159 List<String> sqlAllowed = configProperties.getArray("SqlLogAllowed"); 1160 List<String> sqlIgnored = configProperties.getArray("SqlLogIgnored"); 1161 return checkSqlAvailable(sql, sqlAllowed, sqlIgnored); 1162 } 1163 return false; 1164 } 1165 1166 /** 1167 * Check available sql to execute 1168 * @return boolean 1169 * @param sql 1170 * */ 1171 protected boolean checkSqlAvailable(String sql) { 1172 return checkSqlAvailable(connection, sql); 1173 } 1174 1175 /** 1176 * Check available sql to execute 1177 * @return boolean 1178 * @param connection 1179 * @param sql 1180 * */ 1181 protected static boolean checkSqlAvailable(Connection connection, String sql) { 1182 if (connection instanceof DriverConnection) { 1183 DriverConnection dc = (DriverConnection) connection; 1184 DriverDataSource dds = dc.getDriverDataSource(); 1185 ConfigProperties configProperties = dds.getConfigProperties(); 1186 List<String> sqlAllowed = configProperties.getArray("SqlExecuteAllowed"); 1187 List<String> sqlIgnored = configProperties.getArray("SqlExecuteIgnored"); 1188 return checkSqlAvailable(sql, sqlAllowed, sqlIgnored); 1189 } 1190 return true; 1191 } 1192 1193 /** 1194 * Check available sql to execute 1195 * @return boolean 1196 * @param sql 1197 * @param sqlAllowed - From DataSources.properties 1198 * @param sqlIgnored - From DataSources.properties 1199 * */ 1200 private static boolean checkSqlAvailable(String sql, List<String> sqlAllowed, List<String> sqlIgnored) { 1201 boolean matchedAllowed = true; 1202 boolean matchedIgnored = false; 1203 if (sqlAllowed != null) { 1204 for (String regex : sqlAllowed) { 1205 if (!CommonTools.isBlank(regex)) { 1206 matchedAllowed = sql.matches(regex); 1207 1208 if (matchedAllowed) 1209 break; 1210 } 1211 } 1212 } 1213 if (matchedAllowed && sqlIgnored != null) { 1214 for (String regex : sqlIgnored) { 1215 if (!CommonTools.isBlank(regex)) { 1216 matchedIgnored = sql.matches(regex); 1217 1218 if (matchedIgnored) 1219 break; 1220 } 1221 } 1222 } 1223 1224 boolean b = matchedAllowed && !matchedIgnored; 1225 if (!b) { 1226 LoggerFactory.getLogger(DriverExecutor.class).debug("Not available - '{}'", sql); 1227 } 1228 return b; 1229 } 1230 1231 /** 1232 * Write sql log 1233 * @return boolean 1234 * @param type 1235 * @param seq 1236 * @param sql 1237 * @param params 1238 * */ 1239 protected synchronized boolean writeSqlLog(String type, long seq, String sql, Object params) { 1240 return writeSqlLog(this.hashCode(),connection, type, seq, sql, params); 1241 } 1242 1243 /** 1244 * Write sql log 1245 * @return boolean 1246 * @param connection 1247 * @param type 1248 * @param seq 1249 * @param sql 1250 * @param params 1251 * */ 1252 protected synchronized static boolean writeSqlLog(int deHashCode,Connection connection, String type, long seq, String sql, 1253 Object params) { 1254 if (connection instanceof DriverConnection) { 1255 if (!checkSqlLogAvailable(connection, sql)) 1256 return false; 1257 1258 final String header = String.format( 1259 "LOG_DATE,LOG_HOUR,LOG_MI,LOG_SEC,LOG_MS,LOG_HOST,LOG_THREAD,LOG_DS,LOG_CONN_ID,LOG_EXEC_ID,LOG_TYPE,LOG_SEQ,LOG_SQL,LOG_PARAMS%s", 1260 System.lineSeparator()); 1261 final DateFormat df = new SimpleDateFormat("yyyyMMdd"); 1262 final DateFormat dtf = new SimpleDateFormat("yyyyMMdd,HH,mm,ss,SSS"); 1263 DriverConnection dc = (DriverConnection) connection; 1264 Integer connHashCode = dc.getOriginConnectionHashCode(); 1265 DriverDataSource dds = dc.getDriverDataSource(); 1266 ConfigProperties configProperties = dds.getConfigProperties(); 1267 1268 boolean logEnable = configProperties.getBoolean("SqlLogEnable", false); 1269 long overspend = configProperties.getMilliSeconds("SqlLogOverspend",0L); 1270 1271 if (!logEnable) 1272 return false; 1273 1274 final String defaultLogFolderPath = String.format("%s/SqlLog/", 1275 CommonTools.getJarPath(DriverExecutor.class)); 1276 final String logFolderPath = configProperties.getString("SqlLogFolder", defaultLogFolderPath); 1277 final long maxFileSize = configProperties.getFileSize("SqlLogMaxFileSize", 1024 * 1024 * 10L); 1278 final int archiveDays = configProperties.getInteger("SqlLogArchiveDays", 31); 1279 final int logParamMaxLength = configProperties.getInteger("SqlLogParamMaxLength", 20); 1280 1281 if (sqlLogCacheArray == null) { 1282 1283 sqlLogCacheArray = new CacheArray(); 1284 long sqlLogFilterTimer = configProperties.getLong("SqlLogTimer", 1000L); 1285 sqlLogCacheArray.filter(new CacheArrayFilter(0L,sqlLogFilterTimer) { 1286 @Override 1287 public void executeBatch(Integer index, List batch) { 1288 final StringBuffer sbf = new StringBuffer(); 1289 for(Object item : batch){ 1290 sbf.append(item); 1291 } 1292 try { 1293 String msg = sbf.toString(); 1294 String dateStr = df.format(new Timestamp(System.currentTimeMillis())); 1295 File sqlLogFile = new File(String.format("%s/%s.csv", logFolderPath, dateStr)); 1296 File sqlLogFolder = new File(sqlLogFile.getParent()); 1297 1298 if (sqlLogFolder.exists()) { 1299 if (!sqlLogFolder.canWrite()) 1300 throw new IOException(String.format("Can not write to log folder '%s'", 1301 sqlLogFolder.getAbsolutePath())); 1302 } else { 1303 sqlLogFolder.mkdirs(); 1304 } 1305 1306 if (sqlLogFile.exists()) { 1307 if (!sqlLogFile.canWrite()) 1308 throw new IOException(String.format("Can not write to log file '%s'", 1309 sqlLogFile.getAbsolutePath())); 1310 } 1311 1312 int suffixIndex = sqlLogFile.getName().lastIndexOf("."); 1313 String logFileNamePrefix = sqlLogFile.getName().substring(0, suffixIndex); 1314 long logSize = FileTools.size(sqlLogFile); 1315 if (!sqlLogFile.exists()) { 1316 FileTools.write(sqlLogFile, header, false); 1317 } 1318 if (logSize < maxFileSize) { 1319 FileTools.write(sqlLogFile, msg, true); 1320 } else { 1321 int logIndex = getLogFileIndex(configProperties, sqlLogFolder, logFileNamePrefix, 1322 "csv"); 1323 if (logIndex == 0) { 1324 FileTools.write(sqlLogFile, String.format("%s%s", header, msg), false); 1325 } else { 1326 backupLog(sqlLogFolder, sqlLogFile, logFileNamePrefix, logIndex, header, msg); 1327 } 1328 } 1329 archiveLog(archiveDays, sqlLogFolder); 1330 } catch (Exception e) { 1331 LoggerFactory.getLogger(DriverExecutor.class).warn(e.getMessage(), e); 1332 } 1333 } 1334 }); 1335 } 1336 1337 if (logEnable) { 1338 String hostname = CommonTools.getHostname(); 1339 String dateTimeStr = dtf.format(new Timestamp(System.currentTimeMillis())); 1340 boolean isOverspend = false; 1341 boolean isSpendSeq = sql.equals("spend"); 1342 if(isSpendSeq && params != null){ 1343 long spendValue = Long.parseLong(params + ""); 1344 isOverspend = (spendValue >= overspend); 1345 } 1346 1347 String threadId = Thread.currentThread().getName() + "-" + Thread.currentThread().getId(); 1348 String msg = String.format("%s,%s,%s,%s,%s,%s,%s,%s,\"%s\",\"%s\"%s", dateTimeStr, hostname, 1349 threadId, dds.getName(), connHashCode,deHashCode, type, seq, 1350 replaceToSigleLine(sql), handleParams(logParamMaxLength, params), System.lineSeparator()); 1351 1352 String key = String.format("%s_%s",deHashCode,connHashCode); 1353 if(seq <= 0 || overspend <= 0){ 1354 sqlLogCacheArray.add(msg); 1355 SQL_LOG_MSG_MAPPING.remove(key); 1356 SQL_LOG_OVERSPEND_MAPPING.remove(key); 1357 } 1358 if(seq > 0 && overspend > 0){ 1359 SQL_LOG_OVERSPEND_MAPPING.put(key,isOverspend); 1360 String existsMsg = SQL_LOG_MSG_MAPPING.get(key); 1361 existsMsg = existsMsg == null ? msg : (existsMsg + msg); 1362 SQL_LOG_MSG_MAPPING.put(key, existsMsg); 1363 1364 boolean seqOverspend = SQL_LOG_OVERSPEND_MAPPING.get(key); 1365 if(isSpendSeq && seqOverspend){ 1366 sqlLogCacheArray.add(SQL_LOG_MSG_MAPPING.get(key) + ""); 1367 SQL_LOG_MSG_MAPPING.remove(key); 1368 SQL_LOG_OVERSPEND_MAPPING.remove(key); 1369 } 1370 if(isSpendSeq && !seqOverspend){ 1371 SQL_LOG_MSG_MAPPING.remove(key); 1372 SQL_LOG_OVERSPEND_MAPPING.remove(key); 1373 } 1374 } 1375 return true; 1376 } 1377 1378 } 1379 return false; 1380 } 1381 1382 /** 1383 * Backup log file 1384 * */ 1385 private static void backupLog(File logFolder, File logFile, String logFileNamePrefix, int logIndex, String header, 1386 String msg) throws Exception { 1387 String backupLogFileName = String.format("%s/%s.%s.csv", logFolder.getAbsolutePath(), logFileNamePrefix, 1388 logIndex); 1389 Path source = Paths.get(logFile.getAbsolutePath()); 1390 Path target = Paths.get(backupLogFileName); 1391 Files.move(source, target, StandardCopyOption.REPLACE_EXISTING); 1392 FileTools.write(logFile, String.format("%s%s", header, msg), false); 1393 } 1394 1395 /** 1396 * Get log file index 1397 * For backup log file use 1398 * */ 1399 private static int getLogFileIndex(ConfigProperties configProperties, File folder, String prefix, String suffix) { 1400 Integer maxBackupIndex = configProperties.getInteger("SqlLogMaxBackupIndex", 10); 1401 for (int i = 1; i <= maxBackupIndex; i++) { 1402 String logFileName = String.format("%s/%s.%s.%s", folder.getAbsolutePath(), prefix, i, suffix); 1403 File logFile = new File(logFileName); 1404 if (!logFile.exists()) 1405 return i; 1406 } 1407 return 0; 1408 } 1409 1410 /** 1411 * Archive Log 1412 * */ 1413 private static void archiveLog(int archiveDays, File sqlLogFolder) { 1414 if (archiveDays > 0) { 1415 try { 1416 long archiveDaysMs = new Date().getTime() - (archiveDays * 24 * 3600000L); 1417 deleteFilesOlderThan(sqlLogFolder, archiveDaysMs); 1418 } catch (Exception e) { 1419 LoggerFactory.getLogger(DriverExecutor.class).warn(e); 1420 } 1421 } 1422 } 1423 1424 /** 1425 * Delete old archive logs 1426 * */ 1427 private static void deleteFilesOlderThan(File directory, long archiveDaysMs) throws IOException { 1428 if (directory.isDirectory()) { 1429 File[] files = directory.listFiles(); 1430 if (files != null) { 1431 for (File file : files) { 1432 if (file.isFile()) { 1433 boolean isLogFile = file.getName().toLowerCase().endsWith(".csv"); 1434 if (isLogFile) { 1435 boolean canWrite = file.canWrite(); 1436 if (canWrite) { 1437 long lastModified = file.lastModified(); 1438 if (lastModified < archiveDaysMs) { 1439 Files.deleteIfExists(Paths.get(file.toURI())); 1440 } 1441 } 1442 } 1443 } 1444 } 1445 } 1446 } 1447 } 1448 1449 /** 1450 * Replace to sigle line 1451 * For write csv log 1452 * */ 1453 private static String replaceToSigleLine(String msg) { 1454 return CodeEscape.escapeToSingleLineForCsv(msg); 1455 } 1456 1457 /** 1458 * Hahdle Params 1459 * For write csv log 1460 * */ 1461 private static String handleParams(int paramMaxLength, Object params) { 1462 1463 if (params == null) 1464 return "null"; 1465 1466 StringBuffer paramSbf = new StringBuffer(""); 1467 if (params instanceof List) { 1468 List<Object> listParams = (List<Object>) params; 1469 int size = listParams.size(); 1470 for (int i = 0; i < size; i++) { 1471 Object param = listParams.get(i); 1472 String str = param + ""; 1473 if (str.length() > paramMaxLength) { 1474 paramSbf.append(str.substring(0, paramMaxLength) + "..."); 1475 } else { 1476 paramSbf.append(str); 1477 } 1478 if (i < size - 1) { 1479 paramSbf.append(";"); 1480 } 1481 } 1482 } else { 1483 paramSbf.append(params); 1484 } 1485 return replaceToSigleLine(paramSbf.toString()); 1486 } 1487 1488 /**For Sync DataSource**/ 1489 1490 /** 1491 * For database replication 1492 * */ 1493 protected static void callAndReturnBooleanSync(Connection masterConn, long seq, String sql, List<Object> params, 1494 boolean returnResult) throws SQLException { 1495 if (masterConn instanceof DriverConnection) { 1496 openSyncConnection(masterConn); 1497 1498 DriverConnection masterDriverConn = (DriverConnection) masterConn; 1499 Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode(); 1500 ConfigProperties ddscp = masterDriverConn.getDriverDataSource().getConfigProperties(); 1501 boolean connCheck = true; 1502 boolean returnCheck = true; 1503 if (ddscp != null) { 1504 connCheck = ddscp.getBoolean("SyncConnectionCheck", true); 1505 returnCheck = ddscp.getBoolean("SyncReturnCheck", true); 1506 } 1507 List<DriverExecutor> deList = SYNC_EXECUTOR_MARK.get(masterConnHashCode); 1508 if (deList != null) { 1509 for (DriverExecutor de : deList) { 1510 try { 1511 boolean resultSync = de.callAndReturnBoolean(sql, params); 1512 if (returnCheck) { 1513 if (resultSync != returnResult) { 1514 writeSqlLog(de.hashCode(),masterDriverConn, "diffed", seq, "diffed", String.format("%s-%s", 1515 de.getDataSourceName(), de.getOriginConnectionHashCode())); 1516 int errorCode = 99906; 1517 String errorMsg = String.format("The returned results are inconsistent '%s-%s'.", 1518 de.getDataSourceName(), de.getOriginConnectionHashCode()); 1519 throw new SQLException(errorMsg) { 1520 @Override 1521 public int getErrorCode() { 1522 return errorCode; 1523 } 1524 }; 1525 } 1526 } 1527 } catch (SQLException e) { 1528 LoggerFactory.getLogger(DriverExecutor.class) 1529 .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage())); 1530 if (connCheck) 1531 throw e; 1532 } 1533 } 1534 } 1535 } 1536 } 1537 1538 /** 1539 * For database replication 1540 * */ 1541 protected static void callAndReturnListSync(Connection masterConn, int cursorStart, int maxRows, String sql, 1542 List<Object> params) throws SQLException { 1543 if (masterConn instanceof DriverConnection) { 1544 openSyncConnection(masterConn); 1545 1546 DriverConnection masterDriverConn = (DriverConnection) masterConn; 1547 Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode(); 1548 1549 List<DriverExecutor> deList = SYNC_EXECUTOR_MARK.get(masterConnHashCode); 1550 if (deList != null) { 1551 for (DriverExecutor de : deList) { 1552 try { 1553 de.callAndReturnBoolean(sql, params); 1554 } catch (SQLException e) { 1555 LoggerFactory.getLogger(DriverExecutor.class) 1556 .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage())); 1557 ConfigProperties ddscp = masterDriverConn.getDriverDataSource().getConfigProperties(); 1558 boolean connCheck = true; 1559 if (ddscp != null) { 1560 connCheck = ddscp.getBoolean("SyncConnectionCheck", true); 1561 } 1562 if (connCheck) 1563 throw e; 1564 } 1565 } 1566 } 1567 } 1568 } 1569 1570 /** 1571 * For database replication 1572 * */ 1573 protected static void callAndReturnRowsSync(Connection masterConn, long seq, String sql, List<Object> params, 1574 Object returnRows) throws SQLException { 1575 if (masterConn instanceof DriverConnection) { 1576 openSyncConnection(masterConn); 1577 1578 DriverConnection masterDriverConn = (DriverConnection) masterConn; 1579 Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode(); 1580 ConfigProperties ddscp = masterDriverConn.getDriverDataSource().getConfigProperties(); 1581 boolean connCheck = true; 1582 boolean returnCheck = true; 1583 if (ddscp != null) { 1584 connCheck = ddscp.getBoolean("SyncConnectionCheck", true); 1585 returnCheck = ddscp.getBoolean("SyncReturnCheck", true); 1586 } 1587 List<DriverExecutor> deList = SYNC_EXECUTOR_MARK.get(masterConnHashCode); 1588 if (deList != null) { 1589 for (DriverExecutor de : deList) { 1590 try { 1591 Object rowSync = de.callAndReturnRows(sql, params); 1592 if (returnCheck) { 1593 if (rowSync != null && rowSync.equals(returnRows)) { 1594 writeSqlLog(de.hashCode(),masterDriverConn, "diffed", seq, "diffed", String.format("%s-%s", 1595 de.getDataSourceName(), de.getOriginConnectionHashCode())); 1596 int errorCode = 99906; 1597 String errorMsg = String.format("The returned results are inconsistent '%s-%s'.", 1598 de.getDataSourceName(), de.getOriginConnectionHashCode()); 1599 throw new SQLException(errorMsg) { 1600 @Override 1601 public int getErrorCode() { 1602 return errorCode; 1603 } 1604 }; 1605 } 1606 } 1607 } catch (SQLException e) { 1608 LoggerFactory.getLogger(DriverExecutor.class) 1609 .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage())); 1610 if (connCheck) 1611 throw e; 1612 } 1613 } 1614 } 1615 } 1616 } 1617 1618 /** 1619 * For database replication 1620 * */ 1621 protected static void executeBatchListSync(Connection masterConn, long seq, String sql, List<List<Object>> records, 1622 int returnRows) throws SQLException { 1623 if (masterConn instanceof DriverConnection) { 1624 openSyncConnection(masterConn); 1625 1626 DriverConnection masterDriverConn = (DriverConnection) masterConn; 1627 Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode(); 1628 ConfigProperties ddscp = masterDriverConn.getDriverDataSource().getConfigProperties(); 1629 boolean connCheck = true; 1630 boolean returnCheck = true; 1631 if (ddscp != null) { 1632 connCheck = ddscp.getBoolean("SyncConnectionCheck", true); 1633 returnCheck = ddscp.getBoolean("SyncReturnCheck", true); 1634 } 1635 List<DriverExecutor> deList = SYNC_EXECUTOR_MARK.get(masterConnHashCode); 1636 if (deList != null) { 1637 for (DriverExecutor de : deList) { 1638 try { 1639 int rowSync = de.executeBatchList(sql, records); 1640 if (returnCheck) { 1641 if (rowSync != returnRows) { 1642 writeSqlLog(de.hashCode(),masterDriverConn, "diffed", seq, "diffed", String.format("%s-%s", 1643 de.getDataSourceName(), de.getOriginConnectionHashCode())); 1644 int errorCode = 99906; 1645 String errorMsg = String.format("The returned results are inconsistent '%s-%s'.", 1646 de.getDataSourceName(), de.getOriginConnectionHashCode()); 1647 throw new SQLException(errorMsg) { 1648 @Override 1649 public int getErrorCode() { 1650 return errorCode; 1651 } 1652 }; 1653 } 1654 } 1655 } catch (SQLException e) { 1656 LoggerFactory.getLogger(DriverExecutor.class) 1657 .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage())); 1658 if (connCheck) 1659 throw e; 1660 } 1661 } 1662 } 1663 } 1664 } 1665 1666 /** 1667 * For database replication 1668 * */ 1669 protected static void executeSync(Connection masterConn, long seq, String sql, List<Object> params, int returnRows) 1670 throws SQLException { 1671 if (masterConn instanceof DriverConnection) { 1672 openSyncConnection(masterConn); 1673 1674 DriverConnection masterDriverConn = (DriverConnection) masterConn; 1675 Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode(); 1676 ConfigProperties ddscp = masterDriverConn.getDriverDataSource().getConfigProperties(); 1677 boolean connCheck = true; 1678 boolean returnCheck = true; 1679 if (ddscp != null) { 1680 connCheck = ddscp.getBoolean("SyncConnectionCheck", true); 1681 returnCheck = ddscp.getBoolean("SyncReturnCheck", true); 1682 } 1683 List<DriverExecutor> deList = SYNC_EXECUTOR_MARK.get(masterConnHashCode); 1684 if (deList != null) { 1685 for (DriverExecutor de : deList) { 1686 try { 1687 int rowSync = de.execute(sql, params); 1688 if (returnCheck) { 1689 if (rowSync != returnRows) { 1690 writeSqlLog(de.hashCode(),masterDriverConn, "diffed", seq, "diffed", String.format("%s-%s", 1691 de.getDataSourceName(), de.getOriginConnectionHashCode())); 1692 int errorCode = 99906; 1693 String errorMsg = String.format("The returned results are inconsistent '%s-%s'.", 1694 de.getDataSourceName(), de.getOriginConnectionHashCode()); 1695 throw new SQLException(errorMsg) { 1696 @Override 1697 public int getErrorCode() { 1698 return errorCode; 1699 } 1700 }; 1701 } 1702 } 1703 } catch (SQLException e) { 1704 LoggerFactory.getLogger(DriverExecutor.class) 1705 .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage())); 1706 if (connCheck) 1707 throw e; 1708 } 1709 } 1710 } 1711 } 1712 } 1713 1714 /** 1715 * For database replication 1716 * */ 1717 protected synchronized static boolean openSyncConnection(Connection masterConn) throws SQLException { 1718 if (masterConn instanceof DriverConnection) { 1719 1720 DriverConnection masterDriverConn = (DriverConnection) masterConn; 1721 Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode(); 1722 1723 boolean isExist = SYNC_EXECUTOR_MARK.containsKey(masterConnHashCode); 1724 1725 if (isExist) { 1726 return false; 1727 } else { 1728 SYNC_EXECUTOR_MARK.put(masterConnHashCode, new ArrayList<DriverExecutor>()); 1729 } 1730 1731 DriverDataSource dds = masterDriverConn.getDriverDataSource(); 1732 if (dds != null) { 1733 List<DriverDataSource> sdsl = dds.getSyncDataSourceList(dds.getName()); 1734 if (sdsl != null) { 1735 1736 if (sdsl.isEmpty()) 1737 return false; 1738 1739 for (DriverDataSource sds : sdsl) { 1740 1741 Integer sdsHashCode = sds.hashCode(); 1742 if (!SYNC_CONN_ERROR_TIME.containsKey(sdsHashCode)) { 1743 SYNC_CONN_ERROR_TIME.put(sdsHashCode, 0L); 1744 } 1745 1746 Long syncConnErrorTime = SYNC_CONN_ERROR_TIME.get(sdsHashCode); 1747 if (syncConnErrorTime > 0) { 1748 ConfigProperties ddscp = dds.getConfigProperties(); 1749 long connCheckMs = 10000L; 1750 if (ddscp != null) { 1751 connCheckMs = ddscp.getLong("SyncConnectionCheckTime", 10000L); 1752 } 1753 boolean isSkipConn = syncConnErrorTime > 0 1754 && (System.currentTimeMillis() - syncConnErrorTime) <= connCheckMs; 1755 if (isSkipConn) { 1756 continue; 1757 } 1758 } 1759 1760 String masterFingerprint = dds.getFingerprint(); 1761 String syncFingerprint = sds.getFingerprint(); 1762 if (masterFingerprint.equalsIgnoreCase(syncFingerprint)) { 1763 LoggerFactory.getLogger(DriverExecutor.class) 1764 .warn("Skip sync reason 'same connection fingerprint'."); 1765 continue; 1766 } 1767 1768 try { 1769 final Connection conn = sds.getConnection(); 1770 if (conn == null) { 1771 int errorCode = 99904; 1772 String errorMsg = "Connection is null."; 1773 throw new SQLException(errorMsg) { 1774 @Override 1775 public int getErrorCode() { 1776 return errorCode; 1777 } 1778 }; 1779 } 1780 conn.setAutoCommit(masterConn.getAutoCommit()); 1781 SYNC_CONN_ERROR_TIME.put(sdsHashCode, 0L); 1782 SYNC_EXECUTOR_MARK.get(masterConnHashCode).add(new DriverExecutor(conn)); 1783 } catch (SQLException e) { 1784 SYNC_CONN_ERROR_TIME.put(sdsHashCode, System.currentTimeMillis()); 1785 LoggerFactory.getLogger(DriverExecutor.class) 1786 .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage())); 1787 ConfigProperties ddscp = dds.getConfigProperties(); 1788 boolean connCheck = true; 1789 if (ddscp != null) { 1790 connCheck = ddscp.getBoolean("SyncConnectionCheck", true); 1791 } 1792 if (connCheck) 1793 throw e; 1794 } 1795 1796 } 1797 1798 return true; 1799 } 1800 } 1801 } 1802 return false; 1803 } 1804 1805 /** 1806 * For database replication 1807 * */ 1808 protected static void closeSyncConnection(Connection masterConn) throws SQLException { 1809 if (masterConn instanceof DriverConnection) { 1810 DriverConnection masterDriverConn = (DriverConnection) masterConn; 1811 Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode(); 1812 1813 List<DriverExecutor> deList = SYNC_EXECUTOR_MARK.get(masterConnHashCode); 1814 if (deList != null) { 1815 for (DriverExecutor de : deList) { 1816 try { 1817 de.close(); 1818 } catch (SQLException e) { 1819 LoggerFactory.getLogger(DriverExecutor.class) 1820 .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage())); 1821 ConfigProperties ddscp = masterDriverConn.getDriverDataSource().getConfigProperties(); 1822 boolean connCheck = true; 1823 if (ddscp != null) { 1824 connCheck = ddscp.getBoolean("SyncConnectionCheck", true); 1825 } 1826 if (connCheck) 1827 throw e; 1828 } 1829 } 1830 SYNC_EXECUTOR_MARK.remove(masterConnHashCode); 1831 LoggerFactory.getLogger(DriverExecutor.class).debug("CloseSyncConnection - masterConn '{}' finished.", 1832 masterConnHashCode); 1833 } 1834 } 1835 } 1836 1837 /** 1838 * For database replication 1839 * */ 1840 protected static void commitSyncConnection(Connection masterConn) throws SQLException { 1841 if (masterConn instanceof DriverConnection) { 1842 DriverConnection masterDriverConn = (DriverConnection) masterConn; 1843 Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode(); 1844 1845 List<DriverExecutor> deList = SYNC_EXECUTOR_MARK.get(masterConnHashCode); 1846 if (deList != null) { 1847 for (DriverExecutor de : deList) { 1848 try { 1849 de.commit(); 1850 } catch (SQLException e) { 1851 LoggerFactory.getLogger(DriverExecutor.class) 1852 .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage())); 1853 ConfigProperties ddscp = masterDriverConn.getDriverDataSource().getConfigProperties(); 1854 boolean connCheck = true; 1855 if (ddscp != null) { 1856 connCheck = ddscp.getBoolean("SyncConnectionCheck", true); 1857 } 1858 if (connCheck) 1859 throw e; 1860 } 1861 } 1862 LoggerFactory.getLogger(DriverExecutor.class).debug("CommitSyncConnection - masterConn '{}' finished.", 1863 masterConnHashCode); 1864 } 1865 } 1866 } 1867 1868 /** 1869 * For database replication 1870 * */ 1871 protected static void rollbackSyncConnection(Connection masterConn) throws SQLException { 1872 if (masterConn instanceof DriverConnection) { 1873 DriverConnection masterDriverConn = (DriverConnection) masterConn; 1874 Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode(); 1875 1876 List<DriverExecutor> deList = SYNC_EXECUTOR_MARK.get(masterConnHashCode); 1877 if (deList != null) { 1878 for (DriverExecutor de : deList) { 1879 try { 1880 de.rollback(); 1881 } catch (SQLException e) { 1882 LoggerFactory.getLogger(DriverExecutor.class) 1883 .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage())); 1884 ConfigProperties ddscp = masterDriverConn.getDriverDataSource().getConfigProperties(); 1885 boolean connCheck = true; 1886 if (ddscp != null) { 1887 connCheck = ddscp.getBoolean("SyncConnectionCheck", true); 1888 } 1889 if (connCheck) 1890 throw e; 1891 } 1892 } 1893 LoggerFactory.getLogger(DriverExecutor.class) 1894 .debug("RollbackSyncConnection - masterConn '{}' finished.", masterConnHashCode); 1895 } 1896 } 1897 } 1898 1899 /** 1900 * For database replication 1901 * */ 1902 protected static void abortSyncConnection(Connection masterConn) throws SQLException { 1903 if (masterConn instanceof DriverConnection) { 1904 DriverConnection masterDriverConn = (DriverConnection) masterConn; 1905 Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode(); 1906 1907 List<DriverExecutor> deList = SYNC_EXECUTOR_MARK.get(masterConnHashCode); 1908 if (deList != null) { 1909 for (DriverExecutor de : deList) { 1910 try { 1911 de.abort(); 1912 } catch (SQLException e) { 1913 LoggerFactory.getLogger(DriverExecutor.class) 1914 .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage())); 1915 ConfigProperties ddscp = masterDriverConn.getDriverDataSource().getConfigProperties(); 1916 boolean connCheck = true; 1917 if (ddscp != null) { 1918 connCheck = ddscp.getBoolean("SyncConnectionCheck", true); 1919 } 1920 if (connCheck) 1921 throw e; 1922 } 1923 } 1924 LoggerFactory.getLogger(DriverExecutor.class).debug("AbortSyncConnection - masterConn '{}' finished.", 1925 masterConnHashCode); 1926 } 1927 } 1928 } 1929 1930}