2.6 Phaser
Phaser 是上一节提到的更高级的线程同步工具。Phaser 的包路径是 java.util.concurrent.Phaser
,属于 Java 多线程编程的核心功能。Phaser 类的主要功能是控制多个线程在特定的同步时间点同步执行。从文字介绍上看,它似乎没有特别之处,但其实际功能相比 CountDownLatch
增强了不止一星半点。Phaser 可以说是 Java 多线程同步的终极解决方案。
Phaser 类支持多阶段线程同步、动态的注册和注销、指定同步阶段、子同步功能,可以在到达集合点后不阻塞继续执行下一阶段,还可以中断等待的阶段、全局管理等。
终究是 Phaser 类功能太强大了,而作为性能测试工具,它有些高攀不起。所以在性能测试中使用到的还是 Phaser 类的基础功能。总结起来有两点原因:一是性能测试需要的场景复杂程度相对 Phaser 类来讲,还是小儿科了;二是使用 Java 进行性能测试时,尽量避免使用逻辑复杂的解决方案。还是那句话,如果遇到过于复杂的场景,则抛开 Phaser,寻求更加简单、可靠的解决方案。
相比 CountDownLatch
,Phaser 在实战中典型的使用场景是处理不定数量的并发任务同步问题。CountDownLatch
需要提前确定同步数量,但 Phaser 不需要。在使用当中,通常的使用流程如下:
创建 Phaser 对象,同步数量为 1。
指定多线程任务,每个任务开始前使用 Phaser 对象注册,完成之后注销。
等待同步线程使用 Phaser 对象进行等待,直到全部注册任务都完成。
2.6.1 基础方法
在 Java 进行性能测试中,Phaser 类常用的构造方法只有 1 个:
public Phaser(int parties) {
this(null, parties);
}
复制代码
这个方法只有一个 int
数据类型的参数,表示同步数量,这一点跟 CountDownLatch
类一样。该方法对应 Phaser 工作流程的第一步。
Phaser 工作流程第二步,注册:
public int register() {
return doRegister(1);
}
复制代码
这个方法没有参数,含义是当前线程申请同步数量加一,返回 int
类型数据,含义是当前同步阶段。该方法存在失败的可能,若尝试注册数量超过阈值,则会抛出 IllegalStateException
异常。
注销:
public int arriveAndDeregister() {
return doArrive(ONE_DEREGISTER);
}
复制代码
这个方法同样没有参数,含义是当前线程到达集合点并申请同步数量减一,返回 int
类型数据,含义是当前同步阶段。该方法存在失败可能,若是注册人数或者未到达人数会因为该注销行为变成负值,则会抛出 IllegalStateException
异常。
如果多线程任务到达集合点,期望等待其他线程都到达,并且继续参与下一个阶段的同步,可以使用下面这个方法:
public int arriveAndAwaitAdvance() {
// 方法体内容过多,省去。
}
复制代码
如果多线程任务不想等其他线程,直接进入下一阶段同步,可以使用下面这个方法:
public int arrive() {
return doArrive(ONE_ARRIVAL);
}
复制代码
Phaser 工作流程第三步,同步线程的等待方法与多线程任务到达集合点方法重合,即使用 arriveAndAwaitAdvance()
方法。若是多阶段同步的话,还可以指定需要等待的同步阶段,通过调用下面的方法实现:
public int awaitAdvance(int phase) {
final Phaser root = this.root;
long s = (root == this) ? state : reconcileState();
int p = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
if (p == phase)
return root.internalAwaitAdvance(phase, null);
return p;
}
复制代码
这里建议尽量避免使用多阶段同步,尽量不在同步线程外调用该方法。因为这样会使得代码逻辑复杂程度数量级上升,容易造成无限等待。
通常我们会统计异步任务完成的数量,此时还会用到另外一个方法,获取同步计数:
public int getArrivedParties() {
return arrivedOf(reconcileState());
}
复制代码
该方法没有参数,返回 int
数据类型,含义是已经注册且已经到达集合点的数量。在 Java 性能测试中,通常用来统计多线程任务的完成进度。这里请注意,统计数量不包含那些已经注销的任务,如果要统计所有完成的任务,请在到达集合点时使用 arrive()
方法而不是 arriveAndDeregister()
。
2.6.2 最佳实战
下面通过一个小例子演示 Phaser 使用方法。
package org.funtester.performance.books.chapter02.section5;
import java.util.concurrent.Phaser;
/**
* Phaser 演示类
*/
public class PhaserDemo {
public static void main(String[] args) throws InterruptedException {
Phaser phaser = new Phaser(1); // 创建 Phaser 对象,将参与的线程数初始化为 1
for (int i = 0; i < 3; i++) { // 创建并启动 3 个线程
phaser.register(); // 每创建并启动一个线程,注册一次
new Thread(() -> { // 创建异步线程
phaser.arrive(); // 每个线程执行完任务后,通知 phaser,当前线程任务完成
System.out.println(System.currentTimeMillis() + " 完成完成 " + Thread.currentThread().getName()); // 打印当前线程完成任务的时间和线程名称
}).start(); // 启动异步线程
}
System.out.println(System.currentTimeMillis() + " 完成任务总数: " + phaser.getArrivedParties()); // 打印已经完成任务的线程数
Thread.sleep(10); // 等待 10 毫秒
System.out.println(System.currentTimeMillis() + " 完成任务总数: " + phaser.getArrivedParties()); // 打印已经完成任务的线程数
phaser.arriveAndAwaitAdvance(); // 通知 phaser,当前线程任务完成,并等待其他线程完成任务
System.out.println(System.currentTimeMillis() + " 完成任务总数: " + phaser.getArrivedParties()); // 打印已经完成任务的线程数
}
}
复制代码
上面这个例子中,首先创建了 Phaser 对象,并设置同步数量等于 1。其次创建 3 个异步线程,分别在创建线程之前将同步对象注册一次,每个线程执行逻辑为:到达集合点,不阻塞立即打印日志。main
线程立即打印任务总数日志,然后休眠 10 毫秒,再打印任务总数日志,到达同步点并且阻塞等待所有线程到达同步点,最后再打印一次任务总数日志。
1698560341651 完成完成 Thread-0
1698560341651 完成任务总数: 2
1698560341651 完成完成 Thread-1
1698560341652 完成完成 Thread-2
1698560341662 完成任务总数: 3
1698560341662 完成任务总数: 0
复制代码
可以看出,第一次打印任务总数时,只有 2 个线程完成了任务。当 main
线程休眠完成之后,所有线程完成,所以第二次打印任务总数就是 3 了。当 main
线程到达同步点后,再打印日志任务总数就是 0 了,原因是因为所有线程到达集合点之后,已经进行了第二阶段的同步,所以打印出来的是第二个阶段到达集合点的线程数,即为 0。
2.6.4 使用场景
对于大多数线程同步场景来说,动用 Phaser 的确大材小用,所以实际使用场景也不是很多。上面提到过的多线程处理批量任务,例如我需要把 1 万个用户个人资料都添加上收货地址,然后再用这 1 万账号进行商品下单的操作。那么需要这么设计用例:
前置阶段初始化用户的收货地址。
待所有任务完成后,进行下单的性能测试。
待压测结束后,重置用户数据,恢复测试用户的元状态。
这其中步骤 2 和 3 均涉及到了多线程同步,Phaser 是最好的选择。此外,具有阶段性的多线程任务非常适合 Phaser 大展拳脚,例如:要先从注册账号开始,其次将注册成功的账号进行用户信息初始化,然后再执行性能测试,最后清理数据。
2.6.5 自定义同步类
虽然 java.util.concurrent.Phaser
功能强大,但毕竟不是为了性能测试开发的功能类,在实践中也会遇到一些水土不服的情况,总结为下面两种:
注册同步数量有上限,对应代码 private static final int MAX_PARTIES = 0xffff
,约 6 万多。
极限性能不理想。在 Phaser 功能设计中,涉及多处锁的操作,在高并发情况下性能表现不佳。
基于这样的情况,如果我们有需求,就可以自己设计一款功能简化之后的同步类。这个同步类需要实现以下功能:
线程安全计数,统计未完成的注册任务数量。
线程安全计数,统计已完成任务数量。
提供注册和完成方法。
提供返回注册数量和完成数量的方法。
线程安全技术类,我们就选择 java.util.concurrent.atomic.AtomicInteger
,可以规避掉 Phaser 上限较低的问题,如果还觉得不够,可以用 java.util.concurrent.atomic.AtomicLong
替代。其他方法就比较容易,笔者将这个自定义的同步类叫做 FunPhaser
,以下是代码实践内容:
package org.funtester.performance.books.chapter02.section6;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 自定义多线程同步类
*/
public class FunPhaser {
/**
* 任务总数索引, 用于标记任务完成状态
* 注册增加, 任务完成减少
*/
AtomicInteger index;
/**
* 任务总数, 用于记录任务完成数量
*/
AtomicInteger taskNum;
public FunPhaser() {
this.index = new AtomicInteger(); // 初始化
this.taskNum = new AtomicInteger(); // 初始化
}
/**
* 注册任务, 并返回当前注册数量
* @return
*/
public int register() {
return this.index.incrementAndGet();
}
/**
* 任务完成
* @return
*/
public void done() {
this.index.getAndDecrement();
this.taskNum.getAndIncrement();
}
/**
* 等待所有任务完成
* @return
*/
public void await() throws InterruptedException {
long start = System.currentTimeMillis();
while (index.get() > 0) {
if (System.currentTimeMillis() - start > 100000) { // 默认超时时间 100 秒
System.out.println(System.currentTimeMillis() - start);
break;
}
Thread.sleep(100);
}
}
/**
* 等待所有任务完成
* @param timeout 超时时间, 单位毫秒
* @return
*/
public void await(int timeout) throws InterruptedException {
long start = System.currentTimeMillis();
while (index.get() > 0) {
if (System.currentTimeMillis() - start >= timeout) {
break;
}
Thread.sleep(100);
}
}
/**
* 获取注册总数
* @return
*/
public int queryRegisterNum() {
return index.get();
}
/**
* 获取任务完成总数
* @return
*/
public int queryTaskNum() {
return taskNum.get();
}
}
复制代码
下面写个用例进行测试。思路如下:启动 10 个线程,每个线程注册一次,休眠模拟业务执行,最后完成任务。代码如下:
FunPhaser phaser = new FunPhaser(); // 创建 Phaser
for (int i = 0; i < 10; i++) { // 创建 10 个线程
new Thread(() -> { // 创建线程
for (int j = 0; j < 10; j++) { // 每个线程执行 10 次任务
phaser.register();
try {
Thread.sleep(10); // 模拟任务执行时间
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
phaser.done(); // 任务完成
}
}
}).start();
}
try {
phaser.await(); // 等待所有任务完成
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("任务注册数 " + phaser.queryRegisterNum() + " 个");
System.out.println("任务完成总数 " + phaser.queryTaskNum() + " 个");
复制代码
控制台打印如下:
可以看出,我们最初的预想已经完美实现。
书的名字:从 Java 开始做性能测试 。
如果本书内容对你有所帮助,希望各位不吝赞赏,让我可以贴补家用。赞赏两位数可以提前阅读未公开章节。我也会尝试制作本书的视频教程,包括必要的答疑。
评论