写点什么

基于时间戳的日志回放引擎

作者:FunTester
  • 2022 年 8 月 22 日
    北京
  • 本文字数:4645 字

    阅读完需:约 15 分钟

之前写过一个日志回放引擎的第一代千万级日志回放引擎设计稿,当时理解的日志回放就是把日志记录的请求重新发出去,这就是回放线上用户的流量了。可是在我最近看goreplay的过程中,重新刷新了我的认知。


查阅了一些资料,终于算是了解了一些基于时间戳的方案和思路。大体如下:通过工具把线上某段时间的流量记录下来,其中包含时间戳等信息,然后通过回放引擎把流量回放出去。

解决思路

目前流量回放集中于 HTTP 流量,所以之前写过的引擎的发压部分还是可以继续使用。所以我也有了自己的解决思路:


  1. 日志清洗,其实就是把规范化的日志解析成引擎框架可以使用的对象,通常包含 HTTP 请求的组成部分。

  2. 按照时间戳排序,通常使用现成的工具这一步是可以省略,但是由于日志记录是已经存在的组件,这里需要做一些兼容性工作

  3. 日志回放,通过线程池和连接池两个池化技术可以解决性能方面的问题。再结合当前的分布式方案做一些兼容功能即可。


其中最最核心的应该就是队列的选择,这里我用看 java 的java.util.concurrent.DelayQueue,也没找到其他更好的框架了。其实在一开始我想复用自己写之前写的日志回放框架的队列,也尝试对集中常用队列进行了性能测试:



本来想是用多线程去读取日志的过程中,通过判断每一条日志是否到时间点,然后丢到一个线程安全的队列中,后面用线程池取队列中的对象,发送请求的。但是仔细想来太复杂了,流量过了好几手,不利于实现和拓展功能。


然后我重新对java.util.concurrent.DelayQueue进行了性能测试延迟队列DelayQueue性能测试,有了测试结果之后,就可以放心大胆地干了。关于延迟队列的基本使用可参考下单延迟10s撤单性能测试

实现

总体来说实现起来思路比较清晰,我分成三部分分享。

属性定义

  1. 我首先定义了一个com.funtester.frame.execute.ReplayConcurrent.ReplayLog日志对象,用于存储每一个请求日志

  2. 然后定义一个com.funtester.frame.execute.ReplayConcurrent#logs用来存储日志,这里旧事重提一下,千万级别的日志对象,存储在内存里面是 OK 的,所以我才会采用这种方式。为什么要从日志文件中转一手呢?因为日志是不按照时间戳排序的。

  3. 再定义com.funtester.frame.execute.ReplayConcurrent#logDelayQueue用来当作回放请求队列,也就是流量中转站,生产者从com.funtester.frame.execute.ReplayConcurrent#logs中取,clone之后丢到队列中;消费者从队列中取对象,丢给线程池。

  4. 定义com.funtester.frame.execute.ReplayConcurrent#handle当作是处理流量的方法,就是把流量对象包装成HttpRequestBase对象然后发送出去

生产者

  1. 确定使用异步线程完成,使用Java自定义异步功能实践

  2. 根据com.funtester.frame.execute.ReplayConcurrent#logDelayQueue性能测试数据,添加com.funtester.frame.execute.ReplayConcurrent#threadNum参数来控制。

  3. 多线程取com.funtester.frame.execute.ReplayConcurrent#logs对象,用到了几个线程安全类,用于保障多线程是顺序读取,避免了在延迟队列中进行排序操作。

  4. 使用了com.funtester.frame.execute.ReplayConcurrent#getMAX_LENGTH控制队列的长度。貌似没找到限制延迟队列长度的 API。只能自己实现了,思路当添加日志数量超过最大值,存储当前队列长度。当长度大于最大长度,则在下一次添加对象前,休眠 1s,然后在重置本地存储的队列长度。这样可以解决这个问题。当然最大值设置足够高,避免 1s 中内队列变成空。回放引擎设计 50 万 QPS,所以我就先设置了 80 万的最大长度。后续可以根据实际情况调整。

消费者

  1. 依旧使用异步,生产者

  2. 使用 API 时java.util.concurrent.DelayQueue#poll(long, java.util.concurrent.TimeUnit),避免阻塞导致线程无法终止。

  3. 引入com.funtester.frame.execute.ReplayConcurrent#getMultiple控制流量回放的倍数。

  4. 使用com.funtester.frame.execute.ReplayConcurrent#getTotal记录回放的日志数量。

  5. 使用com.funtester.frame.execute.ReplayConcurrent#getHandle处理日志对象。


代码如下:


package com.funtester.frame.execute
import com.funtester.base.bean.AbstractBeanimport com.funtester.frame.SourceCodeimport com.funtester.utils.LogUtilimport com.funtester.utils.RWUtilimport org.apache.logging.log4j.LogManagerimport org.apache.logging.log4j.Logger
import java.util.concurrent.DelayQueueimport java.util.concurrent.Delayedimport java.util.concurrent.ThreadPoolExecutorimport java.util.concurrent.TimeUnitimport java.util.concurrent.atomic.AtomicIntegerimport java.util.concurrent.atomic.LongAdder
/** * 回放功能执行类*/class ReplayConcurrent extends SourceCode {
private static Logger logger = LogManager.getLogger(ReplayConcurrent.class);
static ThreadPoolExecutor executor
static boolean key = true
static int MAX_LENGTH = 800000
int threadNum = 2
String name
String fileName
int multiple
Closure handle
List<ReplayLog> logs
DelayQueue<ReplayLog> logDelayQueue = new DelayQueue<ReplayLog>()
LongAdder total = new LongAdder()
ReplayConcurrent(String name, String fileName, int multiple, Closure handle) { this.name = name this.fileName = fileName this.multiple = multiple this.handle = handle
}
void start() { if (executor == null) executor = ThreadPoolUtil.createCachePool(THREADPOOL_MAX, "R") time({ RWUtil.readFile(fileName, { def delay = new ReplayLog(it) if (delay.getTimestamp() != 0) logDelayQueue.add(delay) }) }, 1, "读取日志$fileName") logs = logDelayQueue.toList() def timestamp = logs.get(0).getTimestamp() logDelayQueue.clear() AtomicInteger index = new AtomicInteger() AtomicInteger size = new AtomicInteger() def LogSize = logs.size() AtomicInteger diff = new AtomicInteger() threadNum.times { fun { while (key) { if (index.get() % LogSize == 0) diff.set(getMark() - timestamp) if (index.get() % MAX_LENGTH == 0) size.set(logDelayQueue.size()) if (size.get() > MAX_LENGTH) { sleep(1.0) size.set(logDelayQueue.size()) } def replay = logs.get(index.getAndIncrement() % LogSize) logDelayQueue.add(replay.clone(replay.timestamp + diff.get())) } } } threadNum.times { fun { while (key) { def poll = logDelayQueue.poll(1, TimeUnit.SECONDS) if (poll != null) { executor.execute { multiple.times { handle(poll.getUrl()) total.add(1) } }
} } } } fun { while (key) { sleep(COUNT_INTERVAL as double) int real = total.sumThenReset() / COUNT_INTERVAL as int def active = executor.getActiveCount() def count = active == 0 ? 1 : active logger.info("{} ,实际QPS:{} 活跃线程数:{} 单线程效率:{}", name, real, active, real / count as int) } }
}
/** * 中止 * @return */ def stop() { key = false executor.shutdown() logger.info("replay压测关闭了!") }
/** * 日志对象*/ static class ReplayLog extends AbstractBean implements Delayed {
int timestamp
String url
ReplayLog(String logLine) { def log = LogUtil.getLog(logLine) this.url = log.getUrl() this.timestamp = log.getTime() }
ReplayLog(int timestamp, String url) { this.timestamp = timestamp this.url = url }
@Override long getDelay(TimeUnit unit) { return this.timestamp - getMark() }
@Override int compareTo(Delayed o) { return this.timestamp - o.timestamp }
protected Object clone(int timestamp) { return new ReplayLog(timestamp, this.url) } }}
复制代码

自测

下面是我的测试用例:


package com.okcoin.hickwall.presses
import com.okcoin.hickwall.presses.funtester.frame.execute.ReplayConcurrentimport com.okcoin.hickwall.presses.funtester.httpclient.FunHttp
class RplayT extends FunHttp {
static String HOST = "http://localhost:12345"
public static void main(String[] args) { def fileName = "api.log" new ReplayConcurrent("测试回放功能", fileName, 1, {String url -> getHttpResponse(getHttpGet(HOST + url)) }).start()
}}
复制代码


测试结果如下:


22:45:43.510 main   ###### #     #  #    # ####### ######  #####  ####### ######  #####  #      #     #  ##   #    #    #       #         #    #       #    #  ####   #     #  # #  #    #    ####    #####     #    ####    #####  #      #     #  #  # #    #    #            #    #    #       #   #  #       #####   #    #    #    ######  #####     #    ######  #    #  10:56:18 F-5 测试回放功能, 实际QPS:23162 活跃线程数:0单线程效率:2316210:56:23 F-5 测试回放功能, 实际QPS:36575 活跃线程数:6单线程效率:609510:56:28 F-5 测试回放功能, 实际QPS:38974 活跃线程数:21单线程效率:1855 10:56:33 F-5 测试回放功能, 实际QPS:32798 活跃线程数:8单线程效率:409910:56:38 F-5 测试回放功能,实际QPS:35224 活跃线程数:4单线程效率:880610:56:43 F-5 测试回放功能,实际QPS:28426 活跃线程数:0单线程效率:2842610:56:48 F-5 测试回放功能, 实际QPS:33607 活跃线程数:6单线程效率:560110:56:53 F-5 测试回放功能,实际QPS:34392 活跃线程数:0单线程效率:34392
复制代码


发布于: 刚刚阅读数: 3
用户头像

FunTester

关注

公众号:FunTester,800篇原创,欢迎关注 2020.10.20 加入

Fun·BUG挖掘机·性能征服者·头顶锅盖·Tester

评论

发布
暂无评论
基于时间戳的日志回放引擎_FunTester_InfoQ写作社区