001/*******************************************************************************
002The MIT License (MIT)
003
004Copyright (c) 2024 KILLCODING.COM
005
006Permission is hereby granted, free of charge, to any person obtaining a copy
007of this software and associated documentation files (the "Software"), to deal
008in the Software without restriction, including without limitation the rights
009to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
010copies of the Software, and to permit persons to whom the Software is
011furnished to do so, subject to the following conditions:
012
013The above copyright notice and this permission notice shall be included in
014all copies or substantial portions of the Software.
015
016THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
017IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
018FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
019AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
020LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
021OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
022THE SOFTWARE.
023*****************************************************************************/
024package com.killcoding.cache;
025
026import java.io.Serializable;
027import java.util.List;
028import java.util.ArrayList;
029import java.util.Map;
030import com.killcoding.tool.CommonTools;
031import java.util.Collection;
032import java.util.Collections;
033import com.killcoding.cache.CacheArrayFilter;
034import java.util.concurrent.ExecutorService;
035import java.util.concurrent.Executors;
036import com.killcoding.log.LoggerFactory;
037import com.killcoding.log.Logger;
038import com.killcoding.cache.CacheArray;
039import java.util.concurrent.CopyOnWriteArrayList;
040
/**
 * This class uses asynchronous caching to handle large results and large lists.
 * It requires a {@link CacheArrayFilter} to process the cached items.
 */
045public class CacheArray implements java.io.Serializable {
046    
047    private static Integer MAX_POOL_SIZE = 100;
048    
049    private static Integer USING_POOL_SIZE = 0;
050
051        private static ExecutorService arrayPool = null;
052
053        protected Object lastItem = null;
054
055        private static boolean stop = false;
056        
057        private static boolean forceStop = false;
058        
059        private CacheArrayFilter filter = null;
060        
061        private boolean nullIsCompleted = true;
062        
063        protected Integer INDEX = -1;
064        protected Integer SIZE = -1;
065        protected Object OBJECT = null;
066        protected Integer BATCH_INDEX = -1;
067    protected final List BATCH = new ArrayList();
068    
069        /**
070         * All cache keys list
071         * */
072        private final List<String> cacheKeys = new CopyOnWriteArrayList<String>();
073
074        /**
075         * 'true' is completed
076         * */
077        private boolean completed = false;
078
079        /**
080         * New a CacheArray object
081         * */
082        public CacheArray() {
083                super();
084                this.nullIsCompleted = true;
085        }
086        
087        public CacheArray(boolean nullIsCompleted) {
088                super();
089                this.nullIsCompleted = nullIsCompleted;
090        }       
091
092        public static synchronized void initPool(int poolSize) {
093                if (arrayPool == null) {
094                    MAX_POOL_SIZE = poolSize;
095                    Logger.systemMark(CacheArray.class,"MAX_POOL_SIZE={}",MAX_POOL_SIZE);
096                        arrayPool = Executors.newFixedThreadPool(MAX_POOL_SIZE);
097                }
098        }
099        
100        public CacheArrayFilter filter(final long timer,final Runnable executeRun,final Runnable completedRun,final Runnable terminatedRun) {
101            CacheArray the = this;
102            the.filter = new CacheArrayFilter(timer){
103                @Override
104                public void execute(Integer index, Object object){
105                    the.INDEX = index;
106                    the.OBJECT = object;
107                    
108                    if(executeRun != null) executeRun.run();
109                }
110                @Override
111                public void completed(Integer size) {
112                    the.SIZE = size;
113                    
114                if(completedRun != null) completedRun.run();
115                }   
116                @Override
117                public void terminated() {
118                    
119                    if(terminatedRun != null) terminatedRun.run();
120                }
121            };
122            filter(the.filter);
123            return the.filter;
124        }
125
126        public CacheArrayFilter filter(final long timer,final long loopTimer,final Runnable executeRun,final Runnable executeBatchRun,final Runnable completedRun,final Runnable terminatedRun) {
127            CacheArray the = this;
128            the.filter = new CacheArrayFilter(timer,loopTimer){
129                @Override
130                public void execute(Integer index, Object object){
131                    
132                    if(executeRun != null) executeRun.run();
133                }
134                
135                @Override
136                public void executeBatch(Integer index,List batch) {
137                    
138                if(executeBatchRun != null) executeBatchRun.run();
139                }    
140                
141                @Override
142                public void completed(Integer size) {
143
144                if(completedRun != null) completedRun.run();
145                }   
146                
147                @Override
148                public void terminated() {
149                    
150                    if(terminatedRun != null) terminatedRun.run();
151                }
152            };
153            filter(the.filter);
154            return the.filter;
155        }
156        /**
157         * Need override the method to process logic handle
158         * @param filter - CacheArrayFilter
159         * */
160        public synchronized void filter(CacheArrayFilter _filter) {
161            this.filter = _filter;
162        int usagePool = setUsingPoolSize(+1);
163        if(usagePool > MAX_POOL_SIZE){
164            LoggerFactory.getLogger(CacheArray.class).debug(String.format("Exceeds the entire pool size ('%s' > '%s').",usagePool,MAX_POOL_SIZE));
165        }
166                final CacheArray the = this;
167                Runnable runnable = new Runnable() {
168                        @Override
169                        public void run() {
170                                int index = 0;
171                                try {
172                                        while (!the.completed && !the.filter.isForceTerminated() && !isForceStoped()) {
173                                            
174                                                if (Thread.currentThread().isInterrupted()){
175                                                    the.filter.terminated = true;
176                                                    the.filter.forceTerminated = true;
177                                                    the.completed = false;
178                                                        break;
179                                                }
180
181                                                the.filter.setCacheArray(the);
182                                                int size = the.size();
183                                                while (index < size && !the.filter.isForceTerminated() && !isForceStoped() && !Thread.currentThread().isInterrupted()) {
184                                                        try {
185                                                        if (Thread.currentThread().isInterrupted()){
186                                                            the.filter.terminated = true;
187                                                            the.filter.forceTerminated = true;
188                                                            the.completed = false;
189                                                                break;
190                                                        }                       
191                                                        
192                                                                if (forceStop) {
193                                                                        the.filter.terminated = true;
194                                                                        the.filter.forceTerminated = true;
195                                                                        the.completed = false;
196                                                                        break;
197                                                                }
198                                                                
199                                                                if (the.filter.forceTerminated){
200                                                                    the.completed = false;
201                                                                        break;
202                                                                }
203                                                                
204                                                                Object t = the.get(index);
205                                                                if (t == null) {
206                                                                        the.completed = true;
207                                                                        break;
208                                                                }
209                                                                lastItem = t;
210                                    the.INDEX = index;
211                                    the.OBJECT = t;                                                             
212                                                                BATCH.add(t);
213                                                                the.filter.execute(index, t);
214                                                                String cacheKey = the.getCacheKey(index);
215                                                                StoredCache.remove(cacheKey);
216                                                        } catch (Exception ee) {
217                                                                LoggerFactory.getLogger(CacheArray.class).warn(ee);
218                                                        }
219                                                        index++;
220                                                try {
221                                                        Thread.sleep(the.filter.getTimer());
222                                                } catch (InterruptedException e) {
223                                                        LoggerFactory.getLogger(CacheArray.class).warn(e);
224                                                        the.filter.terminated = true;
225                                                        the.completed = false;
226                                                        break;
227                                                }                                                       
228                                                }
229                                                BATCH_INDEX++;
230                                                the.filter.executeBatch(BATCH_INDEX,BATCH);
231                                                BATCH.clear();
232                                                try {
233                                                        Thread.sleep(the.filter.getLoopTimer());
234                                                } catch (InterruptedException e) {
235                                                        LoggerFactory.getLogger(CacheArray.class).warn(e);
236                                                        the.filter.terminated = true;
237                                                        the.completed = false;
238                                                        break;
239                                                }                                               
240                                        }
241                                } finally {
242                                    
243                                        if(the.completed) {
244                                            the.SIZE = index;
245                                            the.filter.completed(index);
246                                            setUsingPoolSize(-1);
247                                        }
248                                        
249                                        if(the.filter.terminated){
250                                            the.filter.terminated();
251                                            setUsingPoolSize(-1);
252                                        }
253                                        
254                                        the.removeAll();
255                                        
256                                        Thread.currentThread().interrupt();
257                                }
258
259                        }
260                };
261                if (arrayPool == null) {
262                    Logger.systemMark(CacheArray.class,"MAX_POOL_SIZE={}",MAX_POOL_SIZE);
263                    arrayPool = Executors.newFixedThreadPool(MAX_POOL_SIZE);
264                }
265                arrayPool.execute(runnable);
266        }
267
268        /**
269         * Get all cache keys
270         * @return List
271         * */
272        public synchronized List<String> getCacheKeys() {
273                return new ArrayList<String>(Collections.synchronizedList(cacheKeys));
274        }
275
276        /**
277         * Get all cache size
278         * @reteurn int
279         * */
280        public synchronized int size() {
281                return cacheKeys.size();
282        }
283
284        /**
285         * Add object to the cache list
286         * @return - return cacke key
287         * @value - It is object value
288         * */
289        public synchronized String add(Object value) {
290            if(nullIsCompleted || (!nullIsCompleted && value != null)) {
291            if(this.filter != null && !this.filter.isForceTerminated() && !isForceStoped()){
292                        String key = CommonTools.generateId(16);
293                        StoredCache.set(key, value);
294                        cacheKeys.add(key);
295                        return key;
296            }
297            }
298            return null;
299        }
300        
301        /**
302         * Add object to the cache list
303         * @return - return cacke key
304         * @value - It is object value
305         * @sleepMs - It is object value
306         * @throws InterruptedException
307         * */
308        public synchronized String add(Object value,long sleepMs) throws InterruptedException {
309            String key = add(value);
310            
311            if(sleepMs > 0) Thread.sleep(sleepMs);
312            
313            return key;
314        }       
315
316        /**
317         * Add list to the cache list
318         * @param list - It is object list value
319         * */
320        public synchronized void addAll(List list) {
321                for (Object value : list) {
322                        add(value);
323                }
324        }
325
326        /**
327         * Remove cache list index object
328         * @return - return removed cache key name
329         * @param index - It is remove index cache
330         * */
331        public synchronized String remove(int index) {
332                String key = cacheKeys.get(index);
333                StoredCache.remove(key);
334                cacheKeys.remove(index);
335                return key;
336        }
337
338        /**
339         * Remove all cache list
340         * */
341        public synchronized void removeAll() {
342                for (String key : cacheKeys) {
343                        StoredCache.remove(key);
344                }
345                cacheKeys.clear();
346        }
347
348        /**
349         * Get cache value by index
350         * @return - Cache value
351         * @param index - Cache index
352         * */
353        public synchronized Object get(int index) {
354                String key = cacheKeys.get(index);
355                return StoredCache.get(key);
356        }
357
358        /**
359         * Get cache key by index
360         * @return - Cache key
361         * @param index - Cache index
362         * */
363        public synchronized String getCacheKey(int index) {
364                return cacheKeys.get(index);
365        }
366
367        /**
368         * Check complete status
369         * @return - 'true' is completed
370         * */
371        public synchronized Boolean isCompleted() {
372                return completed;
373        }
374        
375        public static Boolean isStoped() {
376                return stop;
377        }       
378        
379        public static Boolean isForceStoped() {
380                return forceStop;
381        }       
382        
383        public static void stop() {
384                stop = true;
385        }
386        
387        public static void forceStop() {
388                stop = true;
389                forceStop = true;
390        }       
391        
392        public static void start() {
393                stop = false;
394        }
395        
396        public void terminate(){
397            if(filter != null) filter.terminate();
398        }
399
400        public void forceTerminate(){
401            if(filter != null) filter.forceTerminate();
402        }
403        
404        public Boolean isTerminated(){
405            return filter == null ? true : filter.isTerminated();
406        }
407        
408        public Boolean isForceTerminated(){
409            return filter == null ? true : filter.isForceTerminated();
410        }       
411        
412        public CacheArrayFilter getFilter(){
413            return filter;
414        }
415        
416        public Integer getIndex(){
417            return INDEX;
418        }
419        
420        public Integer getSize(){
421            return SIZE;
422        }
423        
424        public Object getObject(){
425            return OBJECT;
426        }               
427
428        public Object getBatch(){
429            return BATCH;
430        }       
431        
432        public Integer getBatchIndex(){
433            return BATCH_INDEX;
434        }
435        
436        private static synchronized Integer setUsingPoolSize(int value){
437            return USING_POOL_SIZE += value;
438        }
439        
440        public static synchronized Integer getUsingPoolSize(){
441            return USING_POOL_SIZE;
442        }       
443
444        public static Integer getMaxPoolSize(){
445            return MAX_POOL_SIZE;
446        }       
447        
448        public static synchronized double getUsage(){
449            return getUsingPoolSize()/(MAX_POOL_SIZE*1.0D);
450        }
451        
452        public boolean isNulIsCompleted(){
453            return nullIsCompleted;
454        }
455        
456        public void setNulIsCompleted(boolean nullIsCompleted){
457            this.nullIsCompleted = nullIsCompleted;
458        }       
459}