001/******************************************************************************* 002The MIT License (MIT) 003 004Copyright (c) 2024 KILLCODING.COM 005 006Permission is hereby granted, free of charge, to any person obtaining a copy 007of this software and associated documentation files (the "Software"), to deal 008in the Software without restriction, including without limitation the rights 009to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 010copies of the Software, and to permit persons to whom the Software is 011furnished to do so, subject to the following conditions: 012 013The above copyright notice and this permission notice shall be included in 014all copies or substantial portions of the Software. 015 016THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 017IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 018FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 019AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 020LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 021OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 022THE SOFTWARE. 023*****************************************************************************/ 024package com.killcoding.datasource; 025 026import java.sql.Connection; 027import java.sql.PreparedStatement; 028import java.util.List; 029import java.sql.SQLException; 030import java.sql.Types; 031import java.util.Map; 032import java.sql.ResultSet; 033import java.sql.ResultSetMetaData; 034import java.util.HashMap; 035import java.util.ArrayList; 036import java.util.regex.Pattern; 037import java.util.regex.Matcher; 038import java.util.Arrays; 039import com.killcoding.tool.ResultMap; 040import java.sql.DatabaseMetaData; 041import com.killcoding.log.LoggerFactory; 042import com.killcoding.log.Logger; 043import java.util.concurrent.Executors; 044import com.killcoding.datasource.DriverConnection; 045import com.killcoding.datasource.DriverDataSource; 046import java.util.concurrent.ConcurrentHashMap; 047import com.killcoding.tool.CommonTools; 048import com.killcoding.tool.ConfigProperties; 049import java.io.File; 050import java.text.DateFormat; 051import java.text.SimpleDateFormat; 052import com.killcoding.tool.FileTools; 053import java.io.IOException; 054import java.nio.file.Files; 055import java.util.Date; 056import java.nio.file.Paths; 057import java.nio.file.Path; 058import java.nio.file.StandardCopyOption; 059import com.killcoding.cache.CacheArray; 060import com.killcoding.cache.CacheArrayFilter; 061import com.killcoding.tool.CodeEscape; 062import java.sql.Blob; 063import java.util.stream.Collectors; 064import java.net.URI; 065import java.util.Comparator; 066import java.sql.Timestamp; 067import java.sql.Clob; 068import java.io.InputStream; 069import java.sql.CallableStatement; 070 071/** 072 * This class is execute sql base class. 073 * Support database replication. 074 * Support database multi-activity. 075 * Support database CRUD 076 * */ 077public class DriverExecutor { 078 079 protected final static Map<Integer, List<DriverExecutor>> SYNC_EXECUTOR_MARK = new ConcurrentHashMap<Integer, List<DriverExecutor>>(); 080 private final static Map<Integer, Long> SYNC_CONN_ERROR_TIME = new ConcurrentHashMap<Integer, Long>(); 081 082 private final static Map<String,String> SQL_LOG_MSG_MAPPING = new ConcurrentHashMap<String,String>(); 083 private final static Map<String,Boolean> SQL_LOG_OVERSPEND_MAPPING = new ConcurrentHashMap<String,Boolean>(); 084 085 protected Logger log = null; 086 087 public final static String COLUMN_NAME_CASE_UPPER = "UPPER"; 088 public final static String COLUMN_NAME_CASE_LOWER = "LOWER"; 089 public final static String COLUMN_NAME_CASE_ORIGINAL = "ORIGINAL"; 090 public static String COLUMN_NAME_CASE_MODE = COLUMN_NAME_CASE_ORIGINAL; 091 092 protected boolean closed = true; 093 protected Connection connection = null; 094 private static CacheArray sqlLogCacheArray = null; 095 096 /** 097 * New a DriverExecutor object 098 * @param connection - JDBC connection 099 * */ 100 public DriverExecutor(Connection connection) { 101 super(); 102 log = LoggerFactory.getLogger(this.getClass()); 103 this.connection = connection; 104 if(this.connection == null){ 105 writeSqlLog("open", 0, "open", "The connection is null"); 106 }else{ 107 try{ 108 writeSqlLog("open", 0, "open", String.format("AutoCommit=%s",this.connection.getAutoCommit() + "")); 109 }catch(Exception e) { 110 log.error(e.getMessage(),e); 111 } 112 } 113 closed = (this.connection == null); 114 } 115 116 /** 117 * Get current Connection 118 * @return Connection 119 * */ 120 public Connection getConnection() { 121 return connection; 122 } 123 124 /** 125 * Get column classes by table name 126 * @exception SQLException 127 * @return Map<String,Object> - column and java type class mapping 128 * @param tableName - table name 129 * */ 130 public Map<String, Object> getColumnClasses(String tableName) throws SQLException { 131 Map<String, Object> types = null; 132 ResultSet result = null; 133 PreparedStatement statement = null; 134 String sql = String.format("SELECT * FROM %s WHERE 1 = 0", tableName); 135 statement = connection.prepareStatement(sql); 136 result = statement.executeQuery(); 137 final ResultSetMetaData rsmd = result.getMetaData(); 138 for (int i = 0; i < rsmd.getColumnCount(); i++) { 139 if (types == null) { 140 types = new ResultMap<String, Object>(); 141 } 142 String cn = rsmd.getColumnClassName(i + 1); 143 types.put(converCase(rsmd.getColumnLabel(i + 1)), cn); 144 } 145 return types; 146 } 147 148 /** 149 * Get column db data types by table name 150 * @exception SQLException 151 * @return Map<String, Object> - column and db data type mapping 152 * @param tableName - Table name 153 * */ 154 public Map<String, Object> getColumnTypes(String tableName) throws SQLException { 155 ResultSet result = null; 156 PreparedStatement statement = null; 157 String sql = String.format("SELECT * FROM %s WHERE 1 = 0", tableName); 158 Map<String, Object> types = null; 159 statement = connection.prepareStatement(sql); 160 result = statement.executeQuery(); 161 final ResultSetMetaData rsmd = result.getMetaData(); 162 for (int i = 0; i < rsmd.getColumnCount(); i++) { 163 if (types == null) { 164 types = new ResultMap<String, Object>(); 165 } 166 types.put(converCase(rsmd.getColumnLabel(i + 1)), rsmd.getColumnTypeName(i + 1)); 167 } 168 return types; 169 } 170 171 /** 172 * Show column data type and java type mapping 173 * @exception SQLException 174 * @return List<Map<String, Object>> - Mapping list 175 * @param tableName - Table name 176 * */ 177 public List<Map<String, Object>> desc(String tableName) throws SQLException { 178 List<String> primaryKeys = getPrimaryKeys(tableName); 179 List<Map<String, Object>> results = new ArrayList<Map<String, Object>>(); 180 ResultSet result = null; 181 PreparedStatement statement = null; 182 String sql = String.format("SELECT * FROM %s WHERE 1 = 0", tableName); 183 statement = connection.prepareStatement(sql); 184 result = statement.executeQuery(); 185 final ResultSetMetaData rsmd = result.getMetaData(); 186 for (int i = 0; i < rsmd.getColumnCount(); i++) { 187 Map<String, Object> types = new ResultMap<String, Object>(); 188 int ci = i + 1; 189 boolean nullable = rsmd.isNullable(ci) == 1; 190 String name = converCase(rsmd.getColumnLabel(ci)); 191 String isPk = "UNKNOWN"; 192 if (primaryKeys != null) { 193 isPk = primaryKeys.contains(name) ? "Y" : "N"; 194 } 195 types.put("NAME", name); 196 types.put("PRIMARY_KEY", isPk); 197 types.put("DATA_TYPE", rsmd.getColumnTypeName(ci)); 198 types.put("JAVA_TYPE", rsmd.getColumnClassName(ci)); 199 types.put("PRECISION", rsmd.getPrecision(ci)); 200 types.put("ALLOW_NULLABLE", nullable ? "Y" : 'N'); 201 results.add(types); 202 } 203 return results; 204 } 205 206 /** 207 * Get primary Keys by table name 208 * @return List<String> - Primary Keys 209 * @param _tableName - Table name 210 * */ 211 public List<String> getPrimaryKeys(String _tableName) { 212 try { 213 List<String> pks = new ArrayList<String>(); 214 DatabaseMetaData meta = connection.getMetaData(); 215 ResultSet tables = meta.getTables(null, null, "%", new String[] { "TABLE" }); 216 while (tables.next()) { 217 String catalog = tables.getString("TABLE_CAT"); 218 String schema = tables.getString("TABLE_SCHEM"); 219 String tableName = tables.getString("TABLE_NAME"); 220 if (tableName.equalsIgnoreCase(_tableName)) { 221 ResultSet primaryKeys = meta.getPrimaryKeys(catalog, schema, tableName); 222 while (primaryKeys.next()) { 223 pks.add(primaryKeys.getString("COLUMN_NAME")); 224 } 225 break; 226 } 227 } 228 return pks; 229 } catch (Exception e) { 230 log.warn(e); 231 return null; 232 } 233 } 234 235 /** 236 * Get all tables 237 * @return List<Map<String, Object>> 238 * @exception SQLException 239 * */ 240 public List<Map<String, Object>> getAllTables() throws SQLException { 241 List<Map<String, Object>> tablesList = new ArrayList<Map<String, Object>>(); 242 DatabaseMetaData meta = connection.getMetaData(); 243 ResultSet tables = meta.getTables(null, null, "%", new String[] { "TABLE" }); 244 while (tables.next()) { 245 String catalog = tables.getString("TABLE_CAT"); 246 String schema = tables.getString("TABLE_SCHEM"); 247 String tableName = tables.getString("TABLE_NAME"); 248 Map<String, Object> t = new ResultMap<String, Object>(); 249 t.put("TABLE_SCHEMA", schema); 250 t.put("TABLE_NAME", tableName); 251 tablesList.add(t); 252 } 253 return tablesList; 254 } 255 256 /** 257 * Get all tables by schema 258 * @return List<Map<String, Object>> 259 * @exception SQLException 260 * */ 261 public List<Map<String, Object>> getAllTables(String _schema) throws SQLException { 262 List<Map<String, Object>> tablesList = new ArrayList<Map<String, Object>>(); 263 DatabaseMetaData meta = connection.getMetaData(); 264 ResultSet tables = meta.getTables(null, null, "%", new String[] { "TABLE" }); 265 while (tables.next()) { 266 String schema = tables.getString("TABLE_SCHEM"); 267 if (schema != null && schema.equalsIgnoreCase(_schema)) { 268 String catalog = tables.getString("TABLE_CAT"); 269 String tableName = tables.getString("TABLE_NAME"); 270 Map<String, Object> t = new ResultMap<String, Object>(); 271 t.put("TABLE_SCHEMA", schema); 272 t.put("TABLE_NAME", tableName); 273 tablesList.add(t); 274 } 275 } 276 return tablesList; 277 } 278 279 /** 280 * Query first record 281 * @exception SQLException 282 * @return Map<String,Object> - First result 283 * @param sql - Condition use format ':column_name' 284 * @param params 285 * */ 286 public Map<String, Object> first(String sql, Map<String, Object> params) throws SQLException { 287 List<Map<String, Object>> list = find(0, 1, sql, params); 288 if (list.size() > 0) { 289 return list.get(0); 290 } 291 return null; 292 } 293 294 /** 295 * Query first record 296 * @exception SQLException 297 * @return Map<String,Object> - First result 298 * @param sql 299 * */ 300 public Map<String, Object> first(String sql) throws SQLException { 301 return first(sql, Arrays.asList(new Object[] {})); 302 } 303 304 /** 305 * Query first record 306 * @exception SQLException 307 * @return Map<String,Object> - First result 308 * @param sql - Condition use format '?' 309 * @param params 310 * */ 311 public Map<String, Object> first(String sql, List<Object> params) throws SQLException { 312 List<Map<String, Object>> list = find(0, 1, sql, params); 313 if (list.size() > 0) { 314 return list.get(0); 315 } 316 return null; 317 } 318 319 /** 320 * Query all matched records 321 * @exception SQLException 322 * @return List<Map<String, Object>> 323 * @param sql 324 * @param params 325 * */ 326 public List<Map<String, Object>> find(String sql) throws SQLException { 327 return find(0, 0, sql, Arrays.asList(new Object[] {})); 328 } 329 330 /** 331 * Query all matched records 332 * @exception SQLException 333 * @return List<Map<String, Object>> 334 * @param cursorStart - JDBC result Cursor start index 335 * @param maxRows - JDBC result max rows (JDBC limited rows 50,000,000) 336 * @param sql 337 * @param params 338 * */ 339 public List<Map<String, Object>> find(int cursorStart, int maxRows, String sql) throws SQLException { 340 return find(cursorStart, maxRows, sql, Arrays.asList(new Object[] {})); 341 } 342 343 /** 344 * Query all matched records 345 * @exception SQLException 346 * @return List<Map<String, Object>> 347 * @param sql - Condition use format ':column_name' 348 * @param params 349 * */ 350 public List<Map<String, Object>> find(String sql, Map<String, Object> params) throws SQLException { 351 return find(0, 0, sql, params); 352 } 353 354 /** 355 * Query all matched records 356 * @exception SQLException 357 * @return List<Map<String, Object>> 358 * @param cursorStart - JDBC result Cursor start index 359 * @param maxRows - JDBC result max rows (JDBC limited rows 50,000,000) 360 * @param sql - Condition use format ':column_name' 361 * @param params 362 * */ 363 public List<Map<String, Object>> find(int cursorStart, int maxRows, String sql, Map<String, Object> params) 364 throws SQLException { 365 String csql = converSql(sql); 366 List<Object> cparams = converParams(sql, params); 367 return find(cursorStart, maxRows, csql, cparams); 368 } 369 370 /** 371 * Query all matched records 372 * @exception SQLException 373 * @return List<Map<String, Object>> 374 * @param cursorStart - JDBC result Cursor start index 375 * @param maxRows - JDBC result max rows (JDBC limited rows 50,000,000) 376 * @param sql - Condition use format '?' 377 * @param params 378 * */ 379 public List<Map<String, Object>> find(int cursorStart, int maxRows, String sql, List<Object> params) 380 throws SQLException { 381 long begin = System.currentTimeMillis(); 382 boolean allowedLog = writeSqlLog("find", begin, 383 String.format("%s [cursorStart=%s,maxRows=%s]", sql, cursorStart, maxRows), params); 384 385 PreparedStatement statement = null; 386 Map<String, Object> row = null; 387 ResultSet result = null; 388 final List<Map<String, Object>> rows = new ArrayList<Map<String, Object>>(); 389 try { 390 // ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_UPDATABLE 391 // ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY 392 statement = connection.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY); 393 if (params != null) { 394 int size = params.size(); 395 for (int i = 0; i < size; i++) { 396 int ci = i + 1; 397 Object param = params.get(i); 398 if (param == null) { 399 statement.setNull(ci, Types.VARCHAR); 400 } else { 401 statement.setObject(ci, param); 402 } 403 } 404 } 405 if (maxRows > 0) { 406 statement.setMaxRows(maxRows); 407 } 408 result = statement.executeQuery(); 409 result.absolute(cursorStart); 410 final ResultSetMetaData rsmd = result.getMetaData(); 411 final int c = rsmd.getColumnCount(); 412 while (result.next()) { 413 row = new ResultMap<String, Object>(); 414 for (int i = 0; i < c; i++) { 415 int ci = i + 1; 416 Object value = null; 417 Object originValue = result.getObject(ci); 418 if (originValue == null) { 419 value = originValue; 420 } else if (originValue instanceof Blob) { 421 Blob blobValue = (Blob) originValue; 422 InputStream is = null; 423 try { 424 is = blobValue.getBinaryStream(); 425 if(is != null) value = is.readAllBytes(); 426 } catch (IOException e) { 427 throw new SQLException(e.getMessage(), e); 428 } finally { 429 if (blobValue != null) { 430 try { 431 blobValue.free(); 432 } catch (SQLException e) { 433 throw e; 434 } 435 } 436 if (is != null) { 437 try { 438 is.close(); 439 } catch (IOException e) { 440 throw new SQLException(e.getMessage(), e); 441 } 442 } 443 } 444 } else if (originValue instanceof Clob) { 445 Clob clobValue = (Clob) originValue; 446 InputStream is = null; 447 try { 448 is = clobValue.getAsciiStream(); 449 if(is != null) value = is.readAllBytes(); 450 } catch (IOException e) { 451 throw new SQLException(e.getMessage(), e); 452 } finally { 453 if (clobValue != null) { 454 try { 455 clobValue.free(); 456 } catch (SQLException e) { 457 throw e; 458 } 459 } 460 if (is != null) { 461 try { 462 is.close(); 463 } catch (IOException e) { 464 throw new SQLException(e.getMessage(), e); 465 } 466 } 467 } 468 } else { 469 value = originValue; 470 } 471 row.put(converCase(rsmd.getColumnLabel(ci)), value); 472 } 473 rows.add(row); 474 } 475 if (allowedLog) { 476 writeSqlLog("rows", begin, "rows", rows.size()); 477 long spend = System.currentTimeMillis() - begin; 478 writeSqlLog("spend", begin, "spend", spend); 479 } 480 return rows; 481 } catch (SQLException e) { 482 if (allowedLog) 483 writeSqlLog("error", begin, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage())); 484 485 throw e; 486 } finally { 487 if (result != null) 488 result.close(); 489 490 if (statement != null) 491 statement.close(); 492 } 493 } 494 495 /** 496 * Execute stored proc (and return boolean) 497 * @param sql - Query sql 498 * @exception SQLException 499 * @return boolean 500 * */ 501 public boolean callAndReturnBoolean(String sql) throws SQLException { 502 return callAndReturnBoolean(sql, Arrays.asList(new Object[] {})); 503 } 504 505 /** 506 * Execute stored proc (and return boolean) 507 * @param sql - Query sql, condition use format ':column_name' 508 * @param params 509 * @exception SQLException 510 * @return boolean 511 * */ 512 public boolean callAndReturnBoolean(String sql, Map<String, Object> params) throws SQLException { 513 String csql = converSql(sql); 514 List<Object> cparams = converParams(sql, params); 515 return callAndReturnBoolean(sql, params); 516 } 517 518 /** 519 * Execute stored proc (and return boolean) 520 * @param sql - Query sql,condition use format '?' 521 * @param params 522 * @exception SQLException 523 * @return boolean 524 * */ 525 public boolean callAndReturnBoolean(String sql, List<Object> params) throws SQLException { 526 527 if (!checkSqlAvailable(sql)) 528 return false; 529 530 long begin = System.currentTimeMillis(); 531 boolean allowedLog = writeSqlLog("call", begin, sql, params); 532 533 CallableStatement statement = null; 534 try { 535 statement = connection.prepareCall("{" + sql + "}"); 536 if (params != null) { 537 int size = params.size(); 538 for (int i = 0; i < size; i++) { 539 Object param = params.get(i); 540 if (param == null) { 541 statement.setNull(i + 1, Types.VARCHAR); 542 } else { 543 statement.setObject(i + 1, param); 544 } 545 } 546 } 547 548 statement.execute(); 549 550 boolean returnResult = true; //true is not exception 551 552 if (allowedLog) { 553 writeSqlLog("return", begin, "return", returnResult); 554 long spend = System.currentTimeMillis() - begin; 555 writeSqlLog("spend", begin, "spend", spend); 556 } 557 558 callAndReturnBooleanSync(connection, begin, sql, params, returnResult); 559 560 return returnResult; 561 } catch (SQLException e) { 562 if (allowedLog) 563 writeSqlLog("error", begin, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage())); 564 565 throw e; 566 } finally { 567 if (statement != null) 568 statement.close(); 569 } 570 } 571 572 /** 573 * Execute stored proc (and return rows) 574 * @param sql - Query sql 575 * @exception SQLException 576 * @return int 577 * */ 578 public int callAndReturnRows(String sql) throws SQLException { 579 return callAndReturnRows(sql, Arrays.asList(new Object[] {})); 580 } 581 582 /** 583 * Execute stored proc (and return rows) 584 * @param sql - Query sql,condition use format ':column_name' 585 * @param params 586 * @exception SQLException 587 * @return int 588 * */ 589 public int callAndReturnRows(String sql, Map<String, Object> params) throws SQLException { 590 String csql = converSql(sql); 591 List<Object> cparams = converParams(sql, params); 592 return callAndReturnRows(sql, params); 593 } 594 595 public int callAndReturnRows(String sql, List<Object> params) throws SQLException { 596 return (int)callAndReturnRows(Types.INTEGER,sql,params); 597 } 598 599 /** 600 * Execute stored proc (and return rows) 601 * @param sqltypes - e.g Types.INTEGER 602 * @param sql - Query sql,condition use format '?' 603 * @param params 604 * @exception SQLException 605 * @return Object 606 * */ 607 public Object callAndReturnRows(int sqlTypes,String sql, List<Object> params) throws SQLException { 608 609 if (!checkSqlAvailable(sql)) 610 return -1; 611 612 long begin = System.currentTimeMillis(); 613 boolean allowedLog = writeSqlLog("call", begin, sql, params); 614 615 CallableStatement statement = null; 616 try { 617 statement = connection.prepareCall("{?=" + sql + "}"); 618 statement.registerOutParameter(1,sqlTypes); 619 if (params != null) { 620 int size = params.size(); 621 for (int i = 0; i < size; i++) { 622 Object param = params.get(i); 623 if (param == null) { 624 statement.setNull(i + 1, Types.VARCHAR); 625 } else { 626 statement.setObject(i + 1, param); 627 } 628 } 629 } 630 boolean hasResult = statement.execute(); 631 632 Object row = statement.getObject(1); 633 634 if (allowedLog) { 635 writeSqlLog("return", begin, "return", row); 636 long spend = System.currentTimeMillis() - begin; 637 writeSqlLog("spend", begin, "spend", spend); 638 } 639 640 callAndReturnRowsSync(connection, begin, sql, params, row); 641 642 return row; 643 } catch (SQLException e) { 644 if (allowedLog) 645 writeSqlLog("error", begin, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage())); 646 647 throw e; 648 } finally { 649 if (statement != null) 650 statement.close(); 651 } 652 } 653 654 /** 655 * Execute stored proc (and return List) 656 * @param sql - Query sql 657 * @exception SQLException 658 * @return List<Map<String, Object>> 659 * */ 660 public List<Map<String, Object>> callAndReturnList(String sql) throws SQLException { 661 return callAndReturnList(0, 0, sql, Arrays.asList(new Object[] {})); 662 } 663 664 /** 665 * Execute stored proc (and return List) 666 * @param cursorStart - JDBC result Cursor start index 667 * @param maxRows - JDBC result max rows (JDBC limited rows 50,000,000) 668 * @param sql - Query sql 669 * @exception SQLException 670 * @return List<Map<String, Object>> 671 * */ 672 public List<Map<String, Object>> callAndReturnList(int cursorStart, int maxRows, String sql) throws SQLException { 673 return callAndReturnList(cursorStart, maxRows, sql, Arrays.asList(new Object[] {})); 674 } 675 676 /** 677 * Execute stored proc (and return List) 678 * @param cursorStart - JDBC result Cursor start index 679 * @param maxRows - JDBC result max rows (JDBC limited rows 50,000,000) 680 * @param sql - Query sql, condition use format ':column_name' 681 * @exception SQLException 682 * @return List<Map<String, Object>> 683 * */ 684 public List<Map<String, Object>> callAndReturnList(int cursorStart, int maxRows, String sql, 685 Map<String, Object> params) throws SQLException { 686 String csql = converSql(sql); 687 List<Object> cparams = converParams(sql, params); 688 return callAndReturnList(cursorStart, maxRows, sql, params); 689 } 690 691 /** 692 * Execute stored proc (and return List) 693 * @param cursorStart - JDBC result Cursor start index 694 * @param maxRows - JDBC result max rows (JDBC limited rows 50,000,000) 695 * @param sql - Query sql, condition use format '?' 696 * @exception SQLException 697 * @return List<Map<String, Object>> 698 * */ 699 public List<Map<String, Object>> callAndReturnList(int cursorStart, int maxRows, String sql, List<Object> params) 700 throws SQLException { 701 702 if (!checkSqlAvailable(sql)) 703 return null; 704 705 long begin = System.currentTimeMillis(); 706 boolean allowedLog = writeSqlLog("call", begin, 707 String.format("%s [cursorStart=%s,maxRows=%s]", sql, cursorStart, maxRows), params); 708 709 CallableStatement statement = null; 710 Map<String, Object> row = null; 711 ResultSet result = null; 712 final List<Map<String, Object>> rows = new ArrayList<Map<String, Object>>(); 713 try { 714 statement = connection.prepareCall("{" + sql + "}"); 715 if (params != null) { 716 int size = params.size(); 717 for (int i = 0; i < size; i++) { 718 int ci = i + 1; 719 Object param = params.get(i); 720 if (param == null) { 721 statement.setNull(ci, Types.VARCHAR); 722 } else { 723 statement.setObject(ci, param); 724 } 725 } 726 } 727 if (maxRows > 0) { 728 statement.setMaxRows(maxRows); 729 } 730 result = statement.executeQuery(); 731 final ResultSetMetaData rsmd = result.getMetaData(); 732 final int c = rsmd.getColumnCount(); 733 int rowIndex = 0; 734 while (result.next()) { 735 if (rowIndex >= cursorStart) { 736 row = new ResultMap<String, Object>(); 737 for (int i = 0; i < c; i++) { 738 int ci = i + 1; 739 Object value = null; 740 Object originValue = result.getObject(ci); 741 if (originValue == null) { 742 value = originValue; 743 } else if (originValue instanceof Blob) { 744 Blob blobValue = (Blob) originValue; 745 InputStream is = null; 746 try { 747 is = blobValue.getBinaryStream(); 748 if(is != null) value = is.readAllBytes(); 749 } catch (IOException e) { 750 throw new SQLException(e.getMessage(), e); 751 } finally { 752 if (blobValue != null) { 753 try { 754 blobValue.free(); 755 } catch (SQLException e) { 756 throw e; 757 } 758 } 759 if (is != null) { 760 try { 761 is.close(); 762 } catch (IOException e) { 763 throw new SQLException(e.getMessage(), e); 764 } 765 } 766 } 767 } else if (originValue instanceof Clob) { 768 Clob clobValue = (Clob) originValue; 769 InputStream is = null; 770 try { 771 is = clobValue.getAsciiStream(); 772 if(is != null) value = is.readAllBytes(); 773 } catch (IOException e) { 774 throw new SQLException(e.getMessage(), e); 775 } finally { 776 if (clobValue != null) { 777 try { 778 clobValue.free(); 779 } catch (SQLException e) { 780 throw e; 781 } 782 } 783 if (is != null) { 784 try { 785 is.close(); 786 } catch (IOException e) { 787 throw new SQLException(e.getMessage(), e); 788 } 789 } 790 } 791 } else { 792 value = originValue; 793 } 794 row.put(converCase(rsmd.getColumnLabel(ci)), value); 795 } 796 rows.add(row); 797 } 798 rowIndex++; 799 } 800 801 if (allowedLog) { 802 writeSqlLog("rows", begin, "rows", rows); 803 long spend = System.currentTimeMillis() - begin; 804 writeSqlLog("spend", begin, "spend", spend); 805 } 806 807 callAndReturnListSync(connection, cursorStart, maxRows, sql, params); 808 809 return rows; 810 } catch (SQLException e) { 811 if (allowedLog) 812 writeSqlLog("error", begin, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage())); 813 throw e; 814 } finally { 815 if (result != null) 816 result.close(); 817 818 if (statement != null) 819 statement.close(); 820 } 821 } 822 823 /** 824 * Execute sql 825 * @exception SQLException 826 * @param sql 827 * @return int 828 * */ 829 public int execute(String sql) throws SQLException { 830 return execute(sql, Arrays.asList(new Object[] {})); 831 } 832 833 /** 834 * Execute sql 835 * @exception SQLException 836 * @param sql - Condition use format ':column_name' 837 * @param params 838 * @return int 839 * */ 840 public int execute(String sql, Map<String, Object> params) throws SQLException { 841 String csql = converSql(sql); 842 List<Object> cparams = converParams(sql, params); 843 return execute(csql, cparams); 844 } 845 846 /** 847 * Execute sql 848 * @exception SQLException 849 * @param sql - Condition use format '?' 850 * @param params 851 * @return int 852 * */ 853 public int execute(String sql, List<Object> params) throws SQLException { 854 if (!checkSqlAvailable(sql)) 855 return -1; 856 857 long begin = System.currentTimeMillis(); 858 boolean allowedLog = writeSqlLog("execute", begin, sql, params); 859 860 PreparedStatement statement = null; 861 try { 862 statement = connection.prepareStatement(sql); 863 if (params != null) { 864 int size = params.size(); 865 for (int i = 0; i < size; i++) { 866 Object param = params.get(i); 867 if (param == null) { 868 statement.setNull(i + 1, Types.VARCHAR); 869 } else { 870 statement.setObject(i + 1, param); 871 } 872 } 873 } 874 int row = statement.executeUpdate(); 875 876 if (allowedLog) { 877 writeSqlLog("return", begin, "return", row); 878 long spend = System.currentTimeMillis() - begin; 879 writeSqlLog("spend", begin, "spend", spend); 880 } 881 882 executeSync(connection, begin, sql, params, row); 883 884 return row; 885 } catch (SQLException e) { 886 if (allowedLog) 887 writeSqlLog("error", begin, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage())); 888 throw e; 889 } finally { 890 if (statement != null) 891 statement.close(); 892 } 893 } 894 895 /** 896 * Execute batch 897 * @exception SQLException 898 * @param sql - Condition use format ':column_name' 899 * @param records 900 * @return int - Return rows 901 * */ 902 public int executeBatch(String sql, List<Map<String, Object>> records) throws SQLException { 903 String csql = converSql(sql); 904 List<List<Object>> crecords = new ArrayList<List<Object>>(); 905 for (Map<String, Object> record : records) { 906 List<Object> crecord = converParams(sql, record); 907 crecords.add(crecord); 908 } 909 return executeBatchList(csql, crecords); 910 } 911 912 /** 913 * Execute batch 914 * @exception SQLException 915 * @param sql - Condition use format '?' 916 * @param records 917 * @return int - Return rows 918 * */ 919 public int executeBatchList(String sql, List<List<Object>> records) throws SQLException { 920 boolean allowedLog = false; 921 if (!checkSqlAvailable(sql)) 922 return -1; 923 924 long begin = System.currentTimeMillis(); 925 if (records != null) { 926 allowedLog = writeSqlLog("batch", begin, sql, String.format("[batchSize=%s]", records.size())); 927 } 928 929 boolean first = true; 930 PreparedStatement statement = null; 931 try { 932 for (List<Object> params : records) { 933 if (first) { 934 statement = connection.prepareStatement(sql); 935 first = false; 936 } 937 938 int size = params.size(); 939 for (int i = 0; i < size; i++) { 940 Object param = params.get(i); 941 if (param == null) { 942 statement.setNull(i + 1, Types.VARCHAR); 943 } else { 944 statement.setObject(i + 1, param); 945 } 946 } 947 948 statement.addBatch(); 949 } 950 951 int sumRow = 0; 952 if (records.size() > 0) { 953 int[] rows = statement.executeBatch(); 954 955 for (int r : rows) { 956 sumRow += r; 957 } 958 959 if (allowedLog) { 960 long spend = System.currentTimeMillis() - begin; 961 writeSqlLog("return", begin, "return", sumRow); 962 writeSqlLog("spend", begin, "spend", spend); 963 } 964 965 executeBatchListSync(connection, begin, sql, records, sumRow); 966 } 967 968 return sumRow; 969 } catch (SQLException e) { 970 if (allowedLog) 971 writeSqlLog("error", begin, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage())); 972 973 throw e; 974 } finally { 975 if (statement != null) 976 statement.close(); 977 } 978 } 979 980 /** 981 * Check connection is closed 982 * @return boolean 983 * */ 984 public boolean isClosed() { 985 try { 986 if(closed) return closed; 987 988 return (connection == null || connection.isClosed()); 989 } catch (Exception e) { 990 log.warn(e); 991 return true; 992 } 993 } 994 995 /** 996 * Abort connection 997 * @exception SQLException 998 * */ 999 public void abort() throws SQLException { 1000 writeSqlLog("aborting", 0, "aborting", ""); 1001 if (connection != null) { 1002 try { 1003 if (connection instanceof DriverConnection) { 1004 abortSyncConnection(connection); 1005 connection.abort(null); 1006 } else { 1007 connection.abort(Executors.newFixedThreadPool(1)); 1008 } 1009 closed = true; 1010 } catch (SQLException e) { 1011 writeSqlLog("error", 0, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage())); 1012 throw e; 1013 } 1014 } 1015 writeSqlLog("aborted", 0, "aborted", ""); 1016 } 1017 1018 /** 1019 * Close connection 1020 * @exception SQLException 1021 * */ 1022 public void close() throws SQLException { 1023 writeSqlLog("closing", 0, "closing", ""); 1024 if (connection != null) { 1025 try { 1026 closeSyncConnection(connection); 1027 connection.close(); 1028 closed = true; 1029 } catch (SQLException e) { 1030 writeSqlLog("error", 0, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage())); 1031 throw e; 1032 } 1033 } 1034 writeSqlLog("closed", 0, "closed", ""); 1035 } 1036 1037 /** 1038 * Commit connection 1039 * @exception SQLException 1040 * */ 1041 public void commit() throws SQLException { 1042 writeSqlLog("committing", 0, "committing", ""); 1043 if (connection != null) { 1044 try { 1045 commitSyncConnection(connection); 1046 connection.commit(); 1047 } catch (SQLException e) { 1048 writeSqlLog("error", 0, "", String.format("%s,%s", e.getErrorCode(), e.getMessage())); 1049 throw e; 1050 } 1051 } 1052 writeSqlLog("committed", 0, "committed", ""); 1053 } 1054 1055 /** 1056 * Rollback connection 1057 * @exception SQLException 1058 * */ 1059 public void rollback() throws SQLException { 1060 writeSqlLog("rollbacking", 0, "rollbacking", ""); 1061 if (connection != null) { 1062 try { 1063 rollbackSyncConnection(connection); 1064 connection.rollback(); 1065 } catch (SQLException e) { 1066 writeSqlLog("error", 0, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage())); 1067 throw e; 1068 } 1069 } 1070 writeSqlLog("rollbacked", 0, "rollbacked", ""); 1071 } 1072 1073 /** 1074 * Get origin connection hash code 1075 * @return Integer - hash code 1076 * */ 1077 private Integer getOriginConnectionHashCode() { 1078 if (connection instanceof DriverConnection) { 1079 return ((DriverConnection) connection).getOriginConnectionHashCode(); 1080 } else { 1081 return connection.hashCode(); 1082 } 1083 } 1084 1085 /** 1086 * Get data source thread name 1087 * @return String - DataSource thread name 1088 * */ 1089 private String getDataSourceName() { 1090 if (connection instanceof DriverConnection) { 1091 return ((DriverConnection) connection).getDriverDataSource().getName(); 1092 } else { 1093 return null; 1094 } 1095 } 1096 1097 /** 1098 * Conver sql from ':column_name' to '?' 1099 * @return String 1100 * */ 1101 protected String converSql(String sql) { 1102 return sql.replaceAll(":[a-zA-Z0-9_]+", "?"); 1103 } 1104 1105 /** 1106 * Conver param from map to list 1107 * @param sql 1108 * @param map 1109 * @return List<Object> 1110 * */ 1111 protected List<Object> converParams(String sql, Map<String, Object> map) { 1112 List<String> paramKeys = new ArrayList<String>(); 1113 Pattern pattern = Pattern.compile(":[a-zA-Z0-9_]+"); 1114 Matcher matcher = pattern.matcher(sql); 1115 while (matcher.find()) { 1116 String key = matcher.group().replaceFirst(":", ""); 1117 paramKeys.add(key); 1118 } 1119 1120 List<Object> params = new ArrayList<Object>(); 1121 for (String pk : paramKeys) { 1122 Object pv = map.get(pk); 1123 if (pv == null) { 1124 pv = map.get(pk.toLowerCase()); 1125 } 1126 if (pv == null) { 1127 pv = map.get(pk.toUpperCase()); 1128 } 1129 if (pv instanceof java.util.Date) { 1130 java.util.Date utilDate = (java.util.Date) pv; 1131 java.sql.Timestamp sqlDate = new java.sql.Timestamp(utilDate.getTime()); 1132 params.add(sqlDate); 1133 } else { 1134 params.add(pv); 1135 } 1136 } 1137 1138 return params; 1139 } 1140 1141 /** 1142 * Covner to upper or lower 1143 * @param s - Column name or table name 1144 * @return String 1145 * */ 1146 protected String converCase(String s) { 1147 if (COLUMN_NAME_CASE_MODE.equals(COLUMN_NAME_CASE_UPPER)) { 1148 return s.toUpperCase(); 1149 } else if (COLUMN_NAME_CASE_MODE.equals(COLUMN_NAME_CASE_LOWER)) { 1150 return s.toLowerCase(); 1151 } else { 1152 return s; 1153 } 1154 } 1155 1156 /** 1157 * Check available sql to write to log 1158 * @return boolean 1159 * @param connection - Connection 1160 * @param sql 1161 * */ 1162 protected static boolean checkSqlLogAvailable(Connection connection, String sql) { 1163 if (connection instanceof DriverConnection) { 1164 DriverConnection dc = (DriverConnection) connection; 1165 DriverDataSource dds = dc.getDriverDataSource(); 1166 ConfigProperties configProperties = dds.getConfigProperties(); 1167 List<String> sqlAllowed = configProperties.getArray("SqlLogAllowed"); 1168 List<String> sqlIgnored = configProperties.getArray("SqlLogIgnored"); 1169 return checkSqlAvailable(sql, sqlAllowed, sqlIgnored); 1170 } 1171 return false; 1172 } 1173 1174 /** 1175 * Check available sql to execute 1176 * @return boolean 1177 * @param sql 1178 * */ 1179 protected boolean checkSqlAvailable(String sql) { 1180 return checkSqlAvailable(connection, sql); 1181 } 1182 1183 /** 1184 * Check available sql to execute 1185 * @return boolean 1186 * @param connection 1187 * @param sql 1188 * */ 1189 protected static boolean checkSqlAvailable(Connection connection, String sql) { 1190 if (connection instanceof DriverConnection) { 1191 DriverConnection dc = (DriverConnection) connection; 1192 DriverDataSource dds = dc.getDriverDataSource(); 1193 ConfigProperties configProperties = dds.getConfigProperties(); 1194 List<String> sqlAllowed = configProperties.getArray("SqlExecuteAllowed"); 1195 List<String> sqlIgnored = configProperties.getArray("SqlExecuteIgnored"); 1196 return checkSqlAvailable(sql, sqlAllowed, sqlIgnored); 1197 } 1198 return true; 1199 } 1200 1201 /** 1202 * Check available sql to execute 1203 * @return boolean 1204 * @param sql 1205 * @param sqlAllowed - From DataSources.properties 1206 * @param sqlIgnored - From DataSources.properties 1207 * */ 1208 private static boolean checkSqlAvailable(String sql, List<String> sqlAllowed, List<String> sqlIgnored) { 1209 boolean matchedAllowed = true; 1210 boolean matchedIgnored = false; 1211 if (sqlAllowed != null) { 1212 for (String regex : sqlAllowed) { 1213 if (!CommonTools.isBlank(regex)) { 1214 matchedAllowed = sql.matches(regex); 1215 1216 if (matchedAllowed) 1217 break; 1218 } 1219 } 1220 } 1221 if (matchedAllowed && sqlIgnored != null) { 1222 for (String regex : sqlIgnored) { 1223 if (!CommonTools.isBlank(regex)) { 1224 matchedIgnored = sql.matches(regex); 1225 1226 if (matchedIgnored) 1227 break; 1228 } 1229 } 1230 } 1231 1232 boolean b = matchedAllowed && !matchedIgnored; 1233 if (!b) { 1234 LoggerFactory.getLogger(DriverExecutor.class).debug("Not available - '{}'", sql); 1235 } 1236 return b; 1237 } 1238 1239 /** 1240 * Write sql log 1241 * @return boolean 1242 * @param type 1243 * @param seq 1244 * @param sql 1245 * @param params 1246 * */ 1247 protected synchronized boolean writeSqlLog(String type, long seq, String sql, Object params) { 1248 return writeSqlLog(this.hashCode(),connection, type, seq, sql, params); 1249 } 1250 1251 /** 1252 * Write sql log 1253 * @return boolean 1254 * @param connection 1255 * @param type 1256 * @param seq 1257 * @param sql 1258 * @param params 1259 * */ 1260 protected synchronized static boolean writeSqlLog(int deHashCode,Connection connection, String type, long seq, String sql, 1261 Object params) { 1262 if (connection instanceof DriverConnection) { 1263 if (!checkSqlLogAvailable(connection, sql)) 1264 return false; 1265 1266 final String header = String.format( 1267 "LOG_DATE,LOG_HOUR,LOG_MI,LOG_SEC,LOG_MS,LOG_HOST,LOG_THREAD,LOG_DS,LOG_CONN_ID,LOG_EXEC_ID,LOG_TYPE,LOG_SEQ,LOG_SQL,LOG_PARAMS%s", 1268 System.lineSeparator()); 1269 final DateFormat df = new SimpleDateFormat("yyyyMMdd"); 1270 final DateFormat dtf = new SimpleDateFormat("yyyyMMdd,HH,mm,ss,SSS"); 1271 DriverConnection dc = (DriverConnection) connection; 1272 Integer connHashCode = dc.getOriginConnectionHashCode(); 1273 DriverDataSource dds = dc.getDriverDataSource(); 1274 ConfigProperties configProperties = dds.getConfigProperties(); 1275 1276 boolean logEnable = configProperties.getBoolean("SqlLogEnable", false); 1277 long overspend = configProperties.getMilliSeconds("SqlLogOverspend",0L); 1278 1279 if (!logEnable) 1280 return false; 1281 1282 final String defaultLogFolderPath = String.format("%s/SqlLog/", 1283 CommonTools.getJarPath(DriverExecutor.class)); 1284 final String logFolderPath = configProperties.getString("SqlLogFolder", defaultLogFolderPath); 1285 final long maxFileSize = configProperties.getFileSize("SqlLogMaxFileSize", 1024 * 1024 * 10L); 1286 final int archiveDays = configProperties.getInteger("SqlLogArchiveDays", 31); 1287 final int logParamMaxLength = configProperties.getInteger("SqlLogParamMaxLength", 20); 1288 1289 if (sqlLogCacheArray == null) { 1290 1291 sqlLogCacheArray = new CacheArray(); 1292 long sqlLogFilterTimer = configProperties.getLong("SqlLogTimer", 1000L); 1293 sqlLogCacheArray.filter(new CacheArrayFilter(0L,sqlLogFilterTimer) { 1294 @Override 1295 public void executeBatch(Integer index, List batch) { 1296 final StringBuffer sbf = new StringBuffer(); 1297 for(Object item : batch){ 1298 sbf.append(item); 1299 } 1300 try { 1301 String msg = sbf.toString(); 1302 String dateStr = df.format(new Timestamp(System.currentTimeMillis())); 1303 File sqlLogFile = new File(String.format("%s/%s.csv", logFolderPath, dateStr)); 1304 File sqlLogFolder = new File(sqlLogFile.getParent()); 1305 1306 if (sqlLogFolder.exists()) { 1307 if (!sqlLogFolder.canWrite()) 1308 throw new IOException(String.format("Can not write to log folder '%s'", 1309 sqlLogFolder.getAbsolutePath())); 1310 } else { 1311 sqlLogFolder.mkdirs(); 1312 } 1313 1314 if (sqlLogFile.exists()) { 1315 if (!sqlLogFile.canWrite()) 1316 throw new IOException(String.format("Can not write to log file '%s'", 1317 sqlLogFile.getAbsolutePath())); 1318 } 1319 1320 int suffixIndex = sqlLogFile.getName().lastIndexOf("."); 1321 String logFileNamePrefix = sqlLogFile.getName().substring(0, suffixIndex); 1322 long logSize = FileTools.size(sqlLogFile); 1323 if (!sqlLogFile.exists()) { 1324 FileTools.write(sqlLogFile, header, false); 1325 } 1326 if (logSize < maxFileSize) { 1327 FileTools.write(sqlLogFile, msg, true); 1328 } else { 1329 int logIndex = getLogFileIndex(configProperties, sqlLogFolder, logFileNamePrefix, 1330 "csv"); 1331 if (logIndex == 0) { 1332 FileTools.write(sqlLogFile, String.format("%s%s", header, msg), false); 1333 } else { 1334 backupLog(sqlLogFolder, sqlLogFile, logFileNamePrefix, logIndex, header, msg); 1335 } 1336 } 1337 archiveLog(archiveDays, sqlLogFolder); 1338 } catch (Exception e) { 1339 LoggerFactory.getLogger(DriverExecutor.class).warn(e.getMessage(), e); 1340 } 1341 } 1342 }); 1343 } 1344 1345 if (logEnable) { 1346 String hostname = CommonTools.getHostname(); 1347 String dateTimeStr = dtf.format(new Timestamp(System.currentTimeMillis())); 1348 boolean isOverspend = false; 1349 boolean isSpendSeq = sql.equals("spend"); 1350 if(isSpendSeq && params != null){ 1351 long spendValue = Long.parseLong(params + ""); 1352 isOverspend = (spendValue >= overspend); 1353 } 1354 1355 String threadId = Thread.currentThread().getName() + "-" + Thread.currentThread().getId(); 1356 String msg = String.format("%s,%s,%s,%s,%s,%s,%s,%s,\"%s\",\"%s\"%s", dateTimeStr, hostname, 1357 threadId, dds.getName(), connHashCode,deHashCode, type, seq, 1358 replaceToSigleLine(sql), handleParams(logParamMaxLength, params), System.lineSeparator()); 1359 1360 String key = String.format("%s_%s",deHashCode,connHashCode); 1361 if(seq <= 0 || overspend <= 0){ 1362 sqlLogCacheArray.add(msg); 1363 SQL_LOG_MSG_MAPPING.remove(key); 1364 SQL_LOG_OVERSPEND_MAPPING.remove(key); 1365 } 1366 if(seq > 0 && overspend > 0){ 1367 SQL_LOG_OVERSPEND_MAPPING.put(key,isOverspend); 1368 String existsMsg = SQL_LOG_MSG_MAPPING.get(key); 1369 existsMsg = existsMsg == null ? msg : (existsMsg + msg); 1370 SQL_LOG_MSG_MAPPING.put(key, existsMsg); 1371 1372 boolean seqOverspend = SQL_LOG_OVERSPEND_MAPPING.get(key); 1373 if(isSpendSeq && seqOverspend){ 1374 sqlLogCacheArray.add(SQL_LOG_MSG_MAPPING.get(key) + ""); 1375 SQL_LOG_MSG_MAPPING.remove(key); 1376 SQL_LOG_OVERSPEND_MAPPING.remove(key); 1377 } 1378 if(isSpendSeq && !seqOverspend){ 1379 SQL_LOG_MSG_MAPPING.remove(key); 1380 SQL_LOG_OVERSPEND_MAPPING.remove(key); 1381 } 1382 } 1383 return true; 1384 } 1385 1386 } 1387 return false; 1388 } 1389 1390 /** 1391 * Backup log file 1392 * */ 1393 private synchronized static void backupLog(File logFolder, File logFile, String logFileNamePrefix, int logIndex, String header, 1394 String msg) { 1395 try{ 1396 String backupLogFileName = String.format("%s/%s.%s.csv", logFolder.getAbsolutePath(), logFileNamePrefix, 1397 logIndex); 1398 Path source = Paths.get(logFile.getAbsolutePath()); 1399 Path target = Paths.get(backupLogFileName); 1400 Files.move(source, target, StandardCopyOption.REPLACE_EXISTING); 1401 FileTools.write(logFile, String.format("%s%s", header, msg), false); 1402 }catch(Exception e){ 1403 LoggerFactory.getLogger(DriverExecutor.class).error(e.getMessage(),e); 1404 } 1405 } 1406 1407 /** 1408 * Get log file index 1409 * For backup log file use 1410 * */ 1411 private static int getLogFileIndex(ConfigProperties configProperties, File folder, String prefix, String suffix) { 1412 Integer maxBackupIndex = configProperties.getInteger("SqlLogMaxBackupIndex", 10); 1413 for (int i = 1; i <= maxBackupIndex; i++) { 1414 String logFileName = String.format("%s/%s.%s.%s", folder.getAbsolutePath(), prefix, i, suffix); 1415 File logFile = new File(logFileName); 1416 if (!logFile.exists()) 1417 return i; 1418 } 1419 return 0; 1420 } 1421 1422 /** 1423 * Archive Log 1424 * */ 1425 private static void archiveLog(int archiveDays, File sqlLogFolder) { 1426 if (archiveDays > 0) { 1427 try { 1428 long archiveDaysMs = new Date().getTime() - (archiveDays * 24 * 3600000L); 1429 deleteFilesOlderThan(sqlLogFolder, archiveDaysMs); 1430 } catch (Exception e) { 1431 LoggerFactory.getLogger(DriverExecutor.class).warn(e); 1432 } 1433 } 1434 } 1435 1436 /** 1437 * Delete old archive logs 1438 * */ 1439 private static void deleteFilesOlderThan(File directory, long archiveDaysMs) throws IOException { 1440 if (directory.isDirectory()) { 1441 File[] files = directory.listFiles(); 1442 if (files != null) { 1443 for (File file : files) { 1444 if (file.isFile()) { 1445 boolean isLogFile = file.getName().toLowerCase().endsWith(".csv"); 1446 if (isLogFile) { 1447 boolean canWrite = file.canWrite(); 1448 if (canWrite) { 1449 long lastModified = file.lastModified(); 1450 if (lastModified < archiveDaysMs) { 1451 Files.deleteIfExists(Paths.get(file.toURI())); 1452 } 1453 } 1454 } 1455 } 1456 } 1457 } 1458 } 1459 } 1460 1461 /** 1462 * Replace to sigle line 1463 * For write csv log 1464 * */ 1465 private static String replaceToSigleLine(String msg) { 1466 return CodeEscape.escapeToSingleLineForCsv(msg); 1467 } 1468 1469 /** 1470 * Hahdle Params 1471 * For write csv log 1472 * */ 1473 private static String handleParams(int paramMaxLength, Object params) { 1474 1475 if (params == null) 1476 return "null"; 1477 1478 StringBuffer paramSbf = new StringBuffer(""); 1479 if (params instanceof List) { 1480 List<Object> listParams = (List<Object>) params; 1481 int size = listParams.size(); 1482 for (int i = 0; i < size; i++) { 1483 Object param = listParams.get(i); 1484 String str = param + ""; 1485 if (str.length() > paramMaxLength) { 1486 paramSbf.append(str.substring(0, paramMaxLength) + "..."); 1487 } else { 1488 paramSbf.append(str); 1489 } 1490 if (i < size - 1) { 1491 paramSbf.append(";"); 1492 } 1493 } 1494 } else { 1495 paramSbf.append(params); 1496 } 1497 return replaceToSigleLine(paramSbf.toString()); 1498 } 1499 1500 /**For Sync DataSource**/ 1501 1502 /** 1503 * For database replication 1504 * */ 1505 protected static void callAndReturnBooleanSync(Connection masterConn, long seq, String sql, List<Object> params, 1506 boolean returnResult) throws SQLException { 1507 if (masterConn instanceof DriverConnection) { 1508 openSyncConnection(masterConn); 1509 1510 DriverConnection masterDriverConn = (DriverConnection) masterConn; 1511 Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode(); 1512 ConfigProperties ddscp = masterDriverConn.getDriverDataSource().getConfigProperties(); 1513 boolean connCheck = true; 1514 boolean returnCheck = true; 1515 if (ddscp != null) { 1516 connCheck = ddscp.getBoolean("SyncConnectionCheck", true); 1517 returnCheck = ddscp.getBoolean("SyncReturnCheck", true); 1518 } 1519 List<DriverExecutor> deList = SYNC_EXECUTOR_MARK.get(masterConnHashCode); 1520 if (deList != null) { 1521 for (DriverExecutor de : deList) { 1522 try { 1523 boolean resultSync = de.callAndReturnBoolean(sql, params); 1524 if (returnCheck) { 1525 if (resultSync != returnResult) { 1526 writeSqlLog(de.hashCode(),masterDriverConn, "diffed", seq, "diffed", String.format("%s-%s", 1527 de.getDataSourceName(), de.getOriginConnectionHashCode())); 1528 int errorCode = 99906; 1529 String errorMsg = String.format("The returned results are inconsistent '%s-%s'.", 1530 de.getDataSourceName(), de.getOriginConnectionHashCode()); 1531 throw new SQLException(errorMsg) { 1532 @Override 1533 public int getErrorCode() { 1534 return errorCode; 1535 } 1536 }; 1537 } 1538 } 1539 } catch (SQLException e) { 1540 LoggerFactory.getLogger(DriverExecutor.class) 1541 .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage())); 1542 if (connCheck) 1543 throw e; 1544 } 1545 } 1546 } 1547 } 1548 } 1549 1550 /** 1551 * For database replication 1552 * */ 1553 protected static void callAndReturnListSync(Connection masterConn, int cursorStart, int maxRows, String sql, 1554 List<Object> params) throws SQLException { 1555 if (masterConn instanceof DriverConnection) { 1556 openSyncConnection(masterConn); 1557 1558 DriverConnection masterDriverConn = (DriverConnection) masterConn; 1559 Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode(); 1560 1561 List<DriverExecutor> deList = SYNC_EXECUTOR_MARK.get(masterConnHashCode); 1562 if (deList != null) { 1563 for (DriverExecutor de : deList) { 1564 try { 1565 de.callAndReturnBoolean(sql, params); 1566 } catch (SQLException e) { 1567 LoggerFactory.getLogger(DriverExecutor.class) 1568 .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage())); 1569 ConfigProperties ddscp = masterDriverConn.getDriverDataSource().getConfigProperties(); 1570 boolean connCheck = true; 1571 if (ddscp != null) { 1572 connCheck = ddscp.getBoolean("SyncConnectionCheck", true); 1573 } 1574 if (connCheck) 1575 throw e; 1576 } 1577 } 1578 } 1579 } 1580 } 1581 1582 /** 1583 * For database replication 1584 * */ 1585 protected static void callAndReturnRowsSync(Connection masterConn, long seq, String sql, List<Object> params, 1586 Object returnRows) throws SQLException { 1587 if (masterConn instanceof DriverConnection) { 1588 openSyncConnection(masterConn); 1589 1590 DriverConnection masterDriverConn = (DriverConnection) masterConn; 1591 Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode(); 1592 ConfigProperties ddscp = masterDriverConn.getDriverDataSource().getConfigProperties(); 1593 boolean connCheck = true; 1594 boolean returnCheck = true; 1595 if (ddscp != null) { 1596 connCheck = ddscp.getBoolean("SyncConnectionCheck", true); 1597 returnCheck = ddscp.getBoolean("SyncReturnCheck", true); 1598 } 1599 List<DriverExecutor> deList = SYNC_EXECUTOR_MARK.get(masterConnHashCode); 1600 if (deList != null) { 1601 for (DriverExecutor de : deList) { 1602 try { 1603 Object rowSync = de.callAndReturnRows(sql, params); 1604 if (returnCheck) { 1605 if (rowSync != null && rowSync.equals(returnRows)) { 1606 writeSqlLog(de.hashCode(),masterDriverConn, "diffed", seq, "diffed", String.format("%s-%s", 1607 de.getDataSourceName(), de.getOriginConnectionHashCode())); 1608 int errorCode = 99906; 1609 String errorMsg = String.format("The returned results are inconsistent '%s-%s'.", 1610 de.getDataSourceName(), de.getOriginConnectionHashCode()); 1611 throw new SQLException(errorMsg) { 1612 @Override 1613 public int getErrorCode() { 1614 return errorCode; 1615 } 1616 }; 1617 } 1618 } 1619 } catch (SQLException e) { 1620 LoggerFactory.getLogger(DriverExecutor.class) 1621 .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage())); 1622 if (connCheck) 1623 throw e; 1624 } 1625 } 1626 } 1627 } 1628 } 1629 1630 /** 1631 * For database replication 1632 * */ 1633 protected static void executeBatchListSync(Connection masterConn, long seq, String sql, List<List<Object>> records, 1634 int returnRows) throws SQLException { 1635 if (masterConn instanceof DriverConnection) { 1636 openSyncConnection(masterConn); 1637 1638 DriverConnection masterDriverConn = (DriverConnection) masterConn; 1639 Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode(); 1640 ConfigProperties ddscp = masterDriverConn.getDriverDataSource().getConfigProperties(); 1641 boolean connCheck = true; 1642 boolean returnCheck = true; 1643 if (ddscp != null) { 1644 connCheck = ddscp.getBoolean("SyncConnectionCheck", true); 1645 returnCheck = ddscp.getBoolean("SyncReturnCheck", true); 1646 } 1647 List<DriverExecutor> deList = SYNC_EXECUTOR_MARK.get(masterConnHashCode); 1648 if (deList != null) { 1649 for (DriverExecutor de : deList) { 1650 try { 1651 int rowSync = de.executeBatchList(sql, records); 1652 if (returnCheck) { 1653 if (rowSync != returnRows) { 1654 writeSqlLog(de.hashCode(),masterDriverConn, "diffed", seq, "diffed", String.format("%s-%s", 1655 de.getDataSourceName(), de.getOriginConnectionHashCode())); 1656 int errorCode = 99906; 1657 String errorMsg = String.format("The returned results are inconsistent '%s-%s'.", 1658 de.getDataSourceName(), de.getOriginConnectionHashCode()); 1659 throw new SQLException(errorMsg) { 1660 @Override 1661 public int getErrorCode() { 1662 return errorCode; 1663 } 1664 }; 1665 } 1666 } 1667 } catch (SQLException e) { 1668 LoggerFactory.getLogger(DriverExecutor.class) 1669 .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage())); 1670 if (connCheck) 1671 throw e; 1672 } 1673 } 1674 } 1675 } 1676 } 1677 1678 /** 1679 * For database replication 1680 * */ 1681 protected static void executeSync(Connection masterConn, long seq, String sql, List<Object> params, int returnRows) 1682 throws SQLException { 1683 if (masterConn instanceof DriverConnection) { 1684 openSyncConnection(masterConn); 1685 1686 DriverConnection masterDriverConn = (DriverConnection) masterConn; 1687 Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode(); 1688 ConfigProperties ddscp = masterDriverConn.getDriverDataSource().getConfigProperties(); 1689 boolean connCheck = true; 1690 boolean returnCheck = true; 1691 if (ddscp != null) { 1692 connCheck = ddscp.getBoolean("SyncConnectionCheck", true); 1693 returnCheck = ddscp.getBoolean("SyncReturnCheck", true); 1694 } 1695 List<DriverExecutor> deList = SYNC_EXECUTOR_MARK.get(masterConnHashCode); 1696 if (deList != null) { 1697 for (DriverExecutor de : deList) { 1698 try { 1699 int rowSync = de.execute(sql, params); 1700 if (returnCheck) { 1701 if (rowSync != returnRows) { 1702 writeSqlLog(de.hashCode(),masterDriverConn, "diffed", seq, "diffed", String.format("%s-%s", 1703 de.getDataSourceName(), de.getOriginConnectionHashCode())); 1704 int errorCode = 99906; 1705 String errorMsg = String.format("The returned results are inconsistent '%s-%s'.", 1706 de.getDataSourceName(), de.getOriginConnectionHashCode()); 1707 throw new SQLException(errorMsg) { 1708 @Override 1709 public int getErrorCode() { 1710 return errorCode; 1711 } 1712 }; 1713 } 1714 } 1715 } catch (SQLException e) { 1716 LoggerFactory.getLogger(DriverExecutor.class) 1717 .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage())); 1718 if (connCheck) 1719 throw e; 1720 } 1721 } 1722 } 1723 } 1724 } 1725 1726 /** 1727 * For database replication 1728 * */ 1729 protected synchronized static boolean openSyncConnection(Connection masterConn) throws SQLException { 1730 if (masterConn instanceof DriverConnection) { 1731 1732 DriverConnection masterDriverConn = (DriverConnection) masterConn; 1733 Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode(); 1734 1735 boolean isExist = SYNC_EXECUTOR_MARK.containsKey(masterConnHashCode); 1736 1737 if (isExist) { 1738 return false; 1739 } else { 1740 SYNC_EXECUTOR_MARK.put(masterConnHashCode, new ArrayList<DriverExecutor>()); 1741 } 1742 1743 DriverDataSource dds = masterDriverConn.getDriverDataSource(); 1744 if (dds != null) { 1745 List<DriverDataSource> sdsl = dds.getSyncDataSourceList(dds.getName()); 1746 if (sdsl != null) { 1747 1748 if (sdsl.isEmpty()) 1749 return false; 1750 1751 for (DriverDataSource sds : sdsl) { 1752 1753 Integer sdsHashCode = sds.hashCode(); 1754 if (!SYNC_CONN_ERROR_TIME.containsKey(sdsHashCode)) { 1755 SYNC_CONN_ERROR_TIME.put(sdsHashCode, 0L); 1756 } 1757 1758 Long syncConnErrorTime = SYNC_CONN_ERROR_TIME.get(sdsHashCode); 1759 if (syncConnErrorTime > 0) { 1760 ConfigProperties ddscp = dds.getConfigProperties(); 1761 long connCheckMs = 10000L; 1762 if (ddscp != null) { 1763 connCheckMs = ddscp.getLong("SyncConnectionCheckTime", 10000L); 1764 } 1765 boolean isSkipConn = syncConnErrorTime > 0 1766 && (System.currentTimeMillis() - syncConnErrorTime) <= connCheckMs; 1767 if (isSkipConn) { 1768 continue; 1769 } 1770 } 1771 1772 String masterFingerprint = dds.getFingerprint(); 1773 String syncFingerprint = sds.getFingerprint(); 1774 if (masterFingerprint.equalsIgnoreCase(syncFingerprint)) { 1775 LoggerFactory.getLogger(DriverExecutor.class) 1776 .warn("Skip sync reason 'same connection fingerprint'."); 1777 continue; 1778 } 1779 1780 try { 1781 final Connection conn = sds.getConnection(); 1782 if (conn == null) { 1783 int errorCode = 99904; 1784 String errorMsg = "Connection is null."; 1785 throw new SQLException(errorMsg) { 1786 @Override 1787 public int getErrorCode() { 1788 return errorCode; 1789 } 1790 }; 1791 } 1792 conn.setAutoCommit(masterConn.getAutoCommit()); 1793 SYNC_CONN_ERROR_TIME.put(sdsHashCode, 0L); 1794 SYNC_EXECUTOR_MARK.get(masterConnHashCode).add(new DriverExecutor(conn)); 1795 } catch (SQLException e) { 1796 SYNC_CONN_ERROR_TIME.put(sdsHashCode, System.currentTimeMillis()); 1797 LoggerFactory.getLogger(DriverExecutor.class) 1798 .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage())); 1799 ConfigProperties ddscp = dds.getConfigProperties(); 1800 boolean connCheck = true; 1801 if (ddscp != null) { 1802 connCheck = ddscp.getBoolean("SyncConnectionCheck", true); 1803 } 1804 if (connCheck) 1805 throw e; 1806 } 1807 1808 } 1809 1810 return true; 1811 } 1812 } 1813 } 1814 return false; 1815 } 1816 1817 /** 1818 * For database replication 1819 * */ 1820 protected static void closeSyncConnection(Connection masterConn) throws SQLException { 1821 if (masterConn instanceof DriverConnection) { 1822 DriverConnection masterDriverConn = (DriverConnection) masterConn; 1823 Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode(); 1824 1825 List<DriverExecutor> deList = SYNC_EXECUTOR_MARK.get(masterConnHashCode); 1826 if (deList != null) { 1827 for (DriverExecutor de : deList) { 1828 try { 1829 de.close(); 1830 } catch (SQLException e) { 1831 LoggerFactory.getLogger(DriverExecutor.class) 1832 .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage())); 1833 ConfigProperties ddscp = masterDriverConn.getDriverDataSource().getConfigProperties(); 1834 boolean connCheck = true; 1835 if (ddscp != null) { 1836 connCheck = ddscp.getBoolean("SyncConnectionCheck", true); 1837 } 1838 if (connCheck) 1839 throw e; 1840 } 1841 } 1842 SYNC_EXECUTOR_MARK.remove(masterConnHashCode); 1843 LoggerFactory.getLogger(DriverExecutor.class).debug("CloseSyncConnection - masterConn '{}' finished.", 1844 masterConnHashCode); 1845 } 1846 } 1847 } 1848 1849 /** 1850 * For database replication 1851 * */ 1852 protected static void commitSyncConnection(Connection masterConn) throws SQLException { 1853 if (masterConn instanceof DriverConnection) { 1854 DriverConnection masterDriverConn = (DriverConnection) masterConn; 1855 Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode(); 1856 1857 List<DriverExecutor> deList = SYNC_EXECUTOR_MARK.get(masterConnHashCode); 1858 if (deList != null) { 1859 for (DriverExecutor de : deList) { 1860 try { 1861 de.commit(); 1862 } catch (SQLException e) { 1863 LoggerFactory.getLogger(DriverExecutor.class) 1864 .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage())); 1865 ConfigProperties ddscp = masterDriverConn.getDriverDataSource().getConfigProperties(); 1866 boolean connCheck = true; 1867 if (ddscp != null) { 1868 connCheck = ddscp.getBoolean("SyncConnectionCheck", true); 1869 } 1870 if (connCheck) 1871 throw e; 1872 } 1873 } 1874 LoggerFactory.getLogger(DriverExecutor.class).debug("CommitSyncConnection - masterConn '{}' finished.", 1875 masterConnHashCode); 1876 } 1877 } 1878 } 1879 1880 /** 1881 * For database replication 1882 * */ 1883 protected static void rollbackSyncConnection(Connection masterConn) throws SQLException { 1884 if (masterConn instanceof DriverConnection) { 1885 DriverConnection masterDriverConn = (DriverConnection) masterConn; 1886 Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode(); 1887 1888 List<DriverExecutor> deList = SYNC_EXECUTOR_MARK.get(masterConnHashCode); 1889 if (deList != null) { 1890 for (DriverExecutor de : deList) { 1891 try { 1892 de.rollback(); 1893 } catch (SQLException e) { 1894 LoggerFactory.getLogger(DriverExecutor.class) 1895 .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage())); 1896 ConfigProperties ddscp = masterDriverConn.getDriverDataSource().getConfigProperties(); 1897 boolean connCheck = true; 1898 if (ddscp != null) { 1899 connCheck = ddscp.getBoolean("SyncConnectionCheck", true); 1900 } 1901 if (connCheck) 1902 throw e; 1903 } 1904 } 1905 LoggerFactory.getLogger(DriverExecutor.class) 1906 .debug("RollbackSyncConnection - masterConn '{}' finished.", masterConnHashCode); 1907 } 1908 } 1909 } 1910 1911 /** 1912 * For database replication 1913 * */ 1914 protected static void abortSyncConnection(Connection masterConn) throws SQLException { 1915 if (masterConn instanceof DriverConnection) { 1916 DriverConnection masterDriverConn = (DriverConnection) masterConn; 1917 Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode(); 1918 1919 List<DriverExecutor> deList = SYNC_EXECUTOR_MARK.get(masterConnHashCode); 1920 if (deList != null) { 1921 for (DriverExecutor de : deList) { 1922 try { 1923 de.abort(); 1924 } catch (SQLException e) { 1925 LoggerFactory.getLogger(DriverExecutor.class) 1926 .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage())); 1927 ConfigProperties ddscp = masterDriverConn.getDriverDataSource().getConfigProperties(); 1928 boolean connCheck = true; 1929 if (ddscp != null) { 1930 connCheck = ddscp.getBoolean("SyncConnectionCheck", true); 1931 } 1932 if (connCheck) 1933 throw e; 1934 } 1935 } 1936 LoggerFactory.getLogger(DriverExecutor.class).debug("AbortSyncConnection - masterConn '{}' finished.", 1937 masterConnHashCode); 1938 } 1939 } 1940 } 1941 1942}