写点什么

商品中心—库存分桶高并发的优化文档(二)

  • 2025-07-07
    福建
  • 本文字数:20409 字

    阅读完需:约 67 分钟

5.分桶⽆法扩容时快速触发下线


(1)优化原因


分桶扩容时,为避免并发操作中⼼桶库存,锁的维度是卖家 ID + 商品 ID。但是分桶默认配置是 32 个,⼤量并发请求下,可能会导致瞬间出现库存的多个分桶触发分桶下线阈值。

 

高并发下的每个请求都在锁内部排队,等待验证是否需要触发分桶下线。而且在⾼并发下,很多库存扣减请求都会路由到某个分桶。这样该分桶还未下线时,就可能⾯临分桶库存很快被扣光的情况。

 

也就是⼤量库存扣减请求,路由到库存很快被扣光的分桶,最终导致库存扣减失败,但是其他分桶实际上还是有库存的。

 

优化前的代码:

@Servicepublic class InventoryBucketServiceImpl implements InventoryBucketService {    ...    //分桶扩容接口    @Override    public void bucketCapacity(BucketCapacity bucketCapacity) {        //先锁住中心桶库存,避免此时库存发生变化        String key = TairInventoryConstant.SELLER_INVENTORY_LOCK_PREFIX + bucketCapacity.getSellerId() + bucketCapacity.getSkuId();        String value = SnowflakeIdWorker.getCode();        //1.校验是否已经无需扩容了,如果是则快速结束        long startTime = System.currentTimeMillis();        BucketCapacityContext bucketCapacityContext = checkBucketCapacity(bucketCapacity);        if (!bucketCapacityContext.getIsCapacity()) {            return;        }        //获取分布式锁来进行扩容处理        boolean lock = tairLock.tryLock(key, value);        if (lock) {            try {                //再次校验是否需要扩容,此处不允许并发                bucketCapacityContext = checkBucketCapacity(bucketCapacity);                if (bucketCapacityContext.getIsCapacity()) {                    //2.获取中心桶库存的库存                    Integer residueNum = getCenterStock(bucketCapacity);                    //3.可以扩容,计算出可回源的库存进行处理                    if (residueNum > 0) {                        backSourceInventory(residueNum, bucketCapacityContext);                        log.info(bucketCapacity.getBucketNo() + "处理扩容消耗时间{}", System.currentTimeMillis() - startTime);                    } else {                        //4.中心桶无库存,检查是否触发下线                        checkBucketOffline(bucketCapacity);                    }                }            } catch (Exception e) {                e.printStackTrace();            } finally {                tairLock.unlock(key, value);            }        } else {            throw new BaseBizException("请求繁忙,稍后重试!");        }    }
//校验本次请求是否还需要执行扩容处理 private BucketCapacityContext checkBucketCapacity(BucketCapacity bucketCapacity) { String key = bucketCapacity.getSellerId() + bucketCapacity.getSkuId(); //1.获取远程的分桶缓存 Integer residueNum = getBucketInventoryNum(bucketCapacity.getBucketNo()); //2.获取缓存元数据信息 BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(key); //3.校验是否还需要执行扩容 List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList(); InventoryBucketConfigDO inventoryBucketConfig = bucketLocalCache.getInventoryBucketConfig(); for (BucketCacheBO bucketCacheBO : availableList) { //具体使用的是哪个分桶进行扣减库存 if (bucketCacheBO.getBucketNo().equals(bucketCapacity.getBucketNo())) { //触发回源比例的百分比 Integer backSourceProportion = inventoryBucketConfig.getBackSourceProportion(); //当前分桶的分配总库存 Integer bucketNum = bucketCacheBO.getBucketNum(); int backSourceNum = bucketNum * backSourceProportion / 100; //回源比例的库存 大于剩余的库存,触发异步扩容 return new BucketCapacityContext(residueNum, backSourceNum > residueNum, bucketCapacity); } } //如果不在可用列表里面,则意味已下线,快速结束掉 return new BucketCapacityContext(residueNum, false, bucketCapacity); }
//校验当前分桶是否触发下线的阈值 private void checkBucketOffline(BucketCapacity bucketCapacity) { //1.获取当前分桶的配置信息 String key = bucketCapacity.getSellerId() + bucketCapacity.getSkuId(); BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(key); //2.检测分桶的库存是否触发下线阈值,先获取当前分桶的具体库存以及下线配置阈值 Integer thresholdValue = bucketLocalCache.getInventoryBucketConfig().getThresholdValue(); Integer inventoryNum = getBucketInventoryNum(bucketCapacity.getBucketNo()); //3.如触发下线,发送消息调用分桶下线 if (thresholdValue > inventoryNum) { log.info("触发下线{},阈值{},当前库存值{}", thresholdValue > inventoryNum, thresholdValue, inventoryNum); sendAsynchronous(bucketCapacity); } } ...}
复制代码


(2)解决⽅案


一.在分布式锁外就开始验证该扩容请求是否会触发分桶下线操作

当出现⼤量的扣减库存请求,那么每个分桶都会多次触发需要进行扩容。是否触发分桶下线的检查可以在锁外部提前进行,从而提升分桶下线的触发效率。

@Servicepublic class InventoryBucketServiceImpl implements InventoryBucketService {    ...    //分桶扩容接口    @Override    public void bucketCapacity(BucketCapacity bucketCapacity) {        long startTime = System.currentTimeMillis();        //获取中心桶的剩余库存        Integer residueNum = getCenterStock(bucketCapacity);        if (residueNum <= 0) {            //中心桶无剩余库存,检查是否触发下线            checkBucketOffline(bucketCapacity);            return;        }
//判断本次扩容的分桶,是否有多次扩容失败的情况 String failNum = tairCache.get(TairInventoryConstant.BUCKET_CAPACITY_FAIL + bucketCapacity.getBucketNo()); if (StringUtils.isNotBlank(failNum) && Integer.parseInt(failNum) >= 2) { //当前分桶扩容失败次数超过两次了,直接放弃这次扩容 //因为失败太多并且还继续去尝试,会持续的扣减中心桶库存,可能会导致其他可以正常扩容的分桶,没有中心桶库存可以扣减 return; } //1.校验是否已经无需扩容了,如果是则快速结束 BucketCapacityContext bucketCapacityContext = checkBucketCapacity(bucketCapacity); if (!bucketCapacityContext.getIsCapacity()) { return; } //先锁住中心桶库存,避免此时库存发生变化 String key = buildBucketLockKey(bucketCapacity.getSellerId(), bucketCapacity.getSkuId()); String value = SnowflakeIdWorker.getCode();
//获取分布式锁来进行扩容处理 boolean lock = tairLock.tryLock(key, value); if (lock) { try { //再次校验是否需要扩容,此处不允许并发 bucketCapacityContext = checkBucketCapacity(bucketCapacity); if (bucketCapacityContext.getIsCapacity()) { //2.获取中心桶库存的库存 residueNum = getCenterStock(bucketCapacity); //3.可以扩容,计算出可回源的库存进行处理 if (residueNum > 0) { backSourceInventory(residueNum, bucketCapacityContext); log.info(bucketCapacity.getBucketNo() + "处理扩容消耗时间{}", System.currentTimeMillis() - startTime); } else { //4.中心桶无库存,检查是否触发下线 checkBucketOffline(bucketCapacity); } } } catch (Exception e) { e.printStackTrace(); } finally { tairLock.unlock(key, value); } } else { throw new BaseBizException("请求繁忙,稍后重试!"); } } ...}
复制代码


二.动态下线阈值计算

模板默认中会配置⼀个分桶的下线阈值。同一个分桶,面对不同流量的访问,会有不同的库存扣减速度。所以,一个分桶应该根据具体的流量来选择对应的下线阈值。从而减少由于大量并发扣减请求而导致的库存扣减失败情况。

 

为了计算出动态的下线阈值:可以启动一个定时任务调度,每隔 5 秒检查⼀下库存的扣减速度,根据可⽤分桶的数量得出⽬前单个分桶扣减库存的速度。分桶的下线阈值会根据实际流量⽽变化,分桶的最⼩下线阈值不能低于模板配置的下线阈值,分桶的最⾼下线阈值不能超过当前模板配置的下线阈值⽐例。例如分桶深度 30000,下线阈值最⼤⽐例 10%,最⼤下线阈值则为 3000。

 

注意:InventoryBucketCache 会缓存所有在售商品的库存分桶元数据信息。如果在售商品很多,可能需要考虑是否会 OOM。


@Component@Datapublic class InventoryBucketCache {    //本地缓存    @Autowired    private Cache cache;    ...
//获取本地缓存的所有分桶元数据 public List<BucketLocalCache> getBucketLocalCacheList() { ConcurrentMap concurrentMap = cache.asMap(); List<BucketLocalCache> bucketLocalCacheList = new ArrayList<BucketLocalCache>(concurrentMap.size()); for (Object key : concurrentMap.keySet()) { Object o = concurrentMap.get(key); if (!Objects.isNull(o)) { if (o instanceof BucketLocalCache) { BucketLocalCache bucketLocalCache = (BucketLocalCache) o; bucketLocalCacheList.add(bucketLocalCache); } } } return bucketLocalCacheList; } ...}
@Servicepublic class InventoryBucketServiceImpl implements InventoryBucketService { @Resource private CalculateThresholdQueue calculateThresholdQueue; ...
//计算分桶某个时间区间的库存具体消费速度,生成预估的下线阈值 @Override public void calculateOfflineThreshold() { //先获取到目前缓存的所有分桶元数据 List<BucketLocalCache> bucketLocalCacheList = inventoryBucketCache.getBucketLocalCacheList(); if (!CollectionUtils.isEmpty(bucketLocalCacheList)) { //将分桶元数据加入到队列中,计算出每个分桶的预估下线阈值 for (BucketLocalCache bucketLocalCache : bucketLocalCacheList) { calculateThresholdQueue.offerByRoundRobin(bucketLocalCache); } } } ...}
//用于计算分桶下线阈值的计算队列@Componentpublic class CalculateThresholdQueue { //计算队列列表 private final List<BlockingQueue> calculateQueue = new ArrayList<>();
@Resource private TairCache tairCache;
//处理下一个分桶元数据的计算队列在队列列表中的下标 private AtomicInteger index = new AtomicInteger();
//配置的计算队列数量 @Value("${calculate.threshold-num:32}") private Integer thresholdQueueNum; @PostConstruct public void init() { ExecutorService executors = Executors.newFixedThreadPool(thresholdQueueNum); for (int i = 0; i < thresholdQueueNum; i++) { //设置一个队列最大容纳数量 BlockingQueue blockingQueue = new ArrayBlockingQueue(150000); calculateQueue.add(blockingQueue); executors.execute(new CalculateThresholdRunner(blockingQueue, tairCache)); } }
//将分桶元数据提交到对应的计算队列 public boolean offerByRoundRobin(Object object) { index.compareAndSet(thresholdQueueNum * 10000, 0); boolean offer = calculateQueue.get(index.getAndIncrement() % thresholdQueueNum).offer(object); return offer; }}
复制代码


分桶下线阈值的动态计算逻辑如下:

//多线程消费计算队列里的分桶元数据,来计算分桶下线阈值public class CalculateThresholdRunner implements Runnable {    //处理的计算队列    private BlockingQueue blockingQueue;    private TairCache tairCache;
public CalculateThresholdRunner(BlockingQueue blockingQueue, TairCache tairCache) { this.blockingQueue = blockingQueue; this.tairCache = tairCache; }
//内部线程计算每个SKU的缓存信息 @Override public void run() { try { while (true) { BucketLocalCache bucketLocalCache = (BucketLocalCache) blockingQueue.take(); String currentDate = DateFormatUtil.formatDateTime(); //获取可用列表,不可用列表默认触发下线,库存暂不考虑计算入内,否则增加性能开销 List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList(); if (CollectionUtils.isEmpty(availableList)) { return; }
//获取可用列表的分桶缓存集合key List<String> cacheKeyList = availableList.stream().map(BucketCacheBO::getBucketNo).collect(Collectors.toList()); //批量获取的分桶库存数量 List<String> bucketNumList = tairCache.mget(cacheKeyList);
//构建中心桶的剩余库存key String sellerInventoryKey = buildSellerInventoryKey(bucketLocalCache.getSellerId(), bucketLocalCache.getSkuId()); String inventoryNum = tairCache.get(sellerInventoryKey);
//计算SKU所有分桶剩余的库存 Integer residueNum = 0; if (!StringUtils.isEmpty(inventoryNum)) { residueNum = residueNum + Integer.parseInt(inventoryNum); } //合并累计相加,得到当前商品SKU的总库存 for (String bucketNum : bucketNumList) { if (!StringUtils.isEmpty(bucketNum)) { residueNum = residueNum + Integer.parseInt(bucketNum); } }
//获取之前缓存的商品SKU库存下线阈值信息,计算差集得到实际库存消费速度 String key = buildSellerInventoryResidueKey(bucketLocalCache.getSellerId(), bucketLocalCache.getSkuId()); //上次计算存储的库存实际值以及查询时间,用于计算均匀的每秒库存消费 String oldCalculateInventory = tairCache.get(key);
//默认为当前配置的分桶下线阈值 Integer thresholdValue = bucketLocalCache.getInventoryBucketConfig().getThresholdValue(); if (!StringUtils.isEmpty(oldCalculateInventory)) { CalculateInventory calculateInventory = JsonUtil.json2Object(oldCalculateInventory, CalculateInventory.class); //计算上一次的库存 减去此次的库存,得到这个时间点消耗了多少库存 int diffNum = calculateInventory.getOldResidueNum() - residueNum; //上一次计算的创建时间 String createDate = calculateInventory.getCreateDate(); //当前时间减去上一次的计算的创建时间,得到间隔时间差,再通过差集的库存除以秒,得到每秒平均的消耗库存 long consumeStock = diffNum / (Long.parseLong(currentDate) - Long.parseLong(createDate)); if (consumeStock > 0) { //每秒消耗的库存 / 当前存活的分桶数量,得到目前分桶的下线阈值 Long newThresholdValue = consumeStock / availableList.size();
//这里计算的下线阈值,最小值不能低于配置的最小阈值 if (newThresholdValue > thresholdValue) { thresholdValue = newThresholdValue.intValue(); //阈值的最大值,不能超过库存深度的10%比例 int maxDepthNum = bucketLocalCache.getInventoryBucketConfig().getMaxDepthNum() / bucketLocalCache.getInventoryBucketConfig().getThresholdProportion(); if (thresholdValue > maxDepthNum) { thresholdValue = maxDepthNum; } } log.info("预估的分桶下线阈值{},实际使用的分桶下线阈值{}", newThresholdValue, thresholdValue); } }
//存储该商品SKU的预估下线阈值 String thresholdKey = buildSellerInventoryThresholdKey(bucketLocalCache.getSellerId(), bucketLocalCache.getSkuId()); tairCache.set(thresholdKey, String.valueOf(thresholdValue), 0);
//存储该商品SKU这次计算的库存和时间 CalculateInventory calculateInventory = new CalculateInventory(); calculateInventory.setOldResidueNum(residueNum); calculateInventory.setCreateDate(DateFormatUtil.formatDateTime()); tairCache.set(key, JSONObject.toJSONString(calculateInventory), 0); } } catch (Exception e) { log.error("处理库存分桶下线阈值异常", e); } }
//中心桶库存的key private String buildSellerInventoryKey(String sellerId, String skuId) { return TairInventoryConstant.SELLER_INVENTORY_PREFIX + sellerId + skuId; }
//计算商品SKU库存下线阈值的相关信息的key private String buildSellerInventoryResidueKey(String sellerId, String skuId) { return TairInventoryConstant.SELLER_INVENTORY_RESIDUE_PREFIX + sellerId + skuId; }
//商品SKU库存下线阈值的key private String buildSellerInventoryThresholdKey(String sellerId, String skuId) { return TairInventoryConstant.SELLER_INVENTORY_THRESHOLD_PREFIX + sellerId + skuId; }}
复制代码


6.多个分桶同时触发下线的并发场景优化


(1)优化原因


首先需要明确几点:

 

一.库存分桶下线的时机

并非没有库存才下线,而是触发下线阈值就下线。

 

二.库存分桶下线后

其剩余的库存会返还到中心桶剩余库存。

 

三.库存分桶扩容时

会从中心桶获取剩余库存进行扩容。

 

四.进行库存分桶扣减时

会先处理扣减、然后检查扩容、最后检查下线。

 

五.在对分桶进行下线处理时

会通过发送消息进行异步下线。

 

当某个商品 SKU 出现大量的并发扣减库存请求时,可能其中一些扣减请求路由到分桶 1234 进行库存扣减处理,另外一些扣减请求路由到分桶 4567 进行库存扣减处理。路由到分桶 1234 的扣减请求,没法扩容,同时触发了分桶下线。在进行分桶 1234 下线时,这些分桶其实还有一些库存,可供扣减。也就是说,如果分桶 1234 下线成功,那么紧接其后路由到分桶 4567 的扣减请求,则可以触发扩容。

 

但是分桶扩容后上线和分桶下线,都会竞争同一把分布式锁。即用分布式锁来保证分桶的上线和下线不会覆盖分桶元数据的变更,从而导致即便分桶下线成功,下线分桶的剩余库存返还给中心桶。分桶上线也由于等待锁不能及时将返还的库存及时添加到还没下线的分桶。

 

所以在并发分桶下线 + 分桶扩容的场景下,虽然竞争到锁的⼏个分桶成功地快速下线了,但可能会导致很多请求访问到没法及时扩容的、快没库存的分桶上,从而导致很多扣减请求出现库存不⾜、扣减失败的问题,而实际上中心桶还是有剩余库存的。所以对于⾯临需要下线的分桶,需要最⼩粒度的锁,来实现快速下线。

 

优化前的代码:

@Servicepublic class InventoryBucketServiceImpl implements InventoryBucketService {    ...    //分桶下线接口    @Override    public void bucketOffline(InventorOfflineRequest request) {        long start = System.currentTimeMillis();        //1.验证入参必填        checkInventorOfflineParams(request);        //过滤只有一个分桶的无效请求        Boolean isOffline = checkBucketOffline(request);        if (isOffline) {            //2.注意这里需要锁定中心桶库存            String key = TairInventoryConstant.SELLER_INVENTORY_LOCK_PREFIX + request.getSellerId() + request.getSkuId();            String value = SnowflakeIdWorker.getCode();            boolean lock = tairLock.tryLock(key, value);            if (lock) {                try {                    //3.先将准备下线的分桶库存从本地和远程列表中移除至不可用列表,避免新的请求进来                    updateBucket(request);                } catch (Exception e) {                    e.printStackTrace();                } finally {                    tairLock.unlock(key, value);                }            } else {                throw new BaseBizException("请求繁忙,稍后重试!");            }            log.info("分桶下线处理时间,request:{}, lock:{}, time:{}", JSON.toJSONString(request), lock, System.currentTimeMillis() - start);        }    }    ...}
复制代码


(2)解决⽅案


一.减少分桶下线的锁粒度

锁不再是卖家 + 商品的维度,⽽是卖家 + 商品 + 下线分桶的维度,增加不同分桶下线的并发执⾏速度。

@Servicepublic class InventoryBucketServiceImpl implements InventoryBucketService {    ...    //分桶下线接口    @Override    public void bucketOffline(InventorOfflineRequest request) {        //1.验证入参必填        checkInventorOfflineParams(request);        //过滤只有一个分桶的无效请求        Boolean isOffline = checkBucketOffline(request);        if (isOffline) {            long start = System.currentTimeMillis();            //2.注意这里需要锁定 下线分桶的变更,这个接口默认一次只有一个分桶            String key = buildBucketOfflineLockKey(request.getSellerId(), request.getSkuId(), request.getBucketNoList().get(0));            String value = SnowflakeIdWorker.getCode();            boolean lock = tairLock.tryLock(key, value);            if (lock) {                try {                    //3.先将准备下线的分桶库存从本地和远程列表中移除至不可用列表,避免新的请求进来                    updateBucketCache(request);                    log.info("分桶下线处理时间,下线分桶:{}, 当前时间:{}, 耗时:{}", JSON.toJSONString(request.getBucketNoList()), DateFormatUtil.formatDateTime(new Date()), System.currentTimeMillis() - start);                } catch (Exception e) {                    e.printStackTrace();                } finally {                    tairLock.unlock(key, value);                }            } else {                throw new BaseBizException("请求繁忙,稍后重试!");            }        }    }    ...}
复制代码


二.分桶下线并发更新元数据要避免脏数据覆盖

因为优化后采取的是单个分桶去验证分桶下线处理,覆盖的是整个商品 SKU 的分桶元数据信息,这⾥的顺序已⽆法保证了,不处理直接简单覆盖可能会造成已下线的分桶重复错误上线。

 

所以在更新各个分桶的元数据,包括⼴播消息消费更新本地元数据时,不能简单进⾏元数据覆盖,⽽是要 diff 下线的分桶,覆盖对应的分桶数据。而且因为覆盖本地元数据,涉及到分布式本地缓存,在更新自己的同时,还需要处理其它机器接受处理的分桶元数据更新。所以还需要对商品更新元数据的操作进⾏本地内存级别锁的处理。


@Component@Datapublic class InventoryBucketCache {    @Autowired    private Cache cache;
@Resource private TairCache tairCache; ...
//本地存储关于分桶信息 @CacheRefresh(cacheKey = "bucketKey", mqCacheKey = CacheConstant.INVENTORY_SKU_KEY, index = "1", messageType = CacheConstant.MESSAGE_TYPE_HOT, cacheType = CacheConstant.TAIR_CACHE_TYPE) public void setBucketLocalCache(String bucketKey, BucketLocalCache bucketLocalCache) { String bucketLocalKey = TairInventoryConstant.SELLER_BUCKET_PREFIX + bucketLocalCache.getSellerId() + bucketLocalCache.getSkuId(); synchronized (bucketLocalKey.intern()) { log.info("保存本地缓存元数据 key:{}, value:{}", bucketKey, JSON.toJSONString(bucketLocalCache)); BucketLocalCache bucketCache = getTairBucketCache(bucketKey); log.info("远程缓存元数据 key:{}, value:{}", bucketKey, JSON.toJSONString(bucketCache)); //如果本地缓存没有就直接写入 if (Objects.isNull(bucketCache)) { setBucketCache(bucketKey, bucketLocalCache); cache.put(bucketKey, bucketLocalCache); return; } //本地缓存的元数据覆盖,考虑到是并发执行的,这里需要上内存级别的锁,并进行diff处理 if (bucketLocalCache.getOperationType().equals(BucketStatusEnum.AVAILABLE_STATUS.getCode())) { diffCacheOnline(bucketCache, bucketLocalCache); } else if (bucketLocalCache.getOperationType().equals(BucketStatusEnum.OFFLINE_STATUS.getCode())) { diffCacheOffline(bucketCache, bucketLocalCache); } setBucketCache(bucketKey, bucketCache); cache.put(bucketKey, bucketCache); log.info("实际保存本地缓存元数据 key:{}, value:{}", bucketKey, JSON.toJSONString(bucketCache)); } }
//处理原有元数据和更新元数据的下线分桶的处理 //@param bucketCache 原始本地缓存元数据 //@param bucketLocalCache 新的元数据 public void diffCacheOffline(BucketLocalCache bucketCache, BucketLocalCache bucketLocalCache) { if (Objects.isNull(bucketCache) || Objects.isNull(bucketLocalCache)) { return; } //原始的已下线分桶元数据列表 List<BucketCacheBO> oldUndistributedList = bucketCache.getUndistributedList(); //新的已下线分桶元数据列表 List<BucketCacheBO> newUndistributedList = bucketLocalCache.getUndistributedList(); List<BucketCacheBO> diffUndistributedList = null; //转换一个集合为MAP,用于计算差集的下线分桶,主要是看新的下线分桶里面有没有比旧的多 if (CollectionUtils.isEmpty(oldUndistributedList)) { diffUndistributedList = newUndistributedList; } else { Map<String, BucketCacheBO> bucketCacheBOMap = oldUndistributedList.stream() .collect(Collectors.toMap(BucketCacheBO::getBucketNo, Function.identity())); //处理新的下线分桶里面 是否更新了一批新的下线分桶,并和原有的元数据下线分桶比较,看哪些是新增的 if (!CollectionUtils.isEmpty(newUndistributedList)) { diffUndistributedList = new ArrayList<>(); for (BucketCacheBO bucketCacheBO : newUndistributedList) { if (!bucketCacheBOMap.containsKey(bucketCacheBO.getBucketNo()) && !StringUtils.isEmpty(bucketCacheBO.getBucketNo())) { diffUndistributedList.add(bucketCacheBO); } } } } Map<String, BucketCacheBO> availableMap = bucketCache.getAvailableList().stream().collect(Collectors.toMap(BucketCacheBO::getBucketNo, Function.identity()));
//产生变化的 元数据下线分桶 if (!CollectionUtils.isEmpty(diffUndistributedList)) { //处理下线的分桶 for (BucketCacheBO bucketCacheBO : diffUndistributedList) { //最少保留一个分桶 if (availableMap.size() > 1) { //获取分桶下线触发的时间 log.info("本地元数据发生变更,分桶编号[{}]下线,下线时间{}", bucketCacheBO.getBucketNo(), DateFormatUtil.formatDateTime()); if (availableMap.containsKey(bucketCacheBO.getBucketNo()) && !StringUtils.isEmpty(bucketCacheBO.getBucketNo())) { availableMap.remove(bucketCacheBO.getBucketNo()); //增加下线的分桶数据 bucketCache.getUndistributedList().add(bucketCacheBO); } } } //从上线分桶中移除 List<BucketCacheBO> availableList = availableMap.values().stream().collect(Collectors.toList()); bucketCache.setAvailableList(availableList); }
bucketCache.setVersion(bucketLocalCache.getVersion()); } ...}
复制代码


7.⾼并发下分桶被全部下线如何修复


(1)优化原因


由于前面为了提⾼分桶下线的性能,会对可⽤分桶保留 1 个。但是因为⾼并发下还是有可能存在分桶全部下线,为了保证还有⼀个可⽤的分桶永远不下线,需要有⼀个兜底的分桶。

 

(2)解决⽅案


一.当所有的可⽤分桶都被扣减完时,可以使⽤中⼼桶库存来替代

构建扣减上下⽂对象时,如果发现没有⼀个可⽤的上线分桶。那么可选择中⼼桶来进⾏扣减尝试,避免此时⽆法扣减库存。

@Servicepublic class InventoryServiceImpl implements InventoryService {    ...    //填充扣减库存的分桶相关信息    private BucketContext buildDeductBucketList(InventoryRequest request) {        BucketContext context = new BucketContext();        //获取本地缓存的分桶元数据        BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(request.getSellerId() + request.getSkuId());        //获取本地缓存的分桶列表        List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList();
if (!CollectionUtils.isEmpty(availableList)) { //获取本次扣减请求对应的扣减次数,用来定位具体使用的分桶库存 Integer incrementCount = getIncrementCount(request);
//通过运算得到本次访问所需要定位的分桶 int index = incrementCount % availableList.size();
//获取本次准备处理的分桶信息 BucketCacheBO bucketCacheBO = availableList.get(index); context.getAvailableList().add(bucketCacheBO);
//为了避免扣减失败(分桶已下线或者库存不足),可以多备份几个分桶 //全部分桶都作为备份,就是合并扣减的实现了 for (int i = 0; i < 2; i++) { //填充2个作为备份,如果超过下标则从0开始继续取 int num = index + i; if (num >= availableList.size()) { num = 0; } BucketCacheBO bucketCache = availableList.get(num); context.getAvailableList().add(bucketCache); } } else { //并发下,可能全部的分桶都下线了,这个时候使用中心桶进行库存扣减(因为其它分桶的下线库存回源会加到中心桶上) BucketCacheBO bucketCacheBO = new BucketCacheBO(); bucketCacheBO.setBucketNo(buildSellerInventoryKey(request.getSellerId(), request.getSkuId())); //中心桶无需扩容,但是出现这种场景属于高并发下,分桶全部被下线了,此时需要保证分桶本地元数据和远程保持一致,为了性能,分桶下线未上粒度较大的锁 //所以需要当遇到使用中心桶的时候,再次触发一次远程缓存和本地缓存同步的操作,并且需要保证远程缓存最少有一个可用分桶存在 bucketCacheBO.setBucketNum(0); context.getAvailableList().add(bucketCacheBO); //发送消息异步刷新分布式本地缓存的消息 bucketRefreshProducer.sendBucketOffline(request); }
Long index = InventorBucketUtil.createDetailBucketKey(request.getOrderId(), bucketLocalCache.getInventoryBucketConfig().getBucketNum()); String inventoryDetailKey = bucketLocalCache.getBucketDetailKeyList().get(Integer.valueOf(index + ""));
context.setInventoryBucketConfig(bucketLocalCache.getInventoryBucketConfig()); context.setInventoryDetailKey(inventoryDetailKey); inventoryBucketCache.threadLocalRemove(); return context; } ...}
复制代码


二.异步刷新分布式库存服务机器的本地缓存

当扣减的上下⽂对象中存在中⼼桶作为为分桶进行扣减时,需要发送⼀个消息,异步刷新分布式库存服务机器的本地缓存,避免各分布式库存服务机器的本地缓存可能不⼀致。

//刷新本地缓存的分桶元数据,从而让分布式库存服务的本地缓存一致@Componentpublic class BucketRefreshListener implements MessageListenerConcurrently {    @Autowired    private Cache cache;
@Resource private InventoryBucketCache inventoryBucketCache;
@Resource private TairLock tairLock;
@Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) { try { for (MessageExt messageExt : list) { String msg = new String(messageExt.getBody()); InventoryRequest inventoryRequest = JsonUtil.json2Object(msg, InventoryRequest.class);
//锁住这个商品的本地缓存同步,每次只会处理一个本地缓存元数据 String key = inventoryRequest.getSellerId() + inventoryRequest.getSkuId(); String value = SnowflakeIdWorker.getCode(); boolean lock = tairLock.tryLock(TairInventoryConstant.SELLER_SYNC_BUCKET_PREFIX + key, value); if (lock) { try { String bucketLocalKey = TairInventoryConstant.SELLER_BUCKET_PREFIX + key; //远程缓存 BucketLocalCache bucketCache = inventoryBucketCache.getTairBucketCache(bucketLocalKey); //本地缓存 BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(key);
//以远程缓存为准进行本地缓存覆盖(缓存最少需要一个分桶列表,远程缓存没有就需要保证增加一个可用分桶) BucketLocalCache bucketLocalCache1 = inventoryBucketCache.diffRefreshCache(bucketLocalCache, bucketCache); //比较远程缓存和本地分桶是否一致 if (!Objects.isNull(bucketLocalCache1)) { //覆盖本地缓存 cache.put(bucketLocalKey, bucketLocalCache1); //更新远程缓存 inventoryBucketCache.setBucketCache(bucketLocalKey, bucketLocalCache1); } } catch (Exception e) { log.error("consume error, 同步刷新本地缓存的分桶元数据失败", e); //失败不重试 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } finally { tairLock.unlock(key, value); } } } } catch (Exception e) { log.error("consume error, 刷新本地缓存的分桶元数据失败", e); //失败不重试 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }}
@Component@Datapublic class InventoryBucketCache { @Autowired private Cache cache;
@Resource private TairCache tairCache; ...
//返回一个正确的可用分桶元数据对象 //@param bucketCache 原始本地缓存元数据 //@param bucketLocalCache 新的元数据 public BucketLocalCache diffRefreshCache(BucketLocalCache bucketCache, BucketLocalCache bucketLocalCache) { if (Objects.isNull(bucketCache) || Objects.isNull(bucketLocalCache)) { return null; } //本地的上线分桶元数据列表 List<BucketCacheBO> oldAvailableList = bucketCache.getAvailableList(); //远程的已上线分桶元数据列表 List<BucketCacheBO> newAvailableList = bucketLocalCache.getAvailableList(); if (!CollectionUtils.isEmpty(oldAvailableList)) { Map<String, BucketCacheBO> bucketCacheBOMap = oldAvailableList.stream() .collect(Collectors.toMap(BucketCacheBO::getBucketNo, Function.identity()));
//验证本地的可用分桶列表和远程缓存的分桶列表差异,并处理保证缓存统一 if (!CollectionUtils.isEmpty(newAvailableList)) { for (BucketCacheBO bucketCacheBO : newAvailableList) { //如果有任意一个可用分桶和远程不一致,则统一以远程为准 if (!bucketCacheBOMap.containsKey(bucketCacheBO.getBucketNo())) { return bucketLocalCache; } } } //数据一致,不处理 return null; } else { //本地远程分桶可用列表为空,远程缓存可用列表也为空 if (CollectionUtils.isEmpty(newAvailableList)) { //从不可用列表选择一个分桶作为可用分桶使用 List<BucketCacheBO> undistributedList = bucketLocalCache.getUndistributedList();
bucketLocalCache.getAvailableList().add(undistributedList.get(0)); bucketLocalCache.getUndistributedList().remove(0);
return bucketLocalCache; } else { //远程缓存有可用分桶,直接使用远程缓存覆盖本地缓存 return bucketLocalCache; } } } ...}
复制代码


8.优化后的库存 SQL

CREATE TABLE `inventory_bucket_config` (    `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',    `bucket_num` int(10) NOT NULL DEFAULT '0' COMMENT '分桶数量',    `max_depth_num` int(10) NOT NULL DEFAULT '0' COMMENT '最⼤库存深度',    `min_depth_num` int(10) NOT NULL DEFAULT '0' COMMENT '最⼩库存深度',    `threshold_value` int(10) NOT NULL DEFAULT '0' COMMENT '分桶下线阈值',    `threshold_proportion` int(10) DEFAULT NULL COMMENT '分桶下线动态⽐例',    `back_source_proportion` int(10) NOT NULL DEFAULT '0' COMMENT '回源⽐例,从1-100设定⽐例',    `back_source_step` int(10) NOT NULL DEFAULT '0' COMMENT '回源步⻓,桶扩容的时候默认每次分配的库存⼤⼩',    `template_name` varchar(64) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '模板名称',    `is_default` tinyint(1) NOT NULL DEFAULT '0' COMMENT '是否默认模板,只允许⼀个,1为默认模板',    `version_id` int(10) NOT NULL DEFAULT '0' COMMENT '版本号',    `del_flag` tinyint(1) NOT NULL DEFAULT '0' COMMENT '删除标记(1-有效,0-删除)',    `create_user` int(10) NOT NULL DEFAULT '0' COMMENT '创建⼈',    `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',    `update_user` int(10) NOT NULL DEFAULT '0' COMMENT '更新⼈',    `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',    PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='库存分桶配置模板表';
CREATE TABLE `inventory_allot_detail` ( `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键', `sku_id` varchar(40) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT 'skuId', `inventor_no` varchar(32) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '库存申请业务编号', `seller_id` varchar(64) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '卖家ID', `inventor_num` int(10) NOT NULL DEFAULT '0' COMMENT '库存变更数量', `version_id` int(10) NOT NULL DEFAULT '0' COMMENT '版本号', `del_flag` tinyint(1) NOT NULL DEFAULT '0' COMMENT '删除标记(1-有效,0-删除)', `create_user` int(10) NOT NULL DEFAULT '0' COMMENT '创建⼈', `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间', `update_user` int(10) NOT NULL DEFAULT '0' COMMENT '更新⼈', `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', PRIMARY KEY (`id`), UNIQUE KEY `inde_unique_inventor_no` (`inventor_no`) USING BTREE) ENGINE=InnoDB AUTO_INCREMENT=265 DEFAULT CHARSET=utf8 COMMENT='库存分配记录表';
CREATE TABLE `inventory_deduction_detail` ( `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键', `order_id` varchar(32) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '订单id', `refund_no` varchar(32) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '退款编号', `inventory_num` int(10) NOT NULL DEFAULT '0' COMMENT '扣减库存数量', `sku_id` varchar(64) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '商品skuId', `seller_id` varchar(64) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '卖家ID', `bucket_no` int(10) NOT NULL COMMENT '扣减分桶编号', `deduction_type` int(2) NOT NULL COMMENT '库存操作类型(10库存扣减,20库存退货)', `del_flag` tinyint(1) NOT NULL DEFAULT '0' COMMENT '删除标记(1-有效,0-删除)', `create_user` int(10) NOT NULL DEFAULT '0' COMMENT '创建⼈', `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间', `update_user` int(10) NOT NULL DEFAULT '0' COMMENT '更新⼈', `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', PRIMARY KEY (`ID`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='库存扣减明细表';
-- 分布式库存扣减版本新增CREATE TABLE `inventory_bucket_operate` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, `operate_id` varchar(32) NOT NULL COMMENT '操作id', `seller_id` varchar(64) NOT NULL COMMENT '卖家id', `sku_id` varchar(64) NOT NULL COMMENT '商品sku', `operate_type` tinyint(3) NOT NULL COMMENT '操作类型:1-初始化,2-增加库存,3-分桶上线,4-分桶扩容,5-分桶下线', `bucket` text COMMENT '分桶变动信息', `inventory_num` int(11) DEFAULT NULL COMMENT '变动库存', `feature` text COMMENT '扩展信息', `operate_status` tinyint(4) DEFAULT '0' COMMENT '操作状态', `del_flag` tinyint(1) DEFAULT '1' COMMENT '删除标记', `create_user` int(11) DEFAULT NULL COMMENT '创建⼈', `create_time` datetime DEFAULT NULL COMMENT '创建时间', `update_user` int(11) DEFAULT NULL COMMENT '更新⼈', `update_time` datetime DEFAULT NULL COMMENT '更新时间', PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=58 DEFAULT CHARSET=utf8mb4 COMMENT='库存分桶操作表';
CREATE TABLE `inventory_operate_fail` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键', `operate_id` varchar(32) NOT NULL COMMENT '操作id', `fail_type` varchar(32) DEFAULT NULL COMMENT '操作类型', `bucket_no` varchar(32) DEFAULT NULL COMMENT '分桶编号', `inventory_num` int(11) DEFAULT NULL COMMENT '变动库存数量', `del_flag` tinyint(1) DEFAULT NULL COMMENT '删除标识', `create_user` int(11) DEFAULT NULL COMMENT '创建⼈', `create_time` datetime DEFAULT NULL COMMENT '创建时间', `update_user` int(11) DEFAULT NULL COMMENT '更新⼈', `update_time` datetime DEFAULT NULL COMMENT '更新时间', PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='库存操作失败记录表';
复制代码


9.其他的优化


(1)对库存扣减明细消息的处理


库存扣减明细的消息是通过异步来进行发送的,如果异步发送消息失败了,则会导致消息丢失。

 

考虑增加⼀个队列接收异步发送失败的消息。通过每秒⼀次刷⼊的⽅式,将写队列的数据转换到读队列后再进⾏写⼊,然后再清除读队列。队列中的数据,可先顺序写本地⽂件,保证机器宕机数据不丢失。

 

(2)分桶数量的处理


分桶的数量,⼀般是最开始初始化添加库存的时候⽣成到对应缓存分⽚中的。但对于不同的场景,分桶需要的数量是不⼀样的。比如 1000 库存分散到 32 个分桶比较合理,1 万库存还是分散到 32 个分桶吗?或者库存只有 20,还要⽤32 个分桶吗?

 

所以分桶的数量并⾮初始化后就永远不变的。库存分桶也需要根据⼀些规则或者⼈⼯调整,进⾏分桶的增加和减少。

 

(3)合理进行库存缩减


比如已分配了分桶 5000 库存,但此时要减去 2000 库存。应该怎么从已上线分配库存的分桶⾥⾯对库存进⾏合理的减少。


文章转载自:东阳马生架构

原文链接:https://www.cnblogs.com/mjunz/p/18964961

体验地址:http://www.jnpfsoft.com/?from=001YH

用户头像

还未添加个人签名 2025-04-01 加入

还未添加个人简介

评论

发布
暂无评论
商品中心—库存分桶高并发的优化文档(二)_Java_量贩潮汐·WholesaleTide_InfoQ写作社区