写点什么

博文推荐|深入解析 Apache Pulsar 中的事务

作者:Apache Pulsar
  • 2021 年 11 月 29 日
  • 本文字数:5272 字

    阅读完需:约 17 分钟

关于 Apache Pulsar

Apache Pulsar 是 Apache 软件基金会顶级项目,是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性。 

GitHub 地址:http://github.com/apache/pulsar/


原文转载于 StreamNative,原作者李鹏辉,地址:https://streamnative.io/en/blog/tech/2021-06-16-a-deep-dive-of-transactions-in-apache-pulsar。本文译者是资飞。


本系列上一篇文章《基于 Pulsar 事务实现的 Exactly-Once(精确一次)语义》介绍了在 Apache Pulsar 中,可以通过启用 Transaction API 保证精确一次语义。本文将详细介绍多种消息传递语义,包括:

  • 通过幂等 producer 支持单个 topic 精确一次语义

  • 事务 API

  • Pulsar 与 Flink 集成时端对端仅且只处理一次语义

本文将深入解析 Apache Pulsar 中的事务,帮助读者熟悉 Pulsar 事务 API 的主要概念,以便后续使用。

为什么需要事务?

事务增强了消息传递语义和流处理过程中处理保证(例如:使用 Pulsar Functions 或与其他流处理引擎集成)。流处理通常表现为“消费-处理-生产”,在一个数据流中生产和消费(例如:Pulsar Topic)。


随着流处理的兴起,对具有更强处理保证的流处理应用的需求也在不断增长。例如,金融机构使用流处理引擎来处理用户的借贷业务。这种场景要求每条消息都只处理一次且无异常。

换句话说,如果流处理应用消费 A 消息 并将生成的结果当作消息 B(B = f(A)) ,那么精确一次处理保证意味着,只有在成功生成消息 B 时,A 才会被标记为已消费,反之亦然。 


在 Pulsar 2.8.0 之前,使用 Apache Pulsar 构建流处理应用时,实现精确一次处理保证的操作都不简单。将 Pulsar 与流处理引擎(如 Flink)集成可能可以实现精确一次保证。例如,你可以使用 Flink 精确一次读取从 Pulsar topic 中的消息,但不能精确一次将结果写入到 Pulsar topic。


当为 Pulsar producer 和 consumer 配置至少传递一次语义时,流处理应用无法在以下场景中实现精确一次处理语义:

  • 重复写入:由于内部重试逻辑,producer 可能会多次写入同一条消息。幂等 producer 通过消息去重来解决这个问题。

  • 应用程序崩溃:流处理应用程序可能在任何时候崩溃。如果应用程序在写入结果消息 B 之后,但未将源消息 A 设为已消费(即 ack)时崩溃,应用程序在重启后会重新处理源消息 A,导致消息 B 重复写入到输出 topic,违反了精确一次处理的保证。

  • 僵尸应用程序:在分布式环境中,流处理应用可能会从网络中分区(如网络临时不可用)。通常,同一流处理应用程序的多个新实例会自动启动,以替换“丢失”的实例。在这种情况下,同一处理应用程序的多个实例可能在并行运行,这些实例会处理相同的输入 topic,并将结果写入相同的输出 topic,导致输出重复的消息,违反精确一次处理语义。

Pulsar 在 2.8.0 版本中引入了新事务 API,旨在解决在上述场景 2、3 中不能实现精确一次处理语义的问题。

事务语义

事务 API 使流处理应用程序能够在一个原子操作中消费、处理和生产消息。这意味着,在同一事务中的一批消息可以从许多 topic 分区接收、生产和确认。处理同一事务的所有操作为一个整体,要么全部成功,要么全部失败。


那么事务 API 如何解决上述三个问题呢?

跨多 topic 的原子写入与确认

首先,事务 API 支持将多个 Pulsar topic 作为单个整体进行原子写入和原子确认。在一个事务中生产或消费的所有消息一起被成功写入或确认,或者没有消息被成功写入或确认。例如,处理过程中的错误可能导致事务中止,在这种情况下,任何 consumer 都不会消费到该事务生成的任何消息。这对“消费-处理 -生产”原子操作意味着什么?


假设应用程序消费来自 topic T0 的消息 A,并在对消息 A(B=f(A))应用一些转换逻辑后生成结果消息 B 到 topic T1,那么仅当消息 A 和消息 B 被认为成功地一起消费和发布,或者根本不被消费也不被发布(即什么都不做)时,此时消费-处理-生产整个操作才是原子的。只有当消息 A 被成功确认时,我们才认为是在 topic T0 中消费了此消息。



事务 API 确保消息 A 的确认和消息 B 的写入以原子操作发生,此时才认为“消费-处理-生产”整个操作为一个原子操作。

通过条件确认隔离僵尸实例

我们通过条件确认来解决僵尸实例的问题。条件确认指当两个事务试图确认同一消息时,Pulsar 保证只有一个事务可以确认成功,另一个事务的确认会被中止。

读事务消息

读取由事务写入的消息会有怎样的保证?


只有在事务已提交时,Pulsar broker 才会向 consumer 分发事务消息。换句话说,如果事务仍在进行中,则 broker 不会分发其中的消息;也不会传递处于中止状态的事务消息。


然而,Pulsar 并不保证在一个提交的事务中生产的消息会同时被消费。有以下几个原因:但 Pulsar 不保证同时消费在同一个提交事务中生产的消息,原因如下:


  1. 参与提交事务的 topic 分区数量众多,consumer 不一定能消费到所有分区上的消息,因此无法读取该事务中生成的所有消息。

  2. Consumer 的接受队列大小或缓冲区大小可能不同,因此只能接收一定数量(可能是任意值)的消息。

事务 API

事务特性主要是服务器端协议级特性。目前事务 API 只支持 Java 客户端(未来将会支持更多语言的客户端)。用 Java 编写,使用 Pulsar 事务 API 的“消费-处理-生产”应用程序示例如下:



让我们按步骤分析这个例子。



事务的实现

本节简要概述事务 API 引入的新组件和新请求流程。你可以阅读相关文档,或回看 Pulsar 北美峰会上的相关视频了解更多关于事务的详细信息


本节只介绍与事务相关的主要概念,为用户调试或调优事务,提供参考。



组件

事务协调器和事务日志

事务协调器(Transaction Coordinator,TC)维护与事务交互的 topic 和订阅。提交事务时,事务协调器与 topic owner broker 交互以完成事务。


事务协调器( TC )是一个在 Pulsar broker 中运行的模块,全程维护事务,并阻止事务进入错误状态。事务协调器也处理事务超时,确保事务在超时后中止。


所有事务元数据都保存在事务日志中,而事务日志保存在 Pulsar topic 中。事务协调器崩溃后,仍可以从事务日志中恢复事务的元数据。


每个事务协调器都有事务日志 topic 的分区子集,也就是说,(事务协调器所在的) broker 是(topic) 分区的 owner。


每个事务都有唯一的事务 id(TxnID),长度为 128 位。最高的 16 位用于表示事务日志所在的 topic 分区,其余位为根据 TC(此事务日志所在的 topic 分区的 owner) 生成的单调递增数值。


值得注意的是,事务日志 topic 只存储事务的状态,不存储事务中的消息。消息存储在 topic 分区中。事务可以处于多种状态,如“ Open ”、“ Prepare commit ”和“ committed ”。事务日志中存储事务的状态信息和相关的元数据。

事务缓冲区

事务中的消息(原存储在 topic 分区中)存储在对应的 topic 分区所在的 broker 事务缓冲区中。在提交事务前,事务缓冲区中的消息对 consumer 不可见。事务中止时,事务缓冲区中的消息将被丢弃。

Pending ack 状态

在提交事务前,事务中的消息确认处在 pending ack 状态。如果消息处于 pending ack 状态,则在事务中止时,消息未从 pending ack 状态中移除,其他事务无法确认该消息。(消息不能被其他事务确认直到此消息移除 pending ack 状态。)


pending ack 状态保存在 pending ack 日志中。pending ack 日志存储在游标日志中。重启后 broker 可以从 pending ack 日志还原事务状态,保证 ack 不会丢失。

数据流

从上层 API 可看出,数据流可以分为多个步骤:

  • 开启事务;

  • 发布事务消息;

  • 确认事务消息;

  • 完成事务。

开启事务

在事务开始时,Pulsar 客户端向定位事务协调器请求新的事务 ID。收到请求后,事务协调器为事务分配事务 ID。然后,自动生成该事务的日志,并记录其 id 和状态( OPEN,如步骤 1a 所示),保证事务状态持久化(不必担心事务协调器崩溃)。记录事务状态后,TC 将事务 ID 返回给 Pulsar 客户端。

发布事务消息

在 Pulsar 客户端向新的 topic 分区发送消息之前,客户端请求 TC 将此 topic 分区添加到事务中。TC 将分区的更改记录并持久存储在其事务日志中(如 2.1a 所示),确保 TC 知道事务正在处理的所有分区。因此,在 end-partition 时,TC 可以提交或中止此 transaction 在所有分区上的变更。

Pulsar 客户端开始向分区发送消息,此发送流程与正常的消息发送流程完全相同。唯一的区别是事务生成的批消息包含事务 id。接收该批消息的 broker 检查该批消息是否属于某个事务。如果不属于某个事务,那么 broker 会正常处理写操作;否则,broker 将该批消息写入分区的事务缓冲区。

携带事务 Ack 消息

Pulsar 客户端第一次订阅被确认为事务的一部分时向 TC 发送请求。在步骤 2.3a 中,TC 记录对事务的新订阅,确保 TC 知道事务正在处理的所有订阅,因此 TC 可以在 EndTxn 阶段提交或中止对每个订阅的更改。

Pulsar 客户端开始 ack 订阅上的消息,此事务确认流程与正常确认流程相同,但事务确认请求中包含事务 id。接收 ack 请求的 broker 检查此 ack 是否属于某个事务,如果属于某个事务,则 broker 将消息标记为:PENDING_ACK 状态,即在提交或中止该 ack 前,其他 consumer 不能 ack 或 nack 此消息,从而确保当两个事务在 ack 同一条消息时,只有一个事务可以 ack 成功,另一个则将被中止。

Pulsar 客户端尝试确认消息时,如果在单个确认和累积确认中都检测到冲突,则会中止整个事务。

完成事务

在事务结束时,应用程序将决定提交或中止事务。如果在确认消息时检测到冲突,也可以中止事务。

当事务完成时,Pulsar 客户端可以向 TC 请求结束事务,并用一个字段标识事务是提交还是中止。


TC 将提交或中止消息写入其事务日志(如 3.1a 所示),并向该事务中涉及的所有分区发送提交或中止事务请求。如 3.2 所示。


当接收到请求的所有分区都成功提交或中止事务后,TC 将提交或中止的消息写入其事务日志。如图中 3.3 所示。

事务性能如何

本文已经解释了事务的语义及工作原理,接下来我们来看看事务的性能。

事务 producer 性能

事务仅导致中等程度的写放大。额外写入出现的主要原因如下:


  • 对于每个事务,producer 都会收到额外的请求,以便向事务协调器注册 topic 分区。

  • 当事务完成时,向参与该事务的所有分区写入事务标记。

  • TC 将事务状态变更写入事务日志。所有添加到事务的 topic 分区的状态(xxx)都会被/更新/记录(下来)。(“准备提交”和“已提交”状态)。


  • 开销与作为事务部分写入的消息数无关。因此,提高吞吐量的关键是每个事务包含大量的消息。减少消息数量或缩短事务提交时间都会降低吞吐量。


    增加事务持续时间的后果是增加了端到端延迟。回想一下,consumer 并不会读取到未提交的事务消息。因此,提交间隔越长,consumer 等待的时间就越长(不得不等待),从而增加了端到端延迟。

    事务 consumer 的性能

    事务 consumer 比 producer 简单得多。所有的(事务)逻辑都由 Pulsar broker 服务器端完成,broker 仅分发已完成的事务消息。

    扩展阅读

    本文简要介绍了 Apache Pulsar 事务的相关信息。你可以阅读以下资料,深入了解 Pulsar 事务:



    我的同事郭斯杰和 Addison Higham 在 6 月 16 日至 17 日举行的 Pulsar Summit 北美峰会 2021 上分享了“ Exactly-Once 如此简单:Apache Pulsar 中的事务消息”。观看演讲视频,了解 Pulsar 事务的更多细节。

    结论

    本系列的第一篇博客文章《借助 Pulsar 事务机制,实现精确一次语义如此简单》介绍了 Apache Pulsar 的事务 API 如何启用精确一次语义。在本文中,我们讨论了 Apache Pulsar 中事务 API 的关键设计目标、事务 API 的语义,以及 API 实际如何工作。


    如果我们把流处理看作一个读-写处理器,那么这篇博文将重点放在读和写路径上,而处理本身则是一个黑匣子。然而,在实际处理阶段会发生很多事情,导致仅使用事务 API 无法保证精确一次处理。例如,如果处理逻辑修改了外部存储系统,那么这里介绍的事务 API 不足以保证精确一次处理。


    Pulsar 和 Flink 集成通过事务 API 为各种流处理应用程序提供端到端的精确一次处理,甚至处理期间更新那些额外状态存储。


    在接下来的几周里,我们将分享本系列的第三篇文章,详细介绍 Pulsar 和 Flink 集成如何基于新的 Pulsar 事务提供端到端的精确一次处理语义,以及如何使用 Pulsar 和 Flink 轻松编写流应用程序。

    相关阅读

    译者简介

    资飞,资深架构师,Apache Pulsar Contributor,目前在某初创公司负责数据平台、交易平台建设,个人在金融证券领域已有 9 年的工作经验,专注证券业务以及分布式计算方向,喜爱读书、烹饪、旅游,关注分布式、高并发、内存交易相关的技术。


    点击链接,获取 Apache Pulsar 硬核干货资料!

    用户头像

    Apache Pulsar

    关注

    下一代云原生分布式消息流平台 2017.10.17 加入

    Apache 软件基金会顶级项目,集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展流数据存储特性。

    评论

    发布
    暂无评论
    博文推荐|深入解析 Apache Pulsar 中的事务