001/*******************************************************************************
002The MIT License (MIT)
003
004Copyright (c) 2024 KILLCODING.COM
005
006Permission is hereby granted, free of charge, to any person obtaining a copy
007of this software and associated documentation files (the "Software"), to deal
008in the Software without restriction, including without limitation the rights
009to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
010copies of the Software, and to permit persons to whom the Software is
011furnished to do so, subject to the following conditions:
012
013The above copyright notice and this permission notice shall be included in
014all copies or substantial portions of the Software.
015
016THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
017IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
018FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
019AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
020LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
021OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
022THE SOFTWARE.
023*****************************************************************************/
024package com.killcoding.datasource;
025
026import java.sql.Connection;
027import java.sql.PreparedStatement;
028import java.util.List;
029import java.sql.SQLException;
030import java.sql.Types;
031import java.util.Map;
032import java.sql.ResultSet;
033import java.sql.ResultSetMetaData;
034import java.util.HashMap;
035import java.util.ArrayList;
036import java.util.regex.Pattern;
037import java.util.regex.Matcher;
038import java.util.Arrays;
039import com.killcoding.tool.ResultMap;
040import java.sql.DatabaseMetaData;
041import com.killcoding.log.LoggerFactory;
042import com.killcoding.log.Logger;
043import java.util.concurrent.Executors;
044import com.killcoding.datasource.DriverConnection;
045import com.killcoding.datasource.DriverDataSource;
046import java.util.concurrent.ConcurrentHashMap;
047import com.killcoding.tool.CommonTools;
048import com.killcoding.tool.ConfigProperties;
049import java.io.File;
050import java.text.DateFormat;
051import java.text.SimpleDateFormat;
052import com.killcoding.tool.FileTools;
053import java.io.IOException;
054import java.nio.file.Files;
055import java.util.Date;
056import java.nio.file.Paths;
057import java.nio.file.Path;
058import java.nio.file.StandardCopyOption;
059import com.killcoding.cache.CacheArray;
060import com.killcoding.cache.CacheArrayFilter;
061import com.killcoding.tool.CodeEscape;
062import java.sql.Blob;
063import java.util.stream.Collectors;
064import java.net.URI;
065import java.util.Comparator;
066import java.sql.Timestamp;
067import java.sql.Clob;
068import java.io.InputStream;
069import java.sql.CallableStatement;
070
/**
 * Base class for executing SQL.
 * Supports database replication.
 * Supports multi-active databases.
 * Supports database CRUD operations.
 * */
077public class DriverExecutor {
078
079        protected final static Map<Integer, List<DriverExecutor>> SYNC_EXECUTOR_MARK = new ConcurrentHashMap<Integer, List<DriverExecutor>>();
080        private final static Map<Integer, Long> SYNC_CONN_ERROR_TIME = new ConcurrentHashMap<Integer, Long>();
081        
082        private final static Map<String,String> SQL_LOG_MSG_MAPPING = new ConcurrentHashMap<String,String>();
083        private final static Map<String,Boolean> SQL_LOG_OVERSPEND_MAPPING = new ConcurrentHashMap<String,Boolean>();
084
085        protected Logger log = null;
086
087        public final static String COLUMN_NAME_CASE_UPPER = "UPPER";
088        public final static String COLUMN_NAME_CASE_LOWER = "LOWER";
089        public final static String COLUMN_NAME_CASE_ORIGINAL = "ORIGINAL";
090        public static String COLUMN_NAME_CASE_MODE = COLUMN_NAME_CASE_ORIGINAL;
091
092        protected boolean closed = true;
093        protected Connection connection = null;
094        private static CacheArray sqlLogCacheArray = null;
095
096        /**
097         * New a DriverExecutor object
098         * @param connection - JDBC connection
099         * */
100        public DriverExecutor(Connection connection) {
101                super();
102                log = LoggerFactory.getLogger(this.getClass());
103                this.connection = connection;
104                writeSqlLog("open", 0, "open", "");
105                closed = (this.connection == null);
106        }
107
108        /**
109         * Get current Connection
110         * @return Connection
111         * */
112        public Connection getConnection() {
113                return connection;
114        }
115
116        /**
117         * Get column classes by table name
118         * @exception SQLException
119         * @return Map<String,Object> - column and java type class mapping
120         * @param tableName - table name
121         * */
122        public Map<String, Object> getColumnClasses(String tableName) throws SQLException {
123                Map<String, Object> types = null;
124                ResultSet result = null;
125                PreparedStatement statement = null;
126                String sql = String.format("SELECT * FROM %s WHERE 1 = 0", tableName);
127                statement = connection.prepareStatement(sql);
128                result = statement.executeQuery();
129                final ResultSetMetaData rsmd = result.getMetaData();
130                for (int i = 0; i < rsmd.getColumnCount(); i++) {
131                        if (types == null) {
132                                types = new ResultMap<String, Object>();
133                        }
134                        String cn = rsmd.getColumnClassName(i + 1);
135                        types.put(converCase(rsmd.getColumnLabel(i + 1)), cn);
136                }
137                return types;
138        }
139
140        /**
141         * Get column db data types by table name
142         * @exception SQLException
143         * @return Map<String, Object> - column and db data type mapping
144         * @param tableName - Table name
145         * */
146        public Map<String, Object> getColumnTypes(String tableName) throws SQLException {
147                ResultSet result = null;
148                PreparedStatement statement = null;
149                String sql = String.format("SELECT * FROM %s WHERE 1 = 0", tableName);
150                Map<String, Object> types = null;
151                statement = connection.prepareStatement(sql);
152                result = statement.executeQuery();
153                final ResultSetMetaData rsmd = result.getMetaData();
154                for (int i = 0; i < rsmd.getColumnCount(); i++) {
155                        if (types == null) {
156                                types = new ResultMap<String, Object>();
157                        }
158                        types.put(converCase(rsmd.getColumnLabel(i + 1)), rsmd.getColumnTypeName(i + 1));
159                }
160                return types;
161        }
162
163        /**
164         * Show column data type and java type mapping
165         * @exception SQLException
166         * @return List<Map<String, Object>> - Mapping list
167         * @param tableName - Table name
168         * */
169        public List<Map<String, Object>> desc(String tableName) throws SQLException {
170                List<String> primaryKeys = getPrimaryKeys(tableName);
171                List<Map<String, Object>> results = new ArrayList<Map<String, Object>>();
172                ResultSet result = null;
173                PreparedStatement statement = null;
174                String sql = String.format("SELECT * FROM %s WHERE 1 = 0", tableName);
175                statement = connection.prepareStatement(sql);
176                result = statement.executeQuery();
177                final ResultSetMetaData rsmd = result.getMetaData();
178                for (int i = 0; i < rsmd.getColumnCount(); i++) {
179                        Map<String, Object> types = new ResultMap<String, Object>();
180                        int ci = i + 1;
181                        boolean nullable = rsmd.isNullable(ci) == 1;
182                        String name = converCase(rsmd.getColumnLabel(ci));
183                        String isPk = "UNKNOWN";
184                        if (primaryKeys != null) {
185                                isPk = primaryKeys.contains(name) ? "Y" : "N";
186                        }
187                        types.put("NAME", name);
188                        types.put("PRIMARY_KEY", isPk);
189                        types.put("DATA_TYPE", rsmd.getColumnTypeName(ci));
190                        types.put("JAVA_TYPE", rsmd.getColumnClassName(ci));
191                        types.put("PRECISION", rsmd.getPrecision(ci));
192                        types.put("ALLOW_NULLABLE", nullable ? "Y" : 'N');
193                        results.add(types);
194                }
195                return results;
196        }
197
198        /**
199         * Get primary Keys by table name
200         * @return List<String> - Primary Keys
201         * @param _tableName - Table name
202         * */
203        public List<String> getPrimaryKeys(String _tableName) {
204                try {
205                        List<String> pks = new ArrayList<String>();
206                        DatabaseMetaData meta = connection.getMetaData();
207                        ResultSet tables = meta.getTables(null, null, "%", new String[] { "TABLE" });
208                        while (tables.next()) {
209                                String catalog = tables.getString("TABLE_CAT");
210                                String schema = tables.getString("TABLE_SCHEM");
211                                String tableName = tables.getString("TABLE_NAME");
212                                if (tableName.equalsIgnoreCase(_tableName)) {
213                                        ResultSet primaryKeys = meta.getPrimaryKeys(catalog, schema, tableName);
214                                        while (primaryKeys.next()) {
215                                                pks.add(primaryKeys.getString("COLUMN_NAME"));
216                                        }
217                                        break;
218                                }
219                        }
220                        return pks;
221                } catch (Exception e) {
222                        log.warn(e);
223                        return null;
224                }
225        }
226
227        /**
228         * Get all tables
229         * @return List<Map<String, Object>>
230         * @exception SQLException
231         * */
232        public List<Map<String, Object>> getAllTables() throws SQLException {
233                List<Map<String, Object>> tablesList = new ArrayList<Map<String, Object>>();
234                DatabaseMetaData meta = connection.getMetaData();
235                ResultSet tables = meta.getTables(null, null, "%", new String[] { "TABLE" });
236                while (tables.next()) {
237                        String catalog = tables.getString("TABLE_CAT");
238                        String schema = tables.getString("TABLE_SCHEM");
239                        String tableName = tables.getString("TABLE_NAME");
240                        Map<String, Object> t = new ResultMap<String, Object>();
241                        t.put("TABLE_SCHEMA", schema);
242                        t.put("TABLE_NAME", tableName);
243                        tablesList.add(t);
244                }
245                return tablesList;
246        }
247
248        /**
249         * Get all tables by schema
250         * @return List<Map<String, Object>>
251         * @exception SQLException
252         * */
253        public List<Map<String, Object>> getAllTables(String _schema) throws SQLException {
254                List<Map<String, Object>> tablesList = new ArrayList<Map<String, Object>>();
255                DatabaseMetaData meta = connection.getMetaData();
256                ResultSet tables = meta.getTables(null, null, "%", new String[] { "TABLE" });
257                while (tables.next()) {
258                        String schema = tables.getString("TABLE_SCHEM");
259                        if (schema != null && schema.equalsIgnoreCase(_schema)) {
260                                String catalog = tables.getString("TABLE_CAT");
261                                String tableName = tables.getString("TABLE_NAME");
262                                Map<String, Object> t = new ResultMap<String, Object>();
263                                t.put("TABLE_SCHEMA", schema);
264                                t.put("TABLE_NAME", tableName);
265                                tablesList.add(t);
266                        }
267                }
268                return tablesList;
269        }
270
271        /**
272         * Query first record
273         * @exception SQLException
274         * @return Map<String,Object> - First result
275         * @param sql - Condition use format ':column_name'
276         * @param params
277         * */
278        public Map<String, Object> first(String sql, Map<String, Object> params) throws SQLException {
279                List<Map<String, Object>> list = find(0, 1, sql, params);
280                if (list.size() > 0) {
281                        return list.get(0);
282                }
283                return null;
284        }
285
286        /**
287         * Query first record
288         * @exception SQLException
289         * @return Map<String,Object> - First result
290         * @param sql
291         * */
292        public Map<String, Object> first(String sql) throws SQLException {
293                return first(sql, Arrays.asList(new Object[] {}));
294        }
295
296        /**
297         * Query first record
298         * @exception SQLException
299         * @return Map<String,Object> - First result
300         * @param sql - Condition use format '?'
301         * @param params
302         * */
303        public Map<String, Object> first(String sql, List<Object> params) throws SQLException {
304                List<Map<String, Object>> list = find(0, 1, sql, params);
305                if (list.size() > 0) {
306                        return list.get(0);
307                }
308                return null;
309        }
310
311        /**
312         * Query all matched records
313         * @exception SQLException
314         * @return List<Map<String, Object>>
315         * @param sql 
316         * @param params
317         * */
318        public List<Map<String, Object>> find(String sql) throws SQLException {
319                return find(0, 0, sql, Arrays.asList(new Object[] {}));
320        }
321
322        /**
323         * Query all matched records
324         * @exception SQLException
325         * @return List<Map<String, Object>>
326         * @param cursorStart - JDBC result Cursor start index
327         * @param maxRows - JDBC result max rows (JDBC limited rows 50,000,000)
328         * @param sql 
329         * @param params
330         * */
331        public List<Map<String, Object>> find(int cursorStart, int maxRows, String sql) throws SQLException {
332                return find(cursorStart, maxRows, sql, Arrays.asList(new Object[] {}));
333        }
334
335        /**
336         * Query all matched records
337         * @exception SQLException
338         * @return List<Map<String, Object>>
339         * @param sql - Condition use format ':column_name'
340         * @param params
341         * */
342        public List<Map<String, Object>> find(String sql, Map<String, Object> params) throws SQLException {
343                return find(0, 0, sql, params);
344        }
345
346        /**
347         * Query all matched records
348         * @exception SQLException
349         * @return List<Map<String, Object>>
350         * @param cursorStart - JDBC result Cursor start index
351         * @param maxRows - JDBC result max rows (JDBC limited rows 50,000,000)
352         * @param sql - Condition use format ':column_name'
353         * @param params
354         * */
355        public List<Map<String, Object>> find(int cursorStart, int maxRows, String sql, Map<String, Object> params)
356                        throws SQLException {
357                String csql = converSql(sql);
358                List<Object> cparams = converParams(sql, params);
359                return find(cursorStart, maxRows, csql, cparams);
360        }
361
362        /**
363         * Query all matched records
364         * @exception SQLException
365         * @return List<Map<String, Object>>
366         * @param cursorStart - JDBC result Cursor start index
367         * @param maxRows - JDBC result max rows (JDBC limited rows 50,000,000)
368         * @param sql - Condition use format '?'
369         * @param params
370         * */
371        public List<Map<String, Object>> find(int cursorStart, int maxRows, String sql, List<Object> params)
372                        throws SQLException {
373                long begin = System.currentTimeMillis();
374                boolean allowedLog = writeSqlLog("find", begin,
375                                String.format("%s [cursorStart=%s,maxRows=%s]", sql, cursorStart, maxRows), params);
376
377                PreparedStatement statement = null;
378                Map<String, Object> row = null;
379                ResultSet result = null;
380                final List<Map<String, Object>> rows = new ArrayList<Map<String, Object>>();
381                try {
382                        // ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_UPDATABLE
383                        // ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY
384                        statement = connection.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
385                        if (params != null) {
386                                int size = params.size();
387                                for (int i = 0; i < size; i++) {
388                                        int ci = i + 1;
389                                        Object param = params.get(i);
390                                        if (param == null) {
391                                                statement.setNull(ci, Types.VARCHAR);
392                                        } else {
393                                                statement.setObject(ci, param);
394                                        }
395                                }
396                        }
397                        if (maxRows > 0) {
398                                statement.setMaxRows(maxRows);
399                        }
400                        result = statement.executeQuery();
401                        result.absolute(cursorStart);
402                        final ResultSetMetaData rsmd = result.getMetaData();
403                        final int c = rsmd.getColumnCount();
404                        while (result.next()) {
405                                row = new ResultMap<String, Object>();
406                                for (int i = 0; i < c; i++) {
407                                        int ci = i + 1;
408                                        Object value = null;
409                                        Object originValue = result.getObject(ci);
410                                        if (originValue == null) {
411                                                value = originValue;
412                                        } else if (originValue instanceof Blob) {
413                                                Blob blobValue = (Blob) originValue;
414                                                InputStream is = null;
415                                                try {
416                                                        is = blobValue.getBinaryStream();
417                                                        if(is != null) value = is.readAllBytes();
418                                                } catch (IOException e) {
419                                                        throw new SQLException(e.getMessage(), e);
420                                                } finally {
421                                                        if (blobValue != null) {
422                                                                try {
423                                                                        blobValue.free();
424                                                                } catch (SQLException e) {
425                                                                        throw e;
426                                                                }
427                                                        }
428                                                        if (is != null) {
429                                                                try {
430                                                                        is.close();
431                                                                } catch (IOException e) {
432                                                                        throw new SQLException(e.getMessage(), e);
433                                                                }
434                                                        }
435                                                }
436                                        } else if (originValue instanceof Clob) {
437                                                Clob clobValue = (Clob) originValue;
438                                                InputStream is = null;
439                                                try {
440                                                        is = clobValue.getAsciiStream();
441                                                        if(is != null) value = is.readAllBytes();
442                                                } catch (IOException e) {
443                                                        throw new SQLException(e.getMessage(), e);
444                                                } finally {
445                                                        if (clobValue != null) {
446                                                                try {
447                                                                        clobValue.free();
448                                                                } catch (SQLException e) {
449                                                                        throw e;
450                                                                }
451                                                        }
452                                                        if (is != null) {
453                                                                try {
454                                                                        is.close();
455                                                                } catch (IOException e) {
456                                                                        throw new SQLException(e.getMessage(), e);
457                                                                }
458                                                        }
459                                                }
460                                        } else {
461                                                value = originValue;
462                                        }
463                                        row.put(converCase(rsmd.getColumnLabel(ci)), value);
464                                }
465                                rows.add(row);
466                        }
467                        if (allowedLog) {
468                                writeSqlLog("rows", begin, "rows", rows.size());
469                                long spend = System.currentTimeMillis() - begin;
470                                writeSqlLog("spend", begin, "spend", spend);
471                        }
472                        return rows;
473                } catch (SQLException e) {
474                        if (allowedLog)
475                                writeSqlLog("error", begin, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage()));
476
477                        throw e;
478                } finally {
479                        if (result != null)
480                                result.close();
481
482                        if (statement != null)
483                                statement.close();
484                }
485        }
486
487        /**
488         * Execute stored proc (and return boolean) 
489         * @param sql - Query sql 
490         * @exception SQLException 
491         * @return boolean
492         * */
493        public boolean callAndReturnBoolean(String sql) throws SQLException {
494                return callAndReturnBoolean(sql, Arrays.asList(new Object[] {}));
495        }
496
497        /**
498         * Execute stored proc (and return boolean) 
499         * @param sql - Query sql, condition use format ':column_name'
500         * @param params
501         * @exception SQLException 
502         * @return boolean
503         * */
504        public boolean callAndReturnBoolean(String sql, Map<String, Object> params) throws SQLException {
505                String csql = converSql(sql);
506                List<Object> cparams = converParams(sql, params);
507                return callAndReturnBoolean(sql, params);
508        }
509
510        /**
511         * Execute stored proc (and return boolean) 
512         * @param sql - Query sql,condition use format '?'
513         * @param params
514         * @exception SQLException 
515         * @return boolean
516         * */
517        public boolean callAndReturnBoolean(String sql, List<Object> params) throws SQLException {
518
519                if (!checkSqlAvailable(sql))
520                        return false;
521
522                long begin = System.currentTimeMillis();
523                boolean allowedLog = writeSqlLog("call", begin, sql, params);
524
525                CallableStatement statement = null;
526                try {
527                        statement = connection.prepareCall("{" + sql + "}");
528                        if (params != null) {
529                                int size = params.size();
530                                for (int i = 0; i < size; i++) {
531                                        Object param = params.get(i);
532                                        if (param == null) {
533                                                statement.setNull(i + 1, Types.VARCHAR);
534                                        } else {
535                                                statement.setObject(i + 1, param);
536                                        }
537                                }
538                        }
539
540            statement.execute();
541            
542                        boolean returnResult = true; //true is not exception
543
544                        if (allowedLog) {
545                                writeSqlLog("return", begin, "return", returnResult);
546                                long spend = System.currentTimeMillis() - begin;
547                                writeSqlLog("spend", begin, "spend", spend);
548                        }
549
550                        callAndReturnBooleanSync(connection, begin, sql, params, returnResult);
551
552                        return returnResult;
553                } catch (SQLException e) {
554                        if (allowedLog)
555                                writeSqlLog("error", begin, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage()));
556
557                        throw e;
558                } finally {
559                        if (statement != null)
560                                statement.close();
561                }
562        }
563
564        /**
565         * Execute stored proc (and return rows) 
566         * @param sql - Query sql
567         * @exception SQLException 
568         * @return int
569         * */
570        public int callAndReturnRows(String sql) throws SQLException {
571                return callAndReturnRows(sql, Arrays.asList(new Object[] {}));
572        }
573
574        /**
575         * Execute stored proc (and return rows) 
576         * @param sql - Query sql,condition use format ':column_name'
577         * @param params
578         * @exception SQLException 
579         * @return int
580         * */
581        public int callAndReturnRows(String sql, Map<String, Object> params) throws SQLException {
582                String csql = converSql(sql);
583                List<Object> cparams = converParams(sql, params);
584                return callAndReturnRows(sql, params);
585        }
586
587        /**
588         * Execute stored proc (and return rows) 
589         * @param sql - Query sql,condition use format '?'
590         * @param params
591         * @exception SQLException 
592         * @return int
593         * */
594        public int callAndReturnRows(String sql, List<Object> params) throws SQLException {
595
596                if (!checkSqlAvailable(sql))
597                        return -1;
598
599                long begin = System.currentTimeMillis();
600                boolean allowedLog = writeSqlLog("call", begin, sql, params);
601
602                CallableStatement statement = null;
603                try {
604                        statement = connection.prepareCall("{?=" + sql + "}");
605                        statement.registerOutParameter(1,Types.INTEGER);
606                        if (params != null) {
607                                int size = params.size();
608                                for (int i = 0; i < size; i++) {
609                                        Object param = params.get(i);
610                                        if (param == null) {
611                                                statement.setNull(i + 1, Types.VARCHAR);
612                                        } else {
613                                                statement.setObject(i + 1, param);
614                                        }
615                                }
616                        }
617                        boolean hasResult = statement.execute();
618                        
619                        int row = statement.getInt(1);
620
621                        if (allowedLog) {
622                                writeSqlLog("return", begin, "return", row);
623                                long spend = System.currentTimeMillis() - begin;
624                                writeSqlLog("spend", begin, "spend", spend);
625                        }
626
627                        callAndReturnRowsSync(connection, begin, sql, params, row);
628
629                        return row;
630                } catch (SQLException e) {
631                        if (allowedLog)
632                                writeSqlLog("error", begin, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage()));
633
634                        throw e;
635                } finally {
636                        if (statement != null)
637                                statement.close();
638                }
639        }
640
641        /**
642         * Execute stored proc (and return List) 
643         * @param sql - Query sql
644         * @exception SQLException 
645         * @return List<Map<String, Object>>
646         * */
647        public List<Map<String, Object>> callAndReturnList(String sql) throws SQLException {
648                return callAndReturnList(0, 0, sql, Arrays.asList(new Object[] {}));
649        }
650
651        /**
652         * Execute stored proc (and return List) 
653         * @param cursorStart - JDBC result Cursor start index
654         * @param maxRows - JDBC result max rows (JDBC limited rows 50,000,000) 
655         * @param sql - Query sql
656         * @exception SQLException 
657         * @return List<Map<String, Object>>
658         * */
659        public List<Map<String, Object>> callAndReturnList(int cursorStart, int maxRows, String sql) throws SQLException {
660                return callAndReturnList(cursorStart, maxRows, sql, Arrays.asList(new Object[] {}));
661        }
662
663        /**
664         * Execute stored proc (and return List) 
665         * @param cursorStart - JDBC result Cursor start index
666         * @param maxRows - JDBC result max rows (JDBC limited rows 50,000,000) 
667         * @param sql - Query sql, condition use format ':column_name'
668         * @exception SQLException 
669         * @return List<Map<String, Object>>
670         * */
671        public List<Map<String, Object>> callAndReturnList(int cursorStart, int maxRows, String sql,
672                        Map<String, Object> params) throws SQLException {
673                String csql = converSql(sql);
674                List<Object> cparams = converParams(sql, params);
675                return callAndReturnList(cursorStart, maxRows, sql, params);
676        }
677
678        /**
679         * Execute stored proc (and return List) 
680         * @param cursorStart - JDBC result Cursor start index
681         * @param maxRows - JDBC result max rows (JDBC limited rows 50,000,000) 
682         * @param sql - Query sql, condition use format '?'
683         * @exception SQLException 
684         * @return List<Map<String, Object>>
685         * */
686        public List<Map<String, Object>> callAndReturnList(int cursorStart, int maxRows, String sql, List<Object> params)
687                        throws SQLException {
688
689                if (!checkSqlAvailable(sql))
690                        return null;
691
692                long begin = System.currentTimeMillis();
693                boolean allowedLog = writeSqlLog("call", begin,
694                                String.format("%s [cursorStart=%s,maxRows=%s]", sql, cursorStart, maxRows), params);
695
696                CallableStatement statement = null;
697                Map<String, Object> row = null;
698                ResultSet result = null;
699                final List<Map<String, Object>> rows = new ArrayList<Map<String, Object>>();
700                try {
701                        statement = connection.prepareCall("{" + sql + "}");
702                        if (params != null) {
703                                int size = params.size();
704                                for (int i = 0; i < size; i++) {
705                                        int ci = i + 1;
706                                        Object param = params.get(i);
707                                        if (param == null) {
708                                                statement.setNull(ci, Types.VARCHAR);
709                                        } else {
710                                                statement.setObject(ci, param);
711                                        }
712                                }
713                        }
714                        if (maxRows > 0) {
715                                statement.setMaxRows(maxRows);
716                        }
717                        result = statement.executeQuery();
718                        final ResultSetMetaData rsmd = result.getMetaData();
719                        final int c = rsmd.getColumnCount();
720                        int rowIndex = 0;
721                        while (result.next()) {
722                                if (rowIndex >= cursorStart) {
723                                        row = new ResultMap<String, Object>();
724                                        for (int i = 0; i < c; i++) {
725                                                int ci = i + 1;
726                                                Object value = null;
727                                                Object originValue = result.getObject(ci);
728                                                if (originValue == null) {
729                                                        value = originValue;
730                                                } else if (originValue instanceof Blob) {
731                                                        Blob blobValue = (Blob) originValue;
732                                                        InputStream is = null;
733                                                        try {
734                                                                is = blobValue.getBinaryStream();
735                                                                if(is != null) value = is.readAllBytes();
736                                                        } catch (IOException e) {
737                                                                throw new SQLException(e.getMessage(), e);
738                                                        } finally {
739                                                                if (blobValue != null) {
740                                                                        try {
741                                                                                blobValue.free();
742                                                                        } catch (SQLException e) {
743                                                                                throw e;
744                                                                        }
745                                                                }
746                                                                if (is != null) {
747                                                                        try {
748                                                                                is.close();
749                                                                        } catch (IOException e) {
750                                                                                throw new SQLException(e.getMessage(), e);
751                                                                        }
752                                                                }
753                                                        }
754                                                } else if (originValue instanceof Clob) {
755                                                        Clob clobValue = (Clob) originValue;
756                                                        InputStream is = null;
757                                                        try {
758                                                                is = clobValue.getAsciiStream();
759                                                                if(is != null) value = is.readAllBytes();
760                                                        } catch (IOException e) {
761                                                                throw new SQLException(e.getMessage(), e);
762                                                        } finally {
763                                                                if (clobValue != null) {
764                                                                        try {
765                                                                                clobValue.free();
766                                                                        } catch (SQLException e) {
767                                                                                throw e;
768                                                                        }
769                                                                }
770                                                                if (is != null) {
771                                                                        try {
772                                                                                is.close();
773                                                                        } catch (IOException e) {
774                                                                                throw new SQLException(e.getMessage(), e);
775                                                                        }
776                                                                }
777                                                        }
778                                                } else {
779                                                        value = originValue;
780                                                }
781                                                row.put(converCase(rsmd.getColumnLabel(ci)), value);
782                                        }
783                                        rows.add(row);
784                                }
785                                rowIndex++;
786                        }
787
788                        if (allowedLog) {
789                                writeSqlLog("rows", begin, "rows", rows);
790                                long spend = System.currentTimeMillis() - begin;
791                                writeSqlLog("spend", begin, "spend", spend);
792                        }
793
794                        callAndReturnListSync(connection, cursorStart, maxRows, sql, params);
795
796                        return rows;
797                } catch (SQLException e) {
798                        if (allowedLog)
799                                writeSqlLog("error", begin, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage()));
800                        throw e;
801                } finally {
802                        if (result != null)
803                                result.close();
804
805                        if (statement != null)
806                                statement.close();
807                }
808        }
809
810        /**
811         * Execute sql
812         * @exception SQLException
813         * @param sql
814         * @return int
815         * */
816        public int execute(String sql) throws SQLException {
817                return execute(sql, Arrays.asList(new Object[] {}));
818        }
819
820        /**
821         * Execute sql
822         * @exception SQLException
823         * @param sql - Condition use format ':column_name'
824         * @param params 
825         * @return int
826         * */
827        public int execute(String sql, Map<String, Object> params) throws SQLException {
828                String csql = converSql(sql);
829                List<Object> cparams = converParams(sql, params);
830                return execute(csql, cparams);
831        }
832
833        /**
834         * Execute sql
835         * @exception SQLException
836         * @param sql - Condition use format '?'
837         * @param params 
838         * @return int
839         * */
840        public int execute(String sql, List<Object> params) throws SQLException {
841                if (!checkSqlAvailable(sql))
842                        return -1;
843
844                long begin = System.currentTimeMillis();
845                boolean allowedLog = writeSqlLog("execute", begin, sql, params);
846
847                PreparedStatement statement = null;
848                try {
849                        statement = connection.prepareStatement(sql);
850                        if (params != null) {
851                                int size = params.size();
852                                for (int i = 0; i < size; i++) {
853                                        Object param = params.get(i);
854                                        if (param == null) {
855                                                statement.setNull(i + 1, Types.VARCHAR);
856                                        } else {
857                                                statement.setObject(i + 1, param);
858                                        }
859                                }
860                        }
861                        int row = statement.executeUpdate();
862
863                        if (allowedLog) {
864                                writeSqlLog("return", begin, "return", row);
865                                long spend = System.currentTimeMillis() - begin;
866                                writeSqlLog("spend", begin, "spend", spend);
867                        }
868
869                        executeSync(connection, begin, sql, params, row);
870
871                        return row;
872                } catch (SQLException e) {
873                        if (allowedLog)
874                                writeSqlLog("error", begin, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage()));
875                        throw e;
876                } finally {
877                        if (statement != null)
878                                statement.close();
879                }
880        }
881
882        /**
883         * Execute batch
884         * @exception SQLException
885         * @param sql - Condition use format ':column_name'
886         * @param records
887         * @return int - Return rows
888         * */
889        public int executeBatch(String sql, List<Map<String, Object>> records) throws SQLException {
890                String csql = converSql(sql);
891                List<List<Object>> crecords = new ArrayList<List<Object>>();
892                for (Map<String, Object> record : records) {
893                        List<Object> crecord = converParams(sql, record);
894                        crecords.add(crecord);
895                }
896                return executeBatchList(csql, crecords);
897        }
898
899        /**
900         * Execute batch
901         * @exception SQLException
902         * @param sql - Condition use format '?'
903         * @param records
904         * @return int - Return rows
905         * */
906        public int executeBatchList(String sql, List<List<Object>> records) throws SQLException {
907                boolean allowedLog = false;
908                if (!checkSqlAvailable(sql))
909                        return -1;
910
911                long begin = System.currentTimeMillis();
912                if (records != null) {
913                        allowedLog = writeSqlLog("batch", begin, sql, String.format("[batchSize=%s]", records.size()));
914                }
915
916                boolean first = true;
917                PreparedStatement statement = null;
918                try {
919                        for (List<Object> params : records) {
920                                if (first) {
921                                        statement = connection.prepareStatement(sql);
922                                        first = false;
923                                }
924
925                                int size = params.size();
926                                for (int i = 0; i < size; i++) {
927                                        Object param = params.get(i);
928                                        if (param == null) {
929                                                statement.setNull(i + 1, Types.VARCHAR);
930                                        } else {
931                                                statement.setObject(i + 1, param);
932                                        }
933                                }
934
935                                statement.addBatch();
936                        }
937
938                        int sumRow = 0;
939                        if (records.size() > 0) {
940                                int[] rows = statement.executeBatch();
941
942                                for (int r : rows) {
943                                        sumRow += r;
944                                }
945
946                                if (allowedLog) {
947                                        long spend = System.currentTimeMillis() - begin;
948                                        writeSqlLog("return", begin, "return", sumRow);
949                                        writeSqlLog("spend", begin, "spend", spend);
950                                }
951
952                                executeBatchListSync(connection, begin, sql, records, sumRow);
953                        }
954
955                        return sumRow;
956                } catch (SQLException e) {
957                        if (allowedLog)
958                                writeSqlLog("error", begin, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage()));
959
960                        throw e;
961                } finally {
962                        if (statement != null)
963                                statement.close();
964                }
965        }
966
967        /**
968         * Check connection is closed
969         * @return boolean
970         * */
971        public boolean isClosed() {
972                try {
973                    if(closed) return closed;
974                    
975                        return (connection == null || connection.isClosed());
976                } catch (Exception e) {
977                        log.warn(e);
978                        return true;
979                }
980        }
981
982        /**
983         * Abort connection
984         * @exception SQLException
985         * */
986        public void abort() throws SQLException {
987                writeSqlLog("aborting", 0, "aborting", "");
988                if (connection != null) {
989                        try {
990                                if (connection instanceof DriverConnection) {
991                                        abortSyncConnection(connection);
992                                        connection.abort(null);
993                                } else {
994                                        connection.abort(Executors.newFixedThreadPool(1));
995                                }
996                                closed = true;
997                        } catch (SQLException e) {
998                                writeSqlLog("error", 0, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage()));
999                                throw e;
1000                        }
1001                }
1002                writeSqlLog("aborted", 0, "aborted", "");
1003        }
1004
1005        /**
1006         * Close connection
1007         * @exception SQLException
1008         * */
1009        public void close() throws SQLException {
1010                writeSqlLog("closing", 0, "closing", "");
1011                if (connection != null) {
1012                        try {
1013                                closeSyncConnection(connection);
1014                                connection.close();
1015                                closed = true;
1016                        } catch (SQLException e) {
1017                                writeSqlLog("error", 0, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage()));
1018                                throw e;
1019                        }
1020                }
1021                writeSqlLog("closed", 0, "closed", "");
1022        }
1023
1024        /**
1025         * Commit connection
1026         * @exception SQLException
1027         * */
1028        public void commit() throws SQLException {
1029                writeSqlLog("committing", 0, "committing", "");
1030                if (connection != null) {
1031                        try {
1032                                commitSyncConnection(connection);
1033                                connection.commit();
1034                        } catch (SQLException e) {
1035                                writeSqlLog("error", 0, "", String.format("%s,%s", e.getErrorCode(), e.getMessage()));
1036                                throw e;
1037                        }
1038                }
1039                writeSqlLog("committed", 0, "committed", "");
1040        }
1041
1042        /**
1043         * Rollback connection
1044         * @exception SQLException
1045         * */
1046        public void rollback() throws SQLException {
1047                writeSqlLog("rollbacking", 0, "rollbacking", "");
1048                if (connection != null) {
1049                        try {
1050                                rollbackSyncConnection(connection);
1051                                connection.rollback();
1052                        } catch (SQLException e) {
1053                                writeSqlLog("error", 0, "error", String.format("%s,%s", e.getErrorCode(), e.getMessage()));
1054                                throw e;
1055                        }
1056                }
1057                writeSqlLog("rollbacked", 0, "rollbacked", "");
1058        }
1059
1060        /**
1061         * Get origin connection hash code
1062         * @return Integer - hash code
1063         * */
1064        private Integer getOriginConnectionHashCode() {
1065                if (connection instanceof DriverConnection) {
1066                        return ((DriverConnection) connection).getOriginConnectionHashCode();
1067                } else {
1068                        return connection.hashCode();
1069                }
1070        }
1071
1072        /**
1073         * Get data source thread name
1074         * @return String - DataSource thread name
1075         * */
1076        private String getDataSourceName() {
1077                if (connection instanceof DriverConnection) {
1078                        return ((DriverConnection) connection).getDriverDataSource().getName();
1079                } else {
1080                        return null;
1081                }
1082        }
1083
1084        /**
1085         * Conver sql from ':column_name' to '?'
1086         * @return String
1087         * */
1088        protected String converSql(String sql) {
1089                return sql.replaceAll(":[a-zA-Z0-9_]+", "?");
1090        }
1091
1092        /**
1093         * Conver param from map to list
1094         * @param sql
1095         * @param map
1096         * @return List<Object>
1097         * */
1098        protected List<Object> converParams(String sql, Map<String, Object> map) {
1099                List<String> paramKeys = new ArrayList<String>();
1100                Pattern pattern = Pattern.compile(":[a-zA-Z0-9_]+");
1101                Matcher matcher = pattern.matcher(sql);
1102                while (matcher.find()) {
1103                        String key = matcher.group().replaceFirst(":", "");
1104                        paramKeys.add(key);
1105                }
1106
1107                List<Object> params = new ArrayList<Object>();
1108                for (String pk : paramKeys) {
1109                        Object pv = map.get(pk);
1110                        if (pv == null) {
1111                                pv = map.get(pk.toLowerCase());
1112                        }
1113                        if (pv == null) {
1114                                pv = map.get(pk.toUpperCase());
1115                        }
1116                        if (pv instanceof java.util.Date) {
1117                                java.util.Date utilDate = (java.util.Date) pv;
1118                                java.sql.Timestamp sqlDate = new java.sql.Timestamp(utilDate.getTime());
1119                                params.add(sqlDate);
1120                        } else {
1121                                params.add(pv);
1122                        }
1123                }
1124
1125                return params;
1126        }
1127
1128        /**
1129         * Covner to upper or lower
1130         * @param s - Column name or table name
1131         * @return String
1132         * */
1133        protected String converCase(String s) {
1134                if (COLUMN_NAME_CASE_MODE.equals(COLUMN_NAME_CASE_UPPER)) {
1135                        return s.toUpperCase();
1136                } else if (COLUMN_NAME_CASE_MODE.equals(COLUMN_NAME_CASE_LOWER)) {
1137                        return s.toLowerCase();
1138                } else {
1139                        return s;
1140                }
1141        }
1142
1143        /**
1144         * Check available sql to write to log
1145         * @return boolean
1146         * @param connection - Connection
1147         * @param sql
1148         * */
1149        protected static boolean checkSqlLogAvailable(Connection connection, String sql) {
1150                if (connection instanceof DriverConnection) {
1151                        DriverConnection dc = (DriverConnection) connection;
1152                        DriverDataSource dds = dc.getDriverDataSource();
1153                        ConfigProperties configProperties = dds.getConfigProperties();
1154                        List<String> sqlAllowed = configProperties.getArray("SqlLogAllowed");
1155                        List<String> sqlIgnored = configProperties.getArray("SqlLogIgnored");
1156                        return checkSqlAvailable(sql, sqlAllowed, sqlIgnored);
1157                }
1158                return false;
1159        }
1160
1161        /**
1162         * Check available sql to execute
1163         * @return boolean
1164         * @param sql
1165         * */
1166        protected boolean checkSqlAvailable(String sql) {
1167                return checkSqlAvailable(connection, sql);
1168        }
1169
1170        /**
1171         * Check available sql to execute
1172         * @return boolean
1173         * @param connection
1174         * @param sql
1175         * */
1176        protected static boolean checkSqlAvailable(Connection connection, String sql) {
1177                if (connection instanceof DriverConnection) {
1178                        DriverConnection dc = (DriverConnection) connection;
1179                        DriverDataSource dds = dc.getDriverDataSource();
1180                        ConfigProperties configProperties = dds.getConfigProperties();
1181                        List<String> sqlAllowed = configProperties.getArray("SqlExecuteAllowed");
1182                        List<String> sqlIgnored = configProperties.getArray("SqlExecuteIgnored");
1183                        return checkSqlAvailable(sql, sqlAllowed, sqlIgnored);
1184                }
1185                return true;
1186        }
1187
1188        /**
1189         * Check available sql to execute
1190         * @return boolean
1191         * @param sql
1192         * @param sqlAllowed - From DataSources.properties
1193         * @param sqlIgnored - From DataSources.properties
1194         * */
1195        private static boolean checkSqlAvailable(String sql, List<String> sqlAllowed, List<String> sqlIgnored) {
1196                boolean matchedAllowed = true;
1197                boolean matchedIgnored = false;
1198                if (sqlAllowed != null) {
1199                        for (String regex : sqlAllowed) {
1200                                if (!CommonTools.isBlank(regex)) {
1201                                        matchedAllowed = sql.matches(regex);
1202
1203                                        if (matchedAllowed)
1204                                                break;
1205                                }
1206                        }
1207                }
1208                if (matchedAllowed && sqlIgnored != null) {
1209                        for (String regex : sqlIgnored) {
1210                                if (!CommonTools.isBlank(regex)) {
1211                                        matchedIgnored = sql.matches(regex);
1212
1213                                        if (matchedIgnored)
1214                                                break;
1215                                }
1216                        }
1217                }
1218
1219                boolean b = matchedAllowed && !matchedIgnored;
1220                if (!b) {
1221                        LoggerFactory.getLogger(DriverExecutor.class).debug("Not available - '{}'", sql);
1222                }
1223                return b;
1224        }
1225
1226        /**
1227         * Write sql log
1228         * @return boolean 
1229         * @param type
1230         * @param seq
1231         * @param sql
1232         * @param params
1233         * */
1234        protected synchronized boolean writeSqlLog(String type, long seq, String sql, Object params) {
1235                return writeSqlLog(this.hashCode(),connection, type, seq, sql, params);
1236        }
1237
1238        /**
1239         * Write sql log
1240         * @return boolean 
1241         * @param connection
1242         * @param type
1243         * @param seq
1244         * @param sql
1245         * @param params
1246         * */
1247        protected synchronized static boolean writeSqlLog(int deHashCode,Connection connection, String type, long seq, String sql,
1248                        Object params) {
1249                if (connection instanceof DriverConnection) {
1250                        if (!checkSqlLogAvailable(connection, sql))
1251                                return false;
1252
1253                        final String header = String.format(
1254                                        "LOG_DATE,LOG_HOUR,LOG_MI,LOG_SEC,LOG_MS,LOG_HOST,LOG_THREAD,LOG_DS,LOG_CONN_ID,LOG_EXEC_ID,LOG_TYPE,LOG_SEQ,LOG_SQL,LOG_PARAMS%s",
1255                                        System.lineSeparator());
1256                        final DateFormat df = new SimpleDateFormat("yyyyMMdd");
1257                        final DateFormat dtf = new SimpleDateFormat("yyyyMMdd,HH,mm,ss,SSS");
1258                        DriverConnection dc = (DriverConnection) connection;
1259                        Integer connHashCode = dc.getOriginConnectionHashCode();
1260                        DriverDataSource dds = dc.getDriverDataSource();
1261                        ConfigProperties configProperties = dds.getConfigProperties();
1262
1263                        boolean logEnable = configProperties.getBoolean("SqlLogEnable", false);
1264                        long overspend = configProperties.getMilliSeconds("SqlLogOverspend",0L);
1265
1266                        if (!logEnable)
1267                                return false;
1268
1269                        final String defaultLogFolderPath = String.format("%s/SqlLog/",
1270                                        CommonTools.getJarPath(DriverExecutor.class));
1271                        final String logFolderPath = configProperties.getString("SqlLogFolder", defaultLogFolderPath);
1272                        final long maxFileSize = configProperties.getFileSize("SqlLogMaxFileSize", 1024 * 1024 * 10L);
1273                        final int archiveDays = configProperties.getInteger("SqlLogArchiveDays", 31);
1274                        final int logParamMaxLength = configProperties.getInteger("SqlLogParamMaxLength", 20);
1275
1276                        if (sqlLogCacheArray == null) {
1277
1278                                sqlLogCacheArray = new CacheArray();
1279                                long sqlLogFilterTimer = configProperties.getLong("SqlLogTimer", 1000L);
1280                                sqlLogCacheArray.filter(new CacheArrayFilter(0L,sqlLogFilterTimer) {
1281                                        @Override
1282                                        public void executeBatch(Integer index, List batch) {
1283                                            final StringBuffer sbf = new StringBuffer();
1284                                            for(Object item : batch){
1285                                                sbf.append(item);
1286                                            }
1287                                                try {
1288                                                        String msg = sbf.toString();
1289                                                        String dateStr = df.format(new Timestamp(System.currentTimeMillis()));
1290                                                        File sqlLogFile = new File(String.format("%s/%s.csv", logFolderPath, dateStr));
1291                                                        File sqlLogFolder = new File(sqlLogFile.getParent());
1292
1293                                                        if (sqlLogFolder.exists()) {
1294                                                                if (!sqlLogFolder.canWrite())
1295                                                                        throw new IOException(String.format("Can not write to log folder '%s'",
1296                                                                                        sqlLogFolder.getAbsolutePath()));
1297                                                        } else {
1298                                                                sqlLogFolder.mkdirs();
1299                                                        }
1300
1301                                                        if (sqlLogFile.exists()) {
1302                                                                if (!sqlLogFile.canWrite())
1303                                                                        throw new IOException(String.format("Can not write to log file '%s'",
1304                                                                                        sqlLogFile.getAbsolutePath()));
1305                                                        }
1306
1307                                                        int suffixIndex = sqlLogFile.getName().lastIndexOf(".");
1308                                                        String logFileNamePrefix = sqlLogFile.getName().substring(0, suffixIndex);
1309                                                        long logSize = FileTools.size(sqlLogFile);
1310                                                        if (!sqlLogFile.exists()) {
1311                                                                FileTools.write(sqlLogFile, header, false);
1312                                                        }
1313                                                        if (logSize < maxFileSize) {
1314                                                                FileTools.write(sqlLogFile, msg, true);
1315                                                        } else {
1316                                                                int logIndex = getLogFileIndex(configProperties, sqlLogFolder, logFileNamePrefix,
1317                                                                                "csv");
1318                                                                if (logIndex == 0) {
1319                                                                        FileTools.write(sqlLogFile, String.format("%s%s", header, msg), false);
1320                                                                } else {
1321                                                                        backupLog(sqlLogFolder, sqlLogFile, logFileNamePrefix, logIndex, header, msg);
1322                                                                }
1323                                                        }
1324                                                        archiveLog(archiveDays, sqlLogFolder);
1325                                                } catch (Exception e) {
1326                                                        LoggerFactory.getLogger(DriverExecutor.class).warn(e.getMessage(), e);
1327                                                }
1328                                        }
1329                                });
1330                        }
1331
1332                        if (logEnable) {
1333                                String hostname = CommonTools.getHostname();
1334                                String dateTimeStr = dtf.format(new Timestamp(System.currentTimeMillis()));
1335                                boolean isOverspend = false;
1336                                boolean isSpendSeq = sql.equals("spend");
1337                                if(isSpendSeq && params != null){
1338                                    long spendValue = Long.parseLong(params + "");
1339                                    isOverspend = (spendValue >= overspend);
1340                                }
1341
1342                String threadId = Thread.currentThread().getName() + "-" + Thread.currentThread().getId();
1343                                String msg = String.format("%s,%s,%s,%s,%s,%s,%s,%s,\"%s\",\"%s\"%s", dateTimeStr, hostname,
1344                                                    threadId, dds.getName(), connHashCode,deHashCode, type, seq,
1345                                                    replaceToSigleLine(sql), handleParams(logParamMaxLength, params), System.lineSeparator());  
1346                                                    
1347                                String key = String.format("%s_%s",deHashCode,connHashCode);
1348                                if(seq <= 0 || overspend <= 0){
1349                                    sqlLogCacheArray.add(msg);
1350                                    SQL_LOG_MSG_MAPPING.remove(key);
1351                                SQL_LOG_OVERSPEND_MAPPING.remove(key);                              
1352                                }
1353                                if(seq > 0 && overspend > 0){
1354                                SQL_LOG_OVERSPEND_MAPPING.put(key,isOverspend);
1355                                String existsMsg = SQL_LOG_MSG_MAPPING.get(key);
1356                                existsMsg = existsMsg == null ? msg : (existsMsg + msg);
1357                                SQL_LOG_MSG_MAPPING.put(key, existsMsg);
1358                                
1359                                boolean seqOverspend = SQL_LOG_OVERSPEND_MAPPING.get(key);
1360                                if(isSpendSeq && seqOverspend){
1361                                    sqlLogCacheArray.add(SQL_LOG_MSG_MAPPING.get(key) + "");
1362                                    SQL_LOG_MSG_MAPPING.remove(key);
1363                                    SQL_LOG_OVERSPEND_MAPPING.remove(key);
1364                                }
1365                                if(isSpendSeq && !seqOverspend){
1366                                    SQL_LOG_MSG_MAPPING.remove(key);
1367                                    SQL_LOG_OVERSPEND_MAPPING.remove(key);
1368                                }
1369                                }
1370                                return true;
1371                        }
1372
1373                }
1374                return false;
1375        }
1376
1377        /**
1378         * Backup log file
1379         * */
1380        private static void backupLog(File logFolder, File logFile, String logFileNamePrefix, int logIndex, String header,
1381                        String msg) throws Exception {
1382                String backupLogFileName = String.format("%s/%s.%s.csv", logFolder.getAbsolutePath(), logFileNamePrefix,
1383                                logIndex);
1384                Path source = Paths.get(logFile.getAbsolutePath());
1385                Path target = Paths.get(backupLogFileName);
1386                Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
1387                FileTools.write(logFile, String.format("%s%s", header, msg), false);
1388        }
1389
1390        /**
1391         * Get log file index
1392         * For backup log file use
1393         * */
1394        private static int getLogFileIndex(ConfigProperties configProperties, File folder, String prefix, String suffix) {
1395                Integer maxBackupIndex = configProperties.getInteger("SqlLogMaxBackupIndex", 10);
1396                for (int i = 1; i <= maxBackupIndex; i++) {
1397                        String logFileName = String.format("%s/%s.%s.%s", folder.getAbsolutePath(), prefix, i, suffix);
1398                        File logFile = new File(logFileName);
1399                        if (!logFile.exists())
1400                                return i;
1401                }
1402                return 0;
1403        }
1404
1405        /**
1406         * Archive Log
1407         * */
1408        private static void archiveLog(int archiveDays, File sqlLogFolder) {
1409                if (archiveDays > 0) {
1410                        try {
1411                                long archiveDaysMs = new Date().getTime() - (archiveDays * 24 * 3600000L);
1412                                deleteFilesOlderThan(sqlLogFolder, archiveDaysMs);
1413                        } catch (Exception e) {
1414                                LoggerFactory.getLogger(DriverExecutor.class).warn(e);
1415                        }
1416                }
1417        }
1418
1419        /**
1420         * Delete old archive logs
1421         * */
1422        private static void deleteFilesOlderThan(File directory, long archiveDaysMs) throws IOException {
1423                if (directory.isDirectory()) {
1424                        File[] files = directory.listFiles();
1425                        if (files != null) {
1426                                for (File file : files) {
1427                                        if (file.isFile()) {
1428                                                boolean isLogFile = file.getName().toLowerCase().endsWith(".csv");
1429                                                if (isLogFile) {
1430                                                        boolean canWrite = file.canWrite();
1431                                                        if (canWrite) {
1432                                                                long lastModified = file.lastModified();
1433                                                                if (lastModified < archiveDaysMs) {
1434                                                                        Files.deleteIfExists(Paths.get(file.toURI()));
1435                                                                }
1436                                                        }
1437                                                }
1438                                        }
1439                                }
1440                        }
1441                }
1442        }
1443
1444        /**
1445         * Replace to sigle line
1446         * For write csv log
1447         * */
1448        private static String replaceToSigleLine(String msg) {
1449                return CodeEscape.escapeToSingleLineForCsv(msg);
1450        }
1451
1452        /**
1453         * Hahdle Params
1454         * For write csv log
1455         * */
1456        private static String handleParams(int paramMaxLength, Object params) {
1457
1458                if (params == null)
1459                        return "null";
1460
1461                StringBuffer paramSbf = new StringBuffer("");
1462                if (params instanceof List) {
1463                        List<Object> listParams = (List<Object>) params;
1464                        int size = listParams.size();
1465                        for (int i = 0; i < size; i++) {
1466                                Object param = listParams.get(i);
1467                                String str = param + "";
1468                                if (str.length() > paramMaxLength) {
1469                                        paramSbf.append(str.substring(0, paramMaxLength) + "...");
1470                                } else {
1471                                        paramSbf.append(str);
1472                                }
1473                                if (i < size - 1) {
1474                                        paramSbf.append(";");
1475                                }
1476                        }
1477                } else {
1478                        paramSbf.append(params);
1479                }
1480                return replaceToSigleLine(paramSbf.toString());
1481        }
1482
1483        /**For Sync DataSource**/
1484
1485        /**
1486         * For database replication
1487         * */
1488        protected static void callAndReturnBooleanSync(Connection masterConn, long seq, String sql, List<Object> params,
1489                        boolean returnResult) throws SQLException {
1490                if (masterConn instanceof DriverConnection) {
1491                        openSyncConnection(masterConn);
1492
1493                        DriverConnection masterDriverConn = (DriverConnection) masterConn;
1494                        Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode();
1495                        ConfigProperties ddscp = masterDriverConn.getDriverDataSource().getConfigProperties();
1496                        boolean connCheck = true;
1497                        boolean returnCheck = true;
1498                        if (ddscp != null) {
1499                                connCheck = ddscp.getBoolean("SyncConnectionCheck", true);
1500                                returnCheck = ddscp.getBoolean("SyncReturnCheck", true);
1501                        }
1502                        List<DriverExecutor> deList = SYNC_EXECUTOR_MARK.get(masterConnHashCode);
1503                        if (deList != null) {
1504                                for (DriverExecutor de : deList) {
1505                                        try {
1506                                                boolean resultSync = de.callAndReturnBoolean(sql, params);
1507                                                if (returnCheck) {
1508                                                        if (resultSync != returnResult) {
1509                                                                writeSqlLog(de.hashCode(),masterDriverConn, "diffed", seq, "diffed", String.format("%s-%s",
1510                                                                                de.getDataSourceName(), de.getOriginConnectionHashCode()));
1511                                                                int errorCode = 99906;
1512                                                                String errorMsg = String.format("The returned results are inconsistent '%s-%s'.",
1513                                                                                de.getDataSourceName(), de.getOriginConnectionHashCode());
1514                                                                throw new SQLException(errorMsg) {
1515                                                                        @Override
1516                                                                        public int getErrorCode() {
1517                                                                                return errorCode;
1518                                                                        }
1519                                                                };
1520                                                        }
1521                                                }
1522                                        } catch (SQLException e) {
1523                                                LoggerFactory.getLogger(DriverExecutor.class)
1524                                                                .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage()));
1525                                                if (connCheck)
1526                                                        throw e;
1527                                        }
1528                                }
1529                        }
1530                }
1531        }
1532
1533        /**
1534         * For database replication
1535         * */
1536        protected static void callAndReturnListSync(Connection masterConn, int cursorStart, int maxRows, String sql,
1537                        List<Object> params) throws SQLException {
1538                if (masterConn instanceof DriverConnection) {
1539                        openSyncConnection(masterConn);
1540
1541                        DriverConnection masterDriverConn = (DriverConnection) masterConn;
1542                        Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode();
1543
1544                        List<DriverExecutor> deList = SYNC_EXECUTOR_MARK.get(masterConnHashCode);
1545                        if (deList != null) {
1546                                for (DriverExecutor de : deList) {
1547                                        try {
1548                                                de.callAndReturnBoolean(sql, params);
1549                                        } catch (SQLException e) {
1550                                                LoggerFactory.getLogger(DriverExecutor.class)
1551                                                                .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage()));
1552                                                ConfigProperties ddscp = masterDriverConn.getDriverDataSource().getConfigProperties();
1553                                                boolean connCheck = true;
1554                                                if (ddscp != null) {
1555                                                        connCheck = ddscp.getBoolean("SyncConnectionCheck", true);
1556                                                }
1557                                                if (connCheck)
1558                                                        throw e;
1559                                        }
1560                                }
1561                        }
1562                }
1563        }
1564
1565        /**
1566         * For database replication
1567         * */
1568        protected static void callAndReturnRowsSync(Connection masterConn, long seq, String sql, List<Object> params,
1569                        int returnRows) throws SQLException {
1570                if (masterConn instanceof DriverConnection) {
1571                        openSyncConnection(masterConn);
1572
1573                        DriverConnection masterDriverConn = (DriverConnection) masterConn;
1574                        Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode();
1575                        ConfigProperties ddscp = masterDriverConn.getDriverDataSource().getConfigProperties();
1576                        boolean connCheck = true;
1577                        boolean returnCheck = true;
1578                        if (ddscp != null) {
1579                                connCheck = ddscp.getBoolean("SyncConnectionCheck", true);
1580                                returnCheck = ddscp.getBoolean("SyncReturnCheck", true);
1581                        }
1582                        List<DriverExecutor> deList = SYNC_EXECUTOR_MARK.get(masterConnHashCode);
1583                        if (deList != null) {
1584                                for (DriverExecutor de : deList) {
1585                                        try {
1586                                                int rowSync = de.callAndReturnRows(sql, params);
1587                                                if (returnCheck) {
1588                                                        if (rowSync != returnRows) {
1589                                                                writeSqlLog(de.hashCode(),masterDriverConn, "diffed", seq, "diffed", String.format("%s-%s",
1590                                                                                de.getDataSourceName(), de.getOriginConnectionHashCode()));
1591                                                                int errorCode = 99906;
1592                                                                String errorMsg = String.format("The returned results are inconsistent '%s-%s'.",
1593                                                                                de.getDataSourceName(), de.getOriginConnectionHashCode());
1594                                                                throw new SQLException(errorMsg) {
1595                                                                        @Override
1596                                                                        public int getErrorCode() {
1597                                                                                return errorCode;
1598                                                                        }
1599                                                                };
1600                                                        }
1601                                                }
1602                                        } catch (SQLException e) {
1603                                                LoggerFactory.getLogger(DriverExecutor.class)
1604                                                                .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage()));
1605                                                if (connCheck)
1606                                                        throw e;
1607                                        }
1608                                }
1609                        }
1610                }
1611        }
1612
1613        /**
1614         * For database replication
1615         * */
1616        protected static void executeBatchListSync(Connection masterConn, long seq, String sql, List<List<Object>> records,
1617                        int returnRows) throws SQLException {
1618                if (masterConn instanceof DriverConnection) {
1619                        openSyncConnection(masterConn);
1620
1621                        DriverConnection masterDriverConn = (DriverConnection) masterConn;
1622                        Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode();
1623                        ConfigProperties ddscp = masterDriverConn.getDriverDataSource().getConfigProperties();
1624                        boolean connCheck = true;
1625                        boolean returnCheck = true;
1626                        if (ddscp != null) {
1627                                connCheck = ddscp.getBoolean("SyncConnectionCheck", true);
1628                                returnCheck = ddscp.getBoolean("SyncReturnCheck", true);
1629                        }
1630                        List<DriverExecutor> deList = SYNC_EXECUTOR_MARK.get(masterConnHashCode);
1631                        if (deList != null) {
1632                                for (DriverExecutor de : deList) {
1633                                        try {
1634                                                int rowSync = de.executeBatchList(sql, records);
1635                                                if (returnCheck) {
1636                                                        if (rowSync != returnRows) {
1637                                                                writeSqlLog(de.hashCode(),masterDriverConn, "diffed", seq, "diffed", String.format("%s-%s",
1638                                                                                de.getDataSourceName(), de.getOriginConnectionHashCode()));
1639                                                                int errorCode = 99906;
1640                                                                String errorMsg = String.format("The returned results are inconsistent '%s-%s'.",
1641                                                                                de.getDataSourceName(), de.getOriginConnectionHashCode());
1642                                                                throw new SQLException(errorMsg) {
1643                                                                        @Override
1644                                                                        public int getErrorCode() {
1645                                                                                return errorCode;
1646                                                                        }
1647                                                                };
1648                                                        }
1649                                                }
1650                                        } catch (SQLException e) {
1651                                                LoggerFactory.getLogger(DriverExecutor.class)
1652                                                                .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage()));
1653                                                if (connCheck)
1654                                                        throw e;
1655                                        }
1656                                }
1657                        }
1658                }
1659        }
1660
1661        /**
1662         * For database replication
1663         * */
1664        protected static void executeSync(Connection masterConn, long seq, String sql, List<Object> params, int returnRows)
1665                        throws SQLException {
1666                if (masterConn instanceof DriverConnection) {
1667                        openSyncConnection(masterConn);
1668
1669                        DriverConnection masterDriverConn = (DriverConnection) masterConn;
1670                        Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode();
1671                        ConfigProperties ddscp = masterDriverConn.getDriverDataSource().getConfigProperties();
1672                        boolean connCheck = true;
1673                        boolean returnCheck = true;
1674                        if (ddscp != null) {
1675                                connCheck = ddscp.getBoolean("SyncConnectionCheck", true);
1676                                returnCheck = ddscp.getBoolean("SyncReturnCheck", true);
1677                        }
1678                        List<DriverExecutor> deList = SYNC_EXECUTOR_MARK.get(masterConnHashCode);
1679                        if (deList != null) {
1680                                for (DriverExecutor de : deList) {
1681                                        try {
1682                                                int rowSync = de.execute(sql, params);
1683                                                if (returnCheck) {
1684                                                        if (rowSync != returnRows) {
1685                                                                writeSqlLog(de.hashCode(),masterDriverConn, "diffed", seq, "diffed", String.format("%s-%s",
1686                                                                                de.getDataSourceName(), de.getOriginConnectionHashCode()));
1687                                                                int errorCode = 99906;
1688                                                                String errorMsg = String.format("The returned results are inconsistent '%s-%s'.",
1689                                                                                de.getDataSourceName(), de.getOriginConnectionHashCode());
1690                                                                throw new SQLException(errorMsg) {
1691                                                                        @Override
1692                                                                        public int getErrorCode() {
1693                                                                                return errorCode;
1694                                                                        }
1695                                                                };
1696                                                        }
1697                                                }
1698                                        } catch (SQLException e) {
1699                                                LoggerFactory.getLogger(DriverExecutor.class)
1700                                                                .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage()));
1701                                                if (connCheck)
1702                                                        throw e;
1703                                        }
1704                                }
1705                        }
1706                }
1707        }
1708
1709        /**
1710         * For database replication
1711         * */
1712        protected synchronized static boolean openSyncConnection(Connection masterConn) throws SQLException {
1713                if (masterConn instanceof DriverConnection) {
1714
1715                        DriverConnection masterDriverConn = (DriverConnection) masterConn;
1716                        Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode();
1717
1718                        boolean isExist = SYNC_EXECUTOR_MARK.containsKey(masterConnHashCode);
1719
1720                        if (isExist) {
1721                                return false;
1722                        } else {
1723                                SYNC_EXECUTOR_MARK.put(masterConnHashCode, new ArrayList<DriverExecutor>());
1724                        }
1725
1726                        DriverDataSource dds = masterDriverConn.getDriverDataSource();
1727                        if (dds != null) {
1728                                List<DriverDataSource> sdsl = dds.getSyncDataSourceList(dds.getName());
1729                                if (sdsl != null) {
1730
1731                                        if (sdsl.isEmpty())
1732                                                return false;
1733
1734                                        for (DriverDataSource sds : sdsl) {
1735
1736                                                Integer sdsHashCode = sds.hashCode();
1737                                                if (!SYNC_CONN_ERROR_TIME.containsKey(sdsHashCode)) {
1738                                                        SYNC_CONN_ERROR_TIME.put(sdsHashCode, 0L);
1739                                                }
1740
1741                                                Long syncConnErrorTime = SYNC_CONN_ERROR_TIME.get(sdsHashCode);
1742                                                if (syncConnErrorTime > 0) {
1743                                                        ConfigProperties ddscp = dds.getConfigProperties();
1744                                                        long connCheckMs = 10000L;
1745                                                        if (ddscp != null) {
1746                                                                connCheckMs = ddscp.getLong("SyncConnectionCheckTime", 10000L);
1747                                                        }
1748                                                        boolean isSkipConn = syncConnErrorTime > 0
1749                                                                        && (System.currentTimeMillis() - syncConnErrorTime) <= connCheckMs;
1750                                                        if (isSkipConn) {
1751                                                                continue;
1752                                                        }
1753                                                }
1754
1755                                                String masterFingerprint = dds.getFingerprint();
1756                                                String syncFingerprint = sds.getFingerprint();
1757                                                if (masterFingerprint.equalsIgnoreCase(syncFingerprint)) {
1758                                                        LoggerFactory.getLogger(DriverExecutor.class)
1759                                                                        .warn("Skip sync reason 'same connection fingerprint'.");
1760                                                        continue;
1761                                                }
1762
1763                                                try {
1764                                                        final Connection conn = sds.getConnection();
1765                                                        if (conn == null) {
1766                                                                int errorCode = 99904;
1767                                                                String errorMsg = "Connection is null.";
1768                                                                throw new SQLException(errorMsg) {
1769                                                                        @Override
1770                                                                        public int getErrorCode() {
1771                                                                                return errorCode;
1772                                                                        }
1773                                                                };
1774                                                        }
1775                                                        conn.setAutoCommit(masterConn.getAutoCommit());
1776                                                        SYNC_CONN_ERROR_TIME.put(sdsHashCode, 0L);
1777                                                        SYNC_EXECUTOR_MARK.get(masterConnHashCode).add(new DriverExecutor(conn));
1778                                                } catch (SQLException e) {
1779                                                        SYNC_CONN_ERROR_TIME.put(sdsHashCode, System.currentTimeMillis());
1780                                                        LoggerFactory.getLogger(DriverExecutor.class)
1781                                                                        .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage()));
1782                                                        ConfigProperties ddscp = dds.getConfigProperties();
1783                                                        boolean connCheck = true;
1784                                                        if (ddscp != null) {
1785                                                                connCheck = ddscp.getBoolean("SyncConnectionCheck", true);
1786                                                        }
1787                                                        if (connCheck)
1788                                                                throw e;
1789                                                }
1790
1791                                        }
1792
1793                                        return true;
1794                                }
1795                        }
1796                }
1797                return false;
1798        }
1799
1800        /**
1801         * For database replication
1802         * */
1803        protected static void closeSyncConnection(Connection masterConn) throws SQLException {
1804                if (masterConn instanceof DriverConnection) {
1805                        DriverConnection masterDriverConn = (DriverConnection) masterConn;
1806                        Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode();
1807
1808                        List<DriverExecutor> deList = SYNC_EXECUTOR_MARK.get(masterConnHashCode);
1809                        if (deList != null) {
1810                                for (DriverExecutor de : deList) {
1811                                        try {
1812                                                de.close();
1813                                        } catch (SQLException e) {
1814                                                LoggerFactory.getLogger(DriverExecutor.class)
1815                                                                .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage()));
1816                                                ConfigProperties ddscp = masterDriverConn.getDriverDataSource().getConfigProperties();
1817                                                boolean connCheck = true;
1818                                                if (ddscp != null) {
1819                                                        connCheck = ddscp.getBoolean("SyncConnectionCheck", true);
1820                                                }
1821                                                if (connCheck)
1822                                                        throw e;
1823                                        }
1824                                }
1825                                SYNC_EXECUTOR_MARK.remove(masterConnHashCode);
1826                                LoggerFactory.getLogger(DriverExecutor.class).debug("CloseSyncConnection - masterConn '{}' finished.",
1827                                                masterConnHashCode);
1828                        }
1829                }
1830        }
1831
1832        /**
1833         * For database replication
1834         * */
1835        protected static void commitSyncConnection(Connection masterConn) throws SQLException {
1836                if (masterConn instanceof DriverConnection) {
1837                        DriverConnection masterDriverConn = (DriverConnection) masterConn;
1838                        Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode();
1839
1840                        List<DriverExecutor> deList = SYNC_EXECUTOR_MARK.get(masterConnHashCode);
1841                        if (deList != null) {
1842                                for (DriverExecutor de : deList) {
1843                                        try {
1844                                                de.commit();
1845                                        } catch (SQLException e) {
1846                                                LoggerFactory.getLogger(DriverExecutor.class)
1847                                                                .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage()));
1848                                                ConfigProperties ddscp = masterDriverConn.getDriverDataSource().getConfigProperties();
1849                                                boolean connCheck = true;
1850                                                if (ddscp != null) {
1851                                                        connCheck = ddscp.getBoolean("SyncConnectionCheck", true);
1852                                                }
1853                                                if (connCheck)
1854                                                        throw e;
1855                                        }
1856                                }
1857                                LoggerFactory.getLogger(DriverExecutor.class).debug("CommitSyncConnection - masterConn '{}' finished.",
1858                                                masterConnHashCode);
1859                        }
1860                }
1861        }
1862
1863        /**
1864         * For database replication
1865         * */
1866        protected static void rollbackSyncConnection(Connection masterConn) throws SQLException {
1867                if (masterConn instanceof DriverConnection) {
1868                        DriverConnection masterDriverConn = (DriverConnection) masterConn;
1869                        Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode();
1870
1871                        List<DriverExecutor> deList = SYNC_EXECUTOR_MARK.get(masterConnHashCode);
1872                        if (deList != null) {
1873                                for (DriverExecutor de : deList) {
1874                                        try {
1875                                                de.rollback();
1876                                        } catch (SQLException e) {
1877                                                LoggerFactory.getLogger(DriverExecutor.class)
1878                                                                .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage()));
1879                                                ConfigProperties ddscp = masterDriverConn.getDriverDataSource().getConfigProperties();
1880                                                boolean connCheck = true;
1881                                                if (ddscp != null) {
1882                                                        connCheck = ddscp.getBoolean("SyncConnectionCheck", true);
1883                                                }
1884                                                if (connCheck)
1885                                                        throw e;
1886                                        }
1887                                }
1888                                LoggerFactory.getLogger(DriverExecutor.class)
1889                                                .debug("RollbackSyncConnection - masterConn '{}' finished.", masterConnHashCode);
1890                        }
1891                }
1892        }
1893
1894        /**
1895         * For database replication
1896         * */
1897        protected static void abortSyncConnection(Connection masterConn) throws SQLException {
1898                if (masterConn instanceof DriverConnection) {
1899                        DriverConnection masterDriverConn = (DriverConnection) masterConn;
1900                        Integer masterConnHashCode = masterDriverConn.getOriginConnectionHashCode();
1901
1902                        List<DriverExecutor> deList = SYNC_EXECUTOR_MARK.get(masterConnHashCode);
1903                        if (deList != null) {
1904                                for (DriverExecutor de : deList) {
1905                                        try {
1906                                                de.abort();
1907                                        } catch (SQLException e) {
1908                                                LoggerFactory.getLogger(DriverExecutor.class)
1909                                                                .warn(String.format("ERROR CODE: (%s) %s", e.getErrorCode(), e.getMessage()));
1910                                                ConfigProperties ddscp = masterDriverConn.getDriverDataSource().getConfigProperties();
1911                                                boolean connCheck = true;
1912                                                if (ddscp != null) {
1913                                                        connCheck = ddscp.getBoolean("SyncConnectionCheck", true);
1914                                                }
1915                                                if (connCheck)
1916                                                        throw e;
1917                                        }
1918                                }
1919                                LoggerFactory.getLogger(DriverExecutor.class).debug("AbortSyncConnection - masterConn '{}' finished.",
1920                                                masterConnHashCode);
1921                        }
1922                }
1923        }
1924
1925}