写点什么

精华推荐 | 【深入浅出 RocketMQ 原理及实战】「性能原理挖掘系列」透彻剖析贯穿 RocketMQ 的事务性消息的底层原理并在分析其实际开发场景

作者:洛神灬殇
  • 2022-12-17
    江苏
  • 本文字数:6009 字

    阅读完需:约 20 分钟

精华推荐 | 【深入浅出RocketMQ原理及实战】「性能原理挖掘系列」透彻剖析贯穿RocketMQ的事务性消息的底层原理并在分析其实际开发场景

什么是事务消息

事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ 的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。

事务消息所对应的场景

在一些对数据一致性有强需求的场景,可以用 Apache RocketMQ 事务消息来解决,从而保证上下游数据的一致性。



以秒杀购物商城的商品下单交易场景为例,用户支付订单这一核心操作的同时会涉及到下游物流发货、库存变更、购物车状态清空等多个子系统的变更。

事务性业务的处理分支包括:

  1. 主分支订单系统状态更新:由未支付变更为支付成功。

  2. 调用第三方物流系统状态新增:新增待发货物流记录,创建订单物流记录。

  3. 积分系统状态变更:变更用户积分,更新用户积分表。

  4. 购物车系统状态变更:清空购物车,更新用户购物车记录。


RocketMQ 的事务消息

Apache RocketMQ 在 4.3.0 版的时候已经支持分布式事务消息,这里 RocketMQ 采用了 2PC 的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息。

RocketMQ 事务消息流程

针对于事务消息的总体运作流程,主要分为两个部分:正常事务消息的发送及提交、事务消息的补偿流程。

事务消息发送及提交基本流程概要(后面会详细分析原理)

事务消息发送步骤如下


  1. 消息发送者:生产者将半事务消息发送至 RocketMQ Broker。

  2. Broker 服务端:RocketMQ Broker 将消息持久化成功之后,向生产者返回 Ack 确认消息已经发送成功,此时消息暂不能投递,为半事务消息。

  3. 业务系统:生产者开始执行本地事务逻辑。

  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit 或是 Rollback)。

  5. 如果本地操作成功,Commit 操作生成消息索引,消息对消费者可见

  6. 如果本地操作失败,此时对应的 half 消息对业务不可见,本地逻辑不执行,Rollback 均进行回滚。


服务端收到确认结果后处理逻辑如下


  • 确认结果为 Commit:服务端将半事务消息标记为可投递,并投递给消费者。

  • 确认结果为 Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。

消息出现异常情况的补偿流程如下

在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果(Commit/Rollback)或服务端收到的二次确认结果为 Unknown 未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。


注意:服务端仅仅会按照参数尝试指定次数,超过次数后事务会强制回滚,因此未决事务的回查时效性非常关键,需要按照业务的实际风险来设置


事务消息回查步骤如下


  • 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

  • 生产者根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤 4 对半事务消息进行处理。


补偿总结


  1. 对没有 Commit/Rollback 的事务消息(pending 状态的消息),Broker 服务端会发起一次“回查”。

  2. 生产者 Producer 收到回查消息,检查回查消息对应的本地事务的状态。

  3. 生产者根据本地事务状态,重新 Commit 或者 Rollback。


补偿阶段用于解决消息 Commit 或者 Rollback 发生超时或者失败的情况

RocketMQ 事务消息实现原理

事务消息在一阶段对用户不可见

在 RocketMQ 事务消息的主要流程中,一阶段的消息如何对用户不可见。


实现技术要点一:事务消息相对普通消息最大的特点就是一阶段发送的消息对用户是不可见的。如何做到写入消息但是对用户不可见呢?


RocketMQ 事务消息的做法是:如果消息是 half 消息,将备份原消息的 Topic 与消息消费队列,然后,改变 Topic 为 RMQ_SYS_TRANS_HALF_TOPIC。


由于消费组未订阅该主题,故消费端无法消费 half 类型的消息,然后 RocketMQ 会开启一个定时任务,从 Topic 为 RMQ_SYS_TRANS_HALF_TOPIC 中拉取消息进行消费,根据生产者组获取一个服务提供者发送回查事务状态请求,根据事务状态来决定是提交或回滚消息。


RocketMQ 中,消息在服务端的存储结构如下,每条消息都会有对应的索引信息,Consumer 通过 ConsumeQueue 这个二级索引来读取消息实体内容,其流程如下:


RocketMQ 的底层实现原理

  1. 写入的如果事务消息,对消息的 Topic 和 Queue 等属性进行替换,同时将原来的 Topic 和 Queue 信息存储到消息的属性中,正因为消息主题被替换,故消息并不会转发到该原主题的消息消费队列。

  2. 由于没有直接发送到目标的 topic 的队列里面,故此消费者无法感知消息的存在,不会消费,其实改变消息主题是 RocketMQ 的常用“套路”,回想一下延时消息的实现机制。


发送一个半事务消息

半事务消息是指暂不能投递的消息,生产者已经成功地将消息发送到了 Broker,但是 Broker 未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递(pending)”状态,如果发送成功则执行本地事务,并根据本地事务执行成功与否,向 Broker 半事务消息状态(commit 或者 rollback),半事务消息只有 commit 状态才会真正向下游投递。


Commit 和 Rollback 操作以及 Op 消息的底层实现原理


Rollback 的情况,对于 Rollback,本身一阶段的消息对用户是不可见的,其实不需要真正撤销消息(实际上 RocketMQ 也无法去真正的删除一条消息,因为是顺序写文件的)。


但是区别于这条消息没有确定状态(Pending 状态,事务悬而未决),需要一个操作来标识这条消息的最终状态。RocketMQ 事务消息方案中引入了 Op 消息的概念,用 Op 消息标识事务消息已经确定的状态(Commit 或者 Rollback)。


如果一条事务消息没有对应的 Op 消息,说明这个事务的状态还无法确定(可能是二阶段失败了)。引入 Op 消息后,事务消息无论是 Commit 或者 Rollback 都会记录一个 Op 操作。Commit 相对于 Rollback 只是在写入 Op 消息前创建 Half 消息的索引


Op 消息的存储和对应关系


Op 消息写入到全局特定的 Topic 中通过源码中的方法


TransactionalMessageUtil.buildOpTopic();
复制代码


这个 Topic 是一个内部的 Topic(像 Half 消息的 Topic 一样),不会被用户消费。Op 消息的内容为对应的 Half 消息的存储的 Offset,这样通过 Op 消息能索引到 Half 消息进行后续的回查操作。



Half 消息的索引构建


执行二阶段 Commit 操作时,需要构建出 Half 消息的索引。


  • 一阶段的 Half 消息由于是写到一个特殊的 Topic,

  • 二阶段构建索引时需要读取出 Half 消息,并将 Topic 和 Queue 替换成真正的目标的 Topic 和 Queue,之后通过一次普通消息的写入操作来生成一条对用户可见的消息。


所以,RocketMQ 事务消息二阶段其实是利用了一阶段存储的消息的内容,在二阶段时恢复出一条完整的普通消息,然后走一遍消息写入流程。


补偿控制要点


如果由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,Broker 端会通过扫描发现某条消息长期处于"半事务消息"时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback)。


这样最终保证了本地事务执行成功,下游就能收到消息,本地事务执行失败,下游就收不到消息。总而保证了上下游数据的一致性。



注意:事务消息的生产组名称 ProducerGroupName 不能随意设置。事务消息有回查机制,回查时 Broker 端如果发现原始生产者已经崩溃,则会联系同一生产者组的其他生产者实例回查本地事务执行情况以 Commit 或 Rollback 半事务消息。


RocketMQ 的回查功能实现原理


如果在 RocketMQ 事务消息的二阶段过程中失败了,例如在做 Commit 操作时,出现网络问题导致 Commit 失败,那么需要通过一定的策略使这条消息最终被 Commit。RocketMQ 采用了一种补偿机制,称为“回查”。


  • 回查次数的配置化

  • Broker 端对未确定状态的消息发起回查,将消息发送到对应的 Producer 端(同一个 Group 的 Producer),由 Producer 根据消息来检查本地事务的状态,进而执行 Commit 或者 Rollback。Broker 端通过对比 Half 消息和 Op 消息进行事务消息的回查并且推进 CheckPoint(记录那些事务消息的状态是确定的)。

  • 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax 参数来修改此限制

  • 如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志,执行回滚 Rollback 操作。

  • 回查行为的定制化 d

  • 此外用户可以通过重写 AbstractTransactionalMessageCheckListener 类来修改这个 Rollback 的行为,比如改写为 Commit,或者其他的记录日志或者发送消息邮件推送给指定人进行人工跟进。

  • 回查触发时间定制化


事务消息将在 Broker 配置文件中的参数 transactionTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionTimeout 参数。


事务性消息可能不止一次被检查或消费。


  • 发送给用户的目标 topic 消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。

  • 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ 服务器能通过它们的生产者 ID 查询到消费者。




消息事务样例


事务消息共有三种状态,提交状态、回滚状态、中间状态。


  • TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。

  • TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。

  • TransactionStatus.Unknown: 中间状态(Pending),它代表需要检查消息队列来确定状态。

开发实现案例

发送事务消息样例

创建事务性生产者

使用 TransactionMQProducer 类创建生产者,并指定唯一的 ProducerGroup,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。


  TransactionListener transactionListener = new TransactionListenerImpl();  TransactionMQProducer producer = new    TransactionMQProducer("please_rename_unique_group_name");  ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100,                        TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {           @Override           public Thread newThread(Runnable r) {               Thread thread = new Thread(r);               thread.setName("client-transaction-msg-check-thread");               return thread;           }       });       producer.setExecutorService(executorService);       producer.setTransactionListener(transactionListener);       producer.start();       String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};       for (int i = 0; i < 10; i++) {           try {               Message msg =                   new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,                       ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));               SendResult sendResult = producer.sendMessageInTransaction(msg, null);               System.out.printf("%s%n", sendResult);               Thread.sleep(10);           } catch (MQClientException | UnsupportedEncodingException e) {               e.printStackTrace();           }       }       for (int i = 0; i < 100000; i++) {           Thread.sleep(1000);       }       producer.shutdown();   }
复制代码
实现事务的监听接口

TransactionListener 接口的定义如下:


public interface TransactionListener {    /**     * When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.     *     * @param msg Half(prepare) message     * @param arg Custom business parameter     * @return Transaction state     */    LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
/** * When no response to prepare(half) message. broker will send check message to check the transaction status, and this * method will be invoked to get local transaction status. * * @param msg Check message * @return Transaction state */ LocalTransactionState checkLocalTransaction(final MessageExt msg);}
复制代码


当发送半消息成功时,我们使用 executeLocalTransaction 方法来执行本地事务。它返回前一节中提到的三个事务状态之一。checkLocalTransaction 方法用于检查本地事务状态,并回应消息队列的检查请求。它也是返回前一节中提到的三个事务状态之一。


public class TransactionListenerImpl implements TransactionListener {  private AtomicInteger transactionIndex = new AtomicInteger(0);  private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();  @Override  public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {      int value = transactionIndex.getAndIncrement();      int status = value % 3;      localTrans.put(msg.getTransactionId(), status);      return LocalTransactionState.UNKNOW;  }  @Override  public LocalTransactionState checkLocalTransaction(MessageExt msg) {      Integer status = localTrans.get(msg.getTransactionId());      if (null != status) {          switch (status) {              case 0:                  return LocalTransactionState.UNKNOW;              case 1:                  return LocalTransactionState.COMMIT_MESSAGE;              case 2:                  return LocalTransactionState.ROLLBACK_MESSAGE;          }      }      return LocalTransactionState.COMMIT_MESSAGE;  }}
复制代码


executeLocalTransaction 是半事务消息发送成功后,执行本地事务的方法,具体执行完本地事务后,可以在该方法中返回以下三种状态:


  • LocalTransactionState.COMMIT_MESSAGE:提交事务,允许消费者消费该消息

  • LocalTransactionState.ROLLBACK_MESSAGE:回滚事务,消息将被丢弃不允许消费。

  • LocalTransactionState.UNKNOW:暂时无法判断状态,等待固定时间以后 Broker 端根据回查规则向生产者进行消息回查。


checkLocalTransaction 是由于二次确认消息没有收到,Broker 端回查事务状态的方法。回查规则:本地事务执行完成后,若 Broker 端收到的本地事务返回状态为 LocalTransactionState.UNKNOW,或生产者应用退出导致本地事务未提交任何状态。则 Broker 端会向消息生产者发起事务回查,第一次回查后仍未获取到事务状态,则之后每隔一段时间会再次回查。

事务消息使用上的限制

事务消息不支持延时消息和批量消息。

发布于: 2022-12-17阅读数: 25
用户头像

洛神灬殇

关注

🏆 InfoQ写作平台-签约作者 🏆 2020-03-25 加入

【个人简介】酷爱计算机科学、醉心编程技术、喜爱健身运动、热衷悬疑推理的“极客达人” 【技术格言】任何足够先进的技术都与魔法无异 【技术范畴】Java领域、Spring生态、MySQL专项、微服务/分布式体系和算法设计等

评论

发布
暂无评论
精华推荐 | 【深入浅出RocketMQ原理及实战】「性能原理挖掘系列」透彻剖析贯穿RocketMQ的事务性消息的底层原理并在分析其实际开发场景_分布式事务_洛神灬殇_InfoQ写作社区