前几天分享过固定QPS异步任务功能初探使用了缓存线程池,利用java.util.concurrent.Semaphore实现了固定 QPS 的异步任务。
昨天录制视频的时候,谈到第二种放弃的实现思路,就是通过将任务丢到一个队列中,然后通过一个线程从线程池取任务,丢到线程池里面执行。今天早上仔细想了想还是很值得实现了,能够很好地降低线程数,刚好我之前在Java自定义异步功能实践 2021-10-19中用到了守护线程,正好充当这个任务管理线程。
下面就是具体的实现:
入口方法
这里依然写到了com.funtester.frame.SourceCode工具类中,关键字我用看funer,区别去之前的funner。
代码如下:
/** * 以固定QPS,异步执行,默认16,调整方法{@link SourceCode#setMaxQps(int)} * 改种方式在main线程结束后会以QPS /5 +5执行等待任务 * * @param f */ public static void funer(Closure f) { fun(JToG.toClosure(() -> { ThreadPoolUtil.addQPSTask(f); return null; })); }
复制代码
实现细节
这里队列我选用了java.util.concurrent.LinkedBlockingQueue,用来存储需要执行的异步任务。添加任务的方法:
/** * 添加异步固定QPS的任务 * {@link java.util.concurrent.LinkedBlockingQueue#offer(java.lang.Object)}是非阻塞,返回Boolean值 * @param closure * @return */ static def addQPSTask(Closure closure) { asyncQueue.offer(closure) }
复制代码
守护线程添加新执行逻辑:
static boolean daemon() { def thread = new Thread(new Runnable() {
@Override void run() { SourceCode.noError { while (checkMain()) { SourceCode.sleep(1.0) ASYNC_QPS.times {executeCacheSync()} } waitAsyncIdle() } ThreadPoolUtil.shutPool() } }) thread.setDaemon(true) thread.setName("Deamon") thread.start() }
复制代码
其中执行异步任务功能:
/** * 使用缓存线程池执行任务,适配第二种方式{@link com.funtester.frame.SourceCode#funer(groovy.lang.Closure)} * @return */ static def executeCacheSync() { def poll = asyncQueue.poll() if (poll != null) executeCacheSync({poll()}) }
复制代码
针对本地版本,还有一个关闭线程池的功能,这个主要是防止程序停不下来,跟固定线程的异步任务功能类似,所以写到了一起。
/** * 等待异步线程池空闲 */ static void waitAsyncIdle() { if (asyncPool == null) return SourceCode.time({ SourceCode.waitFor { ((int) (ASYNC_QPS / 5) + 1).times {executeCacheSync()} asyncPool.getQueue().size() == 0 && asyncPool.getActiveCount() == 0 && asyncQueue.size() == 0 } }, "异步线程池等待") }
/** * 关闭异步线程池,不然会停不下来*/ static void shutPool() { if (!getFunPool().isShutdown()) { log.info(Output.rgb("异步线程池关闭!")) getFunPool().shutdown() } if (cachePool != null && !cachePool.isShutdown()) { cachePool.shutdown() } }
复制代码
自测
用例:
static void main(String[] args) { setMaxQps(1) 10.times { funer { output(Time.getDate()) } } sleep(5.0) output("FunTester") }
复制代码
控制台输出:
22:02:36.770 main ###### # # # # ####### ###### ##### ####### ###### ##### # # # ## # # # # # # # # #### # # # # # # #### ##### # #### ##### # # # # # # # # # # # # # # ##### # # # ###### ##### # ###### # #
22:02:38.300 C-1 2022-10-26 22:02:3822:02:39.294 C-1 2022-10-26 22:02:3922:02:40.298 C-1 2022-10-26 22:02:4022:02:41.299 C-1 2022-10-26 22:02:4122:02:42.261 main FunTester22:02:42.301 C-1 2022-10-26 22:02:4222:02:42.320 C-1 2022-10-26 22:02:4222:02:42.525 C-1 2022-10-26 22:02:4222:02:42.730 C-1 2022-10-26 22:02:4222:02:42.931 C-1 2022-10-26 22:02:4222:02:43.133 C-1 2022-10-26 22:02:4322:02:43.133 Deamon 异步线程池等待执行耗时:825 ms22:02:43.153 Deamon 异步线程池关闭!
进程已结束,退出代码0
复制代码
评论