写点什么

订单初版—支付和履约实现的重构文档

  • 2025-07-16
    福建
  • 本文字数:28799 字

    阅读完需:约 94 分钟

1.预支付到完成支付的业务流程


订单正向链路有三个核心环节:生成订单 + 支付 + 履约。


 

2.支付回调到推送履约的代码流程



(1)支付回调成功,更新订单状态 + 发送订单已完成支付的事务消息


@DubboService(version = "1.0.0", interfaceClass = OrderApi.class, retries = 0)public class OrderApiImpl implements OrderApi {    @Autowired    private OrderService orderService;    ...
//支付回调接口 @Override public JsonResult<Boolean> payCallback(PayCallbackRequest payCallbackRequest) { try { orderService.payCallback(payCallbackRequest); return JsonResult.buildSuccess(true); } catch (OrderBizException e) { log.error("biz error", e); return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg()); } catch (Exception e) { log.error("error", e); return JsonResult.buildError(e.getMessage()); } } ...}
@Servicepublic class OrderServiceImpl implements OrderService { @Autowired private OrderInfoDAO orderInfoDAO;
@Autowired private OrderNoManager orderNoManager;
@Autowired private DefaultProducer defaultProducer;
@Autowired private RedisLock redisLock; ...
//支付回调 //支付回调有2把分布式锁的原因说明:同一笔订单在同一时间只能支付or取消 //不可以同时对一笔订单,既发起支付,又发起取消 @Override public void payCallback(PayCallbackRequest payCallbackRequest) { //1.入参检查 checkPayCallbackRequestParam(payCallbackRequest); String orderId = payCallbackRequest.getOrderId(); Integer payAmount = payCallbackRequest.getPayAmount(); Integer payType = payCallbackRequest.getPayType(); List<String> redisKeyList = Lists.newArrayList();
//2.加支付分布式锁避免支付系统并发回调 String orderPayKey = RedisLockKeyConstants.ORDER_PAY_KEY + orderId; //加取消订单分布式锁避免支付和取消订单同时操作同一笔订单 String cancelOrderKey = RedisLockKeyConstants.CANCEL_KEY + orderId; redisKeyList.add(orderPayKey); redisKeyList.add(cancelOrderKey); boolean lock = redisLock.multiLock(redisKeyList); if (!lock) { throw new OrderBizException(OrderErrorCodeEnum.ORDER_PAY_CALLBACK_ERROR); }
try { //从数据库中查询出当前订单信息 OrderInfoDO orderInfoDO = orderInfoDAO.getByOrderId(orderId); OrderPaymentDetailDO orderPaymentDetailDO = orderPaymentDetailDAO.getPaymentDetailByOrderId(orderId);
//3.校验参数 if (orderInfoDO == null || orderPaymentDetailDO == null) { throw new OrderBizException(OrderErrorCodeEnum.ORDER_INFO_IS_NULL); } if (!payAmount.equals(orderInfoDO.getPayAmount())) { throw new OrderBizException(OrderErrorCodeEnum.ORDER_CALLBACK_PAY_AMOUNT_ERROR); }
//4.异常场景判断 Integer orderStatus = orderInfoDO.getOrderStatus(); if (OrderStatusEnum.CREATED.getCode().equals(orderStatus)) { //如果订单状态是"已创建",直接更新订单状态为已支付,并发送事务消息 TransactionMQProducer transactionMQProducer = defaultProducer.getProducer(); transactionMQProducer.setTransactionListener(new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { try { orderManager.updateOrderStatusPaid(payCallbackRequest, orderInfoDO, orderPaymentDetailDO); return LocalTransactionState.COMMIT_MESSAGE; } catch (BaseBizException e) { throw e; } catch (Exception e) { log.error("system error", e); return LocalTransactionState.ROLLBACK_MESSAGE; } }
//回查接口 @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { //检查订单是否是已支付 OrderInfoDO orderInfoDO = orderInfoDAO.getByOrderId(orderId); if (orderInfoDO != null && OrderStatusEnum.PAID.getCode().equals(orderInfoDO.getOrderStatus())) { return LocalTransactionState.COMMIT_MESSAGE; } return LocalTransactionState.ROLLBACK_MESSAGE; } }); //发送 "订单已完成支付" 消息 sendPaidOrderSuccessMessage(transactionMQProducer, orderInfoDO); } else { //如果订单状态不是 "已创建" if (OrderStatusEnum.CANCELED.getCode().equals(orderStatus)) { //如果订单那状态是取消状态 Integer payStatus = orderPaymentDetailDO.getPayStatus(); if (PayStatusEnum.UNPAID.getCode().equals(payStatus)) { //调用退款 executeOrderRefund(orderInfoDO, orderPaymentDetailDO); throw new OrderBizException(OrderErrorCodeEnum.ORDER_CANCEL_PAY_CALLBACK_ERROR); } else if (PayStatusEnum.PAID.getCode().equals(payStatus)) { if (payType.equals(orderPaymentDetailDO.getPayType())) { throw new OrderBizException(OrderErrorCodeEnum.ORDER_CANCEL_PAY_CALLBACK_PAY_TYPE_SAME_ERROR); } else { throw new OrderBizException(OrderErrorCodeEnum.ORDER_CANCEL_PAY_CALLBACK_PAY_TYPE_NO_SAME_ERROR); } } } else { //如果订单状态不是取消状态 if (PayStatusEnum.PAID.getCode().equals(orderPaymentDetailDO.getPayStatus())) { if (payType.equals(orderPaymentDetailDO.getPayType())) { return; } //调用退款 executeOrderRefund(orderInfoDO, orderPaymentDetailDO); throw new OrderBizException(OrderErrorCodeEnum.ORDER_CANCEL_PAY_CALLBACK_REPEAT_ERROR); } } } } catch (Exception e) { throw new OrderBizException(e.getMessage()); } finally { //释放分布式锁 redisLock.unMultiLock(redisKeyList); } }
//发送订单已完成支付消息,触发订单进行履约 private void sendPaidOrderSuccessMessage(TransactionMQProducer transactionMQProducer, OrderInfoDO orderInfoDO) throws MQClientException { String orderId = orderInfoDO.getOrderId(); PaidOrderSuccessMessage message = new PaidOrderSuccessMessage(); message.setOrderId(orderId); String topic = RocketMqConstant.PAID_ORDER_SUCCESS_TOPIC; byte[] body = JSON.toJSONString(message).getBytes(StandardCharsets.UTF_8); Message mq = new Message(topic, body); TransactionSendResult result = transactionMQProducer.sendMessageInTransaction(mq, orderInfoDO); if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) { throw new OrderBizException(OrderErrorCodeEnum.ORDER_PAY_CALLBACK_SEND_MQ_ERROR); } } ...}
@Servicepublic class OrderManagerImpl implements OrderManager { ... //支付回调更新订单状态 @Transactional(rollbackFor = Exception.class) @Override public void updateOrderStatusPaid(PayCallbackRequest payCallbackRequest, OrderInfoDO orderInfoDO, OrderPaymentDetailDO orderPaymentDetailDO) { //主单信息 String orderId = payCallbackRequest.getOrderId(); Integer preOrderStatus = orderInfoDO.getOrderStatus(); orderInfoDO.setOrderStatus(OrderStatusEnum.PAID.getCode()); orderInfoDAO.updateById(orderInfoDO);
//主单支付信息 orderPaymentDetailDO.setPayStatus(PayStatusEnum.PAID.getCode()); orderPaymentDetailDAO.updateById(orderPaymentDetailDO);
//新增订单状态变更日志 OrderOperateLogDO orderOperateLogDO = new OrderOperateLogDO(); orderOperateLogDO.setOrderId(orderId); orderOperateLogDO.setOperateType(OrderOperateTypeEnum.PAID_ORDER.getCode()); orderOperateLogDO.setPreStatus(preOrderStatus); orderOperateLogDO.setCurrentStatus(orderInfoDO.getOrderStatus()); orderOperateLogDO.setRemark("订单支付回调操作" + orderOperateLogDO.getPreStatus() + "-" + orderOperateLogDO.getCurrentStatus()); orderOperateLogDAO.save(orderOperateLogDO);
//判断是否存在子订单 List<OrderInfoDO> subOrderInfoDOList = orderInfoDAO.listByParentOrderId(orderId); if (subOrderInfoDOList != null && !subOrderInfoDOList.isEmpty()) { //先将主订单状态设置为无效订单 Integer newPreOrderStatus = orderInfoDO.getOrderStatus(); orderInfoDO.setOrderStatus(OrderStatusEnum.INVALID.getCode()); orderInfoDAO.updateById(orderInfoDO);
//新增订单状态变更日志 OrderOperateLogDO newOrderOperateLogDO = new OrderOperateLogDO(); newOrderOperateLogDO.setOrderId(orderId); newOrderOperateLogDO.setOperateType(OrderOperateTypeEnum.PAID_ORDER.getCode()); newOrderOperateLogDO.setPreStatus(newPreOrderStatus); newOrderOperateLogDO.setCurrentStatus(OrderStatusEnum.INVALID.getCode()); orderOperateLogDO.setRemark("订单支付回调操作,主订单状态变更" + newOrderOperateLogDO.getPreStatus() + "-" + newOrderOperateLogDO.getCurrentStatus()); orderOperateLogDAO.save(newOrderOperateLogDO);
//再更新子订单的状态 for (OrderInfoDO subOrderInfo : subOrderInfoDOList) { Integer subPreOrderStatus = subOrderInfo.getOrderStatus(); subOrderInfo.setOrderStatus(OrderStatusEnum.PAID.getCode()); orderInfoDAO.updateById(subOrderInfo); //更新子订单的支付明细状态 String subOrderId = subOrderInfo.getOrderId(); OrderPaymentDetailDO subOrderPaymentDetailDO = orderPaymentDetailDAO.getPaymentDetailByOrderId(subOrderId); if (subOrderPaymentDetailDO != null) { subOrderPaymentDetailDO.setPayStatus(PayStatusEnum.PAID.getCode()); orderPaymentDetailDAO.updateById(subOrderPaymentDetailDO); }
//新增订单状态变更日志 OrderOperateLogDO subOrderOperateLogDO = new OrderOperateLogDO(); subOrderOperateLogDO.setOrderId(subOrderId); subOrderOperateLogDO.setOperateType(OrderOperateTypeEnum.PAID_ORDER.getCode()); subOrderOperateLogDO.setPreStatus(subPreOrderStatus); subOrderOperateLogDO.setCurrentStatus(OrderStatusEnum.PAID.getCode()); orderOperateLogDO.setRemark("订单支付回调操作,子订单状态变更" + subOrderOperateLogDO.getPreStatus() + "-" + subOrderOperateLogDO.getCurrentStatus()); orderOperateLogDAO.save(subOrderOperateLogDO); } } } ...}
复制代码


(2)订单系统消费已完成支付的消息,更新订单履约状态 + 发送订单履约的消息


//订单系统进行消费@Configurationpublic class ConsumerConfig {    @Autowired    private RocketMQProperties rocketMQProperties;
//订单完成支付消息消费者 @Bean("paidOrderSuccessConsumer") public DefaultMQPushConsumer paidOrderSuccessConsumer(PaidOrderSuccessListener paidOrderSuccessListener) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PAID_ORDER_SUCCESS_CONSUMER_GROUP); consumer.setNamesrvAddr(rocketMQProperties.getNameServer()); consumer.subscribe(PAID_ORDER_SUCCESS_TOPIC, "*"); consumer.registerMessageListener(paidOrderSuccessListener); consumer.start(); return consumer; } ...}
//订单系统监听订单支付完成的消息@Componentpublic class PaidOrderSuccessListener implements MessageListenerConcurrently { @Autowired private OrderInfoDAO orderInfoDAO;
@Autowired private OrderFulFillService orderFulFillService;
@Autowired private RedisLock redisLock;
@Autowired private DefaultProducer defaultProducer;
@Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { for (MessageExt messageExt : list) { String message = new String(messageExt.getBody()); PaidOrderSuccessMessage paidOrderSuccessMessage = JSON.parseObject(message, PaidOrderSuccessMessage.class); String orderId = paidOrderSuccessMessage.getOrderId(); log.info("触发订单履约,orderId:{}", orderId); OrderInfoDO order = orderInfoDAO.getByOrderId(orderId); if (Objects.isNull(order)) { throw new OrderBizException(OrderErrorCodeEnum.ORDER_INFO_IS_NULL); }
//1.加分布式锁 + 里面的履约前置状态校验防止消息重复消费 String key = RedisLockKeyConstants.ORDER_FULFILL_KEY + orderId; if (!redisLock.lock(key)) { log.error("order has not acquired lock,cannot fulfill, orderId={}", orderId); throw new BaseBizException(OrderErrorCodeEnum.ORDER_FULFILL_ERROR); }
try { //2.进行订单履约逻辑 TransactionMQProducer producer = defaultProducer.getProducer(); producer.setTransactionListener(new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { try { orderFulFillService.triggerOrderFulFill(orderId); return LocalTransactionState.COMMIT_MESSAGE; } catch (BaseBizException e) { throw e; } catch (Exception e) { log.error("system error", e); return LocalTransactionState.ROLLBACK_MESSAGE; } }
//回查接口 @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { //检查订单是否"已履约"状态 OrderInfoDO orderInfoDO = orderInfoDAO.getByOrderId(orderId); if (orderInfoDO != null && OrderStatusEnum.FULFILL.getCode().equals(orderInfoDO.getOrderStatus())) { return LocalTransactionState.COMMIT_MESSAGE; } return LocalTransactionState.ROLLBACK_MESSAGE; } }); ReceiveFulfillRequest receiveFulfillRequest = orderFulFillService.buildReceiveFulFillRequest(order); String topic = TRIGGER_ORDER_FULFILL_TOPIC; byte[] body = JSON.toJSONString(receiveFulfillRequest).getBytes(StandardCharsets.UTF_8); Message mq = new Message(topic, body); producer.sendMessageInTransaction(mq, order); } finally { redisLock.unlock(key); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { log.error("consumer error", e); //本地业务逻辑执行失败,触发消息重新消费 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } }}
@Servicepublic class OrderFulFillServiceImpl implements OrderFulFillService { @Autowired private OrderInfoDAO orderInfoDAO;
@Autowired private OrderOperateLogDAO orderOperateLogDAO; ...
@Transactional(rollbackFor = Exception.class) @Override public void triggerOrderFulFill(String orderId) throws OrderBizException { //1.查询订单 OrderInfoDO order = orderInfoDAO.getByOrderId(orderId); if (Objects.isNull(order)) { return; }
//2.校验订单是否已支付 OrderStatusEnum orderStatus = OrderStatusEnum.getByCode(order.getOrderStatus()); if (!OrderStatusEnum.PAID.equals(orderStatus)) { log.info("order has not been paid,cannot fulfill, orderId={}", order.getOrderId()); return; }
//3.更新订单状态为:已履约 orderInfoDAO.updateOrderStatus(orderId, OrderStatusEnum.PAID.getCode(), OrderStatusEnum.FULFILL.getCode());
//4.并插入一条订单变更记录 orderOperateLogDAO.save(orderOperateLogFactory.get(order, OrderStatusChangeEnum.ORDER_FULFILLED)); } ...}
复制代码


(3)履约系统消费触发订单履约的消息,进行具体的订单履约处理


//履约系统消费@Configurationpublic class ConsumerConfig {    @Autowired    private RocketMQProperties rocketMQProperties;
//触发订单履约消息消费者 @Bean("triggerOrderFulfillConsumer") public DefaultMQPushConsumer triggerOrderFulfillConsumer(TriggerOrderFulfillTopicListener triggerOrderFulfillTopicListener) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(TRIGGER_ORDER_FULFILL_CONSUMER_GROUP); consumer.setNamesrvAddr(rocketMQProperties.getNameServer()); consumer.subscribe(TRIGGER_ORDER_FULFILL_TOPIC, "*"); consumer.registerMessageListener(triggerOrderFulfillTopicListener); consumer.start(); return consumer; } ...}
//监听并消费订单履约消息@Componentpublic class TriggerOrderFulfillTopicListener implements MessageListenerConcurrently { @Autowired private FulfillService fulfillService;//履约服务
@Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { for (MessageExt messageExt : list) { String message = new String(messageExt.getBody()); ReceiveFulfillRequest request = JSON.parseObject(message, ReceiveFulfillRequest.class); fulfillService.receiveOrderFulFill(request); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { log.error("consumer error", e); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } }}
@Servicepublic class FulfillServiceImpl implements FulfillService { ... @Override public Boolean receiveOrderFulFill(ReceiveFulfillRequest request) { log.info("接受订单履约成功,request={}", JSONObject.toJSONString(request)); String orderId = request.getOrderId();
//加分布式锁(防止重复触发履约) String key = RedisLockKeyConstants.ORDER_FULFILL_KEY + orderId; boolean lock = redisLock.lock(key); if (!lock) { throw new FulfillBizException(FulfillErrorCodeEnum.ORDER_FULFILL_ERROR); }
try { //1.幂等:校验orderId是否已经履约过 if (orderFulfilled(request.getOrderId())) { log.info("该订单已履约!!!,orderId={}", request.getOrderId()); return true; }
//2.saga状态机,触发wms捡货和tms发货 StateMachineEngine stateMachineEngine = (StateMachineEngine) springApplicationContext.getBean("stateMachineEngine"); Map<String, Object> startParams = new HashMap<>(3); startParams.put("receiveFulfillRequest", request);
//配置的saga状态机 json的name //位于/resources/statelang/order_fulfull.json String stateMachineName = "order_fulfill"; log.info("开始触发saga流程,stateMachineName={}", stateMachineName); StateMachineInstance inst = stateMachineEngine.startWithBusinessKey(stateMachineName, null, null, startParams); if (ExecutionStatus.SU.equals(inst.getStatus())) { log.info("订单履约流程执行完毕. xid={}", inst.getId()); } else { log.error("订单履约流程执行异常. xid={}", inst.getId()); throw new FulfillBizException(FulfillErrorCodeEnum.ORDER_FULFILL_IS_ERROR); } return true; } finally { redisLock.unlock(key); } } ...}
复制代码


3.支付回调到推送履约的双异步设计


(1)双异步设计的原因


第一个异步:订单支付回调后,订单系统先发一个支付完成的消息到 MQ。

 

第二个异步:订单系统消费到支付完成消息后进行更新,再发消息到 MQ。

 

为什么需要双异步:为什么订单系统收到支付回调后要先发一个消息到 MQ,然后再由自己监听消费,并发送真正触发履约的消息到 MQ。

 

由于更新订单状态和更新订单履约状态都是由订单系统执行的,为什么不按如下处理:订单系统收到支付回调后,先把订单状态更新为已完成支付,然后紧接着把订单履约状态更新为已履约,最后再发送订单履约消息到 MQ 让履约系统消费该消息进行发货。

 

原因一:从业务语义来说,支付回调后的主要操作其实是更新订单状态为支付完成,而对订单进行履约并不属于支付回调的主要工作。支付回调最多是触发履约系统进行订单履约,所以支付回调成功后,在支付回调中更新订单履约状态并不合适。因此从业务语义来说,支付回调后发送一个订单支付完成的消息到 MQ 即可,不应有更多其他操作。

 

原因二:从扩展性角度来说,由于订单支付完成是一个非常关键的消息,所以可能以后会在订单支付完成后,需要进行更多的业务处理。也就是说,可能以后会有更多系统需要监听和消费订单支付完成消息,如会员系统、营销系统、数据分析系统等都需要消费订单支付完成的消息。

 

(2)履约系统通过 MQ 对订单进行异步履约


因为对订单进行履约,涉及到仓库、仓储、库存的调度,需要通知物流公司进行配送等,这个过程非常耗时及复杂,所以触发履约系统对订单进行履约不能使用同步来实现。



(3)双异步设计的目的总结


第一个异步是为了可扩展

第二个异步是为了提升性能

 

4.发送订单履约的 RocketMQ 事务消息的原理


(1)如何让更新数据库 + 发送消息到 MQ 是一致的


问题一:如何保证更新订单状态为已完成 + 发送订单支付完成消息是一致的。也就是更新数据库 + 发送消息到 MQ,要么同时成功,要么同时失败。

 

问题二:如何保证更新订单履约状态为已履约 + 发送触发订单履约消息是一致的。同样是更新数据库 + 发送消息到 MQ,要么同时成功,要么同时失败。

 

为了解决更新数据库 + 发送消息到 MQ 是一致的,可以使用 RocketMQ 的事务机制。

 

(2)发送订单履约的 RocketMQ 事务消息的原理


 

5.RocketMQ 事务消息的实现原理


 

6.支付回调到发送履约的 RocketMQ 事务消息代码


(1)支付回调成功后发送支付完成的事务消息


@Servicepublic class OrderServiceImpl implements OrderService {    ...    //支付回调    //支付回调有2把分布式锁的原因说明:同一笔订单在同一时间只能支付or取消    //不可以同时对一笔订单,既发起支付,又发起取消    @Override    public void payCallback(PayCallbackRequest payCallbackRequest) {        ...        try {            //从数据库中查询出当前订单信息            OrderInfoDO orderInfoDO = orderInfoDAO.getByOrderId(orderId);            OrderPaymentDetailDO orderPaymentDetailDO = orderPaymentDetailDAO.getPaymentDetailByOrderId(orderId);
//3.校验参数 if (orderInfoDO == null || orderPaymentDetailDO == null) { throw new OrderBizException(OrderErrorCodeEnum.ORDER_INFO_IS_NULL); } if (!payAmount.equals(orderInfoDO.getPayAmount())) { throw new OrderBizException(OrderErrorCodeEnum.ORDER_CALLBACK_PAY_AMOUNT_ERROR); }
//4.异常场景判断 Integer orderStatus = orderInfoDO.getOrderStatus(); if (OrderStatusEnum.CREATED.getCode().equals(orderStatus)) { //如果订单状态是"已创建",直接更新订单状态为已支付,并发送事务消息 TransactionMQProducer transactionMQProducer = defaultProducer.getProducer(); transactionMQProducer.setTransactionListener(new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { try { orderManager.updateOrderStatusPaid(payCallbackRequest, orderInfoDO, orderPaymentDetailDO); return LocalTransactionState.COMMIT_MESSAGE; } catch (BaseBizException e) { throw e; } catch (Exception e) { log.error("system error", e); return LocalTransactionState.ROLLBACK_MESSAGE; } }
//回查 @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { //检查订单是否是已支付 OrderInfoDO orderInfoDO = orderInfoDAO.getByOrderId(orderId); if (orderInfoDO != null && OrderStatusEnum.PAID.getCode().equals(orderInfoDO.getOrderStatus())) { return LocalTransactionState.COMMIT_MESSAGE; } return LocalTransactionState.ROLLBACK_MESSAGE; } }); //发送 "订单已完成支付" 消息 sendPaidOrderSuccessMessage(transactionMQProducer, orderInfoDO); } else { ... } } catch (Exception e) { throw new OrderBizException(e.getMessage()); } finally { //释放分布式锁 redisLock.unMultiLock(redisKeyList); } }
//发送订单已完成支付消息,触发订单进行履约 private void sendPaidOrderSuccessMessage(TransactionMQProducer transactionMQProducer, OrderInfoDO orderInfoDO) throws MQClientException { String orderId = orderInfoDO.getOrderId(); PaidOrderSuccessMessage message = new PaidOrderSuccessMessage(); message.setOrderId(orderId); String topic = RocketMqConstant.PAID_ORDER_SUCCESS_TOPIC; byte[] body = JSON.toJSONString(message).getBytes(StandardCharsets.UTF_8); Message mq = new Message(topic, body); TransactionSendResult result = transactionMQProducer.sendMessageInTransaction(mq, orderInfoDO); if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) { throw new OrderBizException(OrderErrorCodeEnum.ORDER_PAY_CALLBACK_SEND_MQ_ERROR); } } ...}
复制代码


(2)消费订单支付完成消息时发送触发订单履约的事务消息


//订单系统监听订单支付完成后的消息@Componentpublic class PaidOrderSuccessListener implements MessageListenerConcurrently {    @Autowired    private OrderInfoDAO orderInfoDAO;
@Autowired private OrderFulFillService orderFulFillService;
@Autowired private RedisLock redisLock;
@Autowired private DefaultProducer defaultProducer;
@Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { for (MessageExt messageExt : list) { String message = new String(messageExt.getBody()); PaidOrderSuccessMessage paidOrderSuccessMessage = JSON.parseObject(message, PaidOrderSuccessMessage.class); String orderId = paidOrderSuccessMessage.getOrderId(); log.info("触发订单履约,orderId:{}", orderId);
OrderInfoDO order = orderInfoDAO.getByOrderId(orderId); if (Objects.isNull(order)) { throw new OrderBizException(OrderErrorCodeEnum.ORDER_INFO_IS_NULL); }
//1.加分布式锁 + 里面的履约前置状态校验防止消息重复消费 String key = RedisLockKeyConstants.ORDER_FULFILL_KEY + orderId; if (!redisLock.lock(key)) { log.error("order has not acquired lock,cannot fulfill, orderId={}", orderId); throw new BaseBizException(OrderErrorCodeEnum.ORDER_FULFILL_ERROR); } try { //2.进行订单履约逻辑 TransactionMQProducer producer = defaultProducer.getProducer(); producer.setTransactionListener(new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { try { orderFulFillService.triggerOrderFulFill(orderId); return LocalTransactionState.COMMIT_MESSAGE; } catch (BaseBizException e) { throw e; } catch (Exception e) { log.error("system error", e); return LocalTransactionState.ROLLBACK_MESSAGE; } }
@Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { //检查订单是否"已履约"状态 OrderInfoDO orderInfoDO = orderInfoDAO.getByOrderId(orderId); if (orderInfoDO != null && OrderStatusEnum.FULFILL.getCode().equals(orderInfoDO.getOrderStatus())) { return LocalTransactionState.COMMIT_MESSAGE; } return LocalTransactionState.ROLLBACK_MESSAGE; } });
ReceiveFulfillRequest receiveFulfillRequest = orderFulFillService.buildReceiveFulFillRequest(order); String topic = TRIGGER_ORDER_FULFILL_TOPIC; byte[] body = JSON.toJSONString(receiveFulfillRequest).getBytes(StandardCharsets.UTF_8); Message mq = new Message(topic, body); producer.sendMessageInTransaction(mq, order); } finally { redisLock.unlock(key); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { log.error("consumer error", e); //本地业务逻辑执行失败,触发消息重新消费 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } }}
复制代码


7.履约场景引入 Saga 模式 + 状态机


(1)进行订单履约时使用 Saga 模式的流程简介


Saga 模式运行时,会按照一个顺序流程去运行长事务。比如第一步更新订单履约数据,第二步调度发货,第三步物流配送。如果运行到某一步时出现异常,则会从这一步开始往回顺序执行补偿逻辑。比如先补偿第三步,再补偿第二步,最后补偿第一步。



(2)Saga 模式的流程定义文件和状态机


使用 Saga 模式的分布式事务在启动时,需要有一个 Saga 模式的流程定义文件。比如履约系统需要有一个本地文件:Saga 模式的流程定义文件。

 

在该文件中,需定义好整个长事务的流程:第一步执行的逻辑是 A,第二步执行的逻辑是 B,第三步执行的逻辑是 C。第一步回滚的逻辑是 a,第二步回滚的逻辑是 b,第三步回滚的逻辑是 c。

 

Saga 模式启动时,也需要向 Seata Server 注册全局事务 XID。Saga 模式启动后,便会根据状态机来判断流程定义文件中的每一步,应该执行正向还是执行逆向,也就是会通过状态机来控制正向和逆向。



(3)Saga 模式需要自定义补偿逻辑


一.AT 模式下的补偿逻辑

分支事务写入本地事务的数据前,会先生成 undo log。当某分支事务出现异常,已提交分支事务需要进行回滚补偿时,会根据生成的 undo log 来进行回滚补偿。

 

二.TCC 模式下的补偿逻辑

先执行 try 逻辑,如果 try 逻辑执行成功则执行 commit 逻辑来进行提交,如果 try 逻辑执行失败则执行 cancel 逻辑进行回滚补偿。

 

三.Saga 模式的补偿逻辑

每个分支事务都有正向的逻辑,每一个正向逻辑都会搭配一个逆向补偿逻辑,Saga 模式需要我们自定义实现这些逆向补偿逻辑。

 

(4)Saga 模式的优点和缺点及适用场景


优点是:执行正向链路的分支事务时,不再需要获取全局锁。缺点是:由于没有全局锁,当多个分布式事务并发执行时,不能写隔离。Saga 模式的应用场景是:一个业务链路里,需要很多系统层层调用。

 

一.长事务使用 AT 模式可能导致持有全局锁时间长

这种长事务如果使用 AT 模式,由于需要获取全局锁,而长事务链路又过长,从而会导致持有全局锁的时间也过长。即便对某数据的全局锁竞争并不激烈,但也需要过长时间才能释放,所以其并发能力特别低。

 

二.长事务使用 TCC 模式需要改造工作量较大

这种长事务如果使用 TCC 模式,则会对代码产生很大的侵入性。因为需要对长事务接口改造成三个接口:try、commit、cancel。而且长事务的业务链路比较长,改造业务链路的各接口工作量过大了。所以常见于异构存储的场景,不太常见于长事务。

 

三.Saga 模式特别适用于老系统下的长事务

因此 Saga 模式适用于长事务的场景,特别是老系统下的长事务。如果不希望对老系统有较大侵入性的代码改造,而且希望稍微修改一点代码就能让老系统支持分布式事务,那么就可以使用 Saga 模式。因为 Saga 模式可以独立地对老系统定义补偿逻辑,不需要像 TCC 模式那样去改造接口,也不用像 AT 模式那样侵入性添加 undo log 表。

 

8.履约流程的 Seata Saga 状态节点定义 + 节点流转


(1)流程定义文件分析


在履约系统的 resources/statelang 目录下,有一个 JSON 文件,该 JSON 文件 order_fullfill.json 便是 Saga 模式流程定义文件。


{    "Name": "order_fulfill",    "Comment": "订单履约流程",    "StartState": "FulfillService",    "Version": "1.0.0",    "States": {        //第一个节点        "FulfillService": {            "Type": "ServiceTask",            "ServiceName": "fulfillSagaService",//所在服务            "ServiceMethod": "createFulfillOrder",//正向执行方法            "CompensateState": "CreateFulfillOrderCompensate",//逆向补偿方法            "Next": "ChoiceWmsState",//下一个节点            "Input": [//请求参数                "$.[receiveFulfillRequest]"            ],            "Output": {//返回结果                "CreateFulfillOrderResult": "$.#root"            },            "Status": {                "#root == true": "SU",//正向执行方法的返回结果是true,那么状态就是SU                "#root == false": "FA",//正向执行方法的返回结果是false,那么状态就是FA                "$Exception{java.lang.Throwable}": "UN"//抛异常,那么状态就是UN            },            "Catch": [                {                    "Exceptions": [                        "java.lang.Throwable"                    ],                    "Next": "CompensationTrigger"//抛异常时执行的下一个节点                }            ]        },        //状态机,判断节点        "ChoiceWmsState": {            "Type": "Choice",            "Choices": [                {                    "Expression": "[CreateFulfillOrderResult] == true",                    "Next": "WmsService"//上一个节点返回true,就执行下一个节点                }            ],            "Default": "Fail"        },        //第二个节点        "WmsService": {            "Type": "ServiceTask",            "ServiceName": "wmsSageService",//所在服务            "ServiceMethod": "pickGoods",//正向执行方法            "CompensateState": "WmsPickGoodsCompensate",//逆向补偿方法            "Next": "ChoiceTmsState",//下一个节点            "Input": [//请求参数                "$.[receiveFulfillRequest]"            ],            "Output": {//返回结果                "WmsPickGoodsResult": "$.#root"            },            "Status": {                "#root == true": "SU",//正向执行方法的返回结果是true,那么状态就是SU                "#root == false": "FA",//正向执行方法的返回结果是false,那么状态就是FA                "$Exception{java.lang.Throwable}": "UN"//抛异常,那么状态就是UN            },            "Catch": [                {                    "Exceptions": [                      "java.lang.Throwable"                    ],                    "Next": "CompensationTrigger"//抛异常时执行的下一个节点                }            ]        },        //状态机,判断节点        "ChoiceTmsState": {            "Type": "Choice",            "Choices": [                {                    "Expression": "[WmsPickGoodsResult] == true",                    "Next": "TmsService"//上一个节点返回true,就执行下一个节点                }            ],            "Default": "Fail"        },        //第三个节点        "TmsService": {            "Type": "ServiceTask",            "ServiceName": "tmsSagaService",//所在服务            "ServiceMethod": "sendOut",//正向执行方法            "CompensateState": "TmsSendOutCompensate",//逆向补偿方法            "Input": [//请求参数                "$.[receiveFulfillRequest]"            ],            "Output": {//返回结果                "TmsSendOutResult": "$.#root"            },            "Status": {                "#root == true": "SU",//正向执行方法的返回结果是true,那么状态就是SU                "#root == false": "FA",//正向执行方法的返回结果是false,那么状态就是FA                "$Exception{java.lang.Throwable}": "UN"//抛异常,那么状态就是UN            },            "Catch": [                {                    "Exceptions": [                        "java.lang.Throwable"                    ],                    "Next": "CompensationTrigger"//抛异常时执行的下一个节点                }            ],            "Next": "Succeed"//下一个节点是Succeed节点        },        //第一个节点的补偿逻辑        "CreateFulfillOrderCompensate": {            "Type": "ServiceTask",            "ServiceName": "fulfillSagaService",            "ServiceMethod": "createFulfillOrderCompensate",            "Input": [                "$.[receiveFulfillRequest]"            ]        },        //第二个节点的补偿逻辑        "WmsPickGoodsCompensate": {            "Type": "ServiceTask",            "ServiceName": "wmsSageService",            "ServiceMethod": "pickGoodsCompensate",            "Input": [                "$.[receiveFulfillRequest]"            ]        },        //第三个节点的补偿逻辑        "TmsSendOutCompensate": {            "Type": "ServiceTask",            "ServiceName": "tmsSagaService",            "ServiceMethod": "sendOutCompensate",            "Input": [                "$.[receiveFulfillRequest]"            ]        },        //补偿触发后的下一个节点        "CompensationTrigger": {            "Type": "CompensationTrigger",            "Next": "Fail"//下一个节点是Fail节点        },        //成功节点        "Succeed": {            "Type": "Succeed"        },        //失败节点        "Fail": {            "Type": "Fail",            "ErrorCode": "500",            "Message": "订单履约异常!!"        }    }}
复制代码


(2)流程定义文件中对应的方法


需要注意的是:触发调用各节点补偿方法时,都是由履约系统通过本地或者 RPC 来调用的。


@Service("fulfillSagaService")public class FulfillSagaServiceImpl implements FulfillSagaService {    @Autowired    private FulfillService fulfillService;
@Override public Boolean createFulfillOrder(ReceiveFulfillRequest request) { log.info("创建履约单,request={}", JSONObject.toJSONString(request)); String fulfillException = request.getFulfillException(); if (StringUtils.isNotBlank(fulfillException) && fulfillException.equals("true")) { throw new FulfillBizException("创建履约单异常!"); } //创建履约单 fulfillService.createFulfillOrder(request); return true; }
@Override public Boolean createFulfillOrderCompensate(ReceiveFulfillRequest request) { log.info("补偿创建履约单,request={}", JSONObject.toJSONString(request)); //取消履约单 fulfillService.cancelFulfillOrder(request.getOrderId()); log.info("补偿创建履约单结束,request={}", JSONObject.toJSONString(request)); return true; }}
@Service("wmsSageService")public class WmsSageServiceImpl implements WmsSagaService { @DubboReference(version = "1.0.0", retries = 0) private WmsApi wmsApi;
@Override public Boolean pickGoods(ReceiveFulfillRequest request) { log.info("捡货,request={}", JSONObject.toJSONString(request)); //调用wms系统进行捡货 JsonResult<PickDTO> jsonResult = wmsApi.pickGoods(buildPickGoodsRequest(request)); log.info("捡货结果,jsonResult={}", JSONObject.toJSONString(jsonResult)); if (!jsonResult.getSuccess()) { throw new FulfillBizException(FulfillErrorCodeEnum.WMS_IS_ERROR); } return true; }
@Override public Boolean pickGoodsCompensate(ReceiveFulfillRequest request) { log.info("补偿捡货,request={}", JSONObject.toJSONString(request)); //调用wms系统进行捡货 JsonResult<Boolean> jsonResult = wmsApi.cancelPickGoods(request.getOrderId()); log.info("补偿捡货结果,jsonResult={}", JSONObject.toJSONString(jsonResult)); if (!jsonResult.getSuccess()) { throw new FulfillBizException(FulfillErrorCodeEnum.WMS_IS_ERROR); } return true; } ...}
@Service("tmsSagaService")public class TmsSagaServiceImpl implements TmsSagaService { @DubboReference(version = "1.0.0", retries = 0) private TmsApi tmsApi;
@Autowired private OrderFulfillDAO orderFulfillDAO;
@Override public Boolean sendOut(ReceiveFulfillRequest request) { log.info("发货,request={}", JSONObject.toJSONString(request));
//1.调用tms进行发货 JsonResult<SendOutDTO> jsonResult = tmsApi.sendOut(buildSendOutRequest(request)); if (!jsonResult.getSuccess()) { throw new FulfillBizException(FulfillErrorCodeEnum.TMS_IS_ERROR); } log.info("发货结果,jsonResult={}", JSONObject.toJSONString(jsonResult)); if (!jsonResult.getSuccess()) { throw new FulfillBizException(FulfillErrorCodeEnum.WMS_IS_ERROR); }
//2.查询履约单 OrderFulfillDO orderFulfill = orderFulfillDAO.getOne(request.getOrderId());
//3.存储物流单号 String logisticsCode = jsonResult.getData().getLogisticsCode(); orderFulfillDAO.saveLogisticsCode(orderFulfill.getFulfillId(), logisticsCode); return true; }
@Override public Boolean sendOutCompensate(ReceiveFulfillRequest request) { log.info("补偿发货,request={}", JSONObject.toJSONString(request));
//调用tms进行补偿发货 JsonResult<Boolean> jsonResult = tmsApi.cancelSendOut(request.getOrderId()); if (!jsonResult.getSuccess()) { throw new FulfillBizException(FulfillErrorCodeEnum.TMS_IS_ERROR); } log.info("补偿发货结果,jsonResult={}", JSONObject.toJSONString(jsonResult));
if (!jsonResult.getSuccess()) { throw new FulfillBizException(FulfillErrorCodeEnum.TMS_IS_ERROR); } return true; } ...}
复制代码


9.履约流程的 Seata Saga 失败补偿流程


 

10.Seata Saga 状态机流转原理


 

11.履约系统的 Seata Saga 代码实现


(1)进行流程定义


{    "Name": "order_fulfill",    "Comment": "订单履约流程",    "StartState": "FulfillService",    "Version": "1.0.0",    "States": {        //第一个节点        "FulfillService": {            "Type": "ServiceTask",            "ServiceName": "fulfillSagaService",//所在服务            "ServiceMethod": "createFulfillOrder",//正向执行方法            "CompensateState": "CreateFulfillOrderCompensate",//逆向补偿方法            "Next": "ChoiceWmsState",//下一个节点            "Input": [//请求参数                "$.[receiveFulfillRequest]"            ],            "Output": {//返回结果                "CreateFulfillOrderResult": "$.#root"            },            "Status": {                "#root == true": "SU",//正向执行方法的返回结果是true,那么状态就是SU                "#root == false": "FA",//正向执行方法的返回结果是false,那么状态就是FA                "$Exception{java.lang.Throwable}": "UN"//抛异常,那么状态就是UN            },            "Catch": [                {                    "Exceptions": [                        "java.lang.Throwable"                    ],                    "Next": "CompensationTrigger"//抛异常时执行的下一个节点                }            ]        },        //状态机,判断节点        "ChoiceWmsState": {            "Type": "Choice",            "Choices": [                {                    "Expression": "[CreateFulfillOrderResult] == true",                    "Next": "WmsService"//上一个节点返回true,就执行下一个节点                }            ],            "Default": "Fail"        },        //第二个节点        "WmsService": {            "Type": "ServiceTask",            "ServiceName": "wmsSageService",//所在服务            "ServiceMethod": "pickGoods",//正向执行方法            "CompensateState": "WmsPickGoodsCompensate",//逆向补偿方法            "Next": "ChoiceTmsState",//下一个节点            "Input": [//请求参数                "$.[receiveFulfillRequest]"            ],            "Output": {//返回结果                "WmsPickGoodsResult": "$.#root"            },            "Status": {                "#root == true": "SU",//正向执行方法的返回结果是true,那么状态就是SU                "#root == false": "FA",//正向执行方法的返回结果是false,那么状态就是FA                "$Exception{java.lang.Throwable}": "UN"//抛异常,那么状态就是UN            },            "Catch": [                {                    "Exceptions": [                        "java.lang.Throwable"                    ],                    "Next": "CompensationTrigger"//抛异常时执行的下一个节点                }            ]        },        //状态机,判断节点        "ChoiceTmsState": {            "Type": "Choice",            "Choices": [                {                    "Expression": "[WmsPickGoodsResult] == true",                    "Next": "TmsService"//上一个节点返回true,就执行下一个节点                }            ],            "Default": "Fail"        },        //第三个节点        "TmsService": {            "Type": "ServiceTask",            "ServiceName": "tmsSagaService",//所在服务            "ServiceMethod": "sendOut",//正向执行方法            "CompensateState": "TmsSendOutCompensate",//逆向补偿方法            "Input": [//请求参数                "$.[receiveFulfillRequest]"            ],            "Output": {//返回结果                "TmsSendOutResult": "$.#root"            },            "Status": {                "#root == true": "SU",//正向执行方法的返回结果是true,那么状态就是SU                "#root == false": "FA",//正向执行方法的返回结果是false,那么状态就是FA                "$Exception{java.lang.Throwable}": "UN"//抛异常,那么状态就是UN            },            "Catch": [                {                    "Exceptions": [                        "java.lang.Throwable"                    ],                    "Next": "CompensationTrigger"//抛异常时执行的下一个节点                }            ],            "Next": "Succeed"//下一个节点是Succeed节点        },        //第一个节点的补偿逻辑        "CreateFulfillOrderCompensate": {            "Type": "ServiceTask",            "ServiceName": "fulfillSagaService",            "ServiceMethod": "createFulfillOrderCompensate",            "Input": [                "$.[receiveFulfillRequest]"            ]        },        //第二个节点的补偿逻辑        "WmsPickGoodsCompensate": {            "Type": "ServiceTask",            "ServiceName": "wmsSageService",            "ServiceMethod": "pickGoodsCompensate",            "Input": [                "$.[receiveFulfillRequest]"            ]        },        //第三个节点的补偿逻辑        "TmsSendOutCompensate": {            "Type": "ServiceTask",            "ServiceName": "tmsSagaService",            "ServiceMethod": "sendOutCompensate",            "Input": [                "$.[receiveFulfillRequest]"            ]        },        //补偿触发后的下一个节点        "CompensationTrigger": {            "Type": "CompensationTrigger",            "Next": "Fail"//下一个节点是Fail节点        },        //成功节点        "Succeed": {            "Type": "Succeed"        },        //失败节点        "Fail": {            "Type": "Fail",            "ErrorCode": "500",            "Message": "订单履约异常!!"        }    }}
复制代码


(2)配置状态机


//配置数据源@Configurationpublic class DataSourceConfiguration {    @ConfigurationProperties(prefix = "spring.datasource")    @Bean    public DruidDataSource druidDataSource() {        return new DruidDataSource();    }
@Bean(name = "transactionManager") @Primary public DataSourceTransactionManager transactionManager(@Qualifier("druidDataSource") DruidDataSource druidDataSource) { return new DataSourceTransactionManager(druidDataSource); }}
//配置Saga状态机@Configurationpublic class StateMachineConfiguration { @Bean public ThreadPoolExecutorFactoryBean threadExecutor() { ThreadPoolExecutorFactoryBean threadExecutor = new ThreadPoolExecutorFactoryBean(); threadExecutor.setThreadNamePrefix("SAGA_ASYNC_EXE_"); threadExecutor.setCorePoolSize(1); threadExecutor.setMaxPoolSize(20); return threadExecutor; }
@Bean public DbStateMachineConfig dbStateMachineConfig(ThreadPoolExecutorFactoryBean threadExecutor, DruidDataSource druidDataSource) throws IOException { DbStateMachineConfig dbStateMachineConfig = new DbStateMachineConfig(); //设置数据源 dbStateMachineConfig.setDataSource(druidDataSource); //设置状态机的线程池 dbStateMachineConfig.setThreadPoolExecutor((ThreadPoolExecutor) threadExecutor.getObject()); //设置状态机的配置文件 dbStateMachineConfig.setResources(new PathMatchingResourcePatternResolver().getResources("classpath*:statelang/*.json")); //设置开启异步化 dbStateMachineConfig.setEnableAsync(true); //设置当前Saga长事务所属的分组 dbStateMachineConfig.setTxServiceGroup("demo-eshop-fulfill-group"); return dbStateMachineConfig; }
//Saga状态机实例 @Bean public ProcessCtrlStateMachineEngine stateMachineEngine(DbStateMachineConfig dbStateMachineConfig) { ProcessCtrlStateMachineEngine stateMachineEngine = new ProcessCtrlStateMachineEngine(); stateMachineEngine.setStateMachineConfig(dbStateMachineConfig); return stateMachineEngine; }
@Bean public StateMachineEngineHolder stateMachineEngineHolder(ProcessCtrlStateMachineEngine stateMachineEngine) { StateMachineEngineHolder stateMachineEngineHolder = new StateMachineEngineHolder(); stateMachineEngineHolder.setStateMachineEngine(stateMachineEngine); return stateMachineEngineHolder; }}
复制代码


(3)实现正向逆向方法


@Service("fulfillSagaService")public class FulfillSagaServiceImpl implements FulfillSagaService {    @Autowired    private FulfillService fulfillService;
@Override public Boolean createFulfillOrder(ReceiveFulfillRequest request) { log.info("创建履约单,request={}", JSONObject.toJSONString(request)); String fulfillException = request.getFulfillException(); if (StringUtils.isNotBlank(fulfillException) && fulfillException.equals("true")) { throw new FulfillBizException("创建履约单异常!"); } //创建履约单 fulfillService.createFulfillOrder(request); return true; }
@Override public Boolean createFulfillOrderCompensate(ReceiveFulfillRequest request) { log.info("补偿创建履约单,request={}", JSONObject.toJSONString(request)); //取消履约单 fulfillService.cancelFulfillOrder(request.getOrderId()); log.info("补偿创建履约单结束,request={}", JSONObject.toJSONString(request)); return true; }}
@Service("wmsSageService")public class WmsSageServiceImpl implements WmsSagaService { @DubboReference(version = "1.0.0", retries = 0) private WmsApi wmsApi;
@Override public Boolean pickGoods(ReceiveFulfillRequest request) { log.info("捡货,request={}", JSONObject.toJSONString(request)); //调用wms系统进行捡货 JsonResult<PickDTO> jsonResult = wmsApi.pickGoods(buildPickGoodsRequest(request)); log.info("捡货结果,jsonResult={}", JSONObject.toJSONString(jsonResult)); if (!jsonResult.getSuccess()) { throw new FulfillBizException(FulfillErrorCodeEnum.WMS_IS_ERROR); } return true; }
@Override public Boolean pickGoodsCompensate(ReceiveFulfillRequest request) { log.info("补偿捡货,request={}", JSONObject.toJSONString(request)); //调用wms系统进行捡货 JsonResult<Boolean> jsonResult = wmsApi.cancelPickGoods(request.getOrderId()); log.info("补偿捡货结果,jsonResult={}", JSONObject.toJSONString(jsonResult)); if (!jsonResult.getSuccess()) { throw new FulfillBizException(FulfillErrorCodeEnum.WMS_IS_ERROR); } return true; } ...}
@Service("tmsSagaService")public class TmsSagaServiceImpl implements TmsSagaService { @DubboReference(version = "1.0.0", retries = 0) private TmsApi tmsApi;
@Autowired private OrderFulfillDAO orderFulfillDAO;
@Override public Boolean sendOut(ReceiveFulfillRequest request) { log.info("发货,request={}", JSONObject.toJSONString(request)); //1.调用tms进行发货 JsonResult<SendOutDTO> jsonResult = tmsApi.sendOut(buildSendOutRequest(request)); if (!jsonResult.getSuccess()) { throw new FulfillBizException(FulfillErrorCodeEnum.TMS_IS_ERROR); } log.info("发货结果,jsonResult={}", JSONObject.toJSONString(jsonResult)); if (!jsonResult.getSuccess()) { throw new FulfillBizException(FulfillErrorCodeEnum.WMS_IS_ERROR); } //2.查询履约单 OrderFulfillDO orderFulfill = orderFulfillDAO.getOne(request.getOrderId()); //3.存储物流单号 String logisticsCode = jsonResult.getData().getLogisticsCode(); orderFulfillDAO.saveLogisticsCode(orderFulfill.getFulfillId(), logisticsCode); return true; }
@Override public Boolean sendOutCompensate(ReceiveFulfillRequest request) { log.info("补偿发货,request={}", JSONObject.toJSONString(request)); //调用tms进行补偿发货 JsonResult<Boolean> jsonResult = tmsApi.cancelSendOut(request.getOrderId()); if (!jsonResult.getSuccess()) { throw new FulfillBizException(FulfillErrorCodeEnum.TMS_IS_ERROR); } log.info("补偿发货结果,jsonResult={}", JSONObject.toJSONString(jsonResult)); if (!jsonResult.getSuccess()) { throw new FulfillBizException(FulfillErrorCodeEnum.TMS_IS_ERROR); } return true; } ...}
复制代码


(4)驱动 Saga 状态机开始执行长事务


//监听并消费订单履约消息@Componentpublic class TriggerOrderFulfillTopicListener implements MessageListenerConcurrently {    @Autowired    private FulfillService fulfillService;//履约服务
@Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { for (MessageExt messageExt : list) { String message = new String(messageExt.getBody()); ReceiveFulfillRequest request = JSON.parseObject(message, ReceiveFulfillRequest.class); fulfillService.receiveOrderFulFill(request); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { log.error("consumer error", e); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } }}
@Servicepublic class FulfillServiceImpl implements FulfillService { ... @Override public Boolean receiveOrderFulFill(ReceiveFulfillRequest request) { log.info("接受订单履约成功,request={}", JSONObject.toJSONString(request)); String orderId = request.getOrderId();
//加分布式锁(防止重复触发履约) String key = RedisLockKeyConstants.ORDER_FULFILL_KEY + orderId; boolean lock = redisLock.lock(key); if (!lock) { throw new FulfillBizException(FulfillErrorCodeEnum.ORDER_FULFILL_ERROR); }
try { //1.幂等:校验orderId是否已经履约过 if (orderFulfilled(request.getOrderId())) { log.info("该订单已履约!!!,orderId={}", request.getOrderId()); return true; }
//2.获取Saga状态机,触发wms捡货和tms发货 StateMachineEngine stateMachineEngine = (StateMachineEngine) springApplicationContext.getBean("stateMachineEngine"); Map<String, Object> startParams = new HashMap<>(3); startParams.put("receiveFulfillRequest", request);
//配置的Saga状态机json的name,位于/resources/statelang/order_fulfull.json String stateMachineName = "order_fulfill"; log.info("开始触发saga流程,stateMachineName={}", stateMachineName);
//通过状态机启动长事务流程 StateMachineInstance inst = stateMachineEngine.startWithBusinessKey(stateMachineName, null, null, startParams); if (ExecutionStatus.SU.equals(inst.getStatus())) { log.info("订单履约流程执行完毕. xid={}", inst.getId()); } else { log.error("订单履约流程执行异常. xid={}", inst.getId()); throw new FulfillBizException(FulfillErrorCodeEnum.ORDER_FULFILL_IS_ERROR); } return true; } finally { redisLock.unlock(key); } } ...}
复制代码


12.履约系统的 Seata Saga 空回滚和悬挂问题


(1)空回滚问题


正向操作没有执行成功,但也进行了逆向操作。解决方案是:通过自定义 Holder 把正向操作记录下来,如果发现空回滚,则进行记录,且不能进行回滚。

 

(2)空悬挂问题


逆向操作先执行,之后才执行正向操作。解决方案是:通过自定义 Holder 把正向操作记录下来,确保执行逆向操作时,已经执行过正向操作。

 

Saga 的空回滚和空悬挂与 TCC 的解决思路一样。


文章转载自:东阳马生架构

原文链接:https://www.cnblogs.com/mjunz/p/18985075

体验地址:http://www.jnpfsoft.com/?from=001YH

用户头像

还未添加个人签名 2025-04-01 加入

还未添加个人简介

评论

发布
暂无评论
订单初版—支付和履约实现的重构文档_架构_电子尖叫食人鱼_InfoQ写作社区