/*******************************************************************************
The MIT License (MIT)

Copyright (c) 2024 KILLCODING.COM

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*****************************************************************************/
package com.killcoding.cache;

import java.io.Serializable;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import com.killcoding.tool.CommonTools;
import java.util.Collection;
import java.util.Collections;
import com.killcoding.cache.CacheArrayFilter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.killcoding.log.LoggerFactory;
import com.killcoding.log.Logger;
import com.killcoding.cache.CacheArray;
import java.util.concurrent.CopyOnWriteArrayList;

/**
 * Asynchronous, cache-backed list for handling large results and large lists.
 *
 * <p>Values passed to {@link #add(Object)} are stored in {@code StoredCache} under a
 * generated key; only the keys are kept in this object. Calling one of the
 * {@code filter(...)} methods submits a worker to a shared fixed thread pool. The worker
 * walks the keys in insertion order, hands every value to the {@link CacheArrayFilter}
 * and removes the value from {@code StoredCache} once it has been handed over.
 *
 * <p>A {@link CacheArrayFilter} is required to consume the values: {@link #add(Object)}
 * rejects values (returns {@code null}) until a filter has been attached.
 */
public class CacheArray implements java.io.Serializable {

    /** Size used when the shared pool is created; see {@link #initPool(int)}. */
    private static Integer MAX_POOL_SIZE = 100;

    /** Number of workers submitted and not yet finished, over all instances. */
    private static Integer USING_POOL_SIZE = 0;

    /** Shared worker pool, created lazily by {@link #initPool(int)} or on first use. */
    private static ExecutorService arrayPool = null;

    /** Last non-null value the worker handed to the filter. */
    protected Object lastItem = null;

    // Written by control threads and polled by pool workers, hence volatile.
    private static volatile boolean stop = false;

    private static volatile boolean forceStop = false;

    private CacheArrayFilter filter = null;

    /** When {@code true}, {@code null} may be added; the worker treats a null value as "completed". */
    private boolean nullIsCompleted = true;

    /** Index of the value most recently handed to the filter, {@code -1} initially. */
    protected Integer INDEX = -1;
    /** Number of processed values, set when the worker completes, {@code -1} initially. */
    protected Integer SIZE = -1;
    /** Value most recently handed to the filter. */
    protected Object OBJECT = null;
    /** Index of the batch most recently handed to the filter, {@code -1} initially. */
    protected Integer BATCH_INDEX = -1;
    /** Values collected since the previous {@code executeBatch} call. */
    protected final List BATCH = new ArrayList();

    /**
     * All cache keys, in insertion order.
     */
    private final List<String> cacheKeys = new CopyOnWriteArrayList<String>();

    /**
     * {@code true} once the worker has read a {@code null} value.
     * Written by the worker thread and read by callers, hence volatile.
     */
    private volatile boolean completed = false;

    /**
     * Creates a CacheArray in which a {@code null} value marks completion.
     */
    public CacheArray() {
        super();
        this.nullIsCompleted = true;
    }

    /**
     * Creates a CacheArray.
     *
     * @param nullIsCompleted when {@code false}, {@link #add(Object)} ignores {@code null} values
     */
    public CacheArray(boolean nullIsCompleted) {
        super();
        this.nullIsCompleted = nullIsCompleted;
    }

    /**
     * Creates the shared worker pool with the given size. Does nothing when the pool
     * already exists.
     *
     * @param poolSize number of pool threads
     * @throws IllegalArgumentException if the pool has to be created and {@code poolSize <= 0}
     */
    public static synchronized void initPool(int poolSize) {
        if (arrayPool == null) {
            // Validate before touching MAX_POOL_SIZE so a bad argument cannot corrupt it.
            if (poolSize <= 0) {
                throw new IllegalArgumentException("poolSize must be positive: " + poolSize);
            }
            MAX_POOL_SIZE = poolSize;
            Logger.systemMark(CacheArray.class,"MAX_POOL_SIZE={}",MAX_POOL_SIZE);
            arrayPool = Executors.newFixedThreadPool(MAX_POOL_SIZE);
        }
    }

    /**
     * Attaches a filter built from plain callbacks and starts the worker.
     *
     * <p>Before {@code executeRun} runs, {@link #getIndex()} and {@link #getObject()} are
     * updated; before {@code completedRun} runs, {@link #getSize()} is updated.
     *
     * @param timer         passed to the {@link CacheArrayFilter} constructor
     * @param executeRun    run for every value, may be {@code null}
     * @param completedRun  run when the filter's {@code completed} callback fires, may be {@code null}
     * @param terminatedRun run when the filter's {@code terminated} callback fires, may be {@code null}
     * @return the created filter
     */
    public CacheArrayFilter filter(final long timer,final Runnable executeRun,final Runnable completedRun,final Runnable terminatedRun) {
        final CacheArray the = this;
        the.filter = new CacheArrayFilter(timer){
            @Override
            public void execute(Integer index, Object object){
                the.INDEX = index;
                the.OBJECT = object;

                if(executeRun != null) executeRun.run();
            }
            @Override
            public void completed(Integer size) {
                the.SIZE = size;

                if(completedRun != null) completedRun.run();
            }
            @Override
            public void terminated() {

                if(terminatedRun != null) terminatedRun.run();
            }
        };
        filter(the.filter);
        return the.filter;
    }

    /**
     * Attaches a filter built from plain callbacks, including a per-batch callback, and
     * starts the worker. The callbacks read the current state through {@link #getIndex()},
     * {@link #getObject()}, {@link #getBatch()}, {@link #getBatchIndex()} and
     * {@link #getSize()}, which the worker updates before invoking the filter.
     *
     * @param timer           passed to the {@link CacheArrayFilter} constructor
     * @param loopTimer       passed to the {@link CacheArrayFilter} constructor
     * @param executeRun      run for every value, may be {@code null}
     * @param executeBatchRun run for every batch, may be {@code null}
     * @param completedRun    run when the filter's {@code completed} callback fires, may be {@code null}
     * @param terminatedRun   run when the filter's {@code terminated} callback fires, may be {@code null}
     * @return the created filter
     */
    public CacheArrayFilter filter(final long timer,final long loopTimer,final Runnable executeRun,final Runnable executeBatchRun,final Runnable completedRun,final Runnable terminatedRun) {
        final CacheArray the = this;
        the.filter = new CacheArrayFilter(timer,loopTimer){
            @Override
            public void execute(Integer index, Object object){

                if(executeRun != null) executeRun.run();
            }

            @Override
            public void executeBatch(Integer index,List batch) {

                if(executeBatchRun != null) executeBatchRun.run();
            }

            @Override
            public void completed(Integer size) {

                if(completedRun != null) completedRun.run();
            }

            @Override
            public void terminated() {

                if(terminatedRun != null) terminatedRun.run();
            }
        };
        filter(the.filter);
        return the.filter;
    }

    /**
     * Attaches the filter and submits the worker that feeds it to the shared pool.
     * Override the filter's callbacks to implement the processing logic.
     *
     * @param _filter the filter that receives the values
     */
    public synchronized void filter(CacheArrayFilter _filter) {
        this.filter = _filter;
        int usagePool = setUsingPoolSize(+1);
        if (usagePool > MAX_POOL_SIZE) {
            LoggerFactory.getLogger(CacheArray.class).debug(String.format("Exceeds the entire pool size ('%s' > '%s').",usagePool,MAX_POOL_SIZE));
        }
        final CacheArray the = this;
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                the.runFilterLoop();
            }
        };
        try {
            pool().execute(runnable);
        } catch (RuntimeException e) {
            // The worker never started, so it will never release its slot itself.
            setUsingPoolSize(-1);
            throw e;
        }
    }

    /**
     * Get all cache keys.
     *
     * @return a new list holding a copy of the current keys
     */
    public synchronized List<String> getCacheKeys() {
        return new ArrayList<String>(cacheKeys);
    }

    /**
     * Get the number of cache keys.
     *
     * @return number of keys currently held
     */
    public synchronized int size() {
        return cacheKeys.size();
    }

    /**
     * Add a value to the cache list.
     *
     * <p>The value is rejected when no filter is attached, when the filter is force
     * terminated, after {@link #forceStop()}, or when it is {@code null} and this
     * instance was created with {@code nullIsCompleted == false}.
     *
     * @param value the value to store
     * @return the generated cache key, or {@code null} when the value was rejected
     */
    public synchronized String add(Object value) {
        if (!nullIsCompleted && value == null) {
            return null;
        }
        if (this.filter == null || this.filter.isForceTerminated() || isForceStoped()) {
            return null;
        }
        String key = CommonTools.generateId(16);
        StoredCache.set(key, value);
        cacheKeys.add(key);
        return key;
    }

    /**
     * Add a value to the cache list, then pause the calling thread.
     *
     * <p>The pause happens outside this object's lock so that the worker can keep
     * reading while the producer is throttled.
     *
     * @param value   the value to store
     * @param sleepMs milliseconds to sleep after adding; no sleep when {@code <= 0}
     * @return the generated cache key, or {@code null} when the value was rejected
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    public String add(Object value,long sleepMs) throws InterruptedException {
        String key = add(value);

        if (sleepMs > 0) {
            Thread.sleep(sleepMs);
        }

        return key;
    }

    /**
     * Add every element of a list to the cache list, using {@link #add(Object)}.
     *
     * @param list the values to store
     */
    public synchronized void addAll(List list) {
        for (Object value : list) {
            add(value);
        }
    }

    /**
     * Remove the entry at the given index, both from {@code StoredCache} and from the key list.
     *
     * @param index index of the entry to remove
     * @return the removed cache key
     */
    public synchronized String remove(int index) {
        String key = cacheKeys.get(index);
        StoredCache.remove(key);
        cacheKeys.remove(index);
        return key;
    }

    /**
     * Remove every entry, both from {@code StoredCache} and from the key list.
     */
    public synchronized void removeAll() {
        for (String key : cacheKeys) {
            StoredCache.remove(key);
        }
        cacheKeys.clear();
    }

    /**
     * Get the cached value at the given index.
     *
     * @param index cache index
     * @return the value {@code StoredCache} returns for the key at that index
     */
    public synchronized Object get(int index) {
        String key = cacheKeys.get(index);
        return StoredCache.get(key);
    }

    /**
     * Get the cache key at the given index.
     *
     * @param index cache index
     * @return the cache key
     */
    public synchronized String getCacheKey(int index) {
        return cacheKeys.get(index);
    }

    /**
     * Check the completion status.
     *
     * @return {@code true} once the worker has read a {@code null} value
     */
    public synchronized Boolean isCompleted() {
        return completed;
    }

    /**
     * @return the value of the global stop flag; within this class the flag is only
     *         stored and exposed, it does not influence the worker
     */
    public static Boolean isStoped() {
        return stop;
    }

    /**
     * @return {@code true} after {@link #forceStop()}; workers of all instances exit
     *         and {@link #add(Object)} rejects values while it is set
     */
    public static Boolean isForceStoped() {
        return forceStop;
    }

    /** Sets the global stop flag. */
    public static void stop() {
        stop = true;
    }

    /** Sets the global stop flag and the global force-stop flag. */
    public static void forceStop() {
        stop = true;
        forceStop = true;
    }

    /** Clears the global stop flag. The force-stop flag is left unchanged. */
    public static void start() {
        stop = false;
    }

    /** Calls {@code terminate()} on the attached filter, if any. */
    public void terminate(){
        if (filter != null) {
            filter.terminate();
        }
    }

    /** Calls {@code forceTerminate()} on the attached filter, if any. */
    public void forceTerminate(){
        if (filter != null) {
            filter.forceTerminate();
        }
    }

    /**
     * @return {@code true} when no filter is attached, otherwise the filter's {@code isTerminated()}
     */
    public Boolean isTerminated(){
        return filter == null ? true : filter.isTerminated();
    }

    /**
     * @return {@code true} when no filter is attached, otherwise the filter's {@code isForceTerminated()}
     */
    public Boolean isForceTerminated(){
        return filter == null ? true : filter.isForceTerminated();
    }

    /** @return index of the value most recently handed to the filter, {@code -1} initially */
    public Integer getIndex(){
        return INDEX;
    }

    /** @return number of processed values once completed, {@code -1} before that */
    public Integer getSize(){
        return SIZE;
    }

    /** @return the value most recently handed to the filter */
    public Object getObject(){
        return OBJECT;
    }

    /** @return the live batch list (not a copy); the worker clears it after every batch callback */
    public Object getBatch(){
        return BATCH;
    }

    /** @return index of the batch most recently handed to the filter, {@code -1} initially */
    public Integer getBatchIndex(){
        return BATCH_INDEX;
    }

    /**
     * Adds {@code value} to the count of workers in flight.
     *
     * @return the updated count
     */
    private static synchronized Integer setUsingPoolSize(int value){
        return USING_POOL_SIZE += value;
    }

    /** @return number of workers submitted and not yet finished */
    public static synchronized Integer getUsingPoolSize(){
        return USING_POOL_SIZE;
    }

    /** @return the configured pool size */
    public static Integer getMaxPoolSize(){
        return MAX_POOL_SIZE;
    }

    /** @return workers in flight divided by the configured pool size */
    public static synchronized double getUsage(){
        return getUsingPoolSize()/(MAX_POOL_SIZE*1.0D);
    }

    /** @return whether {@code null} values are accepted and treated as the completion marker */
    public boolean isNulIsCompleted(){
        return nullIsCompleted;
    }

    /** @param nullIsCompleted whether {@code null} values are accepted by {@link #add(Object)} */
    public void setNulIsCompleted(boolean nullIsCompleted){
        this.nullIsCompleted = nullIsCompleted;
    }

    /**
     * Returns the shared pool, creating it on first use. Locks on the class, like
     * {@link #initPool(int)}, so that concurrent instances cannot create two pools.
     */
    private static synchronized ExecutorService pool() {
        if (arrayPool == null) {
            Logger.systemMark(CacheArray.class,"MAX_POOL_SIZE={}",MAX_POOL_SIZE);
            arrayPool = Executors.newFixedThreadPool(MAX_POOL_SIZE);
        }
        return arrayPool;
    }

    /** Flags the attached filter as terminated and force terminated, and clears {@code completed}. */
    private void markForceTerminated() {
        filter.terminated = true;
        filter.forceTerminated = true;
        completed = false;
    }

    /**
     * Worker body: feeds the cached values to the filter until a {@code null} value is
     * read, the filter is force terminated, {@link #forceStop()} is called or the thread
     * is interrupted. Afterwards it fires the filter's {@code completed}/{@code terminated}
     * callbacks, releases the pool slot exactly once and drops all remaining entries.
     */
    private void runFilterLoop() {
        int index = 0;
        try {
            while (!completed && !filter.isForceTerminated() && !isForceStoped()) {

                if (Thread.currentThread().isInterrupted()) {
                    markForceTerminated();
                    break;
                }

                filter.setCacheArray(this);
                // Snapshot of the size: values added meanwhile are picked up by the next outer pass.
                int size = size();
                while (index < size && !filter.isForceTerminated() && !isForceStoped() && !Thread.currentThread().isInterrupted()) {
                    try {
                        // Re-checked here because the flags can change right after the loop condition.
                        if (Thread.currentThread().isInterrupted() || forceStop) {
                            markForceTerminated();
                            break;
                        }

                        if (filter.forceTerminated) {
                            completed = false;
                            break;
                        }

                        Object t = get(index);
                        if (t == null) {
                            // A null value is the completion marker; index is not advanced,
                            // so it equals the number of values processed.
                            completed = true;
                            break;
                        }
                        lastItem = t;
                        INDEX = index;
                        OBJECT = t;
                        BATCH.add(t);
                        filter.execute(index, t);
                        // The value has been handed over; free it, the key itself stays in the list.
                        String cacheKey = getCacheKey(index);
                        StoredCache.remove(cacheKey);
                    } catch (Exception ee) {
                        // Best effort: a failing value is logged and skipped.
                        LoggerFactory.getLogger(CacheArray.class).warn(ee);
                    }
                    index++;
                    try {
                        Thread.sleep(filter.getTimer());
                    } catch (InterruptedException e) {
                        LoggerFactory.getLogger(CacheArray.class).warn(e);
                        filter.terminated = true;
                        completed = false;
                        // Restore the flag so the interruption is not lost.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
                BATCH_INDEX++;
                filter.executeBatch(BATCH_INDEX,BATCH);
                BATCH.clear();
                try {
                    Thread.sleep(filter.getLoopTimer());
                } catch (InterruptedException e) {
                    LoggerFactory.getLogger(CacheArray.class).warn(e);
                    filter.terminated = true;
                    completed = false;
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        } finally {
            try {
                if (completed) {
                    SIZE = index;
                    filter.completed(index);
                }

                if (filter.terminated) {
                    filter.terminated();
                }
            } finally {
                // Release the slot exactly once, whatever the exit path and even if a callback threw.
                setUsingPoolSize(-1);
                removeAll();
            }
        }
    }
}