写点什么

订单初版—取消订单链路中的技术问题说明文档

  • 2025-07-11
    福建
  • 本文字数:23687 字

    阅读完需:约 78 分钟

1.超时自动取消订单功能的业务背景


用户提交订单,订单系统生成订单后,由于种种原因,用户并没有立即点击去支付或者完成支付。此时默认该订单会在生成 30 分钟后,自动进行支付检查。也就是如果生成订单超过 30 分钟还没进行支付,那么就会自动取消订单。



超时自动取消订单的技术方案:RocketMQ 延迟消息 + Redisson 分布式锁 + XXL-JOB 分布式任务调度。

 

2.超时自动取消订单和支付的并发问题


当用户提交的订单在 30 分钟后才发起支付时,此时就有可能出现取消订单和支付出现并发的问题。


 

3.分布式锁解决超时订单取消和支付的并发


在如下位置加同一把分布式锁即可解决超时订单取消和支付的并发问题:

一.订单系统处理预支付请求时添加分布式锁

二.订单系统处理支付回调请求时添加分布式锁

三.订单系统消费延时 30m 的订单消息进行超时检查时添加分布式锁

 

当处理预支付和消费延时订单消息进行超时检查时产生并发:如果处理预支付先获取锁,则超时检查会被阻塞,之后发现已支付不取消。如果超时检查先获取锁,则预支付被阻塞,之后预支付失败。

 

当处理支付回调和消费延时订单消息进行超时检查时产生并发:如果支付回调先获取锁,则超时检查会被阻塞,之后发现已支付不取消。如果超时检查先获取锁,则支付回调被阻塞,之后支付回调获取锁,发现订单已被超时检查取消,于是需要进行退款处理通知用户。


 

4.定时任务解决延时消费的消息的丢失问题


如果发送到 MQ 的延时 30m 消费的消息丢失,此时会通过定时任务来处理。该定时任务会对 MySQL 进行扫描,把创建时间超 30m + 没支付的订单找出,然后再对这些超时订单进行取消。定时任务一般使用分布式调度框架 XXL-JOB 来实现。



以上便是超时自动取消订单的技术方案:RocketMQ 延迟消息 + Redisson 分布式锁 + XXL-JOB 分布式任务调度。

 

5.超时自动取消订单代码流程


(1)生成订单时发送延时 30m 才能被消费的消息


@Service
public class OrderServiceImpl implements OrderService {
    ...

    /**
     * Submits an order: runs the numbered steps below in sequence and returns the new order id.
     *
     * <p>The last step publishes a delayed message that is later consumed to close the order
     * when payment has timed out.
     *
     * @param createOrderRequest the order submission request
     * @return a DTO carrying the order id taken from the request
     */
    @GlobalTransactional(rollbackFor = Exception.class)
    @Override
    public CreateOrderDTO createOrder(CreateOrderRequest createOrderRequest) {
        // 1. Validate the request parameters.
        checkCreateOrderRequestParam(createOrderRequest);
        // 2. Risk-control check.
        checkRisk(createOrderRequest);
        // 3. Load the product SKUs referenced by the request.
        List<ProductSkuDTO> skus = listProductSkus(createOrderRequest);
        // 4. Calculate the order price.
        CalculateOrderAmountDTO amounts = calculateOrderAmount(createOrderRequest, skus);
        // 5. Verify the amount actually payable.
        checkRealPayAmount(createOrderRequest, amounts);
        // 6. Lock the coupon.
        lockUserCoupon(createOrderRequest);
        // 7. Lock the product stock.
        lockProductStock(createOrderRequest);
        // 8. Persist the order.
        addNewOrder(createOrderRequest, skus, amounts);
        // 9. Publish the delayed message used for pay-timeout auto close.
        sendPayOrderTimeoutDelayMessage(createOrderRequest);

        // Hand the order id back to the caller.
        CreateOrderDTO result = new CreateOrderDTO();
        result.setOrderId(createOrderRequest.getOrderId());
        return result;
    }

    /**
     * Publishes the pay-timeout delayed message (delay level {@code DELAYED_30m}) for the order.
     *
     * @param createOrderRequest the order submission request the message fields are copied from
     */
    private void sendPayOrderTimeoutDelayMessage(CreateOrderRequest createOrderRequest) {
        PayOrderTimeoutDelayMessage delayMessage = new PayOrderTimeoutDelayMessage();
        delayMessage.setOrderId(createOrderRequest.getOrderId());
        delayMessage.setBusinessIdentifier(createOrderRequest.getBusinessIdentifier());
        delayMessage.setCancelType(OrderCancelTypeEnum.TIMEOUT_CANCELED.getCode());
        delayMessage.setUserId(createOrderRequest.getUserId());
        delayMessage.setOrderType(createOrderRequest.getOrderType());
        delayMessage.setOrderStatus(OrderStatusEnum.CREATED.getCode());

        defaultProducer.sendMessage(
                RocketMqConstant.PAY_ORDER_TIMEOUT_DELAY_TOPIC,
                JsonUtil.object2Json(delayMessage),
                RocketDelayedLevel.DELAYED_30m,
                "支付订单超时延迟消息"
        );
    }
    ...
}
@Componentpublic class DefaultProducer { private final DefaultMQProducer producer;
@Autowired public DefaultProducer(RocketMQProperties rocketMQProperties) { producer = new DefaultMQProducer(RocketMqConstant.ORDER_DEFAULT_PRODUCER_GROUP); producer.setNamesrvAddr(rocketMQProperties.getNameServer()); start(); }
//对象在使用之前必须要调用一次,只能初始化一次 public void start() { try { this.producer.start(); } catch (MQClientException e) { log.error("producer start error", e); } } ...
//发送消息 public void sendMessage(String topic, String message, Integer delayTimeLevel, String type) { Message msg = new Message(topic, message.getBytes(StandardCharsets.UTF_8)); try { if (delayTimeLevel > 0) { msg.setDelayTimeLevel(delayTimeLevel); } SendResult send = producer.send(msg); if (SendStatus.SEND_OK == send.getSendStatus()) { log.info("发送MQ消息成功, type:{}, message:{}", type, message); } else { throw new OrderBizException(send.getSendStatus().toString()); } } catch (Exception e) { log.error("发送MQ消息失败:", e); throw new OrderBizException(OrderErrorCodeEnum.SEND_MQ_FAILED); } } ...}
复制代码


(2)订单超时检查监听器

@Configuration
public class ConsumerConfig {
    @Autowired
    private RocketMQProperties rocketMQProperties;
    ...

    /**
     * Consumer for the pay-order-timeout delayed messages.
     *
     * @param payOrderTimeoutListener listener that handles each delivered batch
     * @return the started push consumer
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    @Bean("payOrderTimeoutConsumer")
    public DefaultMQPushConsumer payOrderTimeoutConsumer(PayOrderTimeoutListener payOrderTimeoutListener)
            throws MQClientException {
        DefaultMQPushConsumer pushConsumer =
                new DefaultMQPushConsumer(PAY_ORDER_TIMEOUT_DELAY_CONSUMER_GROUP);
        pushConsumer.setNamesrvAddr(rocketMQProperties.getNameServer());
        // "*" subscribes to every tag of the topic.
        pushConsumer.subscribe(PAY_ORDER_TIMEOUT_DELAY_TOPIC, "*");
        pushConsumer.registerMessageListener(payOrderTimeoutListener);
        pushConsumer.start();
        return pushConsumer;
    }
    ...
}
//监听支付订单超时延迟消息@Componentpublic class PayOrderTimeoutListener implements MessageListenerConcurrently { @Autowired private OrderAfterSaleService orderAfterSaleService;
@Autowired private OrderInfoDAO orderInfoDAO;
@Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { for (MessageExt messageExt : list) { String message = new String(messageExt.getBody()); PayOrderTimeoutDelayMessage payOrderTimeoutDelayMessage = JSON.parseObject(message, PayOrderTimeoutDelayMessage.class); //消费延迟消息,执行关单逻辑 CancelOrderRequest cancelOrderRequest = new CancelOrderRequest(); cancelOrderRequest.setOrderId(payOrderTimeoutDelayMessage.getOrderId()); cancelOrderRequest.setBusinessIdentifier(payOrderTimeoutDelayMessage.getBusinessIdentifier()); cancelOrderRequest.setCancelType(payOrderTimeoutDelayMessage.getOrderType()); cancelOrderRequest.setUserId(payOrderTimeoutDelayMessage.getUserId()); cancelOrderRequest.setOrderType(payOrderTimeoutDelayMessage.getOrderType()); cancelOrderRequest.setOrderStatus(payOrderTimeoutDelayMessage.getOrderStatus()); //查询当前数据库的订单实时状态 OrderInfoDO orderInfoDO = orderInfoDAO.getByOrderId(payOrderTimeoutDelayMessage.getOrderId()); Integer orderStatusDatabase = orderInfoDO.getOrderStatus(); if (!OrderStatusEnum.CREATED.getCode().equals(orderStatusDatabase)) { //订单实时状态不等于已创建 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } //当前时间 小于 订单实际支付截止时间 if (new Date().before(orderInfoDO.getExpireTime())) { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } orderAfterSaleService.cancelOrder(cancelOrderRequest); log.info("关闭订单,orderId:{}", cancelOrderRequest.getOrderId()); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { log.error("consumer error", e); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } }}
复制代码


(3)对订单进行正式取消的业务逻辑

@Service
public class OrderAfterSaleServiceImpl implements OrderAfterSaleService {
    ...

    /**
     * Cancels an order (manual cancel or pay-timeout cancel) under the per-order pay lock.
     *
     * @param cancelOrderRequest the cancel request; its order id selects the lock key
     * @return a success result when the cancellation ran to completion
     * @throws OrderBizException {@code CANCEL_ORDER_REPEAT} when the lock is not obtained;
     *                           business exceptions from the cancellation are propagated
     *                           unchanged; any other failure is wrapped by message
     */
    @Override
    public JsonResult<Boolean> cancelOrder(CancelOrderRequest cancelOrderRequest) {
        // Validate the request.
        checkCancelOrderRequestParam(cancelOrderRequest);
        // Distributed lock; the same lock is used by pre-pay and by the pay callback.
        String orderId = cancelOrderRequest.getOrderId();
        String key = RedisLockKeyConstants.ORDER_PAY_KEY + orderId;
        boolean locked = false;
        try {
            locked = redisLock.lock(key);
            if (!locked) {
                throw new OrderBizException(OrderErrorCodeEnum.CANCEL_ORDER_REPEAT);
            }
            // NOTE(review): this is a call on `this`; if @GlobalTransactional is applied through
            // a Spring AOP proxy, the annotation on executeCancelOrder may not take effect here.
            executeCancelOrder(cancelOrderRequest, orderId);
            return JsonResult.buildSuccess(true);
        } catch (OrderBizException e) {
            // Keep the original error code instead of flattening it to a message.
            throw e;
        } catch (Exception e) {
            log.error("system error", e);
            throw new OrderBizException(e.getMessage());
        } finally {
            // Only release a lock this call actually holds.
            if (locked) {
                redisLock.unlock(key);
            }
        }
    }

    /**
     * Runs the cancellation: idempotency check, pay-status check, status update plus operation
     * log, fulfillment cancel for paid orders, and finally the release-assets message.
     *
     * @param cancelOrderRequest the cancel request
     * @param orderId            id of the order being cancelled
     */
    @Override
    @GlobalTransactional(rollbackFor = Exception.class)
    public void executeCancelOrder(CancelOrderRequest cancelOrderRequest, String orderId) {
        // 1. Assemble the data.
        CancelOrderAssembleRequest cancelOrderAssembleRequest = buildAssembleRequest(orderId, cancelOrderRequest);
        // Idempotency: the order is already cancelled, nothing more to do.
        if (OrderStatusEnum.CANCELED.getCode().equals(cancelOrderAssembleRequest.getOrderInfoDTO().getOrderStatus())) {
            return;
        }

        // 2. Check the order's pay status.
        checkOrderPayStatus(cancelOrderAssembleRequest);

        // 3. Update the order status and record the operation log.
        updateOrderStatusAndSaveOperationLog(cancelOrderAssembleRequest);

        // 4. Cancel fulfillment. Orders below PAID status have no fulfillment to cancel.
        if (OrderStatusEnum.PAID.getCode() <= cancelOrderAssembleRequest.getOrderInfoDTO().getOrderStatus()) {
            cancelFulfill(cancelOrderAssembleRequest);
        }

        // 5. Send the release-assets message. This is sent for unpaid orders as well:
        // order creation locks stock and coupon, and they have to be released on cancel.
        defaultProducer.sendMessage(RocketMqConstant.RELEASE_ASSETS_TOPIC,
                JSONObject.toJSONString(cancelOrderAssembleRequest), "释放资产");
    }

    /**
     * Marks the order as CANCELED and appends an operation-log row describing the transition.
     *
     * @param cancelOrderAssembleRequest assembled cancel data; its order DTO still carries the
     *                                   pre-cancel status, which becomes the log's pre-status
     */
    private void updateOrderStatusAndSaveOperationLog(CancelOrderAssembleRequest cancelOrderAssembleRequest) {
        // Update the order table.
        OrderInfoDO orderInfoDO = cancelOrderAssembleRequest.getOrderInfoDTO().clone(OrderInfoDO.class);
        orderInfoDO.setCancelType(cancelOrderAssembleRequest.getCancelType().toString());
        orderInfoDO.setOrderStatus(OrderStatusEnum.CANCELED.getCode());
        orderInfoDO.setCancelTime(new Date());
        orderInfoDAO.updateOrderInfo(orderInfoDO);
        log.info("更新订单信息OrderInfo状态: orderId:{},status:{}", orderInfoDO.getOrderId(), orderInfoDO.getOrderStatus());

        // Insert the order operation log.
        Integer cancelType = Integer.valueOf(orderInfoDO.getCancelType());
        String orderId = orderInfoDO.getOrderId();
        OrderOperateLogDO orderOperateLogDO = new OrderOperateLogDO();
        orderOperateLogDO.setOrderId(orderId);
        orderOperateLogDO.setPreStatus(cancelOrderAssembleRequest.getOrderInfoDTO().getOrderStatus());
        orderOperateLogDO.setCurrentStatus(OrderStatusEnum.CANCELED.getCode());
        // Default operate type; refined below together with the remark.
        orderOperateLogDO.setOperateType(OrderOperateTypeEnum.AUTO_CANCEL_ORDER.getCode());
        if (OrderCancelTypeEnum.USER_CANCELED.getCode().equals(cancelType)) {
            orderOperateLogDO.setOperateType(OrderOperateTypeEnum.MANUAL_CANCEL_ORDER.getCode());
            orderOperateLogDO.setRemark(OrderOperateTypeEnum.MANUAL_CANCEL_ORDER.getMsg()
                    + orderOperateLogDO.getPreStatus() + "-" + orderOperateLogDO.getCurrentStatus());
        }
        if (OrderCancelTypeEnum.TIMEOUT_CANCELED.getCode().equals(cancelType)) {
            orderOperateLogDO.setOperateType(OrderOperateTypeEnum.AUTO_CANCEL_ORDER.getCode());
            orderOperateLogDO.setRemark(OrderOperateTypeEnum.AUTO_CANCEL_ORDER.getMsg()
                    + orderOperateLogDO.getPreStatus() + "-" + orderOperateLogDO.getCurrentStatus());
        }
        orderOperateLogDAO.save(orderOperateLogDO);
        log.info("新增订单操作日志OrderOperateLog状态,orderId:{}, PreStatus:{},CurrentStatus:{}",
                orderInfoDO.getOrderId(), orderOperateLogDO.getPreStatus(), orderOperateLogDO.getCurrentStatus());
    }
    ...
}
复制代码


(4)定时任务检查创建时间超 30m 但没取消的订单

//自动取消超时订单任务@Componentpublic class AutoCancelExpiredOrderTask {    @Autowired    private OrderInfoDAO orderInfoDAO;    @Autowired    private OrderAfterSaleService orderAfterSaleService;    @Autowired    private OrderProperties orderProperties;    @Autowired    private RedisLock redisLock;
//执行任务逻辑 @Scheduled(fixedRate = 30 * 60 * 1000) public void execute() { //扫描当前时间 - 订单超时时间 -> 前的一小段时间范围(时间范围用配置中心配置) //比如当前时间11:40,订单超时时间是30分钟,扫描11:09:00 -> 11:10:00这一分钟的未支付订单, //缺点:有一个订单超过了30 + 1 = 31分钟,都没有被处理(取消),这笔订单就永久待支付 for (OrderInfoDO order : orderInfoDAO.listAllUnPaid()) { if (new Date().getTime() - order.getExpireTime().getTime() >= orderProperties.getExpireTime()) { //分布式锁 String orderId = order.getOrderId(); String key = RedisLockKeyConstants.ORDER_PAY_KEY + orderId; try { boolean lock = redisLock.lock(key); if (!lock) { throw new OrderBizException(OrderErrorCodeEnum.CANCEL_ORDER_REPEAT); } //超过30min未支付 CancelOrderRequest request = new CancelOrderRequest(); request.setOrderId(order.getOrderId()); request.setUserId(order.getUserId()); request.setBusinessIdentifier(order.getBusinessIdentifier()); request.setOrderType(order.getOrderType()); request.setCancelType(OrderCancelTypeEnum.TIMEOUT_CANCELED.getCode()); request.setOrderStatus(order.getOrderStatus()); try { orderAfterSaleService.cancelOrder(request); } catch (Exception e) { log.error("AutoCancelExpiredOrderTask execute error:", e); } } catch (Exception e) { throw new OrderBizException(e.getMessage()); } finally { redisLock.unlock(key); } } } }}
复制代码


6.RocketMQ 延迟消息实现原理


发送需要延迟消费的消息到 RocketMQ 的某 Topic 时,RocketMQ 会将该消息进行改写,投递到内部的延迟 Topic(SCHEDULE_TOPIC_XXXX),而非指定的某 Topic,原始 Topic 会被保存在消息属性中。RocketMQ 将该消息转发到延迟 Topic 对应的 ConsumeQueue 后,Broker 内部的延迟调度组件 ScheduleMessageService 便会消费延迟 Topic 对应的 ConsumeQueue 里的消息,并判断里面的消息是否到达延迟时间。如果到达延迟时间,则会重新改写消息 Topic,投递到开始指定的某 Topic。


 

7.取消订单的业务场景和接口被调用的情况


(1)发生取消订单的三种场景


场景一:用户生成订单后还没支付,就取消订单。取消订单可能是用户手动取消,也可能是超时检查自动取消,此时只需要释放锁定的优惠券和库存即可。

 

场景二:用户生成订单后完成支付,还没拣货出库,用户就取消订单。此时不仅需要释放锁定的优惠券和库存,还需取消履约以及退款。

 

场景三:用户生成订单后完成支付,已拣货出库,开始物流配送。此时用户想要取消订单,只能等收到货以后,发起售后退货申请。



(2)调用取消订单接口的三种情况


情况一:手动取消,订单出库状态前都可取消

情况二:正向生成订单后,会发送延迟 30 分钟消费的订单消息到 MQ。订单系统在 30 分钟后消费到该 MQ 消息时,若发现仍未支付则取消该订单

情况三:定时扫描,超过 30 分钟未支付才取消


 

8.取消订单的业务链路和代码流程


(1)实现概述


一.实现流程图



二.技术要点

要点一:更新订单状态、取消履约、发送释放资产是一个分布式事务,需要将 Seata 的 AT 模式替换成 RocketMQ 的事务消息。

 

要点二:超时订单的处理方案是生成订单后发送一个延迟 30 分钟后才被消费的消息到 MQ。订单系统消费该延迟消息时,会验证订单是否已支付,若未支付就调用取消订单的接口。

 

(2)对取消订单的处理

//订单中心-逆向售后业务接口@DubboService(version = "1.0.0", interfaceClass = AfterSaleApi.class, retries = 0)public class AfterSaleApiImpl implements AfterSaleApi {    @Autowired    private OrderLackService orderLackItemService;
@Autowired private OrderAfterSaleService orderAfterSaleService;
//取消订单/超时未支付取消 @Override public JsonResult<Boolean> cancelOrder(CancelOrderRequest cancelOrderRequest) { try { return orderAfterSaleService.cancelOrder(cancelOrderRequest); } catch (OrderBizException e) { log.error("biz error", e); return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg()); } catch (Exception e) { log.error("system error", e); return JsonResult.buildError(e.getMessage()); } } ...}
//取消订单入参@Datapublic class CancelOrderRequest extends AbstractObject implements Serializable { private static final long serialVersionUID = -4035579903997700971L; private String orderId;//订单号 private Integer businessIdentifier;//订单渠道来源 private Integer cancelType;//订单取消类型 0-手动取消 1-超时未支付 private String userId;//用户id private Integer orderType;//订单类型 private Integer orderStatus;//订单状态 private Integer oldOrderStatus;//原订单状态}
@Service
public class OrderAfterSaleServiceImpl implements OrderAfterSaleService {
    ...

    /**
     * Cancels an order (manual cancel or pay-timeout cancel) under a per-order cancel lock.
     *
     * @param cancelOrderRequest the cancel request; its order id selects the lock key
     * @return the result of {@link #executeCancelOrder}
     * @throws OrderBizException {@code CANCEL_ORDER_REPEAT} when the lock is not obtained;
     *                           business exceptions are propagated unchanged; any other
     *                           failure is wrapped by message
     */
    @Override
    public JsonResult<Boolean> cancelOrder(CancelOrderRequest cancelOrderRequest) {
        // Validate the request.
        checkCancelOrderRequestParam(cancelOrderRequest);

        // Distributed lock.
        String orderId = cancelOrderRequest.getOrderId();
        String key = RedisLockKeyConstants.CANCEL_KEY + orderId;
        boolean locked = false;
        try {
            locked = redisLock.lock(key);
            if (!locked) {
                throw new OrderBizException(OrderErrorCodeEnum.CANCEL_ORDER_REPEAT);
            }
            // Run the cancellation.
            return executeCancelOrder(cancelOrderRequest, orderId);
        } catch (OrderBizException e) {
            // Keep the original error code instead of flattening it to a message.
            throw e;
        } catch (Exception e) {
            log.error("system error", e);
            throw new OrderBizException(e.getMessage());
        } finally {
            // Only release a lock this call actually holds.
            if (locked) {
                redisLock.unlock(key);
            }
        }
    }

    /**
     * Cancels the order through a RocketMQ transactional message: the local transaction cancels
     * fulfillment, updates the order status and writes the operation log; the message itself
     * triggers the release of assets once the local transaction commits.
     *
     * @param cancelOrderRequest the cancel request
     * @param orderId            id of the order being cancelled
     * @return a success result when the local transaction committed
     * @throws OrderBizException {@code CURRENT_ORDER_STATUS_CANNOT_CANCEL} when the order status
     *                           is OUT_STOCK or later; {@code CANCEL_ORDER_PROCESS_FAILED} when
     *                           the local transaction did not commit;
     *                           {@code SEND_TRANSACTION_MQ_FAILED} for any other send failure
     */
    @Override
    public JsonResult<Boolean> executeCancelOrder(CancelOrderRequest cancelOrderRequest, String orderId) {
        // 1. Assemble the data.
        OrderInfoDO orderInfoDO = findOrderInfo(orderId, cancelOrderRequest.getCancelType());
        CancelOrderAssembleRequest cancelOrderAssembleRequest =
                buildAssembleRequest(orderId, cancelOrderRequest, orderInfoDO);
        if (cancelOrderAssembleRequest.getOrderInfoDTO().getOrderStatus() >= OrderStatusEnum.OUT_STOCK.getCode()) {
            throw new OrderBizException(OrderErrorCodeEnum.CURRENT_ORDER_STATUS_CANNOT_CANCEL);
        }

        // NOTE(review): the listener is installed on the producer returned by defaultProducer on
        // every call and captures this call's request. If that producer instance is shared,
        // concurrent calls can replace each other's listener; confirm and consider a single
        // dispatching listener.
        TransactionMQProducer producer = defaultProducer.getProducer();
        producer.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                try {
                    // 2. Cancel fulfillment, update the order status, add the operation log.
                    afterSaleManager.cancelOrderFulfillmentAndUpdateOrderStatus(cancelOrderAssembleRequest);
                    return LocalTransactionState.COMMIT_MESSAGE;
                } catch (Exception e) {
                    log.error("system error", e);
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                // Commit only when the order row exists and is already "cancelled".
                OrderInfoDO orderInfoByDatabase = orderInfoDAO.getByOrderId(orderId);
                if (orderInfoByDatabase != null
                        && OrderStatusEnum.CANCELED.getCode().equals(orderInfoByDatabase.getOrderStatus())) {
                    return LocalTransactionState.COMMIT_MESSAGE;
                }
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        });

        try {
            Message message = new Message(RocketMqConstant.RELEASE_ASSETS_TOPIC,
                    JSONObject.toJSONString(cancelOrderAssembleRequest).getBytes(StandardCharsets.UTF_8));
            // 3. Send the transactional message that releases assets.
            TransactionSendResult result = producer.sendMessageInTransaction(message, cancelOrderAssembleRequest);
            if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) {
                throw new OrderBizException(OrderErrorCodeEnum.CANCEL_ORDER_PROCESS_FAILED);
            }
            return JsonResult.buildSuccess(true);
        } catch (OrderBizException e) {
            // Do not mask CANCEL_ORDER_PROCESS_FAILED as a send failure.
            throw e;
        } catch (Exception e) {
            log.error("system error", e);
            throw new OrderBizException(OrderErrorCodeEnum.SEND_TRANSACTION_MQ_FAILED);
        }
    }
    ...
}
复制代码


(3)对取消履约+更新订单状态+新增订单日志的处理

@Service
public class AfterSaleManagerImpl implements AfterSaleManager {
    ...

    /**
     * Cancels fulfillment first, then marks the order cancelled and records the operation log.
     *
     * @param cancelOrderAssembleRequest assembled cancel data
     */
    @Override
    public void cancelOrderFulfillmentAndUpdateOrderStatus(CancelOrderAssembleRequest cancelOrderAssembleRequest) {
        // Cancel fulfillment.
        cancelFulfill(cancelOrderAssembleRequest);

        // Update the order status and record the operation log.
        updateOrderStatusAndSaveOperationLog(cancelOrderAssembleRequest);
    }

    /**
     * Asks the fulfillment API to intercept the order. Orders still in CREATED status are
     * skipped without calling the API.
     *
     * @throws OrderBizException {@code CANCEL_ORDER_FULFILL_ERROR} when the API reports failure
     */
    private void cancelFulfill(CancelOrderAssembleRequest cancelOrderAssembleRequest) {
        OrderInfoDTO order = cancelOrderAssembleRequest.getOrderInfoDTO();
        boolean stillCreated = OrderStatusEnum.CREATED.getCode().equals(order.getOrderStatus());
        if (!stillCreated) {
            JsonResult<Boolean> fulfillResult = fulfillApi.cancelFulfill(order.clone(CancelFulfillRequest.class));
            if (!fulfillResult.getSuccess()) {
                throw new OrderBizException(OrderErrorCodeEnum.CANCEL_ORDER_FULFILL_ERROR);
            }
        }
    }

    /**
     * Writes the CANCELED status to the order table and appends an operation-log row whose
     * pre-status is the status still held by the request's order DTO.
     */
    private void updateOrderStatusAndSaveOperationLog(CancelOrderAssembleRequest cancelOrderAssembleRequest) {
        // Update the order table.
        OrderInfoDO cancelledOrder = cancelOrderAssembleRequest.getOrderInfoDTO().clone(OrderInfoDO.class);
        cancelledOrder.setCancelType(cancelOrderAssembleRequest.getCancelType().toString());
        cancelledOrder.setOrderStatus(OrderStatusEnum.CANCELED.getCode());
        cancelledOrder.setCancelTime(new Date());
        orderInfoDAO.updateOrderInfo(cancelledOrder);
        log.info("更新订单信息OrderInfo状态: orderId:{},status:{}",
                cancelledOrder.getOrderId(), cancelledOrder.getOrderStatus());

        // Insert the order operation log.
        Integer cancelType = Integer.valueOf(cancelledOrder.getCancelType());
        OrderOperateLogDO operateLog = new OrderOperateLogDO();
        operateLog.setOrderId(cancelledOrder.getOrderId());
        operateLog.setPreStatus(cancelOrderAssembleRequest.getOrderInfoDTO().getOrderStatus());
        operateLog.setCurrentStatus(OrderStatusEnum.CANCELED.getCode());
        // Default operate type; the two branches below set the final type and the remark.
        operateLog.setOperateType(OrderOperateTypeEnum.AUTO_CANCEL_ORDER.getCode());

        boolean cancelledByUser = OrderCancelTypeEnum.USER_CANCELED.getCode().equals(cancelType);
        if (cancelledByUser) {
            operateLog.setOperateType(OrderOperateTypeEnum.MANUAL_CANCEL_ORDER.getCode());
            operateLog.setRemark(OrderOperateTypeEnum.MANUAL_CANCEL_ORDER.getMsg()
                    + operateLog.getPreStatus() + "-" + operateLog.getCurrentStatus());
        }
        boolean cancelledByTimeout = OrderCancelTypeEnum.TIMEOUT_CANCELED.getCode().equals(cancelType);
        if (cancelledByTimeout) {
            operateLog.setOperateType(OrderOperateTypeEnum.AUTO_CANCEL_ORDER.getCode());
            operateLog.setRemark(OrderOperateTypeEnum.AUTO_CANCEL_ORDER.getMsg()
                    + operateLog.getPreStatus() + "-" + operateLog.getCurrentStatus());
        }
        orderOperateLogDAO.save(operateLog);
        log.info("新增订单操作日志OrderOperateLog状态,orderId:{}, PreStatus:{},CurrentStatus:{}",
                cancelledOrder.getOrderId(), operateLog.getPreStatus(), operateLog.getCurrentStatus());
    }
    ...
}
复制代码


(4)对释放资产的处理

@Configurationpublic class ConsumerConfig {    @Autowired    private RocketMQProperties rocketMQProperties;    ...
//释放资产消息消费者 @Bean("releaseAssetsConsumer") public DefaultMQPushConsumer releaseAssetsConsumer(ReleaseAssetsListener releaseAssetsListener) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RELEASE_ASSETS_CONSUMER_GROUP); consumer.setNamesrvAddr(rocketMQProperties.getNameServer()); consumer.subscribe(RELEASE_ASSETS_TOPIC, "*"); consumer.registerMessageListener(releaseAssetsListener); consumer.start(); return consumer; }}
//消费MQ的释放资产消息@Componentpublic class ReleaseAssetsListener implements MessageListenerConcurrently { @Autowired private DefaultProducer defaultProducer;
@Autowired private OrderItemDAO orderItemDAO;
@Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { for (MessageExt messageExt : list) { //1.消费到释放资产message String message = new String(messageExt.getBody()); log.info("ReleaseAssetsListener message:{}", message); CancelOrderAssembleRequest cancelOrderAssembleRequest = JSONObject.parseObject(message, CancelOrderAssembleRequest.class); OrderInfoDTO orderInfoDTO = cancelOrderAssembleRequest.getOrderInfoDTO();
//2.发送取消订单退款请求MQ if (orderInfoDTO.getOrderStatus() > OrderStatusEnum.CREATED.getCode()) { defaultProducer.sendMessage(RocketMqConstant.CANCEL_REFUND_REQUEST_TOPIC, JSONObject.toJSONString(cancelOrderAssembleRequest), "取消订单退款"); }
//3.发送释放库存MQ ReleaseProductStockRequest releaseProductStockRequest = buildReleaseProductStock(orderInfoDTO, orderItemDAO); defaultProducer.sendMessage(RocketMqConstant.CANCEL_RELEASE_INVENTORY_TOPIC, JSONObject.toJSONString(releaseProductStockRequest), "取消订单释放库存");
//4.发送释放优惠券MQ if (!Strings.isNullOrEmpty(orderInfoDTO.getCouponId())) { ReleaseUserCouponRequest releaseUserCouponRequest = buildReleaseUserCoupon(orderInfoDTO); defaultProducer.sendMessage(RocketMqConstant.CANCEL_RELEASE_PROPERTY_TOPIC, JSONObject.toJSONString(releaseUserCouponRequest), "取消订单释放优惠券"); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { log.error("consumer error", e); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } }
//组装释放优惠券数据 private ReleaseUserCouponRequest buildReleaseUserCoupon(OrderInfoDTO orderInfoDTO) { ReleaseUserCouponRequest releaseUserCouponRequest = new ReleaseUserCouponRequest(); releaseUserCouponRequest.setCouponId(orderInfoDTO.getCouponId()); releaseUserCouponRequest.setUserId(orderInfoDTO.getUserId()); releaseUserCouponRequest.setOrderId(orderInfoDTO.getOrderId()); return releaseUserCouponRequest; }
//组装释放库存数据 private ReleaseProductStockRequest buildReleaseProductStock(OrderInfoDTO orderInfoDTO, OrderItemDAO orderItemDAO) { String orderId = orderInfoDTO.getOrderId(); List<ReleaseProductStockRequest.OrderItemRequest> orderItemRequestList = new ArrayList<>();
//查询订单条目 ReleaseProductStockRequest.OrderItemRequest orderItemRequest; List<OrderItemDO> orderItemDOList = orderItemDAO.listByOrderId(orderId); for (OrderItemDO orderItemDO : orderItemDOList) { orderItemRequest = new ReleaseProductStockRequest.OrderItemRequest(); orderItemRequest.setSkuCode(orderItemDO.getSkuCode()); orderItemRequest.setSaleQuantity(orderItemDO.getSaleQuantity()); orderItemRequestList.add(orderItemRequest); } ReleaseProductStockRequest releaseProductStockRequest = new ReleaseProductStockRequest(); releaseProductStockRequest.setOrderId(orderInfoDTO.getOrderId()); releaseProductStockRequest.setOrderItemRequestList(orderItemRequestList); return releaseProductStockRequest; }}
@Configurationpublic class ConsumerConfig { @Autowired private RocketMQProperties rocketMQProperties; ...
//消费退款请求消息 消费者 @Bean("cancelRefundConsumer") public DefaultMQPushConsumer cancelRefundConsumer(CancelRefundListener cancelRefundListener) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketMqConstant.REQUEST_CONSUMER_GROUP); consumer.setNamesrvAddr(rocketMQProperties.getNameServer()); consumer.subscribe(RocketMqConstant.CANCEL_REFUND_REQUEST_TOPIC, "*"); consumer.registerMessageListener(cancelRefundListener); consumer.start(); return consumer; }}
//消费取消订单退款请求的消息@Componentpublic class CancelRefundListener implements MessageListenerConcurrently { @Autowired private OrderAfterSaleService orderAfterSaleService;
@Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { for (MessageExt messageExt : list) { String message = new String(messageExt.getBody()); CancelOrderAssembleRequest cancelOrderAssembleRequest = JSONObject.parseObject(message, CancelOrderAssembleRequest.class); log.info("CancelRefundConsumer message:{}", message); //执行 取消订单/超时未支付取消 前的操作 JsonResult<Boolean> jsonResult = orderAfterSaleService.processCancelOrder(cancelOrderAssembleRequest); if (!jsonResult.getSuccess()) { throw new OrderBizException(OrderErrorCodeEnum.CONSUME_MQ_FAILED); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { log.error("consumer error", e); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } }}
@Configurationpublic class ConsumerConfig { @Autowired private RocketMQProperties rocketMQProperties;
//释放库存消息消费者 @Bean("releaseInventoryConsumer") public DefaultMQPushConsumer releaseInventoryConsumer(ReleaseInventoryListener releaseInventoryListener) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketMqConstant.RELEASE_INVENTORY_CONSUMER_GROUP); consumer.setNamesrvAddr(rocketMQProperties.getNameServer()); consumer.subscribe(RocketMqConstant.CANCEL_RELEASE_INVENTORY_TOPIC, "*"); consumer.registerMessageListener(releaseInventoryListener); consumer.start(); return consumer; } ...}
//消费释放库存的消息@Componentpublic class ReleaseInventoryListener implements MessageListenerConcurrently { @DubboReference(version = "1.0.0") private InventoryApi inventoryApi;
@Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { for (MessageExt msg : list) { String content = new String(msg.getBody(), StandardCharsets.UTF_8); log.info("ReleaseInventoryConsumer message:{}", content); CancelOrderReleaseProductStockRequest cancelOrderReleaseProductStockRequest = JSONObject.parseObject(content, CancelOrderReleaseProductStockRequest.class); JsonResult<Boolean> jsonResult = inventoryApi.cancelOrderReleaseProductStock(cancelOrderReleaseProductStockRequest); if (!jsonResult.getSuccess()) { throw new InventoryBizException(InventoryErrorCodeEnum.CONSUME_MQ_FAILED); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { log.error("consumer error", e); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } }}
@Configurationpublic class ConsumerConfig { @Autowired private RocketMQProperties rocketMQProperties;
//释放资产权益消息消费者 @Bean("releaseInventoryConsumer") public DefaultMQPushConsumer releaseInventoryConsumer(ReleasePropertyListener releasePropertyListener) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketMqConstant.RELEASE_PROPERTY_CONSUMER_GROUP); consumer.setNamesrvAddr(rocketMQProperties.getNameServer()); consumer.subscribe(RocketMqConstant.CANCEL_RELEASE_PROPERTY_TOPIC, "*"); consumer.registerMessageListener(releasePropertyListener); consumer.start(); return consumer; } ...}
//消费释放优惠券的消息@Componentpublic class ReleasePropertyListener implements MessageListenerConcurrently { @DubboReference(version = "1.0.0") private MarketApi marketApi;
@Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { for (MessageExt msg : list) { String content = new String(msg.getBody(), StandardCharsets.UTF_8); log.info("ReleasePropertyConsumer message:{}", content); CancelOrderReleaseUserCouponRequest cancelOrderReleaseUserCouponRequest = JSONObject.parseObject(content, CancelOrderReleaseUserCouponRequest.class); JsonResult<Boolean> jsonResult = marketApi.cancelOrderReleaseCoupon(cancelOrderReleaseUserCouponRequest); if (!jsonResult.getSuccess()) { throw new MarketBizException(MarketErrorCodeEnum.CONSUME_MQ_FAILED); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { log.error("consumer error", e); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } }}
复制代码


(5)对退款的退款前处理

@Service
public class OrderAfterSaleServiceImpl implements OrderAfterSaleService {
    ...

    /**
     * Pre-refund processing for a cancelled order, run under a per-order refund lock.
     *
     * <p>Generates the after-sale id, calculates the refund amount, and sends a transactional
     * message to the actual-refund topic whose local transaction inserts the after-sale records.
     *
     * @param cancelOrderAssembleRequest assembled cancel data
     * @return a success result when the local transaction committed
     * @throws OrderBizException {@code PROCESS_REFUND_REPEAT} when the lock is not obtained;
     *                           {@code PROCESS_REFUND_FAILED} when the local transaction did not
     *                           commit; {@code SEND_TRANSACTION_MQ_FAILED} for any other send
     *                           failure
     */
    @Override
    public JsonResult<Boolean> processCancelOrder(CancelOrderAssembleRequest cancelOrderAssembleRequest) {
        String orderId = cancelOrderAssembleRequest.getOrderId();
        // Distributed lock.
        String key = RedisLockKeyConstants.REFUND_KEY + orderId;
        boolean locked = false;
        try {
            locked = redisLock.lock(key);
            if (!locked) {
                throw new OrderBizException(OrderErrorCodeEnum.PROCESS_REFUND_REPEAT);
            }

            // Preparation before the refund: generate the after-sale order id.
            OrderInfoDTO orderInfoDTO = cancelOrderAssembleRequest.getOrderInfoDTO();
            OrderInfoDO orderInfoDO = orderInfoDTO.clone(OrderInfoDO.class);
            String afterSaleId = orderNoManager.genOrderId(OrderNoTypeEnum.AFTER_SALE.getCode(), orderInfoDO.getUserId());

            // 1. Calculate the refund amount for the cancelled order.
            CancelOrderRefundAmountDTO cancelOrderRefundAmountDTO =
                    calculatingCancelOrderRefundAmount(cancelOrderAssembleRequest);
            cancelOrderAssembleRequest.setCancelOrderRefundAmountDTO(cancelOrderRefundAmountDTO);

            // NOTE(review): the listener is installed on the producer returned by defaultProducer
            // on every call and captures this call's data. If that producer instance is shared,
            // concurrent calls can replace each other's listener; confirm.
            TransactionMQProducer producer = defaultProducer.getProducer();
            producer.setTransactionListener(new TransactionListener() {
                @Override
                public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                    try {
                        // 2. Record the after-sale information for the cancellation.
                        afterSaleManager.insertCancelOrderAfterSale(cancelOrderAssembleRequest,
                                AfterSaleStatusEnum.REVIEW_PASS.getCode(), orderInfoDO, afterSaleId);
                        return LocalTransactionState.COMMIT_MESSAGE;
                    } catch (Exception e) {
                        log.error("system error", e);
                        return LocalTransactionState.ROLLBACK_MESSAGE;
                    }
                }

                @Override
                public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                    // Check that the after-sale rows were inserted: the info row must exist and
                    // the item, log and refund tables must each hold at least one row.
                    Long afterSaleIdValue = Long.valueOf(afterSaleId);
                    AfterSaleInfoDO afterSaleInfoDO = afterSaleInfoDAO.getOneByAfterSaleId(afterSaleIdValue);
                    List<AfterSaleItemDO> afterSaleItemDOList = afterSaleItemDAO.listByAfterSaleId(afterSaleIdValue);
                    List<AfterSaleLogDO> afterSaleLogDOList = afterSaleLogDAO.listByAfterSaleId(afterSaleIdValue);
                    List<AfterSaleRefundDO> afterSaleRefundDOList = afterSaleRefundDAO.listByAfterSaleId(afterSaleIdValue);
                    if (afterSaleInfoDO != null
                            && !afterSaleItemDOList.isEmpty()
                            && !afterSaleLogDOList.isEmpty()
                            && !afterSaleRefundDOList.isEmpty()) {
                        return LocalTransactionState.COMMIT_MESSAGE;
                    }
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }
            });

            try {
                // 3. Assemble the transactional message.
                ActualRefundMessage actualRefundMessage = new ActualRefundMessage();
                actualRefundMessage.setOrderId(cancelOrderAssembleRequest.getOrderId());
                actualRefundMessage.setLastReturnGoods(cancelOrderAssembleRequest.isLastReturnGoods());
                actualRefundMessage.setAfterSaleId(Long.valueOf(afterSaleId));
                Message message = new Message(RocketMqConstant.ACTUAL_REFUND_TOPIC,
                        JSONObject.toJSONString(actualRefundMessage).getBytes(StandardCharsets.UTF_8));

                // 4. Send the transactional message.
                TransactionSendResult result = producer.sendMessageInTransaction(message, actualRefundMessage);
                if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) {
                    throw new OrderBizException(OrderErrorCodeEnum.PROCESS_REFUND_FAILED);
                }
                return JsonResult.buildSuccess(true);
            } catch (OrderBizException e) {
                // Do not mask PROCESS_REFUND_FAILED as a send failure.
                throw e;
            } catch (Exception e) {
                log.error("system error", e);
                throw new OrderBizException(OrderErrorCodeEnum.SEND_TRANSACTION_MQ_FAILED);
            }
        } finally {
            // Only release a lock this call actually holds.
            if (locked) {
                redisLock.unlock(key);
            }
        }
    }
    ...
}
@Service
public class AfterSaleManagerImpl implements AfterSaleManager {
    ...

    /**
     * Records the after-sale data for a cancelled order in one local transaction: after-sale
     * order, after-sale items, after-sale log and after-sale refund, in that order.
     *
     * <p>The generated after-sale id and the id of the refund row are written back onto the
     * request.
     *
     * @param cancelOrderAssembleRequest assembled cancel data; updated with the after-sale ids
     * @param afterSaleStatus            status recorded as the target status in the log table;
     *                                   the after-sale order row itself is always written with
     *                                   {@code REVIEW_PASS}
     * @param orderInfoDO                the order being cancelled; its pay amount is used as
     *                                   both the applied and the real refund amount
     * @param afterSaleId                id of the after-sale order to create
     */
    @Transactional(rollbackFor = Exception.class)
    @Override
    public void insertCancelOrderAfterSale(CancelOrderAssembleRequest cancelOrderAssembleRequest,
                                           Integer afterSaleStatus, OrderInfoDO orderInfoDO, String afterSaleId) {
        OrderInfoDTO order = cancelOrderAssembleRequest.getOrderInfoDTO();

        // For a cancellation the applied and the real refund amount are both the paid amount.
        AfterSaleInfoDO afterSaleInfo = new AfterSaleInfoDO();
        afterSaleInfo.setApplyRefundAmount(orderInfoDO.getPayAmount());
        afterSaleInfo.setRealRefundAmount(orderInfoDO.getPayAmount());

        // 1. Insert the after-sale order row.
        Integer reviewPassStatus = AfterSaleStatusEnum.REVIEW_PASS.getCode();
        insertCancelOrderAfterSaleInfoTable(orderInfoDO, reviewPassStatus, afterSaleInfo, afterSaleId);
        cancelOrderAssembleRequest.setAfterSaleId(afterSaleId);

        // 2. Insert the after-sale item rows.
        insertAfterSaleItemTable(
                cancelOrderAssembleRequest.getOrderId(),
                cancelOrderAssembleRequest.getOrderItemDTOList(),
                afterSaleId);

        // 3. Insert the after-sale change log (UN_CREATED -> afterSaleStatus).
        insertCancelOrderAfterSaleLogTable(afterSaleId, order, AfterSaleStatusEnum.UN_CREATED.getCode(), afterSaleStatus);

        // 4. Insert the after-sale refund row and hand its id back on the request.
        AfterSaleRefundDO refundRow = insertAfterSaleRefundTable(order, afterSaleId, afterSaleInfo);
        cancelOrderAssembleRequest.setAfterSaleRefundId(refundRow.getId());
    }
    ...
}
复制代码


(6)消费实际退款消息并执行退款处理

/**
 * Spring configuration that declares the RocketMQ push consumers.
 */
@Configuration
public class ConsumerConfig {

    @Autowired
    private RocketMQProperties rocketMQProperties;
    ...

    /**
     * Builds and starts the push consumer for the actual-refund topic.
     *
     * <p>The consumer joins {@code ACTUAL_REFUND_CONSUMER_GROUP}, subscribes to every tag
     * ({@code "*"}) of {@code ACTUAL_REFUND_TOPIC} and hands messages to the given listener.
     *
     * @param actualRefundListener listener that processes actual-refund messages
     * @return the consumer, already started
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    @Bean("actualRefundConsumer")
    public DefaultMQPushConsumer actualRefundConsumer(ActualRefundListener actualRefundListener)
            throws MQClientException {
        DefaultMQPushConsumer refundConsumer = new DefaultMQPushConsumer(ACTUAL_REFUND_CONSUMER_GROUP);

        String nameServerAddress = rocketMQProperties.getNameServer();
        refundConsumer.setNamesrvAddr(nameServerAddress);

        refundConsumer.subscribe(ACTUAL_REFUND_TOPIC, "*");
        refundConsumer.registerMessageListener(actualRefundListener);

        // The listener must be registered before start().
        refundConsumer.start();
        return refundConsumer;
    }
}
//实际退款消费者@Componentpublic class ActualRefundListener implements MessageListenerConcurrently { @Autowired private OrderAfterSaleService orderAfterSaleService;
@Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { for (MessageExt messageExt : list) { String message = new String(messageExt.getBody()); ActualRefundMessage actualRefundMessage = JSONObject.parseObject(message, ActualRefundMessage.class); log.info("ActualRefundConsumer message:{}", message); JsonResult<Boolean> jsonResult = orderAfterSaleService.refundMoney(actualRefundMessage); if (!jsonResult.getSuccess()) { throw new OrderBizException(jsonResult.getErrorCode(), jsonResult.getErrorMessage()); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { log.error("consumer error", e); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } }}
@Service
public class OrderAfterSaleServiceImpl implements OrderAfterSaleService {
    ...

    /**
     * Executes the refund described by an actual-refund message.
     *
     * <p>The work is done under a Redis lock keyed by {@code REFUND_KEY + afterSaleId}. After the
     * payment refund succeeds, the after-sale status is moved from REVIEW_PASS to REFUNDING: directly
     * when this is not the order's last after-sale item, or as the local transaction of a
     * release-coupon transactional message when it is.
     *
     * @param actualRefundMessage message carrying the order id, after-sale id and the
     *                            "last return goods" flag
     * @return a success result, or an error result carrying the message of the
     *         {@link OrderBizException} that aborted the refund (including failure to obtain the lock);
     *         other runtime exceptions propagate to the caller
     */
    @Override
    public JsonResult<Boolean> refundMoney(ActualRefundMessage actualRefundMessage) {
        Long afterSaleId = actualRefundMessage.getAfterSaleId();
        String key = RedisLockKeyConstants.REFUND_KEY + afterSaleId;
        // Tracks whether THIS call obtained the lock, so that the finally block never releases a
        // lock that is held by another, concurrently running refund of the same after-sale record.
        boolean locked = false;
        try {
            locked = redisLock.lock(key);
            if (!locked) {
                throw new OrderBizException(OrderErrorCodeEnum.REFUND_MONEY_REPEAT);
            }
            AfterSaleInfoDO afterSaleInfoDO = afterSaleInfoDAO.getOneByAfterSaleId(actualRefundMessage.getAfterSaleId());
            AfterSaleRefundDO afterSaleRefundDO = afterSaleRefundDAO.findOrderAfterSaleStatus(String.valueOf(afterSaleId));

            // 1. Build the request for the payment system's refund API.
            PayRefundRequest payRefundRequest = buildPayRefundRequest(actualRefundMessage, afterSaleRefundDO);

            // 2. Execute the refund.
            // NOTE(review): if a later step fails, the message is consumed again and this call is
            // repeated; presumably payApi.executeRefund is idempotent per after-sale id - confirm.
            if (!payApi.executeRefund(payRefundRequest)) {
                throw new OrderBizException(OrderErrorCodeEnum.ORDER_REFUND_AMOUNT_FAILED);
            }

            // 3. This after-sale item is the last one of the order (isLastReturnGoods is true):
            //    send a transactional MQ message to release the coupon.
            if (actualRefundMessage.isLastReturnGoods()) {
                TransactionMQProducer producer = defaultProducer.getProducer();
                // Registers the transaction listener and assembles the message payload.
                ReleaseUserCouponRequest releaseUserCouponRequest =
                        buildLastOrderReleasesCouponMessage(producer, afterSaleInfoDO, afterSaleId, actualRefundMessage);
                try {
                    // 4. Send the transactional message that releases the coupon.
                    Message message = new Message(RocketMqConstant.CANCEL_RELEASE_PROPERTY_TOPIC,
                            JSONObject.toJSONString(releaseUserCouponRequest).getBytes(StandardCharsets.UTF_8));
                    TransactionSendResult result = producer.sendMessageInTransaction(message, releaseUserCouponRequest);
                    if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) {
                        throw new OrderBizException(OrderErrorCodeEnum.REFUND_MONEY_RELEASE_COUPON_FAILED);
                    }
                    return JsonResult.buildSuccess(true);
                } catch (Exception e) {
                    // The business exception thrown below does not carry the cause, so record it
                    // here; otherwise the original failure would be lost.
                    log.error("send release coupon transaction message failed, afterSaleId:{}", afterSaleId, e);
                    throw new OrderBizException(OrderErrorCodeEnum.SEND_TRANSACTION_MQ_FAILED);
                }
            } else {
                // Not the last after-sale item of the order, or an order cancellation:
                // update the after-sale status here and finish.
                updateAfterSaleStatus(afterSaleInfoDO, AfterSaleStatusEnum.REVIEW_PASS.getCode(), AfterSaleStatusEnum.REFUNDING.getCode());
                return JsonResult.buildSuccess(true);
            }
        } catch (OrderBizException e) {
            log.error("system error", e);
            return JsonResult.buildError(e.getMessage());
        } finally {
            if (locked) {
                redisLock.unlock(key);
            }
        }
    }

    /**
     * Registers the transaction listener for the release-coupon message on {@code producer} and
     * builds the message payload from the order's coupon id and user id.
     *
     * <p>The listener's local transaction moves the after-sale record from REVIEW_PASS to REFUNDING;
     * the transaction check commits only when the record is found in REFUNDING state.
     *
     * <p>NOTE(review): this replaces the listener on the producer returned by
     * {@code defaultProducer.getProducer()}; if that producer instance is shared with other
     * transactional sends, concurrent calls can overwrite each other's listener - confirm.
     *
     * @param producer            producer that will send the transactional message
     * @param afterSaleInfoDO     after-sale record whose status the local transaction updates
     * @param afterSaleId         after-sale id used by the transaction check
     * @param actualRefundMessage refund message supplying the order id
     * @return the release-coupon request to be serialized as the message body
     */
    private ReleaseUserCouponRequest buildLastOrderReleasesCouponMessage(TransactionMQProducer producer,
                                                                         AfterSaleInfoDO afterSaleInfoDO,
                                                                         Long afterSaleId,
                                                                         ActualRefundMessage actualRefundMessage) {
        producer.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                try {
                    // Update the after-sale status.
                    updateAfterSaleStatus(afterSaleInfoDO, AfterSaleStatusEnum.REVIEW_PASS.getCode(), AfterSaleStatusEnum.REFUNDING.getCode());
                    return LocalTransactionState.COMMIT_MESSAGE;
                } catch (Exception e) {
                    log.error("system error", e);
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                // Commit only when the after-sale record exists and is in "refunding" state.
                // A missing record means the local transaction did not take effect, so roll back
                // instead of failing with a NullPointerException.
                AfterSaleInfoDO latestAfterSaleInfo = afterSaleInfoDAO.getOneByAfterSaleId(afterSaleId);
                if (latestAfterSaleInfo != null
                        && AfterSaleStatusEnum.REFUNDING.getCode().equals(latestAfterSaleInfo.getAfterSaleStatus())) {
                    return LocalTransactionState.COMMIT_MESSAGE;
                }
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        });

        // Assemble the release-coupon message payload.
        String orderId = actualRefundMessage.getOrderId();
        OrderInfoDO orderInfoDO = orderInfoDAO.getByOrderId(orderId);
        ReleaseUserCouponRequest releaseUserCouponRequest = new ReleaseUserCouponRequest();
        releaseUserCouponRequest.setCouponId(orderInfoDO.getCouponId());
        releaseUserCouponRequest.setUserId(orderInfoDO.getUserId());
        return releaseUserCouponRequest;
    }
    ...
}
复制代码


文章转载自:东阳马生架构

原文链接:https://www.cnblogs.com/mjunz/p/18976429

体验地址:http://www.jnpfsoft.com/?from=001YH

用户头像

还未添加个人签名 2025-04-01 加入

还未添加个人简介

评论

发布
暂无评论
订单初版—取消订单链路中的技术问题说明文档_Java_量贩潮汐·WholesaleTide_InfoQ写作社区