001/*******************************************************************************
002The MIT License (MIT)
003
004Copyright (c) 2024 KILLCODING.COM
005
006Permission is hereby granted, free of charge, to any person obtaining a copy
007of this software and associated documentation files (the "Software"), to deal
008in the Software without restriction, including without limitation the rights
009to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
010copies of the Software, and to permit persons to whom the Software is
011furnished to do so, subject to the following conditions:
012
013The above copyright notice and this permission notice shall be included in
014all copies or substantial portions of the Software.
015
016THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
017IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
018FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
019AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
020LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
021OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
022THE SOFTWARE.
023*****************************************************************************/
024package com.killcoding.cache;
025
026import java.io.Serializable;
027import java.util.List;
028import java.util.ArrayList;
029import java.util.Map;
030import com.killcoding.tool.CommonTools;
031import java.util.Collection;
032import java.util.Collections;
033import com.killcoding.cache.CacheArrayFilter;
034import java.util.concurrent.ExecutorService;
035import java.util.concurrent.Executors;
036import com.killcoding.log.LoggerFactory;
037import com.killcoding.log.Logger;
038import com.killcoding.cache.CacheArray;
039
040/**
041 * This class use asynchronous caching to handle large result and large list.
042 * Need class CacheArrayFilter to use.
043 * */
044public class CacheArray implements java.io.Serializable {
045    
046    private static Integer MAX_POOL_SIZE = 100;
047    
048    private static Integer USING_POOL_SIZE = 0;
049
050        private static ExecutorService arrayPool = null;
051
052        protected Object lastItem = null;
053
054        private static boolean stop = false;
055        
056        private static boolean forceStop = false;
057        
058        private CacheArrayFilter filter = null;
059        
060        private boolean nullIsCompleted = true;
061        
062        protected Integer INDEX = -1;
063        protected Integer SIZE = -1;
064        protected Object OBJECT = null;
065        protected Integer BATCH_INDEX = -1;
066    protected final List BATCH = new ArrayList();
067    
068        /**
069         * All cache keys list
070         * */
071        private final List<String> cacheKeys = new ArrayList<String>();
072
073        /**
074         * 'true' is completed
075         * */
076        private boolean completed = false;
077
078        /**
079         * New a CacheArray object
080         * */
081        public CacheArray() {
082                super();
083                this.nullIsCompleted = true;
084        }
085        
086        public CacheArray(boolean nullIsCompleted) {
087                super();
088                this.nullIsCompleted = nullIsCompleted;
089        }       
090
091        public static synchronized void initPool(int poolSize) {
092                if (arrayPool == null) {
093                    MAX_POOL_SIZE = poolSize;
094                    Logger.systemMark(CacheArray.class,"MAX_POOL_SIZE={}",MAX_POOL_SIZE);
095                        arrayPool = Executors.newFixedThreadPool(MAX_POOL_SIZE);
096                }
097        }
098        
099        public CacheArrayFilter filter(final long timer,final Runnable executeRun,final Runnable completedRun,final Runnable terminatedRun) {
100            CacheArray the = this;
101            the.filter = new CacheArrayFilter(timer){
102                @Override
103                public void execute(Integer index, Object object){
104                    the.INDEX = index;
105                    the.OBJECT = object;
106                    
107                    if(executeRun != null) executeRun.run();
108                }
109                @Override
110                public void completed(Integer size) {
111                    the.SIZE = size;
112                    
113                if(completedRun != null) completedRun.run();
114                }   
115                @Override
116                public void terminated() {
117                    
118                    if(terminatedRun != null) terminatedRun.run();
119                }
120            };
121            filter(the.filter);
122            return the.filter;
123        }
124
125        public CacheArrayFilter filter(final long timer,final long loopTimer,final Runnable executeRun,final Runnable executeBatchRun,final Runnable completedRun,final Runnable terminatedRun) {
126            CacheArray the = this;
127            the.filter = new CacheArrayFilter(timer,loopTimer){
128                @Override
129                public void execute(Integer index, Object object){
130                    
131                    if(executeRun != null) executeRun.run();
132                }
133                
134                @Override
135                public void executeBatch(Integer index,List batch) {
136                    
137                if(executeBatchRun != null) executeBatchRun.run();
138                }    
139                
140                @Override
141                public void completed(Integer size) {
142
143                if(completedRun != null) completedRun.run();
144                }   
145                
146                @Override
147                public void terminated() {
148                    
149                    if(terminatedRun != null) terminatedRun.run();
150                }
151            };
152            filter(the.filter);
153            return the.filter;
154        }
155        /**
156         * Need override the method to process logic handle
157         * @param filter - CacheArrayFilter
158         * */
159        public synchronized void filter(CacheArrayFilter _filter) {
160            this.filter = _filter;
161        int usagePool = setUsingPoolSize(+1);
162        if(usagePool > MAX_POOL_SIZE){
163            LoggerFactory.getLogger(CacheArray.class).debug(String.format("Exceeds the entire pool size ('%s' > '%s').",usagePool,MAX_POOL_SIZE));
164        }
165                final CacheArray the = this;
166                Runnable runnable = new Runnable() {
167                        @Override
168                        public void run() {
169                                int index = 0;
170                                try {
171                                        while (!the.completed && !the.filter.isForceTerminated() && !isForceStoped()) {
172                                            
173                                                if (Thread.currentThread().isInterrupted()){
174                                                    the.filter.terminated = true;
175                                                    the.filter.forceTerminated = true;
176                                                    the.completed = false;
177                                                        break;
178                                                }
179
180                                                the.filter.setCacheArray(the);
181                                                int size = the.size();
182                                                while (index < size && !the.filter.isForceTerminated() && !isForceStoped() && !Thread.currentThread().isInterrupted()) {
183                                                        try {
184                                                        if (Thread.currentThread().isInterrupted()){
185                                                            the.filter.terminated = true;
186                                                            the.filter.forceTerminated = true;
187                                                            the.completed = false;
188                                                                break;
189                                                        }                       
190                                                        
191                                                                if (forceStop) {
192                                                                        the.filter.terminated = true;
193                                                                        the.filter.forceTerminated = true;
194                                                                        the.completed = false;
195                                                                        break;
196                                                                }
197                                                                
198                                                                if (the.filter.forceTerminated){
199                                                                    the.completed = false;
200                                                                        break;
201                                                                }
202                                                                
203                                                                Object t = the.get(index);
204                                                                if (t == null) {
205                                                                        the.completed = true;
206                                                                        break;
207                                                                }
208                                                                lastItem = t;
209                                    the.INDEX = index;
210                                    the.OBJECT = t;                                                             
211                                                                BATCH.add(t);
212                                                                the.filter.execute(index, t);
213                                                                String cacheKey = the.getCacheKey(index);
214                                                                StoredCache.remove(cacheKey);
215                                                        } catch (Exception ee) {
216                                                                LoggerFactory.getLogger(CacheArray.class).warn(ee);
217                                                        }
218                                                        index++;
219                                                try {
220                                                        Thread.sleep(the.filter.getTimer());
221                                                } catch (InterruptedException e) {
222                                                        LoggerFactory.getLogger(CacheArray.class).warn(e);
223                                                        the.filter.terminated = true;
224                                                        the.completed = false;
225                                                        break;
226                                                }                                                       
227                                                }
228                                                BATCH_INDEX++;
229                                                the.filter.executeBatch(BATCH_INDEX,BATCH);
230                                                BATCH.clear();
231                                                try {
232                                                        Thread.sleep(the.filter.getLoopTimer());
233                                                } catch (InterruptedException e) {
234                                                        LoggerFactory.getLogger(CacheArray.class).warn(e);
235                                                        the.filter.terminated = true;
236                                                        the.completed = false;
237                                                        break;
238                                                }                                               
239                                        }
240                                } finally {
241                                    
242                                        if(the.completed) {
243                                            the.SIZE = index;
244                                            the.filter.completed(index);
245                                            setUsingPoolSize(-1);
246                                        }
247                                        
248                                        if(the.filter.terminated){
249                                            the.filter.terminated();
250                                            setUsingPoolSize(-1);
251                                        }
252                                        
253                                        the.removeAll();
254                                        
255                                        //Thread.currentThread().interrupt();
256                                }
257
258                        }
259                };
260                if (arrayPool == null) {
261                    Logger.systemMark(CacheArray.class,"MAX_POOL_SIZE={}",MAX_POOL_SIZE);
262                    arrayPool = Executors.newFixedThreadPool(MAX_POOL_SIZE);
263                }
264                arrayPool.execute(runnable);
265        }
266
267        /**
268         * Get all cache keys
269         * @return List
270         * */
271        public synchronized List<String> getCacheKeys() {
272                return new ArrayList<String>(Collections.synchronizedList(cacheKeys));
273        }
274
275        /**
276         * Get all cache size
277         * @reteurn int
278         * */
279        public synchronized int size() {
280                return cacheKeys.size();
281        }
282
283        /**
284         * Add object to the cache list
285         * @return - return cacke key
286         * @value - It is object value
287         * */
288        public synchronized String add(Object value) {
289            if(nullIsCompleted || (!nullIsCompleted && value != null)) {
290            if(this.filter != null && !this.filter.isForceTerminated() && !isForceStoped()){
291                        String key = CommonTools.generateId(16);
292                        StoredCache.set(key, value);
293                        cacheKeys.add(key);
294                        return key;
295            }
296            }
297            return null;
298        }
299        
300        /**
301         * Add object to the cache list
302         * @return - return cacke key
303         * @value - It is object value
304         * @sleepMs - It is object value
305         * @throws InterruptedException
306         * */
307        public synchronized String add(Object value,long sleepMs) throws InterruptedException {
308            String key = add(value);
309            Thread.sleep(sleepMs);
310            return key;
311        }       
312
313        /**
314         * Add list to the cache list
315         * @param list - It is object list value
316         * */
317        public synchronized void addAll(List list) {
318                for (Object value : list) {
319                        add(value);
320                }
321        }
322
323        /**
324         * Remove cache list index object
325         * @return - return removed cache key name
326         * @param index - It is remove index cache
327         * */
328        public synchronized String remove(int index) {
329                String key = cacheKeys.get(index);
330                StoredCache.remove(key);
331                cacheKeys.remove(index);
332                return key;
333        }
334
335        /**
336         * Remove all cache list
337         * */
338        public synchronized void removeAll() {
339                for (String key : cacheKeys) {
340                        StoredCache.remove(key);
341                }
342                cacheKeys.clear();
343        }
344
345        /**
346         * Get cache value by index
347         * @return - Cache value
348         * @param index - Cache index
349         * */
350        public synchronized Object get(int index) {
351                String key = cacheKeys.get(index);
352                return StoredCache.get(key);
353        }
354
355        /**
356         * Get cache key by index
357         * @return - Cache key
358         * @param index - Cache index
359         * */
360        public synchronized String getCacheKey(int index) {
361                return cacheKeys.get(index);
362        }
363
364        /**
365         * Check complete status
366         * @return - 'true' is completed
367         * */
368        public synchronized Boolean isCompleted() {
369                return completed;
370        }
371        
372        public static Boolean isStoped() {
373                return stop;
374        }       
375        
376        public static Boolean isForceStoped() {
377                return forceStop;
378        }       
379        
380        public static void stop() {
381                stop = true;
382        }
383        
384        public static void forceStop() {
385                stop = true;
386                forceStop = true;
387        }       
388        
389        public static void start() {
390                stop = false;
391        }
392        
393        public void terminate(){
394            if(filter != null) filter.terminate();
395        }
396
397        public void forceTerminate(){
398            if(filter != null) filter.forceTerminate();
399        }
400        
401        public Boolean isTerminated(){
402            return filter == null ? true : filter.isTerminated();
403        }
404        
405        public Boolean isForceTerminated(){
406            return filter == null ? true : filter.isForceTerminated();
407        }               
408        
409        public Integer getIndex(){
410            return INDEX;
411        }
412        
413        public Integer getSize(){
414            return SIZE;
415        }
416        
417        public Object getObject(){
418            return OBJECT;
419        }               
420
421        public Object getBatch(){
422            return BATCH;
423        }       
424        
425        public Integer getBatchIndex(){
426            return BATCH_INDEX;
427        }
428        
429        private static synchronized Integer setUsingPoolSize(int value){
430            return USING_POOL_SIZE += value;
431        }
432        
433        public static synchronized Integer getUsingPoolSize(){
434            return USING_POOL_SIZE;
435        }       
436
437        public static Integer getMaxPoolSize(){
438            return MAX_POOL_SIZE;
439        }       
440        
441        public static synchronized double getUsage(){
442            return getUsingPoolSize()/(MAX_POOL_SIZE*1.0D);
443        }
444        
445        public boolean isNulIsCompleted(){
446            return nullIsCompleted;
447        }
448        
449        public void setNulIsCompleted(boolean nullIsCompleted){
450            this.nullIsCompleted = nullIsCompleted;
451        }       
452}