001/******************************************************************************* 002The MIT License (MIT) 003 004Copyright (c) 2024 KILLCODING.COM 005 006Permission is hereby granted, free of charge, to any person obtaining a copy 007of this software and associated documentation files (the "Software"), to deal 008in the Software without restriction, including without limitation the rights 009to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 010copies of the Software, and to permit persons to whom the Software is 011furnished to do so, subject to the following conditions: 012 013The above copyright notice and this permission notice shall be included in 014all copies or substantial portions of the Software. 015 016THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 017IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 018FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 019AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 020LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 021OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 022THE SOFTWARE. 023*****************************************************************************/ 024package com.killcoding.datasource; 025 026import java.sql.Connection; 027import java.sql.PreparedStatement; 028import java.util.List; 029import java.sql.SQLException; 030import java.sql.Types; 031import java.util.Map; 032import java.sql.ResultSet; 033import java.sql.ResultSetMetaData; 034import java.util.HashMap; 035import java.util.ArrayList; 036import java.util.regex.Pattern; 037import java.util.regex.Matcher; 038import java.util.Arrays; 039import com.killcoding.tool.ResultMap; 040import com.killcoding.log.LoggerFactory; 041import com.killcoding.log.Logger; 042import com.killcoding.cache.CacheArray; 043import java.io.IOException; 044import java.sql.Blob; 045import java.sql.Clob; 046import java.io.InputStream; 047 048/** 049 * This class uses a 'CacheArray' to process the asynchronous processing method of the data query result set. 050 * If you choose to use the mode 'DiskCache', temporary files will be generated. 051 * If you choose to use the mode 'MemoryCache', temporary files will not be generated but the physical memory size must be considered. 052 * */ 053public final class CacheDriverExecutor extends DriverExecutor { 054 055 public static long READ_TIMER = 10L; 056 057 /** 058 * New a object CacheDriverExecutor 059 * @param connection - it is jdbc connection 060 * */ 061 public CacheDriverExecutor(Connection connection) { 062 super(connection); 063 } 064 065 /** 066 * This is full table query function 067 * @param sql - Query sql 068 * @param params - Query params 069 * @param rows - CacheArray object 070 * @exception SQLException - if query failed 071 * */ 072 public void find(String sql, List<Object> params, final CacheArray rows) throws SQLException { 073 find(0, 0, sql, params, rows); 074 } 075 076 /** 077 * This is full table query function 078 * @param sql - Query sql 079 * @param params - Query params 080 * @param rows - CacheArray object 081 * @exception SQLException - if query failed 082 * */ 083 public void find(String sql, Map<String, Object> params, final CacheArray rows) 084 throws SQLException { 085 String csql = converSql(sql); 086 List<Object> cparams = converParams(sql, params); 087 find(0, 0, csql, cparams, rows); 088 } 089 090 /** 091 * This is full table query function 092 * @param sql - Query sql 093 * @param rows - CacheArray object 094 * @exception SQLException - if query failed 095 * */ 096 public void find(String sql, final CacheArray rows) throws SQLException { 097 find(0, 0, sql, Arrays.asList(new Object[] {}), rows); 098 } 099 100 /** 101 * This is limited rows query function 102 * @param cursorStart - JDBC result Cursor start index 103 * @param maxRows - JDBC result max rows (JDBC limited rows 50,000,000) 104 * @param sql - Query sql 105 * @param rows - CacheArray object 106 * @exception SQLException - if query failed 107 * */ 108 public void find(int cursorStart, int maxRows, String sql, final CacheArray rows) 109 throws SQLException { 110 find(cursorStart, maxRows, sql, Arrays.asList(new Object[] {}), rows); 111 } 112 113 /** 114 * This is limited rows query function (use Map param mode) 115 * @param cursorStart - JDBC result Cursor start index 116 * @param maxRows - JDBC result max rows (JDBC limited rows 50,000,000) 117 * @param sql - Query sql 118 * @param rows - CacheArray object 119 * @exception SQLException - if query failed 120 * */ 121 public void find(int cursorStart, int maxRows, String sql, Map<String, Object> params, 122 final CacheArray rows) throws SQLException { 123 String csql = converSql(sql); 124 List<Object> cparams = converParams(sql, params); 125 find(cursorStart, maxRows, csql, cparams, rows); 126 } 127 128 /** 129 * This is limited rows query function (use List param mode) 130 * @param cursorStart - JDBC result Cursor start index 131 * @param maxRows - JDBC result max rows (JDBC limited rows 50,000,000) 132 * @param sql - Query sql 133 * @param rows - CacheArray object 134 * @exception SQLException - if query failed 135 * */ 136 public void find(int cursorStart, int maxRows, String sql, List<Object> params, 137 final CacheArray rows) throws SQLException { 138 long begin = System.currentTimeMillis(); 139 boolean allowedLog = writeSqlLog("find", begin, 140 String.format("%s [cursorStart=%s,maxRows=%s]", sql, cursorStart, maxRows), params); 141 PreparedStatement statement = null; 142 Map<String, Object> row = null; 143 144 ResultSet result = null; 145 try { 146 // ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_UPDATABLE 147 // ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY 148 statement = connection.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY); 149 if (params != null) { 150 int size = params.size(); 151 for (int i = 0; i < size; i++) { 152 int ci = i + 1; 153 Object param = params.get(i); 154 if (param == null) { 155 statement.setNull(ci, Types.VARCHAR); 156 } else { 157 statement.setObject(ci, param); 158 } 159 } 160 } 161 if (maxRows > 0) { 162 statement.setMaxRows(maxRows); 163 } 164 result = statement.executeQuery(); 165 result.absolute(cursorStart); 166 final ResultSetMetaData rsmd = result.getMetaData(); 167 final int c = rsmd.getColumnCount(); 168 while (result.next()) { 169 row = new ResultMap<String, Object>(); 170 for (int i = 0; i < c; i++) { 171 int ci = i + 1; 172 Object value = null; 173 Object originValue = result.getObject(ci); 174 if (originValue == null) { 175 value = originValue; 176 } else if (originValue instanceof Blob) { 177 Blob blobValue = (Blob) originValue; 178 InputStream is = null; 179 try { 180 is = blobValue.getBinaryStream(); 181 if(is != null) value = is.readAllBytes(); 182 } catch (IOException e) { 183 throw new SQLException(e.getMessage(), e); 184 } finally { 185 if (blobValue != null) { 186 try { 187 blobValue.free(); 188 } catch (SQLException e) { 189 throw e; 190 } 191 } 192 if (is != null) { 193 try { 194 is.close(); 195 } catch (IOException e) { 196 throw new SQLException(e.getMessage(), e); 197 } 198 } 199 } 200 } else if (originValue instanceof Clob) { 201 Clob clobValue = (Clob) originValue; 202 InputStream is = null; 203 try { 204 is = clobValue.getAsciiStream(); 205 if(is != null) value = is.readAllBytes(); 206 } catch (IOException e) { 207 throw new SQLException(e.getMessage(), e); 208 } finally { 209 if (clobValue != null) { 210 try { 211 clobValue.free(); 212 } catch (SQLException e) { 213 throw e; 214 } 215 } 216 if (is != null) { 217 try { 218 is.close(); 219 } catch (IOException e) { 220 throw new SQLException(e.getMessage(), e); 221 } 222 } 223 } 224 } else { 225 value = originValue; 226 } 227 row.put(converCase(rsmd.getColumnLabel(ci)), value); 228 } 229 rows.add(row); 230 try { 231 Thread.sleep(getReadTimer()); 232 } catch (InterruptedException e) { 233 log.debug(e); 234 continue; 235 } 236 } 237 rows.add(null); 238 239 if (allowedLog) { 240 writeSqlLog("rows", begin, "rows", rows.size() - 1); 241 long spend = System.currentTimeMillis() - begin; 242 writeSqlLog("spend", begin, "spend", spend); 243 } 244 } catch (SQLException e) { 245 if(rows != null) rows.add(null); 246 247 if (allowedLog) 248 writeSqlLog("error", begin, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage())); 249 throw e; 250 } finally { 251 if (result != null) 252 result.close(); 253 254 if (statement != null) 255 statement.close(); 256 257 } 258 } 259 260 /** 261 * Execute stored proc(and return result to CacheArray) 262 * @param cursorStart - JDBC result Cursor start index 263 * @param maxRows - JDBC result max rows (JDBC limited rows 50,000,000) 264 * @param sql - Query sql 265 * @param rows - CacheArray object 266 * @exception SQLException - if query failed 267 * */ 268 public void callAndReturnList(int cursorStart, int maxRows, String sql, final CacheArray rows) 269 throws SQLException { 270 callAndReturnList(cursorStart, maxRows, sql, Arrays.asList(new Object[] {}), rows); 271 } 272 273 /** 274 * Execute stored proc(and return result) 275 * @param cursorStart - JDBC result Cursor start index 276 * @param maxRows - JDBC result max rows (JDBC limited rows 50,000,000) 277 * @param sql - Query sql 278 * @param params - Use Map param mode (use ':column_name' to mapping) 279 * @param rows - CacheArray object 280 * @exception SQLException - if query failed 281 * */ 282 public void callAndReturnList(int cursorStart, int maxRows, String sql, Map<String, Object> params, 283 final CacheArray rows) throws SQLException { 284 String csql = converSql(sql); 285 List<Object> cparams = converParams(sql, params); 286 callAndReturnList(cursorStart, maxRows, csql, cparams, rows); 287 } 288 289 /** 290 * Execute stored proc(and return result) 291 * @param cursorStart - JDBC result Cursor start index 292 * @param maxRows - JDBC result max rows (JDBC limited rows 50,000,000) 293 * @param sql - Query sql (use '?' to mapping) 294 * @param params - Use List param mode 295 * @param rows - CacheArray object 296 * @exception SQLException - if query failed 297 * */ 298 public void callAndReturnList(int cursorStart, int maxRows, String sql, List<Object> params, 299 final CacheArray rows) throws SQLException { 300 301 if (!checkSqlAvailable(sql)) 302 return; 303 304 long begin = System.currentTimeMillis(); 305 boolean allowedLog = writeSqlLog("call", begin, 306 String.format("%s [cursorStart=%s,maxRows=%s]", sql, cursorStart, maxRows), params); 307 308 PreparedStatement statement = null; 309 Map<String, Object> row = null; 310 ResultSet result = null; 311 try { 312 statement = connection.prepareCall("{" + sql + "}"); 313 if (params != null) { 314 int size = params.size(); 315 for (int i = 0; i < size; i++) { 316 int ci = i + 1; 317 Object param = params.get(i); 318 if (param == null) { 319 statement.setNull(ci, Types.VARCHAR); 320 } else { 321 statement.setObject(ci, param); 322 } 323 } 324 } 325 if (maxRows > 0) { 326 statement.setMaxRows(maxRows); 327 } 328 result = statement.executeQuery(); 329 callAndReturnListSync(connection, cursorStart, maxRows, sql, params); 330 final ResultSetMetaData rsmd = result.getMetaData(); 331 final int c = rsmd.getColumnCount(); 332 int rowIndex = 0; 333 while (result.next()) { 334 if (rowIndex >= cursorStart) { 335 row = new ResultMap<String, Object>(); 336 for (int i = 0; i < c; i++) { 337 int ci = i + 1; 338 Object value = null; 339 Object originValue = result.getObject(ci); 340 if (originValue == null) { 341 value = originValue; 342 } else if (originValue instanceof Blob) { 343 Blob blobValue = (Blob) originValue; 344 InputStream is = null; 345 try { 346 is = blobValue.getBinaryStream(); 347 if(is != null) value = is.readAllBytes(); 348 } catch (IOException e) { 349 throw new SQLException(e.getMessage(), e); 350 } finally { 351 if (blobValue != null) { 352 try { 353 blobValue.free(); 354 } catch (SQLException e) { 355 throw e; 356 } 357 } 358 if (is != null) { 359 try { 360 is.close(); 361 } catch (IOException e) { 362 throw new SQLException(e.getMessage(), e); 363 } 364 } 365 } 366 } else if (originValue instanceof Clob) { 367 Clob clobValue = (Clob) originValue; 368 InputStream is = null; 369 try { 370 is = clobValue.getAsciiStream(); 371 if(is != null) value = is.readAllBytes(); 372 } catch (IOException e) { 373 throw new SQLException(e.getMessage(), e); 374 } finally { 375 if (clobValue != null) { 376 try { 377 clobValue.free(); 378 } catch (SQLException e) { 379 throw e; 380 } 381 } 382 if (is != null) { 383 try { 384 is.close(); 385 } catch (IOException e) { 386 throw new SQLException(e.getMessage(), e); 387 } 388 } 389 } 390 } else { 391 value = originValue; 392 } 393 row.put(converCase(rsmd.getColumnLabel(ci)), value); 394 } 395 rows.add(row); 396 try { 397 Thread.sleep(getReadTimer()); 398 } catch (InterruptedException e) { 399 log.debug(e); 400 continue; 401 } 402 } 403 rowIndex++; 404 } 405 rows.add(null); 406 407 if (allowedLog) { 408 writeSqlLog("rows", begin, "rows", rows.size() - 1); 409 long spend = System.currentTimeMillis() - begin; 410 writeSqlLog("spend", begin, "spend", spend); 411 } 412 } catch (SQLException e) { 413 if(rows != null) rows.add(null); 414 415 if (allowedLog) 416 writeSqlLog("error", begin, "error", e.getErrorCode()); 417 throw e; 418 } finally { 419 if (result != null) 420 result.close(); 421 422 if (statement != null) 423 statement.close(); 424 425 } 426 } 427 428 private long getReadTimer(){ 429 return READ_TIMER; 430 } 431 432}