001/******************************************************************************* 002The MIT License (MIT) 003 004Copyright (c) 2024 KILLCODING.COM 005 006Permission is hereby granted, free of charge, to any person obtaining a copy 007of this software and associated documentation files (the "Software"), to deal 008in the Software without restriction, including without limitation the rights 009to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 010copies of the Software, and to permit persons to whom the Software is 011furnished to do so, subject to the following conditions: 012 013The above copyright notice and this permission notice shall be included in 014all copies or substantial portions of the Software. 015 016THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 017IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 018FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 019AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 020LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 021OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 022THE SOFTWARE. 023*****************************************************************************/ 024package com.killcoding.cache; 025 026import java.io.Serializable; 027import java.util.List; 028import java.util.ArrayList; 029import java.util.Map; 030import com.killcoding.tool.CommonTools; 031import java.util.Collection; 032import java.util.Collections; 033import com.killcoding.cache.CacheArrayFilter; 034import java.util.concurrent.ExecutorService; 035import java.util.concurrent.Executors; 036import com.killcoding.log.LoggerFactory; 037import com.killcoding.log.Logger; 038import com.killcoding.cache.CacheArray; 039 040/** 041 * This class use asynchronous caching to handle large result and large list. 042 * Need class CacheArrayFilter to use. 043 * */ 044public class CacheArray implements java.io.Serializable { 045 046 private static Integer MAX_POOL_SIZE = 100; 047 048 private static Integer USING_POOL_SIZE = 0; 049 050 private static ExecutorService arrayPool = null; 051 052 protected Object lastItem = null; 053 054 private static boolean stop = false; 055 056 private static boolean forceStop = false; 057 058 private CacheArrayFilter filter = null; 059 060 private boolean nullIsCompleted = true; 061 062 protected Integer INDEX = -1; 063 protected Integer SIZE = -1; 064 protected Object OBJECT = null; 065 protected Integer BATCH_INDEX = -1; 066 protected final List BATCH = new ArrayList(); 067 068 /** 069 * All cache keys list 070 * */ 071 private final List<String> cacheKeys = new ArrayList<String>(); 072 073 /** 074 * 'true' is completed 075 * */ 076 private boolean completed = false; 077 078 /** 079 * New a CacheArray object 080 * */ 081 public CacheArray() { 082 super(); 083 this.nullIsCompleted = true; 084 } 085 086 public CacheArray(boolean nullIsCompleted) { 087 super(); 088 this.nullIsCompleted = nullIsCompleted; 089 } 090 091 public static synchronized void initPool(int poolSize) { 092 if (arrayPool == null) { 093 MAX_POOL_SIZE = poolSize; 094 Logger.systemMark(CacheArray.class,"MAX_POOL_SIZE={}",MAX_POOL_SIZE); 095 arrayPool = Executors.newFixedThreadPool(MAX_POOL_SIZE); 096 } 097 } 098 099 public CacheArrayFilter filter(final long timer,final Runnable executeRun,final Runnable completedRun,final Runnable terminatedRun) { 100 CacheArray the = this; 101 the.filter = new CacheArrayFilter(timer){ 102 @Override 103 public void execute(Integer index, Object object){ 104 the.INDEX = index; 105 the.OBJECT = object; 106 107 if(executeRun != null) executeRun.run(); 108 } 109 @Override 110 public void completed(Integer size) { 111 the.SIZE = size; 112 113 if(completedRun != null) completedRun.run(); 114 } 115 @Override 116 public void terminated() { 117 118 if(terminatedRun != null) terminatedRun.run(); 119 } 120 }; 121 filter(the.filter); 122 return the.filter; 123 } 124 125 public CacheArrayFilter filter(final long timer,final long loopTimer,final Runnable executeRun,final Runnable executeBatchRun,final Runnable completedRun,final Runnable terminatedRun) { 126 CacheArray the = this; 127 the.filter = new CacheArrayFilter(timer,loopTimer){ 128 @Override 129 public void execute(Integer index, Object object){ 130 131 if(executeRun != null) executeRun.run(); 132 } 133 134 @Override 135 public void executeBatch(Integer index,List batch) { 136 137 if(executeBatchRun != null) executeBatchRun.run(); 138 } 139 140 @Override 141 public void completed(Integer size) { 142 143 if(completedRun != null) completedRun.run(); 144 } 145 146 @Override 147 public void terminated() { 148 149 if(terminatedRun != null) terminatedRun.run(); 150 } 151 }; 152 filter(the.filter); 153 return the.filter; 154 } 155 /** 156 * Need override the method to process logic handle 157 * @param filter - CacheArrayFilter 158 * */ 159 public synchronized void filter(CacheArrayFilter _filter) { 160 this.filter = _filter; 161 int usagePool = setUsingPoolSize(+1); 162 if(usagePool > MAX_POOL_SIZE){ 163 LoggerFactory.getLogger(CacheArray.class).debug(String.format("Exceeds the entire pool size ('%s' > '%s').",usagePool,MAX_POOL_SIZE)); 164 } 165 final CacheArray the = this; 166 Runnable runnable = new Runnable() { 167 @Override 168 public void run() { 169 int index = 0; 170 try { 171 while (!the.completed && !the.filter.isForceTerminated() && !isForceStoped()) { 172 173 if (Thread.currentThread().isInterrupted()){ 174 the.filter.terminated = true; 175 the.filter.forceTerminated = true; 176 the.completed = false; 177 break; 178 } 179 180 the.filter.setCacheArray(the); 181 int size = the.size(); 182 while (index < size && !the.filter.isForceTerminated() && !isForceStoped() && !Thread.currentThread().isInterrupted()) { 183 try { 184 if (Thread.currentThread().isInterrupted()){ 185 the.filter.terminated = true; 186 the.filter.forceTerminated = true; 187 the.completed = false; 188 break; 189 } 190 191 if (forceStop) { 192 the.filter.terminated = true; 193 the.filter.forceTerminated = true; 194 the.completed = false; 195 break; 196 } 197 198 if (the.filter.forceTerminated){ 199 the.completed = false; 200 break; 201 } 202 203 Object t = the.get(index); 204 if (t == null) { 205 the.completed = true; 206 break; 207 } 208 lastItem = t; 209 the.INDEX = index; 210 the.OBJECT = t; 211 BATCH.add(t); 212 the.filter.execute(index, t); 213 String cacheKey = the.getCacheKey(index); 214 StoredCache.remove(cacheKey); 215 } catch (Exception ee) { 216 LoggerFactory.getLogger(CacheArray.class).warn(ee); 217 } 218 index++; 219 try { 220 Thread.sleep(the.filter.getTimer()); 221 } catch (InterruptedException e) { 222 LoggerFactory.getLogger(CacheArray.class).warn(e); 223 the.filter.terminated = true; 224 the.completed = false; 225 break; 226 } 227 } 228 BATCH_INDEX++; 229 the.filter.executeBatch(BATCH_INDEX,BATCH); 230 BATCH.clear(); 231 try { 232 Thread.sleep(the.filter.getLoopTimer()); 233 } catch (InterruptedException e) { 234 LoggerFactory.getLogger(CacheArray.class).warn(e); 235 the.filter.terminated = true; 236 the.completed = false; 237 break; 238 } 239 } 240 } finally { 241 242 if(the.completed) { 243 the.SIZE = index; 244 the.filter.completed(index); 245 setUsingPoolSize(-1); 246 } 247 248 if(the.filter.terminated){ 249 the.filter.terminated(); 250 setUsingPoolSize(-1); 251 } 252 253 the.removeAll(); 254 255 //Thread.currentThread().interrupt(); 256 } 257 258 } 259 }; 260 if (arrayPool == null) { 261 Logger.systemMark(CacheArray.class,"MAX_POOL_SIZE={}",MAX_POOL_SIZE); 262 arrayPool = Executors.newFixedThreadPool(MAX_POOL_SIZE); 263 } 264 arrayPool.execute(runnable); 265 } 266 267 /** 268 * Get all cache keys 269 * @return List 270 * */ 271 public synchronized List<String> getCacheKeys() { 272 return new ArrayList<String>(Collections.synchronizedList(cacheKeys)); 273 } 274 275 /** 276 * Get all cache size 277 * @reteurn int 278 * */ 279 public synchronized int size() { 280 return cacheKeys.size(); 281 } 282 283 /** 284 * Add object to the cache list 285 * @return - return cacke key 286 * @value - It is object value 287 * */ 288 public synchronized String add(Object value) { 289 if(nullIsCompleted || (!nullIsCompleted && value != null)) { 290 if(this.filter != null && !this.filter.isForceTerminated() && !isForceStoped()){ 291 String key = CommonTools.generateId(16); 292 StoredCache.set(key, value); 293 cacheKeys.add(key); 294 return key; 295 } 296 } 297 return null; 298 } 299 300 /** 301 * Add object to the cache list 302 * @return - return cacke key 303 * @value - It is object value 304 * @sleepMs - It is object value 305 * @throws InterruptedException 306 * */ 307 public synchronized String add(Object value,long sleepMs) throws InterruptedException { 308 String key = add(value); 309 Thread.sleep(sleepMs); 310 return key; 311 } 312 313 /** 314 * Add list to the cache list 315 * @param list - It is object list value 316 * */ 317 public synchronized void addAll(List list) { 318 for (Object value : list) { 319 add(value); 320 } 321 } 322 323 /** 324 * Remove cache list index object 325 * @return - return removed cache key name 326 * @param index - It is remove index cache 327 * */ 328 public synchronized String remove(int index) { 329 String key = cacheKeys.get(index); 330 StoredCache.remove(key); 331 cacheKeys.remove(index); 332 return key; 333 } 334 335 /** 336 * Remove all cache list 337 * */ 338 public synchronized void removeAll() { 339 for (String key : cacheKeys) { 340 StoredCache.remove(key); 341 } 342 cacheKeys.clear(); 343 } 344 345 /** 346 * Get cache value by index 347 * @return - Cache value 348 * @param index - Cache index 349 * */ 350 public synchronized Object get(int index) { 351 String key = cacheKeys.get(index); 352 return StoredCache.get(key); 353 } 354 355 /** 356 * Get cache key by index 357 * @return - Cache key 358 * @param index - Cache index 359 * */ 360 public synchronized String getCacheKey(int index) { 361 return cacheKeys.get(index); 362 } 363 364 /** 365 * Check complete status 366 * @return - 'true' is completed 367 * */ 368 public synchronized Boolean isCompleted() { 369 return completed; 370 } 371 372 public static Boolean isStoped() { 373 return stop; 374 } 375 376 public static Boolean isForceStoped() { 377 return forceStop; 378 } 379 380 public static void stop() { 381 stop = true; 382 } 383 384 public static void forceStop() { 385 stop = true; 386 forceStop = true; 387 } 388 389 public static void start() { 390 stop = false; 391 } 392 393 public void terminate(){ 394 if(filter != null) filter.terminate(); 395 } 396 397 public void forceTerminate(){ 398 if(filter != null) filter.forceTerminate(); 399 } 400 401 public Boolean isTerminated(){ 402 return filter == null ? true : filter.isTerminated(); 403 } 404 405 public Boolean isForceTerminated(){ 406 return filter == null ? true : filter.isForceTerminated(); 407 } 408 409 public Integer getIndex(){ 410 return INDEX; 411 } 412 413 public Integer getSize(){ 414 return SIZE; 415 } 416 417 public Object getObject(){ 418 return OBJECT; 419 } 420 421 public Object getBatch(){ 422 return BATCH; 423 } 424 425 public Integer getBatchIndex(){ 426 return BATCH_INDEX; 427 } 428 429 private static synchronized Integer setUsingPoolSize(int value){ 430 return USING_POOL_SIZE += value; 431 } 432 433 public static synchronized Integer getUsingPoolSize(){ 434 return USING_POOL_SIZE; 435 } 436 437 public static Integer getMaxPoolSize(){ 438 return MAX_POOL_SIZE; 439 } 440 441 public static synchronized double getUsage(){ 442 return getUsingPoolSize()/(MAX_POOL_SIZE*1.0D); 443 } 444 445 public boolean isNulIsCompleted(){ 446 return nullIsCompleted; 447 } 448 449 public void setNulIsCompleted(boolean nullIsCompleted){ 450 this.nullIsCompleted = nullIsCompleted; 451 } 452}