写点什么

【连载 17】多线程执行类

作者:FunTester
  • 2025-02-20
    河北
  • 本文字数:4732 字

    阅读完需:约 16 分钟

3.4 多线程执行类

对于线程执行类来讲,最重要的两个功能就是执行测试任务和处理测试数据。其中执行测试任务涉及控制线程执行逻辑,稍显复杂,这里我们先将测试方案简化为执行 N 个并发,每个线程间隔 1 秒启动。如此一来,我们需要将已经创建好的 ThreadTask 类对象间隔提交给线程池执行即可。线程池的选择已经在上一节讲过,由于测试方案中并发数固定,我们只创建与之对应数量的线程池即可。

3.4.1 多线程类基础能力开发

首先我们设计这个类的属性,根据需求多线程执行类需要具备以下能力:


  • 并发数量:用于接受对应数量的测试任务和创建对应大小的线程池。

  • ThreadTask 类属性相对应的线程安全的属性

  • 记录测试任务相关信息:开始、结束时间,任务描述等。

  • 线程池:用于执行测试多线程任务。

  • 开始、终止方法

  • 处理测试数据方法


首先我们需要一个创建线程池的方法,该方法被设计成静态的,需要提取到一个多线程工具类中,这里先展示代码:


/** * 创建线程池,固定线程池大小 * @param size 线程池大小 * @return */public static ThreadPoolExecutor createPool(int size) {    return new ThreadPoolExecutor(size, size, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10), // 创建线程池            new ThreadFactory() { // 创建一个线程工厂                AtomicInteger index = new AtomicInteger(); // 线程安全的线程编号
@Override public Thread newThread(Runnable r) { // 重写创建线程方法 Thread thread = new Thread(r); // 创建线程 thread.setName("线程-" + index.incrementAndGet()); // 设置线程名称 return thread; // 返回创建的线程 } }); // 创建线程池}
复制代码


然后根据需求分解,设计这个类的类属性:


/** * 线程池 */ThreadPoolExecutor poolExecutor;
/** * 任务数量,即并发数量 */public int TaskNum;
/** * 总执行次数 */public AtomicInteger executeNumStatistic;
/** * 执行错误次数 */public AtomicInteger errorNumStatistic;
/** * 用于记录所有请求时间 */public Vector<Integer> costTimeStatistic;
/** * 任务描述 */public String taskDesc;
/** * 开始时间,单位毫秒 */public long startTimestamp;
/** * 结束时间,单位毫秒 */public long endTimestamp;
复制代码


上文提到我们有一个启动方法,也就是在创建执行类对象时,并不会直接把测试任务都提交到线程池,所以我们需要一个存放测试任务的 List 属性。


/** * 多线程任务集合 */public List<ThreadTask> tasks;
复制代码


下面我们来设计构造方法,由于 tasks 属性的引入,所以任务数量即并发数量 TaskNum 并不是必须的,这里可以省略掉。其他属性中,只有 taskDesc 需要从外部获取了。那么,执行类的构造方法可以如下设计:


/** * 构造方法 * @param taskDesc 任务描述 * @param tasks 多线程任务集合 */public TaskExecutor(List<ThreadTask> tasks, String taskDesc) {    this.tasks = tasks; // 初始化多线程任务集合    this.taskDesc = taskDesc; // 初始化任务描述    this.executeNumStatistic = new AtomicInteger(0); // 初始化执行次数    this.errorNumStatistic = new AtomicInteger(0); // 初始化执行错误次数    this.costTimeStatistic = new Vector<>(); // 初始化请求时间集合    this.poolExecutor = ThreadTool.createPool(tasks.size()); // 初始化线程池}
复制代码


接下来,我们处理启动方法,启动方法中就是将 tasks 中的待执行任务,间隔 1 秒提交给线程池执行。代码如下:


/** * 开始执行任务 */public void start() {    this.startTimestamp = System.currentTimeMillis(); // 记录开始时间    for (ThreadTask task : tasks) { // 遍历多线程任务集合,提交线程池执行        poolExecutor.execute(task); // 提交线程池执行        ThreadTool.sleep(1000); // 休眠1秒,间隔提交多线程任务    }}
复制代码


与之对应的主动结束测试任务的方法:


/** * 停止执行任务 */public void stop() {    for (int i = 0; i < tasks.size(); i++) {        tasks.get(i).needStop = true;    }}
复制代码

3.4.2 任务类协同

除了执行类独有的功能,还有不少需要与任务类协同完成的功能。这里面主要两类:一是逻辑关联,包括启动、停止;另一类是数据汇总。


首先我们要解决的核心功能就是确认任务结束的时间点,且同步给多线程任务和多线程执行类。从理论上来说,每个多线程任务结束之后,在 after() 方法中把数据上报(请注意这里是线程安全的)。对于执行类来讲,需要所有任务线程结束之后,才会进行后续的数据处理。


经过这番描述,不知道读者会不会感觉到熟悉,这非常符合我们前面讲过的多线程同步类 java.util.concurrent.CountDownLatch 最佳使用场景,固定线程数,一次性的线程同步。


首先我们要给执行类增加一个 CountDownLatch 类实例属性:


/** * 用于停止任务的计数器 */public CountDownLatch stopCountDownLatch;
复制代码


我们再给多线程任务类也添加一个同样的 CountDownLatch 类实例属性,代码与执行类相同,不在赘述。


然后在构造方法中完成初始化:


/** * 构造方法 * @param taskDesc 任务描述 * @param tasks 多线程任务集合 */public TaskExecutor(List<ThreadTask> tasks, String taskDesc) {    this.tasks = tasks; // 初始化多线程任务集合    this.taskDesc = taskDesc; // 初始化任务描述    this.executeNumStatistic = new AtomicInteger(0); // 初始化执行次数    this.errorNumStatistic = new AtomicInteger(0); // 初始化执行错误次数    this.costTimeStatistic = new Vector<>(); // 初始化请求时间集合    this.poolExecutor = ThreadTool.createPool(tasks.size()); // 初始化线程池    this.stopCountDownLatch = new CountDownLatch(tasks.size()); // 初始化停止任务计数器    for (int i = 0; i < tasks.size(); i++) { // 初始化多线程任务的CountDownLatch同步类属性        tasks.get(i).stopCountDownLatch = stopCountDownLatch;    }}
复制代码


在多线程任务类中,我们修改 run() 方法内容,添加 CountDownLatch 类计数功能。这里我们遵守最佳实战中的展示方法:


/** * 多线程任务执行方法 */@Overridepublic void run() {    try {        before(); // 前置处理        while (true) {            if (ABORT.get() || needStop || executeNum >= totalNum) { // 判断是否终止测试任务                break;            }            try {                executeNum++; // 记录执行次数                long start = System.currentTimeMillis(); // 记录开始时间                test(); // 测试方法                long end = System.currentTimeMillis(); // 记录结束时间                costTime.add((int) (end - start));            } catch (Exception e) {                errorNum++; // 记录错误次数                e.printStackTrace();            }        }        after(); // 后置处理    } catch (Exception e) {        e.printStackTrace();    } finally {        stopCountDownLatch.countDown(); // 计数器减一    }}
复制代码


最后我们要改造执行类的 start() 方法,在等待所有多线程任务都结束之后,执行后续操作。


/** * 开始执行任务 */public void start() {    this.startTimestamp = System.currentTimeMillis(); // 记录开始时间    for (ThreadTask task : tasks) { // 遍历多线程任务集合,提交线程池执行        poolExecutor.execute(task); // 提交线程池执行        ThreadTool.sleep(1000); // 休眠1秒,间隔提交多线程任务    }    try {        stopCountDownLatch.await(); // 等待停止任务计数器为0    } catch (InterruptedException e) {        throw new RuntimeException(e);    }    this.endTimestamp = System.currentTimeMillis(); // 记录结束时间    this.poolExecutor.shutdown(); // 关闭线程池    System.out.println(System.currentTimeMillis() + "  性能测试任务执行完毕!"); // 打印任务执行完毕}
复制代码


万事俱备,下面即将迎来激动人心的时刻,笔者将通过一个演示案例见证我们设计的压测引擎第一次启动运行:


package org.funtester.performance.books.chapter03.section4;
import org.funtester.performance.books.chapter03.common.ThreadTool;import org.funtester.performance.books.chapter03.section3.ThreadTask;
import java.util.ArrayList;import java.util.List;
public class TestEngineDemo {
public static void main(String[] args) { int tasksNum = 2; // 任务数,即并发数量 int totalNum = 3; // 单个任务的总执行次数 List<ThreadTask> tasks = new ArrayList<>(); // 任务集合 for (int i = 0; i < tasksNum; i++) { ThreadTask threadTask = new ThreadTask() { // 创建任务对象 @Override public void before() { // 重写前置处理方法 System.out.println(System.currentTimeMillis() + " before testing ! " + Thread.currentThread().getName()); // 打印前置处理日志 }
@Override public void test() { ThreadTool.sleep(500); // 模拟业务操作 System.out.println(System.currentTimeMillis() + " testing ! " + Thread.currentThread().getName()); // 打印业务操作日志 }
@Override public void after() { System.out.println(System.currentTimeMillis() + " after testing ! " + Thread.currentThread().getName()); // 打印后置处理日志 }
}; threadTask.totalNum = totalNum; // 设置任务的总执行次数 threadTask.costTime = new ArrayList<>(totalNum); // 设置任务的执行时间集合,设置容量,避免频繁扩容 tasks.add(threadTask); // 将任务添加到任务集合 } new TaskExecutor(tasks, "性能测试引擎演示").start(); // 创建并发任务执行器并启动 }}
复制代码


在上面这个例子中,实现了一个 2 个并发,单个多线程任务执行 3 次的性能测试用例。用例每个线程任务中 before()test()after() 方法都做了日志处理。控制台打印的内容如下:


1700142013891  线程-1  before testing !1700142014393  线程-1  testing !1700142014895  线程-2  before testing !1700142014896  线程-1  testing !1700142015400  线程-2  testing !1700142015400  线程-1  testing !1700142015400  线程-1  after testing !1700142015904  线程-2  testing !1700142016409  线程-2  testing !1700142016409  线程-2  after testing !1700142016409  性能测试任务执行完毕!
复制代码


从打印信息可以看出,线程任务均按照我们预期的方式执行,当所有任务执行完,执行类关闭线程池,打印结束日志。


书的名字:从 Java 开始做性能测试


如果本书内容对你有所帮助,希望各位不吝赞赏,让我可以贴补家用。赞赏两位数可以提前阅读未公开章节。我也会尝试制作本书的视频教程,包括必要的答疑。

发布于: 刚刚阅读数: 5
用户头像

FunTester

关注

公众号:FunTester,800篇原创,欢迎关注 2020-10-20 加入

Fun·BUG挖掘机·性能征服者·头顶锅盖·Tester

评论

发布
暂无评论
【连载 17】多线程执行类_FunTester_InfoQ写作社区