package com.funtester.frame.execute;
import com.funtester.base.bean.PerformanceResultBean;import com.funtester.base.constaint.ThreadBase;import com.funtester.config.Constant;import com.funtester.frame.Save;import com.funtester.frame.SourceCode;import com.funtester.utils.RWUtil;import com.funtester.utils.Time;import org.apache.commons.lang3.StringUtils;import org.apache.logging.log4j.LogManager;import org.apache.logging.log4j.Logger;
import java.util.ArrayList;import java.util.List;import java.util.Vector;import java.util.concurrent.ExecutorService;import java.util.concurrent.Phaser;import java.util.concurrent.atomic.AtomicInteger;
import static java.util.stream.Collectors.toList;
/** * 并发类,用于启动压力脚本 */public class HoldConcurrent extends SourceCode {
private static Logger logger = LogManager.getLogger(HoldConcurrent.class);
/** * 用来标记状态 */ public static AtomicInteger HOLD = new AtomicInteger(0);
/** * 开始时间 */ private long startTime;
/** * 结束时间 */ private long endTime;
/** * 任务描述 */ public String desc;
/** * 任务集 */ public List<ThreadBase> threads = new ArrayList<>();
/** * 线程数 */ public int threadNum;
/** * 执行失败总数 */ private int errorTotal;
/** * 任务执行失败总数 */ private int failTotal;
/** * 执行总数 */ private int executeTotal;
/** * 用于记录所有请求时间 */ public static Vector<Short> allTimes = new Vector<>();
/** * 记录所有markrequest的信息 */ public static Vector<String> requestMark = new Vector<>();
/** * 线程池 */ ExecutorService executorService;
/** * 多线程多阶段同步类,用于多线程任务阶段管理 */ public static Phaser phaser;
/** * @param thread 线程任务 * @param threadNum 线程数 * @param desc 任务描述 */ public HoldConcurrent(ThreadBase thread, int threadNum, String desc) { this(threadNum, desc); range(threadNum).forEach(x -> threads.add(thread.clone())); }
/** * @param threads 线程组 * @param desc 任务描述 */ public HoldConcurrent(List<ThreadBase> threads, String desc) { this(threads.size(), desc); this.threads = threads; }
private HoldConcurrent(int threadNum, String desc) { this.threadNum = threadNum; this.desc = StatisticsUtil.getFileName(desc); phaser = new Phaser(1); executorService = ThreadPoolUtil.createFixedPool(threadNum); }
private HoldConcurrent() {
}
/** * 执行多线程任务 * 默认取list中thread对象,丢入线程池,完成多线程执行,如果没有threadname,name默认采用desc+线程数作为threadname,去除末尾的日期 */ public PerformanceResultBean start() { Thread funtester = new Thread(new FunTester()); funtester.start(); ThreadBase.progress = new Progress(threads, StatisticsUtil.getTrueName(desc)); ThreadBase.progress.threadNum = 0; new Thread(ThreadBase.progress).start(); startTime = Time.getTimeStamp(); for (int i = 0; i < threadNum; i++) { if (HOLD.get() == 1) { threadNum = i; break; } ThreadBase thread = threads.get(i); if (StringUtils.isBlank(thread.threadName)) thread.threadName = StatisticsUtil.getTrueName(desc) + i; sleep(RUNUP_TIME / threadNum); executorService.execute(thread); ThreadBase.progress.threadNum = i + 1; logger.info("已经启动了 {} 个线程!", i + 1); } phaser.arriveAndAwaitAdvance(); executorService.shutdown(); ThreadBase.progress.stop(); threads.forEach(x -> { if (x.status()) failTotal++; errorTotal += x.errorNum; executeTotal += x.executeNum; }); endTime = Time.getTimeStamp(); HOLD.set(0); logger.info("总计{}个线程,共用时:{} s,执行总数:{},错误数:{},失败数:{}", threadNum, Time.getTimeDiffer(startTime, endTime), formatLong(executeTotal), errorTotal, failTotal); return over(); }
private static class FunTester implements Runnable {
@Override public void run() { waitForKey(INTPUT_KEY); HOLD.set(1); output("压力暂停"); }
}
private PerformanceResultBean over() { Save.saveIntegerList(allTimes, DATA_Path.replace(LONG_Path, EMPTY) + StatisticsUtil.getFileName(threadNum, desc)); Save.saveStringListSync(HoldConcurrent.requestMark, MARK_Path.replace(LONG_Path, EMPTY) + desc); allTimes = new Vector<>(); requestMark = new Vector<>(); return countQPS(threadNum, desc, Time.getTimeByTimestamp(startTime), Time.getTimeByTimestamp(endTime)); }
/** * 计算结果 * <p>此结果仅供参考</p> * 此处因为start和end的不准确问题,所以采用改计算方法,与fixQPS有区别 * * @param name 线程数 */ public PerformanceResultBean countQPS(int name, String desc, String start, String end) { List<String> strings = RWUtil.readTxtFileByLine(Constant.DATA_Path + StatisticsUtil.getFileName(name, desc)); int size = strings.size() == 0 ? 1 : strings.size(); List<Integer> data = strings.stream().map(x -> changeStringToInt(x)).collect(toList()); int sum = data.stream().mapToInt(x -> x).sum(); String statistics = StatisticsUtil.statistics(data, desc, threadNum); int rt = sum / size; double qps = 1000.0 * name / (rt == 0 ? 1 : rt); double qps2 = (executeTotal + errorTotal) * 1000.0 / (endTime - startTime); return new PerformanceResultBean(desc, start, end, name, size, rt, qps, qps2, getPercent(executeTotal, errorTotal), getPercent(threadNum, failTotal), executeTotal, statistics); }
/** * 用于做后期的计算 * * @param name * @param desc * @return */ public PerformanceResultBean countQPS(int name, String desc) { return countQPS(name, desc, Time.getDate(), Time.getDate()); }
}
评论