早前的旧文中,我分享了使用 java.util.concurrent.Phaser
在处理大量异步任务场景下的使用。其中用到了 phaser 类的重要特性 可以灵活设置同步数量,在使用过程中注册新的同步对象。
但是在后续的使用过程中遇到的一些问题,主要有一下两点:
注册同步等待总量有上限 private static final int MAX_PARTIES = 0xffff;
功能复杂,API 丰富但大部分用不到,在使用过程中经常调错 API
今天终于无法忍受,特别是低一点,导致大量异步任务会丢数据。之前是按照非同步方式执行大量任务,但是今天遇到了不定任务量,一时没想到这茬,导致了半个小时数据构造化为泡影。
重写思路
一怒之下,决定自己重写一个加强版。总结了设计思路如下:
线程安全计数,用于统计完成任务数
线程安全状态技术,用于统计多少任务尚未完成
注册方法,用于增加任务统计技术
完成方法,用于减少未完成数量,增加完成任务数量
返回各类状态信息的方法
实现这样的功能,我们就得到了一个简单但加强功能的多线程同步类,用来替代 java.util.concurrent.Phaser
,我命名为 FunPhaser
。代码如下:
package com.funtester.frame
import java.util.concurrent.atomic.AtomicInteger
/**
* 自定义同步类,避免{@link java.util.concurrent.Phaser}的不足,总数量受限于65535
* 用于多线程任务同步,任务完成后,调用{@link #done()}方法,任务总数减少,当任务总数为0时,调用{@link #await()}方法,等待所有任务完成
*/
class FunPhaser extends SourceCode {
/**
* 任务总数索引,用于标记任务完成状态
* 注册增加,任务完成减少
*/
AtomicInteger index
/**
* 任务总数,用于记录任务完成数量
*/
AtomicInteger taskNum
FunPhaser() {
this.index = new AtomicInteger()
this.taskNum = new AtomicInteger()
}
/**
* 注册任务
* @return
*/
def register() {
this.index.getAndIncrement()
}
/**
* 任务完成
* @return
*/
def done() {
this.index.getAndDecrement()
this.taskNum.getAndIncrement()
}
/**
* 等待所有任务完成
* @return
*/
def await() {
waitFor {index.get() == 0}
}
/**
* 获取任务完成总数
* @return
*/
int queryTaskNum() {
return taskNum.get()
}
}
复制代码
源码解读
这个自定义同步类 FunPhaser
用于多线程任务同步,它避免了 java.util.concurrent.Phaser
的不足,即总数量受限于 65535。
FunPhaser
类有以下几个成员变量:
FunPhaser
类提供了以下几个方法:
index
和 taskNum
是 AtomicInteger
类型的属性,用于原子地操作整数值。
FunPhaser()
是该类的构造函数,初始化了 index
和 taskNum
属性。
register()
方法用于注册任务,每次调用会增加 index
的值,表示新增一个任务。
done()
方法用于标记任务完成,每次调用会减少 index
的值并增加 taskNum
的值。
await()
方法用于等待所有任务完成。它调用了 waitFor
方法,等待 index
的值变为 0,表示所有任务已经完成。
queryTaskNum()
方法用于获取任务完成的总数,返回 taskNum
的值。
演示 Demo
FunPhaser
类的使用方法如下:
import com.funtester.frame.FunPhaser;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class FunPhaserDemo {
public static void main(String[] args) throws InterruptedException {
// 创建FunPhaser实例
FunPhaser phaser = new FunPhaser();
// 创建固定大小的线程池
ExecutorService executorService = Executors.newFixedThreadPool(5);
// 注册10个任务
for (int i = 0; i < 10; i++) {
executorService.submit(new Runnable() {
@Override
public void run() {
// 注册任务
phaser.register();
// 模拟耗时操作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 标记任务完成
phaser.done();
}
});
}
// 等待所有任务完成
phaser.await();
// 输出已完成的任务数量
System.out.println("已完成的任务数量: " + phaser.queryTaskNum());
// 关闭线程池
executorService.shutdown();
}
}
复制代码
在这个示例中,我们创建了一个FunPhaser
对象,并使用固定大小为 5 的线程池来执行 10 个异步任务。每个任务在开始前调用register()
方法注册,完成后调用done()
方法标记。主线程通过调用await()
方法等待所有任务完成,并最终输出已完成的任务数量。
自定义关键字
在自定义关键字中的使用如下:
/**
* 使用自定义同步器{@link FunPhaser}进行多线程同步
*
* @param f 代码块
* @param phaser 同步器
*/
public static void fun(Closure f, FunPhaser phaser) {
if (phaser != null) phaser.register();
ThreadPoolUtil.executeSync(() -> {
try {
f.call();
} finally {
if (phaser != null) {
phaser.done();
logger.info("async task {}", phaser.queryTaskNum());
}
}
});
}
复制代码
作为对照旧的实现代码如下:
/**
* 异步执行代码块,使用{@link Phaser}进行多线程同步
*
* @param f 代码块
* @param phaser 同步器
*/
public static void fun(Closure f, Phaser phaser) {
if (phaser != null) phaser.register();
ThreadPoolUtil.executeSync(() -> {
try {
f.call();
} finally {
if (phaser != null) {
phaser.arrive();
logger.info("异步任务完成 {}", phaser.getArrivedParties());
}
}
});
}
复制代码
这两个实现代码的功能都是相同的,都是使用同步器来进行多线程任务同步。
旧的实现代码使用的是 Phaser
类。Phaser
类是一个通用的同步器,可以用于各种多线程任务同步场景。在旧的实现代码中,我们使用 register()
方法来注册任务,使用 arrive()
方法来表示任务完成。
新的实现代码使用的是 FunPhaser
类。FunPhaser
类是一个自定义的同步器,它避免了 Phaser
类的不足,即总数量受限于 65535。在新的实现代码中,我们使用 register()
方法来注册任务,使用 done()
方法来表示任务完成。
两种实现代码的对比
总体而言,新的实现代码比旧的实现代码更加简洁易用,并且避免了 Phaser
类的不足。
评论