写点什么

千万级日志回放引擎设计稿

作者:FunTester
  • 2021 年 12 月 30 日
  • 本文字数:4452 字

    阅读完需:约 15 分钟

现在压测系统一直用的方案是 goreplay 进行二次开发完成的。因为整体是 Java 技术栈的,使用 goreplay 有存在两方面问题:一是兼容性,语言和开发框架上,增加了用例创建执行的复杂度;二是维护成本,goreplay 二次开发方案已经无法满足现在的性能测试需求。如果维护两套压测引擎会带来更多工作量。


所以为了尽可能解决这两方面问题,接到了一个活儿,调研一下 Java 实现日志回放功能。主要就是读了 goreplay 的源码以及它设计思路,用 Java 重现实现一遍。


这里用到了前两天分享的Disruptor高性能队列常用API演示高性能队列Disruptor在测试中应用,有兴趣的可以再翻一翻。另视频版还在制作中,年后会和大家相见。

思路

总体设计思路如下:



PS:流量递增和动态增减尚未实现,还在研究 goreplay 的源码。

日志拉取和解析

日志的拉取和初步解析依旧采取原来项目中的逻辑,通过 SQL 语句网关日志中拉取日志,并对日志内容进行初步解析,放入云 OSS 中,并将链接存入数据库(此步骤放在录制流量成功之后)。


PS:目前日志解析保留的有用信息只有 URL


日志格式如下:


/v1/level,funtester.com,-,token,-,1622611469,-/v1/level,funtester.com,-,token,-,1622611469,-/v1/level,funtester.com,-,token,-,1622611469,-/v1/level,funtester.com,-,token,-,1622611469,-/v1/level,funtester.com,-,token,-,1622611469,-/v1/level,funtester.com,-,token,-,1622611469,-/v1/level,funtester.com,-,token,-,1622611469,-
复制代码

实现步骤

  • 首先将日志中有用信息(URL)以及 token 放到内存中

  • 通过配置 host,读取 URL,以及响应 header(token,压测标识,常用 header,模拟盘标识)组装 HTTP 请求。

  • 创建 Disruptor 对象,使用异步创建生产者

  • 通过消费者消费(发出请求)消息(HTTP 请求对象),达到 HTTP 接口日志流量回复功能。

性能指标

  • 本机 6C16G 配置测试数据

  • 实测 1 千万 URL 读取速度约为 9s ~ 13s,内存无压力,如果后续更大日志量需求,可以通过 stream 方式异步读取日志,实测日志读取速度在 80 万/s 以上,满足目前需求。

  • 单生产者速度 25 万 QPS

  • 单机测试 QPS  8.8 万,CPU 跑满,触及物理极限,此数据与之前工具对比压测差异不大。

风险

  • 消费者异步对消息进行存储,超过一定数量将会丢弃消息。这个问题在消费者速度小于生产者速度时会触发。

  • 消费者数量需要在启动前设定,如果参数设置不合理,会导致消费者压力瓶颈,无法动态增加消费者。


PS:这些风险后续会逐个解决。

代码实现

生产者 Demo:

def ft = {    output("创建线程")    fun {        int i = 0        while (key) {            def url = logs.get(i % logs.size())            def get = getHttpGet(HOST + url)            get.addHeader("token", tokens.get(i % tokens.size()))            get.addHeader(HttpClientConstant.USER_AGENT)            ringBuffer.publishEvent {e, s ->                e.setRequest(get)            }            i++        }    }}ft()
复制代码

读取文件代码

/** * 通过闭包传入方法读取超大文件部分内容 * * @param filePath * @param function * @return */public static List<String> readByLine(String filePath, Function<String, String> function) {    if (StringUtils.isEmpty(filePath) || !new File(filePath).exists() || new File(filePath).isDirectory())        ParamException.fail("文件信息错误!" + filePath);    logger.debug("读取文件名:{}", filePath);    List<String> lines = new ArrayList<>();    File file = new File(filePath);    if (file.isFile() && file.exists()) { // 判断文件是否存在        try (FileInputStream fileInputStream = new FileInputStream(file);             InputStreamReader read = new InputStreamReader(fileInputStream, DEFAULT_CHARSET);             BufferedReader bufferedReader = new BufferedReader(read, 3 * 1024 * 1024);) {            String line = null;            while ((line = bufferedReader.readLine()) != null) {                String apply = function.apply(line);                if (StringUtils.isNotBlank(apply)) lines.add(apply);            }        } catch (Exception e) {            logger.warn("读取文件内容出错", e);        }    } else {        logger.warn("找不到指定的文件:{}", filePath);    }    return lines;}
复制代码

演示 Demo

package com.funtest.groovytest
import com.funtester.base.constaint.FixedThreadimport com.funtester.config.HttpClientConstantimport com.funtester.frame.execute.Concurrentimport com.funtester.frame.execute.ThreadPoolUtilimport com.funtester.httpclient.ClientManageimport com.funtester.httpclient.FunLibraryimport com.funtester.utils.ArgsUtilimport com.funtester.utils.RWUtilimport com.lmax.disruptor.EventHandlerimport com.lmax.disruptor.RingBufferimport com.lmax.disruptor.WorkHandlerimport com.lmax.disruptor.YieldingWaitStrategyimport com.lmax.disruptor.dsl.Disruptorimport com.lmax.disruptor.dsl.ProducerTypeimport org.apache.http.client.methods.HttpGetimport org.apache.http.client.methods.HttpRequestBaseimport org.junit.platform.commons.util.StringUtils
import java.util.concurrent.LinkedBlockingDequeimport java.util.function.Function
class ReplayTest extends FunLibrary {
static String url = "http://localhost:12345/test";
static HttpGet httpGet = getHttpGet(url);
// static LinkedBlockingQueue<HttpRequestBase> requests = new LinkedBlockingQueue<>()
static def HOST = "http://localhost:12345"
static def key = true
static Disruptor<RequestEvent> disruptor
public static void main(String[] args) { def logfile = "/Users/oker/Desktop/log.csv" // def logfile = "/Users/oker/Desktop/fun.csv" //1千万日志 def tokenfile = "/Users/oker/Desktop/token.csv" //2万用户token List<String> logs = RWUtil.readByLine(logfile, new Function<String, String>() {
@Override String apply(String s) { return StringUtils.isNotBlank(s) && s.startsWith("/") ? s.split(COMMA)[0] : null } }); List<String> tokens = RWUtil.readByLine(tokenfile, new Function<String, String>() {
@Override String apply(String s) { return StringUtils.isNotBlank(s) ? s.split(COMMA)[4] : null } });
output("总计 ${formatLong(logs.size())} 条日志") disruptor = new Disruptor<RequestEvent>( RequestEvent::new, 512 * 512, ThreadPoolUtil.getFactory(), ProducerType.MULTI, new YieldingWaitStrategy() ); RingBuffer<RequestEvent> ringBuffer = disruptor.getRingBuffer();
def ft = { output("创建线程") fun { int i = 0 while (key) { def url = logs.get(i % logs.size()) def get = getHttpGet(HOST + url) get.addHeader("token", tokens.get(i % tokens.size())) get.addHeader(HttpClientConstant.USER_AGENT) ringBuffer.publishEvent {e, s -> e.setRequest(get) } i++ } } } ft() disruptor.handleEventsWith(new FunTester(10)) // 5.times {ft()}
//下面开始测试 ClientManage.init(10, 5, 0, "", 0) def util = new ArgsUtil(args) def thread = util.getIntOrdefault(0, 20) def times = util.getIntOrdefault(1, 60000) RUNUP_TIME = util.getIntOrdefault(2, 0) def tasks = [] thread.times { def tester = new FunTester(times) disruptor.handleEventsWith(tester); tasks << tester } disruptor.start(); new Concurrent(tasks, "这是千万级日志回放演示Demo").start()
}

private static class FunTester extends FixedThread implements EventHandler<RequestEvent>, WorkHandler<RequestEvent> {
LinkedBlockingDeque<HttpRequestBase> reqs = new LinkedBlockingDeque<HttpRequestBase>()
FunTester(int limit) { super(null, limit, true) }
@Override protected void doing() throws Exception { FunLibrary.executeOnly(reqs.take()) }
@Override FixedThread clone() { return new FunTester(limit) }
@Override protected void after() { super.after() key = false disruptor.shutdown() }
@Override void onEvent(RequestEvent event, long sequence, boolean endOfBatch) throws Exception { if (reqs.size() < 100000) reqs.add(event.getRequest()) }
@Override void onEvent(RequestEvent event) throws Exception { if (reqs.size() < 100000) reqs.add(event.getRequest()) } }

private static class RequestEvent {
HttpRequestBase request;
public HttpRequestBase getRequest() { return request; }
public void setRequest(HttpRequestBase request) { this.request = request; }
}

}
复制代码


PS:这里用到了多个 group,原因在设计稿中标记了。

Have Fun ~ Tester !

发布于: 刚刚
用户头像

FunTester

关注

公众号:FunTester,650+原创,欢迎关注 2020.10.20 加入

Have Fun,Tester! 公众号FunTester,坚持原创文章的测试人。 FunTester测试框架作者,DCS_FunTester分布式性能测试框架作者。

评论

发布
暂无评论
千万级日志回放引擎设计稿