/*******************************************************************************
The MIT License (MIT)

Copyright (c) 2024 KILLCODING.COM

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
023*****************************************************************************/ 024package com.killcoding.datasource; 025 026import java.sql.Connection; 027import java.sql.PreparedStatement; 028import java.util.List; 029import java.sql.SQLException; 030import java.sql.Types; 031import java.util.Map; 032import java.sql.ResultSet; 033import java.sql.ResultSetMetaData; 034import java.util.HashMap; 035import java.util.ArrayList; 036import java.util.regex.Pattern; 037import java.util.regex.Matcher; 038import java.util.Arrays; 039import com.killcoding.tool.ResultMap; 040import java.sql.DatabaseMetaData; 041import com.killcoding.log.LoggerFactory; 042import com.killcoding.log.Logger; 043import java.util.concurrent.Executors; 044import com.killcoding.datasource.DriverConnection; 045import com.killcoding.datasource.DriverDataSource; 046import java.util.concurrent.ConcurrentHashMap; 047import com.killcoding.tool.CommonTools; 048import com.killcoding.tool.ConfigProperties; 049import java.io.File; 050import java.text.DateFormat; 051import java.text.SimpleDateFormat; 052import com.killcoding.tool.FileTools; 053import java.io.IOException; 054import java.nio.file.Files; 055import java.util.Date; 056import java.nio.file.Paths; 057import java.nio.file.Path; 058import java.nio.file.StandardCopyOption; 059import com.killcoding.cache.CacheArray; 060import com.killcoding.cache.CacheArrayFilter; 061import com.killcoding.tool.CodeEscape; 062import java.sql.Blob; 063import java.util.stream.Collectors; 064import java.net.URI; 065import java.util.Comparator; 066import java.sql.Timestamp; 067import java.sql.Clob; 068import java.io.InputStream; 069import java.sql.CallableStatement; 070 071/** 072 * This class is execute sql base class. 073 * Support database replication. 074 * Support database multi-activity. 
075 * Support database CRUD 076 * */ 077public class DriverExecutor { 078 079 protected final static Map<Integer, List<DriverExecutor>> SYNC_EXECUTOR_MARK = new ConcurrentHashMap<Integer, List<DriverExecutor>>(); 080 private final static Map<Integer, Long> SYNC_CONN_ERROR_TIME = new ConcurrentHashMap<Integer, Long>(); 081 082 private final static Map<String,String> SQL_LOG_MSG_MAPPING = new ConcurrentHashMap<String,String>(); 083 private final static Map<String,Boolean> SQL_LOG_OVERSPEND_MAPPING = new ConcurrentHashMap<String,Boolean>(); 084 085 protected Logger log = null; 086 087 public final static String COLUMN_NAME_CASE_UPPER = "UPPER"; 088 public final static String COLUMN_NAME_CASE_LOWER = "LOWER"; 089 public final static String COLUMN_NAME_CASE_ORIGINAL = "ORIGINAL"; 090 public static String COLUMN_NAME_CASE_MODE = COLUMN_NAME_CASE_ORIGINAL; 091 092 protected boolean closed = true; 093 protected Connection connection = null; 094 private static CacheArray sqlLogCacheArray = null; 095 096 /** 097 * New a DriverExecutor object 098 * @param connection - JDBC connection 099 * */ 100 public DriverExecutor(Connection connection) { 101 super(); 102 log = LoggerFactory.getLogger(this.getClass()); 103 this.connection = connection; 104 writeSqlLog("open", 0, "open", ""); 105 closed = (this.connection == null); 106 } 107 108 /** 109 * Get current Connection 110 * @return Connection 111 * */ 112 public Connection getConnection() { 113 return connection; 114 } 115 116 /** 117 * Get column classes by table name 118 * @exception SQLException 119 * @return Map<String,Object> - column and java type class mapping 120 * @param tableName - table name 121 * */ 122 public Map<String, Object> getColumnClasses(String tableName) throws SQLException { 123 Map<String, Object> types = null; 124 ResultSet result = null; 125 PreparedStatement statement = null; 126 String sql = String.format("SELECT * FROM %s WHERE 1 = 0", tableName); 127 statement = connection.prepareStatement(sql); 128 
result = statement.executeQuery(); 129 final ResultSetMetaData rsmd = result.getMetaData(); 130 for (int i = 0; i < rsmd.getColumnCount(); i++) { 131 if (types == null) { 132 types = new ResultMap<String, Object>(); 133 } 134 String cn = rsmd.getColumnClassName(i + 1); 135 types.put(converCase(rsmd.getColumnLabel(i + 1)), cn); 136 } 137 return types; 138 } 139 140 /** 141 * Get column db data types by table name 142 * @exception SQLException 143 * @return Map<String, Object> - column and db data type mapping 144 * @param tableName - Table name 145 * */ 146 public Map<String, Object> getColumnTypes(String tableName) throws SQLException { 147 ResultSet result = null; 148 PreparedStatement statement = null; 149 String sql = String.format("SELECT * FROM %s WHERE 1 = 0", tableName); 150 Map<String, Object> types = null; 151 statement = connection.prepareStatement(sql); 152 result = statement.executeQuery(); 153 final ResultSetMetaData rsmd = result.getMetaData(); 154 for (int i = 0; i < rsmd.getColumnCount(); i++) { 155 if (types == null) { 156 types = new ResultMap<String, Object>(); 157 } 158 types.put(converCase(rsmd.getColumnLabel(i + 1)), rsmd.getColumnTypeName(i + 1)); 159 } 160 return types; 161 } 162 163 /** 164 * Show column data type and java type mapping 165 * @exception SQLException 166 * @return List<Map<String, Object>> - Mapping list 167 * @param tableName - Table name 168 * */ 169 public List<Map<String, Object>> desc(String tableName) throws SQLException { 170 List<String> primaryKeys = getPrimaryKeys(tableName); 171 List<Map<String, Object>> results = new ArrayList<Map<String, Object>>(); 172 ResultSet result = null; 173 PreparedStatement statement = null; 174 String sql = String.format("SELECT * FROM %s WHERE 1 = 0", tableName); 175 statement = connection.prepareStatement(sql); 176 result = statement.executeQuery(); 177 final ResultSetMetaData rsmd = result.getMetaData(); 178 for (int i = 0; i < rsmd.getColumnCount(); i++) { 179 Map<String, Object> 
types = new ResultMap<String, Object>(); 180 int ci = i + 1; 181 boolean nullable = rsmd.isNullable(ci) == 1; 182 String name = converCase(rsmd.getColumnLabel(ci)); 183 String isPk = "UNKNOWN"; 184 if (primaryKeys != null) { 185 isPk = primaryKeys.contains(name) ? "Y" : "N"; 186 } 187 types.put("NAME", name); 188 types.put("PRIMARY_KEY", isPk); 189 types.put("DATA_TYPE", rsmd.getColumnTypeName(ci)); 190 types.put("JAVA_TYPE", rsmd.getColumnClassName(ci)); 191 types.put("PRECISION", rsmd.getPrecision(ci)); 192 types.put("ALLOW_NULLABLE", nullable ? "Y" : 'N'); 193 results.add(types); 194 } 195 return results; 196 } 197 198 /** 199 * Get primary Keys by table name 200 * @return List<String> - Primary Keys 201 * @param _tableName - Table name 202 * */ 203 public List<String> getPrimaryKeys(String _tableName) { 204 try { 205 List<String> pks = new ArrayList<String>(); 206 DatabaseMetaData meta = connection.getMetaData(); 207 ResultSet tables = meta.getTables(null, null, "%", new String[] { "TABLE" }); 208 while (tables.next()) { 209 String catalog = tables.getString("TABLE_CAT"); 210 String schema = tables.getString("TABLE_SCHEM"); 211 String tableName = tables.getString("TABLE_NAME"); 212 if (tableName.equalsIgnoreCase(_tableName)) { 213 ResultSet primaryKeys = meta.getPrimaryKeys(catalog, schema, tableName); 214 while (primaryKeys.next()) { 215 pks.add(primaryKeys.getString("COLUMN_NAME")); 216 } 217 break; 218 } 219 } 220 return pks; 221 } catch (Exception e) { 222 log.warn(e); 223 return null; 224 } 225 } 226 227 /** 228 * Get all tables 229 * @return List<Map<String, Object>> 230 * @exception SQLException 231 * */ 232 public List<Map<String, Object>> getAllTables() throws SQLException { 233 List<Map<String, Object>> tablesList = new ArrayList<Map<String, Object>>(); 234 DatabaseMetaData meta = connection.getMetaData(); 235 ResultSet tables = meta.getTables(null, null, "%", new String[] { "TABLE" }); 236 while (tables.next()) { 237 String catalog = 
tables.getString("TABLE_CAT"); 238 String schema = tables.getString("TABLE_SCHEM"); 239 String tableName = tables.getString("TABLE_NAME"); 240 Map<String, Object> t = new ResultMap<String, Object>(); 241 t.put("TABLE_SCHEMA", schema); 242 t.put("TABLE_NAME", tableName); 243 tablesList.add(t); 244 } 245 return tablesList; 246 } 247 248 /** 249 * Get all tables by schema 250 * @return List<Map<String, Object>> 251 * @exception SQLException 252 * */ 253 public List<Map<String, Object>> getAllTables(String _schema) throws SQLException { 254 List<Map<String, Object>> tablesList = new ArrayList<Map<String, Object>>(); 255 DatabaseMetaData meta = connection.getMetaData(); 256 ResultSet tables = meta.getTables(null, null, "%", new String[] { "TABLE" }); 257 while (tables.next()) { 258 String schema = tables.getString("TABLE_SCHEM"); 259 if (schema != null && schema.equalsIgnoreCase(_schema)) { 260 String catalog = tables.getString("TABLE_CAT"); 261 String tableName = tables.getString("TABLE_NAME"); 262 Map<String, Object> t = new ResultMap<String, Object>(); 263 t.put("TABLE_SCHEMA", schema); 264 t.put("TABLE_NAME", tableName); 265 tablesList.add(t); 266 } 267 } 268 return tablesList; 269 } 270 271 /** 272 * Query first record 273 * @exception SQLException 274 * @return Map<String,Object> - First result 275 * @param sql - Condition use format ':column_name' 276 * @param params 277 * */ 278 public Map<String, Object> first(String sql, Map<String, Object> params) throws SQLException { 279 List<Map<String, Object>> list = find(0, 1, sql, params); 280 if (list.size() > 0) { 281 return list.get(0); 282 } 283 return null; 284 } 285 286 /** 287 * Query first record 288 * @exception SQLException 289 * @return Map<String,Object> - First result 290 * @param sql 291 * */ 292 public Map<String, Object> first(String sql) throws SQLException { 293 return first(sql, Arrays.asList(new Object[] {})); 294 } 295 296 /** 297 * Query first record 298 * @exception SQLException 299 * @return 
Map<String,Object> - First result 300 * @param sql - Condition use format '?' 301 * @param params 302 * */ 303 public Map<String, Object> first(String sql, List<Object> params) throws SQLException { 304 List<Map<String, Object>> list = find(0, 1, sql, params); 305 if (list.size() > 0) { 306 return list.get(0); 307 } 308 return null; 309 } 310 311 /** 312 * Query all matched records 313 * @exception SQLException 314 * @return List<Map<String, Object>> 315 * @param sql 316 * @param params 317 * */ 318 public List<Map<String, Object>> find(String sql) throws SQLException { 319 return find(0, 0, sql, Arrays.asList(new Object[] {})); 320 } 321 322 /** 323 * Query all matched records 324 * @exception SQLException 325 * @return List<Map<String, Object>> 326 * @param cursorStart - JDBC result Cursor start index 327 * @param maxRows - JDBC result max rows (JDBC limited rows 50,000,000) 328 * @param sql 329 * @param params 330 * */ 331 public List<Map<String, Object>> find(int cursorStart, int maxRows, String sql) throws SQLException { 332 return find(cursorStart, maxRows, sql, Arrays.asList(new Object[] {})); 333 } 334 335 /** 336 * Query all matched records 337 * @exception SQLException 338 * @return List<Map<String, Object>> 339 * @param sql - Condition use format ':column_name' 340 * @param params 341 * */ 342 public List<Map<String, Object>> find(String sql, Map<String, Object> params) throws SQLException { 343 return find(0, 0, sql, params); 344 } 345 346 /** 347 * Query all matched records 348 * @exception SQLException 349 * @return List<Map<String, Object>> 350 * @param cursorStart - JDBC result Cursor start index 351 * @param maxRows - JDBC result max rows (JDBC limited rows 50,000,000) 352 * @param sql - Condition use format ':column_name' 353 * @param params 354 * */ 355 public List<Map<String, Object>> find(int cursorStart, int maxRows, String sql, Map<String, Object> params) 356 throws SQLException { 357 String csql = converSql(sql); 358 List<Object> cparams = 
converParams(sql, params); 359 return find(cursorStart, maxRows, csql, cparams); 360 } 361 362 /** 363 * Query all matched records 364 * @exception SQLException 365 * @return List<Map<String, Object>> 366 * @param cursorStart - JDBC result Cursor start index 367 * @param maxRows - JDBC result max rows (JDBC limited rows 50,000,000) 368 * @param sql - Condition use format '?' 369 * @param params 370 * */ 371 public List<Map<String, Object>> find(int cursorStart, int maxRows, String sql, List<Object> params) 372 throws SQLException { 373 long begin = System.currentTimeMillis(); 374 boolean allowedLog = writeSqlLog("find", begin, 375 String.format("%s [cursorStart=%s,maxRows=%s]", sql, cursorStart, maxRows), params); 376 377 PreparedStatement statement = null; 378 Map<String, Object> row = null; 379 ResultSet result = null; 380 final List<Map<String, Object>> rows = new ArrayList<Map<String, Object>>(); 381 try { 382 // ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_UPDATABLE 383 // ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY 384 statement = connection.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY); 385 if (params != null) { 386 int size = params.size(); 387 for (int i = 0; i < size; i++) { 388 int ci = i + 1; 389 Object param = params.get(i); 390 if (param == null) { 391 statement.setNull(ci, Types.VARCHAR); 392 } else { 393 statement.setObject(ci, param); 394 } 395 } 396 } 397 if (maxRows > 0) { 398 statement.setMaxRows(maxRows); 399 } 400 result = statement.executeQuery(); 401 result.absolute(cursorStart); 402 final ResultSetMetaData rsmd = result.getMetaData(); 403 final int c = rsmd.getColumnCount(); 404 while (result.next()) { 405 row = new ResultMap<String, Object>(); 406 for (int i = 0; i < c; i++) { 407 int ci = i + 1; 408 Object value = null; 409 Object originValue = result.getObject(ci); 410 if (originValue == null) { 411 value = originValue; 412 } else if (originValue instanceof Blob) { 413 Blob 
blobValue = (Blob) originValue; 414 InputStream is = null; 415 try { 416 is = blobValue.getBinaryStream(); 417 if(is != null) value = is.readAllBytes(); 418 } catch (IOException e) { 419 throw new SQLException(e.getMessage(), e); 420 } finally { 421 if (blobValue != null) { 422 try { 423 blobValue.free(); 424 } catch (SQLException e) { 425 throw e; 426 } 427 } 428 if (is != null) { 429 try { 430 is.close(); 431 } catch (IOException e) { 432 throw new SQLException(e.getMessage(), e); 433 } 434 } 435 } 436 } else if (originValue instanceof Clob) { 437 Clob clobValue = (Clob) originValue; 438 InputStream is = null; 439 try { 440 is = clobValue.getAsciiStream(); 441 if(is != null) value = is.readAllBytes(); 442 } catch (IOException e) { 443 throw new SQLException(e.getMessage(), e); 444 } finally { 445 if (clobValue != null) { 446 try { 447 clobValue.free(); 448 } catch (SQLException e) { 449 throw e; 450 } 451 } 452 if (is != null) { 453 try { 454 is.close(); 455 } catch (IOException e) { 456 throw new SQLException(e.getMessage(), e); 457 } 458 } 459 } 460 } else { 461 value = originValue; 462 } 463 row.put(converCase(rsmd.getColumnLabel(ci)), value); 464 } 465 rows.add(row); 466 } 467 if (allowedLog) { 468 writeSqlLog("rows", begin, "rows", rows.size()); 469 long spend = System.currentTimeMillis() - begin; 470 writeSqlLog("spend", begin, "spend", spend); 471 } 472 return rows; 473 } catch (SQLException e) { 474 if (allowedLog) 475 writeSqlLog("error", begin, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage())); 476 477 throw e; 478 } finally { 479 if (result != null) 480 result.close(); 481 482 if (statement != null) 483 statement.close(); 484 } 485 } 486 487 /** 488 * Execute stored proc (and return boolean) 489 * @param sql - Query sql 490 * @exception SQLException 491 * @return boolean 492 * */ 493 public boolean callAndReturnBoolean(String sql) throws SQLException { 494 return callAndReturnBoolean(sql, Arrays.asList(new Object[] {})); 495 } 496 497 
/** 498 * Execute stored proc (and return boolean) 499 * @param sql - Query sql, condition use format ':column_name' 500 * @param params 501 * @exception SQLException 502 * @return boolean 503 * */ 504 public boolean callAndReturnBoolean(String sql, Map<String, Object> params) throws SQLException { 505 String csql = converSql(sql); 506 List<Object> cparams = converParams(sql, params); 507 return callAndReturnBoolean(sql, params); 508 } 509 510 /** 511 * Execute stored proc (and return boolean) 512 * @param sql - Query sql,condition use format '?' 513 * @param params 514 * @exception SQLException 515 * @return boolean 516 * */ 517 public boolean callAndReturnBoolean(String sql, List<Object> params) throws SQLException { 518 519 if (!checkSqlAvailable(sql)) 520 return false; 521 522 long begin = System.currentTimeMillis(); 523 boolean allowedLog = writeSqlLog("call", begin, sql, params); 524 525 CallableStatement statement = null; 526 try { 527 statement = connection.prepareCall("{" + sql + "}"); 528 if (params != null) { 529 int size = params.size(); 530 for (int i = 0; i < size; i++) { 531 Object param = params.get(i); 532 if (param == null) { 533 statement.setNull(i + 1, Types.VARCHAR); 534 } else { 535 statement.setObject(i + 1, param); 536 } 537 } 538 } 539 540 statement.execute(); 541 542 boolean returnResult = true; //true is not exception 543 544 if (allowedLog) { 545 writeSqlLog("return", begin, "return", returnResult); 546 long spend = System.currentTimeMillis() - begin; 547 writeSqlLog("spend", begin, "spend", spend); 548 } 549 550 callAndReturnBooleanSync(connection, begin, sql, params, returnResult); 551 552 return returnResult; 553 } catch (SQLException e) { 554 if (allowedLog) 555 writeSqlLog("error", begin, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage())); 556 557 throw e; 558 } finally { 559 if (statement != null) 560 statement.close(); 561 } 562 } 563 564 /** 565 * Execute stored proc (and return rows) 566 * @param sql - Query sql 
567 * @exception SQLException 568 * @return int 569 * */ 570 public int callAndReturnRows(String sql) throws SQLException { 571 return callAndReturnRows(sql, Arrays.asList(new Object[] {})); 572 } 573 574 /** 575 * Execute stored proc (and return rows) 576 * @param sql - Query sql,condition use format ':column_name' 577 * @param params 578 * @exception SQLException 579 * @return int 580 * */ 581 public int callAndReturnRows(String sql, Map<String, Object> params) throws SQLException { 582 String csql = converSql(sql); 583 List<Object> cparams = converParams(sql, params); 584 return callAndReturnRows(sql, params); 585 } 586 587 public int callAndReturnRows(String sql, List<Object> params) throws SQLException { 588 return (int)callAndReturnRows(Types.INTEGER,sql,params); 589 } 590 591 /** 592 * Execute stored proc (and return rows) 593 * @param sqltypes - e.g Types.INTEGER 594 * @param sql - Query sql,condition use format '?' 595 * @param params 596 * @exception SQLException 597 * @return Object 598 * */ 599 public Object callAndReturnRows(int sqlTypes,String sql, List<Object> params) throws SQLException { 600 601 if (!checkSqlAvailable(sql)) 602 return -1; 603 604 long begin = System.currentTimeMillis(); 605 boolean allowedLog = writeSqlLog("call", begin, sql, params); 606 607 CallableStatement statement = null; 608 try { 609 statement = connection.prepareCall("{?=" + sql + "}"); 610 statement.registerOutParameter(1,sqlTypes); 611 if (params != null) { 612 int size = params.size(); 613 for (int i = 0; i < size; i++) { 614 Object param = params.get(i); 615 if (param == null) { 616 statement.setNull(i + 1, Types.VARCHAR); 617 } else { 618 statement.setObject(i + 1, param); 619 } 620 } 621 } 622 boolean hasResult = statement.execute(); 623 624 Object row = statement.getObject(1); 625 626 if (allowedLog) { 627 writeSqlLog("return", begin, "return", row); 628 long spend = System.currentTimeMillis() - begin; 629 writeSqlLog("spend", begin, "spend", spend); 630 } 631 632 
callAndReturnRowsSync(connection, begin, sql, params, row); 633 634 return row; 635 } catch (SQLException e) { 636 if (allowedLog) 637 writeSqlLog("error", begin, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage())); 638 639 throw e; 640 } finally { 641 if (statement != null) 642 statement.close(); 643 } 644 } 645 646 /** 647 * Execute stored proc (and return List) 648 * @param sql - Query sql 649 * @exception SQLException 650 * @return List<Map<String, Object>> 651 * */ 652 public List<Map<String, Object>> callAndReturnList(String sql) throws SQLException { 653 return callAndReturnList(0, 0, sql, Arrays.asList(new Object[] {})); 654 } 655 656 /** 657 * Execute stored proc (and return List) 658 * @param cursorStart - JDBC result Cursor start index 659 * @param maxRows - JDBC result max rows (JDBC limited rows 50,000,000) 660 * @param sql - Query sql 661 * @exception SQLException 662 * @return List<Map<String, Object>> 663 * */ 664 public List<Map<String, Object>> callAndReturnList(int cursorStart, int maxRows, String sql) throws SQLException { 665 return callAndReturnList(cursorStart, maxRows, sql, Arrays.asList(new Object[] {})); 666 } 667 668 /** 669 * Execute stored proc (and return List) 670 * @param cursorStart - JDBC result Cursor start index 671 * @param maxRows - JDBC result max rows (JDBC limited rows 50,000,000) 672 * @param sql - Query sql, condition use format ':column_name' 673 * @exception SQLException 674 * @return List<Map<String, Object>> 675 * */ 676 public List<Map<String, Object>> callAndReturnList(int cursorStart, int maxRows, String sql, 677 Map<String, Object> params) throws SQLException { 678 String csql = converSql(sql); 679 List<Object> cparams = converParams(sql, params); 680 return callAndReturnList(cursorStart, maxRows, sql, params); 681 } 682 683 /** 684 * Execute stored proc (and return List) 685 * @param cursorStart - JDBC result Cursor start index 686 * @param maxRows - JDBC result max rows (JDBC limited rows 
50,000,000) 687 * @param sql - Query sql, condition use format '?' 688 * @exception SQLException 689 * @return List<Map<String, Object>> 690 * */ 691 public List<Map<String, Object>> callAndReturnList(int cursorStart, int maxRows, String sql, List<Object> params) 692 throws SQLException { 693 694 if (!checkSqlAvailable(sql)) 695 return null; 696 697 long begin = System.currentTimeMillis(); 698 boolean allowedLog = writeSqlLog("call", begin, 699 String.format("%s [cursorStart=%s,maxRows=%s]", sql, cursorStart, maxRows), params); 700 701 CallableStatement statement = null; 702 Map<String, Object> row = null; 703 ResultSet result = null; 704 final List<Map<String, Object>> rows = new ArrayList<Map<String, Object>>(); 705 try { 706 statement = connection.prepareCall("{" + sql + "}"); 707 if (params != null) { 708 int size = params.size(); 709 for (int i = 0; i < size; i++) { 710 int ci = i + 1; 711 Object param = params.get(i); 712 if (param == null) { 713 statement.setNull(ci, Types.VARCHAR); 714 } else { 715 statement.setObject(ci, param); 716 } 717 } 718 } 719 if (maxRows > 0) { 720 statement.setMaxRows(maxRows); 721 } 722 result = statement.executeQuery(); 723 final ResultSetMetaData rsmd = result.getMetaData(); 724 final int c = rsmd.getColumnCount(); 725 int rowIndex = 0; 726 while (result.next()) { 727 if (rowIndex >= cursorStart) { 728 row = new ResultMap<String, Object>(); 729 for (int i = 0; i < c; i++) { 730 int ci = i + 1; 731 Object value = null; 732 Object originValue = result.getObject(ci); 733 if (originValue == null) { 734 value = originValue; 735 } else if (originValue instanceof Blob) { 736 Blob blobValue = (Blob) originValue; 737 InputStream is = null; 738 try { 739 is = blobValue.getBinaryStream(); 740 if(is != null) value = is.readAllBytes(); 741 } catch (IOException e) { 742 throw new SQLException(e.getMessage(), e); 743 } finally { 744 if (blobValue != null) { 745 try { 746 blobValue.free(); 747 } catch (SQLException e) { 748 throw e; 749 } 750 
} 751 if (is != null) { 752 try { 753 is.close(); 754 } catch (IOException e) { 755 throw new SQLException(e.getMessage(), e); 756 } 757 } 758 } 759 } else if (originValue instanceof Clob) { 760 Clob clobValue = (Clob) originValue; 761 InputStream is = null; 762 try { 763 is = clobValue.getAsciiStream(); 764 if(is != null) value = is.readAllBytes(); 765 } catch (IOException e) { 766 throw new SQLException(e.getMessage(), e); 767 } finally { 768 if (clobValue != null) { 769 try { 770 clobValue.free(); 771 } catch (SQLException e) { 772 throw e; 773 } 774 } 775 if (is != null) { 776 try { 777 is.close(); 778 } catch (IOException e) { 779 throw new SQLException(e.getMessage(), e); 780 } 781 } 782 } 783 } else { 784 value = originValue; 785 } 786 row.put(converCase(rsmd.getColumnLabel(ci)), value); 787 } 788 rows.add(row); 789 } 790 rowIndex++; 791 } 792 793 if (allowedLog) { 794 writeSqlLog("rows", begin, "rows", rows); 795 long spend = System.currentTimeMillis() - begin; 796 writeSqlLog("spend", begin, "spend", spend); 797 } 798 799 callAndReturnListSync(connection, cursorStart, maxRows, sql, params); 800 801 return rows; 802 } catch (SQLException e) { 803 if (allowedLog) 804 writeSqlLog("error", begin, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage())); 805 throw e; 806 } finally { 807 if (result != null) 808 result.close(); 809 810 if (statement != null) 811 statement.close(); 812 } 813 } 814 815 /** 816 * Execute sql 817 * @exception SQLException 818 * @param sql 819 * @return int 820 * */ 821 public int execute(String sql) throws SQLException { 822 return execute(sql, Arrays.asList(new Object[] {})); 823 } 824 825 /** 826 * Execute sql 827 * @exception SQLException 828 * @param sql - Condition use format ':column_name' 829 * @param params 830 * @return int 831 * */ 832 public int execute(String sql, Map<String, Object> params) throws SQLException { 833 String csql = converSql(sql); 834 List<Object> cparams = converParams(sql, params); 835 return 
execute(csql, cparams); 836 } 837 838 /** 839 * Execute sql 840 * @exception SQLException 841 * @param sql - Condition use format '?' 842 * @param params 843 * @return int 844 * */ 845 public int execute(String sql, List<Object> params) throws SQLException { 846 if (!checkSqlAvailable(sql)) 847 return -1; 848 849 long begin = System.currentTimeMillis(); 850 boolean allowedLog = writeSqlLog("execute", begin, sql, params); 851 852 PreparedStatement statement = null; 853 try { 854 statement = connection.prepareStatement(sql); 855 if (params != null) { 856 int size = params.size(); 857 for (int i = 0; i < size; i++) { 858 Object param = params.get(i); 859 if (param == null) { 860 statement.setNull(i + 1, Types.VARCHAR); 861 } else { 862 statement.setObject(i + 1, param); 863 } 864 } 865 } 866 int row = statement.executeUpdate(); 867 868 if (allowedLog) { 869 writeSqlLog("return", begin, "return", row); 870 long spend = System.currentTimeMillis() - begin; 871 writeSqlLog("spend", begin, "spend", spend); 872 } 873 874 executeSync(connection, begin, sql, params, row); 875 876 return row; 877 } catch (SQLException e) { 878 if (allowedLog) 879 writeSqlLog("error", begin, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage())); 880 throw e; 881 } finally { 882 if (statement != null) 883 statement.close(); 884 } 885 } 886 887 /** 888 * Execute batch 889 * @exception SQLException 890 * @param sql - Condition use format ':column_name' 891 * @param records 892 * @return int - Return rows 893 * */ 894 public int executeBatch(String sql, List<Map<String, Object>> records) throws SQLException { 895 String csql = converSql(sql); 896 List<List<Object>> crecords = new ArrayList<List<Object>>(); 897 for (Map<String, Object> record : records) { 898 List<Object> crecord = converParams(sql, record); 899 crecords.add(crecord); 900 } 901 return executeBatchList(csql, crecords); 902 } 903 904 /** 905 * Execute batch 906 * @exception SQLException 907 * @param sql - Condition use 
format '?' 908 * @param records 909 * @return int - Return rows 910 * */ 911 public int executeBatchList(String sql, List<List<Object>> records) throws SQLException { 912 boolean allowedLog = false; 913 if (!checkSqlAvailable(sql)) 914 return -1; 915 916 long begin = System.currentTimeMillis(); 917 if (records != null) { 918 allowedLog = writeSqlLog("batch", begin, sql, String.format("[batchSize=%s]", records.size())); 919 } 920 921 boolean first = true; 922 PreparedStatement statement = null; 923 try { 924 for (List<Object> params : records) { 925 if (first) { 926 statement = connection.prepareStatement(sql); 927 first = false; 928 } 929 930 int size = params.size(); 931 for (int i = 0; i < size; i++) { 932 Object param = params.get(i); 933 if (param == null) { 934 statement.setNull(i + 1, Types.VARCHAR); 935 } else { 936 statement.setObject(i + 1, param); 937 } 938 } 939 940 statement.addBatch(); 941 } 942 943 int sumRow = 0; 944 if (records.size() > 0) { 945 int[] rows = statement.executeBatch(); 946 947 for (int r : rows) { 948 sumRow += r; 949 } 950 951 if (allowedLog) { 952 long spend = System.currentTimeMillis() - begin; 953 writeSqlLog("return", begin, "return", sumRow); 954 writeSqlLog("spend", begin, "spend", spend); 955 } 956 957 executeBatchListSync(connection, begin, sql, records, sumRow); 958 } 959 960 return sumRow; 961 } catch (SQLException e) { 962 if (allowedLog) 963 writeSqlLog("error", begin, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage())); 964 965 throw e; 966 } finally { 967 if (statement != null) 968 statement.close(); 969 } 970 } 971 972 /** 973 * Check connection is closed 974 * @return boolean 975 * */ 976 public boolean isClosed() { 977 try { 978 if(closed) return closed; 979 980 return (connection == null || connection.isClosed()); 981 } catch (Exception e) { 982 log.warn(e); 983 return true; 984 } 985 } 986 987 /** 988 * Abort connection 989 * @exception SQLException 990 * */ 991 public void abort() throws 
SQLException { 992 writeSqlLog("aborting", 0, "aborting", ""); 993 if (connection != null) { 994 try { 995 if (connection instanceof DriverConnection) { 996 abortSyncConnection(connection); 997 connection.abort(null); 998 } else { 999 connection.abort(Executors.newFixedThreadPool(1)); 1000 } 1001 closed = true; 1002 } catch (SQLException e) { 1003 writeSqlLog("error", 0, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage())); 1004 throw e; 1005 } 1006 } 1007 writeSqlLog("aborted", 0, "aborted", ""); 1008 } 1009 1010 /** 1011 * Close connection 1012 * @exception SQLException 1013 * */ 1014 public void close() throws SQLException { 1015 writeSqlLog("closing", 0, "closing", ""); 1016 if (connection != null) { 1017 try { 1018 closeSyncConnection(connection); 1019 connection.close(); 1020 closed = true; 1021 } catch (SQLException e) { 1022 writeSqlLog("error", 0, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage())); 1023 throw e; 1024 } 1025 } 1026 writeSqlLog("closed", 0, "closed", ""); 1027 } 1028 1029 /** 1030 * Commit connection 1031 * @exception SQLException 1032 * */ 1033 public void commit() throws SQLException { 1034 writeSqlLog("committing", 0, "committing", ""); 1035 if (connection != null) { 1036 try { 1037 commitSyncConnection(connection); 1038 connection.commit(); 1039 } catch (SQLException e) { 1040 writeSqlLog("error", 0, "", String.format("%s,%s", e.getErrorCode(), e.getMessage())); 1041 throw e; 1042 } 1043 } 1044 writeSqlLog("committed", 0, "committed", ""); 1045 } 1046 1047 /** 1048 * Rollback connection 1049 * @exception SQLException 1050 * */ 1051 public void rollback() throws SQLException { 1052 writeSqlLog("rollbacking", 0, "rollbacking", ""); 1053 if (connection != null) { 1054 try { 1055 rollbackSyncConnection(connection); 1056 connection.rollback(); 1057 } catch (SQLException e) { 1058 writeSqlLog("error", 0, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage())); 1059 throw e; 1060 } 1061 } 1062 
writeSqlLog("rollbacked", 0, "rollbacked", ""); 1063 } 1064 1065 /** 1066 * Get origin connection hash code 1067 * @return Integer - hash code 1068 * */ 1069 private Integer getOriginConnectionHashCode() { 1070 if (connection instanceof DriverConnection) { 1071 return ((DriverConnection) connection).getOriginConnectionHashCode(); 1072 } else { 1073 return connection.hashCode(); 1074 } 1075 } 1076 1077 /** 1078 * Get data source thread name 1079 * @return String - DataSource thread name 1080 * */ 1081 private String getDataSourceName() { 1082 if (connection instanceof DriverConnection) { 1083 return ((DriverConnection) connection).getDriverDataSource().getName(); 1084 } else { 1085 return null; 1086 } 1087 } 1088 1089 /** 1090 * Conver sql from ':column_name' to '?' 1091 * @return String 1092 * */ 1093 protected String converSql(String sql) { 1094 return sql.replaceAll(":[a-zA-Z0-9_]+", "?"); 1095 } 1096 1097 /** 1098 * Conver param from map to list 1099 * @param sql 1100 * @param map 1101 * @return List<Object> 1102 * */ 1103 protected List<Object> converParams(String sql, Map<String, Object> map) { 1104 List<String> paramKeys = new ArrayList<String>(); 1105 Pattern pattern = Pattern.compile(":[a-zA-Z0-9_]+"); 1106 Matcher matcher = pattern.matcher(sql); 1107 while (matcher.find()) { 1108 String key = matcher.group().replaceFirst(":", ""); 1109 paramKeys.add(key); 1110 } 1111 1112 List<Object> params = new ArrayList<Object>(); 1113 for (String pk : paramKeys) { 1114 Object pv = map.get(pk); 1115 if (pv == null) { 1116 pv = map.get(pk.toLowerCase()); 1117 } 1118 if (pv == null) { 1119 pv = map.get(pk.toUpperCase()); 1120 } 1121 if (pv instanceof java.util.Date) { 1122 java.util.Date utilDate = (java.util.Date) pv; 1123 java.sql.Timestamp sqlDate = new java.sql.Timestamp(utilDate.getTime()); 1124 params.add(sqlDate); 1125 } else { 1126 params.add(pv); 1127 } 1128 } 1129 1130 return params; 1131 } 1132 1133 /** 1134 * Covner to upper or lower 1135 * @param s - Column 
name or table name 1136 * @return String 1137 * */ 1138 protected String converCase(String s) { 1139 if (COLUMN_NAME_CASE_MODE.equals(COLUMN_NAME_CASE_UPPER)) { 1140 return s.toUpperCase(); 1141 } else if (COLUMN_NAME_CASE_MODE.equals(COLUMN_NAME_CASE_LOWER)) { 1142 return s.toLowerCase(); 1143 } else { 1144 return s; 1145 } 1146 } 1147 1148 /** 1149 * Check available sql to write to log 1150 * @return boolean 1151 * @param connection - Connection 1152 * @param sql 1153 * */ 1154 protected static boolean checkSqlLogAvailable(Connection connection, String sql) { 1155 if (connection instanceof DriverConnection) { 1156 DriverConnection dc = (DriverConnection) connection; 1157 DriverDataSource dds = dc.getDriverDataSource(); 1158 ConfigProperties configProperties = dds.getConfigProperties(); 1159 List<String> sqlAllowed = configProperties.getArray("SqlLogAllowed"); 1160 List<String> sqlIgnored = configProperties.getArray("SqlLogIgnored"); 1161 return checkSqlAvailable(sql, sqlAllowed, sqlIgnored); 1162 } 1163 return false; 1164 } 1165 1166 /** 1167 * Check available sql to execute 1168 * @return boolean 1169 * @param sql 1170 * */ 1171 protected boolean checkSqlAvailable(String sql) { 1172 return checkSqlAvailable(connection, sql); 1173 } 1174 1175 /** 1176 * Check available sql to execute 1177 * @return boolean 1178 * @param connection 1179 * @param sql 1180 * */ 1181 protected static boolean checkSqlAvailable(Connection connection, String sql) { 1182 if (connection instanceof DriverConnection) { 1183 DriverConnection dc = (DriverConnection) connection; 1184 DriverDataSource dds = dc.getDriverDataSource(); 1185 ConfigProperties configProperties = dds.getConfigProperties(); 1186 List<String> sqlAllowed = configProperties.getArray("SqlExecuteAllowed"); 1187 List<String> sqlIgnored = configProperties.getArray("SqlExecuteIgnored"); 1188 return checkSqlAvailable(sql, sqlAllowed, sqlIgnored); 1189 } 1190 return true; 1191 } 1192 1193 /** 1194 * Check available sql to 
execute 1195 * @return boolean 1196 * @param sql 1197 * @param sqlAllowed - From DataSources.properties 1198 * @param sqlIgnored - From DataSources.properties 1199 * */ 1200 private static boolean checkSqlAvailable(String sql, List<String> sqlAllowed, List<String> sqlIgnored) { 1201 boolean matchedAllowed = true; 1202 boolean matchedIgnored = false; 1203 if (sqlAllowed != null) { 1204 for (String regex : sqlAllowed) { 1205 if (!CommonTools.isBlank(regex)) { 1206 matchedAllowed = sql.matches(regex); 1207 1208 if (matchedAllowed) 1209 break; 1210 } 1211 } 1212 } 1213 if (matchedAllowed && sqlIgnored != null) { 1214 for (String regex : sqlIgnored) { 1215 if (!CommonTools.isBlank(regex)) { 1216 matchedIgnored = sql.matches(regex); 1217 1218 if (matchedIgnored) 1219 break; 1220 } 1221 } 1222 } 1223 1224 boolean b = matchedAllowed && !matchedIgnored; 1225 if (!b) { 1226 LoggerFactory.getLogger(DriverExecutor.class).debug("Not available - '{}'", sql); 1227 } 1228 return b; 1229 } 1230 1231 /** 1232 * Write sql log 1233 * @return boolean 1234 * @param type 1235 * @param seq 1236 * @param sql 1237 * @param params 1238 * */ 1239 protected synchronized boolean writeSqlLog(String type, long seq, String sql, Object params) { 1240 return writeSqlLog(this.hashCode(),connection, type, seq, sql, params); 1241 } 1242 1243 /** 1244 * Write sql log 1245 * @return boolean 1246 * @param connection 1247 * @param type 1248 * @param seq 1249 * @param sql 1250 * @param params 1251 * */ 1252 protected synchronized static boolean writeSqlLog(int deHashCode,Connection connection, String type, long seq, String sql, 1253 Object params) { 1254 if (connection instanceof DriverConnection) { 1255 if (!checkSqlLogAvailable(connection, sql)) 1256 return false; 1257 1258 final String header = String.format( 1259 "LOG_DATE,LOG_HOUR,LOG_MI,LOG_SEC,LOG_MS,LOG_HOST,LOG_THREAD,LOG_DS,LOG_CONN_ID,LOG_EXEC_ID,LOG_TYPE,LOG_SEQ,LOG_SQL,LOG_PARAMS%s", 1260 System.lineSeparator()); 1261 final DateFormat df = new 
SimpleDateFormat("yyyyMMdd"); 1262 final DateFormat dtf = new SimpleDateFormat("yyyyMMdd,HH,mm,ss,SSS"); 1263 DriverConnection dc = (DriverConnection) connection; 1264 Integer connHashCode = dc.getOriginConnectionHashCode(); 1265 DriverDataSource dds = dc.getDriverDataSource(); 1266 ConfigProperties configProperties = dds.getConfigProperties(); 1267 1268 boolean logEnable = configProperties.getBoolean("SqlLogEnable", false); 1269 long overspend = configProperties.getMilliSeconds("SqlLogOverspend",0L); 1270 1271 if (!logEnable) 1272 return false; 1273 1274 final String defaultLogFolderPath = String.format("%s/SqlLog/", 1275 CommonTools.getJarPath(DriverExecutor.class)); 1276 final String logFolderPath = configProperties.getString("SqlLogFolder", defaultLogFolderPath); 1277 final long maxFileSize = configProperties.getFileSize("SqlLogMaxFileSize", 1024 * 1024 * 10L); 1278 final int archiveDays = configProperties.getInteger("SqlLogArchiveDays", 31); 1279 final int logParamMaxLength = configProperties.getInteger("SqlLogParamMaxLength", 20); 1280 1281 if (sqlLogCacheArray == null) { 1282 1283 sqlLogCacheArray = new CacheArray(); 1284 long sqlLogFilterTimer = configProperties.getLong("SqlLogTimer", 1000L); 1285 sqlLogCacheArray.filter(new CacheArrayFilter(0L,sqlLogFilterTimer) { 1286 @Override 1287 public void executeBatch(Integer index, List batch) { 1288 final StringBuffer sbf = new StringBuffer(); 1289 for(Object item : batch){ 1290 sbf.append(item); 1291 } 1292 try { 1293 String msg = sbf.toString(); 1294 String dateStr = df.format(new Timestamp(System.currentTimeMillis())); 1295 File sqlLogFile = new File(String.format("%s/%s.csv", logFolderPath, dateStr)); 1296 File sqlLogFolder = new File(sqlLogFile.getParent()); 1297 1298 if (sqlLogFolder.exists()) { 1299 if (!sqlLogFolder.canWrite()) 1300 throw new IOException(String.format("Can not write to log folder '%s'", 1301 sqlLogFolder.getAbsolutePath())); 1302 } else { 1303 sqlLogFolder.mkdirs(); 1304 } 1305 1306 if 
(sqlLogFile.exists()) { 1307 if (!sqlLogFile.canWrite()) 1308 throw new IOException(String.format("Can not write to log file '%s'", 1309 sqlLogFile.getAbsolutePath())); 1310 } 1311 1312 int suffixIndex = sqlLogFile.getName().lastIndexOf("."); 1313 String logFileNamePrefix = sqlLogFile.getName().substring(0, suffixIndex); 1314 long logSize = FileTools.size(sqlLogFile); 1315 if (!sqlLogFile.exists()) { 1316 FileTools.write(sqlLogFile, header, false); 1317 } 1318 if (logSize < maxFileSize) { 1319 FileTools.write(sqlLogFile, msg, true); 1320 } else { 1321 int logIndex = getLogFileIndex(configProperties, sqlLogFolder, logFileNamePrefix, 1322 "csv"); 1323 if (logIndex == 0) { 1324 FileTools.write(sqlLogFile, String.format("%s%s", header, msg), false); 1325 } else { 1326 backupLog(sqlLogFolder, sqlLogFile, logFileNamePrefix, logIndex, header, msg); 1327 } 1328 } 1329 archiveLog(archiveDays, sqlLogFolder); 1330 } catch (Exception e) { 1331 LoggerFactory.getLogger(DriverExecutor.class).warn(e.getMessage(), e); 1332 } 1333 } 1334 }); 1335 } 1336 1337 if (logEnable) { 1338 String hostname = CommonTools.getHostname(); 1339 String dateTimeStr = dtf.format(new Timestamp(System.currentTimeMillis())); 1340 boolean isOverspend = false; 1341 boolean isSpendSeq = sql.equals("spend"); 1342 if(isSpendSeq && params != null){ 1343 long spendValue = Long.parseLong(params + ""); 1344 isOverspend = (spendValue >= overspend); 1345 } 1346 1347 String threadId = Thread.currentThread().getName() + "-" + Thread.currentThread().getId(); 1348 String msg = String.format("%s,%s,%s,%s,%s,%s,%s,%s,\"%s\",\"%s\"%s", dateTimeStr, hostname, 1349 threadId, dds.getName(), connHashCode,deHashCode, type, seq, 1350 replaceToSigleLine(sql), handleParams(logParamMaxLength, params), System.lineSeparator()); 1351 1352 String key = String.format("%s_%s",deHashCode,connHashCode); 1353 if(seq <= 0 || overspend <= 0){ 1354 sqlLogCacheArray.add(msg); 1355 SQL_LOG_MSG_MAPPING.remove(key); 1356 
SQL_LOG_OVERSPEND_MAPPING.remove(key); 1357 } 1358 if(seq > 0 && overspend > 0){ 1359 SQL_LOG_OVERSPEND_MAPPING.put(key,isOverspend); 1360 String existsMsg = SQL_LOG_MSG_MAPPING.get(key); 1361 existsMsg = existsMsg == null ? msg : (existsMsg + msg); 1362 SQL_LOG_MSG_MAPPING.put(key, existsMsg); 1363 1364 boolean seqOverspend = SQL_LOG_OVERSPEND_MAPPING.get(key); 1365 if(isSpendSeq && seqOverspend){ 1366 sqlLogCacheArray.add(SQL_LOG_MSG_MAPPING.get(key) + ""); 1367 SQL_LOG_MSG_MAPPING.remove(key); 1368 SQL_LOG_OVERSPEND_MAPPING.remove(key); 1369 } 1370 if(isSpendSeq && !seqOverspend){ 1371 SQL_LOG_MSG_MAPPING.remove(key); 1372 SQL_LOG_OVERSPEND_MAPPING.remove(key); 1373 } 1374 } 1375 return true; 1376 } 1377 1378 } 1379 return false; 1380 } 1381 1382 /** 1383 * Backup log file 1384 * */ 1385 private synchronized static void backupLog(File logFolder, File logFile, String logFileNamePrefix, int logIndex, String header, 1386 String msg) { 1387 try{ 1388 String backupLogFileName = String.format("%s/%s.%s.csv", logFolder.getAbsolutePath(), logFileNamePrefix, 1389 logIndex); 1390 Path source = Paths.get(logFile.getAbsolutePath()); 1391 Path target = Paths.get(backupLogFileName); 1392 Files.move(source, target, StandardCopyOption.REPLACE_EXISTING); 1393 FileTools.write(logFile, String.format("%s%s", header, msg), false); 1394 }catch(Exception e){ 1395 LoggerFactory.getLogger(DriverExecutor.class).error(e.getMessage(),e); 1396 } 1397 } 1398 1399 /** 1400 * Get log file index 1401 * For backup log file use 1402 * */ 1403 private static int getLogFileIndex(ConfigProperties configProperties, File folder, String prefix, String suffix) { 1404 Integer maxBackupIndex = configProperties.getInteger("SqlLogMaxBackupIndex", 10); 1405 for (int i = 1; i <= maxBackupIndex; i++) { 1406 String logFileName = String.format("%s/%s.%s.%s", folder.getAbsolutePath(), prefix, i, suffix); 1407 File logFile = new File(logFileName); 1408 if (!logFile.exists()) 1409 return i; 1410 } 1411 return 0; 
1412 } 1413 1414 /** 1415 * Archive Log 1416 * */ 1417 private static void archiveLog(int archiveDays, File sqlLogFolder) { 1418 if (archiveDays > 0) { 1419 try { 1420 long archiveDaysMs = new Date().getTime() - (archiveDays * 24 * 3600000L); 1421 deleteFilesOlderThan(sqlLogFolder, archiveDaysMs); 1422 } catch (Exception e) { 1423 LoggerFactory.getLogger(DriverExecutor.class).warn(e); 1424 } 1425 } 1426 } 1427 1428 /** 1429 * Delete old archive logs 1430 * */ 1431 private static void deleteFilesOlderThan(File directory, long archiveDaysMs) throws IOException { 1432 if (directory.isDirectory()) { 1433 File[] files = directory.listFiles(); 1434 if (files != null) { 1435 for (File file : files) { 1436 if (file.isFile()) { 1437 boolean isLogFile = file.getName().toLowerCase().endsWith(".csv"); 1438 if (isLogFile) { 1439 boolean canWrite = file.canWrite(); 1440 if (canWrite) { 1441 long lastModified = file.lastModified(); 1442 if (lastModified < archiveDaysMs) { 1443 Files.deleteIfExists(Paths.get(file.toURI())); 1444 } 1445 } 1446 } 1447 } 1448 } 1449 } 1450 } 1451 } 1452 1453 /** 1454 * Replace to sigle line 1455 * For write csv log 1456 * */ 1457 private static String replaceToSigleLine(String msg) { 1458 return CodeEscape.escapeToSingleLineForCsv(msg); 1459 } 1460 1461 /** 1462 * Hahdle Params 1463 * For write csv log 1464 * */ 1465 private static String handleParams(int paramMaxLength, Object params) { 1466 1467 if (params == null) 1468 return "null"; 1469 1470 StringBuffer paramSbf = new StringBuffer(""); 1471 if (params instanceof List) { 1472 List<Object> listParams = (List<Object>) params; 1473 int size = listParams.size(); 1474 for (int i = 0; i < size; i++) { 1475 Object param = listParams.get(i); 1476 String str = param + ""; 1477 if (str.length() > paramMaxLength) { 1478 paramSbf.append(str.substring(0, paramMaxLength) + "..."); 1479 } else { 1480 paramSbf.append(str); 1481 } 1482 if (i < size - 1) { 1483 paramSbf.append(";"); 1484 } 1485 } 1486 } else { 
1487 paramSbf.append(params); 1488 } 1489 return replaceToSigleLine(paramSbf.toString()); 1490 } 1491 1492 /**For Sync DataSource**/ 1493 1494 /** 1495 * For database replication 1496 * */ 1497 protected static void callAndReturnBooleanSync(Connection masterConn, long seq, String sql, List<Object> params, 1498 boolean returnResult) throws SQLException { 1499 if (masterConn instanceof DriverConnection) { 1500 openSyncConnection(masterConn); 1501 1502 DriverConnection masterDriverConn = (DriverConnection) masterConn; 1503 Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode(); 1504 ConfigProperties ddscp = masterDriverConn.getDriverDataSource().getConfigProperties(); 1505 boolean connCheck = true; 1506 boolean returnCheck = true; 1507 if (ddscp != null) { 1508 connCheck = ddscp.getBoolean("SyncConnectionCheck", true); 1509 returnCheck = ddscp.getBoolean("SyncReturnCheck", true); 1510 } 1511 List<DriverExecutor> deList = SYNC_EXECUTOR_MARK.get(masterConnHashCode); 1512 if (deList != null) { 1513 for (DriverExecutor de : deList) { 1514 try { 1515 boolean resultSync = de.callAndReturnBoolean(sql, params); 1516 if (returnCheck) { 1517 if (resultSync != returnResult) { 1518 writeSqlLog(de.hashCode(),masterDriverConn, "diffed", seq, "diffed", String.format("%s-%s", 1519 de.getDataSourceName(), de.getOriginConnectionHashCode())); 1520 int errorCode = 99906; 1521 String errorMsg = String.format("The returned results are inconsistent '%s-%s'.", 1522 de.getDataSourceName(), de.getOriginConnectionHashCode()); 1523 throw new SQLException(errorMsg) { 1524 @Override 1525 public int getErrorCode() { 1526 return errorCode; 1527 } 1528 }; 1529 } 1530 } 1531 } catch (SQLException e) { 1532 LoggerFactory.getLogger(DriverExecutor.class) 1533 .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage())); 1534 if (connCheck) 1535 throw e; 1536 } 1537 } 1538 } 1539 } 1540 } 1541 1542 /** 1543 * For database replication 1544 * */ 1545 protected static 
void callAndReturnListSync(Connection masterConn, int cursorStart, int maxRows, String sql, 1546 List<Object> params) throws SQLException { 1547 if (masterConn instanceof DriverConnection) { 1548 openSyncConnection(masterConn); 1549 1550 DriverConnection masterDriverConn = (DriverConnection) masterConn; 1551 Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode(); 1552 1553 List<DriverExecutor> deList = SYNC_EXECUTOR_MARK.get(masterConnHashCode); 1554 if (deList != null) { 1555 for (DriverExecutor de : deList) { 1556 try { 1557 de.callAndReturnBoolean(sql, params); 1558 } catch (SQLException e) { 1559 LoggerFactory.getLogger(DriverExecutor.class) 1560 .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage())); 1561 ConfigProperties ddscp = masterDriverConn.getDriverDataSource().getConfigProperties(); 1562 boolean connCheck = true; 1563 if (ddscp != null) { 1564 connCheck = ddscp.getBoolean("SyncConnectionCheck", true); 1565 } 1566 if (connCheck) 1567 throw e; 1568 } 1569 } 1570 } 1571 } 1572 } 1573 1574 /** 1575 * For database replication 1576 * */ 1577 protected static void callAndReturnRowsSync(Connection masterConn, long seq, String sql, List<Object> params, 1578 Object returnRows) throws SQLException { 1579 if (masterConn instanceof DriverConnection) { 1580 openSyncConnection(masterConn); 1581 1582 DriverConnection masterDriverConn = (DriverConnection) masterConn; 1583 Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode(); 1584 ConfigProperties ddscp = masterDriverConn.getDriverDataSource().getConfigProperties(); 1585 boolean connCheck = true; 1586 boolean returnCheck = true; 1587 if (ddscp != null) { 1588 connCheck = ddscp.getBoolean("SyncConnectionCheck", true); 1589 returnCheck = ddscp.getBoolean("SyncReturnCheck", true); 1590 } 1591 List<DriverExecutor> deList = SYNC_EXECUTOR_MARK.get(masterConnHashCode); 1592 if (deList != null) { 1593 for (DriverExecutor de : deList) { 1594 try { 1595 Object 
rowSync = de.callAndReturnRows(sql, params); 1596 if (returnCheck) { 1597 if (rowSync != null && rowSync.equals(returnRows)) { 1598 writeSqlLog(de.hashCode(),masterDriverConn, "diffed", seq, "diffed", String.format("%s-%s", 1599 de.getDataSourceName(), de.getOriginConnectionHashCode())); 1600 int errorCode = 99906; 1601 String errorMsg = String.format("The returned results are inconsistent '%s-%s'.", 1602 de.getDataSourceName(), de.getOriginConnectionHashCode()); 1603 throw new SQLException(errorMsg) { 1604 @Override 1605 public int getErrorCode() { 1606 return errorCode; 1607 } 1608 }; 1609 } 1610 } 1611 } catch (SQLException e) { 1612 LoggerFactory.getLogger(DriverExecutor.class) 1613 .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage())); 1614 if (connCheck) 1615 throw e; 1616 } 1617 } 1618 } 1619 } 1620 } 1621 1622 /** 1623 * For database replication 1624 * */ 1625 protected static void executeBatchListSync(Connection masterConn, long seq, String sql, List<List<Object>> records, 1626 int returnRows) throws SQLException { 1627 if (masterConn instanceof DriverConnection) { 1628 openSyncConnection(masterConn); 1629 1630 DriverConnection masterDriverConn = (DriverConnection) masterConn; 1631 Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode(); 1632 ConfigProperties ddscp = masterDriverConn.getDriverDataSource().getConfigProperties(); 1633 boolean connCheck = true; 1634 boolean returnCheck = true; 1635 if (ddscp != null) { 1636 connCheck = ddscp.getBoolean("SyncConnectionCheck", true); 1637 returnCheck = ddscp.getBoolean("SyncReturnCheck", true); 1638 } 1639 List<DriverExecutor> deList = SYNC_EXECUTOR_MARK.get(masterConnHashCode); 1640 if (deList != null) { 1641 for (DriverExecutor de : deList) { 1642 try { 1643 int rowSync = de.executeBatchList(sql, records); 1644 if (returnCheck) { 1645 if (rowSync != returnRows) { 1646 writeSqlLog(de.hashCode(),masterDriverConn, "diffed", seq, "diffed", String.format("%s-%s", 1647 
de.getDataSourceName(), de.getOriginConnectionHashCode())); 1648 int errorCode = 99906; 1649 String errorMsg = String.format("The returned results are inconsistent '%s-%s'.", 1650 de.getDataSourceName(), de.getOriginConnectionHashCode()); 1651 throw new SQLException(errorMsg) { 1652 @Override 1653 public int getErrorCode() { 1654 return errorCode; 1655 } 1656 }; 1657 } 1658 } 1659 } catch (SQLException e) { 1660 LoggerFactory.getLogger(DriverExecutor.class) 1661 .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage())); 1662 if (connCheck) 1663 throw e; 1664 } 1665 } 1666 } 1667 } 1668 } 1669 1670 /** 1671 * For database replication 1672 * */ 1673 protected static void executeSync(Connection masterConn, long seq, String sql, List<Object> params, int returnRows) 1674 throws SQLException { 1675 if (masterConn instanceof DriverConnection) { 1676 openSyncConnection(masterConn); 1677 1678 DriverConnection masterDriverConn = (DriverConnection) masterConn; 1679 Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode(); 1680 ConfigProperties ddscp = masterDriverConn.getDriverDataSource().getConfigProperties(); 1681 boolean connCheck = true; 1682 boolean returnCheck = true; 1683 if (ddscp != null) { 1684 connCheck = ddscp.getBoolean("SyncConnectionCheck", true); 1685 returnCheck = ddscp.getBoolean("SyncReturnCheck", true); 1686 } 1687 List<DriverExecutor> deList = SYNC_EXECUTOR_MARK.get(masterConnHashCode); 1688 if (deList != null) { 1689 for (DriverExecutor de : deList) { 1690 try { 1691 int rowSync = de.execute(sql, params); 1692 if (returnCheck) { 1693 if (rowSync != returnRows) { 1694 writeSqlLog(de.hashCode(),masterDriverConn, "diffed", seq, "diffed", String.format("%s-%s", 1695 de.getDataSourceName(), de.getOriginConnectionHashCode())); 1696 int errorCode = 99906; 1697 String errorMsg = String.format("The returned results are inconsistent '%s-%s'.", 1698 de.getDataSourceName(), de.getOriginConnectionHashCode()); 1699 throw new 
SQLException(errorMsg) { 1700 @Override 1701 public int getErrorCode() { 1702 return errorCode; 1703 } 1704 }; 1705 } 1706 } 1707 } catch (SQLException e) { 1708 LoggerFactory.getLogger(DriverExecutor.class) 1709 .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage())); 1710 if (connCheck) 1711 throw e; 1712 } 1713 } 1714 } 1715 } 1716 } 1717 1718 /** 1719 * For database replication 1720 * */ 1721 protected synchronized static boolean openSyncConnection(Connection masterConn) throws SQLException { 1722 if (masterConn instanceof DriverConnection) { 1723 1724 DriverConnection masterDriverConn = (DriverConnection) masterConn; 1725 Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode(); 1726 1727 boolean isExist = SYNC_EXECUTOR_MARK.containsKey(masterConnHashCode); 1728 1729 if (isExist) { 1730 return false; 1731 } else { 1732 SYNC_EXECUTOR_MARK.put(masterConnHashCode, new ArrayList<DriverExecutor>()); 1733 } 1734 1735 DriverDataSource dds = masterDriverConn.getDriverDataSource(); 1736 if (dds != null) { 1737 List<DriverDataSource> sdsl = dds.getSyncDataSourceList(dds.getName()); 1738 if (sdsl != null) { 1739 1740 if (sdsl.isEmpty()) 1741 return false; 1742 1743 for (DriverDataSource sds : sdsl) { 1744 1745 Integer sdsHashCode = sds.hashCode(); 1746 if (!SYNC_CONN_ERROR_TIME.containsKey(sdsHashCode)) { 1747 SYNC_CONN_ERROR_TIME.put(sdsHashCode, 0L); 1748 } 1749 1750 Long syncConnErrorTime = SYNC_CONN_ERROR_TIME.get(sdsHashCode); 1751 if (syncConnErrorTime > 0) { 1752 ConfigProperties ddscp = dds.getConfigProperties(); 1753 long connCheckMs = 10000L; 1754 if (ddscp != null) { 1755 connCheckMs = ddscp.getLong("SyncConnectionCheckTime", 10000L); 1756 } 1757 boolean isSkipConn = syncConnErrorTime > 0 1758 && (System.currentTimeMillis() - syncConnErrorTime) <= connCheckMs; 1759 if (isSkipConn) { 1760 continue; 1761 } 1762 } 1763 1764 String masterFingerprint = dds.getFingerprint(); 1765 String syncFingerprint = sds.getFingerprint(); 
1766 if (masterFingerprint.equalsIgnoreCase(syncFingerprint)) { 1767 LoggerFactory.getLogger(DriverExecutor.class) 1768 .warn("Skip sync reason 'same connection fingerprint'."); 1769 continue; 1770 } 1771 1772 try { 1773 final Connection conn = sds.getConnection(); 1774 if (conn == null) { 1775 int errorCode = 99904; 1776 String errorMsg = "Connection is null."; 1777 throw new SQLException(errorMsg) { 1778 @Override 1779 public int getErrorCode() { 1780 return errorCode; 1781 } 1782 }; 1783 } 1784 conn.setAutoCommit(masterConn.getAutoCommit()); 1785 SYNC_CONN_ERROR_TIME.put(sdsHashCode, 0L); 1786 SYNC_EXECUTOR_MARK.get(masterConnHashCode).add(new DriverExecutor(conn)); 1787 } catch (SQLException e) { 1788 SYNC_CONN_ERROR_TIME.put(sdsHashCode, System.currentTimeMillis()); 1789 LoggerFactory.getLogger(DriverExecutor.class) 1790 .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage())); 1791 ConfigProperties ddscp = dds.getConfigProperties(); 1792 boolean connCheck = true; 1793 if (ddscp != null) { 1794 connCheck = ddscp.getBoolean("SyncConnectionCheck", true); 1795 } 1796 if (connCheck) 1797 throw e; 1798 } 1799 1800 } 1801 1802 return true; 1803 } 1804 } 1805 } 1806 return false; 1807 } 1808 1809 /** 1810 * For database replication 1811 * */ 1812 protected static void closeSyncConnection(Connection masterConn) throws SQLException { 1813 if (masterConn instanceof DriverConnection) { 1814 DriverConnection masterDriverConn = (DriverConnection) masterConn; 1815 Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode(); 1816 1817 List<DriverExecutor> deList = SYNC_EXECUTOR_MARK.get(masterConnHashCode); 1818 if (deList != null) { 1819 for (DriverExecutor de : deList) { 1820 try { 1821 de.close(); 1822 } catch (SQLException e) { 1823 LoggerFactory.getLogger(DriverExecutor.class) 1824 .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage())); 1825 ConfigProperties ddscp = 
masterDriverConn.getDriverDataSource().getConfigProperties(); 1826 boolean connCheck = true; 1827 if (ddscp != null) { 1828 connCheck = ddscp.getBoolean("SyncConnectionCheck", true); 1829 } 1830 if (connCheck) 1831 throw e; 1832 } 1833 } 1834 SYNC_EXECUTOR_MARK.remove(masterConnHashCode); 1835 LoggerFactory.getLogger(DriverExecutor.class).debug("CloseSyncConnection - masterConn '{}' finished.", 1836 masterConnHashCode); 1837 } 1838 } 1839 } 1840 1841 /** 1842 * For database replication 1843 * */ 1844 protected static void commitSyncConnection(Connection masterConn) throws SQLException { 1845 if (masterConn instanceof DriverConnection) { 1846 DriverConnection masterDriverConn = (DriverConnection) masterConn; 1847 Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode(); 1848 1849 List<DriverExecutor> deList = SYNC_EXECUTOR_MARK.get(masterConnHashCode); 1850 if (deList != null) { 1851 for (DriverExecutor de : deList) { 1852 try { 1853 de.commit(); 1854 } catch (SQLException e) { 1855 LoggerFactory.getLogger(DriverExecutor.class) 1856 .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage())); 1857 ConfigProperties ddscp = masterDriverConn.getDriverDataSource().getConfigProperties(); 1858 boolean connCheck = true; 1859 if (ddscp != null) { 1860 connCheck = ddscp.getBoolean("SyncConnectionCheck", true); 1861 } 1862 if (connCheck) 1863 throw e; 1864 } 1865 } 1866 LoggerFactory.getLogger(DriverExecutor.class).debug("CommitSyncConnection - masterConn '{}' finished.", 1867 masterConnHashCode); 1868 } 1869 } 1870 } 1871 1872 /** 1873 * For database replication 1874 * */ 1875 protected static void rollbackSyncConnection(Connection masterConn) throws SQLException { 1876 if (masterConn instanceof DriverConnection) { 1877 DriverConnection masterDriverConn = (DriverConnection) masterConn; 1878 Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode(); 1879 1880 List<DriverExecutor> deList = 
SYNC_EXECUTOR_MARK.get(masterConnHashCode); 1881 if (deList != null) { 1882 for (DriverExecutor de : deList) { 1883 try { 1884 de.rollback(); 1885 } catch (SQLException e) { 1886 LoggerFactory.getLogger(DriverExecutor.class) 1887 .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage())); 1888 ConfigProperties ddscp = masterDriverConn.getDriverDataSource().getConfigProperties(); 1889 boolean connCheck = true; 1890 if (ddscp != null) { 1891 connCheck = ddscp.getBoolean("SyncConnectionCheck", true); 1892 } 1893 if (connCheck) 1894 throw e; 1895 } 1896 } 1897 LoggerFactory.getLogger(DriverExecutor.class) 1898 .debug("RollbackSyncConnection - masterConn '{}' finished.", masterConnHashCode); 1899 } 1900 } 1901 } 1902 1903 /** 1904 * For database replication 1905 * */ 1906 protected static void abortSyncConnection(Connection masterConn) throws SQLException { 1907 if (masterConn instanceof DriverConnection) { 1908 DriverConnection masterDriverConn = (DriverConnection) masterConn; 1909 Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode(); 1910 1911 List<DriverExecutor> deList = SYNC_EXECUTOR_MARK.get(masterConnHashCode); 1912 if (deList != null) { 1913 for (DriverExecutor de : deList) { 1914 try { 1915 de.abort(); 1916 } catch (SQLException e) { 1917 LoggerFactory.getLogger(DriverExecutor.class) 1918 .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage())); 1919 ConfigProperties ddscp = masterDriverConn.getDriverDataSource().getConfigProperties(); 1920 boolean connCheck = true; 1921 if (ddscp != null) { 1922 connCheck = ddscp.getBoolean("SyncConnectionCheck", true); 1923 } 1924 if (connCheck) 1925 throw e; 1926 } 1927 } 1928 LoggerFactory.getLogger(DriverExecutor.class).debug("AbortSyncConnection - masterConn '{}' finished.", 1929 masterConnHashCode); 1930 } 1931 } 1932 } 1933 1934}