深入解析 Kafka 的 offset 管理
Kafka 中的每个 partition 都由一系列有序的、不可变的消息组成,这些消息被连续的追加到 partition 中。partition 中的每个消息都有一个连续的序号,用于 partition 唯一标识一条消息。
Offset 记录着下一条将要发送给 Consumer 的消息的序号。
Offset 从语义上来看拥有两种:Current Offset 和 Committed Offset。
Current Offset
Current Offset 保存在 Consumer 客户端中,它表示 Consumer 希望收到的下一条消息的序号。它仅仅在 poll()方法中使用。例如,Consumer 第一次调用 poll()方法后收到了 20 条消息,那么 Current Offset 就被设置为 20。这样 Consumer 下一次调用 poll()方法时,Kafka 就知道应该从序号为 21 的消息开始读取。这样就能够保证每次 Consumer poll 消息时,都能够收到不重复的消息。
Committed Offset
Committed Offset 保存在 Broker 上,它表示 Consumer 已经确认消费过的消息的序号。主要通过 commitSync
和 co
mmitAsync
API 来操作。举个例子,Consumer 通过 poll() 方法收到 20 条消息后,此时 Current Offset 就是 20,经过一系列的逻辑处理后,并没有调用consumer.commitAsync()
或consumer.commitSync()
来提交 Committed Offset,那么此时 Committed Offset 依旧是 0。
Committed Offset 主要用于 Consumer Rebalance。大数据培训在 Consumer Rebalance 的过程中,一个 partition 被分配给了一个 Consumer,那么这个 Consumer 该从什么位置开始消费消息呢?答案就是 Committed Offset。另外,如果一个 Consumer 消费了 5 条消息(poll 并且成功 commitSync)之后宕机了,重新启动之后它仍然能够从第 6 条消息开始消费,因为 Committed Offset 已经被 Kafka 记录为 5。
总结一下,Current Offset 是针对 Consumer 的 poll 过程的,它可以保证每次 poll 都返回不重复的消息;而 Committed Offset 是用于 Consumer Rebalance 过程的,它能够保证新的 Consumer 能够从正确的位置开始消费一个 partition,从而避免重复消费。
在 Kafka 0.9 前,Committed Offset 信息保存在 zookeeper 的[consumers/{group}/offsets/{topic}/{partition}]目录中(zookeeper 其实并不适合进行大批量的读写操作,尤其是写操作)。而在 0.9 之后,所有的 offset 信息都保存在了 Broker 上的一个名为__consumer_offsets 的 topic 中。
Kafka 集群中 offset 的管理都是由 Group Coordinator 中的 Offset Manager 完成的。
Group Coordinator
Group Coordinator 是运行在 Kafka 集群中每一个 Broker 内的一个进程。它主要负责 Consumer Group 的管理,Offset 位移管理以及 Consumer Rebalance。
对于每一个 Consumer Group,Group Coordinator 都会存储以下信息:
订阅的 topics 列表
Consumer Group 配置信息,包括 session timeout 等
组中每个 Consumer 的元数据。包括主机名,consumer id
每个 Group 正在消费的 topic partition 的当前 offsets
Partition 的 ownership 元数据,包括 consumer 消费的 partitions 映射关系
Consumer Group 如何确定自己的 coordinator 是谁呢?简单来说分为两步:
1. 确定 Consumer Group offset 信息将要写入__consumers_offsets topic 的哪个分区。具体计算公式:
2. 该分区 leader 所在的 broker 就是被选定的 coordinator
Offset 存储模型
由于一个 partition 只能固定的交给一个消费者组中的一个消费者消费,因此 Kafka 保存 offset 时并不直接为每个消费者保存,而是以 groupid-topic-partition -> offset 的方式保存。如图所示:
group-offset.png
Kafka 在保存 Offset 的时候,实际上是将 Consumer Group 和 partition 对应的 offset 以消息的方式保存在__consumers_offsets 这个 topic 中。
__consumers_offsets 默认拥有 50 个 partition,可以通过
的方式来查询某个 Consumer Group 的 offset 信息保存在__consumers_offsets 的哪个 partition 中。下图展示了__consumers_offsets 中保存的 offset 消息的格式:
__consumers_offsets.png
__consumers_offsets_data.png
如图所示,一条 offset 消息的格式为 groupid-topic-partition -> offset。因此 consumer poll 消息时,已知 groupid 和 topic,北京大数据培训又通过 Coordinator 分配 partition 的方式获得了对应的 partition,自然能够通过 Coordinator 查找__consumers_offsets 的方式获得最新的 offset 了。
Offset 查询
前面我们已经描述过 offset 的存储模型,它是按照 groupid-topic-partition -> offset 的方式存储的。然而 Kafka 只提供了根据 offset 读取消息的模型,并不支持根据 key 读取消息的方式。那么 Kafka 是如何支持 Offset 的查询呢?
答案就是 Offsets Cache!!
Offsets Cache.JPG
如图所示,Consumer 提交 offset 时,Kafka Offset Manager 会首先追加一条条新的 commit 消息到__consumers_offsets topic 中,然后更新对应的缓存。读取 offset 时从缓存中读取,而不是直接读取__consumers_offsets 这个 topic。
Log Compaction
我们已经知道,Kafka 使用 groupid-topic-partition -> offset*的消息格式,将 Offset 信息存储在__consumers_offsets topic 中。请看下面一个例子:
__consumers_offsets.JPG
如图,对于 audit-consumer 这个 Consumer Group 来说,上面的存储了两条具有相同 key 的记录:PageViewEvent-0 -> 240
和PageViewEvent-0 -> 323
。事实上,这就是一种无用的冗余。因为对于一个 partition 来说,我们实际上只需要它当前最新的 Offsets。因此这条旧的PageViewEvent-0 -> 240
记录事实上是无用的。
为了消除这样的过期数据,Kafka 为__consumers_offsets topic 设置了 Log Compaction 功能。Log Compaction 意味着对于有相同 key 的的不同 value 值,只保留最后一个版本。如果应用只关心 key 对应的最新 value 值,可以开启 Kafka 的 Log Compaction 功能,Kafka 会定期将相同 key 的消息进行合并,只保留最新的 value 值。
这张图片生动的阐述了 Log Compaction 的过程:
Log Compaction.JPG
下图阐释了__consumers_offsets topic 中的数据在 Log Compaction 下的变化:
Log Compaction for __consumers_offsets.JPG
在新建 topic 时添加
log.cleanup.policy=compact
参数就可以为 topic 开启 Log Compaction 功能。
auto.offset.reset 参数
auto.offset.reset
表示如果 Kafka 中没有存储对应的 offset 信息的话(有可能 offset 信息被删除),消费者从何处开始消费消息。它拥有三个可选值:
earliest:从最早的 offset 开始消费
latest:从最后的 offset 开始消费
none:直接抛出 exception 给 consumer
看一下下面两个场景:
1、Consumer 消费了 5 条消息后宕机了,重启之后它读取到对应的 partition 的 Committed Offset 为 5,因此会直接从第 6 条消息开始读取。此时完全依赖于 Committed Offset 机制,和auto.offset.reset
配置完全无关。
2、新建了一个新的 Group,并添加了一个 Consumer,它订阅了一个已经存在的 Topic。此时 Kafka 中还没有这个 Consumer 相应的 Offset 信息,因此此时 Kafka 就会根据auto.offset.reset
配置来决定这个 Consumer 从何处开始消费消息。
评论