写点什么

订单初版—生单链路实现的重构文档

  • 2025-07-15
    福建
  • 本文字数:19625 字

    阅读完需:约 64 分钟

1.库存服务的数据库与缓存双写的实现


(1)生单链路使用 Seata AT 分布式事务的原理流程图


库存服务写库存时,通常都会进行数据库 + 缓存的双写处理。写缓存是库存服务写库存的一个常规性操作,因为需要支撑高并发的库存扣减。



(2)生单链路 Seata AT 模式下的并发问题分析


生单链路中的分布式事务环节在于:锁定优惠券 + 锁定库存 + 生成订单。

 

一.在锁定优惠券环节

每个用户都会有属于自己的优惠券。日常情况下,都是不同的用户使用不同的优惠券购买商品,所以并不会出现并发获取同一优惠券数据的全局锁的情况。

 

二.在锁定库存环节

对于爆品或秒杀,大量用户可能都会基于某商品进行下单扣减库存,因此会出现并发获取同一个 SKU 数据的全局锁。

 

第一个获取到某 SKU 数据的全局锁的事务,在进行生成订单环节由于需要插入多条 SQL,所以可能会比较耗时,从而导致并发等待获取该 SKU 数据的全局锁的其他事务等待时间过长。

 

(3)生单链路如何解决库存全局锁争用问题


一.问题分析

一个商品 SKU 就对应一条库存数据记录。如果大量用户同时购买一个商品 SKU,必然导致:多个分布式事务都去竞争和等待同一个 SKU 库存数据的全局锁。

 

二.解决方案

方案一:库存分桶方案

例如对库存表进行库存分桶。一般一个 SKU 就一条库存数据,该方案下一个 SKU 会有多条库存数据。比如 1 万的库存可分为 1000 条库存数据,每条库存数据可扣库存为 10。每次扣减库存时,按照一定的规则和算法,选择一个库存分桶去扣减。

 

方案二:RocketMQ 柔性事务方案

通过 RocketMQ 柔性事务方案来替换掉 Seata 刚性事务方案。在互联网公司里,一般的业务系统,都是使用 RocketMQ 柔性事务。大多情况下,RocketMQ 柔性事务都能确保数据是一致的。

 

刚性事务:分支事务出现异常或者失败,则全局回滚。柔性事务:分支事务出现异常或者失败,则不断重试消费,直到成功。使用 RocketMQ 柔性事务方案,需要确保消息能被投递到 RocketMQ。

 

方案三:使用没有全局锁的分布式事务方案

Seata 支持 AT、TCC、Saga、XA 这几种事务方案。生单链路的建议是:锁定营销使用 AT 模式 + 锁定库存使用 TCC 模式的混合分布式事务方案。

 

三.生单链路中锁库存的技术方案重构

为了提升生单链路的性能,避免扣减库存时出现大量的全局锁争用。锁定库存使用 Seata 的 TCC 模式,纳入到全局事务中。而且为了让写库存时双写数据库 + 缓存的数据一致性,也可以用 TCC 模式实现。

 

四.存在异构存储的服务解决数据一致性问题的方案

使用 Seata 分布式事务的 TCC 模式。

 

(4)库存服务双写数据库 + 缓存的实现

@RestController@RequestMapping("/inventory")public class InventoryController {    @Autowired    private InventoryService inventoryService;
//新增商品库存 @PostMapping("/addProductStock") public JsonResult<Boolean> addProductStock(@RequestBody AddProductStockRequest request) { inventoryService.addProductStock(request); return JsonResult.buildSuccess(true); } ...}
@Servicepublic class InventoryServiceImpl implements InventoryService { ... @Override public Boolean addProductStock(AddProductStockRequest request) { log.info("新增商品库存:request={}", JSONObject.toJSONString(request)); //1.校验入参 checkAddProductStockRequest(request);
//2.查询商品库存 ProductStockDO productStock = productStockDAO.getBySkuCode(request.getSkuCode()); ParamCheckUtil.checkObjectNull(productStock, InventoryErrorCodeEnum.PRODUCT_SKU_STOCK_EXISTED_ERROR);
//3.添加Redis锁,防并发 String lockKey = RedisLockKeyConstants.ADD_PRODUCT_STOCK_KEY + request.getSkuCode(); Boolean locked = redisLock.lock(lockKey); if (!locked) { throw new InventoryBizException(InventoryErrorCodeEnum.ADD_PRODUCT_SKU_STOCK_ERROR); }
try { //4.执行添加商品库存逻辑 addProductStockProcessor.doAdd(request); } finally { //5.解锁 redisLock.unlock(lockKey); } return true; } ...}
@Componentpublic class AddProductStockProcessor { @Autowired private RedisCache redisCache;
@Autowired private ProductStockDAO productStockDAO;
//执行添加商品库存逻辑 @Transactional(rollbackFor = Exception.class) public void doAdd(AddProductStockRequest request) { //1.构造商品库存 ProductStockDO productStock = buildProductStock(request); //2.保存商品库存到MySQL productStockDAO.save(productStock); //3.保存商品库存到Redis addStockToRedis(productStock); }
//保存商品库存到Redis public void addStockToRedis(ProductStockDO productStock) { String productStockKey = CacheSupport.buildProductStockKey(productStock.getSkuCode()); Map<String, String> productStockValue = CacheSupport.buildProductStockValue(productStock.getSaleStockQuantity(), productStock.getSaledStockQuantity()); redisCache.hPutAll(productStockKey, productStockValue); }
private ProductStockDO buildProductStock(AddProductStockRequest request) { ProductStockDO productStock = new ProductStockDO(); productStock.setSkuCode(request.getSkuCode()); productStock.setSaleStockQuantity(request.getSaleStockQuantity()); productStock.setSaledStockQuantity(0L); return productStock; }}
public interface CacheSupport { String PREFIX_PRODUCT_STOCK = "PRODUCT_STOCK:"; //可销售库存key String SALE_STOCK = "saleStock";
//已销售库存key String SALED_STOCK = "saledStock";
//构造缓存商品库存key static String buildProductStockKey(String skuCode) { return PREFIX_PRODUCT_STOCK + ":" + skuCode; }
//构造缓存商品库存value static Map<String, String> buildProductStockValue(Long saleStockQuantity, Long saledStockQuantity) { Map<String, String> value = new HashMap<>(); value.put(SALE_STOCK, String.valueOf(saleStockQuantity)); value.put(SALED_STOCK, String.valueOf(saledStockQuantity)); return value; }}
复制代码


2.如何发起异构存储下的 Seata TCC 分布式事务


一.在双写数据库 + 缓存的入口添加 Seata 的 @GlobalTransactional 注解。

二.在写数据库接口和写缓存接口上添加 Seata 的 @TwoPhaseBusinessAction 注解。

三.在提供双写数据库 + 库存接口的服务上添加 Seata 的 @LocalTCC 注解。


 

3.异构存储下的 Seata TCC 分布式事务原理


(1)TCC 的核心逻辑


TCC 的核心逻辑就是:try、commit、cancel。

 

一.try

会预留一些资源,但实际的动作并没有执行,当然实际应用比如写库中也可以直接执行实际的动作即提交 SQL。

 

二.commit

分支事务执行 try 都成功后,就会让所有分支事务都执行 commit,commit 会执行实际的动作。

 

三.cancel

如果存在分支事务 try 失败了,那么所有分支事务都要执行 cancel。执行 cancel 时会对预留的资源进行逆向补偿,取消资源预留。

 

(2)TCC 的事务流程


 

4.生单链路锁定库存的 Seata TCC 分布式事务实现


(1)生单链路中生成订单到扣减库存的实现


createOrder()时锁定优惠券 + 生成订单到数据库会使用 Seata 的 AT 模式来实现分布式事务。

@DubboService(version = "1.0.0", interfaceClass = OrderApi.class, retries = 0)public class OrderApiImpl implements OrderApi {    @Autowired    private OrderService orderService;    ...
//提交订单/生成订单接口 @Override public JsonResult<CreateOrderDTO> createOrder(CreateOrderRequest createOrderRequest) { try { CreateOrderDTO createOrderDTO = orderService.createOrder(createOrderRequest); return JsonResult.buildSuccess(createOrderDTO); } catch (OrderBizException e) { log.error("biz error", e); return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg()); } catch (Exception e) { log.error("system error", e); return JsonResult.buildError(e.getMessage()); } } ...}
@Servicepublic class OrderServiceImpl implements OrderService { @Autowired private OrderManager orderManager; ...
//提交订单/生成订单接口 @Override public CreateOrderDTO createOrder(CreateOrderRequest createOrderRequest) { //1.入参检查 checkCreateOrderRequestParam(createOrderRequest); //2.风控检查 checkRisk(createOrderRequest); //3.获取商品信息 List<ProductSkuDTO> productSkuList = listProductSkus(createOrderRequest); //4.计算订单价格 CalculateOrderAmountDTO calculateOrderAmountDTO = calculateOrderAmount(createOrderRequest, productSkuList); //5.验证订单实付金额 checkRealPayAmount(createOrderRequest, calculateOrderAmountDTO); //6.生成订单(包含锁定优惠券、扣减库存等逻辑) createOrder(createOrderRequest, productSkuList, calculateOrderAmountDTO); //7.发送订单延迟消息用于支付超时自动关单 sendPayOrderTimeoutDelayMessage(createOrderRequest); //返回订单信息 CreateOrderDTO createOrderDTO = new CreateOrderDTO(); createOrderDTO.setOrderId(createOrderRequest.getOrderId());
return createOrderDTO; }
//插入订单到数据库 private void createOrder(CreateOrderRequest createOrderRequest, List<ProductSkuDTO> productSkuList, CalculateOrderAmountDTO calculateOrderAmountDTO) { //插入订单到数据库 orderManager.createOrder(createOrderRequest, productSkuList, calculateOrderAmountDTO); } ...}
@Servicepublic class OrderManagerImpl implements OrderManager { @DubboReference(version = "1.0.0", retries = 0) private InventoryApi inventoryApi;//库存服务 ...
//生成订单 //由于锁定优惠券不会出现竞争AT模式下的全局锁,所以锁定优惠券+生成订单可以一起使用Seata的AT模式 //但扣减库存继续使用Seata的AT模式则会出现竞争全局锁,所以扣减库存使用Seata的TCC模式 @Override @GlobalTransactional(rollbackFor = Exception.class) public void createOrder(CreateOrderRequest createOrderRequest, List<ProductSkuDTO> productSkuList, CalculateOrderAmountDTO calculateOrderAmountDTO) { //锁定优惠券 lockUserCoupon(createOrderRequest); //扣减库存 deductProductStock(createOrderRequest); //生成订单到数据库 addNewOrder(createOrderRequest, productSkuList, calculateOrderAmountDTO); }
//锁定商品库存 private void deductProductStock(CreateOrderRequest createOrderRequest) { String orderId = createOrderRequest.getOrderId(); List<DeductProductStockRequest.OrderItemRequest> orderItemRequestList = ObjectUtil.convertList(createOrderRequest.getOrderItemRequestList(), DeductProductStockRequest.OrderItemRequest.class); DeductProductStockRequest lockProductStockRequest = new DeductProductStockRequest(); lockProductStockRequest.setOrderId(orderId); lockProductStockRequest.setOrderItemRequestList(orderItemRequestList); JsonResult<Boolean> jsonResult = inventoryApi.deductProductStock(lockProductStockRequest); //检查锁定商品库存结果 if (!jsonResult.getSuccess()) { throw new OrderBizException(jsonResult.getErrorCode(), jsonResult.getErrorMessage()); } } ...}
@DubboService(version = "1.0.0", interfaceClass = InventoryApi.class, retries = 0)public class InventoryApiImpl implements InventoryApi { @Autowired private InventoryService inventoryService;
//扣减商品库存 @Override public JsonResult<Boolean> deductProductStock(DeductProductStockRequest deductProductStockRequest) { try { Boolean result = inventoryService.deductProductStock(deductProductStockRequest); return JsonResult.buildSuccess(result); } catch (InventoryBizException e) { log.error("biz error", e); return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg()); } catch (Exception e) { log.error("system error", e); return JsonResult.buildError(e.getMessage()); } } ...}
复制代码


(2)库存服务中扣减库存的双写数据库 + 缓存实现


doDeduct()扣减库存时双写数据库 + 缓存会使用 Seata 的 TCC 模式来实现分布式事务。TCC 模式特别适合这种多写异构存储的业务,关键的注解是 @LocalTCC 和 @TwoPhaseBusinessAction。


@Servicepublic class InventoryServiceImpl implements InventoryService {    ...    //扣减商品库存    @Override    public Boolean deductProductStock(DeductProductStockRequest deductProductStockRequest) {        //检查入参        checkLockProductStockRequest(deductProductStockRequest);        String orderId = deductProductStockRequest.getOrderId();        List<DeductProductStockRequest.OrderItemRequest> orderItemRequestList = deductProductStockRequest.getOrderItemRequestList();        for (DeductProductStockRequest.OrderItemRequest orderItemRequest : orderItemRequestList) {            String skuCode = orderItemRequest.getSkuCode();
//1.查询MySQL库存数据 ProductStockDO productStockDO = productStockDAO.getBySkuCode(skuCode); if (productStockDO == null) { log.error("商品库存记录不存在,skuCode={}", skuCode); throw new InventoryBizException(InventoryErrorCodeEnum.PRODUCT_SKU_STOCK_NOT_FOUND_ERROR); }
//2.查询Redis库存数据 String productStockKey = CacheSupport.buildProductStockKey(skuCode); Map<String, String> productStockValue = redisCache.hGetAll(productStockKey); if (productStockValue.isEmpty()) { //如果查询不到Redis库存数据,将MySQL库存数据放入Redis,以MySQL的数据为准 addProductStockProcessor.addStockToRedis(productStockDO); }
//3.添加Redis锁,防并发 String lockKey = MessageFormat.format(RedisLockKeyConstants.DEDUCT_PRODUCT_STOCK_KEY, orderId, skuCode); Boolean locked = redisLock.lock(lockKey); if (!locked) { log.error("无法获取扣减库存锁,orderId={},skuCode={}", orderId, skuCode); throw new InventoryBizException(InventoryErrorCodeEnum.DEDUCT_PRODUCT_SKU_STOCK_ERROR); }
try { //4.查询库存扣减日志 ProductStockLogDO productStockLog = productStockLogDAO.getLog(orderId, skuCode); if (null != productStockLog) { log.info("已扣减过,扣减库存日志已存在,orderId={},skuCode={}", orderId, skuCode); return true; } Integer saleQuantity = orderItemRequest.getSaleQuantity(); Integer originSaleStock = productStockDO.getSaleStockQuantity().intValue(); Integer originSaledStock = productStockDO.getSaledStockQuantity().intValue();
//5.执行执库存扣减 DeductStockDTO deductStock = new DeductStockDTO(orderId, skuCode, saleQuantity, originSaleStock, originSaledStock); deductProductStockProcessor.doDeduct(deductStock); } finally { redisLock.unlock(lockKey); } } return true; } ...}
//扣减商品库存处理器@Componentpublic class DeductProductStockProcessor { @Autowired private LockMysqlStockTccService lockMysqlStockTccService;
@Autowired private LockRedisStockTccService lockRedisStockTccService;
@Autowired private ProductStockLogDAO productStockLogDAO;
//执行扣减商品库存逻辑,由于createOrder()已经加了@GlobalTransactional注解,这里就不用加了 //@GlobalTransactional(rollbackFor = Exception.class) public void doDeduct(DeductStockDTO deductStock) { //1.执行执行MySQL库存扣减,就是执行TCC的try boolean result = lockMysqlStockTccService.deductStock(null, deductStock); if (!result) { throw new InventoryBizException(InventoryErrorCodeEnum.PRODUCT_SKU_STOCK_NOT_FOUND_ERROR); }
//2.执行Redis库存扣减,就是执行TCC的try result = lockRedisStockTccService.deductStock(null, deductStock); if (!result) { throw new InventoryBizException(InventoryErrorCodeEnum.PRODUCT_SKU_STOCK_NOT_FOUND_ERROR); } }}
//锁定MySQL库存,Seata TCC模式的Service@LocalTCCpublic interface LockMysqlStockTccService { //一阶段方法:扣减销售库存(saleStockQuantity-saleQuantity),这就是TCC中的try //所有分支事务的try执行成功,则所有事务执行commit方法; //存在分支事务的try执行失败,则所有事务执行rollback方法; @TwoPhaseBusinessAction(name = "lockMysqlStockTccService", commitMethod = "commit", rollbackMethod = "rollback") boolean deductStock(BusinessActionContext actionContext, @BusinessActionContextParameter(paramName = "deductStock") DeductStockDTO deductStock);
//二阶段方法:增加已销售库存(saledStockQuantity+saleQuantity),这就是TCC中的commit void commit(BusinessActionContext actionContext);
//回滚:增加销售库存(saleStockQuantity+saleQuantity),这就是TCC中的cancel void rollback(BusinessActionContext actionContext);}
//锁定Redis库存,Seata TCC模式的Service@LocalTCCpublic interface LockRedisStockTccService { //一阶段方法:扣减销售库存(saleStockQuantity-saleQuantity),这就是TCC中的try //所有分支事务的try执行成功,则所有事务执行commit方法; //存在分支事务的try执行失败,则所有事务执行rollback方法; @TwoPhaseBusinessAction(name = "lockRedisStockTccService", commitMethod = "commit", rollbackMethod = "rollback") boolean deductStock(BusinessActionContext actionContext, @BusinessActionContextParameter(paramName = "deductStock") DeductStockDTO deductStock);
//二阶段方法:增加已销售库存(saledStockQuantity+saleQuantity),这就是TCC中的commit void commit(BusinessActionContext actionContext);
//回滚:增加销售库存(saleStockQuantity+saleQuantity),这就是TCC中的cancel void rollback(BusinessActionContext actionContext);}
复制代码


其中,try 操作可以是预留资源,也可以是直接执行动作(即等于 commit)。比如锁库存,会在 try 操作中把写数据库或写缓存直接处理了。try 操作具体是预留资源还是直接执行,往往会根据业务来决定。

@Servicepublic class LockMysqlStockTccServiceImpl implements LockMysqlStockTccService {    ...    @Transactional(rollbackFor = Exception.class)    @Override    public boolean deductStock(BusinessActionContext actionContext, DeductStockDTO deductStock) {        //actionContext上下文获取全局事务xid        String xid = actionContext.getXid();        String skuCode = deductStock.getSkuCode();        Integer saleQuantity = deductStock.getSaleQuantity();        Integer originSaleStock = deductStock.getOriginSaleStock();        Integer originSaledStock = deductStock.getOriginSaledStock();
//标识try阶段开始执行 TccResultHolder.tagTryStart(getClass(), skuCode, xid);
if (isEmptyRollback()) { return false; } log.info("一阶段方法:扣减MySQL销售库存,deductStock={},xid={}", JSONObject.toJSONString(deductStock), xid);
int result = productStockDAO.deductSaleStock(skuCode, saleQuantity, originSaleStock); //标识try阶段执行成功 if (result > 0) { TccResultHolder.tagTrySuccess(getClass(), skuCode, xid); } return result > 0; } ...}
@Servicepublic class LockRedisStockTccServiceImpl implements LockRedisStockTccService { ... @Override public boolean deductStock(BusinessActionContext actionContext, DeductStockDTO deductStock) { String xid = actionContext.getXid(); String skuCode = deductStock.getSkuCode(); Integer saleQuantity = deductStock.getSaleQuantity(); Integer originSaleStock = deductStock.getOriginSaleStock(); Integer originSaledStock = deductStock.getOriginSaledStock();
//标识try阶段开始执行 TccResultHolder.tagTryStart(getClass(), skuCode, xid); log.info("一阶段方法:扣减redis销售库存,deductStock={},xid={}", JSONObject.toJSONString(deductStock), xid); if (isEmptyRollback()) { return false; } String luaScript = LuaScript.DEDUCT_SALE_STOCK; String saleStockKey = CacheSupport.SALE_STOCK; String productStockKey = CacheSupport.buildProductStockKey(skuCode); Long result = redisCache.execute(new DefaultRedisScript<>(luaScript, Long.class), Arrays.asList(productStockKey, saleStockKey), String.valueOf(saleQuantity), String.valueOf(originSaleStock));
//标识try阶段执行成功 if (result > 0) { TccResultHolder.tagTrySuccess(getClass(), skuCode, xid); } return result > 0; } ...}
//存储TCC第一阶段执行结果,用于解决TCC幂等,空回滚,悬挂问题public class TccResultHolder { //标识TCC try阶段开始执行的标识 private static final String TRY_START = "TRY_START";
//标识TCC try阶段执行成功的标识 private static final String TRY_SUCCESS = "TRY_SUCCESS";
//保存TCC事务执行过程的状态 private static Map<Class<?>, Map<String, String>> map = new ConcurrentHashMap<Class<?>, Map<String, String>>(); ...
//标记try阶段开始执行 public static void tagTryStart(Class<?> tccClass, String bizKey, String xid) { setResult(tccClass, bizKey, xid, TRY_START); } //标记try阶段执行成功 public static void tagTrySuccess(Class<?> tccClass, String bizKey, String xid) { setResult(tccClass, bizKey, xid, TRY_SUCCESS); }
//一个tccClass代表了TCC的一个分支事务 public static void setResult(Class<?> tccClass, String bizKey, String xid, String v) { Map<String, String> results = map.get(tccClass); if (results == null) { synchronized (map) { if (results == null) { results = new ConcurrentHashMap<>(); map.put(tccClass, results); } } } results.put(getTccExecution(xid, bizKey), v);//保存当前分布式事务id } ...}
复制代码


5.库存服务异构存储双写 TCC 异常处理


只要 TCC 的分支事务在 try 过程中出现异常,都需要回滚所有分支事务。

 

如果没有出现异常,则执行 commit()方法。如果出现异常,则执行 rollback()方法。


//锁定MySQL库存,Seata TCC模式的Service@LocalTCCpublic interface LockMysqlStockTccService {    //一阶段方法:扣减销售库存(saleStockQuantity-saleQuantity),这就是TCC中的try    //所有分支事务的try执行成功,则所有事务执行commit方法;    //存在分支事务的try执行失败,则所有事务执行rollback方法;    @TwoPhaseBusinessAction(name = "lockMysqlStockTccService", commitMethod = "commit", rollbackMethod = "rollback")    boolean deductStock(BusinessActionContext actionContext, @BusinessActionContextParameter(paramName = "deductStock") DeductStockDTO deductStock);
//二阶段方法:增加已销售库存(saledStockQuantity+saleQuantity),这就是TCC中的commit void commit(BusinessActionContext actionContext);
//回滚:增加销售库存(saleStockQuantity+saleQuantity),这就是TCC中的cancel void rollback(BusinessActionContext actionContext);}
//锁定Redis库存,Seata TCC模式的Service@LocalTCCpublic interface LockRedisStockTccService { //一阶段方法:扣减销售库存(saleStockQuantity-saleQuantity),这就是TCC中的try //所有分支事务的try执行成功,则所有事务执行commit方法; //存在分支事务的try执行失败,则所有事务执行rollback方法; @TwoPhaseBusinessAction(name = "lockRedisStockTccService", commitMethod = "commit", rollbackMethod = "rollback") boolean deductStock(BusinessActionContext actionContext, @BusinessActionContextParameter(paramName = "deductStock") DeductStockDTO deductStock);
//二阶段方法:增加已销售库存(saledStockQuantity+saleQuantity),这就是TCC中的commit void commit(BusinessActionContext actionContext);
//回滚:增加销售库存(saleStockQuantity+saleQuantity),这就是TCC中的cancel void rollback(BusinessActionContext actionContext);}
复制代码


写数据库的 commit()和 rollback():

@Servicepublic class LockMysqlStockTccServiceImpl implements LockMysqlStockTccService {    @Autowired    private ProductStockDAO productStockDAO;
@Autowired private ProductStockLogDAO productStockLogDAO; ...
@Override public void commit(BusinessActionContext actionContext) { String xid = actionContext.getXid(); DeductStockDTO deductStock = ((JSONObject) actionContext.getActionContext("deductStock")).toJavaObject(DeductStockDTO.class); String skuCode = deductStock.getSkuCode(); Integer saleQuantity = deductStock.getSaleQuantity(); Integer originSaleStock = deductStock.getOriginSaleStock(); Integer originSaledStock = deductStock.getOriginSaledStock(); log.info("二阶段方法:增加mysql已销售库存,deductStock,xid={}", JSONObject.toJSONString(deductStock), xid);
//幂等 //当出现网络异常或者TC Server异常时,会出现重复调用commit阶段的情况,所以需要进行幂等操作 if (!TccResultHolder.isTrySuccess(getClass(), skuCode, xid)) { return; }
//1.增加已销售库存 productStockDAO.increaseSaledStock(skuCode, saleQuantity, originSaledStock);
//2.插入一条扣减日志表 log.info("插入一条扣减日志表"); productStockLogDAO.save(buildStockLog(deductStock));
//移除标识 TccResultHolder.removeResult(getClass(), skuCode, xid); }
@Override public void rollback(BusinessActionContext actionContext) { String xid = actionContext.getXid(); DeductStockDTO deductStock = ((JSONObject) actionContext.getActionContext("deductStock")).toJavaObject(DeductStockDTO.class); String skuCode = deductStock.getSkuCode(); Integer saleQuantity = deductStock.getSaleQuantity(); Integer originSaleStock = deductStock.getOriginSaleStock(); Integer originSaledStock = deductStock.getOriginSaledStock(); log.info("回滚:增加mysql销售库存,deductStock={},xid={}", JSONObject.toJSONString(deductStock), xid);
//空回滚处理 if (TccResultHolder.isTagNull(getClass(), skuCode, xid)) { log.info("mysql:出现空回滚"); insertEmptyRollbackTag(); return; }
//幂等处理 //try阶段没有完成的情况下,不必执行回滚,因为try阶段有本地事务,事务失败时已经进行了回滚 //如果try阶段成功,而其他全局事务参与者失败,这里会执行回滚 if (!TccResultHolder.isTrySuccess(getClass(), skuCode, xid)) { log.info("mysql:无需回滚"); return; }
//1.还原销售库存 productStockDAO.restoreSaleStock(skuCode, saleQuantity, originSaleStock - saleQuantity);
//2.删除库存扣减日志 ProductStockLogDO logDO = productStockLogDAO.getLog(deductStock.getOrderId(), skuCode); if (null != logDO) { productStockLogDAO.removeById(logDO.getId()); }
//移除标识 TccResultHolder.removeResult(getClass(), skuCode, xid); } ...}
复制代码


写缓存的 commit()和 rollback():

@Servicepublic class LockRedisStockTccServiceImpl implements LockRedisStockTccService {    @Autowired    private RedisCache redisCache;    ...
@Override public void commit(BusinessActionContext actionContext) { String xid = actionContext.getXid(); DeductStockDTO deductStock = ((JSONObject) actionContext.getActionContext("deductStock")).toJavaObject(DeductStockDTO.class); String skuCode = deductStock.getSkuCode(); Integer saleQuantity = deductStock.getSaleQuantity(); Integer originSaleStock = deductStock.getOriginSaleStock(); Integer originSaledStock = deductStock.getOriginSaledStock(); log.info("二阶段方法:增加redis已销售库存,deductStock={},xid={}", JSONObject.toJSONString(deductStock), xid);
//幂等 //当出现网络异常或者TC Server异常时,会出现重复调用commit阶段的情况,所以需要进行幂等操作 if (!TccResultHolder.isTrySuccess(getClass(), skuCode, xid)) { log.info("已经执行过commit阶段"); return; }
String luaScript = LuaScript.INCREASE_SALED_STOCK; String saledStockKey = CacheSupport.SALED_STOCK; String productStockKey = CacheSupport.buildProductStockKey(skuCode); redisCache.execute(new DefaultRedisScript<>(luaScript, Long.class), Arrays.asList(productStockKey, saledStockKey), String.valueOf(saleQuantity), String.valueOf(originSaledStock));
//移除标识 TccResultHolder.removeResult(getClass(), skuCode, xid); }
@Override public void rollback(BusinessActionContext actionContext) { String xid = actionContext.getXid(); DeductStockDTO deductStock = ((JSONObject) actionContext.getActionContext("deductStock")).toJavaObject(DeductStockDTO.class); String skuCode = deductStock.getSkuCode(); Integer saleQuantity = deductStock.getSaleQuantity(); Integer originSaleStock = deductStock.getOriginSaleStock(); Integer originSaledStock = deductStock.getOriginSaledStock(); log.info("回滚:增加redis销售库存,deductStock={},xid={}", JSONObject.toJSONString(deductStock), xid);
//空回滚处理 if (TccResultHolder.isTagNull(getClass(), skuCode, xid)) { log.info("redis:出现空回滚"); insertEmptyRollbackTag(); return; }
//幂等处理 //try阶段没有完成的情况下,不必执行回滚 //如果try阶段成功,而其他全局事务参与者失败,这里会执行回滚 if (!TccResultHolder.isTrySuccess(getClass(), skuCode, xid)) { log.info("redis:无需回滚"); return; }
String luaScript = LuaScript.RESTORE_SALE_STOCK; String saleStockKey = CacheSupport.SALE_STOCK; String productStockKey = CacheSupport.buildProductStockKey(skuCode); redisCache.execute(new DefaultRedisScript<>(luaScript, Long.class), Arrays.asList(productStockKey, saleStockKey), String.valueOf(saleQuantity), String.valueOf(originSaleStock - saleQuantity));
//移除标识 TccResultHolder.removeResult(getClass(), skuCode, xid); } ...}
复制代码


6.库存服务 TCC 事务的空悬挂问题


(1)空悬挂问题


因为网络延迟等原因,分支事务的 rollback()方法可能会比 try()方法先执行,即 rollback()方法进行了空回滚,然后 try()方法才执行,从而导致 try()方法预留的资源无法被取消。

 

(2)解决空悬挂的思路


当 rollback()方法出现空回滚时,需要进行标识(如在数据库中查一条记录),然后在 try()方法里会判断是否发生了空回滚。


@Servicepublic class LockMysqlStockTccServiceImpl implements LockMysqlStockTccService {    ...    //这就是TCC的try    @Transactional(rollbackFor = Exception.class)    @Override    public boolean deductStock(BusinessActionContext actionContext, DeductStockDTO deductStock) {        //actionContext上下文获取全局事务xid        String xid = actionContext.getXid();        String skuCode = deductStock.getSkuCode();        Integer saleQuantity = deductStock.getSaleQuantity();        Integer originSaleStock = deductStock.getOriginSaleStock();        Integer originSaledStock = deductStock.getOriginSaledStock();
//标识try阶段开始执行 TccResultHolder.tagTryStart(getClass(), skuCode, xid);
//悬挂问题:rollback接口比try接口先执行,即rollback接口进行了空回滚,try接口才执行,导致try接口预留的资源无法被取消 //解决空悬挂的思路:即当rollback接口出现空回滚时,需要打一个标识(在数据库中查一条记录),在try()里判断是否发生了空回滚 if (isEmptyRollback()) { return false; } log.info("一阶段方法:扣减MySQL销售库存,deductStock={},xid={}", JSONObject.toJSONString(deductStock), xid);
int result = productStockDAO.deductSaleStock(skuCode, saleQuantity, originSaleStock); //标识try阶段执行成功 if (result > 0) { TccResultHolder.tagTrySuccess(getClass(), skuCode, xid); }
return result > 0; }
//判断是否发生的空回滚 private Boolean isEmptyRollback() { //需要查询本地数据库,看是否发生了空回滚 return false; }
//插入空回滚标识 private void insertEmptyRollbackTag() { //在数据库插入空回滚的标识 }
@Override public void rollback(BusinessActionContext actionContext) { String xid = actionContext.getXid(); DeductStockDTO deductStock = ((JSONObject) actionContext.getActionContext("deductStock")).toJavaObject(DeductStockDTO.class); String skuCode = deductStock.getSkuCode(); Integer saleQuantity = deductStock.getSaleQuantity(); Integer originSaleStock = deductStock.getOriginSaleStock(); Integer originSaledStock = deductStock.getOriginSaledStock(); log.info("回滚:增加mysql销售库存,deductStock={},xid={}", JSONObject.toJSONString(deductStock), xid);
//空回滚处理 if (TccResultHolder.isTagNull(getClass(), skuCode, xid)) { log.info("mysql:出现空回滚"); //插入空回滚标识 insertEmptyRollbackTag(); return; }
//幂等处理 //try阶段没有完成的情况下,不必执行回滚,因为try阶段有本地事务,事务失败时已经进行了回滚 //如果try阶段成功,而其他全局事务参与者失败,这里会执行回滚 if (!TccResultHolder.isTrySuccess(getClass(), skuCode, xid)) { log.info("mysql:无需回滚"); return; }
//1.还原销售库存 productStockDAO.restoreSaleStock(skuCode, saleQuantity, originSaleStock - saleQuantity);
//2.删除库存扣减日志 ProductStockLogDO logDO = productStockLogDAO.getLog(deductStock.getOrderId(), skuCode); if (null != logDO) { productStockLogDAO.removeById(logDO.getId()); }
//移除标识 TccResultHolder.removeResult(getClass(), skuCode, xid); } ...}
复制代码


7.库存服务 TCC 二阶段重试的幂等问题


如果执行 commit 失败,Seata Server 会让分支事务不断重试 commit。如果执行 cancel 失败,Seata Server 会让分支事务不断重试 cancel。只要出现重试,就需要保证重试操作的方法是幂等的。

 

当 try 开始执行时,会添加标识,表明开启了 TCC 事务。当标识被移除掉后,则说明 commit 或 cancel 执行成功。

 

重复执行 commit 或 cancel 时,通过判断标识是否为空,就能拦截掉重复执行的 commit 或 cancel,从而实现幂等。


@Servicepublic class LockMysqlStockTccServiceImpl implements LockMysqlStockTccService {    @Autowired    private ProductStockDAO productStockDAO;
@Autowired private ProductStockLogDAO productStockLogDAO; ...
@Override public void commit(BusinessActionContext actionContext) { String xid = actionContext.getXid(); DeductStockDTO deductStock = ((JSONObject) actionContext.getActionContext("deductStock")).toJavaObject(DeductStockDTO.class); String skuCode = deductStock.getSkuCode(); Integer saleQuantity = deductStock.getSaleQuantity(); Integer originSaleStock = deductStock.getOriginSaleStock(); Integer originSaledStock = deductStock.getOriginSaledStock(); log.info("二阶段方法:增加mysql已销售库存,deductStock,xid={}", JSONObject.toJSONString(deductStock), xid);
//幂等 //当出现网络异常或者TC Server异常时,会出现重复调用commit阶段的情况,所以需要进行幂等操作 if (!TccResultHolder.isTrySuccess(getClass(), skuCode, xid)) { return; }
//1.增加已销售库存 productStockDAO.increaseSaledStock(skuCode, saleQuantity, originSaledStock);
//2.插入一条扣减日志表 log.info("插入一条扣减日志表"); productStockLogDAO.save(buildStockLog(deductStock));
//移除标识 TccResultHolder.removeResult(getClass(), skuCode, xid); }
@Override public void rollback(BusinessActionContext actionContext) { String xid = actionContext.getXid(); DeductStockDTO deductStock = ((JSONObject) actionContext.getActionContext("deductStock")).toJavaObject(DeductStockDTO.class); String skuCode = deductStock.getSkuCode(); Integer saleQuantity = deductStock.getSaleQuantity(); Integer originSaleStock = deductStock.getOriginSaleStock(); Integer originSaledStock = deductStock.getOriginSaledStock(); log.info("回滚:增加mysql销售库存,deductStock={},xid={}", JSONObject.toJSONString(deductStock), xid);
//空回滚处理 if (TccResultHolder.isTagNull(getClass(), skuCode, xid)) { log.info("mysql:出现空回滚"); insertEmptyRollbackTag(); return; }
//幂等处理 //try阶段没有完成的情况下,不必执行回滚,因为try阶段有本地事务,事务失败时已经进行了回滚 //如果try阶段成功,而其他全局事务参与者失败,这里会执行回滚 if (!TccResultHolder.isTrySuccess(getClass(), skuCode, xid)) { log.info("mysql:无需回滚"); return; }
//1.还原销售库存 productStockDAO.restoreSaleStock(skuCode, saleQuantity, originSaleStock - saleQuantity);
//2.删除库存扣减日志 ProductStockLogDO logDO = productStockLogDAO.getLog(deductStock.getOrderId(), skuCode); if (null != logDO) { productStockLogDAO.removeById(logDO.getId()); }
//移除标识 TccResultHolder.removeResult(getClass(), skuCode, xid); } ...}
//存储TCC第一阶段执行结果,用于解决TCC幂等,空回滚,悬挂问题public class TccResultHolder { //标识TCC try阶段开始执行的标识 private static final String TRY_START = "TRY_START";
//标识TCC try阶段执行成功的标识 private static final String TRY_SUCCESS = "TRY_SUCCESS";
//保存TCC事务执行过程的状态 private static Map<Class<?>, Map<String, String>> map = new ConcurrentHashMap<Class<?>, Map<String, String>>(); ...
//判断try阶段是否执行成功 public static boolean isTrySuccess(Class<?> tccClass, String bizKey, String xid) { String v = getResult(tccClass, bizKey, xid); if (StringUtils.isNotBlank(v) && TRY_SUCCESS.equals(v)) { return true; } return false; }
public static String getResult(Class<?> tccClass, String bizKey, String xid) { Map<String, String> results = map.get(tccClass); if (results != null) { return results.get(getTccExecution(xid, bizKey)); } return null; }
public static void removeResult(Class<?> tccClass, String bizKey, String xid) { Map<String, String> results = map.get(tccClass); if (results != null) { results.remove(getTccExecution(xid, bizKey)); } } ...}
复制代码


8.假设使用异步锁库存方案可能会导致的几种问题


假如锁定优惠券 + 生成订单数据写库之后,通过发送库存扣减消息到 MQ。后续再由消费者消费 MQ 的库存扣减消息,完成数据库 + 缓存的双写。通过这种异步锁库存的方式,来实现隔离 Seata AT 模式和 Seata TCC 模式。那么就可能会出现如下问题:

 

(1)发送库存扣减消息到 MQ 失败导致超卖


比如 10 个库存,发了 11 次消息,其中 1 次失败,那么就会造成 10 个库存扣完了,但是生成了 11 个订单。

 

(2)消费库存扣减消息失败重试时出现重复消费


比如 10 个库存,发了 9 次消息,其中一条消息重复消费了两次。这样库存扣完了,但只生成 9 个订单。

 

(3)大量并发扣减请求进来扣库存导致大量扣减失败


出现生成订单成功,但是后续出现大量扣库存失败,导致退款。

 

所以,异步锁库存并不科学,锁库存还是要使用同步。也就是生成订单到数据库 + 锁优惠券 + 锁库存,使用 AT 模式绑定成刚性事务。但由于锁库存使用了 TCC 模式,所以锁库存的分支事务不用再竞争全局锁,从而提高了锁库存的并发性能,而且 TCC 模式也保证了双写数据库 + 缓存的数据一致性。

 

9.生单链路的 AT + TCC 混合事务方案流程总结


一.生成订单数据 + 锁定优惠券,使用的是 AT 模式

订单数据和营销数据通常不需要做异构存储,使用数据库存储即可。往数据库写入订单数据 + 锁优惠券,由于都是与用户关联,所以即使并发情况下也不会出现竞争全局锁。

 

二.锁库存双写数据库 + 缓存,使用的是 TCC 模式

库存数据需要异构存储,所以扣减库存时,需要操作数据库 + 缓存。双写数据库 + 缓存会面临数据一致性问题,TCC 模式可以保证数据一致性。

 

锁库存使用 TCC 模式后,即便出现大量并发请求锁库存,也不需要竞争 AT 模式下的全局锁了。


 

10.生单链路非分布式事务的纯补偿方案


也有很多公司并没有使用 Seata 分布式事务这种比较复杂的技术,而是使用纯补偿方案来实现生单链路。

 

生单链路纯补偿方案需要引入操作日志来实现补偿检查,锁定优惠券需要有操作日志,锁定库存也需要有操作日志。

 

无论生单是否成功,都要发送消息到 MQ,以生单请求为基础去检查锁定优惠券和锁定库存的操作日志。

 

如果生单成功,但锁定优惠券或锁定库存的操作日志缺失,则进行锁定优惠券或锁定库存的补偿操作。

 

如果生单失败,但锁定优惠券或锁定库存的操作日志显示锁定成功,则需要释放优惠券或库存的资源。



文章转载自:东阳马生架构

原文链接:https://www.cnblogs.com/mjunz/p/18983430

体验地址:http://www.jnpfsoft.com/?from=001YH

用户头像

还未添加个人签名 2023-06-19 加入

还未添加个人简介

评论

发布
暂无评论
订单初版—生单链路实现的重构文档_WPF_不在线第一只蜗牛_InfoQ写作社区