写点什么

高优异步任务解决双重异步集合点阻塞问题

作者:FunTester
  • 2024-03-12
    河北
  • 本文字数:3185 字

    阅读完需:约 10 分钟

在性能测试的实践当中,异步任务是离不开的。Java 异步编程提高了应用程序的性能和响应性,通过避免线程阻塞提高了资源利用率,并简化了并发编程的复杂性。改善用户体验,避免死锁和线程阻塞等问题。异步编程利用 CompletableFuture、Future 等工具和 API 简化了开发流程,提高了系统的稳定性和可靠性。

缘起

我也参照了 Go 语言的 go 关键字,自定义了 fun 关键字Java自定义异步功能实践 。但是在使用过程中,遇到了一个略显尴尬的问题,就是如果当一个异步任务中,又增加一个异步任务,且使用集合点设置。那么就会阻塞线程池,导致大量任务阻塞的情况。


比如一个学校,200 个班级,每个班级有一个班主任,要给 30 个学生发作业,之后再报告作业分发已完成。按照之前的思路,我会包装两个异步且设置集合点的任务,伪代码如下:


    static void main(String[] args) {        200.times {            fun {                sleep(1.0)// 模拟业务处理                pushHomework()// 布置作业            }        }
}
/** * 布置作业 */ static void pushHomework() { FunPhaser phaser = new FunPhaser()// 创建同步屏障 30.times { fun { sleep(1.0)// 模拟业务处理 output("布置作业") } , phaser } phaser.await()// 等待所有作业布置完成 }
复制代码


最终的结果就是,等于最大线程数的任务会阻塞在 pushHomework() 方法中,而 pushHomework() 方法需要完成的异步任务又全都等待在线程池的等待队列中。

初解

一开始我的思路采取优先级策略。如果区分任务的优先级,让有可能阻塞在等待队列的高优任务优先执行即可。所以我先想使用 java.util.concurrent.PriorityBlockingQueue 当做 java.util.concurrent.BlockingQueue 的实现当做异步线程池的等待队列。


但也无法解决问题,因为依然存在阻塞的问题,只不过概率变小了而已。看来不得不使用单独的异步线程池来实现了。


关于线程池的选择有两种选择:


  1. 选择最大线程数较小的线程池,只是作为辅助功能,防止阻塞。在普通异步任务执行时,优先执行高优任务,利用普通线程池优先执行高优任务。

  2. 选择最小线程数较大的线程池,大概率是缓存线程池。单独用来执行高优任务。同时也可以利用普通的线程池执行高优任务。


关于我的选择,也没有选择。根据实际的情况使用吧。高优任务的多少、需要限制的频率等等因素。我自己的项目用的是第二种,原因是我用到高优任务的机会不多,我可以在脚本中控制高优任务的数量。

方案

首先是线程池的实现代码:


priorityPool = createFixedPool(POOL_SIZE, "P")


创建时机放在了普通线程池中:


    /**     * 获取异步任务连接池     * @return     */    static ThreadPoolExecutor getFunPool() {        if (asyncPool == null) {            synchronized (ThreadPoolUtil.class) {                if (asyncPool == null) {                    asyncPool = createPool(POOL_SIZE, POOL_SIZE, ALIVE_TIME, new LinkedBlockingDeque<Runnable>(Constant.MAX_WAIT_TASK), getFactory("F"))                    daemon()                }                priorityPool = createFixedPool(POOL_SIZE, "P")//                priorityPool = createPool(1, POOL_MAX, ALIVE_TIME, new LinkedBlockingQueue<Runnable>(10), getFactory("P"), new ThreadPoolExecutor.DiscardOldestPolicy())            }        }        return asyncPool    }
复制代码


下面是调用执行高优的异步任务的方法:


    /**     * 执行高优异步任务     * @param runnable     */    static void executeSyncPriority(Runnable runnable) {      if (priorityPool == null) getFunPool()        priorityPool.execute(runnable)    }
复制代码


还有一个调用方法,用来普通线程池优先执行高优任务:


    /**     * 执行高优任务     */    static void executePriority() {        def locked = priorityLock.compareAndSet(false, true)//如果没有锁,则加锁        if (locked) {//如果加锁成功            while (priorityPool.getQueue().size() > 0) {                def poll = priorityPool.getQueue().poll()                def queue = (LinkedBlockingDeque<Runnable>) getFunPool().getQueue()                if (poll != null) {                    queue.offerFirst(poll)                }
} priorityLock.set(false)//解锁 } }
复制代码


这里用到了一个原子类,当做高优之行时候的锁 private static AtomicBoolean priorityLock = new AtomicBoolean(false) ,避免在这块浪费过多性能。这里没有 try-catch-finally ,此处没有使用,确实发生异常概率较小。


我重新修改了任务队列的实现,用的是 java.util.concurrent.LinkedBlockingDeque ,这样我就可以将高优任务直接插入到队列的最前头,可以优先执行高优任务。


对于异步关键字,我也进行了一些改动:


    /**     * 使用自定义同步器{@link FunPhaser}进行多线程同步     *     * @param f     * @param phaser     * @param log     */    public static void fun(Closure f, FunPhaser phaser, boolean log) {        if (phaser != null) phaser.register();        ThreadPoolUtil.executeSync(() -> {            try {                ThreadPoolUtil.executePriority();                f.call();            } finally {                if (phaser != null) {                    phaser.done();                    if (log) logger.info("async task {}", phaser.queryTaskNum());                }            }        });    }
复制代码


执行高优任务的关键字,我也进行了同样的封装,只不过换了个关键字和线程池:


    /**     * 提交高优任务     *     * @param f     * @param phaser     * @param log     */    public static void funny(Closure f, FunPhaser phaser, boolean log) {        if (phaser != null) phaser.register();        ThreadPoolUtil.executeSyncPriority(() -> {            try {                f.call();            } finally {                if (phaser != null) {                    phaser.done();                    if (log) logger.info("priority async task {}", phaser.queryTaskNum());                }            }        });    }
复制代码

验证

我们修改一下开始的脚本:


    static void main(String[] args) {        setPoolMax(2)        6.times {            fun {                sleep(1.0)// 模拟业务处理                pushHomework()// 布置作业            }        }
}
/** * 布置作业 */ static void pushHomework() { FunPhaser phaser = new FunPhaser()// 创建同步屏障 4.times { fun { sleep(1.0)// 模拟业务处理 output("布置作业") } , phaser } phaser.await()// 等待所有作业布置完成 }
复制代码


执行的话,线程池的 F 线程全都全都是 TIME_WAITING 状态。当把 pushHomework() 方法改成高优关键字 funny 之后问题便可迎刃而解。


控制台输出如下:


22:47:17:160 P-1  布置作业22:47:17:160 P-1  布置作业22:47:17:160 P-1  priority async task 322:47:17:160 P-1  priority async task 422:47:18:178 F-2  布置作业22:47:18:179 F-2  priority async task 322:47:19:183 F-2  布置作业
复制代码


可以看出,已经开始有了 F 线程执行高优任务了。

发布于: 刚刚阅读数: 5
用户头像

FunTester

关注

公众号:FunTester,800篇原创,欢迎关注 2020-10-20 加入

Fun·BUG挖掘机·性能征服者·头顶锅盖·Tester

评论

发布
暂无评论
高优异步任务解决双重异步集合点阻塞问题_FunTester_InfoQ写作社区