001/******************************************************************************* 002The MIT License (MIT) 003 004Copyright (c) 2024 KILLCODING.COM 005 006Permission is hereby granted, free of charge, to any person obtaining a copy 007of this software and associated documentation files (the "Software"), to deal 008in the Software without restriction, including without limitation the rights 009to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 010copies of the Software, and to permit persons to whom the Software is 011furnished to do so, subject to the following conditions: 012 013The above copyright notice and this permission notice shall be included in 014all copies or substantial portions of the Software. 015 016THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 017IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 018FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 019AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 020LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 021OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 022THE SOFTWARE. 023*****************************************************************************/ 024package com.killcoding.cache; 025 026import java.io.Serializable; 027import java.util.List; 028import java.util.ArrayList; 029import java.util.Map; 030import com.killcoding.tool.CommonTools; 031import java.util.Collection; 032import java.util.Collections; 033import com.killcoding.cache.CacheArrayFilter; 034import java.util.concurrent.ExecutorService; 035import java.util.concurrent.Executors; 036import com.killcoding.log.LoggerFactory; 037import com.killcoding.log.Logger; 038import com.killcoding.cache.CacheArray; 039 040/** 041 * This class use asynchronous caching to handle large result and large list. 042 * Need class CacheArrayFilter to use. 043 * */ 044public class CacheArray implements java.io.Serializable { 045 046 private static Integer MAX_POOL_SIZE = 100; 047 048 private static Integer USING_POOL_SIZE = 0; 049 050 private static ExecutorService arrayPool = null; 051 052 protected Object lastItem = null; 053 054 private static boolean stop = false; 055 056 private static boolean forceStop = false; 057 058 private CacheArrayFilter filter = null; 059 060 private boolean nullIsCompleted = true; 061 062 protected Integer INDEX = -1; 063 protected Integer SIZE = -1; 064 protected Object OBJECT = null; 065 protected Integer BATCH_INDEX = -1; 066 protected final List BATCH = new ArrayList(); 067 068 /** 069 * All cache keys list 070 * */ 071 private final List<String> cacheKeys = new ArrayList<String>(); 072 073 /** 074 * 'true' is completed 075 * */ 076 private boolean completed = false; 077 078 /** 079 * New a CacheArray object 080 * */ 081 public CacheArray() { 082 super(); 083 this.nullIsCompleted = true; 084 } 085 086 public CacheArray(boolean nullIsCompleted) { 087 super(); 088 this.nullIsCompleted = nullIsCompleted; 089 } 090 091 public static synchronized void initPool(int poolSize) { 092 if (arrayPool == null) { 093 MAX_POOL_SIZE = poolSize; 094 Logger.systemMark(CacheArray.class,"MAX_POOL_SIZE={}",MAX_POOL_SIZE); 095 arrayPool = Executors.newFixedThreadPool(MAX_POOL_SIZE); 096 } 097 } 098 099 public CacheArrayFilter filter(final long timer,final Runnable executeRun,final Runnable completedRun,final Runnable terminatedRun) { 100 CacheArray the = this; 101 the.filter = new CacheArrayFilter(timer){ 102 @Override 103 public void execute(Integer index, Object object){ 104 the.INDEX = index; 105 the.OBJECT = object; 106 107 if(executeRun != null) executeRun.run(); 108 } 109 @Override 110 public void completed(Integer size) { 111 the.SIZE = size; 112 113 if(completedRun != null) completedRun.run(); 114 } 115 @Override 116 public void terminated() { 117 118 if(terminatedRun != null) terminatedRun.run(); 119 } 120 }; 121 filter(the.filter); 122 return the.filter; 123 } 124 125 public CacheArrayFilter filter(final long timer,final long loopTimer,final Runnable executeRun,final Runnable executeBatchRun,final Runnable completedRun,final Runnable terminatedRun) { 126 CacheArray the = this; 127 the.filter = new CacheArrayFilter(timer,loopTimer){ 128 @Override 129 public void execute(Integer index, Object object){ 130 131 if(executeRun != null) executeRun.run(); 132 } 133 134 @Override 135 public void executeBatch(Integer index,List batch) { 136 137 if(executeBatchRun != null) executeBatchRun.run(); 138 } 139 140 @Override 141 public void completed(Integer size) { 142 143 if(completedRun != null) completedRun.run(); 144 } 145 146 @Override 147 public void terminated() { 148 149 if(terminatedRun != null) terminatedRun.run(); 150 } 151 }; 152 filter(the.filter); 153 return the.filter; 154 } 155 /** 156 * Need override the method to process logic handle 157 * @param filter - CacheArrayFilter 158 * */ 159 public synchronized void filter(CacheArrayFilter _filter) { 160 this.filter = _filter; 161 int usagePool = setUsingPoolSize(+1); 162 if(usagePool > MAX_POOL_SIZE){ 163 LoggerFactory.getLogger(CacheArray.class).debug(String.format("Exceeds the entire pool size ('%s' > '%s').",usagePool,MAX_POOL_SIZE)); 164 } 165 final CacheArray the = this; 166 Runnable runnable = new Runnable() { 167 @Override 168 public void run() { 169 int index = 0; 170 try { 171 while (!the.completed && !the.filter.isForceTerminated() && !isForceStoped()) { 172 173 if (Thread.currentThread().isInterrupted()){ 174 the.filter.terminated = true; 175 the.filter.forceTerminated = true; 176 the.completed = false; 177 break; 178 } 179 180 the.filter.setCacheArray(the); 181 int size = the.size(); 182 while (index < size && !the.filter.isForceTerminated() && !isForceStoped() && !Thread.currentThread().isInterrupted()) { 183 try { 184 if (Thread.currentThread().isInterrupted()){ 185 the.filter.terminated = true; 186 the.filter.forceTerminated = true; 187 the.completed = false; 188 break; 189 } 190 191 if (forceStop) { 192 the.filter.terminated = true; 193 the.filter.forceTerminated = true; 194 the.completed = false; 195 break; 196 } 197 198 if (the.filter.forceTerminated){ 199 the.completed = false; 200 break; 201 } 202 203 Object t = the.get(index); 204 if (t == null) { 205 the.completed = true; 206 break; 207 } 208 lastItem = t; 209 the.INDEX = index; 210 the.OBJECT = t; 211 BATCH.add(t); 212 the.filter.execute(index, t); 213 String cacheKey = the.getCacheKey(index); 214 StoredCache.remove(cacheKey); 215 } catch (Exception ee) { 216 LoggerFactory.getLogger(CacheArray.class).warn(ee); 217 } 218 index++; 219 try { 220 Thread.sleep(the.filter.getTimer()); 221 } catch (InterruptedException e) { 222 LoggerFactory.getLogger(CacheArray.class).warn(e); 223 the.filter.terminated = true; 224 the.completed = false; 225 break; 226 } 227 } 228 BATCH_INDEX++; 229 the.filter.executeBatch(BATCH_INDEX,BATCH); 230 BATCH.clear(); 231 try { 232 Thread.sleep(the.filter.getLoopTimer()); 233 } catch (InterruptedException e) { 234 LoggerFactory.getLogger(CacheArray.class).warn(e); 235 the.filter.terminated = true; 236 the.completed = false; 237 break; 238 } 239 } 240 } finally { 241 242 if(the.completed) { 243 the.SIZE = index; 244 the.filter.completed(index); 245 setUsingPoolSize(-1); 246 } 247 248 if(the.filter.terminated){ 249 the.filter.terminated(); 250 setUsingPoolSize(-1); 251 } 252 253 the.removeAll(); 254 255 Thread.currentThread().interrupt(); 256 } 257 258 } 259 }; 260 if (arrayPool == null) { 261 Logger.systemMark(CacheArray.class,"MAX_POOL_SIZE={}",MAX_POOL_SIZE); 262 arrayPool = Executors.newFixedThreadPool(MAX_POOL_SIZE); 263 } 264 arrayPool.execute(runnable); 265 } 266 267 /** 268 * Get all cache keys 269 * @return List 270 * */ 271 public synchronized List<String> getCacheKeys() { 272 return new ArrayList<String>(Collections.synchronizedList(cacheKeys)); 273 } 274 275 /** 276 * Get all cache size 277 * @reteurn int 278 * */ 279 public synchronized int size() { 280 return cacheKeys.size(); 281 } 282 283 /** 284 * Add object to the cache list 285 * @return - return cacke key 286 * @value - It is object value 287 * */ 288 public synchronized String add(Object value) { 289 if(nullIsCompleted || (!nullIsCompleted && value != null)) { 290 if(this.filter != null && !this.filter.isForceTerminated() && !isForceStoped()){ 291 String key = CommonTools.generateId(16); 292 StoredCache.set(key, value); 293 cacheKeys.add(key); 294 return key; 295 } 296 } 297 return null; 298 } 299 300 /** 301 * Add object to the cache list 302 * @return - return cacke key 303 * @value - It is object value 304 * @sleepMs - It is object value 305 * @throws InterruptedException 306 * */ 307 public synchronized String add(Object value,long sleepMs) throws InterruptedException { 308 String key = add(value); 309 310 if(sleepMs > 0) Thread.sleep(sleepMs); 311 312 return key; 313 } 314 315 /** 316 * Add list to the cache list 317 * @param list - It is object list value 318 * */ 319 public synchronized void addAll(List list) { 320 for (Object value : list) { 321 add(value); 322 } 323 } 324 325 /** 326 * Remove cache list index object 327 * @return - return removed cache key name 328 * @param index - It is remove index cache 329 * */ 330 public synchronized String remove(int index) { 331 String key = cacheKeys.get(index); 332 StoredCache.remove(key); 333 cacheKeys.remove(index); 334 return key; 335 } 336 337 /** 338 * Remove all cache list 339 * */ 340 public synchronized void removeAll() { 341 for (String key : cacheKeys) { 342 StoredCache.remove(key); 343 } 344 cacheKeys.clear(); 345 } 346 347 /** 348 * Get cache value by index 349 * @return - Cache value 350 * @param index - Cache index 351 * */ 352 public synchronized Object get(int index) { 353 String key = cacheKeys.get(index); 354 return StoredCache.get(key); 355 } 356 357 /** 358 * Get cache key by index 359 * @return - Cache key 360 * @param index - Cache index 361 * */ 362 public synchronized String getCacheKey(int index) { 363 return cacheKeys.get(index); 364 } 365 366 /** 367 * Check complete status 368 * @return - 'true' is completed 369 * */ 370 public synchronized Boolean isCompleted() { 371 return completed; 372 } 373 374 public static Boolean isStoped() { 375 return stop; 376 } 377 378 public static Boolean isForceStoped() { 379 return forceStop; 380 } 381 382 public static void stop() { 383 stop = true; 384 } 385 386 public static void forceStop() { 387 stop = true; 388 forceStop = true; 389 } 390 391 public static void start() { 392 stop = false; 393 } 394 395 public void terminate(){ 396 if(filter != null) filter.terminate(); 397 } 398 399 public void forceTerminate(){ 400 if(filter != null) filter.forceTerminate(); 401 } 402 403 public Boolean isTerminated(){ 404 return filter == null ? true : filter.isTerminated(); 405 } 406 407 public Boolean isForceTerminated(){ 408 return filter == null ? true : filter.isForceTerminated(); 409 } 410 411 public Integer getIndex(){ 412 return INDEX; 413 } 414 415 public Integer getSize(){ 416 return SIZE; 417 } 418 419 public Object getObject(){ 420 return OBJECT; 421 } 422 423 public Object getBatch(){ 424 return BATCH; 425 } 426 427 public Integer getBatchIndex(){ 428 return BATCH_INDEX; 429 } 430 431 private static synchronized Integer setUsingPoolSize(int value){ 432 return USING_POOL_SIZE += value; 433 } 434 435 public static synchronized Integer getUsingPoolSize(){ 436 return USING_POOL_SIZE; 437 } 438 439 public static Integer getMaxPoolSize(){ 440 return MAX_POOL_SIZE; 441 } 442 443 public static synchronized double getUsage(){ 444 return getUsingPoolSize()/(MAX_POOL_SIZE*1.0D); 445 } 446 447 public boolean isNulIsCompleted(){ 448 return nullIsCompleted; 449 } 450 451 public void setNulIsCompleted(boolean nullIsCompleted){ 452 this.nullIsCompleted = nullIsCompleted; 453 } 454}