写点什么

RocketMQ 中的事务消息

用户头像
废材姑娘
关注
发布于: 2021 年 01 月 13 日
RocketMQ中的事务消息

事务

一说到事务, 大家一定会想到我们常用的关系型数据库中的事务, 当我们想同时更新多个数据时, 要保证更新操作要么全部成功, 要么全部失败.  且事务需要满足 ACID 特性, 即原子性, 一致性, 隔离性和持久性.

A(Atomic):原子性,构成事务的所有操作,要么都执行完成,要么全部不执行,不能有一半成功一半失败的情况。

C(Consistency):是指这些数据在事务执行完成这个时间点之前,读到的一定是更新前的数据,之后读到的一定是更新后的数据,不应该存在一个时刻,让用户读到更新过程中的数据。

I(Isolation):隔离性,数据库中的事务一般都是并发的,隔离性是指并发的两个事务的执行互不干扰,一个事

务不能看到其他事务运行过程的中间状态。通过配置事务隔离级别可以避脏读、重复读等问题。

D(Durability):持久性,事务完成之后,该事务对数据的更改会被持久化到数据库,后续的其他操作和故障都不会对事务的结果产生任何影响。。

分布式事务

随着业务的快速发展,网站系统往往由单体架构逐渐演变为分布式、微服务架构,而对于数据库则由单机数据库架构向分布式数据库架构转变。此时,我们会将一个大的应用系统拆分为多个可以独立部署的应用服务,需要各个服务之间进行远程协作才能完成事务操作。

分布式事务就是要在分布式系统中实现事务, 但是在分布式系统中实现严格的事务是很难的. 即使做到了,也会严重的影响我们系统的性能.  于是就有了 CAP 理论和 BASE 理论(自行百度, 到处都是)来解决分布式下的这些问题.基于 CAP 和 BASE 理论,  目前常用的分布式事务解决方案有: 2PC(Two-phase Commit,也叫二阶段提交)、TCC(Try-Confirm-Cancel) 和事务消息。每一种实现都有其特定的使用场景,也有各自的问题,都不是完美的解决方案。事务消息的使用场景就是需要异步更新数据时保证数据的最终一致性. 下面我们就看一下 RocketMQ 中是如何实现事务消息的.

RocketMQ 中的事务消息

RocketMQ 事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ 的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。下面是 RokcetMQ 官网提供的事务消息流程图:


image


  1. 生产者发送半消息给消息队列服务端(Broker)

  2. 如果半消息发送成功, 则执行本地事务.

  3. 根据本地事务执行结果, 向消息队列服务端发送事务提交或回滚的请求.

  4. 如果消息队列长时间没有收到提交或回滚的请求, RocketMQ 提供了消息反查机制用于查询本地事务的执行情况.

  5. 根据反查结果,向消息队列服务端发送提交事务或回滚的请求.

  6. 消息队列服务端收到提交的请求会将消息发送给消费者, 如果收到回滚的请求则会将消息丢弃.


上面的流程中提到两个重要的概念: 半消息和反查机制

半消息: 半消息并不是说消息内容不完整, 而是说消息内容是完整的, 只不过是暂时无法提交给消息者进行消费的消息. 此时消息的状态是"暂时不可消息".

消息反查机制:  如果出现网络问题, 或生产者故障等原因导致服务端长时间没有收到确认消息,  服务端就会发送一个请求给生产者, 查询该消息的最终状态.


RocketMQ 事务案例

上面的描述很抽象, 我们来一个实际的例子. 比如我们在淘宝上买东西, 当你在购物车选择你要下单的商品后, 提交订单. 再回头看你的购物车, 是不是商品就没了.  如果下单成功,但是购物车没删除商品或者是商品删除了, 但是下单失败都会导致数据的不一致. 下面我以这个过程为例来看看如果使用 RocketMQ 的事务消息来实现.(说到这个, 淘宝只要你下单成功就会清理你的购物车, 如果你取消订单,购物车也无法还原了. ) 整个过程的流程如下所示:


Screen Shot 2021-01-12 at 10.41.07 PM.png


发送消息代码:

    @GetMapping("/pre")    public Boolean createOrder() {        TestOrder request = new TestOrder();        request.setUserId("3ef100c4-227e-11eb-adc1-0242ac120002");        request.setOrderId("6dfdedb6-4f54-11eb-ae93-0242ac130002");        request.setGoodsId("6dfde96a-4f54-11eb-ae93-0242ac130002");        request.setPaymentAmount(new BigDecimal(20));        request.setOrderStatus(OrderStatus.UNPAYED.getCode());        HashMap<String, Object> headers = new HashMap<>();        headers.put("KEYS", request.getOrderId());        MessageHeaders messageHeaders = new MessageHeaders(headers);        try {            Message<TestOrder> message = MessageBuilder.createMessage(request, messageHeaders);            TransactionSendResult sendResult = extRocketMQTemplate.sendMessageInTransaction(orderTopic +":order",  message,  request);            log.info("事务结果{}", sendResult.getLocalTransactionState());        } catch (Exception e) {            log.error("发送信息异常:{}", e.getMessage());        }        return true;    }
复制代码

发送端事务监听代码:

@Slf4j@Component@RocketMQTransactionListener(rocketMQTemplateBeanName = "extRocketMQTemplate")public class TransactionListenerImpl implements RocketMQLocalTransactionListener {    @Resource    private TestOrderService testOrderService;    @Override    public RocketMQLocalTransactionState executeLocalTransaction(org.springframework.messaging.Message message, Object o) {        log.info("执行本地事务, 消息:{}", JSONObject.toJSONString(message));        // 执行本地业务逻辑, 如果本地事务执行成功, 则通知Broker可以提交消息让Consumer进行消费        TestOrder testOrder = (TestOrder) o;        try {            boolean success = testOrderService.createOrder(testOrder);            return success ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.UNKNOWN;        } catch (Exception e) {            log.info("本地任务执行异常: {}", e.getMessage());            return RocketMQLocalTransactionState.ROLLBACK;        }    }    @Override    public RocketMQLocalTransactionState checkLocalTransaction(org.springframework.messaging.Message message) {        log.info("check local transaction ");        // 提供事务执行状态的回查方法,提供给broker回调        // 正常情况下不会调用到        String orderId = (String) message.getPayload();        TestOrder testOrder = testOrderService.queryByOrderId(orderId);        if (ObjectUtil.isNotNull(testOrder)) {            return RocketMQLocalTransactionState.COMMIT;        } else {            return RocketMQLocalTransactionState.ROLLBACK;        }    }}
复制代码

消费端代码:

@Slf4j@Service@RocketMQMessageListener(nameServer = "${rocketmq.name-server}",        topic = "${demo.rocketmq.topic}",        consumerGroup = "${rocketmq.consumer.group}",        selectorExpression = "order")public class TransactionTestConsumer implements RocketMQListener<TestOrder> {    @Resource    private TestShoppingCarService shoppingCarService;    @Override    public void onMessage(TestOrder message) {        // 收到预下单成功的消息后, 将商品从用户的购物车中删除        log.info("consumer on message : {}", message);        boolean success = shoppingCarService.removeGoodsForUser(message.getGoodsId(), message.getUserId());        log.info("购物车删除状态:{} ", success);    }}
复制代码

总结

这里我们只是简单介绍了 RocketMQ 的事务实现的理论知识, 并给出了一个简单的 Demo.  从这个 Demo 中可以看到核心是发送端实现的事务监听器: RocketMQLocalTransactionListener.  该监听器提供了两个方法: executeLocalTransaction(org.springframework.messaging.Message message, Object o) 和 checkLocalTransaction(org.springframework.messaging.Message message).分别用于执行本地事务和消息反查. RocketMQLocalTransactionListener 是 Rocketmq-spring-boot-starter 对 Rocketmq 实现的一个封装. 至于 RocketMQ 的事务的实现方式我们将再后面的源码分析文章中详细阐述.







发布于: 2021 年 01 月 13 日阅读数: 33
用户头像

废材姑娘

关注

废材姑娘 2018.01.24 加入

大家叫我双儿,梦想着成为韦小宝的老婆 欢迎关注我的个人公众号----废材姑娘,回复“双儿”加我微信,让我们一起探索多彩的世界。

评论

发布
暂无评论
RocketMQ中的事务消息