写点什么

Cloud- 借助消息队列解决分布式事务

  • 2022 年 5 月 11 日
  • 本文字数:1026 字

    阅读完需:约 3 分钟

多个系统相互配合工作,产生数据一致性问题。例如外卖场景中,下单中心,运单中心两个系统要配合工作,必须保证两个系统数据一致性。错误的解决方案:使用 API 接口调用,下单中心插入数据,调用运单中心的 API 接口处理数据,并启动事务回滚。咋一看这场景没有什么问题,毕竟有事务回滚,一起成功一起失败,但其实存在 API 调用超时的情况,此时下单中心以为调用失败回滚,而运单中心只是超时仍会继续执行程序,从而造成两个系统数据不一致。假设 API 调用成功,也有可能是在订单中心提交事务时失败了,此时订单中心回滚,而 API 已经调用,下单中心的数据已经产生,数据不一致。

使用消息队列解决分布式事务


问题的核心就是保证可靠生产与可靠消费可靠生产:下单中心处理数据和状态表更改应该保证事务一致。生产者往消息队列发送数据时,在本地建立一张状态表,看是否成功发送给队列。利用 RabbitMQ 的确认机制看是否重发还是定时扫描状态表重发,保证可靠生产。兜底方案还是定时扫描状态表。代码


/**


  • 分单处理队列*/public static final String QUEUE_NAME_TRANSACTION = "xucheng.distribute.queue";


/**


  • 分单处理交换机*/public static final String EXCHANGE_NAME_TRANSACTION = "xucheng.distribute.exchange";


/**


  • 消息队列服务进程,此进程包括两个部分:Exchange 和 Queue。*/public static final String ROUTE_NAME_TRANSACTION = "xucheng.distribute.route";


/**


  • 补单队列*///public static final String CREATE_QUEUE_NAME_TRANSACTION = "xucheng.order.reCreate.queue";


/**


  • 1、交换机绑定到分单队列

  • @return*/@Beanpublic DirectExchange transExchange() {return new DirectExchange(EXCHANGE_NAME_TRANSACTION);}


/***2、分单队列*


  • @return*/@Beanpublic Queue scoreQueue() {return new Queue(QUEUE_NAME_TRANSACTION, true);}


/**


  • 3、Binding

  • @return*/@Beanpublic Binding bindingExchangeOrderReceiverQueue() {//通过 Binding 将 Exchange 与 Queue 关联起来。  return BindingBuilder.bind(scoreQueue()).to(transExchange()).with(ROUTE_NAME_TRANSACTION);}



可靠消费:消费端开启手动 ACK,给 MQ 队列发送确认信息。对每条数据进行记录,一旦有异常记录可以试着去重试重新要求 MQ 再发数据,但不能重试太多次。可以将数据主键插入数据库 《一线大厂 Java 面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》无偿开源 威信搜索公众号【编程进阶路】 ,这样同一数据就不会执行两次,或者使用 redis 记录数据操作。

用户头像

还未添加个人签名 2022.04.13 加入

还未添加个人简介

评论

发布
暂无评论
Cloud-借助消息队列解决分布式事务_Java_爱好编程进阶_InfoQ写作社区