写点什么

RocketMQ 事务消息

作者:周杰伦本人
  • 2022 年 8 月 13 日
    贵州
  • 本文字数:1997 字

    阅读完需:约 7 分钟

RocketMQ 事务消息

发送订单的事务消息,预处理

Controller 层:


@GetMapping(value = "/transaction")public String sendTransactionMsg() {    Order order = new Order("123", "山东菏泽");    String transactionId = UUID.randomUUID().toString();    MessageBuilder builder = MessageBuilder.withPayload(order).setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId);    Message message = builder.build();
TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction("OrderTransactionGroup", "TopicOrder", message, order.getOrderId()); return sendResult.getMsgId();}
复制代码


public class Order {
private String orderId;
private String address;
public Order(String orderId, String address) { this.orderId = orderId; this.address = address; }
public String getOrderId() { return orderId; }
public void setOrderId(String orderId) { this.orderId = orderId; }
public String getAddress() { return address; }
public void setAddress(String address) { this.address = address; }}
复制代码


Order 对象保存订单信息,随机生成一个 ID 作为消息的事务 ID,定义了一个名为 OrderTransactionGroup 的事务组,用于下一步接收本地事务的监听。


此时消息已经发送到 Broker 中,但未投递出去,Consumer 暂时不能消费这条消息。

订单信息入库的事务操作,提交或回滚

@Component@RocketMQTransactionListener(txProducerGroup = "OrderTransactionGroup")public class TransactionMsgListener implements RocketMQLocalTransactionListener {
@Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { try { String transactionId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID); Order order = (Order) message.getPayload(); boolean result = this.saveOrder(order, transactionId); return result ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK; } catch (Exception e) { return RocketMQLocalTransactionState.ROLLBACK; } }
private boolean saveOrder(Order order, String transactionId){ return true; }
@Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { String transactionId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID); if (isSuccess(transactionId)) { return RocketMQLocalTransactionState.COMMIT; } return RocketMQLocalTransactionState.ROLLBACK; }
private boolean isSuccess(String transactionId) { return true; }}
复制代码


  1. 拿到前面生成的事务 ID

  2. 以事务 ID 为主键,执行本地事务 saveOrder()方法中使用事务 ID 来设置为唯一键,调用数据库插入订单表 checkLocalTransaction()方法先拿到事务 ID,然后以事务 ID 为主键,查询本地事务执行情况 isSuccess()方法中查询订单表。


实现 RocketMQLocalTransactionListener 接口,使用 @RocketMQTransactionListener 注解用于接收本地事务的监听,注解的 txProducerGroup 属性表示事务组名称,和前面定义的 OrderTransactionGroup 保持一致。


RocketMQLocalTransactionListener 两个实现方法:


  • executeLocalTransaction:执行本地事务,在第一步中消息发送成功后回调执行,一旦事务提交成功,下游应用的 Consumer 能收到该消息,在这里 demo 的本地事务就是保持订单信息入库

  • checkLocalTransaction:检查本地事务执行状态,如果 executeLocalTransaction 方法中返回的状态是未知或者未返回状态,默认会在预处理发送的 1 分钟后由 Broker 通知 Producer 检查本地事务,在 Producer 中回调本地事务监听器中的 checkLocalTransaction 方法。检查本地事务时,可以根据事务 ID 查询本地事务状态,返回具体事务状态给 Broker

消费消息

与普通消息消费一样


事务消息原理:


发送预处理消息成功后,开始执行本地事务。


如果本地事务执行成功,发送提交请求提交事务消息,消息会投递给 Consumer


如果本地事务执行失败,发送回滚事务消息,消息不会投递给 Consumer


如果本地事务状态未知,Broker 未收到二次确认的消息。Broker 发送请求给 Producer 进行消息回查,确认提交或回滚,如果消息状态一直未被确认,则需要人工介入处理。


好了 这就是 RocketMQ 的事务处理机制了,支持事务消息是 RocketMQ 的一大特点,我们要掌握它的流程,然后在适合的场合运用事务消息,保证事务。

发布于: 刚刚阅读数: 4
用户头像

还未添加个人签名 2020.02.29 加入

公众号《盼盼小课堂》,多平台优质博主

评论

发布
暂无评论
RocketMQ事务消息_8月月更_周杰伦本人_InfoQ写作社区