写点什么

利用多 Condition 动态管理池化的异步资源

背景

用户头像
极客good
关注
发布于: 刚刚

// One Condition per worker: start() submits one worker task for each entry of this list.
private final List<Condition> workerConditions;


/**
 * Creates the executor holder around a shared lock and the worker conditions.
 * A fresh {@code RedisCacheDAO} is created for the workers to push into.
 *
 * @param lock             lock handed to every worker task
 * @param workerConditions conditions handed out one per worker task
 */
public KeyPairExecutors(ReentrantLock lock, List<Condition> workerConditions) {
    this.cacheDAO = new RedisCacheDAO();
    this.lock = lock;
    this.workerConditions = workerConditions;
}


/**
 * Starts the worker threads: builds a fixed-size pool with one thread per
 * condition and submits one key pair generation task for each condition.
 */
public void start() {
    final int workerCount = workerConditions.size();
    final ThreadPoolExecutor pool = new ThreadPoolExecutor(
            workerCount,
            workerCount,
            120, TimeUnit.SECONDS,
            new SynchronousQueue<>(),
            new KeyPairGeneratorThreadFactory("keypair_gen_", workerCount),
            new ThreadPoolExecutor.DiscardOldestPolicy());

    // One task per condition, so every worker waits on its own condition.
    for (Condition workerCondition : workerConditions) {
        pool.submit(new KeyPairRunable(cacheDAO, lock, workerCondition));
    }
}


class KeyPairRunable implements Runnable {


private final RedisCacheDAO cacheDAO;private final ReentrantLock lock;private final Condition condition;


public KeyPairRunable(RedisCacheDAO cacheDAO, ReentrantLock lock, Condition condition) {this.cacheDAO = cacheDAO;this.lock = lock;this.condition = condition;}


@Overridepublic void run() {


while(true) {


String keyBytes = genKeyPair();


try {int currentSize = cacheDAO.listLpush(keyBytes);// 写入记录后实时返回当前 List 元素数 if(currentSize >= RedisCacheDAO.MAX_CACHE_SIZE) {System.out.println("cache is full. " + Thread.currentThread().getName() + " ready to park.");


lock.lock();condition.await();


System.out.println("cache is consuming. " + Thread.currentThread().getName() + " unparked.");}} catch (InterruptedException e) {System.out.println(Thread.currentThread().getName() + " is interuupted.");} finally {


if(lock.isLocked()) {lock.unlock();}}}}


private String genKeyPair() {


// TODO 秘钥对桩 return "";}}


class KeyPairGeneratorThreadFactory implements ThreadFactory {


private final String threadGroupName;private final AtomicInteger idSeq;


public Key


【一线大厂Java面试题解析+核心总结学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


PairGeneratorThreadFactory(String threadGroupName, int maxSeq) {this.threadGroupName = threadGroupName;this.idSeq = new AtomicInteger(maxSeq);}


@Overridepublic Thread newThread(Runnable r) {


int threadId = idSeq.getAndDecrement();if(threadId < 0) {throw new UnsupportedOperationException("thread number cannot be out of range");}


return new Thread(r, threadGroupName + "_" + threadId);}}}


密钥对生成 monitor


/**


  • 秘钥对生成定时调度*/public enum KeyPairsMonitor {


INSTANCE;


private final ReentrantLock reentrantLock;private final List<Condition> conditionList;private final RedisCacheDAO redisCacheDAO;private final int coreSize;


KeyPairsMonitor() {


this.redisCacheDAO = new RedisCacheDAO();this.reentrantLock = new ReentrantLock();


coreSize = Runtime.getRuntime().availableProcessors();this.conditionList = new ArrayList<>(coreSize);for( int i=0; i< coreSize; i++ ) {conditionList.add(reentrantLock.newCondition());}}


/**
 * Starts the key generation workers, then begins the periodic monitoring
 * schedule.
 */
public void monitor() {
    new KeyPairExecutors(reentrantLock, conditionList).start();
    buildMonitorSchedule();
}


/**


  • 构造定时任务*/private void buildMonitorSchedule() {


ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();service.scheduleAtFixedRate(new Runnable() {


@Overridepublic void run() {int currentSize = redisCacheDAO.listLlen();System.out.println("current cache size is: " + currentSize);


int executNum = 0;if(currentSize <= RedisCacheDAO.HALF_MAX_CACHE_SIZE) {System.out.println("current cache level is under 50% to ." + currentSize);executNum = coreSize;} else if(currentSize <= RedisCacheDAO.PERCENT_75_MAX_CACHE_SIZE) {System.out.println("current cache level is under 75% to ." + currentSize);executNum = coreSize >> 1;}


for(int i=0; i < executNum; i++) {try {reentrantLock.lock();conditionList.get(i).signal();} catch (IllegalMonitorStateException e) {// do nothing, condition no await} catch (Exception e) {System.out.println(e.getMessage());} finally {if(reentrantLock.isLocked()) {reentrantLock.unlock();}}


}}


}, 0, 5, TimeUnit.SECONDS);}


打桩 Redis 的 List 操作:


public class RedisCacheDAO {


public static final String DEFAULT_KEYPAIE_CACHE_KEY = "keypaie_redis_list_rsa_byte";public static final int MAX_CACHE_SIZE = 1 << 4;public static final int HALF_MAX_CACHE_SIZE = MAX_CACHE_SIZE >> 1;public static final int PERCENT_75_MAX_CACHE_SIZE = MAX_CACHE_SIZE - (MAX_CACHE_SIZE >> 2);


private String key;private static final AtomicInteger count = new AtomicInteger(1);

用户头像

极客good

关注

还未添加个人签名 2021.03.18 加入

还未添加个人简介

评论

发布
暂无评论
利用多condition动态管理池化的异步资源背景