写点什么

商品中心—库存分桶的一致性改造文档

  • 2025-07-03
    福建
  • 本文字数:20664 字

    阅读完需:约 68 分钟

1.分布式库存扣减时序图和流程图

 

(1)分布式库存扣减时序图



(2)分布式库存扣减流程图



(3)数据库设计


一.库存分桶操作表

CREATE TABLE `inventory_bucket_operate` (    `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,    `operate_id` varchar(32) NOT NULL COMMENT '操作id',    `seller_id` varchar(64) NOT NULL COMMENT '卖家id',    `sku_id` varchar(64) NOT NULL COMMENT '商品sku',    `operate_type` tinyint(3) NOT NULL COMMENT '操作类型:1-初始化,2-增加库存,3-分桶上线,4-分桶扩容,5-分桶下线',    `bucket` text COMMENT '分桶变动信息',    `inventory_num` int(11) DEFAULT NULL COMMENT '变动库存',    `feature` text COMMENT '扩展信息',    `operate_status` tinyint(4) DEFAULT '0' COMMENT '操作状态',    `del_flag` tinyint(1) DEFAULT '1' COMMENT '删除标记',    `create_user` int(11) DEFAULT NULL COMMENT '创建⼈',    `create_time` datetime DEFAULT NULL COMMENT '创建时间',    `update_user` int(11) DEFAULT NULL COMMENT '更新⼈',    `update_time` datetime DEFAULT NULL COMMENT '更新时间',    PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=58 DEFAULT CHARSET=utf8mb4 COMMENT='库存分桶操作表';
复制代码


二.库存操作失败记录表

CREATE TABLE `inventory_operate_fail` (    `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',    `operate_id` varchar(32) NOT NULL COMMENT '操作id',    `fail_type` varchar(32) DEFAULT NULL COMMENT '操作类型',    `bucket_no` varchar(32) DEFAULT NULL COMMENT '分桶编号',    `inventory_num` int(11) DEFAULT NULL COMMENT '变动库存数量',    `del_flag` tinyint(1) DEFAULT NULL COMMENT '删除标识',    `create_user` int(11) DEFAULT NULL COMMENT '创建⼈',    `create_time` datetime DEFAULT NULL COMMENT '创建时间',    `update_user` int(11) DEFAULT NULL COMMENT '更新⼈',    `update_time` datetime DEFAULT NULL COMMENT '更新时间',    PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='库存操作失败记录表';
复制代码


三.库存分桶配置表

CREATE TABLE `inventory_bucket_config` (    `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',    `bucket_num` int(10)  NOT NULL DEFAULT '0' COMMENT  '分桶数量',    `max_depth_num` int(10)  NOT NULL DEFAULT '0' COMMENT  '最⼤库存深度,即分桶的最大库存容量',    `min_depth_num` int(10)  NOT NULL DEFAULT '0' COMMENT  '最⼩库存深度,即分桶的最小库存容量',    `threshold_value` int(10)  NOT NULL DEFAULT '0' COMMENT  '分桶下线阈值,当某个分桶的库存数小于阈值时就需要将该分桶下线了',    `back_source_proportion` int(10)  NOT NULL DEFAULT '0' COMMENT  '回源⽐例,从1-100设定⽐例',    `back_source_step` int(10)  NOT NULL DEFAULT '0' COMMENT  '回源步⻓,桶扩容的时候默认每次分配的库存⼤⼩',    `template_name` varchar(64) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '模板名称',    `is_default` tinyint(1)  NOT NULL DEFAULT '0' COMMENT '是否默认模板,只允许⼀个,1为默认模板',    `version_id` int(10) NOT NULL DEFAULT '0' COMMENT '版本号',    `del_flag` tinyint(1) NOT NULL DEFAULT '0' COMMENT '删除标记(1-有效,0-删除)',    `create_user` int(10) NOT NULL DEFAULT '0' COMMENT '创建⼈',    `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',    `update_user` int(10) NOT NULL DEFAULT '0' COMMENT '更新⼈',    `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',    PRIMARY KEY (`ID`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='库存分桶配置模板表';
复制代码


四.库存分配记录表

CREATE TABLE `inventory_allot_detail` (    `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',    `sku_id` varchar(40) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT 'skuId',    `inventor_no` varchar(32) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '库存申请业务编号',    `seller_id` varchar(64) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '卖家ID',    `inventor_num` int(10) NOT NULL DEFAULT '0' COMMENT '库存变更数量',    `version_id` int(10) NOT NULL DEFAULT '0' COMMENT '版本号',    `del_flag` tinyint(1) NOT NULL DEFAULT '0' COMMENT '删除标记(1-有效,0-删除)',    `create_user` int(10) NOT NULL DEFAULT '0' COMMENT '创建⼈',    `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',    `update_user` int(10) NOT NULL DEFAULT '0' COMMENT '更新⼈',    `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',    PRIMARY KEY (`id`),    UNIQUE KEY `inde_unique_inventor_no` (`inventor_no`) USING BTREE) ENGINE=InnoDB AUTO_INCREMENT=265 DEFAULT CHARSET=utf8 COMMENT='库存分配记录表';
复制代码


五.库存扣减明细表

CREATE TABLE `inventory_deduction_detail` (    `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',    `order_id` varchar(32) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '订单id',    `refund_no` varchar(32) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '退款编号',    `inventory_num` int(10)  NOT NULL DEFAULT '0' COMMENT  '扣减库存数量',    `sku_id` varchar(64) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '商品skuId',    `seller_id` varchar(64) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '卖家ID',    `bucket_no` int(10)  NOT NULL COMMENT  '扣减分桶编号',    `deduction_type` int(2)  NOT NULL COMMENT  '库存操作类型(10库存扣减,20库存退货)',    `del_flag` tinyint(1) NOT NULL DEFAULT '0' COMMENT '删除标记(1-有效,0-删除)',    `create_user` int(10) NOT NULL DEFAULT '0' COMMENT '创建⼈',    `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',    `update_user` int(10) NOT NULL DEFAULT '0' COMMENT '更新⼈',    `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',    PRIMARY KEY (`ID`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='库存扣减明细表';
复制代码


2.库存入桶分配改造


这里主要进行的是库存分桶初始化分配库存的改造。其中的缓存一致性方案是:首先将分桶元数据写入库存分桶操作表,然后再由定时任务扫描处理。

 

计算出元数据(待上线分桶、中⼼桶剩余库存、每个分桶分配库存)信息后,为了保证⼀致性,会先将计算出的分桶元数据信息⼊库。也就是先写⼊库存分桶操作表,然后在缓存中写⼊中⼼桶剩余库存信息。如果⼊库失败或缓存写⼊失败,会抛出异常,数据库回滚,操作不成功。只有⼊库成功和缓存写⼊成功之后,本次操作才成功。

 

关键环节与核心代码:

 

(1)库存分桶初始化入口


@RestController@RequestMapping("/product/inventory")public class InventoryController {    @Autowired    private InventoryBucketService inventoryBucketService;
@Autowired private InventoryBucketCache cache;
@Resource private TairCache tairCache; ...
//初始化库存 @RequestMapping("/init") public void inventoryInit(@RequestBody InventorBucketRequest request) { //清除本地缓存数据,cache.getCache()获取的是Guava Cache cache.getCache().invalidateAll(); //清除Tair中的数据,扫描卖家ID+SKU的ID的key会比较耗时 Set<String> keys = tairCache.getJedis().keys("*" + request.getSellerId() + request.getSkuId() + "*"); if (!CollectionUtils.isEmpty(keys)) { tairCache.mdelete(Lists.newArrayList(keys)); } //这里模拟指定本次的库存业务单号,实际接口需要外部传入 request.setInventorCode(SnowflakeIdWorker.getCode()); //初始化库存信息 inventoryBucketService.inventorBucket(request); } ...}
//库存分桶业务实现类@Servicepublic class InventoryBucketServiceImpl implements InventoryBucketService { @Resource private TairLock tairLock; ...
//商品库存入桶分配 @Override @Transactional(rollbackFor = Exception.class) public void inventorBucket(InventorBucketRequest request) { //1.验证入参必填项 checkInventorParams(request); //锁key = 卖家ID + SKU的ID String key = buildBucketLockKey(request.getSellerId(), request.getSkuId()); String value = SnowflakeIdWorker.getCode(); //注意这里需要锁定中心桶库存 boolean lock = tairLock.tryLock(key, value); //分配库存时,这个卖家的sku是不允许其他相关操作的 if (lock) { try { //2.插入库存入库的记录信息 //由于申请的库存业务编号是一个唯一key,所以可以避免重复请求 //也就是会校验库存单号是否已经存在了,保证⼀次库存变更⾏为只能执⾏⼀次 inventoryRepository.saveInventoryAllotDetail(request); //3.将库存数据写入缓存 inventoryBucketCache(request); } catch (Exception e) { e.printStackTrace(); } finally { tairLock.unlock(key, value); } } else { throw new BaseBizException("请求繁忙,稍后重试!"); } }
//将库存数据写入缓存 private void inventoryBucketCache(InventorBucketRequest request) { //获取中心桶库存的key String key = buildSellerInventoryKey(request.getSellerId(), request.getSkuId()); //1.先验证是否已缓存分桶元数据信息 BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(request.getSellerId() + request.getSkuId()); try { //缓存不存在,则进行初始化操作 if (Objects.isNull(bucketLocalCache)) { //2.获取库存分桶的配置模板 InventoryBucketConfigDO inventoryBucketConfig = inventoryRepository.getInventoryBucketConfig(request.getTemplateId()); //初始化分桶库存 initInventoryBucket(request, inventoryBucketConfig); } else { //3.缓存已存在,直接把库存加到中心桶里面,并返回中心桶库存 Integer residueNum = tairCache.incr(key, request.getInventoryNum()); if (residueNum < 0) { throw new BaseBizException(InventoryExceptionCode.INVENTORY_CACHE); } //4.尝试将库存分配到新的分桶上(注意,先将中心桶的库存加上去) InventorOnlineRequest onlineRequest = inventoryConverter.converterRequest(request); //5.构建新的分桶元数据信息,并写入 writeBucketCache(onlineRequest, residueNum); } } catch (Exception e) { log.error("分桶库存初始化出现失败", e); throw new BaseBizException(InventoryExceptionCode.INVENTORY_CACHE); } finally { inventoryBucketCache.threadLocalRemove(); } } ...}
复制代码


(2)库存分桶元数据计算以及分桶编号创建


//库存分桶业务实现类@Servicepublic class InventoryBucketServiceImpl implements InventoryBucketService {    @Resource    private TairLock tairLock;
@Resource private InventoryRepository inventoryRepository;
@Resource private InventoryBucketCache inventoryBucketCache;
@Resource private TairCache tairCache; ...
//初始化分桶库存 private void initInventoryBucket(InventorBucketRequest request, InventoryBucketConfigDO inventoryBucketConfig) { //计算出分桶的元数据信息 BucketLocalCache bucketLocalCache = calcInventoryBucket(request, inventoryBucketConfig); //库存分桶的元数据信息入库 inventoryRepository.saveBucketDetail(null, bucketLocalCache, BucketOperateEnum.INIT.getCode(), bucketLocalCache.getAvailableList(), null); //写入中心桶的剩余库存信息 log.info("中心桶剩余库存:{}", bucketLocalCache.getResidueNum()); //获取中心桶剩余库存的key String key = buildSellerInventoryKey(bucketLocalCache.getSellerId(), bucketLocalCache.getSkuId()); //设置中心桶剩余库存 = 本次需要入库的库存数量 - 本次最多可以放入分桶的库存数量 boolean setFlag = tairCache.set(key, bucketLocalCache.getResidueNum().toString(), 0); if (!setFlag) { //中心桶剩余库存写入失败,回滚事务 throw new BaseBizException(InventoryExceptionCode.INVENTORY_CACHE); } }
//计算出本次库存入库的具体分桶信息 private BucketLocalCache calcInventoryBucket(InventorBucketRequest request, InventoryBucketConfigDO inventoryBucketConfig) { //分桶配置模版中默认的分桶数量 Integer bucketNum = inventoryBucketConfig.getBucketNum(); //获取本次需要入库的库存数量 Integer inventoryNum = request.getInventoryNum(); //配置模版中所有分桶的最大库存容量 Integer maxBucketNum = bucketNum * inventoryBucketConfig.getMaxDepthNum(); //配置模版中所有分桶的最小库存容量 //如果需要放入分桶的库存数量低于这个值,那么只会分配给部分分桶,此时就需要重新计算分桶 Integer minBucketNum = bucketNum * inventoryBucketConfig.getMinDepthNum();
//本次最多可以放入分桶的库存数量 int countBucketNum = Math.min(inventoryNum, maxBucketNum); //当库存数量小于最小分桶深度*分桶数量,就需要减少分配的分桶数 //此时要分配的分桶数量 bucketNum = 本次库存入库的数量 / 每个分桶的最小库存容量 if (minBucketNum > countBucketNum) { bucketNum = countBucketNum / inventoryBucketConfig.getMinDepthNum(); //如果库存数量不足一个分桶的最小深度,但是大于0,则上线一个分桶 if (bucketNum == 0 && countBucketNum % inventoryBucketConfig.getMinDepthNum() > 0) { bucketNum++; } }
//获取每个分桶分配的库存数量 Integer bucketInventoryNum = countBucketNum / bucketNum; //剩余库存数量,可能为0或者大于0,补到最后一个分桶上 Integer residueNum = countBucketNum - bucketInventoryNum * bucketNum; //构建缓存数据模型时,以卖家ID + 商品skuId为唯一标识 String key = request.getSellerId() + request.getSkuId();
//构建具体的缓存数据模型 BucketLocalCache bucketLocalCache = buildBucketCache(key, bucketNum, bucketInventoryNum, residueNum, inventoryBucketConfig); //标记到具体的数据上 bucketLocalCache.setSellerId(request.getSellerId()); bucketLocalCache.setSkuId(request.getSkuId()); bucketLocalCache.setInventoryNum(inventoryNum); //中心桶剩余库存 = 本次需要入库的库存数量 - 本次最多可以放入分桶的库存数量 bucketLocalCache.setResidueNum(inventoryNum - countBucketNum); bucketLocalCache.setInventoryBucketConfig(inventoryBucketConfig); bucketLocalCache.setOperationType(BucketStatusEnum.AVAILABLE_STATUS.getCode());
return bucketLocalCache; }
//构建缓存模型 //@param bucketNum 分桶数量 //@param inventoryNum 分桶分配的库存数量 //@param residueNum 剩余的未分配均匀的库存 //@param inventoryBucketConfig 分桶配置信息 private BucketLocalCache buildBucketCache(String key, Integer bucketNum, Integer inventoryNum, Integer residueNum, InventoryBucketConfigDO inventoryBucketConfig) { BucketLocalCache bucketLocalCache = new BucketLocalCache(); //先获取得到这个模板配置的对应可分槽位的均匀桶列表 List<String> bucketNoList = InventorBucketUtil.createBucketNoList(key, inventoryBucketConfig.getBucketNum()); List<BucketCacheBO> bucketCacheBOList = new ArrayList<>(bucketNum); List<BucketCacheBO> undistributedList = new ArrayList<>(bucketNum); //构建出多个分桶对象 for (int i = 0; i < bucketNum; i++) { //生成对应的分桶编号,方便定义到具体的分桶上 BucketCacheBO bucketCache = new BucketCacheBO(); String bucketNo = bucketNoList.get(i); bucketCache.setBucketNo(bucketNo); //最后一个分桶,分配剩余未除尽的库存+平均库存 if (i == bucketNum - 1) { bucketCache.setBucketNum(inventoryNum + residueNum); } else { bucketCache.setBucketNum(inventoryNum); } bucketCacheBOList.add(bucketCache); } //生成的分桶对象超过实际可分配的分桶对象,保留这批多余的分桶模型为不可用分桶,后续分桶上线可以选择使用 if (bucketNoList.size() > bucketNum) { for (int i = bucketNum; i < bucketNoList.size(); i++) { BucketCacheBO bucketCache = new BucketCacheBO(); String bucketNo = bucketNoList.get(i); bucketCache.setBucketNo(bucketNo); undistributedList.add(bucketCache); } } //生成缓存的明细key List<String> bucketDetailKeyList = InventorBucketUtil.createBucketNoList(key, inventoryBucketConfig.getBucketNum(), "%07d"); //设置分桶缓存明细的key bucketLocalCache.setBucketDetailKeyList(bucketDetailKeyList); //设置可用的分桶缓存列表 bucketLocalCache.setAvailableList(bucketCacheBOList); //设置不可用或者已下线的分桶缓存列表 bucketLocalCache.setUndistributedList(undistributedList); return bucketLocalCache; } ...}
public class InventorBucketUtil { private static final int MAX_SIZE = 100000;
//生成对应的槽位key,默认的 //@param key 卖家Id+商品skuId //@param bucketNum 分桶配置数量 //@return 预先保留的槽位集合 public static List<String> createBucketNoList(String key, Integer bucketNum) { return createBucketNoList(key, bucketNum, "%06d"); }
//生成对应的槽位key,明细使用,多使用一位区分 //@param key 卖家Id+商品skuId //@param bucketNum 分桶配置数量 //@return 预先保留的槽位集合 public static List<String> createBucketNoList(String key, Integer bucketNum, String format) { Map<Long, String> cacheKey = new HashMap<>(bucketNum); //bucketNoList用来存放每个桶对应的hashKey List<String> bucketNoList = new ArrayList<>(bucketNum); //分配桶的编号 for (int i = 1; i <= MAX_SIZE; i++) { String serialNum = String.format(format, i); //卖家ID + 商品SKU ID + 序号 String hashKey = key + serialNum; //一致性哈希算法murmur long hash = HashUtil.murMurHash(hashKey.getBytes()); //对分桶数量进行取模运算 long c = (hash %= bucketNum); //确保被选中的hashKey都能哈希到不同的分桶 if (cacheKey.containsKey(c)) { continue; } cacheKey.put(c, hashKey); bucketNoList.add(hashKey); if (cacheKey.size() >= bucketNum) { break; } } return bucketNoList; } ...}
复制代码


(3)库存分桶操作记录写 DB + 中心桶库存写缓存


//库存分桶业务实现类@Servicepublic class InventoryBucketServiceImpl implements InventoryBucketService {    @Resource    private TairLock tairLock;
@Resource private InventoryRepository inventoryRepository;
@Resource private InventoryBucketCache inventoryBucketCache;
@Resource private TairCache tairCache; ...
//初始化分桶库存 private void initInventoryBucket(InventorBucketRequest request, InventoryBucketConfigDO inventoryBucketConfig) { //计算出分桶的元数据信息 BucketLocalCache bucketLocalCache = calcInventoryBucket(request, inventoryBucketConfig); //库存分桶的元数据信息入库 inventoryRepository.saveBucketDetail(null, bucketLocalCache, BucketOperateEnum.INIT.getCode(), bucketLocalCache.getAvailableList(), null); //写入中心桶的剩余库存信息 log.info("中心桶剩余库存:{}", bucketLocalCache.getResidueNum()); //获取中心桶剩余库存的key String key = buildSellerInventoryKey(bucketLocalCache.getSellerId(), bucketLocalCache.getSkuId()); //设置中心桶剩余库存 = 本次需要入库的库存数量 - 本次最多可以放入分桶的库存数量 boolean setFlag = tairCache.set(key, bucketLocalCache.getResidueNum().toString(), 0); if (!setFlag) { //中心桶剩余库存写入失败,回滚事务 throw new BaseBizException(InventoryExceptionCode.INVENTORY_CACHE); } } ...}
@Repositorypublic class InventoryRepository { ... //保存库存分桶的元数据信息 //@param operateId 操作id //@param bucketLocalCache 变更之后的元数据信息 //@param operateType 操作类型 //@param bucketCacheBOList 变动的分桶列表 //@param inventoryNum 变动的库存数量 public void saveBucketDetail(String operateId, BucketLocalCache bucketLocalCache, Integer operateType, List<BucketCacheBO> bucketCacheBOList, Integer inventoryNum) { //变动的分桶为空,则不必要保存 if (CollectionUtils.isEmpty(bucketCacheBOList)) { return; } if (!StringUtils.hasLength(operateId)) { operateId = SnowflakeIdWorker.getCode(); } InventoryBucketOperateDO inventoryBucketOperateDO = InventoryBucketOperateDO.builder() .operateId(operateId) .sellerId(bucketLocalCache.getSellerId()) .skuId(bucketLocalCache.getSkuId()) .bucket(JSON.toJSONString(bucketCacheBOList)) .operateType(operateType) .feature(JSON.toJSONString(bucketLocalCache)) .inventoryNum(inventoryNum) .build(); int count = inventoryBucketOperateMapper.insert(inventoryBucketOperateDO); if (count <= 0) { throw new BaseBizException(InventoryExceptionCode.INVENTORY_SQL); } } ...}
复制代码


(4)库存分桶元数据缓存更新使用自缓存一致性服务的 DB + 消息双写方案


InventoryBucketCache.setBucketLocalCache()方法设置库存分桶元数据到本地缓存时,就用到了缓存一致性框架。也就是使用了缓存一致性服务的 @CacheRefresh 注解,因为库存分桶元数据属于热点数据,对实时性要求比较高。在一台机器的本地缓存了库存分桶元数据后,其他机器也应缓存该数据。


@Component@Datapublic class InventoryBucketCache {    @Autowired    private Cache cache;
@Resource private TairCache tairCache;
private ThreadLocal<BucketLocalCache> bucketLocalCacheThreadLocal = new ThreadLocal<>();
//本地存储关于分桶信息 @CacheRefresh( cacheKey = "bucketKey", mqCacheKey = CacheConstant.INVENTORY_SKU_KEY, index = "1", messageType = CacheConstant.MESSAGE_TYPE_HOT, cacheType = CacheConstant.TAIR_CACHE_TYPE ) public void setBucketLocalCache(String bucketKey, BucketLocalCache bucketLocalCache) { String bucketLocalKey = TairInventoryConstant.SELLER_BUCKET_PREFIX + bucketLocalCache.getSellerId() + bucketLocalCache.getSkuId(); synchronized (bucketLocalKey.intern()) { log.info("保存本地缓存元数据 key:{}, value:{}", bucketKey, JSON.toJSONString(bucketLocalCache)); BucketLocalCache bucketCache = getTairBucketCache(bucketKey); log.info("远程缓存元数据 key:{}, value:{}", bucketKey, JSON.toJSONString(bucketCache)); //如果本地缓存没有就直接写入 if (Objects.isNull(bucketCache)) { setBucketCache(bucketKey, bucketLocalCache); cache.put(bucketKey, bucketLocalCache); return; } //本地缓存的元数据覆盖,考虑到是并发执行的,这里需要上内存级别的锁,并进行diff处理 if (bucketLocalCache.getOperationType().equals(BucketStatusEnum.AVAILABLE_STATUS.getCode())) { diffCacheOnline(bucketCache, bucketLocalCache); } else if (bucketLocalCache.getOperationType().equals(BucketStatusEnum.OFFLINE_STATUS.getCode())) { diffCacheOffline(bucketCache, bucketLocalCache); } setBucketCache(bucketKey, bucketCache); cache.put(bucketKey, bucketCache); log.info("实际保存本地缓存元数据 key:{}, value:{}", bucketKey, JSON.toJSONString(bucketCache)); } } ...}
//刷新缓存的自定义注解@Aspect@Componentpublic class CacheRefreshAspect { @Autowired private DataRefreshProducer producer;
@Autowired private CacheRefreshConverter cacheRefreshConverter;
@Autowired private CacheQueue cacheQueue;
//切入点,@CacheRefresh注解标注的 @Pointcut("@annotation(com.demo.eshop.cache.annotation.CacheRefresh)") public void pointcut() { }
//环绕通知,在方法执行前后 //@param point 切入点 //@return 结果 @Around("pointcut() && @annotation(cacheRefresh)") public Object around(ProceedingJoinPoint point, CacheRefresh cacheRefresh) throws Throwable { //签名信息 Signature signature = point.getSignature(); //强转为方法信息 MethodSignature methodSignature = (MethodSignature) signature; //参数名称 String[] parameterNames = methodSignature.getParameterNames(); //参数值 Object[] parameterValues = point.getArgs(); Object response; try { //先执行本地方法再执行异步的操作 response = point.proceed(); } catch (Throwable throwable) { log.error("执行方法: {}失败,异常信息: {}", methodSignature.getMethod().getName(), throwable); throw throwable; }
try { MessageCache messageCache = new MessageCache(); for (int i = 0; i < parameterValues.length; i++) { if (parameterNames[i].equals(cacheRefresh.cacheKey())) { messageCache.setCacheKey(String.valueOf(parameterValues[i])); } if (Integer.valueOf(cacheRefresh.index()) == i) { messageCache.setCacheJSON(JSONObject.toJSONString(parameterValues[i])); } } messageCache.setOperationType(Integer.valueOf(cacheRefresh.operationType())); //给定一个有序的版本号(默认统一的工作ID和数据中心ID) messageCache.setVersion(SnowflakeIdWorker.getVersion()); messageCache.setMessageType(Integer.valueOf(cacheRefresh.messageType())); messageCache.setCacheType(Integer.valueOf(cacheRefresh.cacheType())); messageCache.setCreateDate(new Date());
//将缓存数据写入读写队列 //缓存数据写入读写队列后,会定时每秒批量写入数据库(缓存数据写入DB只用于兜底,所以偶尔出现丢失并不影响) DataRefreshDetailDO dataRefreshDetailDO = cacheRefreshConverter.converter(messageCache); cacheQueue.submit(dataRefreshDetailDO);
//发送MQ消息去处理缓存数据,比如将缓存数据更新到缓存上 //一般来说,热点缓存会比普通缓存少很多,所以普通缓存的更新会比较多,热点缓存的更新会比较少 //此外,热点缓存的更新会对时效性要求比较高,通过消息去异步处理本来就已存在一定的延迟 //所以这里将普通缓存和热点缓存的更新进行分开处理,减少时效性高的消息的处理延迟 if (CacheConstant.MESSAGE_TYPE_HOT.equals(cacheRefresh.messageType())) { producer.sendMessage(RocketMqConstant.DATA_HOT_RADIO_TOPIC, JSONObject.toJSONString(messageCache), "热点缓存消息发送"); } else { producer.sendMessage(RocketMqConstant.DATA_MESSAGE_CACHE_TOPIC, JSONObject.toJSONString(messageCache), cacheRefresh.mqCacheKey(), "通用缓存消息发送"); } } catch (Exception e) { log.error("处理缓存同步:{}失败,异常信息:{}", methodSignature.getMethod().getName(), e); } return response; }}
复制代码


(5)库存分桶元数据作为热点数据更新到各机器节点的本地缓存


setBucketLocalCache()方法的 @CacheRefresh 注解描述缓存是热点类型。该方法被执行后,会被 AOP 切面切入,将需要缓存的数据发送到 MQ。接着 MQ 会对这种热点类型的消息进行广播处理,也就是每台机器都会执行 CacheRefreshListener 的方法。


@Configurationpublic class ConsumerBeanConfig {    ...    //刷新本地缓存    @Bean("cacheRefresTopic")    public DefaultMQPushConsumer cacheRefresTopic(CacheRefreshListener cacheRefreshListener) throws MQClientException {        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketMqConstant.REFRESH_CACHE_GROUP);        //设置为广播模式        consumer.setMessageModel(MessageModel.BROADCASTING);        consumer.setNamesrvAddr(rocketMQProperties.getNameServer());        consumer.subscribe(RocketMqConstant.DATA_HOT_RADIO_TOPIC, "*");        consumer.registerMessageListener(cacheRefreshListener);        consumer.start();        return consumer;    }    ...}
//刷新分布式节点的本地缓存@Componentpublic class CacheRefreshListener implements MessageListenerConcurrently { //本地缓存 @Autowired private Cache cache;
@Resource private InventoryBucketCache inventoryBucketCache;
@Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { for (MessageExt messageExt : list) { String msg = new String(messageExt.getBody()); log.info("刷新本地缓存,消息内容:{},消费时间:{}", msg, DateFormatUtil.formatDateTime(new Date()));
MessageCache messageCache = JsonUtil.json2Object(msg, MessageCache.class); BucketLocalCache bucketLocalCache = JsonUtil.json2Object(messageCache.getCacheJSON(), BucketLocalCache.class); synchronized (messageCache.getCacheKey().intern()) { String bucketLocalKey = TairInventoryConstant.SELLER_BUCKET_PREFIX + bucketLocalCache.getSellerId() + bucketLocalCache.getSkuId(); //获取远程缓存的分桶元数据信息 BucketLocalCache bucketCache = inventoryBucketCache.getTairBucketCache(bucketLocalKey);
if (Objects.isNull(bucketCache)) { cache.put(messageCache.getCacheKey(), JsonUtil.json2Object(messageCache.getCacheJSON(), BucketLocalCache.class)); log.info("更新本地缓存,本次更新内容:{},更新时间:{}", messageCache.getCacheJSON(), DateFormatUtil.formatDateTime(new Date())); } else { //进行diff数据处理 if (bucketLocalCache.getOperationType().equals(BucketStatusEnum.AVAILABLE_STATUS.getCode())) { inventoryBucketCache.diffCacheOnline(bucketCache, bucketLocalCache); } else if (bucketLocalCache.getOperationType().equals(BucketStatusEnum.OFFLINE_STATUS.getCode())) { inventoryBucketCache.diffCacheOffline(bucketCache, bucketLocalCache); } cache.put(messageCache.getCacheKey(), bucketCache); log.info("更新本地缓存,本次更新内容:{},更新时间:{}", JsonUtil.object2Json(bucketCache), DateFormatUtil.formatDateTime(new Date())); } } } } catch (Exception e) { //本地缓存只有参数转换会出错,这种错误重试也没什么作用 log.error("consume error, 刷新本地缓存失败", e); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }}
复制代码


3.库存分桶上线改造


当向库存分桶增加库存时,会调用分桶上线接⼝,也就是会调⽤InventoryBucketServiceImpl 的 writeBucketCache()⽅法,writeBucketCache()⽅法会实现具体的分桶上线任务。

 

InventoryBucketServiceImpl 的 bucketOnline()方法,适⽤场景是在商品库存⼊桶时,分桶上线中存在上线失败的分桶。此时运营⼈员就可以通过 bucketOnline()方法⼿动执⾏分桶的上线。从而防⽌上线的分桶过少,承担的并发压⼒⼤。

 

其中的缓存一致性方案是:首先将分桶元数据写入库存分桶操作表,然后再由定时任务扫描处理。

 

也是先把分桶元数据写⼊到数据库中,然后再操作中⼼桶缓存数据。数据库写⼊成功和缓存写⼊成功,则本次操作成功。数据库写⼊失败或者缓存写⼊失败,都会回滚数据库,本次操作失败。


//库存分桶业务实现类@Servicepublic class InventoryBucketServiceImpl implements InventoryBucketService {    @Resource    private TairLock tairLock;
@Resource private InventoryRepository inventoryRepository;
@Resource private InventoryBucketCache inventoryBucketCache;
@Resource private TairCache tairCache; ...
//商品库存入桶分配 @Override @Transactional(rollbackFor = Exception.class) public void inventorBucket(InventorBucketRequest request) { //1.验证入参必填项 checkInventorParams(request); //锁key = 卖家ID + SKU的ID String key = buildBucketLockKey(request.getSellerId(), request.getSkuId()); String value = SnowflakeIdWorker.getCode(); //注意这里需要锁定中心桶库存 boolean lock = tairLock.tryLock(key, value); //分配库存时,这个卖家的sku是不允许其他相关操作的 if (lock) { try { //2.插入库存入库的记录信息 //由于申请的库存业务编号是一个唯一key,所以可以避免重复请求 //也就是会校验库存单号是否已经存在了,保证⼀次库存变更⾏为只能执⾏⼀次 inventoryRepository.saveInventoryAllotDetail(request); //3.将库存数据写入缓存 inventoryBucketCache(request); } catch (Exception e) { e.printStackTrace(); } finally { tairLock.unlock(key, value); } } else { throw new BaseBizException("请求繁忙,稍后重试!"); } }
//将库存数据写入缓存 private void inventoryBucketCache(InventorBucketRequest request) { //获取中心桶库存的key String key = buildSellerInventoryKey(request.getSellerId(), request.getSkuId()); //1.先验证是否已缓存分桶元数据信息,先查本地缓存,再查远程缓存 BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(request.getSellerId() + request.getSkuId()); try { //缓存不存在,则进行初始化操作 if (Objects.isNull(bucketLocalCache)) { //2.获取库存分桶的配置模板 InventoryBucketConfigDO inventoryBucketConfig = inventoryRepository.getInventoryBucketConfig(request.getTemplateId()); //初始化分桶库存 initInventoryBucket(request, inventoryBucketConfig); } else { //3.缓存已存在,直接把库存加到中心桶里面,并返回中心桶库存 Integer residueNum = tairCache.incr(key, request.getInventoryNum()); if (residueNum < 0) { throw new BaseBizException(InventoryExceptionCode.INVENTORY_CACHE); } //4.尝试将库存分配到新的分桶上(注意,先将中心桶的库存加上去) InventorOnlineRequest onlineRequest = inventoryConverter.converterRequest(request); //5.构建新的分桶元数据信息,并写入 writeBucketCache(onlineRequest, residueNum); } } catch (Exception e) { log.error("分桶库存初始化出现失败", e); throw new BaseBizException(InventoryExceptionCode.INVENTORY_CACHE); } finally { inventoryBucketCache.threadLocalRemove(); } }
//分桶上线接口 @Override @Transactional(rollbackFor = Exception.class) public void bucketOnline(InventorOnlineRequest request) { //1.验证入参必填 checkInventorOnlineParams(request); //2.注意这里需要锁定中心桶库存 String key = buildBucketLockKey(request.getSellerId(), request.getSkuId()); String value = SnowflakeIdWorker.getCode(); boolean lock = tairLock.tryLock(key, value); if (lock) { try { //3.获取中心桶的库存,并校验是否可上线分桶 Integer residueNum = checkBucketOnlineNum(key); //4.构建新的分桶元数据信息,并写入 writeBucketCache(request, residueNum); } catch (Exception e) { e.printStackTrace(); } finally { tairLock.unlock(key, value); } } else { throw new BaseBizException("请求繁忙,稍后重试!"); } } ...
//构建新的分桶元数据信息 //@param request 分桶上线对象 //@param residueNum 中心桶剩余库存 private void writeBucketCache(InventorOnlineRequest request, Integer residueNum) { String key = request.getSellerId() + request.getSkuId(); //获取到本地的缓存列表 BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(key); try { if (!Objects.isNull(bucketLocalCache)) { //获取当前可上线的分桶列表信息 List<BucketCacheBO> bucketCacheBOList = buildBucketList(request.getBucketNoList(), bucketLocalCache.getAvailableList(), bucketLocalCache.getUndistributedList(), bucketLocalCache.getInventoryBucketConfig(), residueNum); //当前可上线的分桶为空,直接返回 if (CollectionUtils.isEmpty(bucketCacheBOList)) { return; }
//中心桶被扣减掉的库存(上线的分桶库存总和) Integer descInventoryNum = bucketCacheBOList.stream().mapToInt(BucketCacheBO::getBucketNum).sum();
//构建返回新的元数据模型返回 buildBucketLocalCache(bucketLocalCache, bucketCacheBOList, residueNum - descInventoryNum);
//分桶信息入库 inventoryRepository.saveBucketDetail(null, bucketLocalCache, BucketOperateEnum.ONLINE.getCode(), bucketCacheBOList, descInventoryNum);
//扣减中心桶剩余库存,如果扣减失败了,直接抛异常 Integer decr = tairCache.decr(buildSellerInventoryKey(request.getSellerId(), request.getSkuId()), descInventoryNum); if (decr < 0) { throw new BaseBizException(InventoryExceptionCode.INVENTORY_CACHE); } } } catch (Exception e) { log.error("分桶构建初始化失败", e); throw new BaseBizException(InventoryExceptionCode.INVENTORY_CACHE); } finally { inventoryBucketCache.threadLocalRemove(); } }
//获取可上线的分桶列表信息以及具体上线库存 //@param bucketNoList 上线分桶编号列表 //@param availableList 上线正在使用的分桶编号列表 //@param undistributedList 下线或者未使用的分桶编号列表 //@param inventoryBucketConfigDO 当前分桶的配置模板信息 //@param residueNum 中心桶的剩余可分配库存 //@return 可上线的分桶列表以及具体分桶库存 private List<BucketCacheBO> buildBucketList(List<String> bucketNoList, List<BucketCacheBO> availableList, List<BucketCacheBO> undistributedList, InventoryBucketConfigDO inventoryBucketConfigDO, Integer residueNum) { //1.如果入参选择了上线的分桶编号列表,则从缓存中配置的未使用分桶列表进行比对处理 List<String> bucketCacheList = null; if (!CollectionUtils.isEmpty(bucketNoList)) { Map<String, BucketCacheBO> bucketCacheMap = undistributedList.stream().collect(Collectors.toMap(BucketCacheBO::getBucketNo, Function.identity())); //过滤返回可用的分桶编号 bucketCacheList = bucketNoList.stream().filter(bucketNo -> bucketCacheMap.containsKey(bucketNo)).collect(Collectors.toList()); } else { //直接返回下线的不可用分桶列表 bucketCacheList = undistributedList.stream().map(BucketCacheBO::getBucketNo).collect(Collectors.toList()); } //可上线的分桶列表为空 if (CollectionUtils.isEmpty(bucketCacheList)) { return Lists.newArrayList(); } //2.根据中心桶的可分配库存,处理返回具体上线的分桶配置信息 return calcOnlineBucket(availableList, bucketCacheList, residueNum, inventoryBucketConfigDO); }
//构建新的元数据模型 //@param bucketLocalCache 本地分桶元数据信息 //@param bucketCacheBOList 上线的分桶列表 //@param residueNum 中心桶剩余库存 private void buildBucketLocalCache(BucketLocalCache bucketLocalCache, List<BucketCacheBO> bucketCacheBOList, Integer residueNum) { //填充中心桶剩余库存 bucketLocalCache.setResidueNum(residueNum); //添加新上线的分桶列表 bucketLocalCache.getAvailableList().addAll(bucketCacheBOList); Map<String, BucketCacheBO> bucketCacheMap = bucketCacheBOList.stream().collect(Collectors.toMap(BucketCacheBO::getBucketNo, Function.identity()));
List<BucketCacheBO> undistributedList = bucketLocalCache.getUndistributedList().stream().filter(bucketCacheBO -> //在上线的分桶列表,需要移除掉 !bucketCacheMap.containsKey(bucketCacheBO.getBucketNo())).collect(Collectors.toList()); //从不可用的分桶列表重移除 bucketLocalCache.setUndistributedList(undistributedList); bucketLocalCache.setOperationType(BucketStatusEnum.AVAILABLE_STATUS.getCode()); } ...}
@Repositorypublic class InventoryRepository { ... //保存库存分桶的元数据信息 //@param operateId 操作id //@param bucketLocalCache 变更之后的元数据信息 //@param operateType 操作类型 //@param bucketCacheBOList 变动的分桶列表 //@param inventoryNum 变动的库存数量 public void saveBucketDetail(String operateId, BucketLocalCache bucketLocalCache, Integer operateType, List<BucketCacheBO> bucketCacheBOList, Integer inventoryNum) { //变动的分桶为空,则不必要保存 if (CollectionUtils.isEmpty(bucketCacheBOList)) { return; } if (!StringUtils.hasLength(operateId)) { operateId = SnowflakeIdWorker.getCode(); } InventoryBucketOperateDO inventoryBucketOperateDO = InventoryBucketOperateDO.builder() .operateId(operateId) .sellerId(bucketLocalCache.getSellerId()) .skuId(bucketLocalCache.getSkuId()) .bucket(JSON.toJSONString(bucketCacheBOList)) .operateType(operateType) .feature(JSON.toJSONString(bucketLocalCache)) .inventoryNum(inventoryNum) .build(); int count = inventoryBucketOperateMapper.insert(inventoryBucketOperateDO); if (count <= 0) { throw new BaseBizException(InventoryExceptionCode.INVENTORY_SQL); } } ...}
复制代码


文章转载自:东阳马生架构

原文链接:https://www.cnblogs.com/mjunz/p/18962609

体验地址:http://www.jnpfsoft.com/?from=001YH

用户头像

还未添加个人签名 2025-04-01 加入

还未添加个人简介

评论

发布
暂无评论
商品中心—库存分桶的一致性改造文档_C#_电子尖叫食人鱼_InfoQ写作社区