001/******************************************************************************* 002The MIT License (MIT) 003 004Copyright (c) 2024 KILLCODING.COM 005 006Permission is hereby granted, free of charge, to any person obtaining a copy 007of this software and associated documentation files (the "Software"), to deal 008in the Software without restriction, including without limitation the rights 009to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 010copies of the Software, and to permit persons to whom the Software is 011furnished to do so, subject to the following conditions: 012 013The above copyright notice and this permission notice shall be included in 014all copies or substantial portions of the Software. 015 016THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 017IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 018FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 019AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 020LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 021OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 022THE SOFTWARE. 023*****************************************************************************/ 024package com.killcoding.cache; 025 026import java.io.Serializable; 027import java.util.List; 028import java.util.ArrayList; 029import java.util.Map; 030import com.killcoding.tool.CommonTools; 031import java.util.Collection; 032import java.util.Collections; 033import com.killcoding.cache.CacheArrayFilter; 034import java.util.concurrent.ExecutorService; 035import java.util.concurrent.Executors; 036import com.killcoding.log.LoggerFactory; 037import com.killcoding.log.Logger; 038import com.killcoding.cache.CacheArray; 039 040/** 041 * This class use asynchronous caching to handle large result and large list. 042 * Need class CacheArrayFilter to use. 043 * */ 044public class CacheArray<T> implements java.io.Serializable { 045 046 private static Integer MAX_POOL_SIZE = 100; 047 048 private static Integer USING_POOL_SIZE = 0; 049 050 private static ExecutorService arrayPool = null; 051 052 protected T lastItem = null; 053 054 private static boolean stop = false; 055 056 private static boolean forceStop = false; 057 058 private CacheArrayFilter filter = null; 059 060 protected Integer INDEX = -1; 061 protected Integer SIZE = -1; 062 protected Object OBJECT = null; 063 064 /** 065 * All cache keys list 066 * */ 067 private final List<String> cacheKeys = new ArrayList<String>(); 068 069 /** 070 * 'true' is completed 071 * */ 072 private boolean completed = false; 073 074 /** 075 * New a CacheArray object 076 * */ 077 public CacheArray() { 078 super(); 079 } 080 081 public static synchronized void initPool(int poolSize) { 082 if (arrayPool == null) { 083 MAX_POOL_SIZE = poolSize; 084 Logger.systemMark(CacheArray.class,"MAX_POOL_SIZE={}",MAX_POOL_SIZE); 085 arrayPool = Executors.newFixedThreadPool(MAX_POOL_SIZE); 086 } 087 } 088 089 public CacheArrayFilter filter(final long timer,final Runnable executeRun,final Runnable completedRun,final Runnable terminatedRun) { 090 CacheArray the = this; 091 the.filter = new CacheArrayFilter(timer){ 092 @Override 093 public void execute(Integer index, Object object){ 094 the.INDEX = index; 095 the.OBJECT = object; 096 097 if(executeRun != null) executeRun.run(); 098 } 099 @Override 100 public void completed(Integer size) { 101 the.SIZE = size; 102 103 if(completedRun != null) completedRun.run(); 104 } 105 @Override 106 public void terminated() { 107 108 if(terminatedRun != null) terminatedRun.run(); 109 } 110 }; 111 filter(the.filter); 112 return the.filter; 113 } 114 115 /** 116 * Need override the method to process logic handle 117 * @param filter - CacheArrayFilter 118 * */ 119 public void filter(CacheArrayFilter _filter) { 120 this.filter = _filter; 121 int usagePool = setUsingPoolSize(+1); 122 if(usagePool > MAX_POOL_SIZE){ 123 LoggerFactory.getLogger(CacheArray.class).debug(String.format("Exceeds the entire pool size ('%s' > '%s').",usagePool,MAX_POOL_SIZE)); 124 } 125 final CacheArray<T> the = this; 126 Runnable runnable = new Runnable() { 127 @Override 128 public void run() { 129 int index = 0; 130 try { 131 while (!the.completed && !the.filter.isForceTerminated() && !isForceStoped() && !Thread.currentThread().isInterrupted()) { 132 133 if (Thread.currentThread().isInterrupted()){ 134 the.filter.terminated = true; 135 the.filter.forceTerminated = true; 136 the.completed = false; 137 break; 138 } 139 140 the.filter.setCacheArray(the); 141 int size = the.size(); 142 while (index < size && !the.filter.isForceTerminated() && !isForceStoped() && !Thread.currentThread().isInterrupted()) { 143 try { 144 if (Thread.currentThread().isInterrupted()){ 145 the.filter.terminated = true; 146 the.filter.forceTerminated = true; 147 the.completed = false; 148 break; 149 } 150 151 if (forceStop) { 152 the.filter.terminated = true; 153 the.filter.forceTerminated = true; 154 the.completed = false; 155 break; 156 } 157 158 if (the.filter.forceTerminated){ 159 the.completed = false; 160 break; 161 } 162 163 T t = the.get(index); 164 if (t == null) { 165 the.completed = true; 166 break; 167 } 168 lastItem = t; 169 the.filter.execute(index, t); 170 String cacheKey = the.getCacheKey(index); 171 StoredCache.remove(cacheKey); 172 } catch (Exception ee) { 173 LoggerFactory.getLogger(CacheArray.class).warn(ee); 174 } 175 index++; 176 try { 177 Thread.sleep(the.filter.getTimer()); 178 } catch (InterruptedException e) { 179 LoggerFactory.getLogger(CacheArray.class).warn(e); 180 the.filter.terminated = true; 181 the.completed = false; 182 break; 183 } 184 } 185 try { 186 Thread.sleep(the.filter.getTimer()); 187 } catch (InterruptedException e) { 188 LoggerFactory.getLogger(CacheArray.class).warn(e); 189 the.filter.terminated = true; 190 the.completed = false; 191 break; 192 } 193 } 194 } finally { 195 196 if(the.completed) { 197 the.filter.completed(index); 198 setUsingPoolSize(-1); 199 } 200 201 if(the.filter.terminated){ 202 the.filter.terminated(); 203 setUsingPoolSize(-1); 204 } 205 206 the.removeAll(); 207 208 Thread.currentThread().interrupt(); 209 } 210 211 } 212 }; 213 if (arrayPool == null) { 214 Logger.systemMark(CacheArray.class,"MAX_POOL_SIZE={}",MAX_POOL_SIZE); 215 arrayPool = Executors.newFixedThreadPool(MAX_POOL_SIZE); 216 } 217 arrayPool.execute(runnable); 218 } 219 220 /** 221 * Get all cache keys 222 * @return List 223 * */ 224 public synchronized List<String> getCacheKeys() { 225 return new ArrayList<String>(Collections.synchronizedList(cacheKeys)); 226 } 227 228 /** 229 * Get all cache size 230 * @reteurn int 231 * */ 232 public synchronized int size() { 233 return cacheKeys.size(); 234 } 235 236 /** 237 * Add object to the cache list 238 * @return - return cacke key 239 * @value - It is object value 240 * */ 241 public synchronized String add(T value) { 242 if(this.filter != null && !this.filter.isForceTerminated() && !isForceStoped() && !Thread.currentThread().isInterrupted()){ 243 String key = CommonTools.generateId(16); 244 StoredCache.set(key, value); 245 cacheKeys.add(key); 246 return key; 247 } 248 return null; 249 } 250 251 /** 252 * Add list to the cache list 253 * @param list - It is object list value 254 * */ 255 public synchronized void addAll(List<T> list) { 256 for (T value : list) { 257 add(value); 258 } 259 } 260 261 /** 262 * Remove cache list index object 263 * @return - return removed cache key name 264 * @param index - It is remove index cache 265 * */ 266 public synchronized String remove(int index) { 267 String key = cacheKeys.get(index); 268 StoredCache.remove(key); 269 cacheKeys.remove(index); 270 return key; 271 } 272 273 /** 274 * Remove all cache list 275 * */ 276 public synchronized void removeAll() { 277 for (String key : cacheKeys) { 278 StoredCache.remove(key); 279 } 280 cacheKeys.clear(); 281 } 282 283 /** 284 * Get cache value by index 285 * @return - Cache value 286 * @param index - Cache index 287 * */ 288 public synchronized T get(int index) { 289 String key = cacheKeys.get(index); 290 return (T) StoredCache.get(key); 291 } 292 293 /** 294 * Get cache key by index 295 * @return - Cache key 296 * @param index - Cache index 297 * */ 298 public synchronized String getCacheKey(int index) { 299 return cacheKeys.get(index); 300 } 301 302 /** 303 * Check complete status 304 * @return - 'true' is completed 305 * */ 306 public synchronized Boolean isCompleted() { 307 return completed; 308 } 309 310 public static Boolean isStoped() { 311 return stop; 312 } 313 314 public static Boolean isForceStoped() { 315 return forceStop; 316 } 317 318 public static void stop() { 319 stop = true; 320 } 321 322 public static void forceStop() { 323 stop = true; 324 forceStop = true; 325 } 326 327 public static void start() { 328 stop = false; 329 } 330 331 public void terminate(){ 332 if(filter != null) filter.terminate(); 333 } 334 335 public void forceTerminate(){ 336 if(filter != null) filter.forceTerminate(); 337 } 338 339 public Boolean isTerminated(){ 340 return filter == null ? true : filter.isTerminated(); 341 } 342 343 public Boolean isForceTerminated(){ 344 return filter == null ? true : filter.isForceTerminated(); 345 } 346 347 public Integer getIndex(){ 348 return INDEX; 349 } 350 351 public Integer getSize(){ 352 return SIZE; 353 } 354 355 public Object getObject(){ 356 return OBJECT; 357 } 358 359 private static synchronized Integer setUsingPoolSize(int value){ 360 return USING_POOL_SIZE += value; 361 } 362 363 public static synchronized Integer getUsingPoolSize(){ 364 return USING_POOL_SIZE; 365 } 366 367 public static Integer getMaxPoolSize(){ 368 return MAX_POOL_SIZE; 369 } 370 371 public static synchronized double getUsage(){ 372 return getUsingPoolSize()/(MAX_POOL_SIZE*1.0D); 373 } 374}