写点什么

【连载 11】Phaser 类

作者:FunTester
  • 2025-01-13
    河北
  • 本文字数:4919 字

    阅读完需:约 16 分钟

2.6 Phaser

Phaser 是上一节提到的更高级的线程同步工具。Phaser 的包路径是 java.util.concurrent.Phaser,属于 Java 多线程编程的核心功能。Phaser 类的主要功能是控制多个线程在特定的同步时间点同步执行。从文字介绍上看,它似乎没有特别之处,但其实际功能相比 CountDownLatch 增强了不止一星半点。Phaser 可以说是 Java 多线程同步的终极解决方案。


Phaser 类支持多阶段线程同步、动态的注册和注销、指定同步阶段、子同步功能,可以在到达集合点后不阻塞继续执行下一阶段,还可以中断等待的阶段、全局管理等。


终究是 Phaser 类功能太强大了,而作为性能测试工具,它有些高攀不起。所以在性能测试中使用到的还是 Phaser 类的基础功能。总结起来有两点原因:一是性能测试需要的场景复杂程度相对 Phaser 类来讲,还是小儿科了;二是使用 Java 进行性能测试时,尽量避免使用逻辑复杂的解决方案。还是那句话,如果遇到过于复杂的场景,则抛开 Phaser,寻求更加简单、可靠的解决方案。


相比 CountDownLatch,Phaser 在实战中典型的使用场景是处理不定数量的并发任务同步问题。CountDownLatch 需要提前确定同步数量,但 Phaser 不需要。在使用当中,通常的使用流程如下:


  1. 创建 Phaser 对象,同步数量为 1。

  2. 指定多线程任务,每个任务开始前使用 Phaser 对象注册,完成之后注销。

  3. 等待同步线程使用 Phaser 对象进行等待,直到全部注册任务都完成。

2.6.1 基础方法

在 Java 进行性能测试中,Phaser 类常用的构造方法只有 1 个:


public Phaser(int parties) {    this(null, parties);}
复制代码


这个方法只有一个 int 数据类型的参数,表示同步数量,这一点跟 CountDownLatch 类一样。该方法对应 Phaser 工作流程的第一步。


Phaser 工作流程第二步,注册:


public int register() {    return doRegister(1);}
复制代码


这个方法没有参数,含义是当前线程申请同步数量加一,返回 int 类型数据,含义是当前同步阶段。该方法存在失败的可能,若尝试注册数量超过阈值,则会抛出 IllegalStateException 异常。


注销:


public int arriveAndDeregister() {    return doArrive(ONE_DEREGISTER);}
复制代码


这个方法同样没有参数,含义是当前线程到达集合点并申请同步数量减一,返回 int 类型数据,含义是当前同步阶段。该方法存在失败可能,若是注册人数或者未到达人数会因为该注销行为变成负值,则会抛出 IllegalStateException 异常。


如果多线程任务到达集合点,期望等待其他线程都到达,并且继续参与下一个阶段的同步,可以使用下面这个方法:


public int arriveAndAwaitAdvance() {   // 方法体内容过多,省去。}
复制代码


如果多线程任务不想等其他线程,直接进入下一阶段同步,可以使用下面这个方法:


public int arrive() {    return doArrive(ONE_ARRIVAL);}
复制代码


Phaser 工作流程第三步,同步线程的等待方法与多线程任务到达集合点方法重合,即使用 arriveAndAwaitAdvance() 方法。若是多阶段同步的话,还可以指定需要等待的同步阶段,通过调用下面的方法实现:


public int awaitAdvance(int phase) {    final Phaser root = this.root;    long s = (root == this) ? state : reconcileState();    int p = (int)(s >>> PHASE_SHIFT);    if (phase < 0)        return phase;    if (p == phase)        return root.internalAwaitAdvance(phase, null);    return p;}
复制代码


这里建议尽量避免使用多阶段同步,尽量不在同步线程外调用该方法。因为这样会使得代码逻辑复杂程度数量级上升,容易造成无限等待。


通常我们会统计异步任务完成的数量,此时还会用到另外一个方法,获取同步计数:


public int getArrivedParties() {    return arrivedOf(reconcileState());}
复制代码


该方法没有参数,返回 int 数据类型,含义是已经注册且已经到达集合点的数量。在 Java 性能测试中,通常用来统计多线程任务的完成进度。这里请注意,统计数量不包含那些已经注销的任务,如果要统计所有完成的任务,请在到达集合点时使用 arrive() 方法而不是 arriveAndDeregister()

2.6.2 最佳实战

下面通过一个小例子演示 Phaser 使用方法。


package org.funtester.performance.books.chapter02.section5;
import java.util.concurrent.Phaser;
/** * Phaser 演示类 */public class PhaserDemo {
public static void main(String[] args) throws InterruptedException { Phaser phaser = new Phaser(1); // 创建 Phaser 对象,将参与的线程数初始化为 1 for (int i = 0; i < 3; i++) { // 创建并启动 3 个线程 phaser.register(); // 每创建并启动一个线程,注册一次 new Thread(() -> { // 创建异步线程 phaser.arrive(); // 每个线程执行完任务后,通知 phaser,当前线程任务完成 System.out.println(System.currentTimeMillis() + " 完成完成 " + Thread.currentThread().getName()); // 打印当前线程完成任务的时间和线程名称 }).start(); // 启动异步线程 } System.out.println(System.currentTimeMillis() + " 完成任务总数: " + phaser.getArrivedParties()); // 打印已经完成任务的线程数 Thread.sleep(10); // 等待 10 毫秒 System.out.println(System.currentTimeMillis() + " 完成任务总数: " + phaser.getArrivedParties()); // 打印已经完成任务的线程数 phaser.arriveAndAwaitAdvance(); // 通知 phaser,当前线程任务完成,并等待其他线程完成任务 System.out.println(System.currentTimeMillis() + " 完成任务总数: " + phaser.getArrivedParties()); // 打印已经完成任务的线程数 }}
复制代码


上面这个例子中,首先创建了 Phaser 对象,并设置同步数量等于 1。其次创建 3 个异步线程,分别在创建线程之前将同步对象注册一次,每个线程执行逻辑为:到达集合点,不阻塞立即打印日志。main 线程立即打印任务总数日志,然后休眠 10 毫秒,再打印任务总数日志,到达同步点并且阻塞等待所有线程到达同步点,最后再打印一次任务总数日志。


1698560341651  完成完成 Thread-01698560341651  完成任务总数: 21698560341651  完成完成 Thread-11698560341652  完成完成 Thread-21698560341662  完成任务总数: 31698560341662  完成任务总数: 0
复制代码


可以看出,第一次打印任务总数时,只有 2 个线程完成了任务。当 main 线程休眠完成之后,所有线程完成,所以第二次打印任务总数就是 3 了。当 main 线程到达同步点后,再打印日志任务总数就是 0 了,原因是因为所有线程到达集合点之后,已经进行了第二阶段的同步,所以打印出来的是第二个阶段到达集合点的线程数,即为 0。

2.6.4 使用场景

对于大多数线程同步场景来说,动用 Phaser 的确大材小用,所以实际使用场景也不是很多。上面提到过的多线程处理批量任务,例如我需要把 1 万个用户个人资料都添加上收货地址,然后再用这 1 万账号进行商品下单的操作。那么需要这么设计用例:


  1. 前置阶段初始化用户的收货地址。

  2. 待所有任务完成后,进行下单的性能测试。

  3. 待压测结束后,重置用户数据,恢复测试用户的元状态。


这其中步骤 2 和 3 均涉及到了多线程同步,Phaser 是最好的选择。此外,具有阶段性的多线程任务非常适合 Phaser 大展拳脚,例如:要先从注册账号开始,其次将注册成功的账号进行用户信息初始化,然后再执行性能测试,最后清理数据。

2.6.5 自定义同步类

虽然 java.util.concurrent.Phaser 功能强大,但毕竟不是为了性能测试开发的功能类,在实践中也会遇到一些水土不服的情况,总结为下面两种:


  1. 注册同步数量有上限,对应代码 private static final int MAX_PARTIES = 0xffff,约 6 万多。

  2. 极限性能不理想。在 Phaser 功能设计中,涉及多处锁的操作,在高并发情况下性能表现不佳。


基于这样的情况,如果我们有需求,就可以自己设计一款功能简化之后的同步类。这个同步类需要实现以下功能:


  1. 线程安全计数,统计未完成的注册任务数量。

  2. 线程安全计数,统计已完成任务数量。

  3. 提供注册和完成方法。

  4. 提供返回注册数量和完成数量的方法。


线程安全技术类,我们就选择 java.util.concurrent.atomic.AtomicInteger,可以规避掉 Phaser 上限较低的问题,如果还觉得不够,可以用 java.util.concurrent.atomic.AtomicLong 替代。其他方法就比较容易,笔者将这个自定义的同步类叫做 FunPhaser,以下是代码实践内容:


package org.funtester.performance.books.chapter02.section6;
import java.util.concurrent.atomic.AtomicInteger;
/** * 自定义多线程同步类 */public class FunPhaser {
/** * 任务总数索引, 用于标记任务完成状态 * 注册增加, 任务完成减少 */ AtomicInteger index;
/** * 任务总数, 用于记录任务完成数量 */ AtomicInteger taskNum;
public FunPhaser() { this.index = new AtomicInteger(); // 初始化 this.taskNum = new AtomicInteger(); // 初始化 }
/** * 注册任务, 并返回当前注册数量 * @return */ public int register() { return this.index.incrementAndGet(); }
/** * 任务完成 * @return */ public void done() { this.index.getAndDecrement(); this.taskNum.getAndIncrement(); }
/** * 等待所有任务完成 * @return */ public void await() throws InterruptedException { long start = System.currentTimeMillis(); while (index.get() > 0) { if (System.currentTimeMillis() - start > 100000) { // 默认超时时间 100 秒 System.out.println(System.currentTimeMillis() - start); break; } Thread.sleep(100); } }
/** * 等待所有任务完成 * @param timeout 超时时间, 单位毫秒 * @return */ public void await(int timeout) throws InterruptedException { long start = System.currentTimeMillis(); while (index.get() > 0) { if (System.currentTimeMillis() - start >= timeout) { break; } Thread.sleep(100); } }
/** * 获取注册总数 * @return */ public int queryRegisterNum() { return index.get(); }
/** * 获取任务完成总数 * @return */ public int queryTaskNum() { return taskNum.get(); }}
复制代码


下面写个用例进行测试。思路如下:启动 10 个线程,每个线程注册一次,休眠模拟业务执行,最后完成任务。代码如下:


FunPhaser phaser = new FunPhaser(); // 创建 Phaserfor (int i = 0; i < 10; i++) { // 创建 10 个线程    new Thread(() -> { // 创建线程        for (int j = 0; j < 10; j++) { // 每个线程执行 10 次任务            phaser.register();            try {                Thread.sleep(10); // 模拟任务执行时间            } catch (InterruptedException e) {                throw new RuntimeException(e);            } finally {                phaser.done(); // 任务完成            }        }    }).start();}try {    phaser.await(); // 等待所有任务完成} catch (InterruptedException e) {    throw new RuntimeException(e);}System.out.println("任务注册数 " + phaser.queryRegisterNum() + " 个");System.out.println("任务完成总数 " + phaser.queryTaskNum() + " 个");
复制代码


控制台打印如下:


任务注册数 0 个任务完成总数 100 个
复制代码


可以看出,我们最初的预想已经完美实现。


书的名字:从 Java 开始做性能测试


如果本书内容对你有所帮助,希望各位不吝赞赏,让我可以贴补家用。赞赏两位数可以提前阅读未公开章节。我也会尝试制作本书的视频教程,包括必要的答疑。

用户头像

FunTester

关注

公众号:FunTester,800篇原创,欢迎关注 2020-10-20 加入

Fun·BUG挖掘机·性能征服者·头顶锅盖·Tester

评论

发布
暂无评论
【连载 11】Phaser 类_FunTester_InfoQ写作社区