怒肝 15 天终于将 Kafka 的重平衡一举拿下
判断 KafkaConsumer 对象是否处在多线程环境中。注意:该对象是多线程不安全的,不能有多个 线程持有该对象。
消费组初始化,包含了队列负载(重平衡)
消息拉取
消息消费拦截器处理
关于 poll 方法的核心无非就是两个:重平衡与消费拉取,本篇文章将重点剖析 Kafka 消费者的重平衡机制。
通过对 updateAssignmentMetadataIfNeeded 方法的源码剖析,最终调用的核心方法为 ConsumerCoordinator 的 poll 方法,核心流程图如下:
消费者协调器的核心流程关键点:
消费者协调器寻找组协调器
队列负载(重平衡)
提交位点
本篇文章将深入探讨 Kafka 的重平衡机制。
在深入研究 kafka 重平衡机制之前,首先请简单思考如下问题:
重平衡会阻塞消息消费吗?
Kafka 的加入组协议哪些变更能有效减少重平衡
架构思维修炼:针对第二个问题,作为一名从源码级别去解读 Kafka,深入思考其内部的原理是架构师的一种必备素质,关于这块内容,大家私信或评论与我共同交流。
3.1 消费者协调器
在 Kafka 中,在客户端每一个消费者会对应一个消费者协调器(ConsumerCoordinator),在服务端每一个 broker 会启动一个组协调器。
如果大家对源码分析部分不感兴趣,大家可以直接跳转到文章的末尾。
接下来将对该过程进行源码级别的跟踪,根据源码提练工作机制,该部分对应上面流程图中的:ensureCoordinatorReady 方法。
该方法的关键点如下:
首先判断一下当前消费者是否已找到 broker 端的组协调器,如果以感知,则返回 true。
如果当前并没有感知组协调器,则向服务端(broker)寻找该消费组的组协调器。
寻找组协调器的过程是一个同步过程,如果出现异常,则会触发重试,但引入了重试间隔机制。
如果未超时并且没有获取组协调器,则再次尝试(do while)。
核心要点为 lookupCoordinator 方法,该方法的核心是选择一台负载最小的 broker,构建请求,向 broker 端查询消费组的组协调器,代码如下:
查询组协调器的请求,核心参数为:
ApiKeys apiKey
请求 API,类比 RocketMQ 的 RequestCode,根据该值很容易找到服务端对应的处理代码,这里为 ApiKeys.FIND_COORDINATOR。
String coordinatorKey
协调器 key,取消费组名称。
思考题:提前剧透一下:Kafka 服务端每一台 Broker 会创建一个组协调器(GroupCoordinator),每一个组协调器可以协调多个消费组,但一个消费组只会被分配给一个组协调器,那这里负载机制是什么呢?服务端众多 Broker 如何竞争该消费组的控制权呢?
coordinatorType
协调器类型,默认为 GROUP,表示普通消费组。
short minVersion
版本。
针对客户端端请求,服务端统一入口为 KafkaApis.scala,可以根据 ApiKeys 快速找到其处理入口,如图所示:
具体的处理逻辑在 KafkaApis 的 handleFindCoordinatorRequest 中,如下图所示:
服务端为消费组分配协调器的核心算法竟然非常简单:根据消费组的名称,取 hashcode,然后与 kafka 内部 topic(__consumer_offsets)的分区个数取模,然后返回该分区所在的物理 broker 作为消费组的分组协调器,即内部并没有复杂的选举机制,这样也能更好的说明,客户端在发送请求时可以挑选负载最低的 broker 进行查询的原因。
客户端收到响应结果后更新 ConsumerCoordinator 的(Node coordinator)属性,这样再次调用 coordinatorUnknown()方法,将会返回 false,至此完成消费端协调器的查找。
3.2 消费者加入消费组流程剖析
在消费者获取到协调器后,根据上文提到的协调器处理流程,接下来消费者需要加入到消费者组中,加入到消费组也是参与队列负载机制的前提,接下来我们从源码角度分析一下消费组加入消费组的流程,对应上文中的 AbstractCoordinator 的 ensureActiveGroup 方法。
该方法的核心关键点:
在加入消费组之前必须确保该消费者已经感知到组协调器。
启动心跳线程,当消费者加入到消费组后处于 MemberState.STABLE 后需要定时向协调器上报心跳,表示存活,否则将从消费组中移除。
加入消费组。
心跳线程稍后会详细介绍,先跟踪一下加入消费组的核心流程,具体实现方法为 joinGroupIfneeded,接下来对该方法进行分步解读。
Step1:加入消费组之前必须先获取对应的组协调器,因为后续所有的请求都是需要发送到组协调器上。
Step2:每一次执行重平衡之前调用其回调函数,我们可以看看 ConsumerCoordinatory 的实现,其代码如下图所示:
消费端协调器在进行重平衡(加入一个新组)之前通常会执行如下操作:
如果开启了自动提交位点,进行一次位点提交。
执行重平衡相关的事件监听器。
AbstractCoordinator#joinGroupIfneeded
这里有两个地方值得我们关注:
向消费组的组协调器发送加入请求,但加入消费组并不是目的,而是手段,最终要达成的目的是进行队列的负载均衡。
调用 onJoinComplete 方法,通知消费端协调器队列负载的最终结果,关于这点我们可以从其参数得知:
String generationId
String memberId 成员 id
String protocol 协议名称,这里是 consumer。
ByteBuffer memberAssignment
队列负载结果,包含了分配给当前消费者的队列信息,其序列后的结果如图所示:
故队列的负载机制蕴含在构建请求中,接下来深入分析一下客户端与服务端详细的交互流程。
3.2.1 构建加入消费组请求
构建加入消费组代码见 AbstractCoordinator 的 sendJoinGroupRequest,其代码如下:
![在这里插入图片描述](https://img-blog.csdnimg.cn/20210704165835399.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t
_70#pic_center)
发起一次组加入请求,请求体主要包含如下信息:
消费组的名称
session timeout,会话超时时间,默认为 10s
memberId 消费组成员 id,第一次为 null,后续服务端会为该消费者分配一个唯一的 id,构成为客户端 id + uuid。
protocolType 协议类型,消费者加入消费组固定为 consumer
消费端支持的所有队列负载算法
收到服务端响应后将会调用 JoinGroupResponseHandler 回掉,稍后会详细介绍。
3.2.2 服务端响应逻辑
服务端处理入口:KafkaApis 的 handleJoinGroupRequest 方法,该方法为委托给 GroupCoordinator。
通过这个入口,基本可以看到服务端处理加入请求的关键点:
从客户端请求中提取客户端的 memberId,如果为空,表示第一次加入消费组,还未分配 memberId。
如果协调器中不存在该消费组的信息,表示第一次加入,创建一个,并执行 doUnknownJoinGroup(第一次加入消费组逻辑)
如果协调器中已存在消费组的信息,判断一下是否已达到最大消费者个数限制默认不限制),超过则会抛出异常;然后根据消费者是否是第一次加入进行对应的逻辑处理。
组协调器会为每一个路由到的消费组维护一个组元信息(GroupMetadata),存储在 HashMap< String, GroupMetadata>,每一个消费组云信息中存储了当前的所有的消费者,由消费者主动加入,组协调器可以主动剔除消费者。
接下来分情况处理,来看一下第一次加入(doUnknownJoinGroup)与重新加入(doJoinGroup)分别详细探讨。
初次加入消费组的代码如下:
关键点如下:
首先来看一下该方法的参数含义:
GroupMetadata group
消费组的元信息,并未持久化,存储在内存中,一个消费组当前消费者的信息。
boolean requireKnownMemberId
是否一定需要知道客户端 id,如果客户端请求版本为 4,在加入消费组时需要明确知道对方的 memberId。
String clientId
客户端 ID,消息组的 memberId 生成规则为 clientId + uuid
String clientHost
消费端端 ip 地址
int rebalanceTimeoutMs
重平衡超时时间,取自消费端参数 max.poll.interval.ms,默认为 5 分钟。
int sessionTimeoutMs
会话超时时间,默认为 10s
String protocolType
协议类型,默认为 consumer
List protocols
客户端支持的队列负载算法。
对客户端进行状态验证,其校验如下:
如果消费者状态为 dead,则返回 UNKNOWN_MEMBER_ID
如果当前消费组的负载算法协议不支持新客户端端队列负载协议,则抛出 UNKNOWN_MEMBER_ID,并提示不一致的队列负载协议。
Kafka 的加入请求版本 4 在加入消费端组时使用有明确的客户端 memberId,消费组将创建的 memberId 加入到组的 pendingMember 中,并向客户端返回 MEMBER_ID_REQUIRED,引导客户端重新加入,客户端会使用服务端生成的 memberId,重新发起加入消费组。
调用 addMemberAndRebalance 方法加入消费组并触发重平衡。
接下来继续探究加入消费组并触发重平衡的具体逻辑,具体实现见 GroupCoordinator 的 addMemberAndRebalance。
核心要点如下:
组协调器为每一个消费者创建一个 MemberMetadata 对象。
如果消费组的状态为 PreparingRebalance(此状态表示正在等待消费组加入),并将组的 newMemberAdded 设置为 true,表示有新成员加入,后续需要触发重平衡。并将消费组添加到组中,这里会触发一次消费组选主,选主逻辑:该消费组的第一个加入的消费者成为该消费组中的 Leader,Leader 的职责是什么呢?
为每一个消费者创建一个 DelayedHeartbeat 对象,用于检测会话超时,组协调器如果检测会话超时,会将该消费者移除组,会重新触发重平衡,消费者为了避免被组协调器移除消费组,需要按间隔发送心跳包。
根据当前消费组的状态是否需要进行重平衡。
接下来继续深入跟踪 maybePrepareRebalance 方法,其实现如下图所示:
根据状态机的驱动规则,判断是否可以进入到 PrepareRebalance,其判断逻辑就是根据状态机的驱动,判断当前状态是否可以进入到该状态,其具体实现是为每一个状态存储了一个可以进入当前状态的前驱状态集合。
如果符合状态驱动流程,消费组将进入到 PrepareRebalance,其具体实现如下图所示:
如果当前消费组的状态为 CompletingRebalance,需要重置队列分配动作,并让消费组重新加入到消费组,即重新发送 JOIN_GROUP 请求。具体实现技巧:
1、将所有消费者已按分配算法分配到的队列信息置空
2、将空的分配结果返回给消费者,并且错误码为 REBALANCE_IN_PROGRESS,客户端收到该错会重新加入消费组。
如果当前没有消费者,则创建 InitialDelayedJoin,否则则创建 DelayedJoin,值得注意的是这里有一个参数:group.initial.rebalance.delay.ms,用于设置消费组进入到 PreparingRebalance 真正执行其业务逻辑的延迟时间,其主要目的是等待更多的消费者进入。
驱动消费组状态为 PreparingRebalance。
尝试执行 initialDelayedJoin 或 DelayedJoin 的 tryComplete 方法,如果没有完成,则创建 watch,等待执行完成,最终执行的是组协调器的相关方法,其说明如下:
接下来看一下组协调器的 tryCompleteJoin 方法,其实现如下图所示:
完成 PreparingRebalance 状态的条件是:已知的消费组都成功加入到消费组。该方法返回 true 后,onCompleteJoin 方法将被执行。
接下来看一下 GroupCoordinator 的 onCompleteJoin 方法的实现。
核心的关键点如下:
驱动消费组的状态转化为 CompletingRebalance,将进入到重平衡的第二个阶段(队列负载)
为每一个成员构建对应 JoinResponse 对象,其中三个关键点
generationId 消费组每一次状态变更,该值加一
subProtocol 当前消费者组中所有消费者都支持的队列负载算法
leaderId 消费组中的 leader,一个消费组中第一个加入的消费者为 leader
接下来,消费者协调器将根据服务端返回的响应结果,进行第二阶段的重平衡,即进入到队列负载算法。
服务端对于客户端第一次加入消费组的流程就介绍到这里,再将目光放到客户端对重平衡的响应结果之前,我们再看看组协调器是如何处理已知 memberId 的消费者加入处理逻辑。
组协调在已知 memberid 处理加入请求的核心处理代码在 GroupCoordinator 的 doJoinGroup 中,即重新加入请求。
评论