写点什么

大数据 -72 Kafka 事务 Coordinator、日志、2PC 与幂等性的协同机制 端到端 Exactly-Once 处理详解

作者:武子康
  • 2025-08-19
    山东
  • 本文字数:5925 字

    阅读完需:约 19 分钟

大数据-72 Kafka 事务Coordinator、日志、2PC 与幂等性的协同机制 端到端Exactly-Once处理详解

点一下关注吧!!!非常感谢!!持续更新!!!

🚀 AI 篇持续更新中!(长期更新)

AI 炼丹日志-31- 千呼万唤始出来 GPT-5 发布!“快的模型 + 深度思考模型 + 实时路由”,持续打造实用 AI 工具指南!📐🤖

💻 Java 篇正式开启!(300 篇)

目前 2025 年 08 月 18 日更新到:Java-100 深入浅出 MySQL 事务隔离级别:读未提交、已提交、可重复读与串行化 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务正在更新!深入浅出助你打牢基础!

📊 大数据板块已完成多项干货更新(300 篇):

包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解


章节内容

上节我们完成了如下内容:


  • 磁盘存储

  • 零拷贝

  • 磁盘文件传输

  • JavaNIO、mmap、sendfile


事务场景

  • 事务消息的可见性控制:当 Producer 发送多条消息组成一个事务时,这些消息需要满足原子性语义。具体来说,要么所有消息同时对 Consumer 可见(事务成功提交后),要么所有消息同时不可见(事务回滚或未提交)。Kafka 通过事务协调器(Transaction Coordinator)和事务日志(Transaction Log)实现这一机制。例如,假设一个订单处理事务包含订单创建、库存扣减两条消息,它们必须同时生效或同时失效,否则会导致数据不一致。

  • 跨 Topic/Partition 的分布式事务:在实际场景中,Producer 可能需要向多个 Topic 或 Partition 发送消息(例如订单服务的订单 Topic 和物流服务的发货 Topic)。Kafka 通过全局唯一的事务 ID(Transactional ID)将这些操作绑定为一个分布式事务,并采用两阶段提交(2PC)协议保证原子性。例如,电商平台下单时,需同时写入订单库和优惠券库,这两个写入操作可能分布在不同的 Partition,但必须通过事务保证一致性。

  • 消费-处理-生产的端到端事务:在流式处理中,典型的模式是从 TopicA 消费消息,处理后写入 TopicB。Kafka 的事务机制允许将消费偏移量提交与生产消息绑定为原子操作:若处理失败,则消费偏移量不会提交(避免重复消费),同时已生产的消息也会被标记为中止。例如银行转账场景:读取转账请求 Topic→扣除 A 账户余额→写入 B 账户入账 Topic,任一环节失败均需回滚整个事务。

  • 事务恢复机制:为应对 Producer 崩溃,Kafka 设计了事务超时(transaction.timeout.ms)和幂等性机制。新 Producer 启动时会通过 Transaction ID 查询未完成事务的状态,并执行回滚或继续提交。例如支付服务崩溃后重启,需根据事务日志决定是补发支付成功消息还是撤回未完成交易。关键参数包括isolation.level(控制读隔离)和enable.idempotence(避免重复消息)。

  • 原子操作的三类场景

  • 仅生产消息:事务内包含多条发送到不同 Topic/Partition 的消息(如订单创建+支付流水记录)

  • 消费-生产组合:读取消息与发送消息的原子绑定(如流处理中的 Exactly-Once 语义)

  • 纯消费场景:仅提交消费偏移量的事务(实际无意义,因为单条消息消费本身具有原子性)


以下是扩展后的内容:




这三种典型的事务场景分别是:


  1. 纯生产者模式(Producer-only)

  2. 在这种模式下,应用程序只负责生产消息到 Kafka 主题,不涉及任何消费操作。

  3. 典型应用场景:日志收集系统、事件追踪系统等只需要单向写入的场合。

  4. 事务保证:确保一个事务批次内的所有消息要么全部成功写入,要么全部不写入。

  5. 示例:电商系统将用户行为事件批量写入 Kafka,需要保证一个用户会话中的所有事件要么全部记录,要么都不记录。

  6. 消费-处理-生产模式(Consume-Transform-Produce,CTP)

  7. 这是最常见的事务使用场景,包含完整的消息处理闭环:

  8. 从输入主题消费消息

  9. 对消息进行业务处理(转换、计算等)

  10. 将处理结果写入输出主题

  11. 事务保证:确保消费偏移量提交与生产消息这两个操作具有原子性。

  12. 典型应用场景:

  13. 流式 ETL 处理

  14. 事件驱动架构中的状态变更

  15. 精确一次(exactly-once)处理语义的实现

  16. 示例:银行转账系统中,消费交易请求消息,处理转账逻辑,然后生产账户余额更新消息。

  17. 纯消费者模式(Consumer-only)

  18. 仅消费消息并提交偏移量的场景。

  19. 实际上这种模式:

  20. 与手动提交偏移量效果相同

  21. 没有发挥事务的核心价值(跨操作的原子性保证)

  22. 还会带来额外的事务开销

  23. 因此在实际工程中基本不会采用:

  24. 事务的设计初衷是解决生产消费的原子性问题

  25. 单纯消费场景完全可以通过手动提交机制实现相同效果

  26. 反而可能因为事务机制引入不必要的性能损耗

  27. 特殊说明:某些框架可能出于 API 统一性考虑会支持此模式,但应避免在关键生产环境使用。



关键概念和推导

  • 因为 producer 发送消息可能是分布式事务,所以引入了常用的 2PC,所以有事务协调者(Transaction Coordinator)。Transaction Coordinate 和之前为了解决脑裂和惊群问题引入 Group Coordinate 在选举上类似。

  • 事务管理上事务日志是必不可少的,Kafka 使用一个内部的 topic 来保存事务日志,这个设计和之前使用内部 topic 保存偏移量的设计保持一致。事务日志是 TransactionCoordinate 管理的状态的持久化,因为不需要回溯事务的历史状态,所以事务日志只用保存最近的事务状态,_transaction_state

  • 因为事务存在 commit 和 abort 两种操作,而客户端又 read commit 和 read uncommited 两种隔离级别,所以消息队列必须能标识事务状态,这个被称作 Control Message。

  • procuer 挂掉重启或者漂移到其他机器需要能关联的之前的未完成的事务,所以需要有一个唯一标识符来进行关联,这个就是 Transcational Id,一个 producer 挂了,另一个相同 Transaction Id 的 producer 能够接着处理这个事务未完成的状态。Kafka 目前没有引入全局序,所以也没有 transaction id,这个 Transcation Id 是用户提前配置的。

  • TranscationId 能关联 producer,也需要避免两个使用相同 Transaction Id 的 producer 同时存在,所以引入了 producer epoch 来保证对应一个 Transcation Id 只有一个活跃的 producer。

事务语义

多分区原子写入

事务能够保证 KafkaTopic 下每个分区的原子写入,事务中所有的消息都将被写入或者丢弃。首先,我们来考虑一下子原子:读取-处理-写入周期是什么意思。简而言之,这意味着如果某个应用程序在某个 Topic0 的偏移量 X 处读取的消息 A,并且在对消息 A 进行了一些处理(如 B=F(A))之后将消息 B 写入 Topic tp1,则只有当消息 A 和 B 被认为被成功的消费并一起发布,或者完全不发布时,整个读取过程写入操作时原子的。现在,只有当消息 A 的偏移量 X 被标记为已消费,消息 A 才从 topic tp0 消费,消费到的数据偏移量(record offset)将被标记为提交偏移量(Committing offset)。在 Kafka 中,我们通过写入一个名为 offset topic 的内部 KafkaTopic 来记录 offset commit。消息仅在其 offset 被提交给 offsets topic 时才被认为成功消费。由于 offset commit 只是对 KafkaTopic 的另外一次,并且由于消息仅在提交偏移量时被视为成功消费,所以跨多个主题和分区的原子写入也启用原子 读取-处理-写入 循环:提交偏移量 X 到 offset topic 和消息 B 到 tp1 的写入将是单个事务的一部分,所以整个步骤都是原子的。

粉碎“僵尸案例”

我们通过每个事务 Producer 分配一个称为 Transcation Id 的唯一标识来解决僵尸实例的问题,在进程重新启动时能够识别相同的 Producer 实例。API 要求事务性 Producer 的第一个操作应该是在 Kafka 集群中显示注册的 TranscationId,当注册的时候,KafkaBroker 用给定的 Transcational Id 检查打开的事务并完成处理。Kafka 也增加了一个与 Transcational Id 相关的 epoch,epoch 存储每个 id 内部元数据。一旦 epoch 被触发,任何具有相同的 Transcation Id 和旧的 epoch 的生产者视为僵尸,Kafka 拒绝来自这些生产者的后续事务性写入。


简而言之:Kafka 可以保证 Consumer 最终只能消费非事务性消息或已提交事务性消息,它将保留来自未完成事务的消息,并过滤已终止事务的消息。

事务消息定义

生产者可以显式的发起事务会话,在这些会话中发送(事务)消息,并提交或中止事务。有如下的要求:


  • 原子性:消费者的应用程序不应暴露于未提交的消息中

  • 持久性:Broker 不能丢失任何已提交的事务

  • 排序:事务消费者应在每个分区中以原始顺序查看事务消息

  • 交织:每个分区都应该能够接受来自事务性生产者非事务生产者的消息

  • 事务中不应该有重复的消息


如果允许事务性和非事务性消息的交织,则非事务和事务性消息的相对顺序将基于相加(对于非事务性消息)和最终提交(对于事务性消息)的相对顺序。



在上图中,分区 P0 和 P1 接收事务 X1 和 X2 的消息,以及非事务性消息。时间线是消息到达 Broker 的时间,由于首先提交了 X2,所以每个分区都将在 X1 之前公开来自 X2 的消息,由于非事务性消息在 X1 和 X2 的提交之前到达,因此这些消息将在来自任一事务的消息之前公开。

事务配置

消费者

创建消费者代码,需要:● 将配置中的自动提交属性(auto.commit)进行关闭● 而且在代码里面也不能使用手动提交 commitSync()或者 commitAsync()● 设置 Isolation.level:READ_COMMITED 或 READ_UNCOMMITED

生产者

创建生产者,代码如下:● 配置 transacational.id 属性● 配置 enable.idempotence 属性

事务概览

生产者将表示事务开始、结束、中止状态的事务控制消息发送给使用多阶段协议管理事务的高可用事务协调器。生产者将事务控制记录(开始、结束、中止)发送到事务协调器,并将事务的消息直接发送到目标数据分区,消费者需要了解事务并缓冲每个待处理的事务,直到它们到达其相应的结束(提交、中止)记录为止。


  • 事务组

  • 事务组中的生产者

  • 事务组的事务协调器

  • Leader Brokers(事务数据所在的分区的 Broker)

  • 事务的消费者

事务组

事务组用于映射到特定的事务协调器(基于日志分区数字的哈希)。该组中的生产者需要配置为该组事务生产者,由于来自这些生产者的所有事务都通过此协调器进行,因此我们可以在这些事务生产者之间实现严格有序。

生产者 ID 和事务组状态

事务生产者需要两个参数:


  • 生产者 ID

  • 生产组需要将生产者的输入状态与上一个已提交的事务相关联,这使事务生产者能够重试事务(通过为该事务重新创建输入状态:在我们用例中通过是偏移量的向量)


可以使用消费者偏移管理机制来管理这些状态,消费者偏移量管理器将每个键(consumergroup-topic-partition)与该分区的最后一个检查点偏移量和元数据相关联。在事务生产者中,我们保存消费者的偏移量,该偏移量与事务的提交点关联。此偏移提交记录,(在__consumer_offsets 主题中) 应作为事务的一部分写入。即,存储消费组偏移量的(__consumer_offsets)主题分区将需要参与事务。因此,假定生产者在事务中间失败(事务协调器随后到期)。当生产者恢复时,它可以发出偏移量获取请求,以恢复与最后提交的事务相关联的输入偏移量,并从该点恢复事务处理。


为了支持此功能,我们需要对偏移量管理器和压缩的(__consumer_offsets)主题进行一些增强。首先,压缩的主题现在还将包含事务控制记录,我们将需要为这些控制记录提出剔除策略。其次,偏移量管理器需要具有事务意识,特别是,如果组与待处理的事务相关联,则偏移量提取请求应返回错误。

事务协调器

事务协调器 __transaction_state 主题特定分区的 Leader 分区所在的 Broker,它负责初始化、提交以及回滚事务。事务协调器在内存管理如下的状态:


  • 对应正在处理的事务的第一个消息的 HW,事务协调器周期性的将 HW 写到 ZK 中。

  • 事务控制日志中存储对应于日志 HW 的所有正在处理的事务

  • 事务消息主题分区的列表:事务的超时时间、与事务关联的 Producer ID


需要确保无论是什么样的保留策略(日志分区的删除还是压缩),都不能删除包含事务 HW 的日志分段。

事务流程

初始阶段

  • Producer:计算哪个 Broker 作为事务协调器

  • Producer:向事务协调器发送 BeginTransaction(producerId,generation、partitions)请求,当然也可以发送另一个包含事务过期时间的,如果生产者需要将消费者状态作为事务的一部分提交事务,则需要在 BeginTransaction 中包含对应的 __consumer_offsets 主题分区的信息。

  • Broker:生成事务 ID

  • Coordinator:向事务协调主题追加 BEIGIN(TxId,ProducerId,Generattion、Partitions)消息,然后发送响应给生产者

  • Producer:读取响应(包含了事务 ID:TxId)

  • Coordinator(and followers):在内存更新当前事务的待确认事务状态和数据分区信息

发送阶段

  • Producer:发送事务消息给主题 Leader 分区所在的 Broker

  • 每个消息包含 TxId 和 TxCtl 字段

  • TxCtl 仅用于标记事务的最终状态(提交还是中止),生产者请求也封装了生产者 ID,但不是不追加到日志中。

结束阶段

(生产者准备提交事务)


  • Producer:发送 OffsetCommitRequest 请求提交与事务结束状态关联的输入法状态(如下一个事务输入从哪儿开始)

  • Producer:发送 CommitTranscation(TxId,ProducerId,Generation)请求给事务协调器并等待响应(如果响应中没有错误信息,表示将提交事务)。

  • Coordinator:向事务控制主题追加 PREPARE_COMMIT(TxId)请求并向生产者发送响应。

  • Coordinator:向事务设计到的每个 Leader 分区(事务的业务数据的目标主题)的 Broker 发送一个 CommitTranscation(TxId,Partitions...)请求。

  • 事务业务数据的目标主题相关 Leader 分区 Broker:(情况 1:)如果不是__consumer_offsets 主题的 Learder 分区,一收到 CommitTransaction(TxId,Partition1,Partition2)请求就会向对应的分区 Broker 发送空(NULL)消息,并给该消息设置 TxId 和 TxCtl(设置为 COMMITED)字段,Leader 分区的 Broker 给协调器发送响应。

  • 事务业务数据的目标主题相关 Leader 分区 Broker:(情况 2:)如果是__consumer_offsets 主题的 Leader 分区:追加消息,该消息的 key 是 G-LAST-COMMIT,Value 就是 TxId 的值,同时也应该给该消息设置 TxId 和 TxCtl 字段。

  • Coordinator:向事务控制主题发送 COMMITED(TxId)请求,__transaction_state

  • Coordinator(and followers):尝试更新 HW。

事务中止

当事务生产者发送业务消息的时候如果发生了异常,可以中止该事务,如果事务提交超时,事务协调器也会中止当前事务。


  • Producer:向事务协调器发送 AbortTransaction(TxId)请求并等待响应。(一个没有异常的响应表示事务将会中止)

  • Coordinator:向事务控制主题追加 PREPARE_ABORT(Txid)消息,然后向生产者发送响应。

  • Coordinator:向事务业务数据的目标主题的每个涉及到的 Leader 分区 Broker 发送 AbortTranscation(TxId,Partition)请求。

基本事务流程的失败

  • 生产者发送 BeginTranscation(TxId)的时候超时或响应中包含异常,生产者使用相同的 TxId 重试。

  • 生产者发送数据时的 Broker 错误:生产者中止(然后重做)事务(使用新的 TxId)。

  • 如果生产者没有中止事务,则协调器将在事务超时后中止事务。仅在可能已将请求数据附加并复制到 Follower 的错误的情况下才需要重做事务。例如:生产者请求超时将需要重做,而 NotLeaderForPartitionException 不需要重做。

  • 生产者发送 CommitTranscation(TxId)请求超时或响应中包含异常,生产者使用相同的 TxId 重试事务,此时需要幂等性。

发布于: 刚刚阅读数: 2
用户头像

武子康

关注

永远好奇 无限进步 2019-04-14 加入

Hi, I'm Zikang,好奇心驱动的探索者 | INTJ / INFJ 我热爱探索一切值得深究的事物。对技术、成长、效率、认知、人生有着持续的好奇心和行动力。 坚信「飞轮效应」,相信每一次微小的积累,终将带来深远的改变。

评论

发布
暂无评论
大数据-72 Kafka 事务Coordinator、日志、2PC 与幂等性的协同机制 端到端Exactly-Once处理详解_Java_武子康_InfoQ写作社区