利用多 condition 动态管理池化的异步资源背景
private final List<Condition> workerConditions;
public KeyPairExecutors(ReentrantLock lock, List<Condition> workerConditions) {this.cacheDAO = new RedisCacheDAO();this.workerConditions = workerConditions;this.lock = lock;}
/**
工作线程构造方法*/public void start() {
int coreNum = workerConditions.size();ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(coreNum, coreNum, 120, TimeUnit.SECONDS,new SynchronousQueue<>(),new KeyPairGeneratorThreadFactory("keypair_gen_", workerConditions.size()),new ThreadPoolExecutor.DiscardOldestPolicy());
Stream.iterate(0, n -> n + 1).limit(workerConditions.size()).forEach( i -> poolExecutor.submit(new KeyPairRunable(cacheDAO, lock, workerConditions.get(i))));}
class KeyPairRunable implements Runnable {
private final RedisCacheDAO cacheDAO;private final ReentrantLock lock;private final Condition condition;
public KeyPairRunable(RedisCacheDAO cacheDAO, ReentrantLock lock, Condition condition) {this.cacheDAO = cacheDAO;this.lock = lock;this.condition = condition;}
@Overridepublic void run() {
while(true) {
String keyBytes = genKeyPair();
try {int currentSize = cacheDAO.listLpush(keyBytes);// 写入记录后实时返回当前 List 元素数 if(currentSize >= RedisCacheDAO.MAX_CACHE_SIZE) {System.out.println("cache is full. " + Thread.currentThread().getName() + " ready to park.");
lock.lock();condition.await();
System.out.println("cache is consuming. " + Thread.currentThread().getName() + " unparked.");}} catch (InterruptedException e) {System.out.println(Thread.currentThread().getName() + " is interuupted.");} finally {
if(lock.isLocked()) {lock.unlock();}}}}
private String genKeyPair() {
// TODO 秘钥对桩 return "";}}
class KeyPairGeneratorThreadFactory implements ThreadFactory {
private final String threadGroupName;private final AtomicInteger idSeq;
public Key
PairGeneratorThreadFactory(String threadGroupName, int maxSeq) {this.threadGroupName = threadGroupName;this.idSeq = new AtomicInteger(maxSeq);}
@Overridepublic Thread newThread(Runnable r) {
int threadId = idSeq.getAndDecrement();if(threadId < 0) {throw new UnsupportedOperationException("thread number cannot be out of range");}
return new Thread(r, threadGroupName + "_" + threadId);}}}
秘钥对生成 monitor
/**
秘钥对生成定时调度*/public enum KeyPairsMonitor {
INSTANCE;
private final ReentrantLock reentrantLock;private final List<Condition> conditionList;private final RedisCacheDAO redisCacheDAO;private final int coreSize;
KeyPairsMonitor() {
this.redisCacheDAO = new RedisCacheDAO();this.reentrantLock = new ReentrantLock();
coreSize = Runtime.getRuntime().availableProcessors();this.conditionList = new ArrayList<>(coreSize);for( int i=0; i< coreSize; i++ ) {conditionList.add(reentrantLock.newCondition());}}
/**
启动密钥生成任务,开启调度*/public void monitor() {
KeyPairExecutors executors = new KeyPairExecutors(reentrantLock, conditionList);executors.start();
buildMonitorSchedule();}
/**
构造定时任务*/private void buildMonitorSchedule() {
ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();service.scheduleAtFixedRate(new Runnable() {
@Overridepublic void run() {int currentSize = redisCacheDAO.listLlen();System.out.println("current cache size is: " + currentSize);
int executNum = 0;if(currentSize <= RedisCacheDAO.HALF_MAX_CACHE_SIZE) {System.out.println("current cache level is under 50% to ." + currentSize);executNum = coreSize;} else if(currentSize <= RedisCacheDAO.PERCENT_75_MAX_CACHE_SIZE) {System.out.println("current cache level is under 75% to ." + currentSize);executNum = coreSize >> 1;}
for(int i=0; i < executNum; i++) {try {reentrantLock.lock();conditionList.get(i).signal();} catch (IllegalMonitorStateException e) {// do nothing, condition no await} catch (Exception e) {System.out.println(e.getMessage());} finally {if(reentrantLock.isLocked()) {reentrantLock.unlock();}}
}}
}, 0, 5, TimeUnit.SECONDS);}
打桩 redis 操作 List 操作:
public class RedisCacheDAO {
public static final String DEFAULT_KEYPAIE_CACHE_KEY = "keypaie_redis_list_rsa_byte";public static final int MAX_CACHE_SIZE = 1 << 4;public static final int HALF_MAX_CACHE_SIZE = MAX_CACHE_SIZE >> 1;public static final int PERCENT_75_MAX_CACHE_SIZE = MAX_CACHE_SIZE - (MAX_CACHE_SIZE >> 2);
private String key;private static final AtomicInteger count = new AtomicInteger(1);
评论