基于时间戳的日志回放引擎
之前写过一个日志回放引擎的第一代千万级日志回放引擎设计稿,当时理解的日志回放就是把日志记录的请求重新发出去,这就是回放线上用户的流量了。可是在我最近看goreplay的过程中,重新刷新了我的认知。
查阅了一些资料,终于算是了解了一些基于时间戳的方案和思路。大体如下:通过工具把线上某段时间的流量记录下来,其中包含时间戳等信息,然后通过回放引擎把流量回放出去。
解决思路
目前流量回放集中于 HTTP 流量,所以之前写过的引擎的发压部分还是可以继续使用。所以我也有了自己的解决思路:
- 日志清洗,其实就是把规范化的日志解析成引擎框架可以使用的对象,通常包含 HTTP 请求的组成部分。 
- 按照时间戳排序,通常使用现成的工具这一步是可以省略,但是由于日志记录是已经存在的组件,这里需要做一些兼容性工作 
- 日志回放,通过线程池和连接池两个池化技术可以解决性能方面的问题。再结合当前的分布式方案做一些兼容功能即可。 
其中最最核心的应该就是队列的选择,这里我用看 java 的java.util.concurrent.DelayQueue,也没找到其他更好的框架了。其实在一开始我想复用自己写之前写的日志回放框架的队列,也尝试对集中常用队列进行了性能测试:
- Java&Go高性能队列之LinkedBlockingQueue性能测试 - 2022-01-10
- Java&Go高性能队列之Disruptor性能测试 - 2022-02-14
- Java&Go高性能队列之channel性能测试 - 2022-02-17
本来想是用多线程去读取日志的过程中,通过判断每一条日志是否到时间点,然后丢到一个线程安全的队列中,后面用线程池取队列中的对象,发送请求的。但是仔细想来太复杂了,流量过了好几手,不利于实现和拓展功能。
然后我重新对java.util.concurrent.DelayQueue进行了性能测试延迟队列DelayQueue性能测试,有了测试结果之后,就可以放心大胆地干了。关于延迟队列的基本使用可参考下单延迟10s撤单性能测试。
实现
总体来说实现起来思路比较清晰,我分成三部分分享。
属性定义
- 我首先定义了一个 - com.funtester.frame.execute.ReplayConcurrent.ReplayLog日志对象,用于存储每一个请求日志
- 然后定义一个 - com.funtester.frame.execute.ReplayConcurrent#logs用来存储日志,这里旧事重提一下,千万级别的日志对象,存储在内存里面是 OK 的,所以我才会采用这种方式。为什么要从日志文件中转一手呢?因为日志是不按照时间戳排序的。
- 再定义 - com.funtester.frame.execute.ReplayConcurrent#logDelayQueue用来当作回放请求队列,也就是流量中转站,生产者从- com.funtester.frame.execute.ReplayConcurrent#logs中取,- clone之后丢到队列中;消费者从队列中取对象,丢给线程池。
- 定义 - com.funtester.frame.execute.ReplayConcurrent#handle当作是处理流量的方法,就是把流量对象包装成- HttpRequestBase对象然后发送出去
生产者
- 确定使用异步线程完成,使用Java自定义异步功能实践。 
- 根据 - com.funtester.frame.execute.ReplayConcurrent#logDelayQueue性能测试数据,添加- com.funtester.frame.execute.ReplayConcurrent#threadNum参数来控制。
- 多线程取 - com.funtester.frame.execute.ReplayConcurrent#logs对象,用到了几个线程安全类,用于保障多线程是顺序读取,避免了在延迟队列中进行排序操作。
- 使用了 - com.funtester.frame.execute.ReplayConcurrent#getMAX_LENGTH控制队列的长度。貌似没找到限制延迟队列长度的 API。只能自己实现了,思路当添加日志数量超过最大值,存储当前队列长度。当长度大于最大长度,则在下一次添加对象前,休眠 1s,然后在重置本地存储的队列长度。这样可以解决这个问题。当然最大值设置足够高,避免 1s 中内队列变成空。回放引擎设计 50 万 QPS,所以我就先设置了 80 万的最大长度。后续可以根据实际情况调整。
消费者
- 依旧使用异步,生产者 
- 使用 API 时 - java.util.concurrent.DelayQueue#poll(long, java.util.concurrent.TimeUnit),避免阻塞导致线程无法终止。
- 引入 - com.funtester.frame.execute.ReplayConcurrent#getMultiple控制流量回放的倍数。
- 使用 - com.funtester.frame.execute.ReplayConcurrent#getTotal记录回放的日志数量。
- 使用 - com.funtester.frame.execute.ReplayConcurrent#getHandle处理日志对象。
代码如下:
自测
下面是我的测试用例:
测试结果如下:
版权声明: 本文为 InfoQ 作者【FunTester】的原创文章。
原文链接:【http://xie.infoq.cn/article/acc1cc2f5b4876423fe18b546】。文章转载请联系作者。










 
    
评论