写点什么

【稳定性平台】GOREPLAY 流量录制回放实战

用户头像
得物技术
关注
发布于: 刚刚

GoReplay 简介

随着应用程序的复杂度的增长,测试它所需要的工作量也呈指数级增长。 GoReplay 为我们提供了复用现有流量进行测试的简单想法。GoReplay 是一个用 golang 开发的简单的流量录制插件,支持多种方式的过滤,限流放大,重写等等特性。GoReplay 可以做到对代码完全无侵入性,也不需要更改你的生产基础设施,并且与语言无关。它不是代理,而是直接监听网卡上的流量。


GoReplay 工作方式:listener server 捕获流量,并将其发送至 replay server 或者保存至文件,或者保存到 kafka。然后 replay server 会将流量转移至配置的地址


使用过程

需求:接到算法侧的需求,需要录制真实的生产环境流量,并且随时回放到任意环境。

由于算法侧部分场景为非 Java 语言编写,现存的流量录制平台暂时无法支持,需要采用新的录制组件来支撑压测需求,遂选择 goreplay 。

GoReplay 支持将录制的数据存储到本地文件中,然后回放时从文件中读取。考虑到每次录制回放时需要进行存储及下发文件的复杂度,我们期望使用更便捷的方式来管理数据。

GoReplay 也是原生支持录制数据存储到 kafka 中的,但是在使用的时候,发现它有较大的限制;使用 kafka 存储数据时,必须是流量录制的同时进行流量回放,其架构图如下:


流程 1-4 无法拆分,只能同时进行

这会显得流量录制回放功能很鸡肋,我们需要录制好的数据任意时刻重放,并且也要支持将一份录制好的数据多次重放。既然它已经将流量数据存储到了 kafka,我们就可以考虑对 GoReplay 进行改造,以让他支持我们的需求。


改造后的流量录制回放架构图:


图中,1-2 与 3-5 阶段是相互独立的


也就是说,流量录制过程与回放过程可以拆开。只需要在录制开始与结束的时候记录 kafka 的 offset,就可以知道这个录制任务包含了哪些数据,我们可以轻松的将每一段录制数据,整理成录制任务,然后在需要的时候进行流量回放。


改造与整合

kafka offset 支持改造

简要过程:

源码中的 InputKafkaConfig 的定义

type InputKafkaConfig struct {	producer sarama.AsyncProducer	consumer sarama.Consumer	Host     string `json:"input-kafka-host"`	Topic    string `json:"input-kafka-topic"`	UseJSON  bool   `json:"input-kafka-json-format"`}
复制代码

修改后的 InputKafkaConfig 的定义

type InputKafkaConfig struct {	producer  sarama.AsyncProducer	consumer  sarama.Consumer	Host      string `json:"input-kafka-host"`	Topic     string `json:"input-kafka-topic"`	UseJSON   bool   `json:"input-kafka-json-format"`	StartOffset    int64  `json:"input-kafka-offset"`	EndOffset int64  `json:"input-kafka-end-offset"`}
复制代码

源码中,从 kafka 读取数据的片段:可以看到,它选取的 offset 是 Newest

for index, partition := range partitions {		consumer, err := con.ConsumePartition(config.Topic, partition, sarama.OffsetNewest)
go func(consumer sarama.PartitionConsumer) { defer consumer.Close()
for message := range consumer.Messages() { i.messages <- message } }(consumer)
}
复制代码

修改过后的从 kafka 读数据的片段:

for index, partition := range partitions {		consumer, err := con.ConsumePartition(config.Topic, partition, config.StartOffset)		offsetEnd := config.EndOffset - 1
go func(consumer sarama.PartitionConsumer) { defer consumer.Close()
for message := range consumer.Messages() { // 比较消息的offset, 当超过这一批数据的最大值的时候,关闭通道 if offsetFlag && message.Offset > offsetEnd { i.quit <- struct{}{} break } i.messages <- message } }(consumer) }
复制代码

此时,只要在启动回放任务时,指定 kafka offset 的范围。就可以达到我们想要的效果了。


整合到压测平台


通过页面简单的填写选择操作,然后生成启动命令,来替代冗长的命令编写

StringBuilder builder = new StringBuilder("nohup /opt/apps/gor/gor");// 拼接参数 组合命令builder.append(" --input-kafka-host ").append("'").append(kafkaServer).append("'");builder.append(" --input-kafka-topic ").append("'").append(kafkaTopic).append("'");builder.append(" --input-kafka-start-offset ").append(record.getStartOffset());builder.append(" --input-kafka-end-offset ").append(record.getEndOffset());builder.append(" --output-http ").append(replayDTO.getTargetAddress());builder.append(" --exit-after ").append(replayDTO.getMonitorTimes()).append("s");if (StringUtils.isNotBlank(replayDTO.getExtParam())) {  builder.append(" ").append(replayDTO.getExtParam());}builder.append(" > /opt/apps/gor/replay.log 2>&1 &");String completeParam = builder.toString();
复制代码

压测平台通过 Java agent 暴露的接口来控制 GoReplay进程的启停

String sourceAddress = replayDTO.getSourceAddress();String[] split = sourceAddress.split(COMMA);for (String ip : split) {  String uri = String.format(HttpTrafficRecordServiceImpl.BASE_URL + "/gor/start", ip, 	 											HttpTrafficRecordServiceImpl.AGENT_PORT);  // 重新创建对象  GoreplayRequest request = new GoreplayRequest();  request.setConfig(replayDTO.getCompleteParam());  request.setType(0);  try {    restTemplate.postForObject(uri, request, String.class);  } catch (RestClientException e) {    LogUtil.error("start gor fail,please check it!", e);    MSException.throwException("start gor fail,please check it!", e);  }}
复制代码


发布于: 刚刚阅读数: 2
用户头像

得物技术

关注

得物APP技术部 2019.11.13 加入

关注微信公众号「得物技术」

评论

发布
暂无评论
【稳定性平台】GOREPLAY流量录制回放实战