写点什么

Kafka 消费者客户端心跳请求

  • 2022-10-17
    江西
  • 本文字数:6248 字

    阅读完需:约 1 分钟

Kafka消费者客户端心跳请求

作者石臻臻,CSDN 博客之星 Top5Kafka Contributornacos Contributor华为云 MVP,腾讯云 TVP,滴滴 Kafka 技术专家 KnowStreaming PMC)


KnowStreaming 是滴滴开源的Kafka运维管控平台, 有兴趣一起参与参与开发的同学,但是怕自己能力不够的同学,可以联系我,带你一起你参与开源! 。KnowStreaming 体验环境请访问:https://demo.knowstreaming.com/

1. 发起心跳请求

在 Consumer 客户端启动的时候,就会构建心跳监测线程 HeartbeatThread 并启动,

心跳监测线程名:kafka-coordinator-heartbeat-thread|group.id

例如:kafka-coordinator-heartbeat-thread | consumer0

        /**    * 公众号:石臻臻的杂货铺    * wx: shiyanzu001    **/    private boolean enabled = false;         private synchronized void startHeartbeatThreadIfNeeded() {        if (heartbeatThread == null) {            heartbeatThread = new HeartbeatThread();            heartbeatThread.start();        }    }
复制代码

虽然这个时候启动了, 但是 run 方法里面有个逻辑标志位enabled=false,实际上这个时候并不会发出心跳监测的。

它会根据整个消费组的状态变化而变化。

1.1 启动心跳线程

比如, 当我们的消费者客户端发起 JoinGroupRequest 并成功回调, 则就会设置enabled=true

JoinGroupResponseHandler#handle

从下面的代码可以看到, JoinGroupRequest 回调的时候,把客户端的状态流转为了 COMPLETING_REBALANCE,并启动的监测线程

在这里插入图片描述

1.2 暂停心跳线程

  1. 当客户端的状态变更为 UNJOINED 或者 PREPARING_REBALANCE 的时候

  2. 又或者心跳线程有异常的时候

那么心跳线程就会暂时停止, 因为 UNJOINED 或者 PREPARING_REBALANCE 的状态 本身并不需要去定时检查协调器在不在线, 并不关心。

1.3 发起心跳请求

有个相关的配置如下



    synchronized RequestFuture<Void> sendHeartbeatRequest() {        log.debug("Sending Heartbeat request with generation {} and member id {} to coordinator {}",            generation.generationId, generation.memberId, coordinator);        HeartbeatRequest.Builder requestBuilder =                new HeartbeatRequest.Builder(new HeartbeatRequestData()                        .setGroupId(rebalanceConfig.groupId)                        .setMemberId(this.generation.memberId)                        .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))                        .setGenerationId(this.generation.generationId));        return client.send(coordinator, requestBuilder)                .compose(new HeartbeatResponseHandler(generation));    }    
复制代码

把客户端的基本信息带上发起请求

1.4 发起 LeaveGroup(离组)请求

当客户端检测到当前时候超过了 session.timeout.ms的时候,会判定会话超时,这个时候将客户端持有的消费者协调器标记为空,需要重新寻找协调器去。

当客户端检测到当前时间具体上一次客户端poll消息超过了max.poll.interval.ms默认值300000(5 分钟)的时候, 就会执行 LeaveGroup 离组请求

AbstractCoordinator# maybeLeaveGroup


    public synchronized RequestFuture<Void> maybeLeaveGroup(String leaveReason) {        RequestFuture<Void> future = null;
        // 从 2.3 开始,只有动态成员才会向 broker 发送 LeaveGroupRequest,group.instance.id 有效的 consumer 被视为从不发送 LeaveGroup 的静态成员,成员过期仅受 session timeout 控制        if (isDynamicMember() && !coordinatorUnknown() &&            state != MemberState.UNJOINED && generation.hasMemberId()) {            // this is a minimal effort attempt to leave the group. we do not            // attempt any resending if the request fails or times out.            log.info("Member {} sending LeaveGroup request to coordinator {} due to {}",                generation.memberId, coordinator, leaveReason);            LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder(                rebalanceConfig.groupId,                Collections.singletonList(new MemberIdentity().setMemberId(generation.memberId))            );
            future = client.send(coordinator, request).compose(new LeaveGroupResponseHandler(generation));            client.pollNoWakeup();        }  // 重置状态为UNJOINED         resetGenerationOnLeaveGroup();
        return future;    }

复制代码
  1. 在 2.3 之后, 只有动态成员才会向 broker 发送 LeaveGroupRequest,group.instance.id 有效的 consumer 被视为从不发送 LeaveGroup 的静态成员,成员过期仅受session.timeout.ms 控制

  2. 重置客户端状态为 UNJOINED

具体的就不分析了。客户端向 GroupCoordinator 发送 LeaveGroupRequest 之后,协调器做的是

  1. 移除 Member,尝试重平衡

2. GroupCoordinator 处理请求

下面的代码看着很多, 其实也没有很复杂,基本上都是一些校验逻辑。

GroupCoordinator#handleHeartbeat


  def handleHeartbeat(groupId: String,                      memberId: String,                      groupInstanceId: Option[String],                      generationId: Int,                      responseCallback: Errors => Unit): Unit = {    validateGroupStatus(groupId, ApiKeys.HEARTBEAT).foreach { error =>      if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS)        // the group is still loading, so respond just blindly        responseCallback(Errors.NONE)      else        responseCallback(error)      return    }
    groupManager.getGroup(groupId) match {      case None =>        responseCallback(Errors.UNKNOWN_MEMBER_ID)
      case Some(group) => group.inLock {        if (group.is(Dead)) {          // if the group is marked as dead, it means some other thread has just removed the group          // from the coordinator metadata; this is likely that the group has migrated to some other          // coordinator OR the group is in a transient unstable phase. Let the member retry          // finding the correct coordinator and rejoin.          responseCallback(Errors.COORDINATOR_NOT_AVAILABLE)        } else if (group.isStaticMemberFenced(memberId, groupInstanceId, "heartbeat")) {          responseCallback(Errors.FENCED_INSTANCE_ID)        } else if (!group.has(memberId)) {          responseCallback(Errors.UNKNOWN_MEMBER_ID)        } else if (generationId != group.generationId) {          responseCallback(Errors.ILLEGAL_GENERATION)        } else {          group.currentState match {            case Empty =>              responseCallback(Errors.UNKNOWN_MEMBER_ID)
            case CompletingRebalance =>              // consumers may start sending heartbeat after join-group response, in which case              // we should treat them as normal hb request and reset the timer              val member = group.get(memberId)              completeAndScheduleNextHeartbeatExpiration(group, member)              responseCallback(Errors.NONE)
            case PreparingRebalance =>                val member = group.get(memberId)                completeAndScheduleNextHeartbeatExpiration(group, member)                responseCallback(Errors.REBALANCE_IN_PROGRESS)
            case Stable =>                val member = group.get(memberId)                completeAndScheduleNextHeartbeatExpiration(group, member)                responseCallback(Errors.NONE)
            case Dead =>              throw new IllegalStateException(s"Reached unexpected condition for Dead group $groupId")          }        }      }    }  }  
复制代码

简单来说就是

  1. 校验一下 Group 协调器存不存在当前 Member

  2. 校验 Group 的状态是否 Dead,如果是 Dead 的话则客户端要重新寻找新的 GroupCoordinator 并 JoinGroup

  3. 判断客户端和 GroupCoordinator 是否在同一个年代,如果不是一个年代说明客户端需要重新 JoinGroup 了。

  4. 如果 GroupCoordinator 当前状态是 PreparingRebalance 的话, 客户端会判断自身如果是 STABLE 的话,则会重新 JoinGroup。

  5. 如果 GroupCoordinator 当前状态是 CompletingRebalance、Stable , 则会清理一下 GroupCoordinator 设置的延迟过期任务, 并重新设置一个新的任务。这个任务执行的时间是配置 session.timeout.ms 之后。如果假设没有心跳线程请求过来了, 那么这个任务就会被执行。如果执行了会有啥问题呢?请继续看下面的消费组协调器超时任务

关于 JoinGroupRequest, 是客户端发起加入到消费组的请求。

具体讲解请看:Kafka消费者JoinGroupRequest流程解析

2.1 消费组协调器超时任务

如果在session.timeout.ms 期间一直没有收到客户端来的心跳请求,那么消费组协调器超时任务就会被执行


  def onExpireHeartbeat(group: GroupMetadata, memberId: String, isPending: Boolean): Unit = {    group.inLock {      if (group.is(Dead)) {    // 如果当前心跳监测到Group协调器已经Dead了,仅仅只是打印一下日志, 因为它自身可能已经不是组协调器了,他已经不能再被允许做什么了        info(s"Received notification of heartbeat expiration for member $memberId after group ${group.groupId} had already been unloaded or deleted.")        } else if (isPending) {        info(s"Pending member $memberId in group ${group.groupId} has been removed after session timeout expiration.")  // 客户端在发起第一次JoinGroup请求的时候,并没有带上memberId,但是Group会生成一个给客户端返回  // 这个时候这个member就是Pending状态的,属于待加入状态, 因为它还会发起第二次JoinGroup请求并带上这个memberId,才算是真的Join了Group  // 在这里 直接把这个memberId从Pending缓存中移除了,因为它心跳监测过期了,这意味着客户端需要重新发起第一次Join        removePendingMemberAndUpdateGroup(group, memberId)      } else if (!group.has(memberId)) {        debug(s"Member $memberId has already been removed from the group.")      } else {        val member = group.get(memberId)        if (!member.hasSatisfiedHeartbeat) {          info(s"Member ${member.memberId} in group ${group.groupId} has failed, removing it from the group")
          removeMemberAndUpdateGroup(group, member, s"removing member ${member.memberId} on heartbeat expiration")        }      }    }  }  

复制代码
  1. 如果 Group 状态是 Dead,那么什么也不做,它都不是组协调器了,它什么也做不了

  2. 如果当前的 member 是 Pending 状态,(首先了解一下 Pending 状态,Member 第一次 JoinGroup 的时候由于没有带上 memberId 参数,组协调器会生成一个 MemberId 返回给客户端,并且组协调器会在自身保持一份这个 Member 的数据,但是这个时候的 Member 是 Pending 状态的,意识是等待加入的状态, 因为它还会再发起第二次 JoinGroup 请求,并且带上这个 MemberId。这个时候才是真正的 JoinGroup。) 则把这个 Member 从 Pending 缓存中移除。也就意味着这个 Member 需要再次发起第一次 JoinGroup 请求。

  3. 其他状态就是确定期间没有心跳请求的话, 那么要把这个 Member 移除掉并更新 Group 元信息。①. 将这个 Member 在 Group 协调器缓存中移除②. 如果当前的状态是 Stable | CompletingRebalance 的话, 直接走prepareRebalance流程

prepareRebalance 流程主要做的事情就是

  1. 将状态流转到 PreparingRebalance

  2. 设置一个 DelayedJoin 超时过期任务,超时时间是max.poll.interval.ms 默认300000(5 分钟)

  3. 这个任务会在满足要求(所有 Member 都 JoinGroup 了)的时候,去执行 onCompleteJoin。这个就是跟 JoinGroup 后面的流程是一样的。主要动作就是通知所有的 Member,你们都 Join 成功了, 接下来你们该发起 SyncGroup 请求了。 具体请看:Kafka消费者JoinGroupRequest流程解析

3. 客户端处理返回数据

HeartbeatResponseHandler#handle

这一块的代码就不贴了,主要就是根据返回的异常做具体的事情。

如果没有异常的话啥也不干。

异常映射关系如下:

  1. COORDINATOR_NOT_AVAILABLE | NOT_COORDINATOR 该去重新寻找新的 GroupCoordinator

  2. REBALANCE_IN_PROGRESS;当前组协调器正在 Rebalance 中, 如果当前客户端是 STABLE 状态,说明它该重新发起 JoinGroupRequest 了,赶紧的它也要去 Join 并参与分配了呢。

  3. ILLEGAL_GENERATION | UNKNOWN_MEMBER_ID | FENCED_INSTANCE_ID : 将 Member 的状态设置为 UNJOINED,并重新 JoinGroup

4. 心跳线程状态图

我们可以先看一下 消费者客户端 Member 的状态流转图

在这里插入图片描述

了解了这个状态流转图, 也就可以知道心跳线程状态流转图了

因为心跳线程的运行只有在 两个状态:COMPLETING_REBALANCESTABLE

在这里插入图片描述

发布于: 刚刚阅读数: 4
用户头像

关注公众号: 石臻臻的杂货铺 获取最新文章 2019-09-06 加入

进高质量滴滴技术交流群,只交流技术不闲聊 加 szzdzhp001 进群 20w字《Kafka运维与实战宝典》PDF下载请关注公众号:石臻臻的杂货铺

评论

发布
暂无评论
Kafka消费者客户端心跳请求_Kafk_石臻臻的杂货铺_InfoQ写作社区