写点什么

【kafka 原理】 消费者偏移量 __consumer_offsets_ 相关解析

  • 2022-10-21
    江西
  • 本文字数:1986 字

    阅读完需:约 1 分钟

【kafka原理】 消费者偏移量__consumer_offsets_相关解析

作者石臻臻,CSDN 博客之星 Top5Kafka Contributornacos Contributor华为云 MVP,腾讯云 TVP,滴滴 Kafka 技术专家 KnowStreaming PMC)


KnowStreaming 是滴滴开源的Kafka运维管控平台, 有兴趣一起参与参与开发的同学,但是怕自己能力不够的同学,可以联系我,带你一起你参与开源!

我们在 kafka 的 log 文件中发现了还有很多以 __consumer_offsets_的文件夹;总共 50 个;

由于 Zookeeper 并不适合大批量的频繁写入操作,新版 Kafka 已推荐将 consumer 的位移信息保存在 Kafka 内部的 topic 中,即__consumer_offsets topic,并且默认提供了kafka_consumer_groups.sh脚本供用户查看 consumer 信息。

__consumer_offsets 是 kafka 自行创建的,和普通的 topic 相同。它存在的目的之一就是保存 consumer 提交的位移。__consumer_offsets 的每条消息格式大致如图所示

可以想象成一个 KV 格式的消息,key 就是一个三元组:group.id+topic+分区号,而 value 就是 offset 的值。

考虑到一个 kafka 生成环境中可能有很多consumerconsumer group,如果这些 consumer 同时提交位移,则必将加重 __consumer_offsets 的写入负载,因此 kafka 默认为该 topic 创建了 50 个分区,并且对每个 group.id做哈希求模运算Math.abs(groupID.hashCode()) % numPartitions,从而将负载分散到不同的 __consumer_offsets 分区上。

一般情况下,当集群中第一次有消费者消费消息时会自动创建__consumer_offsets,它的副本因子受 offsets.topic.replication.factor 参数的约束,默认值为 3(注意:该参数的使用限制在 0.11.0.0 版本发生变化),分区数可以通过 offsets.topic.num.partitions 参数设置,默认值为 50。

1. 消费 Topic 消息

打开一个 session a,执行下面的消费者命令 ;指定了消费组:szz1-group; topic:szz1-test-topic

bin/kafka-console-consumer.sh --bootstrap-server  xxx1:9092,xxx2:9092,xxx3:9092 --group szz1-group --topic szz1-test-topic
复制代码

2.产生消息

打开一个新的 session b,执行生产消息命令

bin/kafka-console-producer.sh --broker-list  xxx1:9092,xxx2:9092,xxx3:9092  --topic szz1-test-topic
复制代码

发送几条消息


然后可以看到刚刚打开的 session a 消费了消息;


3. 查看指定消费组的消费位置 offset

bin/kafka-consumer-groups.sh --bootstrap-server xxx1:9092,xxx2:9092,xxx3:9092 --describe --group szz1-group
复制代码


可以看到图中 展示了每个partition 对应的消费者 id; 因为只开了一个消费者; 所以是这个消费者同时消费 3 个partition;CURRENT-OFFSET: 当前消费组消费到的偏移量LOG-END-OFFSET: 日志最后的偏移量CURRENT-OFFSET = LOG-END-OFFSET 说明当前消费组已经全部消费了;

那么我把 session a 关掉;现在没有消费者之后; 我再发送几条消息看看;

我发送了 2 条消息之后, partition-0 partition-1LOG-END-OFFSET: 日志最后的偏移量分别增加了 1; 但是CURRENT-OFFSET: 当前消费组消费到的偏移量 保持不变;因为没有被消费;

重新打开一个消费组 继续消费*

重新打开 session 之后, 会发现控制台输出了刚刚发送的 2 条消息; 并且偏移量也更新了


4. 从头开始消费 --from-beginning

如果我们用新的消费组去消费一个 Topic,那么默认这个消费组的 offset 会是最新的; 也就是说历史的不会消费例如下面我们新开一个 session c ;消费组设置为szz1-group3

bin/kafka-console-consumer.sh --bootstrap-server   xxx1:9092,xxx2:9092,xxx3:9092 --group szz1-group3    --topic szz1-test-topic
复制代码

查看消费情况

 bin/kafka-consumer-groups.sh --bootstrap-server  xxx1:9092,xxx2:9092,xxx3:9092  --describe --group szz1-group3
复制代码


可以看到CURRENT-OFFSET = LOG-END-OFFSET ;

如何让新的消费组/者 从头开始消费呢? 加上参数 --from-beginning

5.如何确认 consume_group 在哪个__consumer_offsets-? 中

Math.abs(groupID.hashCode()) % numPartitions

6. 查找__consumer_offsets 分区数中的消费组偏移量 offset

上面的 3. 查看指定消费组的消费位置 offset 中,我们知道如何查看指定的 topic 消费组的偏移量;那还有一种方式也可以查询

先通过 consume_group 确定分区数; 例如 "szz1-group".hashCode()%50=32; 那我们就知道 szz-group消费组的偏移量信息存放在 __consumer_offsets_32中;通过命令

 bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 32 --broker-list xxx1:9092,xxx2:9092,xxx3:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
复制代码


前面的 是 key 后面的是 value;key 由 消费组+Topic+分区数 确定; 后面的 value 就包含了 消费组的偏移量信息等等

然后接着我们发送几个消息,并且进行消费; 上面的控制台会自动更新为新的 offset;

7 查询 topic 的分布情况

bin/kafka-topics.sh --describe --zookeeper xxx:2181 --topic TOPIC名称
复制代码


发布于: 刚刚阅读数: 4
用户头像

关注公众号: 石臻臻的杂货铺 获取最新文章 2019-09-06 加入

进高质量滴滴技术交流群,只交流技术不闲聊 加 szzdzhp001 进群 20w字《Kafka运维与实战宝典》PDF下载请关注公众号:石臻臻的杂货铺

评论

发布
暂无评论
【kafka原理】 消费者偏移量__consumer_offsets_相关解析_kafka_石臻臻的杂货铺_InfoQ写作社区