写点什么

图解 KafkaConsumer SyncGroupRequest 请求流程

  • 2022-11-16
    江西
  • 本文字数:9711 字

    阅读完需:约 32 分钟

图解KafkaConsumer SyncGroupRequest请求流程

作者石臻臻,CSDN 博客之星 Top5Kafka Contributornacos Contributor华为云 MVP,腾讯云 TVP,滴滴 Kafka 技术专家 KnowStreaming PMC)


KnowStreaming 是滴滴开源的Kafka运维管控平台, 有兴趣一起参与参与开发的同学,但是怕自己能力不够的同学,可以联系我,带你一起你参与开源! 。KnowStreaming 体验环境请访问:https://demo.knowstreaming.com/


在上一篇文章中,我们分析了 JoinGropRequest 的流程,详细请看Kafka消费者JoinGroupRequest流程解析

那么我们知道,在执行完了 JoinGroupRequest 之后, 所有的 Member 都对消费组协调器发起了 SyncGroupRequest 请求

那么 SyncGroup 具体做了哪些事情呢?我们今天来一起分析一下!

11. 请求参数

当我们 JoinGroup 完成之后, 消费者客户端收到了 Response, 然后就会立马发起 SyncGroupRequest

相关代码如下

JoinGroupResponseHandler#onJoinLeader 或者 JoinGroupResponseHandler#onJoinFollow

这两个的区别是,如果当前 Member 是 Leder Member 则调用的是 onJoinLeader。’

onJoinFollowonJoinLeader区别在于前者不会带上 Assignments 数据,onJoinLeader 互根据分区分配策略计算一下当前的分配情况然后传入请求。


// 公众号:石臻臻的杂货铺    private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {        try {            // 根据分区分配策略 计算分配情况            Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.data().leader(), joinResponse.data().protocolName(),                    joinResponse.data().members());
            List<SyncGroupRequestData.SyncGroupRequestAssignment> groupAssignmentList = new ArrayList<>();            for (Map.Entry<String, ByteBuffer> assignment : groupAssignment.entrySet()) {                groupAssignmentList.add(new SyncGroupRequestData.SyncGroupRequestAssignment()                        .setMemberId(assignment.getKey())                        .setAssignment(Utils.toArray(assignment.getValue()))                );            }
            SyncGroupRequest.Builder requestBuilder =                    new SyncGroupRequest.Builder(                            new SyncGroupRequestData()                                    .setGroupId(rebalanceConfig.groupId)                                    .setMemberId(generation.memberId)                                    .setProtocolType(protocolType())                                    .setProtocolName(generation.protocolName)                                    .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))                                    .setGenerationId(generation.generationId)                                                                      .setAssignments(groupAssignmentList)                    );            log.debug("Sending leader SyncGroup to coordinator {} at generation {}: {}", this.coordinator, this.generation, requestBuilder);            return sendSyncGroupRequest(requestBuilder);        } catch (RuntimeException e) {            return RequestFuture.failure(e);        }    }    
复制代码

1.1 请求头 RequestHeader

一般来说, 正常的一个请求都包含如下数据

在这里插入图片描述

1 .header_version 请求头版本号

请求头版本号, 请求头的版本目前有 0 和 1; 每个请求对应使用哪个 headerVersion 的映射关系在ApiMessageType#requestHeaderVersion

2. api_version: 请求的标识 ID

每个类型的请求都有他对应的唯一 ID, 比如这里的 JoinGroupRequest 对应的 ID 是 11 ; 映射关系在 ApiMessageType

3. api_version: 该请求的版本号

因为我们可能会对某个请求类型做过改动, 并且改动了请求的 Schemas, 那么每次改动都是一个版本, 比如 SyncGroupRequest 这个请求总共就有 6 个版本, 那么当前发起的请求的版本号是 :Schema.length -1 = 6 - 1 = 5

下面的 JoinGroupRequest 的 Schemas, 不同请求类型的 Schemas 不一样, 可以通过 ApiKeys 下面的每个请求查看。

在这里插入图片描述


    public static final Schema[] SCHEMAS = new Schema[] {        SCHEMA_0,        SCHEMA_1,        SCHEMA_2,        SCHEMA_3,        SCHEMA_4,        SCHEMA_5    };        public static final Schema SCHEMA_5 =        new Schema(            new Field("group_id", Type.COMPACT_STRING, "The unique group identifier."),            new Field("generation_id", Type.INT32, "The generation of the group."),            new Field("member_id", Type.COMPACT_STRING, "The member ID assigned by the group."),            new Field("group_instance_id", Type.COMPACT_NULLABLE_STRING, "The unique identifier of the consumer instance provided by end user."),            new Field("protocol_type", Type.COMPACT_NULLABLE_STRING, "The group protocol type."),            new Field("protocol_name", Type.COMPACT_NULLABLE_STRING, "The group protocol name."),            new Field("assignments", new CompactArrayOf(SyncGroupRequestAssignment.SCHEMA_4), "Each assignment."),            TaggedFieldsSection.of(            )        );    
复制代码

4. client_id: 客户端 ID

客户端唯一标识

5. correlation_id: 每次发起的请求的唯一标识

发起的每次请求的唯一标识, 该值自增。

1.2 RequestBody

上面的 RequestHeader 基本上都大差不差,不同 Request 类型的 RequestBody 是不一样的.

本次 SyncGroupRequest 的属性如下

在这里插入图片描述

1. group_id 消费组 id

属性 group.id 配置

2. member_id 消费者成员 ID

消费者的成员 id, 默认就是空字符串, 客户端不可设置。 该值会在后续的请求中返回并被赋值。

3. group_instance_id

客户端属性:group.instance.id 默认值 空)

Kafka2.3 版本引入的新参数. 用户提供的消费者实例的唯一标识符。 如果设置了,消费者则被视为静态成员, 静态成员配以较大的 session 超时设置能够避免因成员临时不可用(比如重启)而引发的 Rebalance, 如果不设置, 消费者将作为动态成员

4. generation_id 消费组协调器年代 ID

每个消费组协调器的年代 ID,没经过一次变化 这个 id 就是自增 1, 比如新增 Member、Leave Member 等等操作都会引起年代的增长。

5. protocol_type 协议类型

Consumer 发起的协议是 comsumer , 另一个可选项为connect

6. protocol_name 分配策略

消费组分区分配策略, 比如range

7. assignments 每个 member 对应的分配分区

这个参数,只有是 Leader Member 发起请求的时候才会带上, 这个请求是每个 Member 分配到的分区信息。

里面包含着 partitions 、userData 等信息。如下

单个 assignment 的数据结构是

        private List<TopicPartition> partitions;        private ByteBuffer userData;
复制代码

22. 发起请求

2.1 向哪个协调器节点发起请求

既然要发起请求,那么究竟是哪个节点呢?之前客户端向集群发起请求的计算方式一般都是获取最小负载的节点发起请求。

那么这里可不一样, 这里发起请求的 Node 是有具体要求的, 那就是向协调器 coordinator 发起。

那么问题来了, 谁是协调器, 协调器的节点是哪个?


先说结论:

该客户端的group.id 的 hash 值跟__consumer_offsets 分区数取模得到的分区号, 这个分区号的 Leader 在哪个 Broker,那么这个 Node 就在哪个 Broker 上。

一个 Broker 可能有多个 Node, 使用哪个 Node 取决于客户端发起请求时候使用的是哪个 listener。 客户端发起请求对应的 listener 就对应着相应的 Node。

PS: 如果 leader 不存在或者不在线, 会提示异常:The coordinator is not available

具体流程请看

寻找协调器FindCoordinatorRequest请求流程

2.2 发起请求时机

我们既然要发起 SyncGroup 的请求, 那么我们是在什么时候才会触发这个请求呢?

在上一篇文章 Kafka消费者JoinGroupRequest流程解析 我们有知道, 在所有 JoinGroupRequest 都 Join 完成并收到回调之后, 这些收到回调的 Member 都会立马发起 SyncGroup 请求,来请求协同最终的分配。

我们看一下 JoinGroupRequest 的时序图

在这里插入图片描述

上图的左边 JoinGroupResponseHandler 最后是发起了 SyncGroupRequest

但是要注意, Leader Member 和 Follower Member 是有区别的, Leader Member 会接收到所有 Member 的元信息,然后根据 Kafka消费者客户端分区分配策略 计算每个 Member 应该分配到的分区信息, 然后把这个信息当做入参, 发送给组协调器( GroupCoordinator )

但是 Follow Member 没有这个信息的, 你看下面这个就是 Follower 的请求,Assignments 那一个属性都是空集合。

AbstractCoordinator#onJoinFollower


    private RequestFuture<ByteBuffer> onJoinFollower() {        // send follower's sync group with an empty assignment        SyncGroupRequest.Builder requestBuilder =                new SyncGroupRequest.Builder(                        new SyncGroupRequestData()                                .setGroupId(rebalanceConfig.groupId)                                .setMemberId(generation.memberId)                                .setProtocolType(protocolType())                                .setProtocolName(generation.protocolName)                                .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))                                .setGenerationId(generation.generationId)                                .setAssignments(Collections.emptyList())                );        log.debug("Sending follower SyncGroup to coordinator {} at generation {}: {}", this.coordinator, this.generation, requestBuilder);        return sendSyncGroupRequest(requestBuilder);    }
复制代码

33. 协调器接受请求

协调器接受到客户端发来的 SyncGroup 请求进行处理

处理入口:KafkaApi#handleSyncGroupRequest

真正处理的地方:GroupCoordinator#handleSyncGroup


  def handleSyncGroup(groupId: String,                      generation: Int,                      memberId: String,                      protocolType: Option[String],                      protocolName: Option[String],                      groupInstanceId: Option[String],                      groupAssignment: Map[String, Array[Byte]],                      responseCallback: SyncCallback): Unit = {    validateGroupStatus(groupId, ApiKeys.SYNC_GROUP) match {      case Some(error) if error == Errors.COORDINATOR_LOAD_IN_PROGRESS =>        // 如果协调器还在加载中, 意味着该组需要从JoinGroup重新开始,返回异常:REBALANCE_IN_PROGRESS 给客户端进行处理        responseCallback(SyncGroupResult(Errors.REBALANCE_IN_PROGRESS))
      case Some(error) => responseCallback(SyncGroupResult(error))
      case None =>        groupManager.getGroup(groupId) match {          case None => responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID))          case Some(group) => doSyncGroup(group, generation, memberId, protocolType, protocolName,            groupInstanceId, groupAssignment, responseCallback)        }    }  }


复制代码
  1. 如果发现消费组协调器还在加载中,返回异常 REBALANCE_IN_PROGRESS, 让客户端重新从 JoinGroup 发起请求。

  2. 如果当前缓存中不存在给定的 memberId 的 menber,说明可能已经异常,则返回异常

  3. 执行 doSyncGroup

doSyncGroup

代码省略。

  1. 一些异常检验

  2. 如果当前 Group 的 State 是 Empty, 则直接返回 UNKNOWN_MEMBER_ID 异常(Group 都是空的,还同步个球)

  3. 如果当前状态是 PreparingRebalance,返回异常: REBALANCE_IN_PROGRESS; 这个状态一般还没有 JoinGroup 完成呢

  4. 如果状态是 Stable,什么也不做,直接返回当前 Group 的分组情况就行了。

  5. 如果是 CompletingRebalance 状态,那么就对了, JoinGroup 完成之后就是这么个状态的。这里要判断一下过来的 Member 是不是 Leader Member,不是 Leader Member 啥也不做。就设置了一下回调参数,一会自己会被回调。如果是 Member 的话,我们就需要把当前的 Group 元信息给持久化一下了。相当于是每次重平衡都会把新的数据持久化存储;详情看下面

状态是 CompletingRebalance 并且请求来源是 Leader Member 则做下面的逻辑

     // 公众号:石臻臻的杂货铺     //wx: shiyanzu001  groupManager.storeGroup(group, assignment, (error: Errors) => {         group.inLock {                  // 另一个成员可能在我们等待此回调时加入了该组,                  // 因此我们必须确保我们仍处于 CompletingRebalance 状态并且在它被调用时处于同一代。                  // 如果我们已经转换到另一个状态,那么什么都不做                  if (group.is(CompletingRebalance) && generationId == group.generationId) {                  if (error != Errors.NONE) {                      // 写入数据的过程出现了异常了;                       //1.更新本地缓存,将所有的member分配方式设置为empty,                      //2:遍历所有member, 把孔的分配方案返回给他们。                      resetAndPropagateAssignmentError(group, error)                      maybePrepareRebalance(group, s"error when storing group assignment during SyncGroup (member: $memberId)")                    } else {                      // 传播分配方式;1.先更新本地缓存  2.遍历所有member, 把他们各自的分配方案 返回给他们。                      setAndPropagateAssignment(group, assignment)                      //装换状态为Stable                      group.transitionTo(Stable)                    }                  }                }              })              
复制代码

这段代码做的事情如下

3.1 存储 Group 元信息 storeGroup

  1. 根据group.id找到对应的__consumer_offset分区,获取 magicValue, 就是看下当前日志文件的版本号,关于消息体结构请看图解Kafka的RecordBatch结构

  2. 组装 Group 元信息的 Key、Value 结构,关于 Group 元信息数据结构请看:消费组偏移量_consumer_offset的数据结构


  1. 在这里插入图片描述

  2. 消息的 压缩类型是offsets.topic.compression.codec 默认是 0; 对应的压缩 k-v 映射关系为{ 0:NONE、1:GZIP、2:SNAPPY、3:LZ4、4: ZSTD}

  3. 把消息写入到 对应的 TopicPartition 中 ,Topic 为 __consumer_offset ,分区是group.id计算出来的。写入消息的一些基本配置如下:

  4. 在写入过程如果出现写入失败,异常了,则重置所有 Member 的分配方案为 Empty,并返回数据给各自的消费者客户端(客户端会再次发起 JoinGroup 请求);完了之后执行maybePrepareRebalance方法; 这个时候状态会流转为 PreparingRebalance,等待 Memmber 们的再次 JoinGroup。然后再加一个有效期为[ max.poll.interval.ms,默认值 300000(5 分钟)] 的 onCompleteJoin 任务, 这个任务会在所有的 Member 都 JoinGrop 之后,会执行方法:GroupCoordinator#onCompleteJoin, 是不是熟悉的方法又来了, ( 因为我们在 Kafka消费者JoinGroupRequest流程解析 分析过 ) 把结果返回给所有的 Member,告知 JoinGroup 成功了! 那么客户端是不是又得重新再来发起 SyncGropuRequest(在这个方法里面呢,如果 Group 已经是 Empty 了,那么也是会去在写入一个空的 Group 元信息的。)

  5. 如果正常写入, 则传播新的分配方案给每个 Member,注意,每个 Member 只会收到自己的分配方案。然后当前 Group 状态流转为 Stable

44. 返回客户端

上面我们说到了 Sync 之后会给 Member 发起回调,那么拿到回调之后客户端是如何处理的呢?

SyncGroupResponseHandler#handle

        /**    * 公众号:石臻臻的杂货铺    * VX : shiyanzu001    **/
    private class SyncGroupResponseHandler extends CoordinatorResponseHandler<SyncGroupResponse, ByteBuffer> {        private SyncGroupResponseHandler(final Generation generation) {            super(generation);        }
        @Override        public void handle(SyncGroupResponse syncResponse,                           RequestFuture<ByteBuffer> future) {            Errors error = syncResponse.error();            if (error == Errors.NONE) {                            if (isProtocolTypeInconsistent(syncResponse.data.protocolType())) {                   //省略...                } else {                    synchronized (AbstractCoordinator.this) {                        // 因为在JoinGroup完成的时候状态已经变更为了COMPLETING_REBALANCE,                         //想要流转为STABLE只能是这个状态的                        if (!generation.equals(Generation.NO_GENERATION) && state == MemberState.COMPLETING_REBALANCE) {                                                       if (protocolNameInconsistent) {                            //省略...                            } else {                                // 一切正常,将状态变更为稳定状态                                 state = MemberState.STABLE;                                rejoinNeeded = false;                                // record rebalance latency                                lastRebalanceEndMs = time.milliseconds();                                lastRebalanceStartMs = -1L;        // 把获取到的assigment 分配数据往上抛出返回给消费者客户端处理                                future.complete(ByteBuffer.wrap(syncResponse.data.assignment()));                            }                        } else {                              //省略...                        }
            } else {                //异常情况 ,需要重新发起 JoinGroup的请求哦                requestRejoin();
                if (error == Errors.GROUP_AUTHORIZATION_FAILED) {                    // 省略很多异常情况代码...                }            }        }    }    

复制代码
  1. 检验一些异常, 如果异常会重新 Join;requestRejoin

  2. 如果无异常,则将客户端状态更新为 STABLE 状态, 一定是从 COMPLETING_REBALANCE 流转到 STABLE; 关于客户端状态流转图请看


  1. 把接收到的分配信息memberAssignment更新一下消费者客户端,这个更新的逻辑在 AbstractCoordinator#onJoinComplete

4.1 onJoinComplete 同步成功

这个是只有当 Member 的状态为 MemberState.STABLE 的时候才会被执行到的。

Member 的状态稳定了之后, 并且也拿到了新的分配方案了

在这里插入图片描述

AbstractCoordinator#onJoinComplete

 /** * 公众号:石臻臻的杂货铺 * wx: shiyanzu001 **/        @Override    protected void onJoinComplete(int generation,                                  String memberId,                                  String assignmentStrategy,                                  ByteBuffer assignmentBuffer) {  //省略..          }

复制代码

这里代码太长,总结一下

  1. 更新客户端元数据

  2. 做一些简单校验

  3. 如果是以正则表达式规则订阅的 Topic,尝试去更新一下是否有命中新的 Topic

  4. 回调assignor.onAssignment

  5. 把新的分配方式更新到自己的缓存里面。

5 总结

在这里插入图片描述

6 附录: 消费者客户端状态流转图和消费组协调器状态流转图

在这里插入图片描述

在这里插入图片描述

发布于: 2022-11-16阅读数: 16
用户头像

关注公众号: 石臻臻的杂货铺 获取最新文章 2019-09-06 加入

进高质量滴滴技术交流群,只交流技术不闲聊 加 szzdzhp001 进群 20w字《Kafka运维与实战宝典》PDF下载请关注公众号:石臻臻的杂货铺

评论

发布
暂无评论
图解KafkaConsumer SyncGroupRequest请求流程_kafka_石臻臻的杂货铺_InfoQ写作社区