写点什么

动态模型之增压暂停【FunTester 测试框架】

用户头像
FunTester
关注
发布于: 刚刚

距离上次对 FunTester 测试框架功能规划之后,已经很久没有更新过功能规划了,主要因素是 FunTester 测试框架目前支持的功能已经完全满足工作需求。无论是分布式性能测试框架,还是全链路性能测试支持,以及量化模拟线上流量,基本技术验证都完成了,余下的都是在技术方案的上进行调整以更适应现在工作需求,不存在技术障碍。


虽然 FunTester 测试框架还在不断更新,但很久没进行过功能更新了。最近在设想为了可能用到的测试场景中,动态压力是目前最有可能在工作中应用的。

终极目标

最终想要实现的就是在不断压测过程中,灵活增加、减少或者保持测试压力,这样既可以一次执行用例的过程中,实现多个测试场景的覆盖,也可以避免从 0 开始加压导致测试时间的增加。


以往的性能测试用例,点击开始之后,除了主动结束,中间很少进行压力调节,动态压力就是为了在压测过程中干预压力,实现测试过程中对发压端进行调节。这个需求可以通过容器化和分布式等技术加持下实现。我的想法就是直接对单个 JVM 线程池运行的任务进行调节,适用于非集群压测和对压力需求比较灵活的场景,当然如果应用到日常性能巡检,也是可以提升效能的。

主要实现功能:

  • 性能测试执行中,动态增减性能测试压力值。

  • 性能测试执行中,动态注入新的流量模型任务。

这样的好处:

  • 动态增减执行任务,达到一次试执行测试多种压力目的。

  • 动态增减不同模型的任务,达到动态修改压测流量的目的。


万里长征第一步:增加暂停功能实现。

需求来源

在工作中,经常会遇到一类场景:预计系统提供的 QPS 在 1 万,线程数 100,我们设计了 0 线程为起始,间隔 30s 增加 10 的压力,最大值 120。(当然也可以使用 QPS 递增模式,下面的演示 Demo 会使用线程递增)。


可实际情况是 80 线程已经到达性能瓶颈,然后需求是在 80 线程的压力下保持一阵子,方便收集数据和获取现场。

思路

基本实现

因为使用场景是在压力递增的过程中,终止递增并保持压力,所以使用了性能测试软启动初探中的软启动方案。使用for循环来递增压力。然后在某个时刻触发终止递增。

触发条件

预期是在性能测试服务化中实践和本地脚本测试中使用,目前只实现了本地脚本测试中功能,思路是通过一个线程安全变量来标记执行状态,是否是持续递增压力还是需要终止压力,这里选择了另起一个线程来完成这件任务,通过从控制台输入内容来控制这个开关。我目前只是做的 Demo 是一个没有回头路的方案,只有终止压力没有暂停后继续增加或者结束。


多线程类任务如下:


    private static class FunTester implements Runnable {
@Override public void run() { waitForKey(INTPUT_KEY); HOLD.set(1); output("压力暂停"); }
}
复制代码


这里从控制台等待输入,如果等于com.funtester.config.Constant#INTPUT_KEY那么就设置com.funtester.frame.execute.HoldConcurrent#HOLD设置为 1,从而标记测试任务已经进入了终止增压阶段。

多线程同步

这里参考Java线程同步三剑客内容,我使用了java.util.concurrent.Phaser,原因是java.util.concurrent.CountDownLatch不灵活,无法满足需求,而java.util.concurrent.CyclicBarrier虽然灵活也能满足多线程同步需求,但无法灵活增减需要同步的线程数,故而也放弃了。

导致问题

这种动态压力模型是在加压阶段进行的,也就是线程并不是固定,进而带来了一些问题,目前尚未解决。


  • 本地数据统计失真。

  • 暂不支持多任务,暂无法服务化。

实现 Demo

多线程任务类

首先是com.funtester.base.constaint.HoldThread类,实现了com.funtester.base.constaint.ThreadBase的部分方法,具体与其他实现区别主要是在com.funtester.base.constaint.HoldThread#beforecom.funtester.base.constaint.HoldThread#after方法中对于标记多线程同步对象com.funtester.frame.execute.HoldConcurrent#phaser的操作。


代码如下:


package com.funtester.base.constaint;
import com.funtester.frame.execute.HoldConcurrent;import com.funtester.httpclient.GCThread;import com.funtester.utils.Time;import org.apache.logging.log4j.LogManager;import org.apache.logging.log4j.Logger;
/** * 动态压力模型,增压暂停 */public abstract class HoldThread<F> extends ThreadBase<F> {
private static final long serialVersionUID = -4617192188292407063L;
private static final Logger logger = LogManager.getLogger(HoldThread.class);
public HoldThread(F f, int limit, boolean isTimesMode) { this.isTimesMode = isTimesMode; this.limit = limit; this.f = f; }
protected HoldThread() { super(); }
/** * */ @Override public void run() { try { before(); long ss = Time.getTimeStamp(); while (true) { try { executeNum++; long s = Time.getTimeStamp(); doing(); long et = Time.getTimeStamp(); short diff = (short) (et - s); costs.add(diff); } catch (Exception e) { logger.warn("执行任务失败!", e); errorNum++; } finally { if ((isTimesMode ? executeNum >= limit : (Time.getTimeStamp() - ss) >= limit) || ThreadBase.needAbort() || status()) break; } } long ee = Time.getTimeStamp(); if ((ee - ss) / 1000 > RUNUP_TIME + 3) logger.info("线程:{},执行次数:{},错误次数: {},总耗时:{} s", threadName, executeNum, errorNum, (ee - ss) / 1000.0); HoldConcurrent.allTimes.addAll(costs); HoldConcurrent.requestMark.addAll(marks); } catch (Exception e) { logger.warn("执行任务失败!", e); } finally { after(); } }
@Override public void before() { super.before(); HoldConcurrent.phaser.register(); }
@Override protected void after() { super.after(); GCThread.stop(); HoldConcurrent.phaser.arriveAndDeregister();
}

}
复制代码

执行类

执行类com.funtester.frame.execute.HoldConcurrent与其他模型区别在于:1.没有固定稳定压力,增压结束即稳定压力;2.多线程任务接受控制台输出内容控制任务阶段;3.当用户一直不输入com.funtester.config.Constant#INTPUT_KEY,存在无法自然停止隐患。


代码如下:


package com.funtester.frame.execute;
import com.funtester.base.bean.PerformanceResultBean;import com.funtester.base.constaint.ThreadBase;import com.funtester.config.Constant;import com.funtester.frame.Save;import com.funtester.frame.SourceCode;import com.funtester.utils.RWUtil;import com.funtester.utils.Time;import org.apache.commons.lang3.StringUtils;import org.apache.logging.log4j.LogManager;import org.apache.logging.log4j.Logger;
import java.util.ArrayList;import java.util.List;import java.util.Vector;import java.util.concurrent.ExecutorService;import java.util.concurrent.Phaser;import java.util.concurrent.atomic.AtomicInteger;
import static java.util.stream.Collectors.toList;
/** * 并发类,用于启动压力脚本 */public class HoldConcurrent extends SourceCode {
private static Logger logger = LogManager.getLogger(HoldConcurrent.class);
/** * 用来标记状态 */ public static AtomicInteger HOLD = new AtomicInteger(0);
/** * 开始时间 */ private long startTime;
/** * 结束时间 */ private long endTime;
/** * 任务描述 */ public String desc;
/** * 任务集 */ public List<ThreadBase> threads = new ArrayList<>();
/** * 线程数 */ public int threadNum;
/** * 执行失败总数 */ private int errorTotal;
/** * 任务执行失败总数 */ private int failTotal;
/** * 执行总数 */ private int executeTotal;
/** * 用于记录所有请求时间 */ public static Vector<Short> allTimes = new Vector<>();
/** * 记录所有markrequest的信息 */ public static Vector<String> requestMark = new Vector<>();
/** * 线程池 */ ExecutorService executorService;

/** * 多线程多阶段同步类,用于多线程任务阶段管理 */ public static Phaser phaser;
/** * @param thread 线程任务 * @param threadNum 线程数 * @param desc 任务描述 */ public HoldConcurrent(ThreadBase thread, int threadNum, String desc) { this(threadNum, desc); range(threadNum).forEach(x -> threads.add(thread.clone())); }
/** * @param threads 线程组 * @param desc 任务描述 */ public HoldConcurrent(List<ThreadBase> threads, String desc) { this(threads.size(), desc); this.threads = threads; }
private HoldConcurrent(int threadNum, String desc) { this.threadNum = threadNum; this.desc = StatisticsUtil.getFileName(desc); phaser = new Phaser(1); executorService = ThreadPoolUtil.createFixedPool(threadNum); }
private HoldConcurrent() {
}
/** * 执行多线程任务 * 默认取list中thread对象,丢入线程池,完成多线程执行,如果没有threadname,name默认采用desc+线程数作为threadname,去除末尾的日期 */ public PerformanceResultBean start() { Thread funtester = new Thread(new FunTester()); funtester.start(); ThreadBase.progress = new Progress(threads, StatisticsUtil.getTrueName(desc)); ThreadBase.progress.threadNum = 0; new Thread(ThreadBase.progress).start(); startTime = Time.getTimeStamp(); for (int i = 0; i < threadNum; i++) { if (HOLD.get() == 1) { threadNum = i; break; } ThreadBase thread = threads.get(i); if (StringUtils.isBlank(thread.threadName)) thread.threadName = StatisticsUtil.getTrueName(desc) + i; sleep(RUNUP_TIME / threadNum); executorService.execute(thread); ThreadBase.progress.threadNum = i + 1; logger.info("已经启动了 {} 个线程!", i + 1); } phaser.arriveAndAwaitAdvance(); executorService.shutdown(); ThreadBase.progress.stop(); threads.forEach(x -> { if (x.status()) failTotal++; errorTotal += x.errorNum; executeTotal += x.executeNum; }); endTime = Time.getTimeStamp(); HOLD.set(0); logger.info("总计{}个线程,共用时:{} s,执行总数:{},错误数:{},失败数:{}", threadNum, Time.getTimeDiffer(startTime, endTime), formatLong(executeTotal), errorTotal, failTotal); return over(); }
private static class FunTester implements Runnable {
@Override public void run() { waitForKey(INTPUT_KEY); HOLD.set(1); output("压力暂停"); }
}
private PerformanceResultBean over() { Save.saveIntegerList(allTimes, DATA_Path.replace(LONG_Path, EMPTY) + StatisticsUtil.getFileName(threadNum, desc)); Save.saveStringListSync(HoldConcurrent.requestMark, MARK_Path.replace(LONG_Path, EMPTY) + desc); allTimes = new Vector<>(); requestMark = new Vector<>(); return countQPS(threadNum, desc, Time.getTimeByTimestamp(startTime), Time.getTimeByTimestamp(endTime)); }

/** * 计算结果 * <p>此结果仅供参考</p> * 此处因为start和end的不准确问题,所以采用改计算方法,与fixQPS有区别 * * @param name 线程数 */ public PerformanceResultBean countQPS(int name, String desc, String start, String end) { List<String> strings = RWUtil.readTxtFileByLine(Constant.DATA_Path + StatisticsUtil.getFileName(name, desc)); int size = strings.size() == 0 ? 1 : strings.size(); List<Integer> data = strings.stream().map(x -> changeStringToInt(x)).collect(toList()); int sum = data.stream().mapToInt(x -> x).sum(); String statistics = StatisticsUtil.statistics(data, desc, threadNum); int rt = sum / size; double qps = 1000.0 * name / (rt == 0 ? 1 : rt); double qps2 = (executeTotal + errorTotal) * 1000.0 / (endTime - startTime); return new PerformanceResultBean(desc, start, end, name, size, rt, qps, qps2, getPercent(executeTotal, errorTotal), getPercent(threadNum, failTotal), executeTotal, statistics); }

/** * 用于做后期的计算 * * @param name * @param desc * @return */ public PerformanceResultBean countQPS(int name, String desc) { return countQPS(name, desc, Time.getDate(), Time.getDate()); }

}
复制代码

测试

测试脚本

这里用了简单的多线程sleep进行测试,不再使用本地 HTTP 接口了。脚本内容如下:


package com.funtest.groovytest
import com.funtester.base.constaint.HoldThreadimport com.funtester.base.constaint.ThreadBaseimport com.funtester.frame.SourceCodeimport com.funtester.frame.execute.HoldConcurrent/** * 加压暂停 */class HoldTest extends SourceCode { public static void main(String[] args) { def tester = new FunTester()
new HoldConcurrent(tester,10,"终止压力递增保持压力测试").start()
}
private static class FunTester extends HoldThread {
FunTester() { super(null, 100, true) }
@Override protected void doing() throws Exception { sleep(0.1) }
@Override ThreadBase clone() { return new FunTester() }
}
}
复制代码

控制台输出

下面是完整的控制台输出:


INFO-> 当前用户:oker,工作目录:/Users/oker/IdeaProjects/funtester/src/test/groovy/com/funtest/groovytest/,系统编码格式:UTF-8,系统Mac OS X版本:10.16WARN-> 请输入“FunTester”继续运行!INFO-> 已经启动了 1 个线程!INFO-> 终止压力递增保持压力测试进度:▍  2% ,当前QPS: 0INFO-> 已经启动了 2 个线程!INFO-> 终止压力递增保持压力测试进度:▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍  31% ,当前QPS: 20INFO-> 已经启动了 3 个线程!INFO-> 终止压力递增保持压力测试进度:▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍  60% ,当前QPS: 30FunTesterINFO-> 本次共等待:9.535秒!INFO-> 压力暂停INFO-> 已经启动了 4 个线程!INFO-> 终止压力递增保持压力测试进度:▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍  89% ,当前QPS: 39INFO-> 终止压力递增保持压力测试进度:▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍  100%INFO-> 总计4个线程,共用时:13.439 s,执行总数:227,错误数:0,失败数:0INFO-> 数据保存成功!文件名:/Users/oker/IdeaProjects/funtester/src/test/groovy/com/funtest/groovytest/long/data/终止压力递增保持压力测试231809_4INFO-> ~☢~~☢~~☢~~☢~~☢~~☢~~☢~~☢~~☢~~☢~ JSON ~☢~~☢~~☢~~☢~~☢~~☢~~☢~~☢~~☢~~☢~>  {>  ① . "rt":103,>  ① . "failRate":0.0,>  ① . "threads":4,>  ① . "deviation":"56.51%",>  ① . "errorRate":0.0,>  ① . "executeTotal":227,>  ① . "qps2":16.891137733462312,>  ① . "total":227,>  ① . "qps":38.83495145631068,>  ① . "startTime":"2021-09-23 18:09:02",>  ① . "endTime":"2021-09-23 18:09:16",>  ① . "mark":"终止压力递增保持压力测试231809",>  ① . "table":"eJwBLwDQ/+aVsOaNrumHj+WkquWwkSzml6Dms5Xnu5jlm74hIOW6lOW9k+Wkp+S6jiAxMDI0/eodgA==">  }~☢~~☢~~☢~~☢~~☢~~☢~~☢~~☢~~☢~~☢~ JSON ~☢~~☢~~☢~~☢~~☢~~☢~~☢~~☢~~☢~~☢~INFO-> 数据量太少,无法绘图! 应当大于 1024
Process finished with exit code 0
复制代码


可以看出QPSQPS2相差一倍以上,这个比较容易理解,等差数列既视感。下一步会努力实现本地灵活增减压力值和全局任务管理功能,敬请期待!!!

Have Fun ~ Tester !

发布于: 刚刚阅读数: 2
用户头像

FunTester

关注

公众号:FunTester,650+原创,欢迎关注 2020.10.20 加入

Have Fun,Tester! 公众号FunTester,坚持原创文章的测试人。 FunTester测试框架作者,DCS_FunTester分布式性能测试框架作者。

评论

发布
暂无评论
动态模型之增压暂停【FunTester测试框架】