001/******************************************************************************* 002The MIT License (MIT) 003 004Copyright (c) 2024 KILLCODING.COM 005 006Permission is hereby granted, free of charge, to any person obtaining a copy 007of this software and associated documentation files (the "Software"), to deal 008in the Software without restriction, including without limitation the rights 009to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 010copies of the Software, and to permit persons to whom the Software is 011furnished to do so, subject to the following conditions: 012 013The above copyright notice and this permission notice shall be included in 014all copies or substantial portions of the Software. 015 016THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 017IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 018FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 019AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 020LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 021OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 022THE SOFTWARE. 023*****************************************************************************/ 024package com.killcoding.cache; 025 026import java.io.Serializable; 027import java.util.List; 028import java.util.ArrayList; 029import java.util.Map; 030import com.killcoding.tool.CommonTools; 031import java.util.Collection; 032import java.util.Collections; 033import com.killcoding.cache.CacheArrayFilter; 034import java.util.concurrent.ExecutorService; 035import java.util.concurrent.Executors; 036import com.killcoding.log.LoggerFactory; 037import com.killcoding.log.Logger; 038import com.killcoding.cache.CacheArray; 039 040/** 041 * This class use asynchronous caching to handle large result and large list. 042 * Need class CacheArrayFilter to use. 043 * */ 044public class CacheArray implements java.io.Serializable { 045 046 private static Integer MAX_POOL_SIZE = 100; 047 048 private static Integer USING_POOL_SIZE = 0; 049 050 private static ExecutorService arrayPool = null; 051 052 protected Object lastItem = null; 053 054 055 056 private static boolean stop = false; 057 058 private static boolean forceStop = false; 059 060 private CacheArrayFilter filter = null; 061 062 protected Integer INDEX = -1; 063 protected Integer SIZE = -1; 064 protected Object OBJECT = null; 065 protected Integer BATCH_INDEX = -1; 066 protected final List BATCH = new ArrayList(); 067 068 /** 069 * All cache keys list 070 * */ 071 private final List<String> cacheKeys = new ArrayList<String>(); 072 073 /** 074 * 'true' is completed 075 * */ 076 private boolean completed = false; 077 078 /** 079 * New a CacheArray object 080 * */ 081 public CacheArray() { 082 super(); 083 } 084 085 public static synchronized void initPool(int poolSize) { 086 if (arrayPool == null) { 087 MAX_POOL_SIZE = poolSize; 088 Logger.systemMark(CacheArray.class,"MAX_POOL_SIZE={}",MAX_POOL_SIZE); 089 arrayPool = Executors.newFixedThreadPool(MAX_POOL_SIZE); 090 } 091 } 092 093 public CacheArrayFilter filter(final long timer,final Runnable executeRun,final Runnable completedRun,final Runnable terminatedRun) { 094 CacheArray the = this; 095 the.filter = new CacheArrayFilter(timer){ 096 @Override 097 public void execute(Integer index, Object object){ 098 the.INDEX = index; 099 the.OBJECT = object; 100 101 if(executeRun != null) executeRun.run(); 102 } 103 @Override 104 public void completed(Integer size) { 105 the.SIZE = size; 106 107 if(completedRun != null) completedRun.run(); 108 } 109 @Override 110 public void terminated() { 111 112 if(terminatedRun != null) terminatedRun.run(); 113 } 114 }; 115 filter(the.filter); 116 return the.filter; 117 } 118 119 public CacheArrayFilter filter(final long timer,final long loopTimer,final Runnable executeRun,final Runnable executeBatchRun,final Runnable completedRun,final Runnable terminatedRun) { 120 CacheArray the = this; 121 the.filter = new CacheArrayFilter(timer,loopTimer){ 122 @Override 123 public void execute(Integer index, Object object){ 124 125 if(executeRun != null) executeRun.run(); 126 } 127 128 @Override 129 public void executeBatch(Integer index,List batch) { 130 131 if(executeBatchRun != null) executeBatchRun.run(); 132 } 133 134 @Override 135 public void completed(Integer size) { 136 137 if(completedRun != null) completedRun.run(); 138 } 139 140 @Override 141 public void terminated() { 142 143 if(terminatedRun != null) terminatedRun.run(); 144 } 145 }; 146 filter(the.filter); 147 return the.filter; 148 } 149 /** 150 * Need override the method to process logic handle 151 * @param filter - CacheArrayFilter 152 * */ 153 public void filter(CacheArrayFilter _filter) { 154 this.filter = _filter; 155 int usagePool = setUsingPoolSize(+1); 156 if(usagePool > MAX_POOL_SIZE){ 157 LoggerFactory.getLogger(CacheArray.class).debug(String.format("Exceeds the entire pool size ('%s' > '%s').",usagePool,MAX_POOL_SIZE)); 158 } 159 final CacheArray the = this; 160 Runnable runnable = new Runnable() { 161 @Override 162 public void run() { 163 int index = 0; 164 try { 165 while (!the.completed && !the.filter.isForceTerminated() && !isForceStoped() && !Thread.currentThread().isInterrupted()) { 166 167 if (Thread.currentThread().isInterrupted()){ 168 the.filter.terminated = true; 169 the.filter.forceTerminated = true; 170 the.completed = false; 171 break; 172 } 173 174 the.filter.setCacheArray(the); 175 int size = the.size(); 176 while (index < size && !the.filter.isForceTerminated() && !isForceStoped() && !Thread.currentThread().isInterrupted()) { 177 try { 178 if (Thread.currentThread().isInterrupted()){ 179 the.filter.terminated = true; 180 the.filter.forceTerminated = true; 181 the.completed = false; 182 break; 183 } 184 185 if (forceStop) { 186 the.filter.terminated = true; 187 the.filter.forceTerminated = true; 188 the.completed = false; 189 break; 190 } 191 192 if (the.filter.forceTerminated){ 193 the.completed = false; 194 break; 195 } 196 197 Object t = the.get(index); 198 if (t == null) { 199 the.completed = true; 200 break; 201 } 202 lastItem = t; 203 the.INDEX = index; 204 the.OBJECT = t; 205 BATCH.add(t); 206 the.filter.execute(index, t); 207 String cacheKey = the.getCacheKey(index); 208 StoredCache.remove(cacheKey); 209 } catch (Exception ee) { 210 LoggerFactory.getLogger(CacheArray.class).warn(ee); 211 } 212 index++; 213 try { 214 Thread.sleep(the.filter.getTimer()); 215 } catch (InterruptedException e) { 216 LoggerFactory.getLogger(CacheArray.class).warn(e); 217 the.filter.terminated = true; 218 the.completed = false; 219 break; 220 } 221 } 222 BATCH_INDEX++; 223 the.filter.executeBatch(BATCH_INDEX,BATCH); 224 BATCH.clear(); 225 try { 226 Thread.sleep(the.filter.getLoopTimer()); 227 } catch (InterruptedException e) { 228 LoggerFactory.getLogger(CacheArray.class).warn(e); 229 the.filter.terminated = true; 230 the.completed = false; 231 break; 232 } 233 } 234 } finally { 235 236 if(the.completed) { 237 the.SIZE = index; 238 the.filter.completed(index); 239 setUsingPoolSize(-1); 240 } 241 242 if(the.filter.terminated){ 243 the.filter.terminated(); 244 setUsingPoolSize(-1); 245 } 246 247 the.removeAll(); 248 249 Thread.currentThread().interrupt(); 250 } 251 252 } 253 }; 254 if (arrayPool == null) { 255 Logger.systemMark(CacheArray.class,"MAX_POOL_SIZE={}",MAX_POOL_SIZE); 256 arrayPool = Executors.newFixedThreadPool(MAX_POOL_SIZE); 257 } 258 arrayPool.execute(runnable); 259 } 260 261 /** 262 * Get all cache keys 263 * @return List 264 * */ 265 public synchronized List<String> getCacheKeys() { 266 return new ArrayList<String>(Collections.synchronizedList(cacheKeys)); 267 } 268 269 /** 270 * Get all cache size 271 * @reteurn int 272 * */ 273 public synchronized int size() { 274 return cacheKeys.size(); 275 } 276 277 /** 278 * Add object to the cache list 279 * @return - return cacke key 280 * @value - It is object value 281 * */ 282 public synchronized String add(Object value) { 283 if(this.filter != null && !this.filter.isForceTerminated() && !isForceStoped() && !Thread.currentThread().isInterrupted()){ 284 String key = CommonTools.generateId(16); 285 StoredCache.set(key, value); 286 cacheKeys.add(key); 287 return key; 288 } 289 return null; 290 } 291 292 /** 293 * Add object to the cache list 294 * @return - return cacke key 295 * @value - It is object value 296 * @sleepMs - It is object value 297 * @throws InterruptedException 298 * */ 299 public synchronized String add(Object value,long sleepMs) throws InterruptedException { 300 String key = add(value); 301 Thread.sleep(sleepMs); 302 return key; 303 } 304 305 /** 306 * Add list to the cache list 307 * @param list - It is object list value 308 * */ 309 public synchronized void addAll(List list) { 310 for (Object value : list) { 311 add(value); 312 } 313 } 314 315 /** 316 * Remove cache list index object 317 * @return - return removed cache key name 318 * @param index - It is remove index cache 319 * */ 320 public synchronized String remove(int index) { 321 String key = cacheKeys.get(index); 322 StoredCache.remove(key); 323 cacheKeys.remove(index); 324 return key; 325 } 326 327 /** 328 * Remove all cache list 329 * */ 330 public synchronized void removeAll() { 331 for (String key : cacheKeys) { 332 StoredCache.remove(key); 333 } 334 cacheKeys.clear(); 335 } 336 337 /** 338 * Get cache value by index 339 * @return - Cache value 340 * @param index - Cache index 341 * */ 342 public synchronized Object get(int index) { 343 String key = cacheKeys.get(index); 344 return StoredCache.get(key); 345 } 346 347 /** 348 * Get cache key by index 349 * @return - Cache key 350 * @param index - Cache index 351 * */ 352 public synchronized String getCacheKey(int index) { 353 return cacheKeys.get(index); 354 } 355 356 /** 357 * Check complete status 358 * @return - 'true' is completed 359 * */ 360 public synchronized Boolean isCompleted() { 361 return completed; 362 } 363 364 public static Boolean isStoped() { 365 return stop; 366 } 367 368 public static Boolean isForceStoped() { 369 return forceStop; 370 } 371 372 public static void stop() { 373 stop = true; 374 } 375 376 public static void forceStop() { 377 stop = true; 378 forceStop = true; 379 } 380 381 public static void start() { 382 stop = false; 383 } 384 385 public void terminate(){ 386 if(filter != null) filter.terminate(); 387 } 388 389 public void forceTerminate(){ 390 if(filter != null) filter.forceTerminate(); 391 } 392 393 public Boolean isTerminated(){ 394 return filter == null ? true : filter.isTerminated(); 395 } 396 397 public Boolean isForceTerminated(){ 398 return filter == null ? true : filter.isForceTerminated(); 399 } 400 401 public Integer getIndex(){ 402 return INDEX; 403 } 404 405 public Integer getSize(){ 406 return SIZE; 407 } 408 409 public Object getObject(){ 410 return OBJECT; 411 } 412 413 public Object getBatch(){ 414 return BATCH; 415 } 416 417 public Integer getBatchIndex(){ 418 return BATCH_INDEX; 419 } 420 421 private static synchronized Integer setUsingPoolSize(int value){ 422 return USING_POOL_SIZE += value; 423 } 424 425 public static synchronized Integer getUsingPoolSize(){ 426 return USING_POOL_SIZE; 427 } 428 429 public static Integer getMaxPoolSize(){ 430 return MAX_POOL_SIZE; 431 } 432 433 public static synchronized double getUsage(){ 434 return getUsingPoolSize()/(MAX_POOL_SIZE*1.0D); 435 } 436}