早前的旧文中,我分享了使用 java.util.concurrent.Phaser 在处理大量异步任务场景下的使用。其中用到了 phaser 类的重要特性 可以灵活设置同步数量,在使用过程中注册新的同步对象。
但是在后续的使用过程中遇到的一些问题,主要有一下两点:
注册同步等待总量有上限 private static final int MAX_PARTIES = 0xffff;
功能复杂,API 丰富但大部分用不到,在使用过程中经常调错 API
今天终于无法忍受,特别是低一点,导致大量异步任务会丢数据。之前是按照非同步方式执行大量任务,但是今天遇到了不定任务量,一时没想到这茬,导致了半个小时数据构造化为泡影。
重写思路
一怒之下,决定自己重写一个加强版。总结了设计思路如下:
线程安全计数,用于统计完成任务数
线程安全状态技术,用于统计多少任务尚未完成
注册方法,用于增加任务统计技术
完成方法,用于减少未完成数量,增加完成任务数量
返回各类状态信息的方法
实现这样的功能,我们就得到了一个简单但加强功能的多线程同步类,用来替代 java.util.concurrent.Phaser ,我命名为 FunPhaser 。代码如下:
package com.funtester.frame import java.util.concurrent.atomic.AtomicInteger /** * 自定义同步类,避免{@link java.util.concurrent.Phaser}的不足,总数量受限于65535 * 用于多线程任务同步,任务完成后,调用{@link #done()}方法,任务总数减少,当任务总数为0时,调用{@link #await()}方法,等待所有任务完成 */ class FunPhaser extends SourceCode { /** * 任务总数索引,用于标记任务完成状态 * 注册增加,任务完成减少 */ AtomicInteger index /** * 任务总数,用于记录任务完成数量 */ AtomicInteger taskNum FunPhaser() { this.index = new AtomicInteger() this.taskNum = new AtomicInteger() } /** * 注册任务 * @return */ def register() { this.index.getAndIncrement() } /** * 任务完成 * @return */ def done() { this.index.getAndDecrement() this.taskNum.getAndIncrement() } /** * 等待所有任务完成 * @return */ def await() { waitFor {index.get() == 0} } /** * 获取任务完成总数 * @return */ int queryTaskNum() { return taskNum.get() } }
复制代码
源码解读
这个自定义同步类 FunPhaser 用于多线程任务同步,它避免了 java.util.concurrent.Phaser 的不足,即总数量受限于 65535。
FunPhaser 类有以下几个成员变量:
FunPhaser 类提供了以下几个方法:
index 和 taskNum 是 AtomicInteger 类型的属性,用于原子地操作整数值。
FunPhaser() 是该类的构造函数,初始化了 index 和 taskNum 属性。
register() 方法用于注册任务,每次调用会增加 index 的值,表示新增一个任务。
done() 方法用于标记任务完成,每次调用会减少 index 的值并增加 taskNum 的值。
await() 方法用于等待所有任务完成。它调用了 waitFor 方法,等待 index 的值变为 0,表示所有任务已经完成。
queryTaskNum() 方法用于获取任务完成的总数,返回 taskNum 的值。
演示 Demo
FunPhaser 类的使用方法如下:
import com.funtester.frame.FunPhaser;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;
public class FunPhaserDemo {
public static void main(String[] args) throws InterruptedException { // 创建FunPhaser实例 FunPhaser phaser = new FunPhaser();
// 创建固定大小的线程池 ExecutorService executorService = Executors.newFixedThreadPool(5);
// 注册10个任务 for (int i = 0; i < 10; i++) { executorService.submit(new Runnable() { @Override public void run() { // 注册任务 phaser.register();
// 模拟耗时操作 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
// 标记任务完成 phaser.done(); } }); }
// 等待所有任务完成 phaser.await();
// 输出已完成的任务数量 System.out.println("已完成的任务数量: " + phaser.queryTaskNum());
// 关闭线程池 executorService.shutdown(); }}
复制代码
在这个示例中,我们创建了一个FunPhaser对象,并使用固定大小为 5 的线程池来执行 10 个异步任务。每个任务在开始前调用register()方法注册,完成后调用done()方法标记。主线程通过调用await()方法等待所有任务完成,并最终输出已完成的任务数量。
自定义关键字
在自定义关键字中的使用如下:
/** * 使用自定义同步器{@link FunPhaser}进行多线程同步 * * @param f 代码块 * @param phaser 同步器 */ public static void fun(Closure f, FunPhaser phaser) { if (phaser != null) phaser.register(); ThreadPoolUtil.executeSync(() -> { try { f.call(); } finally { if (phaser != null) { phaser.done(); logger.info("async task {}", phaser.queryTaskNum()); } } }); }
复制代码
作为对照旧的实现代码如下:
/** * 异步执行代码块,使用{@link Phaser}进行多线程同步 * * @param f 代码块 * @param phaser 同步器 */ public static void fun(Closure f, Phaser phaser) { if (phaser != null) phaser.register(); ThreadPoolUtil.executeSync(() -> { try { f.call(); } finally { if (phaser != null) { phaser.arrive(); logger.info("异步任务完成 {}", phaser.getArrivedParties()); } } }); }
复制代码
这两个实现代码的功能都是相同的,都是使用同步器来进行多线程任务同步。
旧的实现代码使用的是 Phaser 类。Phaser 类是一个通用的同步器,可以用于各种多线程任务同步场景。在旧的实现代码中,我们使用 register() 方法来注册任务,使用 arrive() 方法来表示任务完成。
新的实现代码使用的是 FunPhaser 类。FunPhaser 类是一个自定义的同步器,它避免了 Phaser 类的不足,即总数量受限于 65535。在新的实现代码中,我们使用 register() 方法来注册任务,使用 done() 方法来表示任务完成。
两种实现代码的对比
总体而言,新的实现代码比旧的实现代码更加简洁易用,并且避免了 Phaser 类的不足。
评论