写点什么

聊聊 Kafka 在生产实践中出的一个问题

  • 2022 年 9 月 19 日
    湖南
  • 本文字数:4130 字

    阅读完需:约 14 分钟

聊聊Kafka在生产实践中出的一个问题

1、背景

最近在折腾 Kafka 日志集群,由于公司部署的应用不断增加,日志采集程序将采集到的日志发送到 Kafka 集群时出现了较大延迟,总的 TPS 始终上不去,为了不影响业务团队通过日志排查问题,采取了先解决问题,再排查的做法,对 Kafka 集群进行扩容,但扩容后尴尬的是新增加的 5 台机器中,有两台机器的消费发送响应时间比其他机器明显高出不少,为了确保消息服务的稳定性,又临时对集群进行缩容,将这台机器从集群中剔除, 具体的操作就是简单粗暴的使用 kill pid 命令 ,但意外发生了。

发现 Java 客户端报如下错误:



而 Go 客户端报的错误如下所示:

基本可以认为是部分分区没有在线 Leader,无法成功发送消息。

2、问题分析

那为什么会出现这个问题吗?Kafka 一个节点下线,不是会自动触发故障转移,分区 leader 不是会被重新选举吗?请带着这个疑问,开始我们今天的探究之旅。

首先我们可以先看看当前存在问题的分区的路由信息,从第一张图中看出主题 dw_test_kafka_0816000 的 101 分区消息发送失败,我们在 Zookeeper 中看一下其状态,具体命令如下:

./zkCli.sh -server 127.0.0.1:2181get -s  /kafka_cluster_01/brokers/topics/dw_test_kafka_0816000/partitions/101/state
复制代码

该命令可以看到对应分区的相信信息,如下图所示:



这里显示出 leader 的状态为-1,而 isr 列表中只有一副本,在 broker-1 上,但此时 broker id 为 1 的机器已经下线了,那为什么不会触发分区 Leader 重新选举呢?

其实看到这里,我相信你只要稍微细想一下,就能发现端倪,isr 字段的值为 1,说明该分区的副本数为 1,说明该分区只在一个 Broker 上存储数据,一旦 Broker 下线,由于集群内其他 Broker 上并没有该分区的数据,此时是无法进行故障转移的,因为一旦要进行故障转移,分区的数据就会丢失,这样带来的影响将是非常严重的。

那为什么该主题的副本数会设置为 1 呢?那是因为当时集群的压力太大,节点之间复制数据量巨大,网卡基本满负荷在运转,而又是日志集群,对数据的丢失的接受程度较大,故当时为了避免数据在集群之间的大量复制,将该主题的副本数设置为了 1。

但集群节点的停机维护是少不了的,总不能每一次停机维护,都会出现一段时间数据写入失败吧。要解决这个问题,我们在停机之前,需要先对主题进行分区移动,将该主题的分区从需要停机的集群中移除。

主题分区移动的具体做法,请参考我之前的一篇文章 Kafka 主题迁移实践 的第三部分。

3、Kafka 节点下线分区的故障转移机制

Kafka 单副本的主题在集群内一台节点下线后,将无法完成分区的故障转移机制,为了深入掌握底层的一些实现细节,我想再深入探究一下 kafka 节点下线的一些故障转移机制。

温馨提示:接下来主要是从源码角度深入探究实现原理,加深对这个过程的理解,如果大家不感兴趣,可以直接进入到本文的第 4 个部分:总结。

在 Kafka 中依赖的 Zookeeper 服务器上存储了当前集群内存活的 broker 信息,具体的路径为/{namespace}/brokers/brokers/ids,具体图示如下:



并且 ids 下的每一个节点记录了 Broker 的一些信息,例如对外提供服务的协议、端口等,值得注意的是这些节点为 临时节点 ,如下图所示:



这样一旦对应的 Broker 宕机下线,对应的节点会删除,Kafka 集群内的 Controller 角色在启动时会监听该节点下节点的变化,并作出响应,最终将会调用 KafkaController 的 onBrokerFailure 方法,具体代码如下所示:



这个方法实现比较复杂,我们在这里不做过多分散,重点查找分区的故障转移机制,也就是接下来我们将具体分析 KafkaController 的 onReplicasBecomeOffline 方法,主要探究分区的故障转移机制。

3.1 onReplicasBecomeOffline 故障转移

由于该方法实现复杂,接下来将分布对其进行详解。

Step1:从需要设置为下线状态分区进行分组,分组依据为是否需要删除,没有触发删除的集合用 newofflineReplicasNotForDeletion 表示,需要被删除的集合用 newofflineReplicasForDeletion 表示。



Step2:挑选没有 Leader 的分区,用 partitionsWithoutLeader,代码如下图所示:



分区没有 Leader 的标准是:分区的 Leader 副本所在的 Broker 没有下线,并且没有被删除。

Step3:将没有 Leader 的分区状态变更为 OfflinePartition(离线状态),这里的状态更新是放在 kafka Controller 中的内存中,具体的内存结构:Map[TopicPartition, PartitionState]。

Step4:Kafka 分区状态机驱动(触发)分区状态为 OfflinePartition、NewPartition 向 OnlinePartition 转化,状态的转化主要包括两个重要的步骤:

  • 调用 PartitionStateMachine 的 doHandleStateChanges 的方法,驱动分区状态机的转换。

  • 然后调用 ControllerBrokerRequestBatch 的 sendRequestsToBrokers 方法,实现元信息在其他 Broker 上的同步。

由于篇幅的问题,我们这篇文章不会体系化的介绍 Kafka 分区状态机的实现细节,先重点关注 OfflinePartition 离线状态向 OnlinePartition 转化过程。



我们首先说明一下 OfflinePartition 离线状态向 OnlinePartition 转化过程时各个参数的含义:

  • Seq[TopicPartition] partitions 当前处于 OfflinePartition、NewPartition 状态、并且没有删除的分区。

  • PartitionState targetState 状态驱动的目标状态:OnlinePartition。

  • PartitionLeaderElectionStrategy 分区 Leader 选举策略,这里传入的是 OfflinePartitionLeaderElectionStrategy,分区离线状态的 Leader 选举策略

这里判断一下分区是否有效的依据主要是要根据状态机设置的驱动条件,例如只有分区状态为 OnlinePartition、NewPartition、OfflinePartition 三个状态才能转换为 OnlinePartition。

接下来重点看变更为 OnlinePartition 的具体实现逻辑,具体代码如下所示:



具体实现分为 3 个步骤:

  • 首先先分别帅选出当前状态为 NewPartition 的集合与(OfflinePartition 或者 OnlinePartition)分区。

  • 状态为 NewPartition 的分区,执行分区的初始化,通常为分区扩容或主题新创建

  • 状态为 OfflinePartition 或者 OnlinePartition 的执行分区重新选举,因为这些集合中的分区是当前没有 Leader 的分区,这些分区暂时无法接受读写请求。

接下来我们重点看一下离线状态变更为 OnlinePartition 的分区 leader 选举实现,具体方法为:PartitionStateMachine 的 electLeaderForPartitions 方法,其代码如下所示:



这个方法的实现结构比较简单,返回值为两个集合,一个选举成功的集合,一个选举失败的集合,同时选举过程中如果出现 可恢复异常,则会进行重试 。

具体的重试逻辑由 doElectLeaderForPartitions 方法实现,该方法非常复杂。

3.2 分区选举机制

分区选举由 PartitionStateMachine 的 doElectLeaderForPartitions 方法实现,接下来分步进行讲解。

Step1:首先从 Zookeeper 中获取需要选举分区的元信息,代码如下所示:



Kafka 中主题的路由信息存储在 Zookeeper 中,具体路径为:/{namespace}/brokers/topics/{topicName}}/partitions/{partition}/state,具体存储的内容如下所示:



Step2:将查询出来的主题分区元信息,组装成 Map< TopicPartition, LeaderIsrAndControllerEpoch>的 Map 结构,代码如下所示:



Step3:将分区中的 controllerEpoch 与当前 Kafka Controller 的 epoch 对比,刷选出无效与有效集合,具体代码如下所示:



如果当前控制器的 controllerEpoch 小于分区状态中的 controllerEpoch,说明已有新的 Broker 已取代当前 Controller 成为集群新的 Controller,本次无法进行 Leader 选取,并且打印日志。

Step4:根据 Leader 选举策略进行 Leader 选举,代码如下所示:



由于我们这次是由 OfflinePartition 状态向 OnlinePartition 状态转换,进入的分支为 leaderForOffline,稍后我们再详细介绍该方法,经过选举后的返回值为两个集合,其中 partitionsWithoutLeaders 表示未成功选举出 Leader 的分区,而 partitionsWithLeaders 表示成功选举出 Leader 的分区。

Step5:没有成功选举出 Leader 的分区打印对应日志,并加入到失败队列集合中,如下图所示:



Step5:将选举结果更新到 zookeeper 中,如下图所示:

Step6:将最新的分区选举结果同步到其他 Broker 节点上。



更新分区状态的请求 LEADER_AND_ISR 被其他 Broker 接受后,会根据分区的 leader 与副本信息,成为该分区的 Leader 节点或从节点,关于这块的实现细节在专栏的后续文章中会专门提及。

那 OfflinePartitionLeaderElectionStrategy 选举策略具体是如何进行选举的呢?接下来我们探究其实现细节。

3.3 OfflinePartitionLeaderElectionStrategy 选举策略

OfflinePartitionLeaderElectionStrategy 的选举策略实现代码见 PartitionStateMachine 的 leaderForOffline,我们还是采取分步探讨的方式。

Step1:主要初始化几个集合,代码如下



对上面的变量做一个简单介绍:

  • partitionsWithNoLiveInSyncReplicas 分区的副本所在的 Broker 全部不存活

  • partitionsWithLiveInSyncReplicas 分区副本集合所在的 broker 部分或全部存活

  • partitionsWithUncleanLeaderElectionState 主题是否开启了副本不在 isr 集合中也可以参与 Leader 竞选,可在主题级别设置 unclean.leader.election.enable,默认为 false。

Step2:执行分区 Leader 选举,具体实现代码如下所示:



首先解释如下几个变量的含义:

  • assignment 分区设置的副本集(所在 brokerId)

  • liveReplicas 当前在线的副本集

具体的选举算法如下所示:



离线转在线的选举算法比较简单:如果 unclean.leader.election.enable=false,则从存活的 ISR 集合中选择第一个成为分区的 Leader,如果没有存活的 ISR 副本,并且 unclean.leader.election.enable=true,则选择一个在线的副本,否则返回 NONE,表示没有成功选择一个合适的 Leader。

然后返回本次选举的结果,完成本次选举。

4、总结

本文从一个生产实际故障开始进行分析,经过分析得出单副本主题在集群中单台节点下线会引起部分队列无法写入,解决办法是要先执行主题分区移动,也就是将需要停止的 broker 上所在的分区移动到其他 broker 上,这个过程并不会对消息发送,消息消费造成影响。

最后大家如果和我一样,喜欢看看分区故障转移相关实现细节的话,我也带领大家一睹源码,加深对分区选举机制的理解,做到举一反三。

原文:https://mp.weixin.qq.com/s?__biz=MzIzNzgyMjYxOQ==&mid=2247490385&idx=1&sn=7f6bbb1cdb3b3a11b818e04b664bc907

如果感觉本文对你有帮助,点赞关注支持一下,想要了解更多 Java 后端,大数据,算法领域最新资讯可以关注我公众号【架构师老毕】私信 666 还可获取更多 Java 后端,大数据,算法 PDF+大厂最新面试题整理+视频精讲

用户头像

需要资料添加小助理vx:bjmsb2020 2021.10.19 加入

爱生活爱编程

评论

发布
暂无评论
聊聊Kafka在生产实践中出的一个问题_kafka_Java永远的神_InfoQ写作社区