写点什么

大数据 -76 Kafka 从发送到消费:Kafka 消息丢失 / 重复问题深入剖析与最佳实践

作者:武子康
  • 2025-08-24
    山东
  • 本文字数:4195 字

    阅读完需:约 14 分钟

大数据-76 Kafka 从发送到消费:Kafka 消息丢失/重复问题深入剖析与最佳实践

🚀 AI 篇持续更新中!(长期更新)

AI 炼丹日志-31- 千呼万唤始出来 GPT-5 发布!“快的模型 + 深度思考模型 + 实时路由”,持续打造实用 AI 工具指南!📐🤖

💻 Java 篇正式开启!(300 篇)

目前 2025 年 08 月 18 日更新到:Java-100 深入浅出 MySQL 事务隔离级别:读未提交、已提交、可重复读与串行化 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务正在更新!深入浅出助你打牢基础!

📊 大数据板块已完成多项干货更新(300 篇):

包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解


章节内容

上节我们完成了如下内容:


  • Kafka 一致性保证

  • LogAndOffset(LEO)

  • HightWatermark(HW)

  • Leader 和 Follower 何时更新 LEO

  • Leader 和 Follower 何时更新 HW


基本介绍

消息重复和丢失是 Kafka 中很常见的问题,主要发生在以下三个阶段:


  • 生产者阶段

  • Broke 阶段

  • 消费者阶段

生产者阶段丢失

出现场景

生产者发送消息没有收到正确 Broke 的响应,导致生产者重试。生产者发送出一条消息,Broker 落盘以后因为网络等种种原因发送端得到一个发送失败的响应或者网络中断,然后生产者收到一个可恢复的 Exception 重试消息导致消息重复。

重试过程


上图说明:


  • new KafkaProducer()创建一个后台线程 KafkaThread 扫描 RecordAccumulator 中是否有消息

  • 调用 KafkaProducer.send()发送消息,实际上只是把消息保存到 RecordAccumulator 中

  • 后台线程 KafkaThread 扫描到 RecordAccumulator 中有消息后,将消息发送到 Kafka 集群

  • 如果发送成功,那么返回成功

  • 如果发送失败,判断是否重试,如果不允许重试则失败。允许重试则再保存到 RecordAccumulator 中,等待后台线程 KafkaThread 扫描再次发送

可恢复异常

异常是 RetriableException 类型 或者 TransactionManager 允许重试,RetriableException 类集成关系如下:


消息顺序问题

  • 如果设置 max.in.flight.requests.per.connection 大于 1(默认 5,单个连接上发送的未确认的请求的最大数量,表示上一个发出的请求没有确认下一个请求又发出去了)。大于 1 可能会改变记录的顺序,因为如果将两个 Batch 发送到单个分区,第一个 batch 处理失败并重试,但是第二个 batch 处理成功,那么第二个 batch 处理中的记录可能先出现被消费掉。

  • 如果设置 max.in.flight.requests.per.connection 等于 1,则可能会影响吞吐量,可以解决单个生产者发送顺序问题,如果多个生产者,生产者 1 先发送一个请求,生产者 2 后发送请求,此时生产者 1 返回可恢复异常,且重试成功,此时虽然 1 先发送,但是 2 是先被消费的。

解决方案

幂等性

启动 Kafka 幂等性:


  • enable.idempotence=true

  • ack=all

  • retries>=1

ack=0 且不重试

可能会丢失消息,适用于吞吐量指标重要性高于数据丢失,比如:日志采集。

生产者-Broker 阶段丢失

出现场景

ack=0 且不重试

生产者发送消息完,不管结果了,如果发送失败也就丢失了

ack=1, Leader 宕机

生产者发送完消息,只等待 Leader 写入成功就返回了,Leader 分区丢失了,此时 Follower 没来得及同步,消息丢失。

unclean.leader.election.enable 配置为 true

允许选举 ISR 以外的副本作为 Leader,会导致数据丢失,默认为 False。生产者发送异步消息,只等待 Leader 写入成功就返回,Leader 分区丢失,此时 ISR 中就没有 Follower,Leader 从 OSR 中选举,因为 OSR 中本来就落后于 Leader,造成了消息的丢失。

解决方案

禁用 unclean 选举 ACK=ALL

  • ack=all 或者 -1

  • tries > 1

  • unclean.leader.election.enable = false 生产者发送完消息后,等待 Follower 同步完再返回,如果异常则重试。副本的数量可能影响吞吐量,不超过 5 个,一般是 3 个。不允许 unclean 的 Leader 参与选举。

min.insync.replicas > 1 详细解析

核心概念

当生产者将 acks 参数设置为 all(或 -1)时,min.insync.replicas 参数用于指定确认消息写入成功所需的最小同步副本数量。如果存活的同步副本数低于这个最小值,生产者将收到以下异常之一:


  • NotEnoughReplicasException:表示当前没有足够的副本可用

  • NotEnoughReplicasAfterAppendException:表示消息已发送但最终未能达到要求的副本数

参数协同工作原理

这两个参数的组合使用可以增强数据持久性保证:


  1. 典型配置组合

  2. 创建复制因子(replication factor)为 3 的主题

  3. 设置 min.insync.replicas=2

  4. 生产者配置 acks=all

  5. 工作流程

  6. 生产者发送消息后,会等待至少 2 个副本确认写入(包括 leader)

  7. 如果只有 1 个副本可用(例如 2 个 broker 宕机),生产者将收到异常

  8. 系统会确保大多数副本(2/3)都确认写入后才认为消息提交成功

实际应用场景

  1. 金融交易系统

  2. 需要严格保证交易记录的持久性

  3. 配置示例:5 个副本,min.insync.replicas=3,可以容忍 2 个节点故障

  4. 关键业务日志

  5. 在 3 节点集群中设置min.insync.replicas=2

  6. 即使 1 个节点故障,系统仍能正常接收新消息

  7. 异常处理建议

  8. 捕获NotEnoughReplicasException后应实现重试机制

  9. 监控同步副本数量变化,提前发现潜在问题

注意事项

  • 该参数需要与unclean.leader.election.enable=false配合使用

  • 过高的设置可能影响可用性(如设置min.insync.replicas=3,当 2 个节点故障时将拒绝写入)

  • 可以通过kafka-topics.sh --describe命令查看实际同步副本数

失败的 offset 单独记录

生产者发送消息,会自动重试,遇到不可能恢复异常会跳出。这是可以捕获异常记录到数据库或者缓存,进行单独的处理。

消费者数据重复场景

出现场景

数据消费完没有及时的提交 offset 到 Broker。消费消息端在消费过程中挂掉没有及时的提交 offset 到 Broker,另一个消费端启动之后拿到之前的 offset 记录开始消费,由于 offset 的滞后性可能会导致启动的客户端有少量的重复消费。

解决方案

取消自动提交

每次消费完或者程序退出时手动提交,这可能也没法保证一条重复。

下游做幂等处理方案详解

在分布式消息系统中,确保消息处理的幂等性是保证数据一致性的重要手段。以下是几种常见的下游幂等处理方案及其实现细节:

1. 标准幂等消费模式

  • 基本实现:让下游服务自身实现幂等逻辑

  • 典型做法:下游系统在数据表中设计唯一约束(如订单 ID+处理状态),通过数据库约束来防止重复处理

  • 适用场景:订单处理、支付回调等业务场景

  • 示例:在订单状态变更时,先检查当前状态是否允许变更,再执行更新

2. Offset 记录方案

  • 实现方式:消费者为每条处理成功的消息记录 offset

  • 存储选择

  • 可存储在 Redis 等高速缓存中

  • 也可持久化到数据库

  • 注意事项

  • 需要确保 offset 记录和业务处理是原子操作

  • 系统重启时能正确恢复 offset 位置

  • 优化建议:可以批量记录 offset 以提高性能

3. 严格事务方案

  • 实现原理:将 offset 与业务数据放在同一事务中处理

  • 技术实现

  • 使用分布式事务框架(如 Seata)

  • 或通过本地事务+消息表模式

  • 典型架构

  • 创建业务数据表和 offset 记录表

  • 在同一个事务中:

  • 更新业务数据

  • 记录当前消费 offset

  • 适用场景:金融交易、资金变动等对一致性要求极高的场景

4. 乐观锁方案

  • 实现步骤

  • 在下游表中添加 version/offset 字段

  • 处理消息时比较传入 offset 与记录 offset:

  • 若新 offset 更大则处理并更新

  • 否则拒绝处理

  • SQL 示例


  UPDATE order_status   SET status = 'paid', offset = 12345   WHERE order_id = 'ORD123' AND offset < 12345
复制代码


  • 优势

  • 避免分布式事务开销

  • 实现相对简单

  • 注意事项

  • 需要确保 offset 是单调递增的

  • 要考虑 offset 字段的存储空间

方案对比


在实际应用中,需要根据业务场景的特点选择合适的方案。对于大多数业务场景,采用 offset 记录+业务幂等的组合方案即可满足需求;只有在极少数严格要求强一致的场景下,才需要考虑分布式事务方案。

__consumer_offsets:Kafka 消费者位移管理机制

背景与演进

在早期版本的 Kafka 中,消费者位移信息是存储在 ZooKeeper 中的。然而,随着 Kafka 集群规模的扩大和消息吞吐量的增加,这种设计逐渐暴露出一些问题:


  1. ZooKeeper 的性能瓶颈:ZooKeeper 并不适合处理大批量的频繁写入操作,这会影响消费者位移提交的性能

  2. 扩展性问题:频繁的位移提交会给 ZooKeeper 带来较大压力,限制了 Kafka 集群的扩展能力

  3. 功能限制:基于 ZooKeeper 的位移管理难以实现更复杂的位移管理功能

__consumer_offsets 主题

从 Kafka 0.9.0 版本开始引入了内置的__consumer_offsets主题,并在 Kafka 1.0.2 版本中完全移除了对 ZooKeeper 的依赖。这个特殊的主题具有以下特点:


  1. 存储机制

  2. 采用 Kafka 本身的存储机制来保存消费者位移

  3. 默认配置为 50 个分区(可通过offsets.topic.num.partitions参数调整)

  4. 采用紧凑压缩策略(compact)来定期清理过期数据

  5. 性能优势

  6. 利用 Kafka 自身的高吞吐特性处理位移提交

  7. 批量写入机制显著提高了位移提交的性能

  8. 与消息处理使用相同的通信协议,减少额外开销

  9. 数据格式

  10. 键(Key):由group.id、主题名和分区号组成

  11. 值(Value):包含位移值、元数据和时间戳

管理与监控工具

Kafka 提供了kafka-consumer-groups.sh脚本(位于 bin 目录下)来管理消费者组和位移信息:

常用命令示例

  1. 查看所有消费者组:


bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
复制代码


  1. 查看特定消费者组的详细信息:


bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \--group my-group --describe
复制代码


  1. 重置消费者位移(需要先停止消费者):


bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \--group my-group --topic my-topic --reset-offsets --to-earliest --execute
复制代码

输出解读

执行 describe 命令后,输出通常包含以下列:


  • TOPIC:主题名称

  • PARTITION:分区号

  • CURRENT-OFFSET:消费者当前消费到的位移

  • LOG-END-OFFSET:分区最新消息的位移

  • LAG:未消费的消息数量(LOG-END-OFFSET - CURRENT-OFFSET)

  • CONSUMER-ID:消费者 ID

  • HOST:消费者客户端主机信息

实际应用场景

  1. 监控消费者延迟:通过定期检查 LAG 值来监控消费者健康状况

  2. 故障恢复:在消费者故障时,可以手动重置位移到指定位置

  3. 重新处理数据:通过修改位移值实现历史数据的重新处理

  4. 测试验证:在测试环境中快速重置消费者状态

配置参数

__consumer_offsets相关的几个重要配置参数:


  1. offsets.retention.minutes:位移保留时间(默认 1440 分钟/24 小时)

  2. offsets.topic.replication.factor:位移主题的副本数(建议≥2)

  3. offsets.commit.timeout.ms:位移提交超时时间(默认 5000ms)

  4. offsets.topic.num.partitions:位移主题分区数(默认 50)

注意事项

  1. 直接操作__consumer_offsets主题可能导致数据不一致,应优先使用官方工具

  2. 消费者组在长时间无活动后,其位移信息可能被自动清理

  3. 在高安全要求的集群中,应限制对__consumer_offsets主题的访问权限

发布于: 刚刚阅读数: 6
用户头像

武子康

关注

永远好奇 无限进步 2019-04-14 加入

Hi, I'm Zikang,好奇心驱动的探索者 | INTJ / INFJ 我热爱探索一切值得深究的事物。对技术、成长、效率、认知、人生有着持续的好奇心和行动力。 坚信「飞轮效应」,相信每一次微小的积累,终将带来深远的改变。

评论

发布
暂无评论
大数据-76 Kafka 从发送到消费:Kafka 消息丢失/重复问题深入剖析与最佳实践_Java_武子康_InfoQ写作社区