001/*******************************************************************************
002The MIT License (MIT)
003
004Copyright (c) 2024 KILLCODING.COM
005
006Permission is hereby granted, free of charge, to any person obtaining a copy
007of this software and associated documentation files (the "Software"), to deal
008in the Software without restriction, including without limitation the rights
009to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
010copies of the Software, and to permit persons to whom the Software is
011furnished to do so, subject to the following conditions:
012
013The above copyright notice and this permission notice shall be included in
014all copies or substantial portions of the Software.
015
016THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
017IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
018FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
019AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
020LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
021OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
022THE SOFTWARE.
023*****************************************************************************/
024package com.killcoding.cache;
025
026import java.io.Serializable;
027import java.util.List;
028import java.util.ArrayList;
029import java.util.Map;
030import com.killcoding.tool.CommonTools;
031import java.util.Collection;
032import java.util.Collections;
033import com.killcoding.cache.CacheArrayFilter;
034import java.util.concurrent.ExecutorService;
035import java.util.concurrent.Executors;
036import com.killcoding.log.LoggerFactory;
037import com.killcoding.log.Logger;
038import com.killcoding.cache.CacheArray;
039
040/**
041 * This class use asynchronous caching to handle large result and large list.
042 * Need class CacheArrayFilter to use.
043 * */
044public class CacheArray<T> implements java.io.Serializable {
045    
046    private static Integer MAX_POOL_SIZE = 100;
047    
048    private static Integer USING_POOL_SIZE = 0;
049
050        private static ExecutorService arrayPool = null;
051
052        protected T lastItem = null;
053
054        private static boolean stop = false;
055        
056        private static boolean forceStop = false;
057        
058        private CacheArrayFilter filter = null;
059        
060        protected Integer INDEX = -1;
061        protected Integer SIZE = -1;
062        protected Object OBJECT = null;
063
064        /**
065         * All cache keys list
066         * */
067        private final List<String> cacheKeys = new ArrayList<String>();
068
069        /**
070         * 'true' is completed
071         * */
072        private boolean completed = false;
073
074        /**
075         * New a CacheArray object
076         * */
077        public CacheArray() {
078                super();
079        }
080
081        public static synchronized void initPool(int poolSize) {
082                if (arrayPool == null) {
083                    MAX_POOL_SIZE = poolSize;
084                    Logger.systemMark(CacheArray.class,"MAX_POOL_SIZE={}",MAX_POOL_SIZE);
085                        arrayPool = Executors.newFixedThreadPool(MAX_POOL_SIZE);
086                }
087        }
088        
089        public CacheArrayFilter filter(final long timer,final Runnable executeRun,final Runnable completedRun,final Runnable terminatedRun) {
090            CacheArray the = this;
091            the.filter = new CacheArrayFilter(timer){
092                @Override
093                public void execute(Integer index, Object object){
094                    the.INDEX = index;
095                    the.OBJECT = object;
096                    
097                    if(executeRun != null) executeRun.run();
098                }
099                @Override
100                public void completed(Integer size) {
101                    the.SIZE = size;
102                    
103                if(completedRun != null) completedRun.run();
104                }   
105                @Override
106                public void terminated() {
107                    
108                    if(terminatedRun != null) terminatedRun.run();
109                }
110            };
111            filter(the.filter);
112            return the.filter;
113        }
114
115        /**
116         * Need override the method to process logic handle
117         * @param filter - CacheArrayFilter
118         * */
119        public void filter(CacheArrayFilter _filter) {
120            this.filter = _filter;
121        int usagePool = setUsingPoolSize(+1);
122        if(usagePool > MAX_POOL_SIZE){
123            LoggerFactory.getLogger(CacheArray.class).debug(String.format("Exceeds the entire pool size ('%s' > '%s').",usagePool,MAX_POOL_SIZE));
124        }
125                final CacheArray<T> the = this;
126                Runnable runnable = new Runnable() {
127                        @Override
128                        public void run() {
129                                int index = 0;
130                                try {
131                                        while (!the.completed && !the.filter.isForceTerminated() && !isForceStoped() && !Thread.currentThread().isInterrupted()) {
132                                            
133                                                if (Thread.currentThread().isInterrupted()){
134                                                    the.filter.terminated = true;
135                                                    the.filter.forceTerminated = true;
136                                                    the.completed = false;
137                                                        break;
138                                                }
139
140                                                the.filter.setCacheArray(the);
141                                                int size = the.size();
142                                                while (index < size && !the.filter.isForceTerminated() && !isForceStoped() && !Thread.currentThread().isInterrupted()) {
143                                                        try {
144                                                        if (Thread.currentThread().isInterrupted()){
145                                                            the.filter.terminated = true;
146                                                            the.filter.forceTerminated = true;
147                                                            the.completed = false;
148                                                                break;
149                                                        }                       
150                                                        
151                                                                if (forceStop) {
152                                                                        the.filter.terminated = true;
153                                                                        the.filter.forceTerminated = true;
154                                                                        the.completed = false;
155                                                                        break;
156                                                                }
157                                                                
158                                                                if (the.filter.forceTerminated){
159                                                                    the.completed = false;
160                                                                        break;
161                                                                }
162                                                                
163                                                                T t = the.get(index);
164                                                                if (t == null) {
165                                                                        the.completed = true;
166                                                                        break;
167                                                                }
168                                                                lastItem = t;
169                                                                the.filter.execute(index, t);
170                                                                String cacheKey = the.getCacheKey(index);
171                                                                StoredCache.remove(cacheKey);
172                                                        } catch (Exception ee) {
173                                                                LoggerFactory.getLogger(CacheArray.class).warn(ee);
174                                                        }
175                                                        index++;
176                                                try {
177                                                        Thread.sleep(the.filter.getTimer());
178                                                } catch (InterruptedException e) {
179                                                        LoggerFactory.getLogger(CacheArray.class).warn(e);
180                                                        the.filter.terminated = true;
181                                                        the.completed = false;
182                                                        break;
183                                                }                                                       
184                                                }
185                                                try {
186                                                        Thread.sleep(the.filter.getTimer());
187                                                } catch (InterruptedException e) {
188                                                        LoggerFactory.getLogger(CacheArray.class).warn(e);
189                                                        the.filter.terminated = true;
190                                                        the.completed = false;
191                                                        break;
192                                                }                                               
193                                        }
194                                } finally {
195                                    
196                                        if(the.completed) {
197                                            the.filter.completed(index);
198                                            setUsingPoolSize(-1);
199                                        }
200                                        
201                                        if(the.filter.terminated){
202                                            the.filter.terminated();
203                                            setUsingPoolSize(-1);
204                                        }
205                                        
206                                        the.removeAll();
207                                        
208                                        Thread.currentThread().interrupt();
209                                }
210
211                        }
212                };
213                if (arrayPool == null) {
214                    Logger.systemMark(CacheArray.class,"MAX_POOL_SIZE={}",MAX_POOL_SIZE);
215                    arrayPool = Executors.newFixedThreadPool(MAX_POOL_SIZE);
216                }
217                arrayPool.execute(runnable);
218        }
219
220        /**
221         * Get all cache keys
222         * @return List
223         * */
224        public synchronized List<String> getCacheKeys() {
225                return new ArrayList<String>(Collections.synchronizedList(cacheKeys));
226        }
227
228        /**
229         * Get all cache size
230         * @reteurn int
231         * */
232        public synchronized int size() {
233                return cacheKeys.size();
234        }
235
236        /**
237         * Add object to the cache list
238         * @return - return cacke key
239         * @value - It is object value
240         * */
241        public synchronized String add(T value) {
242            if(this.filter != null && !this.filter.isForceTerminated() && !isForceStoped() && !Thread.currentThread().isInterrupted()){
243                String key = CommonTools.generateId(16);
244                StoredCache.set(key, value);
245                cacheKeys.add(key);
246                return key;
247            }
248            return null;
249        }
250
251        /**
252         * Add list to the cache list
253         * @param list - It is object list value
254         * */
255        public synchronized void addAll(List<T> list) {
256                for (T value : list) {
257                        add(value);
258                }
259        }
260
261        /**
262         * Remove cache list index object
263         * @return - return removed cache key name
264         * @param index - It is remove index cache
265         * */
266        public synchronized String remove(int index) {
267                String key = cacheKeys.get(index);
268                StoredCache.remove(key);
269                cacheKeys.remove(index);
270                return key;
271        }
272
273        /**
274         * Remove all cache list
275         * */
276        public synchronized void removeAll() {
277                for (String key : cacheKeys) {
278                        StoredCache.remove(key);
279                }
280                cacheKeys.clear();
281        }
282
283        /**
284         * Get cache value by index
285         * @return - Cache value
286         * @param index - Cache index
287         * */
288        public synchronized T get(int index) {
289                String key = cacheKeys.get(index);
290                return (T) StoredCache.get(key);
291        }
292
293        /**
294         * Get cache key by index
295         * @return - Cache key
296         * @param index - Cache index
297         * */
298        public synchronized String getCacheKey(int index) {
299                return cacheKeys.get(index);
300        }
301
302        /**
303         * Check complete status
304         * @return - 'true' is completed
305         * */
306        public synchronized Boolean isCompleted() {
307                return completed;
308        }
309        
310        public static Boolean isStoped() {
311                return stop;
312        }       
313        
314        public static Boolean isForceStoped() {
315                return forceStop;
316        }       
317        
318        public static void stop() {
319                stop = true;
320        }
321        
322        public static void forceStop() {
323                stop = true;
324                forceStop = true;
325        }       
326        
327        public static void start() {
328                stop = false;
329        }
330        
331        public void terminate(){
332            if(filter != null) filter.terminate();
333        }
334
335        public void forceTerminate(){
336            if(filter != null) filter.forceTerminate();
337        }
338        
339        public Boolean isTerminated(){
340            return filter == null ? true : filter.isTerminated();
341        }
342        
343        public Boolean isForceTerminated(){
344            return filter == null ? true : filter.isForceTerminated();
345        }               
346        
347        public Integer getIndex(){
348            return INDEX;
349        }
350        
351        public Integer getSize(){
352            return SIZE;
353        }
354        
355        public Object getObject(){
356            return OBJECT;
357        }               
358        
359        private static synchronized Integer setUsingPoolSize(int value){
360            return USING_POOL_SIZE += value;
361        }
362        
363        public static synchronized Integer getUsingPoolSize(){
364            return USING_POOL_SIZE;
365        }       
366
367        public static Integer getMaxPoolSize(){
368            return MAX_POOL_SIZE;
369        }       
370        
371        public static synchronized double getUsage(){
372            return getUsingPoolSize()/(MAX_POOL_SIZE*1.0D);
373        }
374}