001/*******************************************************************************
002The MIT License (MIT)
003
004Copyright (c) 2024 KILLCODING.COM
005
006Permission is hereby granted, free of charge, to any person obtaining a copy
007of this software and associated documentation files (the "Software"), to deal
008in the Software without restriction, including without limitation the rights
009to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
010copies of the Software, and to permit persons to whom the Software is
011furnished to do so, subject to the following conditions:
012
013The above copyright notice and this permission notice shall be included in
014all copies or substantial portions of the Software.
015
016THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
017IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
018FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
019AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
020LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
021OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
022THE SOFTWARE.
023*****************************************************************************/
024package com.killcoding.cache;
025
026import java.io.Serializable;
027import java.util.List;
028import java.util.ArrayList;
029import java.util.Map;
030import com.killcoding.tool.CommonTools;
031import java.util.Collection;
032import java.util.Collections;
033import com.killcoding.cache.CacheArrayFilter;
034import java.util.concurrent.ExecutorService;
035import java.util.concurrent.Executors;
036import com.killcoding.log.LoggerFactory;
037import com.killcoding.log.Logger;
038import com.killcoding.cache.CacheArray;
039
040/**
041 * This class use asynchronous caching to handle large result and large list.
042 * Need class CacheArrayFilter to use.
043 * */
044public class CacheArray implements java.io.Serializable {
045    
046    private static Integer MAX_POOL_SIZE = 100;
047    
048    private static Integer USING_POOL_SIZE = 0;
049
050        private static ExecutorService arrayPool = null;
051
052        protected Object lastItem = null;
053        
054        
055
056        private static boolean stop = false;
057        
058        private static boolean forceStop = false;
059        
060        private CacheArrayFilter filter = null;
061        
062        protected Integer INDEX = -1;
063        protected Integer SIZE = -1;
064        protected Object OBJECT = null;
065        protected Integer BATCH_INDEX = -1;
066    protected final List BATCH = new ArrayList();
067    
068        /**
069         * All cache keys list
070         * */
071        private final List<String> cacheKeys = new ArrayList<String>();
072
073        /**
074         * 'true' is completed
075         * */
076        private boolean completed = false;
077
078        /**
079         * New a CacheArray object
080         * */
081        public CacheArray() {
082                super();
083        }
084
085        public static synchronized void initPool(int poolSize) {
086                if (arrayPool == null) {
087                    MAX_POOL_SIZE = poolSize;
088                    Logger.systemMark(CacheArray.class,"MAX_POOL_SIZE={}",MAX_POOL_SIZE);
089                        arrayPool = Executors.newFixedThreadPool(MAX_POOL_SIZE);
090                }
091        }
092        
093        public CacheArrayFilter filter(final long timer,final Runnable executeRun,final Runnable completedRun,final Runnable terminatedRun) {
094            CacheArray the = this;
095            the.filter = new CacheArrayFilter(timer){
096                @Override
097                public void execute(Integer index, Object object){
098                    the.INDEX = index;
099                    the.OBJECT = object;
100                    
101                    if(executeRun != null) executeRun.run();
102                }
103                @Override
104                public void completed(Integer size) {
105                    the.SIZE = size;
106                    
107                if(completedRun != null) completedRun.run();
108                }   
109                @Override
110                public void terminated() {
111                    
112                    if(terminatedRun != null) terminatedRun.run();
113                }
114            };
115            filter(the.filter);
116            return the.filter;
117        }
118
119        public CacheArrayFilter filter(final long timer,final long loopTimer,final Runnable executeRun,final Runnable executeBatchRun,final Runnable completedRun,final Runnable terminatedRun) {
120            CacheArray the = this;
121            the.filter = new CacheArrayFilter(timer,loopTimer){
122                @Override
123                public void execute(Integer index, Object object){
124                    
125                    if(executeRun != null) executeRun.run();
126                }
127                
128                @Override
129                public void executeBatch(Integer index,List batch) {
130                    
131                if(executeBatchRun != null) executeBatchRun.run();
132                }    
133                
134                @Override
135                public void completed(Integer size) {
136
137                if(completedRun != null) completedRun.run();
138                }   
139                
140                @Override
141                public void terminated() {
142                    
143                    if(terminatedRun != null) terminatedRun.run();
144                }
145            };
146            filter(the.filter);
147            return the.filter;
148        }
149        /**
150         * Need override the method to process logic handle
151         * @param filter - CacheArrayFilter
152         * */
153        public void filter(CacheArrayFilter _filter) {
154            this.filter = _filter;
155        int usagePool = setUsingPoolSize(+1);
156        if(usagePool > MAX_POOL_SIZE){
157            LoggerFactory.getLogger(CacheArray.class).debug(String.format("Exceeds the entire pool size ('%s' > '%s').",usagePool,MAX_POOL_SIZE));
158        }
159                final CacheArray the = this;
160                Runnable runnable = new Runnable() {
161                        @Override
162                        public void run() {
163                                int index = 0;
164                                try {
165                                        while (!the.completed && !the.filter.isForceTerminated() && !isForceStoped() && !Thread.currentThread().isInterrupted()) {
166                                            
167                                                if (Thread.currentThread().isInterrupted()){
168                                                    the.filter.terminated = true;
169                                                    the.filter.forceTerminated = true;
170                                                    the.completed = false;
171                                                        break;
172                                                }
173
174                                                the.filter.setCacheArray(the);
175                                                int size = the.size();
176                                                while (index < size && !the.filter.isForceTerminated() && !isForceStoped() && !Thread.currentThread().isInterrupted()) {
177                                                        try {
178                                                        if (Thread.currentThread().isInterrupted()){
179                                                            the.filter.terminated = true;
180                                                            the.filter.forceTerminated = true;
181                                                            the.completed = false;
182                                                                break;
183                                                        }                       
184                                                        
185                                                                if (forceStop) {
186                                                                        the.filter.terminated = true;
187                                                                        the.filter.forceTerminated = true;
188                                                                        the.completed = false;
189                                                                        break;
190                                                                }
191                                                                
192                                                                if (the.filter.forceTerminated){
193                                                                    the.completed = false;
194                                                                        break;
195                                                                }
196                                                                
197                                                                Object t = the.get(index);
198                                                                if (t == null) {
199                                                                        the.completed = true;
200                                                                        break;
201                                                                }
202                                                                lastItem = t;
203                                    the.INDEX = index;
204                                    the.OBJECT = t;                                                             
205                                                                BATCH.add(t);
206                                                                the.filter.execute(index, t);
207                                                                String cacheKey = the.getCacheKey(index);
208                                                                StoredCache.remove(cacheKey);
209                                                        } catch (Exception ee) {
210                                                                LoggerFactory.getLogger(CacheArray.class).warn(ee);
211                                                        }
212                                                        index++;
213                                                try {
214                                                        Thread.sleep(the.filter.getTimer());
215                                                } catch (InterruptedException e) {
216                                                        LoggerFactory.getLogger(CacheArray.class).warn(e);
217                                                        the.filter.terminated = true;
218                                                        the.completed = false;
219                                                        break;
220                                                }                                                       
221                                                }
222                                                BATCH_INDEX++;
223                                                the.filter.executeBatch(BATCH_INDEX,BATCH);
224                                                BATCH.clear();
225                                                try {
226                                                        Thread.sleep(the.filter.getLoopTimer());
227                                                } catch (InterruptedException e) {
228                                                        LoggerFactory.getLogger(CacheArray.class).warn(e);
229                                                        the.filter.terminated = true;
230                                                        the.completed = false;
231                                                        break;
232                                                }                                               
233                                        }
234                                } finally {
235                                    
236                                        if(the.completed) {
237                                            the.SIZE = index;
238                                            the.filter.completed(index);
239                                            setUsingPoolSize(-1);
240                                        }
241                                        
242                                        if(the.filter.terminated){
243                                            the.filter.terminated();
244                                            setUsingPoolSize(-1);
245                                        }
246                                        
247                                        the.removeAll();
248                                        
249                                        Thread.currentThread().interrupt();
250                                }
251
252                        }
253                };
254                if (arrayPool == null) {
255                    Logger.systemMark(CacheArray.class,"MAX_POOL_SIZE={}",MAX_POOL_SIZE);
256                    arrayPool = Executors.newFixedThreadPool(MAX_POOL_SIZE);
257                }
258                arrayPool.execute(runnable);
259        }
260
261        /**
262         * Get all cache keys
263         * @return List
264         * */
265        public synchronized List<String> getCacheKeys() {
266                return new ArrayList<String>(Collections.synchronizedList(cacheKeys));
267        }
268
269        /**
270         * Get all cache size
271         * @reteurn int
272         * */
273        public synchronized int size() {
274                return cacheKeys.size();
275        }
276
277        /**
278         * Add object to the cache list
279         * @return - return cacke key
280         * @value - It is object value
281         * */
282        public synchronized String add(Object value) {
283            if(this.filter != null && !this.filter.isForceTerminated() && !isForceStoped() && !Thread.currentThread().isInterrupted()){
284                String key = CommonTools.generateId(16);
285                StoredCache.set(key, value);
286                cacheKeys.add(key);
287                return key;
288            }
289            return null;
290        }
291        
292        /**
293         * Add object to the cache list
294         * @return - return cacke key
295         * @value - It is object value
296         * @sleepMs - It is object value
297         * @throws InterruptedException
298         * */
299        public synchronized String add(Object value,long sleepMs) throws InterruptedException {
300            String key = add(value);
301            Thread.sleep(sleepMs);
302            return key;
303        }       
304
305        /**
306         * Add list to the cache list
307         * @param list - It is object list value
308         * */
309        public synchronized void addAll(List list) {
310                for (Object value : list) {
311                        add(value);
312                }
313        }
314
315        /**
316         * Remove cache list index object
317         * @return - return removed cache key name
318         * @param index - It is remove index cache
319         * */
320        public synchronized String remove(int index) {
321                String key = cacheKeys.get(index);
322                StoredCache.remove(key);
323                cacheKeys.remove(index);
324                return key;
325        }
326
327        /**
328         * Remove all cache list
329         * */
330        public synchronized void removeAll() {
331                for (String key : cacheKeys) {
332                        StoredCache.remove(key);
333                }
334                cacheKeys.clear();
335        }
336
337        /**
338         * Get cache value by index
339         * @return - Cache value
340         * @param index - Cache index
341         * */
342        public synchronized Object get(int index) {
343                String key = cacheKeys.get(index);
344                return StoredCache.get(key);
345        }
346
347        /**
348         * Get cache key by index
349         * @return - Cache key
350         * @param index - Cache index
351         * */
352        public synchronized String getCacheKey(int index) {
353                return cacheKeys.get(index);
354        }
355
356        /**
357         * Check complete status
358         * @return - 'true' is completed
359         * */
360        public synchronized Boolean isCompleted() {
361                return completed;
362        }
363        
364        public static Boolean isStoped() {
365                return stop;
366        }       
367        
368        public static Boolean isForceStoped() {
369                return forceStop;
370        }       
371        
372        public static void stop() {
373                stop = true;
374        }
375        
376        public static void forceStop() {
377                stop = true;
378                forceStop = true;
379        }       
380        
381        public static void start() {
382                stop = false;
383        }
384        
385        public void terminate(){
386            if(filter != null) filter.terminate();
387        }
388
389        public void forceTerminate(){
390            if(filter != null) filter.forceTerminate();
391        }
392        
393        public Boolean isTerminated(){
394            return filter == null ? true : filter.isTerminated();
395        }
396        
397        public Boolean isForceTerminated(){
398            return filter == null ? true : filter.isForceTerminated();
399        }               
400        
401        public Integer getIndex(){
402            return INDEX;
403        }
404        
405        public Integer getSize(){
406            return SIZE;
407        }
408        
409        public Object getObject(){
410            return OBJECT;
411        }               
412
413        public Object getBatch(){
414            return BATCH;
415        }       
416        
417        public Integer getBatchIndex(){
418            return BATCH_INDEX;
419        }
420        
421        private static synchronized Integer setUsingPoolSize(int value){
422            return USING_POOL_SIZE += value;
423        }
424        
425        public static synchronized Integer getUsingPoolSize(){
426            return USING_POOL_SIZE;
427        }       
428
429        public static Integer getMaxPoolSize(){
430            return MAX_POOL_SIZE;
431        }       
432        
433        public static synchronized double getUsage(){
434            return getUsingPoolSize()/(MAX_POOL_SIZE*1.0D);
435        }
436}