大数据 -75 Kafka 高水位线 HW 与日志末端 LEO 全面解析:副本同步与消费一致性核心

🚀 AI 篇持续更新中!(长期更新)
AI 炼丹日志-31- 千呼万唤始出来 GPT-5 发布!“快的模型 + 深度思考模型 + 实时路由”,持续打造实用 AI 工具指南!📐🤖
💻 Java 篇正式开启!(300 篇)
目前 2025 年 08 月 18 日更新到:Java-100 深入浅出 MySQL 事务隔离级别:读未提交、已提交、可重复读与串行化 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务正在更新!深入浅出助你打牢基础!
📊 大数据板块已完成多项干货更新(300 篇):
包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解

章节内容
上节我们完成了如下内容:
Kafka 控制器中的 Broker 选举
Kafka 可靠性中的 副本复制、失效副本、副本滞后 等问题

一致性保证
基本概念
水位标记
水位或者水印(Watermark)一次,表示位置信息,即位移(offset)。Kafka 源码中使用的名字是高水位,HW(high Watermark)。
副本角色
Kafka 分区使用多个副本 replica 提供高可用。
LEO 和 HW
每个分区副本对象都有两个重要的属性:LEO 和 HW
LEO:记日志末端位移(log and offset),记录了该副本日志中下一条消息的位移值。如果 LEO=10,那么表示该副本保存了 10 条消息,位移值范围是【0,9】。
Leader LEO 和 Follower LEO 的更新是有区别的
HW:即上面提到的水位值,对于同一个副本对象而言,其 HW 值不会大于 LEO 值,小于等于 HW 值的所有消息都认为是“已备份”的 replicated。Leader 副本和 Follower 副本的 HW 更新不同。

上图中,HW 值是 7,表示位移的 0~7 的所有消息都已经处于“已提交状态”(COMMITED)。而 LEO 值是 14,8~13 的消息就是未完全备份(fully replicated)。
为什么没有 14?LEF 是指向下一条消息到来时的位移。
消费者无法消费分区下 Leader 副本中位移大于分区 HW 的消息
Follower 副本何时更新 LEO
在 Kafka 的副本同步机制中,Follower 副本的 LEO(Log End Offset)更新涉及两个关键环节:
1. Follower 本地 LEO 更新机制
当 Follower 副本向 Leader 发送 fetch 请求并成功获取消息后,会经历以下完整过程:
首先将消息写入本地日志文件
然后立即更新本地 Broker 副本管理中保存的 LEO 值
最后在响应 Leader 的 fetch 请求时,会将当前最新的 LEO 值一并返回给 Leader
示例场景:
假设 Follower 当前 LEO=100
从 Leader 获取了 10 条消息(offset 101-110)
写入本地日志后,立即将 LEO 更新为 110
在响应中会告知 Leader:"我的 LEO 现在是 110"
2. Leader 端维护的 Follower LEO
Leader 端会维护所有 Follower 副本的 LEO 信息,其更新机制如下:
当收到 Follower 的 fetch 响应时
解析响应中包含的 LEO 值
更新 Leader 副本管理器中该 Follower 对应的 LEO 记录
基于所有 Follower 的 LEO 计算新的 HW(High Watermark)
特殊处理情况:
如果网络延迟导致长时间未收到 Follower 响应,Leader 会保留该 Follower 最后上报的 LEO
当 Follower 重新连接时,会通过完整的同步过程重新建立准确的 LEO 记录
3. 两套 LEO 的协同工作机制
这两套 LEO 系统通过以下方式协同工作:
Follower 端 LEO 的作用:
用于确定本地日志的写入位置
作为计算本地 HW 的基础
在副本恢复时确定需要截断的位置
Leader 端维护的 Follower LEO 作用:
监控所有 Follower 的同步进度
计算分区级别的 HW
决定消息何时对消费者可见
在 Leader 选举时评估副本的同步状态
维护这两套 LEO 的关键原因在于:
本地 LEO 确保 Follower 能独立运作不受网络影响
Leader 端维护的 LEO 提供了全局视角,确保一致性
这种设计解耦了副本同步和 HW 计算的过程
注意:虽然两套 LEO 理论上应该一致,但在网络分区等异常情况下可能会出现短暂不一致,Kafka 的副本同步机制会最终确保它们达成一致。
Follower 副本的本地 LEO 何时更新
Follower 副本的本地 LEO(Log End Offset)更新机制如下:
基本更新原理
Follower 副本的 LEO 值直接反映其本地日志的最新写入位置
该值会随着新消息的持续写入而动态更新
更新触发场景
当 Broker 收到新消息写入请求时:
消息首先被追加到日志文件
LEO 值立即递增更新
例如:原 LEO=100,写入一条消息后变为 101
Fetch 请求处理流程
Follower 定期向 Leader 发送 fetch 请求(默认每 500ms)
Leader 返回的数据包含:
起始 offset
消息批次
其他控制信息
典型 fetch 请求处理过程:
Follower 发送 fetch(offset=100)
Leader 返回 offset 100-120 的消息
Follower 接收后写入本地日志
LEO 自动从 100 更新为 120
特殊情况处理
当出现消息压缩时:
物理 LEO 可能减小
但逻辑 LEO 继续保持递增
网络异常情况下:
可能触发重试机制
LEO 只在消息持久化后更新
与其他偏移量的关系
LEO 总是 >= HW(高水位线)
与 Leader 的 LEO 可能存在滞后
在 ISR 列表中时会尽量保持同步
监控与调优
可通过 kafka-topics.sh 查看 LEO 状态
replica.lag.time.max.ms 参数影响更新频率
监控 LEO 差值可发现副本同步问题
注意:LEO 更新是异步操作,实际更新时机可能受磁盘 I/O 性能影响。在配置较高的集群中,这个更新过程通常在毫秒级完成。
Leader 端 Follower 的 LEO 更新机制详解
在 Kafka 的副本同步机制中,Leader 端维护的 Follower 的 LEO(Log End Offset)更新遵循以下详细流程:
Fetch 请求触发时机:
Follower 会按照
replica.fetch.wait.max.ms
配置的时间间隔定期向 Leader 发送 fetch 请求或者当有新的消息到达 Leader 时,Follower 会立即触发 fetch 请求
请求处理阶段:
Leader 收到 fetch 请求后,首先解析请求中包含的 Follower 当前的 LEO 信息
Leader 根据请求中的 offset 从本地日志中读取相应的消息数据
在读取数据时,Leader 会检查该 Follower 是否有读取权限(ACL 校验)
LEO 更新阶段:
在准备响应数据前,Leader 会先更新内存中维护的该 Follower 的 LEO 值
更新的 LEO 值 = 本次 fetch 请求的起始 offset + 本次返回的消息数量
这个更新操作会记录在
LeaderEndPoint
的元数据中特殊场景处理:
如果 Follower 请求的 offset 不在 Leader 日志范围内,Leader 会触发截断或同步操作
对于新加入 ISR 的 Follower,Leader 会初始化其 LEO 为当前日志的起始 offset
当 Follower 落后过多时,Leader 会限制返回数据量以避免网络拥塞
更新后的操作:
更新完成后,Leader 会检查该 Follower 是否满足 ISR 条件
如果满足且原先不在 ISR 中,会触发 ISR 集合变更通知
Leader 会将更新后的 LEO 信息持久化到内存元数据中
示例场景:假设 Follower 当前的 LEO 是 100,发送 fetch 请求获取 offset=100 开始的 50 条消息。Leader 处理流程:
接收 fetch(offset=100, max_bytes=...)
从本地日志读取 offset 100-149 的消息
更新该 Follower 的 LEO 为 150
将消息 100-149 和更新后的 LEO 信息一并返回给 Follower
Follower 副本何时更新 HW
Follower 更新 HW 发生在其更新 LEO 之后,一旦 Follower 向 Log 写完数据,尝试更新自己的 HW 值。比较当前 LEO 值域 fetch 响应中 Leader 的 HW 值,取两者的小者作为新的 HW 值。
即:如果 Follower 的 LEO 大于 Leader 的 HW,Follower HW 值不会大于 Leader 的 HW 值。

Leader 副本何时更新 LEO
和 Follower 更新 LEO 相同,Leader 写 Log 时自动更新自己的 LEO 值
Leader 副本何时更新 HW
Leader 的 HW 值就是分区 HW 值,直接影响分区数据对消费者的可见性
Leader 会【尝试】去更新分区 HW 的四种情况:
Follower 副本成为 Leader 副本时,Kafka 会尝试去更新分区 HW
Broker 奔溃导致副本被踢出 ISR 时,检查下分区 HW 值是否需要更新是有必要的
生产者向 Leader 副本写消息时,因为写入消息会更新 Leader 的 LEO,有必要检查 HW 值是否需要更新
Leader 处理 Follower fetch 请求时,首先从 Log 读取数据,之后尝试更新分区 HW 值
当 Kafka Broker 都正常工作时,分区 HW 值的更新时机有两个:
Leader 处理 produce 请求时
Leader 处理 fetch 请求时
Leader 如何更新自己的 HW 值?详细解析
1. 数据存储机制
Leader Broker 维护两个关键数据集:
所有 Follower 副本的 LEO(Log End Offset)列表
自身的 LEO 值
这些数据会持久化到磁盘,并通过 ZooKeeper 进行集群同步,确保故障恢复时数据一致性。
2. HW 确定流程(分步骤说明)
当需要更新分区 HW 值时,Leader 会执行以下步骤:
副本筛选阶段:
扫描所有 Follower 副本
对每个副本检查两个条件(满足任意一个即可):
条件 A:当前位于 ISR(In-Sync Replicas)列表中
条件 B:同时满足:
不在 ISR 列表中
该副本 LEO 落后于 Leader LEO 的时长 ≤ replica.lag.time.max.ms(默认 10,000ms)
最小值比对阶段:
收集所有通过筛选的副本 LEO
加入 Leader 自身的 LEO 值
取这些 LEO 值中的最小值作为新 HW
3. 条件设计的必要性
双条件机制解决的核心问题:
避免 HW 越界:如果只考虑 ISR 中的副本,当某个副本刚好满足 replica.lag.time.max.ms 条件(即将进入 ISR),但尚未被加入 ISR 时:
该副本可能已经 catch up 了大部分数据(LEO 接近 Leader)
若忽略这类副本,HW 可能越过其实际存储位置
这违反了 HW 的核心定义(所有 ISR 副本 LEO 的最小值)
4. 配置参数影响
replica.lag.time.max.ms 的取值直接影响 HW 更新行为:
设置过小(如 1s):
可能导致频繁的 ISR 变动
增加网络抖动时的 HW 波动
设置过大(如 30s):
延长故障检测时间
可能保留过多"准 ISR"副本
5. 实际场景示例
假设一个分区有如下状态:
Leader LEO: 150
ISR 副本 A LEO: 148
非 ISR 副本 B LEO: 149(落后 Leader 时长 8s)
非 ISR 副本 C LEO: 130(落后 Leader 时长 15s)
HW 计算过程:
副本 A(ISR)→ 入选
副本 B(非 ISR 但落后 8s < 10s)→ 入选
副本 C(非 ISR 且落后 15s > 10s)→ 排除
比较值集合:[148(A), 149(B), 150(Leader)]
最终 HW = min(148,149,150) = 148
6. 异常处理机制
当出现以下情况时:
所有副本都不满足条件
网络分区导致无法获取 Follower 状态
Leader 会:
维持当前 HW 不变
触发 Controller 进行副本重分配
记录 WARN 级别日志:"No eligible replicas found for HW update"
版权声明: 本文为 InfoQ 作者【武子康】的原创文章。
原文链接:【http://xie.infoq.cn/article/7899d0d5e24fd394681d2952d】。文章转载请联系作者。
评论