写点什么

DCS_FunTester 分布式压测框架更新(二)

用户头像
FunTester
关注
发布于: 1 小时前

经过一阵子的断断续续的测试,DCS_FunTester分布式压测框架更新(一)完毕之后又增加了一些更新。

增加方案三支持

分布式性能测试框架用例方案设想(三)基于docker的分布式性能测试框架功能验证(三)中,我提到了方案三:基于Groovy脚本执行的测试用例,这次更新将支持执行Groovy测试用例。目前除了访问验证以外,但是对于脚本内容尚未过滤。


下面是master节点实现方法:


    @Override    int runScript(GroovyScript script) {        def mark = SourceCode.getMark()        def num = script.getMark()        def hosts = NodeData.getRunHost(num)        try {            hosts.each {                script.setMark(mark)                def re = MasterManager.runRequest(it, script)                if (!re) FailException.fail()                NodeData.addTask(it, mark)            }        } catch (FailException e) {            hosts.each { f -> MasterManager.stop(f) }            FailException.fail("多节点执行失败!")        }        mark    }
复制代码


下面是slave节点的实现方法:


    @Override    public void runScript(GroovyScript script) {        ExecuteGroovy.executeScript(script.getScript());    }
复制代码


这里没有传值,留个参数params以后可以用来做脚本化参数配置。

增加注册机制

增加了master节点之后也就没有slave节点的直接访问。


注册机制我自己写了一个简单的实现,放在一个类里面。


package com.funtester.master.common.basedata;
import com.funtester.base.bean.PerformanceResultBean;import com.funtester.base.exception.FailException;import com.funtester.frame.SourceCode;import com.funtester.master.common.bean.manager.RunInfoBean;
import java.util.ArrayList;import java.util.List;import java.util.concurrent.ConcurrentHashMap;
public class NodeData {
/** * 节点状态 */ public static ConcurrentHashMap<String, Boolean> status = new ConcurrentHashMap<>();
/** * 节点的运行信息,通过progress获取 */ public static ConcurrentHashMap<String, String> runInfos = new ConcurrentHashMap<>();
/** * 节点的运行结果 */ public static ConcurrentHashMap<Integer, List<PerformanceResultBean>> results = new ConcurrentHashMap<>();
/** * 节点更新时间 */ public static ConcurrentHashMap<String, Integer> time = new ConcurrentHashMap<>();
/** * 节点运行的任务id */ public static ConcurrentHashMap<String, Integer> tasks = new ConcurrentHashMap<>();
public static void register(String host, boolean s) { synchronized (status) { status.put(host, s); mark(host); } }
/** * 可用节点 * * @return */ public static List<String> available() { synchronized (status) { List<String> availables = new ArrayList<>(); status.forEach((k, v) -> { if (v) availables.add(k); }); return availables; } }
/** * 标记节点时间 * * @param host */ private static void mark(String host) { time.put(host, SourceCode.getMark()); }
/** * 检查,删除过期节点和过期数据,提供定时任务执行 */ public static void check() { int timeStamp = SourceCode.getMark(); List<String> hkeys = new ArrayList<>(); synchronized (status) { time.forEach((k, v) -> { if (timeStamp - v > 12) { hkeys.add(k); } }); hkeys.forEach(f -> status.remove(f)); } synchronized (runInfos) { hkeys.forEach(f -> runInfos.remove(f)); } synchronized (tasks) { hkeys.forEach(f -> tasks.remove(f)); tasks.forEach((k, v) -> { if (timeStamp - v > 60 * 30) tasks.put(k, 0); }); } synchronized (results) { List<Integer> tkeys = new ArrayList<>(); results.forEach((k, v) -> { if (k - timeStamp > 3_3600) { tkeys.add(k); } }); tkeys.forEach(f -> results.remove(f)); } }
/** * 添加运行信息 * * @param bean */ public static void addRunInfo(RunInfoBean bean) { synchronized (runInfos) { runInfos.put(bean.getHost(), bean.getRuninfo()); } }
/** * 获取描述的的用例任务运行信息 * * @param desc 任务描述信息 * @return */ public static List<String> getRunInfo(String desc) { synchronized (runInfos) { ArrayList<String> infos = new ArrayList<>(); runInfos.forEach((k, v) -> { if (v.contains(desc)) { infos.add(v); } }); return infos; } }
/** * 添加运行信息 * * @param bean */ public static void addResult(int mark, PerformanceResultBean bean) { synchronized (results) { results.computeIfAbsent(mark, f -> new ArrayList<PerformanceResultBean>()); results.get(mark).add(bean); } }
/** * 添加节点运行任务id * @param host * @param mark */ public static void addTask(String host, Integer mark) { synchronized (tasks) { if (status.get(host) != null && status.get(host) == false) { tasks.put(host, mark); } } }
public static List<String> getRunHost(int num) { synchronized (status) { List<String> available = available(); if (num < 1 || num > available.size()) FailException.fail("没有足够节点执行任务"); List<String> nods = new ArrayList<>(); for (int i = 0; i < num; i++) { String random = SourceCode.random(available); status.put(random, false); nods.add(random); } return nods; }

}
}
复制代码


这里写的有点复杂,未来计划写入Redis或者借助其他的成熟组件完成。本来想把节点信息封装成一个对象的形式,后来想想还是比较麻烦,如果分开处理会比较容易。

取消 slave 节点访问

统一由master节点分配任务运行用例,自然要取消slave节点的访问权限,但是现在还有一部分接口暴露出来,swagger文档中没有表名。其实刷新master节点信息重新注册节点两个功能留作子节点出错时候使用。

service 层提取

之前的功能全然写成了一个静态方法,提取了service接口,主要方法如下:


package com.funtester.master.service
import com.funtester.slave.common.bean.run.GroovyScriptimport com.funtester.slave.common.bean.run.HttpRequestimport com.funtester.slave.common.bean.run.HttpRequestsimport com.funtester.slave.common.bean.run.LocalMethodimport com.funtester.slave.common.bean.run.ManyRequest
interface IRunService {
public int runRequest(HttpRequest request)
public int runRequests(HttpRequests request)
public int runMethod(LocalMethod method)
public int runScript(GroovyScript script)
}
复制代码


其中每个对象都有一个mark属性,对于master节点来说,就是执行的节点数,对于slave节点来说,就是执行任务的标记。

更新同步信息

这里分享一下思路:


  1. 启动master节点

  2. 启动slave节点,首先会请求master(配置或者接口设置),获取本机IP

  3. 然后slave节点通过定时任务将状态同步到master节点。


没有使用Socket接口,总觉得麻烦。


Have Fun ~ Tester !

FunTester 测试框架和分布式测试框架 DCS_FunTester 官方账号,欢迎关注!




点击阅读阅文,查看 FunTester 历史原创集合

发布于: 1 小时前阅读数: 2
用户头像

FunTester

关注

公众号:FunTester,Have Fun, Tester! 2020.10.20 加入

Have Fun,Tester!

评论

发布
暂无评论
DCS_FunTester分布式压测框架更新(二)