写点什么

Kafka 消费客户端协调器讲解 (GroupCoordinator)

  • 2022-10-16
    江西
  • 本文字数:2898 字

    阅读完需:约 1 分钟

Kafka消费客户端协调器讲解(GroupCoordinator)

作者石臻臻,CSDN 博客之星 Top5Kafka Contributornacos Contributor华为云 MVP,腾讯云 TVP,滴滴 Kafka 技术专家 KnowStreaming


KnowStreaming 是滴滴开源的Kafka运维管控平台, 有兴趣一起参与参与开发的同学,但是怕自己能力不够的同学,可以联系我,当你导师带你参与开源!

  1. 什么是协调器

  2. 协调器工作原理

  3. 协调器的 Rebalance 机制

1 协调器的生命周期

GroupCoordinator 的创建

在 Kafka 启动的时候, 会自动创建并启动 GroupCoordinator


这个 GroupCoordinator 对象创建的时候传入的几个属性需要介绍一下



    new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupMetadataManager, heartbeatPurgatory, joinPurgatory, time, metrics)


复制代码

offsetConfig 相关配置


  private[group] def offsetConfig(config: KafkaConfig) = OffsetConfig(    maxMetadataSize = config.offsetMetadataMaxSize,    loadBufferSize = config.offsetsLoadBufferSize,    offsetsRetentionMs = config.offsetsRetentionMinutes * 60L * 1000L,    offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,    offsetsTopicNumPartitions = config.offsetsTopicPartitions,    offsetsTopicSegmentBytes = config.offsetsTopicSegmentBytes,    offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,    offsetsTopicCompressionCodec = config.offsetsTopicCompressionCodec,    offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,    offsetCommitRequiredAcks = config.offsetCommitRequiredAcks  )
复制代码


groupConfig 相关配置

groupMetadataManager

组元信息管理类

heartbeatPurgatory

心跳监测操作,每一秒执行一次

joinPurgatory

GroupCoordinator 的启动

  def startup(enableMetadataExpiration: Boolean = true): Unit = {    info("Starting up.")    groupManager.startup(enableMetadataExpiration)    isActive.set(true)    info("Startup complete.")  }
复制代码

这个启动对于 GroupCoordinator 来说只是给属性isActive标记为了 true, 但是同时呢也调用了GroupMetadataManager.startup

定时清理 Group 元信息

这个 Group 元信息管理类呢启动了一个定时任务, 名字为:delete-expired-group-metadata

每隔 600000ms 的时候就执行一下 清理过期组元信息的操作, 这个 600000ms 时间是代码写死的。

TODO:GroupMetadataManager#cleanupGroupMetadata

GroupCoordinator OnElection

当内部 topic __consumer_offsets 有分区的 Leader 变更的时候,比如触发了 LeaderAndIsr 的请求, 发现分区 Leader 进行了切换。

那么就会执行 GroupCoordinator#OnElection 的接口, 这个接口会把任务丢个一个单线程的调度程序, 专门处理 offset 元数据缓存加载和卸载的。线程名称前缀为group-metadata-manager- ,一个分区一个任务

最终执行的任务内容是:GroupMetadataManager#doLoadGroupsAndOffsets


__consumer_offsets 的 key 有两种消息类型

key version 0: 消费组消费偏移量信息 -> value version 0: [offset, metadata, timestamp]

key version 1: 消费组消费偏移量信息-> value version 1: [offset, metadata, commit_timestamp, expire_timestamp]

key version 2: 消费组的元信息 -> value version 0: [protocol_type, generation, protocol, leader,

在这里插入图片描述

例如 version:3 的 schemaForGroupValue

Version-0

{ protocol_type: STRING, generation: INT32, protocol: NULLABLE_STRING, leader: NULLABLE_STRING, members: ARRAY({  member_id: STRING,  client_id: STRING,  client_host: STRING,  session_timeout: INT32,  subscription: BYTES,  assignment: BYTES })}
复制代码

Version-1


{ protocol_type: STRING, generation: INT32, protocol: NULLABLE_STRING, leader: NULLABLE_STRING, members: ARRAY({  member_id: STRING,  client_id: STRING,  client_host: STRING,  rebalance_timeout: INT32,  session_timeout: INT32,  subscription: BYTES,  assignment: BYTES })}
复制代码

Version-2

{ protocol_type: STRING, generation: INT32, protocol: NULLABLE_STRING, leader: NULLABLE_STRING, current_state_timestamp: INT64, members: ARRAY({  member_id: STRING,  client_id: STRING,  client_host: STRING,  rebalance_timeout: INT32,  session_timeout: INT32,  subscription: BYTES,  assignment: BYTES })}
复制代码

Version-3

{ protocol_type: STRING, generation: INT32, protocol: NULLABLE_STRING, leader: NULLABLE_STRING, current_state_timestamp: INT64, members: ARRAY({  member_id: STRING,  group_instance_id: NULLABLE_STRING,  client_id: STRING,  client_host: STRING,  rebalance_timeout: INT32,  session_timeout: INT32,  subscription: BYTES,  assignment: BYTES })}
复制代码

Value 每个版本的 Scheme 如下



  private val GROUP_METADATA_VALUE_SCHEMA_V0 = new Schema(    new Field(PROTOCOL_TYPE_KEY, STRING),    new Field(GENERATION_KEY, INT32),    new Field(PROTOCOL_KEY, NULLABLE_STRING),    new Field(LEADER_KEY, NULLABLE_STRING),    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V0)))
  private val GROUP_METADATA_VALUE_SCHEMA_V1 = new Schema(    new Field(PROTOCOL_TYPE_KEY, STRING),    new Field(GENERATION_KEY, INT32),    new Field(PROTOCOL_KEY, NULLABLE_STRING),    new Field(LEADER_KEY, NULLABLE_STRING),    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V1)))
  private val GROUP_METADATA_VALUE_SCHEMA_V2 = new Schema(    new Field(PROTOCOL_TYPE_KEY, STRING),    new Field(GENERATION_KEY, INT32),    new Field(PROTOCOL_KEY, NULLABLE_STRING),    new Field(LEADER_KEY, NULLABLE_STRING),    new Field(CURRENT_STATE_TIMESTAMP_KEY, INT64),    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V2)))
  private val GROUP_METADATA_VALUE_SCHEMA_V3 = new Schema(    new Field(PROTOCOL_TYPE_KEY, STRING),    new Field(GENERATION_KEY, INT32),    new Field(PROTOCOL_KEY, NULLABLE_STRING),    new Field(LEADER_KEY, NULLABLE_STRING),    new Field(CURRENT_STATE_TIMESTAMP_KEY, INT64),    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V3)))


复制代码

GroupCoordinator onResignation

在这里插入图片描述

发布于: 刚刚阅读数: 3
用户头像

关注公众号: 石臻臻的杂货铺 获取最新文章 2019-09-06 加入

进高质量滴滴技术交流群,只交流技术不闲聊 加 szzdzhp001 进群 20w字《Kafka运维与实战宝典》PDF下载请关注公众号:石臻臻的杂货铺

评论

发布
暂无评论
Kafka消费客户端协调器讲解(GroupCoordinator)_kafka_石臻臻的杂货铺_InfoQ写作社区