Kafka 又出问题了!
写在前面
估计运维年前没有祭拜服务器,Nginx 的问题修复了,Kafka 又不行了。今天,本来想再睡会,结果,电话又响了。还是运营,“喂,冰河,到公司了吗?赶紧看看服务器吧,又出问题了“。“在路上了,运维那哥们儿还没上班吗”? “还在休假。。。”, 我:“。。。”。哎,这哥们儿是跑路了吗?先不管他,问题还是要解决。
问题重现
到公司后,放下我专用的双肩包,拿出我的利器——笔记本电脑,打开后迅速登录监控系统,发现主要业务系统没啥问题。一个非核心服务发出了告警,并且监控系统中显示这个服务频繁的抛出如下异常。
从上面输出的异常信息,大概可以判断出系统出现的问题:Kafka 消费者在处理完一批 poll 消息后,在同步提交偏移量给 broker 时报错了。大概就是因为当前消费者线程的分区被 broker 给回收了,因为 Kafka 认为这个消费者挂掉了,我们可以从下面的输出信息中可以看出这一点。
Kafka 内部触发了 Rebalance 机制,明确了问题,接下来,我们就开始分析问题了。
分析问题
既然 Kafka 触发了 Rebalance 机制,那我就来说说 Kafka 触发 Rebalance 的时机。
什么是 Rebalance
举个具体点的例子,比如某个分组下有 10 个 Consumer 实例,这个分组订阅了一个 50 个分区的主题。正常情况下,Kafka 会为每个消费者分配 5 个分区。这个分配的过程就是 Rebalance。
触发 Rebalance 的时机
当 Kafka 中满足如下条件时,会触发 Rebalance:
组内成员的个数发生了变化,比如有新的消费者加入消费组,或者离开消费组。组成员离开消费组包含组成员崩溃或者主动离开消费组。
订阅的主题个数发生了变化。
订阅的主题分区数发生了变化。
后面两种情况我们可以人为的避免,在实际工作过程中,对于 Kafka 发生 Rebalance 最常见的原因是消费组成员的变化。
消费者成员正常的添加和停掉导致 Rebalance,这种情况无法避免,但是时在某些情况下,Consumer 实例会被 Coordinator 错误地认为 “已停止” 从而被“踢出”Group,导致 Rebalance。
当 Consumer Group 完成 Rebalance 之后,每个 Consumer 实例都会定期地向 Coordinator 发送心跳请求,表明它还存活着。如果某个 Consumer 实例不能及时地发送这些心跳请求,Coordinator 就会认为该 Consumer 已经 “死” 了,从而将其从 Group 中移除,然后开启新一轮 Rebalance。这个时间可以通过 Consumer 端的参数 session.timeout.ms
进行配置。默认值是 10 秒。
除了这个参数,Consumer 还提供了一个控制发送心跳请求频率的参数,就是 heartbeat.interval.ms
。这个值设置得越小,Consumer 实例发送心跳请求的频率就越高。频繁地发送心跳请求会额外消耗带宽资源,但好处是能够更加快速地知晓当前是否开启 Rebalance,因为,目前 Coordinator 通知各个 Consumer 实例开启 Rebalance 的方法,就是将 REBALANCE_NEEDED
标志封装进心跳请求的响应体中。
除了以上两个参数,Consumer 端还有一个参数,用于控制 Consumer 实际消费能力对 Rebalance 的影响,即 max.poll.interval.ms
参数。它限定了 Consumer 端应用程序两次调用 poll 方法的最大时间间隔。它的默认值是 5 分钟,表示 Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起 “离开组” 的请求,Coordinator 也会开启新一轮 Rebalance。
通过上面的分析,我们可以看一下那些 rebalance 是可以避免的:
第一类非必要 Rebalance 是因为未能及时发送心跳,导致 Consumer 被 “踢出”Group 而引发的。这种情况下我们可以设置 session.timeout.ms 和 heartbeat.interval.ms 的值,来尽量避免 rebalance 的出现。(以下的配置是在网上找到的最佳实践,暂时还没测试过)
设置 session.timeout.ms = 6s。
设置 heartbeat.interval.ms = 2s。
要保证 Consumer 实例在被判定为 “dead” 之前,能够发送至少 3 轮的心跳请求,即
session.timeout.ms >= 3 * heartbeat.interval.ms
。
将 session.timeout.ms 设置成 6s 主要是为了让 Coordinator 能够更快地定位已经挂掉的 Consumer,早日把它们踢出 Group。
第二类非必要 Rebalance 是 Consumer 消费时间过长导致的。此时,max.poll.interval.ms
参数值的设置显得尤为关键。如果要避免非预期的 Rebalance,最好将该参数值设置得大一点,比下游最大处理时间稍长一点。
总之,要为业务处理逻辑留下充足的时间。这样,Consumer 就不会因为处理这些消息的时间太长而引发 Rebalance 。
拉取偏移量与提交偏移量
kafka
的偏移量(offset
)是由消费者进行管理的,偏移量有两种,拉取偏移量
(position)与提交偏移量
(committed)。拉取偏移量代表当前消费者分区消费进度。每次消息消费后,需要提交偏移量。在提交偏移量时,kafka
会使用拉取偏移量
的值作为分区的提交偏移量
发送给协调者。
如果没有提交偏移量,下一次消费者重新与 broker 连接后,会从当前消费者 group 已提交到 broker 的偏移量处开始消费。
所以,问题就在这里,当我们处理消息时间太长时,已经被 broker 剔除,提交偏移量又会报错。所以拉取偏移量没有提交到 broker,分区又 rebalance。下一次重新分配分区时,消费者会从最新的已提交偏移量处开始消费。这里就出现了重复消费的问题。
异常日志提示的方案
其实,说了这么多,Kafka 消费者输出的异常日志中也给出了相应的解决方案。
接下来,我们说说 Kafka 中的拉取偏移量和提交偏移量。
其实,从输出的日志信息中,也大概给出了解决问题的方式,简单点来说,就是可以通过增加 max.poll.interval.ms
时长和 session.timeout.ms
时长,减少 max.poll.records
的配置值,并且消费端在处理完消息时要及时提交偏移量。
问题解决
通过之前的分析,我们应该知道如何解决这个问题了。这里需要说一下的是,我在集成 Kafka 的时候,使用的是 SpringBoot 和 Kafka 消费监听器,消费端的主要代码结构如下所示。
上述代码逻辑比较简单,就是获取到 Kafka 中的消息后直接打印输出到日志文件中。
尝试解决
这里,我先根据异常日志的提示信息进行配置,所以,我在 SpringBoot 的 application.yml 文件中新增了如下配置信息。
配置完成后,再次测试消费者逻辑,发现还是抛出 Rebalance 异常。
最终解决
我们从另一个角度来看下 Kafka 消费者所产生的问题:一个 Consumer 在生产消息,另一个 Consumer 在消费它的消息,它们不能在同一个 groupId 下面,更改其中一个的 groupId 即可。
这里,我们的业务项目是分模块和子系统进行开发的,例如模块 A 在生产消息,模块 B 消费模块 A 生产的消息。此时,修改配置参数,例如 session.timeout.ms: 60000
,根本不起作用,还是抛出Rebalance
异常。
此时,我尝试修改下消费者分组的 groupId,将下面的代码
修改为如下所示的代码。
再次测试,问题解决~~
这次解决的问题真是个奇葩啊!!接下来写个【Kafka 系列】专题,详细介绍 Kafka 的原理、源码解析和实战等内容,小伙伴们你们觉得呢?欢迎文末留言讨论~~
推荐阅读
好了,今天就到这儿吧,我是冰河,大家有啥问题可以在下方留言,也可以加我微信:sun_shine_lyz,我拉你进群,一起交流技术,一起进阶,一起牛逼~~
版权声明: 本文为 InfoQ 作者【冰河】的原创文章。
原文链接:【http://xie.infoq.cn/article/bb6a5c1b0d064a43e890f7719】。文章转载请联系作者。
评论