图解 KafkaConsumer SyncGroupRequest 请求流程
作者:石臻臻,CSDN 博客之星 Top5、Kafka Contributor、nacos Contributor、华为云 MVP,腾讯云 TVP,滴滴 Kafka 技术专家、 KnowStreaming PMC)。
KnowStreaming 是滴滴开源的Kafka运维管控平台, 有兴趣一起参与参与开发的同学,但是怕自己能力不够的同学,可以联系我,带你一起你参与开源! 。KnowStreaming 体验环境请访问:https://demo.knowstreaming.com/
在上一篇文章中,我们分析了 JoinGropRequest 的流程,详细请看Kafka消费者JoinGroupRequest流程解析
那么我们知道,在执行完了 JoinGroupRequest 之后, 所有的 Member 都对消费组协调器发起了 SyncGroupRequest 请求
那么 SyncGroup 具体做了哪些事情呢?我们今天来一起分析一下!
11. 请求参数
当我们 JoinGroup 完成之后, 消费者客户端收到了 Response, 然后就会立马发起 SyncGroupRequest
相关代码如下
JoinGroupResponseHandler#onJoinLeader 或者 JoinGroupResponseHandler#onJoinFollow
这两个的区别是,如果当前 Member 是 Leder Member 则调用的是 onJoinLeader。’
onJoinFollow
和 onJoinLeader
区别在于前者不会带上 Assignments 数据,onJoinLeader 互根据分区分配策略计算一下当前的分配情况然后传入请求。
1.1 请求头 RequestHeader
一般来说, 正常的一个请求都包含如下数据
在这里插入图片描述
1 .header_version 请求头版本号
请求头版本号, 请求头的版本目前有 0 和 1; 每个请求对应使用哪个 headerVersion 的映射关系在ApiMessageType#requestHeaderVersion
2. api_version: 请求的标识 ID
每个类型的请求都有他对应的唯一 ID, 比如这里的 JoinGroupRequest 对应的 ID 是 11 ; 映射关系在 ApiMessageType
3. api_version: 该请求的版本号
因为我们可能会对某个请求类型做过改动, 并且改动了请求的 Schemas, 那么每次改动都是一个版本, 比如 SyncGroupRequest 这个请求总共就有 6 个版本, 那么当前发起的请求的版本号是 :Schema.length -1 = 6 - 1 = 5
下面的 JoinGroupRequest 的 Schemas, 不同请求类型的 Schemas 不一样, 可以通过 ApiKeys 下面的每个请求查看。
在这里插入图片描述
4. client_id: 客户端 ID
客户端唯一标识
5. correlation_id: 每次发起的请求的唯一标识
发起的每次请求的唯一标识, 该值自增。
1.2 RequestBody
上面的 RequestHeader 基本上都大差不差,不同 Request 类型的 RequestBody 是不一样的.
本次 SyncGroupRequest 的属性如下
在这里插入图片描述
1. group_id 消费组 id
属性 group.id 配置
2. member_id 消费者成员 ID
消费者的成员 id, 默认就是空字符串, 客户端不可设置。 该值会在后续的请求中返回并被赋值。
3. group_instance_id
客户端属性:group.instance.id
默认值 空)
Kafka2.3 版本引入的新参数. 用户提供的消费者实例的唯一标识符。 如果设置了,消费者则被视为静态成员, 静态成员配以较大的 session 超时设置能够避免因成员临时不可用(比如重启)而引发的 Rebalance, 如果不设置, 消费者将作为动态成员
4. generation_id 消费组协调器年代 ID
每个消费组协调器的年代 ID,没经过一次变化 这个 id 就是自增 1, 比如新增 Member、Leave Member 等等操作都会引起年代的增长。
5. protocol_type 协议类型
Consumer 发起的协议是 comsumer
, 另一个可选项为connect
6. protocol_name 分配策略
消费组分区分配策略, 比如range
7. assignments 每个 member 对应的分配分区
这个参数,只有是 Leader Member 发起请求的时候才会带上, 这个请求是每个 Member 分配到的分区信息。
里面包含着 partitions 、userData 等信息。如下
单个 assignment 的数据结构是
22. 发起请求
2.1 向哪个协调器节点发起请求
既然要发起请求,那么究竟是哪个节点呢?之前客户端向集群发起请求的计算方式一般都是获取最小负载的节点发起请求。
那么这里可不一样, 这里发起请求的 Node 是有具体要求的, 那就是向协调器 coordinator 发起。
那么问题来了, 谁是协调器, 协调器的节点是哪个?
先说结论:
该客户端的group.id
的 hash 值跟__consumer_offsets
分区数取模得到的分区号, 这个分区号的 Leader 在哪个 Broker,那么这个 Node 就在哪个 Broker 上。
一个 Broker 可能有多个 Node, 使用哪个 Node 取决于客户端发起请求时候使用的是哪个 listener。 客户端发起请求对应的 listener 就对应着相应的 Node。
PS: 如果 leader 不存在或者不在线, 会提示异常:The coordinator is not available
具体流程请看
寻找协调器FindCoordinatorRequest请求流程
2.2 发起请求时机
我们既然要发起 SyncGroup 的请求, 那么我们是在什么时候才会触发这个请求呢?
在上一篇文章 Kafka消费者JoinGroupRequest流程解析 我们有知道, 在所有 JoinGroupRequest 都 Join 完成并收到回调之后, 这些收到回调的 Member 都会立马发起 SyncGroup 请求,来请求协同最终的分配。
我们看一下 JoinGroupRequest 的时序图
在这里插入图片描述
上图的左边 JoinGroupResponseHandler 最后是发起了 SyncGroupRequest
但是要注意, Leader Member 和 Follower Member 是有区别的, Leader Member 会接收到所有 Member 的元信息,然后根据 Kafka消费者客户端分区分配策略 计算每个 Member 应该分配到的分区信息, 然后把这个信息当做入参, 发送给组协调器( GroupCoordinator )
但是 Follow Member 没有这个信息的, 你看下面这个就是 Follower 的请求,Assignments 那一个属性都是空集合。
AbstractCoordinator#onJoinFollower
33. 协调器接受请求
协调器接受到客户端发来的 SyncGroup 请求进行处理
处理入口:KafkaApi#handleSyncGroupRequest
真正处理的地方:GroupCoordinator#handleSyncGroup
如果发现消费组协调器还在加载中,返回异常 REBALANCE_IN_PROGRESS, 让客户端重新从 JoinGroup 发起请求。
如果当前缓存中不存在给定的 memberId 的 menber,说明可能已经异常,则返回异常
执行
doSyncGroup
doSyncGroup
代码省略。
一些异常检验
如果当前 Group 的 State 是 Empty, 则直接返回 UNKNOWN_MEMBER_ID 异常(Group 都是空的,还同步个球)
如果当前状态是 PreparingRebalance,返回异常: REBALANCE_IN_PROGRESS; 这个状态一般还没有 JoinGroup 完成呢
如果状态是 Stable,什么也不做,直接返回当前 Group 的分组情况就行了。
如果是 CompletingRebalance 状态,那么就对了, JoinGroup 完成之后就是这么个状态的。这里要判断一下过来的 Member 是不是 Leader Member,不是 Leader Member 啥也不做。就设置了一下回调参数,一会自己会被回调。如果是 Member 的话,我们就需要把当前的 Group 元信息给持久化一下了。相当于是每次重平衡都会把新的数据持久化存储;详情看下面
状态是 CompletingRebalance 并且请求来源是 Leader Member 则做下面的逻辑
这段代码做的事情如下
3.1 存储 Group 元信息 storeGroup
根据
group.id
找到对应的__consumer_offset
分区,获取 magicValue, 就是看下当前日志文件的版本号,关于消息体结构请看图解Kafka的RecordBatch结构组装 Group 元信息的 Key、Value 结构,关于 Group 元信息数据结构请看:消费组偏移量_consumer_offset的数据结构
在这里插入图片描述
消息的 压缩类型是
offsets.topic.compression.codec
默认是 0; 对应的压缩 k-v 映射关系为{ 0:NONE、1:GZIP、2:SNAPPY、3:LZ4、4: ZSTD}把消息写入到 对应的 TopicPartition 中 ,Topic 为
__consumer_offset
,分区是group.id
计算出来的。写入消息的一些基本配置如下:在写入过程如果出现写入失败,异常了,则重置所有 Member 的分配方案为 Empty,并返回数据给各自的消费者客户端(客户端会再次发起 JoinGroup 请求);完了之后执行
maybePrepareRebalance
方法; 这个时候状态会流转为 PreparingRebalance,等待 Memmber 们的再次 JoinGroup。然后再加一个有效期为[max.poll.interval.ms
,默认值 300000(5 分钟)] 的 onCompleteJoin 任务, 这个任务会在所有的 Member 都 JoinGrop 之后,会执行方法:GroupCoordinator#onCompleteJoin
, 是不是熟悉的方法又来了, ( 因为我们在 Kafka消费者JoinGroupRequest流程解析 分析过 ) 把结果返回给所有的 Member,告知 JoinGroup 成功了! 那么客户端是不是又得重新再来发起 SyncGropuRequest(在这个方法里面呢,如果 Group 已经是 Empty 了,那么也是会去在写入一个空的 Group 元信息的。)如果正常写入, 则传播新的分配方案给每个 Member,注意,每个 Member 只会收到自己的分配方案。然后当前 Group 状态流转为 Stable
44. 返回客户端
上面我们说到了 Sync 之后会给 Member 发起回调,那么拿到回调之后客户端是如何处理的呢?
SyncGroupResponseHandler#handle
检验一些异常, 如果异常会重新 Join;requestRejoin
如果无异常,则将客户端状态更新为 STABLE 状态, 一定是从 COMPLETING_REBALANCE 流转到 STABLE; 关于客户端状态流转图请看
把接收到的分配信息
memberAssignment
更新一下消费者客户端,这个更新的逻辑在 AbstractCoordinator#onJoinComplete
4.1 onJoinComplete 同步成功
这个是只有当 Member 的状态为 MemberState.STABLE 的时候才会被执行到的。
Member 的状态稳定了之后, 并且也拿到了新的分配方案了
在这里插入图片描述
AbstractCoordinator#onJoinComplete
这里代码太长,总结一下
更新客户端元数据
做一些简单校验
如果是以正则表达式规则订阅的 Topic,尝试去更新一下是否有命中新的 Topic
回调
assignor.onAssignment
把新的分配方式更新到自己的缓存里面。
5 总结
在这里插入图片描述
6 附录: 消费者客户端状态流转图和消费组协调器状态流转图
在这里插入图片描述
在这里插入图片描述
版权声明: 本文为 InfoQ 作者【石臻臻的杂货铺】的原创文章。
原文链接:【http://xie.infoq.cn/article/6330fb2c0cbdb67caf1d4944b】。未经作者许可,禁止转载。
评论