001/*******************************************************************************
002The MIT License (MIT)
003
004Copyright (c) 2024 KILLCODING.COM
005
006Permission is hereby granted, free of charge, to any person obtaining a copy
007of this software and associated documentation files (the "Software"), to deal
008in the Software without restriction, including without limitation the rights
009to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
010copies of the Software, and to permit persons to whom the Software is
011furnished to do so, subject to the following conditions:
012
013The above copyright notice and this permission notice shall be included in
014all copies or substantial portions of the Software.
015
016THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
017IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
018FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
019AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
020LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
021OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
022THE SOFTWARE.
023*****************************************************************************/
024package com.killcoding.cache;
025
026import java.io.Serializable;
027import java.util.List;
028import java.util.ArrayList;
029import java.util.Map;
030import com.killcoding.tool.CommonTools;
031import java.util.Collection;
032import java.util.Collections;
033import com.killcoding.cache.CacheArrayFilter;
034import java.util.concurrent.ExecutorService;
035import java.util.concurrent.Executors;
036import com.killcoding.log.LoggerFactory;
037import com.killcoding.log.Logger;
038import com.killcoding.cache.CacheArray;
039
040/**
041 * This class use asynchronous caching to handle large result and large list.
042 * Need class CacheArrayFilter to use.
043 * */
044public class CacheArray implements java.io.Serializable {
045    
046    private static Integer MAX_POOL_SIZE = 100;
047    
048    private static Integer USING_POOL_SIZE = 0;
049
050        private static ExecutorService arrayPool = null;
051
052        protected Object lastItem = null;
053
054        private static boolean stop = false;
055        
056        private static boolean forceStop = false;
057        
058        private CacheArrayFilter filter = null;
059        
060        private boolean nullIsCompleted = true;
061        
062        protected Integer INDEX = -1;
063        protected Integer SIZE = -1;
064        protected Object OBJECT = null;
065        protected Integer BATCH_INDEX = -1;
066    protected final List BATCH = new ArrayList();
067    
068        /**
069         * All cache keys list
070         * */
071        private final List<String> cacheKeys = new ArrayList<String>();
072
073        /**
074         * 'true' is completed
075         * */
076        private boolean completed = false;
077
078        /**
079         * New a CacheArray object
080         * */
081        public CacheArray() {
082                super();
083                this.nullIsCompleted = true;
084        }
085        
086        public CacheArray(boolean nullIsCompleted) {
087                super();
088                this.nullIsCompleted = nullIsCompleted;
089        }       
090
091        public static synchronized void initPool(int poolSize) {
092                if (arrayPool == null) {
093                    MAX_POOL_SIZE = poolSize;
094                    Logger.systemMark(CacheArray.class,"MAX_POOL_SIZE={}",MAX_POOL_SIZE);
095                        arrayPool = Executors.newFixedThreadPool(MAX_POOL_SIZE);
096                }
097        }
098        
099        public CacheArrayFilter filter(final long timer,final Runnable executeRun,final Runnable completedRun,final Runnable terminatedRun) {
100            CacheArray the = this;
101            the.filter = new CacheArrayFilter(timer){
102                @Override
103                public void execute(Integer index, Object object){
104                    the.INDEX = index;
105                    the.OBJECT = object;
106                    
107                    if(executeRun != null) executeRun.run();
108                }
109                @Override
110                public void completed(Integer size) {
111                    the.SIZE = size;
112                    
113                if(completedRun != null) completedRun.run();
114                }   
115                @Override
116                public void terminated() {
117                    
118                    if(terminatedRun != null) terminatedRun.run();
119                }
120            };
121            filter(the.filter);
122            return the.filter;
123        }
124
125        public CacheArrayFilter filter(final long timer,final long loopTimer,final Runnable executeRun,final Runnable executeBatchRun,final Runnable completedRun,final Runnable terminatedRun) {
126            CacheArray the = this;
127            the.filter = new CacheArrayFilter(timer,loopTimer){
128                @Override
129                public void execute(Integer index, Object object){
130                    
131                    if(executeRun != null) executeRun.run();
132                }
133                
134                @Override
135                public void executeBatch(Integer index,List batch) {
136                    
137                if(executeBatchRun != null) executeBatchRun.run();
138                }    
139                
140                @Override
141                public void completed(Integer size) {
142
143                if(completedRun != null) completedRun.run();
144                }   
145                
146                @Override
147                public void terminated() {
148                    
149                    if(terminatedRun != null) terminatedRun.run();
150                }
151            };
152            filter(the.filter);
153            return the.filter;
154        }
155        /**
156         * Need override the method to process logic handle
157         * @param filter - CacheArrayFilter
158         * */
159        public synchronized void filter(CacheArrayFilter _filter) {
160            this.filter = _filter;
161        int usagePool = setUsingPoolSize(+1);
162        if(usagePool > MAX_POOL_SIZE){
163            LoggerFactory.getLogger(CacheArray.class).debug(String.format("Exceeds the entire pool size ('%s' > '%s').",usagePool,MAX_POOL_SIZE));
164        }
165                final CacheArray the = this;
166                Runnable runnable = new Runnable() {
167                        @Override
168                        public void run() {
169                                int index = 0;
170                                try {
171                                        while (!the.completed && !the.filter.isForceTerminated() && !isForceStoped()) {
172                                            
173                                                if (Thread.currentThread().isInterrupted()){
174                                                    the.filter.terminated = true;
175                                                    the.filter.forceTerminated = true;
176                                                    the.completed = false;
177                                                        break;
178                                                }
179
180                                                the.filter.setCacheArray(the);
181                                                int size = the.size();
182                                                while (index < size && !the.filter.isForceTerminated() && !isForceStoped() && !Thread.currentThread().isInterrupted()) {
183                                                        try {
184                                                        if (Thread.currentThread().isInterrupted()){
185                                                            the.filter.terminated = true;
186                                                            the.filter.forceTerminated = true;
187                                                            the.completed = false;
188                                                                break;
189                                                        }                       
190                                                        
191                                                                if (forceStop) {
192                                                                        the.filter.terminated = true;
193                                                                        the.filter.forceTerminated = true;
194                                                                        the.completed = false;
195                                                                        break;
196                                                                }
197                                                                
198                                                                if (the.filter.forceTerminated){
199                                                                    the.completed = false;
200                                                                        break;
201                                                                }
202                                                                
203                                                                Object t = the.get(index);
204                                                                if (t == null) {
205                                                                        the.completed = true;
206                                                                        break;
207                                                                }
208                                                                lastItem = t;
209                                    the.INDEX = index;
210                                    the.OBJECT = t;                                                             
211                                                                BATCH.add(t);
212                                                                the.filter.execute(index, t);
213                                                                String cacheKey = the.getCacheKey(index);
214                                                                StoredCache.remove(cacheKey);
215                                                        } catch (Exception ee) {
216                                                                LoggerFactory.getLogger(CacheArray.class).warn(ee);
217                                                        }
218                                                        index++;
219                                                try {
220                                                        Thread.sleep(the.filter.getTimer());
221                                                } catch (InterruptedException e) {
222                                                        LoggerFactory.getLogger(CacheArray.class).warn(e);
223                                                        the.filter.terminated = true;
224                                                        the.completed = false;
225                                                        break;
226                                                }                                                       
227                                                }
228                                                BATCH_INDEX++;
229                                                the.filter.executeBatch(BATCH_INDEX,BATCH);
230                                                BATCH.clear();
231                                                try {
232                                                        Thread.sleep(the.filter.getLoopTimer());
233                                                } catch (InterruptedException e) {
234                                                        LoggerFactory.getLogger(CacheArray.class).warn(e);
235                                                        the.filter.terminated = true;
236                                                        the.completed = false;
237                                                        break;
238                                                }                                               
239                                        }
240                                } finally {
241                                    
242                                        if(the.completed) {
243                                            the.SIZE = index;
244                                            the.filter.completed(index);
245                                            setUsingPoolSize(-1);
246                                        }
247                                        
248                                        if(the.filter.terminated){
249                                            the.filter.terminated();
250                                            setUsingPoolSize(-1);
251                                        }
252                                        
253                                        the.removeAll();
254                                        
255                                        Thread.currentThread().interrupt();
256                                }
257
258                        }
259                };
260                if (arrayPool == null) {
261                    Logger.systemMark(CacheArray.class,"MAX_POOL_SIZE={}",MAX_POOL_SIZE);
262                    arrayPool = Executors.newFixedThreadPool(MAX_POOL_SIZE);
263                }
264                arrayPool.execute(runnable);
265        }
266
267        /**
268         * Get all cache keys
269         * @return List
270         * */
271        public synchronized List<String> getCacheKeys() {
272                return new ArrayList<String>(Collections.synchronizedList(cacheKeys));
273        }
274
275        /**
276         * Get all cache size
277         * @reteurn int
278         * */
279        public synchronized int size() {
280                return cacheKeys.size();
281        }
282
283        /**
284         * Add object to the cache list
285         * @return - return cacke key
286         * @value - It is object value
287         * */
288        public synchronized String add(Object value) {
289            if(nullIsCompleted || (!nullIsCompleted && value != null)) {
290            if(this.filter != null && !this.filter.isForceTerminated() && !isForceStoped()){
291                        String key = CommonTools.generateId(16);
292                        StoredCache.set(key, value);
293                        cacheKeys.add(key);
294                        return key;
295            }
296            }
297            return null;
298        }
299        
300        /**
301         * Add object to the cache list
302         * @return - return cacke key
303         * @value - It is object value
304         * @sleepMs - It is object value
305         * @throws InterruptedException
306         * */
307        public synchronized String add(Object value,long sleepMs) throws InterruptedException {
308            String key = add(value);
309            
310            if(sleepMs > 0) Thread.sleep(sleepMs);
311            
312            return key;
313        }       
314
315        /**
316         * Add list to the cache list
317         * @param list - It is object list value
318         * */
319        public synchronized void addAll(List list) {
320                for (Object value : list) {
321                        add(value);
322                }
323        }
324
325        /**
326         * Remove cache list index object
327         * @return - return removed cache key name
328         * @param index - It is remove index cache
329         * */
330        public synchronized String remove(int index) {
331                String key = cacheKeys.get(index);
332                StoredCache.remove(key);
333                cacheKeys.remove(index);
334                return key;
335        }
336
337        /**
338         * Remove all cache list
339         * */
340        public synchronized void removeAll() {
341                for (String key : cacheKeys) {
342                        StoredCache.remove(key);
343                }
344                cacheKeys.clear();
345        }
346
347        /**
348         * Get cache value by index
349         * @return - Cache value
350         * @param index - Cache index
351         * */
352        public synchronized Object get(int index) {
353                String key = cacheKeys.get(index);
354                return StoredCache.get(key);
355        }
356
357        /**
358         * Get cache key by index
359         * @return - Cache key
360         * @param index - Cache index
361         * */
362        public synchronized String getCacheKey(int index) {
363                return cacheKeys.get(index);
364        }
365
366        /**
367         * Check complete status
368         * @return - 'true' is completed
369         * */
370        public synchronized Boolean isCompleted() {
371                return completed;
372        }
373        
374        public static Boolean isStoped() {
375                return stop;
376        }       
377        
378        public static Boolean isForceStoped() {
379                return forceStop;
380        }       
381        
382        public static void stop() {
383                stop = true;
384        }
385        
386        public static void forceStop() {
387                stop = true;
388                forceStop = true;
389        }       
390        
391        public static void start() {
392                stop = false;
393        }
394        
395        public void terminate(){
396            if(filter != null) filter.terminate();
397        }
398
399        public void forceTerminate(){
400            if(filter != null) filter.forceTerminate();
401        }
402        
403        public Boolean isTerminated(){
404            return filter == null ? true : filter.isTerminated();
405        }
406        
407        public Boolean isForceTerminated(){
408            return filter == null ? true : filter.isForceTerminated();
409        }               
410        
411        public Integer getIndex(){
412            return INDEX;
413        }
414        
415        public Integer getSize(){
416            return SIZE;
417        }
418        
419        public Object getObject(){
420            return OBJECT;
421        }               
422
423        public Object getBatch(){
424            return BATCH;
425        }       
426        
427        public Integer getBatchIndex(){
428            return BATCH_INDEX;
429        }
430        
431        private static synchronized Integer setUsingPoolSize(int value){
432            return USING_POOL_SIZE += value;
433        }
434        
435        public static synchronized Integer getUsingPoolSize(){
436            return USING_POOL_SIZE;
437        }       
438
439        public static Integer getMaxPoolSize(){
440            return MAX_POOL_SIZE;
441        }       
442        
443        public static synchronized double getUsage(){
444            return getUsingPoolSize()/(MAX_POOL_SIZE*1.0D);
445        }
446        
447        public boolean isNulIsCompleted(){
448            return nullIsCompleted;
449        }
450        
451        public void setNulIsCompleted(boolean nullIsCompleted){
452            this.nullIsCompleted = nullIsCompleted;
453        }       
454}