线上 kafka 消息堆积,consumer 掉线,怎么办?
线上 kafka 消息堆积,所有 consumer 全部掉线,到底怎么回事?
最近处理了一次线上故障,具体故障表现就是 kafka 某个 topic 消息堆积,这个 topic 的相关 consumer 全部掉线。
整体排查过程和事后的复盘都很有意思,并且结合本次故障,对 kafka 使用的最佳实践有了更深刻的理解。
好了,一起来回顾下这次线上故障吧,最佳实践总结放在最后,千万不要错过。
1、现象
线上 kafka 消息突然开始堆积
消费者应用反馈没有收到消息(没有处理消息的日志)
kafka 的 consumer group 上看没有消费者注册
消费者应用和 kafka 集群最近一周内没有代码、配置相关变更
2、排查过程
服务端、客户端都没有特别的异常日志,kafka 其他 topic 的生产和消费都是正常,所以基本可以判断是客户端消费存在问题。
所以我们重点放在客户端排查上。
1)arthas 在线修改日志等级,输出 debug
由于客户端并没有明显异常日志,因此只能通过 arthas 修改应用日志等级,来寻找线索。
果然有比较重要的发现:
看起来是 kafka-client 自己主动发送消息给 kafka 集群,进行自我驱逐了。因此 consumer 都掉线了。
2)arthas 查看相关线程状态变量用 arthas vmtool 命令进一步看下 kafka-client 相关线程的状态。
可以看到 HeartbeatThread 线程状态是 WAITING,Cordinator 状态是 UNJOINED。
此时,结合源码看,大概推断是由于消费时间过长,导致客户端自我驱逐了。
于是立刻尝试修改 max.poll.records,减少一批拉取的消息数量,同时增大 max.poll.interval.ms 参数,避免由于拉取间隔时间过长导致自我驱逐。
参数修改上线后,发现 consumer 确实不掉线了,但是消费一段时间后,还是就停止消费了。
3、最终原因
相关同学去查看了消费逻辑,发现了业务代码中的死循环,确认了最终原因。
消息内容中的一个字段有新的值,触发了消费者消费逻辑的死循环,导致后续消息无法消费。消费阻塞导致消费者自我驱逐,partition 重新 reblance,所有消费者逐个自我驱逐。
这里核心涉及到 kafka 的消费者和 kafka 之间的保活机制,可以简单了解一下。
kafka-client 会有一个独立线程 HeartbeatThread 跟 kafka 集群进行定时心跳,这个线程跟 lisenter 无关,完全独立。
根据 debug 日志显示的“Sending LeaveGroup request”信息,我们可以很容易定位到自我驱逐的逻辑。
HeartbeatThread 线程在发送心跳前,会比较一下当前时间跟上次 poll 时间,一旦大于 max.poll.interval.ms 参数,就会发起自我驱逐了。
4、进一步思考
虽然最后原因找到了,但是回顾下整个排查过程,其实并不顺利,主要有两点:
kafka-client 对某个消息消费超时能否有明确异常?而不是只看到自我驱逐和 rebalance
有没有办法通过什么手段发现 消费死循环?
4.1 kafka-client 对某个消息消费超时能否有明确异常?
4.1.1 kafka 似乎没有类似机制
我们对消费逻辑进行断点,可以很容易看到整个调用链路。
对消费者来说,主要采用一个线程池来处理每个 kafkaListener,一个 listener 就是一个独立线程。
这个线程会同步处理 poll 消息,然后动态代理回调用户自定义的消息消费逻辑,也就是我们在 @KafkaListener 中写的业务。
所以,从这里可以知道两件事情。
第一点,如果业务消费逻辑很慢或者卡住了,会影响 poll。
第二点,这里没有看到直接设置消费超时的参数,其实也不太好做。
因为这里做了超时中断,那么 poll 也会被中断,是在同一个线程中。所以要么 poll 和消费逻辑在两个工作线程,要么中断掉当前线程后,重新起一个线程 poll。
所以从业务使用角度来说,可能的实现,还是自己设置业务超时。比较通用的实现,可以是在消费逻辑中,用线程池处理消费逻辑,同时用 Future get 阻塞超时中断。
google 了一下,发现 kafka 0.8 曾经有 consumer.timeout.ms 这个参数,但是现在的版本没有这个参数了,不知道是不是类似的作用。
4.1.2 RocketMQ 有点相关机制
然后去看了下 RocketMQ 是否有相关实现,果然有发现。
在 RocketMQ 中,可以对 consumer 设置 consumeTimeout,这个超时就跟我们的设想有一点像了。
consumer 会启动一个异步线程池对正在消费的消息做定时做 cleanExpiredMsg() 处理。
注意,如果消息类型是顺序消费(orderly),这个机制就不生效。
如果是并发消费,那么就会进行超时判断,如果超时了,就会将这条消息的信息通过 sendMessageBack() 方法发回给 broker 进行重试。
如果消息重试超过一定次数,就会进入 RocketMQ 的死信队列。
spring-kafka 其实也有做类似的封装,可以自定义一个死信 topic,做异常处理
4.2 有没有办法通过什么手段快速发现死循环?
一般来说,死循环的线程会导致 CPU 飙高、OOM 等现象,在本次故障中,并没有相关异常表现,所以并没有联系到死循环的问题。
那通过这次故障后,对 kafka 相关机制有了更深刻了解,poll 间隔超时很有可能就是消费阻塞甚至死循环导致。
所以,如果下次出现类似问题,消费者停止消费,但是 kafkaListener 线程还在,可以直接通过 arthas 的 thread id 命令查看对应线程的调用栈,看看是否有异常方法死循环调用。
5、最佳实践
通过此次故障,我们也可以总结几点 kafka 使用的最佳实践:
使用消息队列进行消费时,一定需要多考虑异常情况,包括幂等、耗时处理(甚至死循环)的情况。
尽量提高客户端的消费速度,消费逻辑另起线程进行处理,并最好做超时控制。
减少 Group 订阅 Topic 的数量,一个 Group 订阅的 Topic 最好不要超过 5 个,建议一个 Group 只订阅一个 Topic。
参考以下说明调整参数值:max.poll.records:降低该参数值,建议远远小于<单个线程每秒消费的条数> * <消费线程的个数> * <max.poll.interval.ms>的积。max.poll.interval.ms: 该值要大于<max.poll.records> / (<单个线程每秒消费的条数> * <消费线程的个数>)的值。
希望能够抛砖引玉,提供一些启发和思考。如果你有其他补充和建议,欢迎留言讨论。
原文:https://www.cnblogs.com/awan-note/p/16850953.html
如果感觉本文对你有帮助,点赞关注支持一下,想要了解更多 Java 后端,大数据,算法领域最新资讯可以关注我公众号【架构师老毕】私信 666 还可获取更多 Java 后端,大数据,算法 PDF+大厂最新面试题整理+视频精讲
评论