基于时间戳的日志回放引擎
之前写过一个日志回放引擎的第一代千万级日志回放引擎设计稿,当时理解的日志回放就是把日志记录的请求重新发出去,这就是回放线上用户的流量了。可是在我最近看goreplay
的过程中,重新刷新了我的认知。
查阅了一些资料,终于算是了解了一些基于时间戳的方案和思路。大体如下:通过工具把线上某段时间的流量记录下来,其中包含时间戳等信息,然后通过回放引擎把流量回放出去。
解决思路
目前流量回放集中于 HTTP 流量,所以之前写过的引擎的发压部分还是可以继续使用。所以我也有了自己的解决思路:
日志清洗,其实就是把规范化的日志解析成引擎框架可以使用的对象,通常包含 HTTP 请求的组成部分。
按照时间戳排序,通常使用现成的工具这一步是可以省略,但是由于日志记录是已经存在的组件,这里需要做一些兼容性工作
日志回放,通过线程池和连接池两个池化技术可以解决性能方面的问题。再结合当前的分布式方案做一些兼容功能即可。
其中最最核心的应该就是队列的选择,这里我用看 java 的java.util.concurrent.DelayQueue
,也没找到其他更好的框架了。其实在一开始我想复用自己写之前写的日志回放框架的队列,也尝试对集中常用队列进行了性能测试:
Java&Go高性能队列之LinkedBlockingQueue性能测试
2022-01-10
Java&Go高性能队列之Disruptor性能测试
2022-02-14
Java&Go高性能队列之channel性能测试
2022-02-17
本来想是用多线程去读取日志的过程中,通过判断每一条日志是否到时间点,然后丢到一个线程安全的队列中,后面用线程池取队列中的对象,发送请求的。但是仔细想来太复杂了,流量过了好几手,不利于实现和拓展功能。
然后我重新对java.util.concurrent.DelayQueue
进行了性能测试延迟队列DelayQueue性能测试,有了测试结果之后,就可以放心大胆地干了。关于延迟队列的基本使用可参考下单延迟10s撤单性能测试。
实现
总体来说实现起来思路比较清晰,我分成三部分分享。
属性定义
我首先定义了一个
com.funtester.frame.execute.ReplayConcurrent.ReplayLog
日志对象,用于存储每一个请求日志然后定义一个
com.funtester.frame.execute.ReplayConcurrent#logs
用来存储日志,这里旧事重提一下,千万级别的日志对象,存储在内存里面是 OK 的,所以我才会采用这种方式。为什么要从日志文件中转一手呢?因为日志是不按照时间戳排序的。再定义
com.funtester.frame.execute.ReplayConcurrent#logDelayQueue
用来当作回放请求队列,也就是流量中转站,生产者从com.funtester.frame.execute.ReplayConcurrent#logs
中取,clone
之后丢到队列中;消费者从队列中取对象,丢给线程池。定义
com.funtester.frame.execute.ReplayConcurrent#handle
当作是处理流量的方法,就是把流量对象包装成HttpRequestBase
对象然后发送出去
生产者
确定使用异步线程完成,使用Java自定义异步功能实践。
根据
com.funtester.frame.execute.ReplayConcurrent#logDelayQueue
性能测试数据,添加com.funtester.frame.execute.ReplayConcurrent#threadNum
参数来控制。多线程取
com.funtester.frame.execute.ReplayConcurrent#logs
对象,用到了几个线程安全类,用于保障多线程是顺序读取,避免了在延迟队列中进行排序操作。使用了
com.funtester.frame.execute.ReplayConcurrent#getMAX_LENGTH
控制队列的长度。貌似没找到限制延迟队列长度的 API。只能自己实现了,思路当添加日志数量超过最大值,存储当前队列长度。当长度大于最大长度,则在下一次添加对象前,休眠 1s,然后在重置本地存储的队列长度。这样可以解决这个问题。当然最大值设置足够高,避免 1s 中内队列变成空。回放引擎设计 50 万 QPS,所以我就先设置了 80 万的最大长度。后续可以根据实际情况调整。
消费者
依旧使用异步,生产者
使用 API 时
java.util.concurrent.DelayQueue#poll(long, java.util.concurrent.TimeUnit)
,避免阻塞导致线程无法终止。引入
com.funtester.frame.execute.ReplayConcurrent#getMultiple
控制流量回放的倍数。使用
com.funtester.frame.execute.ReplayConcurrent#getTotal
记录回放的日志数量。使用
com.funtester.frame.execute.ReplayConcurrent#getHandle
处理日志对象。
代码如下:
自测
下面是我的测试用例:
测试结果如下:
版权声明: 本文为 InfoQ 作者【FunTester】的原创文章。
原文链接:【http://xie.infoq.cn/article/acc1cc2f5b4876423fe18b546】。文章转载请联系作者。
评论