写点什么

订单初版—取消订单链路中的技术问题说明文档(二)

  • 2025-07-11
    福建
  • 本文字数:23023 字

    阅读完需:约 76 分钟

9.为什么拦截履约要同步而释放资产是异步 h


(1)在正向订单流程中触发履约采用异步柔性事务


生成订单写库、锁优惠券、锁库存这三个动作会绑定在一个刚性事务里,从而实现"生成订单写库 + 锁优惠券 + 锁库存"的强同步效果。但是当支付成功完成支付回调后,推送订单消息进行履约时,则通过 MQ 异步推送。也就是说:"生单 + 锁优惠券 + 锁库存"采用了同步刚性事务,而"推送订单消息触发履约"则采用了异步柔性事务。

 

原因一:在正向订单流程中,生单会使用到优惠券及涉及扣减库存。如果不采用同步刚性事务,则可能会出现虽然订单已经生成了,但是用户却还是能看到其优惠券的状态还处于未使用,或库存还没扣减。这样就可能对用户造成困惑,数据产生了短暂的不一致问题。严重可能会导致用户重复使用优惠券,以及库存超卖问题。

 

所以"生单 + 锁优惠券 + 锁库存"采用同步刚性事务,当生单操作一旦完成,用户就可以看到优惠券已使用 + 库存已扣减。



原因二:推送订单消息触发履约可以采用异步柔性事务,是因为用户支付完一个订单后,并没有必要马上看到订单已在履约处理,所以"推送订单进行履约"是天然可以被用户接受延迟的。

 

(2)在逆向订单流程中拦截履约采用同步刚性事务


与正向订单流程刚好相反:"修改订单状态 + 拦截订单不被履约"采用了同步刚性事务,而"释放优惠券 + 释放库存"则采用了异步柔性事务。因为取消订单的第一要务必然是拦截履约。

 

如果"修改订单状态 + 拦截订单不被履约"采用了异步柔性事务,那么可能出现订单状态已经改为已取消,但是由于异步拦截履约慢了,导致订单都已经被打包发货了,这样就可能发生纠纷和资损了。

 

如果订单还没被支付就进行了取消,此时订单状态被修改后,可以不用立刻关注库存和优惠券。在用户看来,其还没释放的优惠券最多不能马上恢复使用,影响不严重。最差也不会出现优惠券被用户重复使用的恶劣情况,以及库存超卖问题。

 

10.支付回调分别互斥预支付和取消订单



首先,预支付和支付回调会加同一把锁:ORDER_PAY_KEY。然后,支付回调和取消订单也会加同一把锁:CANCEL_KEY。所以支付回调会加两把锁:ORDER_PAY_KEY 和 CANCEL_KEY,这可以通过 Redisson 的 MutiLock 来实现。


@Servicepublic class OrderServiceImpl implements OrderService {    ...    //预支付订单    @Override    @Transactional(rollbackFor = Exception.class)    public PrePayOrderDTO prePayOrder(PrePayOrderRequest prePayOrderRequest) {        //入参检查        checkPrePayOrderRequestParam(prePayOrderRequest);        String orderId = prePayOrderRequest.getOrderId();        Integer payAmount = prePayOrderRequest.getPayAmount();
//加分布式锁(与订单支付回调时加的是同一把锁) String key = RedisLockKeyConstants.ORDER_PAY_KEY + orderId; boolean lock = redisLock.lock(key); if (!lock) { throw new OrderBizException(OrderErrorCodeEnum.ORDER_PRE_PAY_ERROR); } try { ... } finally { //释放分布式锁 redisLock.unlock(key); } } ...
//支付回调 //支付回调有2把分布式锁的原因说明:同一笔订单在同一时间只能支付or取消 //不可以同时对一笔订单,既发起支付,又发起取消 @Override public void payCallback(PayCallbackRequest payCallbackRequest) { //入参检查 checkPayCallbackRequestParam(payCallbackRequest); String orderId = payCallbackRequest.getOrderId(); Integer payAmount = payCallbackRequest.getPayAmount(); Integer payType = payCallbackRequest.getPayType(); List<String> redisKeyList = Lists.newArrayList();
//加支付分布式锁避免支付系统并发回调 String orderPayKey = RedisLockKeyConstants.ORDER_PAY_KEY + orderId;
//加取消订单分布式锁避免支付和取消订单同时操作同一笔订单 String cancelOrderKey = RedisLockKeyConstants.CANCEL_KEY + orderId; redisKeyList.add(orderPayKey); redisKeyList.add(cancelOrderKey);
boolean lock = redisLock.multiLock(redisKeyList); if (!lock) { throw new OrderBizException(OrderErrorCodeEnum.ORDER_PAY_CALLBACK_ERROR); } try { ... } catch (Exception e) { throw new OrderBizException(e.getMessage()); } finally { //释放分布式锁 redisLock.unMultiLock(redisKeyList); } } ...}
@Servicepublic class OrderAfterSaleServiceImpl implements OrderAfterSaleService { ... //取消订单/超时未支付取消 @Override @GlobalTransactional(rollbackFor = Exception.class) public JsonResult<Boolean> cancelOrder(CancelOrderRequest cancelOrderRequest) { //入参检查 checkCancelOrderRequestParam(cancelOrderRequest);
//分布式锁 String orderId = cancelOrderRequest.getOrderId(); String key = RedisLockKeyConstants.CANCEL_KEY + orderId; boolean lock = redisLock.lock(key); if (!lock) { throw new OrderBizException(OrderErrorCodeEnum.CANCEL_ORDER_REPEAT); } try { //执行取消订单 return executeCancelOrder(cancelOrderRequest, orderId); } catch (Exception e) { log.error("biz error", e); throw new OrderBizException(e.getMessage()); } finally { redisLock.unlock(key); } } ...}
复制代码


11.支付回调和取消订单互斥下的两种场景分析


支付回调的处理过程中,会对异常场景进行判断:

 

一.如果订单状态是"已创建"

那么会更新订单状态为已支付,并发送事务消息。

 

二.如果订单状态不是"已创建"

那么判断订单状态是否是取消状态,如果是取消状态,则继续判断订单是否未支付。若是未支付,则进行退款,并抛出异常。若不是未支付,则按支付方式抛出不同的异常。如果不是取消状态,则判断支付回调是否是同种支付方式。如果是,则返回。如果不是,则进行退款,并抛出异常。


@Servicepublic class OrderServiceImpl implements OrderService {    ...    //支付回调    //支付回调有2把分布式锁的原因说明:同一笔订单在同一时间只能支付or取消    //不可以同时对一笔订单,既发起支付,又发起取消    @Override    public void payCallback(PayCallbackRequest payCallbackRequest) {        //1.入参检查        checkPayCallbackRequestParam(payCallbackRequest);
String orderId = payCallbackRequest.getOrderId(); Integer payAmount = payCallbackRequest.getPayAmount(); Integer payType = payCallbackRequest.getPayType(); List<String> redisKeyList = Lists.newArrayList();
//2.加支付分布式锁避免支付系统并发回调 String orderPayKey = RedisLockKeyConstants.ORDER_PAY_KEY + orderId; //加取消订单分布式锁避免支付和取消订单同时操作同一笔订单 String cancelOrderKey = RedisLockKeyConstants.CANCEL_KEY + orderId; redisKeyList.add(orderPayKey); redisKeyList.add(cancelOrderKey); boolean lock = redisLock.multiLock(redisKeyList); if (!lock) { throw new OrderBizException(OrderErrorCodeEnum.ORDER_PAY_CALLBACK_ERROR); } try { //从数据库中查询出当前订单信息 OrderInfoDO orderInfoDO = orderInfoDAO.getByOrderId(orderId); OrderPaymentDetailDO orderPaymentDetailDO = orderPaymentDetailDAO.getPaymentDetailByOrderId(orderId); //3.校验参数 if (orderInfoDO == null || orderPaymentDetailDO == null) { throw new OrderBizException(OrderErrorCodeEnum.ORDER_INFO_IS_NULL); } if (!payAmount.equals(orderInfoDO.getPayAmount())) { throw new OrderBizException(OrderErrorCodeEnum.ORDER_CALLBACK_PAY_AMOUNT_ERROR); }
//4.异常场景判断 Integer orderStatus = orderInfoDO.getOrderStatus(); if (OrderStatusEnum.CREATED.getCode().equals(orderStatus)) { //如果订单状态是"已创建",直接更新订单状态为已支付,并发送事务消息 TransactionMQProducer transactionMQProducer = defaultProducer.getProducer(); transactionMQProducer.setTransactionListener(new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { try { orderManager.updateOrderStatusPaid(payCallbackRequest, orderInfoDO, orderPaymentDetailDO); return LocalTransactionState.COMMIT_MESSAGE; } catch (BaseBizException e) { throw e; } catch (Exception e) { log.error("system error", e); return LocalTransactionState.ROLLBACK_MESSAGE; } }
@Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { //检查订单是否是已支付 OrderInfoDO orderInfoDO = orderInfoDAO.getByOrderId(orderId); if (orderInfoDO != null && OrderStatusEnum.PAID.getCode().equals(orderInfoDO.getOrderStatus())) { return LocalTransactionState.COMMIT_MESSAGE; } return LocalTransactionState.ROLLBACK_MESSAGE; } }); //发送 "订单已完成支付" 消息 sendPaidOrderSuccessMessage(transactionMQProducer, orderInfoDO); } else { //如果订单状态不是 "已创建" if (OrderStatusEnum.CANCELED.getCode().equals(orderStatus)) { //如果订单状态是取消状态 Integer payStatus = orderPaymentDetailDO.getPayStatus(); if (PayStatusEnum.UNPAID.getCode().equals(payStatus)) { //调用退款 executeOrderRefund(orderInfoDO, orderPaymentDetailDO); throw new OrderBizException(OrderErrorCodeEnum.ORDER_CANCEL_PAY_CALLBACK_ERROR); } else if (PayStatusEnum.PAID.getCode().equals(payStatus)) { if (payType.equals(orderPaymentDetailDO.getPayType())) { throw new OrderBizException(OrderErrorCodeEnum.ORDER_CANCEL_PAY_CALLBACK_PAY_TYPE_SAME_ERROR); } else { throw new OrderBizException(OrderErrorCodeEnum.ORDER_CANCEL_PAY_CALLBACK_PAY_TYPE_NO_SAME_ERROR); } } } else { //如果订单状态不是取消状态 if (PayStatusEnum.PAID.getCode().equals(orderPaymentDetailDO.getPayStatus())) { if (payType.equals(orderPaymentDetailDO.getPayType())) { return; } //调用退款 executeOrderRefund(orderInfoDO, orderPaymentDetailDO); throw new OrderBizException(OrderErrorCodeEnum.ORDER_CANCEL_PAY_CALLBACK_REPEAT_ERROR); } } } } catch (Exception e) { throw new OrderBizException(e.getMessage()); } finally { //释放分布式锁 redisLock.unMultiLock(redisKeyList); } }
//发送订单已完成支付消息,触发订单进行履约 private void sendPaidOrderSuccessMessage(TransactionMQProducer transactionMQProducer, OrderInfoDO orderInfoDO) throws MQClientException { String orderId = orderInfoDO.getOrderId(); PaidOrderSuccessMessage message = new PaidOrderSuccessMessage(); message.setOrderId(orderId); String topic = RocketMqConstant.PAID_ORDER_SUCCESS_TOPIC; byte[] body = JSON.toJSONString(message).getBytes(StandardCharsets.UTF_8); Message mq = new Message(topic, body); TransactionSendResult result = transactionMQProducer.sendMessageInTransaction(mq, orderInfoDO); if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) { throw new OrderBizException(OrderErrorCodeEnum.ORDER_PAY_CALLBACK_SEND_MQ_ERROR); } } ...}
//支付系统回调请求对象@Datapublic class PayCallbackRequest extends AbstractObject implements Serializable { private static final long serialVersionUID = 3685085492927992753L; private String orderId;//订单ID private String payAccount;//支付账户 private Integer payAmount;//支付金额 private String outTradeNo;//支付系统交易单号 private Integer payType;//支付方式 private String merchantId;//商户号 private String payChannel;//支付渠道 private String appid;//微信平台 appid}
复制代码


12.拦截履约的具体业务流程


流程图如下:



代码如下:


一.取消订单入口

@Servicepublic class OrderAfterSaleServiceImpl implements OrderAfterSaleService {    @Autowired    private AfterSaleManager afterSaleManager;    ...
//取消订单/超时未支付取消 @Override @GlobalTransactional(rollbackFor = Exception.class) public JsonResult<Boolean> cancelOrder(CancelOrderRequest cancelOrderRequest) { //入参检查 checkCancelOrderRequestParam(cancelOrderRequest); //分布式锁 String orderId = cancelOrderRequest.getOrderId(); String key = RedisLockKeyConstants.CANCEL_KEY + orderId; boolean lock = redisLock.lock(key); if (!lock) { throw new OrderBizException(OrderErrorCodeEnum.CANCEL_ORDER_REPEAT); } try { //执行取消订单 return executeCancelOrder(cancelOrderRequest, orderId); } catch (Exception e) { log.error("biz error", e); throw new OrderBizException(e.getMessage()); } finally { redisLock.unlock(key); } }
@Override public JsonResult<Boolean> executeCancelOrder(CancelOrderRequest cancelOrderRequest, String orderId) { //1.组装数据 OrderInfoDO orderInfoDO = findOrderInfo(orderId, cancelOrderRequest.getCancelType()); CancelOrderAssembleRequest cancelOrderAssembleRequest = buildAssembleRequest(orderId, cancelOrderRequest, orderInfoDO); if (cancelOrderAssembleRequest.getOrderInfoDTO().getOrderStatus() >= OrderStatusEnum.OUT_STOCK.getCode()) { throw new OrderBizException(OrderErrorCodeEnum.CURRENT_ORDER_STATUS_CANNOT_CANCEL); } TransactionMQProducer producer = defaultProducer.getProducer(); producer.setTransactionListener(new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { try { //2.执行履约取消、更新订单状态、新增订单日志操作 afterSaleManager.cancelOrderFulfillmentAndUpdateOrderStatus(cancelOrderAssembleRequest); return LocalTransactionState.COMMIT_MESSAGE; } catch (Exception e) { log.error("system error", e); return LocalTransactionState.ROLLBACK_MESSAGE; } }
@Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { //查询订单状态是否已更新为"已取消" OrderInfoDO orderInfoByDatabase = orderInfoDAO.getByOrderId(orderId); if (OrderStatusEnum.CANCELED.getCode().equals(orderInfoByDatabase.getOrderStatus())) { return LocalTransactionState.COMMIT_MESSAGE; } return LocalTransactionState.ROLLBACK_MESSAGE; } });
try { Message message = new Message(RocketMqConstant.RELEASE_ASSETS_TOPIC, JSONObject.toJSONString(cancelOrderAssembleRequest).getBytes(StandardCharsets.UTF_8)); //3.发送事务消息 释放权益资产 TransactionSendResult result = producer.sendMessageInTransaction(message, cancelOrderAssembleRequest); if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) { throw new OrderBizException(OrderErrorCodeEnum.CANCEL_ORDER_PROCESS_FAILED); } return JsonResult.buildSuccess(true); } catch (Exception e) { throw new OrderBizException(OrderErrorCodeEnum.SEND_TRANSACTION_MQ_FAILED); } } ...}
复制代码


二.拦截履约处理

@Servicepublic class AfterSaleManagerImpl implements AfterSaleManager {    @DubboReference(version = "1.0.0")    private FulfillApi fulfillApi;    ...
@Override public void cancelOrderFulfillmentAndUpdateOrderStatus(CancelOrderAssembleRequest cancelOrderAssembleRequest) { //履约取消 cancelFulfill(cancelOrderAssembleRequest); //更新订单状态和记录订单操作日志 updateOrderStatusAndSaveOperationLog(cancelOrderAssembleRequest); }
//调用履约拦截订单 private void cancelFulfill(CancelOrderAssembleRequest cancelOrderAssembleRequest) { OrderInfoDTO orderInfoDTO = cancelOrderAssembleRequest.getOrderInfoDTO(); if (OrderStatusEnum.CREATED.getCode().equals(orderInfoDTO.getOrderStatus())) { return; } CancelFulfillRequest cancelFulfillRequest = orderInfoDTO.clone(CancelFulfillRequest.class); JsonResult<Boolean> jsonResult = fulfillApi.cancelFulfill(cancelFulfillRequest); if (!jsonResult.getSuccess()) { throw new OrderBizException(OrderErrorCodeEnum.CANCEL_ORDER_FULFILL_ERROR); } } ...}
@DubboService(version = "1.0.0", interfaceClass = FulfillApi.class, retries = 0)public class FulfillApiImpl implements FulfillApi { @Autowired private FulfillService fulfillService;
@DubboReference(version = "1.0.0", retries = 0) private WmsApi wmsApi;
@DubboReference(version = "1.0.0", retries = 0) private TmsApi tmsApi; ...
@Override public JsonResult<Boolean> cancelFulfill(CancelFulfillRequest cancelFulfillRequest) { log.info("取消履约:request={}", JSONObject.toJSONString(cancelFulfillRequest)); //1.取消履约单 fulfillService.cancelFulfillOrder(request.getOrderId()); //2.取消捡货 wmsApi.cancelPickGoods(request.getOrderId()); //3.取消发货 tmsApi.cancelSendOut(request.getOrderId());
return JsonResult.buildSuccess(true); } ...}
@Servicepublic class FulfillServiceImpl implements FulfillService { @Autowired private OrderFulfillDAO orderFulfillDAO;
@Autowired private OrderFulfillItemDAO orderFulfillItemDAO; ...
@Override public void cancelFulfillOrder(String orderId) { //1.查询履约单 OrderFulfillDO orderFulfill = orderFulfillDAO.getOne(orderId); //2.移除履约单 if (null != orderFulfill) { orderFulfillDAO.removeById(orderFulfill.getId()); //3.查询履约单条目 List<OrderFulfillItemDO> fulfillItems = orderFulfillItemDAO.listByFulfillId(orderFulfill.getFulfillId()); //4.移除履约单条目 for (OrderFulfillItemDO item : fulfillItems) { orderFulfillItemDAO.removeById(item.getId()); } } } ...}
@DubboService(version = "1.0.0", interfaceClass = WmsApi.class, retries = 0)public class WmsApiImpl implements WmsApi { ... @Transactional(rollbackFor = Exception.class) @Override public JsonResult<Boolean> cancelPickGoods(String orderId) { log.info("取消捡货,orderId={}", orderId); //1.查询出库单 List<DeliveryOrderDO> deliveryOrders = deliveryOrderDAO.listByOrderId(orderId); //2.移除出库单和条目 if (CollectionUtils.isNotEmpty(deliveryOrders)) { for (DeliveryOrderDO order : deliveryOrders) { List<DeliveryOrderItemDO> items = deliveryOrderItemDAO.listByDeliveryOrderId(order.getDeliveryOrderId()); for (DeliveryOrderItemDO item : items) { deliveryOrderItemDAO.removeById(item.getId()); } deliveryOrderDAO.removeById(order.getId()); } } return JsonResult.buildSuccess(true); } ...}
@DubboService(version = "1.0.0", interfaceClass = TmsApi.class, retries = 0)public class TmsApiImpl implements TmsApi { ... @Transactional(rollbackFor = Exception.class) @Override public JsonResult<Boolean> cancelSendOut(String orderId) { log.info("取消发货,orderId={}", orderId); //1.查询物流单 List<LogisticOrderDO> logisticOrders = logisticOrderDAO.listByOrderId(orderId); //2.移除物流单 for (LogisticOrderDO order : logisticOrders) { logisticOrderDAO.removeById(order.getId()); } return JsonResult.buildSuccess(true); } ...}
复制代码


13.拦截履约和取消订单的 Seata 事务原理分析

 

(1)取消订单接口使用 Seata 事务的 AT 模式


如果拦截履约时发现,订单已经出库配送,那么就会拦截失败。当拦截失败时,就不能更新订单状态了,可使用 Seata 的刚性事务实现。具体就是,在分布式事务入口添加 @GlobalTransactional 注解,在各分支事务添加 @Transactional 注解。


@Servicepublic class OrderAfterSaleServiceImpl implements OrderAfterSaleService {    ...    //取消订单/超时未支付取消    @Override    @GlobalTransactional(rollbackFor = Exception.class)    public JsonResult<Boolean> cancelOrder(CancelOrderRequest cancelOrderRequest) {        //入参检查        checkCancelOrderRequestParam(cancelOrderRequest);        //分布式锁        String orderId = cancelOrderRequest.getOrderId();        String key = RedisLockKeyConstants.CANCEL_KEY + orderId;        boolean lock = redisLock.lock(key);        if (!lock) {            throw new OrderBizException(OrderErrorCodeEnum.CANCEL_ORDER_REPEAT);        }
try { //执行取消订单 return executeCancelOrder(cancelOrderRequest, orderId); } catch (Exception e) { log.error("biz error", e); throw new OrderBizException(e.getMessage()); } finally { redisLock.unlock(key); } }
@Override public JsonResult<Boolean> executeCancelOrder(CancelOrderRequest cancelOrderRequest, String orderId) { //1.组装数据 OrderInfoDO orderInfoDO = findOrderInfo(orderId, cancelOrderRequest.getCancelType()); CancelOrderAssembleRequest cancelOrderAssembleRequest = buildAssembleRequest(orderId, cancelOrderRequest, orderInfoDO); if (cancelOrderAssembleRequest.getOrderInfoDTO().getOrderStatus() >= OrderStatusEnum.OUT_STOCK.getCode()) { throw new OrderBizException(OrderErrorCodeEnum.CURRENT_ORDER_STATUS_CANNOT_CANCEL); }
TransactionMQProducer producer = defaultProducer.getProducer(); producer.setTransactionListener(new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { try { //2.执行履约取消、更新订单状态、新增订单日志操作 afterSaleManager.cancelOrderFulfillmentAndUpdateOrderStatus(cancelOrderAssembleRequest); return LocalTransactionState.COMMIT_MESSAGE; } catch (Exception e) { log.error("system error", e); return LocalTransactionState.ROLLBACK_MESSAGE; } } @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { //查询订单状态是否已更新为"已取消" OrderInfoDO orderInfoByDatabase = orderInfoDAO.getByOrderId(orderId); if (OrderStatusEnum.CANCELED.getCode().equals(orderInfoByDatabase.getOrderStatus())) { return LocalTransactionState.COMMIT_MESSAGE; } return LocalTransactionState.ROLLBACK_MESSAGE; } });
try { Message message = new Message(RocketMqConstant.RELEASE_ASSETS_TOPIC, JSONObject.toJSONString(cancelOrderAssembleRequest).getBytes(StandardCharsets.UTF_8)); //3.发送事务消息 释放权益资产 TransactionSendResult result = producer.sendMessageInTransaction(message, cancelOrderAssembleRequest); if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) { throw new OrderBizException(OrderErrorCodeEnum.CANCEL_ORDER_PROCESS_FAILED); } return JsonResult.buildSuccess(true); } catch (Exception e) { throw new OrderBizException(OrderErrorCodeEnum.SEND_TRANSACTION_MQ_FAILED); } } ...}
复制代码


(2)Seata 事务 AT 模式的原理

一.取消订单接口的入口会向Seata Server注册一个全局事务XID二.调用的履约服务会向Seata Server注册一个分支事务Branch ID三.在履约服务更新数据前,要先获取本地锁四.在履约服务在获取本地锁成功后,会插入undo log表数据五.然后,履约服务会向Seata Server服务器获取全局锁六.接着,履约服务才提交本地事务并释放本地锁七.最后,履约服务会向Seata Server上报分支事务成功
复制代码


 

14.取消订单全链路 Seata 回滚原理与并发分析


(1)回滚原理


如果分支事务出现异常,就会上报 Seata Server 需要回滚全局事务。如果某些分支事务还没提交,那么直接就不需要执行了。如果某些分支事务已经提交成功,那么就根据 undo log 进行数据恢复。

 

(2)并发分析


拦截履约的各分支事务并不会出现并发竞争全局锁的问题。因为取消履约、取消出库、取消配送都是针对一个订单操作,同一时刻不会出现大量对一个订单的操作。


15.支付退款时的双异步设计原因(提升性能 + 解耦)


支付退款的时候会有两个消费者(双异步):一个是退款准备消费者,一个是实际退款消费者。如果两个消费者合并成一个消费者,可能会出现两个问题。

 

问题一:退款速度可能会比较慢,因为一个消费者需要处理的事情多了。

 

问题二:如果第三方退款接口出现异常或报错,耦合比较严重特别不合理。这时计算好的数据和插入的记录,可能需要回滚或者重新消费消息。在故障情况下,第三方一直报错,那么系统就一直回滚,而且还会导致用户不能及时通过售后记录查看售后信息。


 

16.释放资产消息的高扩展性设计(多路发送消息)


订单系统的取消订单接口被调用时,会先发送一个释放资产的消息到 MQ,然后订单系统会消费释放资产的消息。在消费释放资产的消息时,才发送具体释放哪些资产的消息到 MQ,接着各个系统才能对各自监听的释放资产消息进行消费。

 

如果取消订单的核心业务代码,直接把具体的释放资产消息到 MQ。那么当资产种类增加时,比如增加虚拟币、积分、权益等各种资产,就需要修改订单系统取消订单的核心业务代码了,扩展性会很差。

 

如果取消订单的核心业务代码,只发一个释放资产的消息到 MQ。那么后续增加资产种类时,只需在消费释放资产的消息处进行改动即可,这样就可以大大提高了代码的扩展性。


 

17.取消订单链路中数据库事务与 MQ 消息不一致问题


(1)数据库事务与 MQ 消息不一致情况


在 v1 版本的取消订单接口中,由于"更新订单状态 + 取消履约 + 发送释放资产消息到 MQ"属于刚性事务,所以其中任何一个操作出现 Exception,都会进行全局回滚保证数据一致。但是更新数据库和发送消息到 MQ,还是可能会出现数据不一致的情况。

 

比如代码顺序是:先发送释放资产消息,再更新订单状态,再取消履约。先发送释放资产消息到 MQ 成功,更新订单状态也成功,但取消履约失败。那么此时回滚只能回滚更新订单状态,发送到 MQ 的消息却不能回滚了。所以会存在这种不一致的风险,因此需要一种强一致的方案。

 

如下是 v1 版本的取消订单实现:


@Servicepublic class OrderAfterSaleServiceImpl implements OrderAfterSaleService {    ...    //取消订单/超时未支付取消    @Override    public JsonResult<Boolean> cancelOrder(CancelOrderRequest cancelOrderRequest) {        //入参检查        checkCancelOrderRequestParam(cancelOrderRequest);        //分布式锁        String orderId = cancelOrderRequest.getOrderId();        String key = RedisLockKeyConstants.CANCEL_KEY + orderId;
try { boolean lock = redisLock.lock(key); if (!lock) { throw new OrderBizException(OrderErrorCodeEnum.CANCEL_ORDER_REPEAT); } //执行取消订单 executeCancelOrder(cancelOrderRequest, orderId); return JsonResult.buildSuccess(true); } catch (Exception e) { throw new OrderBizException(e.getMessage()); } finally { redisLock.unlock(key); } }
@Override @GlobalTransactional(rollbackFor = Exception.class) public void executeCancelOrder(CancelOrderRequest cancelOrderRequest, String orderId) { //1.组装数据 CancelOrderAssembleRequest cancelOrderAssembleRequest = buildAssembleRequest(orderId, cancelOrderRequest); //幂等校验:防止多个线程同时操作取消同一笔订单 if (OrderStatusEnum.CANCELED.getCode().equals(cancelOrderAssembleRequest.getOrderInfoDTO().getOrderStatus())) { return; } //2.检验订单支付状态 checkOrderPayStatus(cancelOrderAssembleRequest); //3.更新订单状态和记录订单操作日志 updateOrderStatusAndSaveOperationLog(cancelOrderAssembleRequest); //超时未支付的订单不用继续再往下执行取消履约和释放资产 if (OrderStatusEnum.PAID.getCode() > cancelOrderAssembleRequest.getOrderInfoDTO().getOrderStatus()) { return; } //4.履约取消 cancelFulfill(cancelOrderAssembleRequest); //5.发送释放资产消息到MQ defaultProducer.sendMessage(RocketMqConstant.RELEASE_ASSETS_TOPIC, JSONObject.toJSONString(cancelOrderAssembleRequest), "释放资产"); } ...}
复制代码


(2)取消订单全链路数据不丢失的方案设计


为保证取消订单的链路中,数据库的更新事务与推送给 MQ 的消息强一致,需要将两者包裹在一个事务中,保证它们要么一起成功,要么一起失败。于是,就需要通过 RocketMQ 的事务机制来实现了。

 

如下是 v2 版本的取消订单实现,保证了数据库事务与 MQ 消息强一致。


@Servicepublic class OrderAfterSaleServiceImpl implements OrderAfterSaleService {    ...    //取消订单/超时未支付取消    @Override    @GlobalTransactional(rollbackFor = Exception.class)    public JsonResult<Boolean> cancelOrder(CancelOrderRequest cancelOrderRequest) {        //入参检查        checkCancelOrderRequestParam(cancelOrderRequest);        //分布式锁        String orderId = cancelOrderRequest.getOrderId();        String key = RedisLockKeyConstants.CANCEL_KEY + orderId;        boolean lock = redisLock.lock(key);        if (!lock) {            throw new OrderBizException(OrderErrorCodeEnum.CANCEL_ORDER_REPEAT);        }        try {            //执行取消订单            return executeCancelOrder(cancelOrderRequest, orderId);        } catch (Exception e) {            log.error("biz error", e);            throw new OrderBizException(e.getMessage());        } finally {            redisLock.unlock(key);        }    }
@Override public JsonResult<Boolean> executeCancelOrder(CancelOrderRequest cancelOrderRequest, String orderId) { //1.组装数据 OrderInfoDO orderInfoDO = findOrderInfo(orderId, cancelOrderRequest.getCancelType()); CancelOrderAssembleRequest cancelOrderAssembleRequest = buildAssembleRequest(orderId, cancelOrderRequest, orderInfoDO); if (cancelOrderAssembleRequest.getOrderInfoDTO().getOrderStatus() >= OrderStatusEnum.OUT_STOCK.getCode()) { throw new OrderBizException(OrderErrorCodeEnum.CURRENT_ORDER_STATUS_CANNOT_CANCEL); }
TransactionMQProducer producer = defaultProducer.getProducer(); producer.setTransactionListener(new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { try { //2.执行履约取消、更新订单状态、新增订单日志操作 afterSaleManager.cancelOrderFulfillmentAndUpdateOrderStatus(cancelOrderAssembleRequest); return LocalTransactionState.COMMIT_MESSAGE; } catch (Exception e) { log.error("system error", e); return LocalTransactionState.ROLLBACK_MESSAGE; } }
@Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { //查询订单状态是否已更新为"已取消" OrderInfoDO orderInfoByDatabase = orderInfoDAO.getByOrderId(orderId); if (OrderStatusEnum.CANCELED.getCode().equals(orderInfoByDatabase.getOrderStatus())) { return LocalTransactionState.COMMIT_MESSAGE; } return LocalTransactionState.ROLLBACK_MESSAGE; } });
try { Message message = new Message(RocketMqConstant.RELEASE_ASSETS_TOPIC, JSONObject.toJSONString(cancelOrderAssembleRequest).getBytes(StandardCharsets.UTF_8)); //3.发送事务消息 释放权益资产 TransactionSendResult result = producer.sendMessageInTransaction(message, cancelOrderAssembleRequest); if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) { throw new OrderBizException(OrderErrorCodeEnum.CANCEL_ORDER_PROCESS_FAILED); } return JsonResult.buildSuccess(true); } catch (Exception e) { throw new OrderBizException(OrderErrorCodeEnum.SEND_TRANSACTION_MQ_FAILED); } } ...}
复制代码


18.释放资产多路 MQ 故障重试与幂等方案


如果发送消息到 MQ 过程中出现故障,那么就会通过返回 RECONSUME_LATER 来进行重试。

 

此外,消费消息时调用的接口可能会出现同一请求多次被调用。因此,必须对调用的接口,使用分布式锁 + 状态前置校验进行幂等处理。加分布式锁是为了防止并非请求进来后可能会避开状态的前置校验。


@Configurationpublic class ConsumerConfig {    @Autowired    private RocketMQProperties rocketMQProperties;    ...
//释放资产消息消费者 @Bean("releaseAssetsConsumer") public DefaultMQPushConsumer releaseAssetsConsumer(ReleaseAssetsListener releaseAssetsListener) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RELEASE_ASSETS_CONSUMER_GROUP); consumer.setNamesrvAddr(rocketMQProperties.getNameServer()); consumer.subscribe(RELEASE_ASSETS_TOPIC, "*"); consumer.registerMessageListener(releaseAssetsListener); consumer.start(); return consumer; }}
//监听并消费释放资产消息@Componentpublic class ReleaseAssetsListener implements MessageListenerConcurrently { @Autowired private DefaultProducer defaultProducer;
@Autowired private OrderItemDAO orderItemDAO;
@Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { for (MessageExt messageExt : list) { //1.消费到释放资产message String message = new String(messageExt.getBody()); log.info("ReleaseAssetsListener message:{}", message); CancelOrderAssembleRequest cancelOrderAssembleRequest = JSONObject.parseObject(message, CancelOrderAssembleRequest.class); OrderInfoDTO orderInfoDTO = cancelOrderAssembleRequest.getOrderInfoDTO(); //2.发送取消订单退款请求MQ if (orderInfoDTO.getOrderStatus() > OrderStatusEnum.CREATED.getCode()) { defaultProducer.sendMessage(RocketMqConstant.CANCEL_REFUND_REQUEST_TOPIC, JSONObject.toJSONString(cancelOrderAssembleRequest), "取消订单退款"); } //3.发送释放库存MQ ReleaseProductStockRequest releaseProductStockRequest = buildReleaseProductStock(orderInfoDTO, orderItemDAO); defaultProducer.sendMessage(RocketMqConstant.CANCEL_RELEASE_INVENTORY_TOPIC, JSONObject.toJSONString(releaseProductStockRequest), "取消订单释放库存"); //4.发送释放优惠券MQ if (!Strings.isNullOrEmpty(orderInfoDTO.getCouponId())) { ReleaseUserCouponRequest releaseUserCouponRequest = buildReleaseUserCoupon(orderInfoDTO); defaultProducer.sendMessage(RocketMqConstant.CANCEL_RELEASE_PROPERTY_TOPIC, JSONObject.toJSONString(releaseUserCouponRequest), "取消订单释放优惠券"); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { log.error("consumer error", e); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } ...}
@Configurationpublic class ConsumerConfig { @Autowired private RocketMQProperties rocketMQProperties;
//释放优惠券消息消费者 @Bean("releaseInventoryConsumer") public DefaultMQPushConsumer releaseInventoryConsumer(ReleasePropertyListener releasePropertyListener) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketMqConstant.RELEASE_PROPERTY_CONSUMER_GROUP); consumer.setNamesrvAddr(rocketMQProperties.getNameServer()); consumer.subscribe(RocketMqConstant.CANCEL_RELEASE_PROPERTY_TOPIC, "*"); consumer.registerMessageListener(releasePropertyListener); consumer.start(); return consumer; }}
@Componentpublic class ReleasePropertyListener implements MessageListenerConcurrently { @DubboReference(version = "1.0.0") private MarketApi marketApi;
@Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { for (MessageExt msg : list) { String content = new String(msg.getBody(), StandardCharsets.UTF_8); log.info("ReleasePropertyConsumer message:{}", content); ReleaseUserCouponRequest releaseUserCouponRequest = JSONObject.parseObject(content, ReleaseUserCouponRequest.class); //释放优惠券 JsonResult<Boolean> jsonResult = marketApi.releaseUserCoupon(releaseUserCouponRequest); if (!jsonResult.getSuccess()) { throw new MarketBizException(MarketErrorCodeEnum.CONSUME_MQ_FAILED); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { log.error("consumer error", e); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } }}
@DubboService(version = "1.0.0", interfaceClass = MarketApi.class, retries = 0)public class MarketApiImpl implements MarketApi { ... //回退用户使用的优惠券 @Override public JsonResult<Boolean> releaseUserCoupon(ReleaseUserCouponRequest releaseUserCouponRequest) { log.info("开始执行回滚优惠券,couponId:{}", releaseUserCouponRequest.getCouponId()); //分布式锁 String couponId = releaseUserCouponRequest.getCouponId(); String key = RedisLockKeyConstants.RELEASE_COUPON_KEY + couponId; boolean lock = redisLock.lock(key); if (!lock) { throw new MarketBizException(MarketErrorCodeEnum.RELEASE_COUPON_FAILED); } try { //执行释放优惠券 Boolean result = couponService.releaseUserCoupon(releaseUserCouponRequest); return JsonResult.buildSuccess(result); } catch (MarketBizException e) { log.error("biz error", e); return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg()); } catch (Exception e) { log.error("system error", e); return JsonResult.buildError(e.getMessage()); } finally { redisLock.unlock(key); } } ...}
@Servicepublic class CouponServiceImpl implements CouponService { ... //释放用户优惠券 @Override public Boolean releaseUserCoupon(ReleaseUserCouponRequest releaseUserCouponRequest) { String userId = releaseUserCouponRequest.getUserId(); String couponId = releaseUserCouponRequest.getCouponId(); CouponDO couponAchieve = couponDAO.getUserCoupon(userId, couponId); if (CouponUsedStatusEnum.UN_USED.getCode().equals(couponAchieve.getUsed())) { log.info("当前用户未使用优惠券,不用回退,userId:{},couponId:{}", userId, couponId); return true; } couponAchieve.setUsed(CouponUsedStatusEnum.UN_USED.getCode()); couponAchieve.setUsedTime(null); couponDAO.updateById(couponAchieve); return true; } ...}
复制代码


19.双异步支付退款不一致问题分析


在退款准备消费者中,如果先写售后记录到数据库,再发退款消息到 MQ。如果写入售后记录到数据库成功了,但发送实际退款消息到 MQ 却失败了,那么此时就会产生数据库与 MQ 的数据不一致问题。

 

在退款准备消费者中,如果先发退款消息到 MQ,再写售后记录到数据库。如果发送退款消息到 MQ 成功了,但写入售后记录到数据库却失败了,那么同样会产生数据库与 MQ 的数据不一致问题。



此时就可以使用 RocketMQ 的事务消息机制;

@Configurationpublic class ConsumerConfig {    @Autowired    private RocketMQProperties rocketMQProperties;    ...
//消费退款准备请求消息消费者 @Bean("cancelRefundConsumer") public DefaultMQPushConsumer cancelRefundConsumer(CancelRefundListener cancelRefundListener) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketMqConstant.REQUEST_CONSUMER_GROUP); consumer.setNamesrvAddr(rocketMQProperties.getNameServer()); consumer.subscribe(RocketMqConstant.CANCEL_REFUND_REQUEST_TOPIC, "*"); consumer.registerMessageListener(cancelRefundListener); consumer.start(); return consumer; } ...}
//消费退款准备请求消息@Componentpublic class CancelRefundListener implements MessageListenerConcurrently { @Autowired private OrderAfterSaleService orderAfterSaleService;
@Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { for (MessageExt messageExt : list) { String message = new String(messageExt.getBody()); CancelOrderAssembleRequest cancelOrderAssembleRequest = JSONObject.parseObject(message, CancelOrderAssembleRequest.class); log.info("CancelRefundConsumer message:{}", message); //执行 取消订单/超时未支付取消 前的操作 JsonResult<Boolean> jsonResult = orderAfterSaleService.processCancelOrder(cancelOrderAssembleRequest); if (!jsonResult.getSuccess()) { throw new OrderBizException(OrderErrorCodeEnum.CONSUME_MQ_FAILED); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { log.error("consumer error", e); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } }}
@Servicepublic class OrderAfterSaleServiceImpl implements OrderAfterSaleService { ... @Override public JsonResult<Boolean> processCancelOrder(CancelOrderAssembleRequest cancelOrderAssembleRequest) { String orderId = cancelOrderAssembleRequest.getOrderId(); //分布式锁 String key = RedisLockKeyConstants.REFUND_KEY + orderId; try { boolean lock = redisLock.lock(key); if (!lock) { throw new OrderBizException(OrderErrorCodeEnum.PROCESS_REFUND_REPEAT); }
//执行退款前的准备工作 //生成售后订单号 OrderInfoDTO orderInfoDTO = cancelOrderAssembleRequest.getOrderInfoDTO(); OrderInfoDO orderInfoDO = orderInfoDTO.clone(OrderInfoDO.class); String afterSaleId = orderNoManager.genOrderId(OrderNoTypeEnum.AFTER_SALE.getCode(), orderInfoDO.getUserId());
//1.计算 取消订单 退款金额 CancelOrderRefundAmountDTO cancelOrderRefundAmountDTO = calculatingCancelOrderRefundAmount(cancelOrderAssembleRequest); cancelOrderAssembleRequest.setCancelOrderRefundAmountDTO(cancelOrderRefundAmountDTO); TransactionMQProducer producer = defaultProducer.getProducer(); producer.setTransactionListener(new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { try { //2.取消订单操作 记录售后信息 afterSaleManager.insertCancelOrderAfterSale(cancelOrderAssembleRequest, AfterSaleStatusEnum.REVIEW_PASS.getCode(), orderInfoDO, afterSaleId); return LocalTransactionState.COMMIT_MESSAGE; } catch (Exception e) { log.error("system error", e); return LocalTransactionState.ROLLBACK_MESSAGE; } }
@Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { //查询售后数据是否插入成功 AfterSaleInfoDO afterSaleInfoDO = afterSaleInfoDAO.getOneByAfterSaleId(Long.valueOf(afterSaleId)); List<AfterSaleItemDO> afterSaleItemDOList = afterSaleItemDAO.listByAfterSaleId(Long.valueOf(afterSaleId)); List<AfterSaleLogDO> afterSaleLogDOList = afterSaleLogDAO.listByAfterSaleId(Long.valueOf(afterSaleId)); List<AfterSaleRefundDO> afterSaleRefundDOList = afterSaleRefundDAO.listByAfterSaleId(Long.valueOf(afterSaleId)); if (afterSaleInfoDO != null && afterSaleItemDOList.isEmpty() && afterSaleLogDOList.isEmpty() && afterSaleRefundDOList.isEmpty()) { return LocalTransactionState.COMMIT_MESSAGE; } return LocalTransactionState.ROLLBACK_MESSAGE; } });
try { //3.组装事务MQ消息 ActualRefundMessage actualRefundMessage = new ActualRefundMessage(); actualRefundMessage.setOrderId(cancelOrderAssembleRequest.getOrderId()); actualRefundMessage.setLastReturnGoods(cancelOrderAssembleRequest.isLastReturnGoods()); actualRefundMessage.setAfterSaleId(Long.valueOf(afterSaleId)); Message message = new Message(RocketMqConstant.ACTUAL_REFUND_TOPIC, JSONObject.toJSONString(actualRefundMessage).getBytes(StandardCharsets.UTF_8)); //4.发送事务MQ消息--实际退款消息 TransactionSendResult result = producer.sendMessageInTransaction(message, actualRefundMessage); if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) { throw new OrderBizException(OrderErrorCodeEnum.PROCESS_REFUND_FAILED); } return JsonResult.buildSuccess(true); } catch (Exception e) { throw new OrderBizException(OrderErrorCodeEnum.SEND_TRANSACTION_MQ_FAILED); } } finally { redisLock.unlock(key); } } ...}
复制代码


文章转载自:东阳马生架构

原文链接:https://www.cnblogs.com/mjunz/p/18976429

体验地址:http://www.jnpfsoft.com/?from=001YH

用户头像

还未添加个人签名 2025-04-01 加入

还未添加个人简介

评论

发布
暂无评论
订单初版—取消订单链路中的技术问题说明文档(二)_Java_量贩潮汐·WholesaleTide_InfoQ写作社区