一、问题概述
客户的生产环境突然在近期间歇式的收到了 Kafka CRC 的相关异常,异常内容如下
Record batch for partition skywalking-traces-0 at offset 292107075 is invalid, cause: Record is corrupt (stored crc = 1016021496, compute crc = 1981017560)
复制代码
报错没有规律性,有可能半天都不出现一次,也有可能一小时出现 2、3 次
而这个报错会导致 Kafka 的 Consumer hang 死,即无法继续消费后续的消息,只能手动重启 Consumer 才能继续,是非常严厉的报错,导致生产不可用
简单解释一下这个异常的原因,Kafka 会在每个 Batch 的 header 中存储持久化下来的消息体的 CRC,所谓 CRC 可以简单理解为 TCP 的 checkSum,而后 Consumer 在收到这条消息后,将 Batch header 中存储的 CRC 取出来,然后再根据统一的 CRC 算法计算收到的消息体的 CRC 值,而后拿上这两个值做一下比对,如果一样,说明消息没有被篡改,如果不一样,就会扔出 CRC 异常
由于是公司的重保客户,又出现如此严重的惊天 bug,一时间公司的高层领导均高度重视此事,每天晚上的夕会(均是领导)也都要单独询问解决进度,一时间,压力山大
二、分析定位
2.1、问题分析
在公司另外一个项目中(简称 A 客户,我们当前要处理的简称 B 客户),这个报错并不陌生,且报错的内容一模一样,那这两者会不会是同一个 bug 呢?答案是否定的
那这个问题不是常规问题,就比较棘手了
2.2、Kafka 自身 bug ?
笔者长期活跃在 Kafka 社区,是 Kafka 社区的 Contributer,且在阿里的公有云工作多年,但这个报错在之前却是一次没有遇到过
在我印象中,CRC 错误已经好久没见到了,而后我排查了社区所有 CRC 有关的 issue list,发现只有在 0.10 之前的低版本中出现过类似异常,而我们的 B 客户生产环境使用的是 2.8.2,这是整个 2 版本中的最后一个版本,而 Kafka 2.0 以后的版本却很少出现这个异常
而 Kafka 自身计算 CRC 的逻辑却非常简洁,只有几十行 java 代码,综上,笔者认为 Kafka 自身出 bug 的概率相对较小,不能把重要精力放在这块
2.3、报错倾向分析
接下来我们便对这个异常的报错倾向开始分析,经过各种排查,它具备以下特质:
至此,好像均一切正常,粗略排查后,没有发现有价值的线索。因此我们便将重心转移至“网络”+“磁盘”这两块看似不太可能出现问题的基础能力上
三、网络排查
3.1、埋点
进行网络排查的话,我们就要执行 TCP 抓包,即执行 tcpdump 命令,但抓包也不是简单在机器上执行一个命令那么简单,当前的部署环境是 3 台 Broker、3 台 Consumer,异常会暴露在 Consumer 中,但单台 Consumer 可能会与多台 Broker 建连
因此抓包的思路是在 3 台 Broker 上均创建监听,且在某一台 consumer 也创建监听,而后观察 TCP 发送出去的数据与接收到的数据是否一致
首先在 3 台 Broker 上执行 dump 命令,由于出现问题的概率不高,可能导致网络 dump 包非常大,因此在命令中将 TCP 包拆成 500M 大小的文件,且只保留最近的 5 个文件,3 台 Broker 的 IP 如下:
10.0.0.70
10.0.0.71
10.0.0.72
而 3 台 Consumer 的 IP 为:
10.0.0.18
10.0.0.19
10.0.0.20
首先在 1 台 consumer 上执行如下命令:
sudo nuhup tcpdump -i vethfe2axxxx -nve host 10.0.0.70 or host 10.0.0.71 or host 10.0.0.72 -w broker_18.pcap -C 500 -W 5 -Z ccadmin &
sudo nuhup tcpdump -i vethfe2axxxx -nve host 10.0.0.70 or host 10.0.0.71 or host 10.0.0.72 -w broker_19.pcap -C 500 -W 5 -Z ccadmin &
sudo nuhup tcpdump -i vethfe2axxxx -nve host 10.0.0.70 or host 10.0.0.71 or host 10.0.0.72 -w broker_20.pcap -C 500 -W 5 -Z ccadmin &
复制代码
其次在 3 台 Broker 上分别执行如下:
sudo nuhup tcpdump -i xxxxxxxxx1 -nve host 10.0.0.18 -w broker_18.pcap -C 500 -W 5 -Z ccadmin &
sudo nuhup tcpdump -i xxxxxxxxx2 -nve host 10.0.0.19 -w broker_19.pcap -C 500 -W 5 -Z ccadmin &
sudo nuhup tcpdump -i xxxxxxxxx3 -nve host 10.0.0.20 -w broker_20.pcap -C 500 -W 5 -Z ccadmin &
复制代码
简单解释一下命令含义:
3.2、异常浮现
而后我们在屏幕前等待了漫长的近 2 个小时后,终于捕获到了一条异常信息,因此马不停蹄地将数据下载下来,进行对照分析。首先看到的便是Wireshark
帮助解析生成的前后报文
报文总长度为 2712 个字节,约 3K 的数据,但我们惊讶地发现,其中有 15 个字节的数据出现了偏差
至此,定位可能就是网络传输出现了问题,不过我们还需要做更多的校验工作。Wireshark
工具虽然好用,但却不是万能的,它只能解析 Kafka 的传输协议,无法对内容进行校验与还原
(我们在样本中随机取了 3 个正常交互的消息进行对比,发现在网络发送前与发送后的数据完全一致)
3.3、Kafka 协议解析
Kafka 的协议分为传输协议及存储协议,笔者对它们相对比较熟悉,因此开始直接对前后传输的报文进行了解析,首先是 Request
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.RequestHeader;
import javax.xml.bind.DatatypeConverter;
import java.nio.ByteBuffer;
public class MyTest {
public static void main(String[] args) {
// 08-30 15:06:40.820028
String hexString =
"0e 1f 48 2d 7e 32 06 82 25 e9 a3 d9 08 00 45 00 " +
"00 ab 3f b3 40 00 40 06 35 ed 62 02 00 55 62 02 " +
"00 54 eb 2e 23 85 68 57 32 37 b5 08 3f b2 80 18 " +
"7d 2c c5 4a 00 00 01 01 08 0a b7 d1 e6 5c 26 30 " +
"b9 11 00 00 00 73 00 01 00 0c 11 85 6f bf 00 15 " +
"62 72 6f 6b 65 72 2d 31 30 30 32 2d 66 65 74 63 " +
"68 65 72 2d 30 00 00 00 03 ea 00 00 01 f4 00 00 " +
"00 01 00 a0 00 00 00 72 52 11 e0 11 85 6f bf 02 " +
"13 5f 5f 63 6f 6e 73 75 6d 65 72 5f 6f 66 66 73 " +
"65 74 73 02 00 00 00 15 00 00 00 03 00 00 00 00 " +
"13 36 67 3a 00 00 00 03 00 00 00 00 00 00 00 00 " +
"00 10 00 00 00 00 01 01 00";
// FetchRequestData(clusterId=null, replicaId=1002, maxWaitMs=500, minBytes=1, maxBytes=10485760,
// isolationLevel=0, sessionId=1917981152, sessionEpoch=293957567,
// topics=[FetchTopic(topic='__consumer_offsets', partitions=[FetchPartition(partition=21, currentLeaderEpoch=3,
// fetchOffset=322332474, lastFetchedEpoch=3, logStartOffset=0, partitionMaxBytes=1048576)])], forgottenTopicsData=[], rackId='')
StringBuilder sb = new StringBuilder();
char[] charArray = hexString.toCharArray();
for (char c : charArray) {
if (c != ' ') {
sb.append(c);
}
}
hexString = sb.toString();
byte[] byteArray = DatatypeConverter.parseHexBinary(hexString);
ByteBuffer byteBuffer = ByteBuffer.wrap(byteArray);
System.out.println("len is : " + byteBuffer.array().length);
byteBuffer.get(new byte[66]);
byteBuffer.getInt();
RequestHeader header = RequestHeader.parse(byteBuffer);
System.out.println(header);
FetchRequest fetchRequest = FetchRequest.parse(byteBuffer, (short) 12);
System.out.println(fetchRequest);
}
}
复制代码
最终输出如下
len is : 185
RequestHeader(apiKey=FETCH, apiVersion=12, clientId=broker-1002-fetcher-0, correlationId=293957567)
FetchRequestData(clusterId=null, replicaId=1002, maxWaitMs=500, minBytes=1, maxBytes=10485760, isolationLevel=0, sessionId=1917981152, sessionEpoch=293957567, topics=[FetchTopic(topic='__consumer_offsets', partitions=[FetchPartition(partition=21, currentLeaderEpoch=3, fetchOffset=322332474, lastFetchedEpoch=3, logStartOffset=0, partitionMaxBytes=1048576)])], forgottenTopicsData=[], rackId='')
复制代码
且 TCP 传输前后的内容一致,因此问题没有出现在 Request 上
接下来就需要进行 Response 的二进制协议解析,代码如下
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.ResponseHeader;
import javax.xml.bind.DatatypeConverter;
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.Map;
public class MyTest2 {
public static void main(String[] args) {
08-30 15:06:41.853611
String hexString =
"0e 1f 48 2d 7e 32 06 82 25 e9 a3 d9 08 00 45 00 " +
"00 ab 3f b3 40 00 40 06 35 ed 62 02 00 55 62 02 " +
"00 54 eb 2e 23 85 68 57 32 37 b5 08 3f b2 80 18 " +
"7d 2c c5 4a 00 00 01 01 08 0a b7 d1 e6 5c 26 30 " +
"b9 11 00 00 00 f9 11 85 6f c1 00 00 00 00 00 00 " +
"00 72 52 11 e0 02 13 5f 5f 63 6f 6e 73 75 6d 65 " +
"72 5f 6f 66 66 73 65 74 73 03 00 00 00 03 00 00 " +
"00 00 00 00 15 09 0c 9c 00 00 00 00 15 09 0c 9c " +
"00 00 00 00 00 00 00 00 00 ff ff ff ff 89 01 " +
" 00 00 00 00 15 09 0c 9c 00 00 00 7c 00 00 " +
"00 03 02 08 16 d8 10 00 00 00 00 00 00 00 00 01 " +
"91 a2 1b 8f 3c 00 00 01 91 a2 1b 8f 3c ff ff ff " +
"ff ff ff ff ff ff ff 00 00 00 00 00 00 00 01 92 " +
"01 00 00 00 56 00 01 00 0b 43 43 4d 2d 65 6e 63 " +
"72 79 70 74 00 16 43 53 53 2d 49 49 53 2d 53 69 " +
"74 75 61 74 69 6f 6e 51 75 65 72 79 00 00 00 00 " +
"30 00 03 00 00 00 00 00 00 00 00 ff ff ff ff 00 " +
"00 00 00 01 91 a2 1b 8f 3b 00 " +
" 00 00 00 00 15 00 00 00 00 00 00 13 36 67 " +
"47 00 00 00 00 13 36 67 47 00 00 00 00 00 00 00 " +
"00 00 ff ff ff ff 01 00 00 00 ";
// String hexString =
// "0e 1f 48 2d 7e 32 06 82 25 e9 a3 d9 08 00 45 00 " +
// "00 ab 3f b3 40 00 40 06 35 ed 62 02 00 55 62 02 " +
// "00 54 eb 2e 23 85 68 57 32 37 b5 08 3f b2 80 18 " +
// "7d 2c c5 4a 00 00 01 01 08 0a b7 d1 e6 5c 26 30 " +
// "b9 11 00 00 04 65 11 85 6f c0 00 00 00 00 00 00 " +
// "00 72 52 11 e0 03 13 5f 5f 63 6f 6e 73 75 6d 65 " +
// "72 5f 6f 66 66 73 65 74 73 02 00 00 00 15 00 00 " +
// "00 00 00 00 13 36 67 3b 00 00 00 00 13 36 67 3b " +
// "00 00 00 00 00 00 00 00 00 ff ff ff ff da 07 " +
//
// " 00 00 00 00 13 36 67 3b 00 00 03 cd 00 00 " +
// "00 03 02 5c 37 79 85 00 00 00 00 00 0b 00 00 01 " +
// "91 a2 1b 8f 3c 00 00 01 91 a2 1b 8f 37 ff ff ff " +
// "ff ff ff ff ff ff ff 00 00 00 00 00 00 00 0c 96 " +
// "01 00 00 00 5a 00 01 00 0b 43 43 4d 2d 65 6e 63 " +
// "72 79 70 74 00 16 43 53 53 2d 49 49 53 2d 53 69 " +
// "74 75 61 74 69 6f 6e 51 75 65 72 79 00 00 00 00 " +
// "30 00 03 00 00 00 00 00 00 00 00 ff ff ff ff 00 " +
// "00 00 00 01 91 a2 1b 8f 3b 00 " +
//
//
//
// " 00 00 00 00 15 00 00 00 00 00 00 13 36 67 " +
// "47 00 00 00 00 13 36 67 47 00 00 00 00 00 00 00 " +
// "00 00 ff ff ff ff 01 00 00 00 ";
StringBuilder sb = new StringBuilder();
char[] charArray = hexString.toCharArray();
for (char c : charArray) {
if (c != ' ') {
sb.append(c);
}
}
hexString = sb.toString();
byte[] byteArray = DatatypeConverter.parseHexBinary(hexString);
ByteBuffer byteBuffer = ByteBuffer.wrap(byteArray);
System.out.println("len is : " + byteBuffer.array().length);
byteBuffer.get(new byte[66]);
byteBuffer.getInt();
ResponseHeader responseHeader = ResponseHeader.parse(byteBuffer, (short) 0);
System.out.println("responseHeader is " + responseHeader);
FetchResponse<MemoryRecords> fetchResponse = FetchResponse.parse(byteBuffer, (short) 11);
System.out.println(fetchResponse);
LinkedHashMap<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> map = fetchResponse.responseData();
System.out.println("map size is : " + map.size());
System.out.println("map is : " + map);
for (Map.Entry<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> entry : map.entrySet()) {
System.out.println();
System.out.println();
System.out.println();
System.out.println();
System.out.println("TP is : " + entry.getKey());
FetchResponse.PartitionData<MemoryRecords> value = entry.getValue();
MemoryRecords records = value.records();
records.batches().forEach(batch -> {
System.out.println("isValid: " + batch.isValid());
System.out.println("crc : " + batch.checksum());
System.out.println("baseOffset : " + batch.baseOffset());
});
}
}
}
复制代码
其中关键的部分是查看 2 个 CRC 的逻辑:
下图是真实生产业务日志中爆出的异常
其中发现计算出来的 CRC 是 2481280076。而后将我们 dump 下来的二进制进行协议解码,执行上述代码后发现结果与生产环境的一致
四、磁盘排查
后续我们对发送出去的内容与磁盘的内容做了对比,内容一致,因此 bug 不是因磁盘导致
五、结论
由于出现问题的消息在 TCP 发送前后出现了 diff,而正常收到的消息,在 TCP 发送前后均一致,因此
定位为网络出现了问题
网络的同学也认为这个现象不符合预期,已经开始介入了排查
文章转载自:昔久
原文链接:https://www.cnblogs.com/xijiu/p/18393218
体验地址:http://www.jnpfsoft.com/?from=infoq
评论