写点什么

商品中心—库存分桶高并发的优化文档

  • 2025-07-07
    福建
  • 本文字数:19589 字

    阅读完需:约 64 分钟

1.库存扣减时获取分桶元数据的优化


(1)优化原因


库存扣减过程中,⼤量的请求会加载本地缓存中的分桶元数据信息。在填充可⽤分桶到扣减上下⽂中,会调用缓存的分桶元数据信息的读方法。比如会调用本地缓存的分桶元数据对象 bucketLocalCache 的 getAvailableList()方法。而当增加库存、分桶上下线时,会修改本地缓存的分桶元数据对象 bucketLocalCache。

 

所以如果出现大量扣减请求时,也发生对本地缓存的分桶元数据对象修改,那么就会出现并发的读写问题,从而导致偶尔出现读方法的延迟问题。

 

优化前的部分日志:

...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 179毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 161毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 71毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 620毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 74毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 1毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 1毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 9毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 28毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 89毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 1毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 1毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 134毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 1毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒
复制代码


优化前的代码:

//库存扣减业务实现类@Servicepublic class InventoryServiceImpl implements InventoryService {    ...    //构建接下来用于具体扣减库存所需要的模型对象    private BucketContext buildDeductProductStock(InventoryRequest request) {        //1.填充扣减库存相关信息明细        InventoryDetail inventoryDetail = inventoryConverter.converterRequest(request);        //2.填充扣减库存的分桶配置信息        BucketContext bucketContext = buildDeductBucketList(request);        bucketContext.setInventoryDetail(inventoryDetail);        return bucketContext;    }
//填充扣减库存的分桶相关信息 private BucketContext buildDeductBucketList(InventoryRequest request) { BucketContext context = new BucketContext(); //获取缓存中的分桶元数据信息 BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(request.getSellerId() + request.getSkuId()); //获取本地缓存的分桶列表 List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList();
//获取本次扣减请求的次数,用来定位具体使用的分桶库存 Integer incrementCount = getIncrementCount(request); //通过取模运算得到本次扣减需要定位到的分桶列表下标 int index = incrementCount % availableList.size(); log.info("本次可用分桶列表数量:{},扣减下标:{}", availableList.size(), index);
//获取本次扣减准备处理的分桶信息,避免扣减失败(分桶已下线或者库存不足),多备份几个 BucketCacheBO bucketCacheBO = availableList.get(index); context.getAvailableList().add(bucketCacheBO); context.getBucketNoList().add(bucketCacheBO.getBucketNo()); context.setInventoryBucketConfig(bucketLocalCache.getInventoryBucketConfig());
//如果其他分桶都作为备用分桶,那么就可以实现库存合并扣减的功能了 for (int i = 0; i < 2; i++) { //任意填充2个作为备份 Random random = new Random(); int num = random.nextInt(availableList.size());
BucketCacheBO bucketCache = availableList.get(num); //避免拿到重复的分桶,这里处理一下 if (context.getBucketNoList().contains(bucketCache.getBucketNo())) { i--; continue; } context.getAvailableList().add(bucketCache); context.getBucketNoList().add(bucketCache.getBucketNo()); }
return context; } ...}
@Component@Datapublic class InventoryBucketCache { @Autowired private Cache cache;
@Autowired private TairCache tairCache;
//本地存储分桶元数据信息,增加库存、分桶扩容、分桶上下线时就会触发调用这个方法修改本地缓存对象 public void setBucketLocalCache(String bucketKey, BucketLocalCache bucketLocalCache) { log.info("local cache set key:{}, value:{}", bucketKey, JSON.toJSONString(bucketLocalCache)); cache.put(bucketKey, bucketLocalCache); }
//获取本地的分桶元数据信息 public BucketLocalCache getBucketLocalCache(String bucketKey) { //先查本地缓存 BucketLocalCache bucketLocalCache = (BucketLocalCache) cache.getIfPresent(bucketKey); log.info("local cache get key:{}, value:{}", bucketKey, JSON.toJSONString(bucketLocalCache)); if (Objects.isNull(bucketLocalCache)) { //再查远程缓存 synchronized (bucketKey.intern()) { String bucketCache = tairCache.get(TairInventoryConstant.SELLER_BUCKET_PREFIX + bucketKey); if (!StringUtils.isEmpty(bucketCache)) { bucketLocalCache = JsonUtil.json2Object(bucketCache, BucketLocalCache.class); cache.put(bucketKey, bucketLocalCache); } } } return bucketLocalCache; }}
复制代码


(2)解决⽅案


由于库存分桶元数据的对象变量是库存扣减请求和库存调配请求共⽤的,所以可以就将该变量交给 ThreadLocal 来管理其线程副本。

 

注意:只需要对分桶元数据的对象进行读取时使用 ThreadLocal 线程副本即可,对分桶元数据的对象进行修改时没必要使用 ThreadLocal 线程副本。

 

当使⽤ThreadLocal 维护缓存的分桶元数据变量时,ThreadLocal 为会每个使⽤该变量的线程提供独⽴的变量副本。从而每个线程都可以独⽴改变⾃⼰的副本,⽽不会影响其它线程的副本。

 

优化后,获取本地缓存的分桶元数据对象的可⽤分桶列表的耗时都为 0 毫秒了,没有再出现⼏⼗上百毫秒的情况。


@Component@Datapublic class InventoryBucketCache {    //每次获取本地缓存存储的分桶元数据信息时,需要使⽤ThreadLocal来存储,避免线程之间的竞争    private ThreadLocal<BucketLocalCache> bucketLocalCacheThreadLocal = new ThreadLocal<>();    ...
//获取本地缓存的分桶元数据信息 public BucketLocalCache getBucketLocalCache(String bucketKey) { bucketKey = TairInventoryConstant.SELLER_BUCKET_PREFIX + bucketKey; //先查本地缓存 BucketLocalCache bucketLocalCache = (BucketLocalCache) cache.getIfPresent(bucketKey); log.info("local cache get key:{}, value:{}", bucketKey, JSON.toJSONString(bucketLocalCache)); if (Objects.isNull(bucketLocalCache)) { //再查远程缓存 Long startTime = System.currentTimeMillis(); synchronized (bucketKey.intern()) { String bucketCache = getBucketCache(bucketKey); if (!StringUtils.isEmpty(bucketCache)) { bucketLocalCache = JsonUtil.json2Object(bucketCache, BucketLocalCache.class); cache.put(bucketKey, bucketLocalCache); } log.error("本地加载缓存模型未命中缓存,远程重新加载耗时{}毫秒", System.currentTimeMillis() - startTime); } } bucketLocalCacheThreadLocal.set(bucketLocalCache); return bucketLocalCacheThreadLocal.get(); }
public void threadLocalRemove() { bucketLocalCacheThreadLocal.remove(); } ...}
@Servicepublic class InventoryServiceImpl implements InventoryService { ... //填充扣减库存的分桶相关信息 private BucketContext buildDeductBucketList(InventoryRequest request) { BucketContext context = new BucketContext(); //获取本地缓存的分桶元数据 BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(request.getSellerId() + request.getSkuId()); //获取本地缓存的分桶列表 List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList();
if (!CollectionUtils.isEmpty(availableList)) { //获取本次扣减请求对应的扣减次数,用来定位具体使用的分桶库存 Integer incrementCount = getIncrementCount(request);
//通过运算得到本次访问所需要定位的分桶 int index = incrementCount % availableList.size();
//获取本次准备处理的分桶信息 BucketCacheBO bucketCacheBO = availableList.get(index); context.getAvailableList().add(bucketCacheBO);
//为了避免扣减失败(分桶已下线或者库存不足),可以多备份几个分桶 //全部分桶都作为备份,就是合并扣减的实现了 for (int i = 0; i < 2; i++) { //填充2个作为备份,如果超过下标则从0开始继续取 int num = index + i; if (num >= availableList.size()) { num = 0; } BucketCacheBO bucketCache = availableList.get(num); context.getAvailableList().add(bucketCache); } } else { //并发下,可能全部的分桶都下线了,这个时候使用中心桶进行库存扣减(因为其它分桶的下线库存回源会加到中心桶上) BucketCacheBO bucketCacheBO = new BucketCacheBO(); bucketCacheBO.setBucketNo(buildSellerInventoryKey(request.getSellerId(), request.getSkuId())); //中心桶无需扩容,但是出现这种场景属于高并发下,分桶全部被下线了,此时需要保证分桶本地元数据和远程保持一致,为了性能,分桶下线未上粒度较大得锁 //所以需要当遇到使用中心桶的时候,再次触发一次远程缓存和本地缓存同步的操作,并且需要保证远程缓存最少有一个可用分桶存在 bucketCacheBO.setBucketNum(0); context.getAvailableList().add(bucketCacheBO); //异步消息发送同步本地缓存的消息 bucketRefreshProducer.sendBucketOffline(request); }
Long index = InventorBucketUtil.createDetailBucketKey(request.getOrderId(), bucketLocalCache.getInventoryBucketConfig().getBucketNum()); String inventoryDetailKey = bucketLocalCache.getBucketDetailKeyList().get(Integer.valueOf(index + ""));
context.setInventoryBucketConfig(bucketLocalCache.getInventoryBucketConfig()); context.setInventoryDetailKey(inventoryDetailKey); inventoryBucketCache.threadLocalRemove(); return context; } ...}
@Servicepublic class InventoryBucketServiceImpl implements InventoryBucketService { ... //刷新分桶元数据缓存 //@param maxDepthNum 分桶最大库存深度 //@param bucketLocalCache 分桶元数据信息 //@param bucketNo 分桶编号 private void refreshBucketCache(Integer maxDepthNum, BucketLocalCache bucketLocalCache, String bucketNo, Integer inventoryNum) { List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList(); for (BucketCacheBO bucketCacheBO : availableList) { if (bucketCacheBO.getBucketNo().equals(bucketNo)) { //每次库存具体深度变化都要更细,否则很容易触发回源的比例 bucketCacheBO.setBucketNum(maxDepthNum); bucketCacheBO.setAllotNum(inventoryNum + (Objects.isNull(bucketCacheBO.getAllotNum()) ? 0 : bucketCacheBO.getAllotNum())); break; } } String key = buildBucketCacheKey(bucketLocalCache.getSellerId(), bucketLocalCache.getSkuId()); //刷新本地缓存 inventoryBucketCache.setBucketLocalCache(key, bucketLocalCache); } ...}
复制代码


2.库存扣减的分桶路由⾃增序号优化


(1)优化原因


每次库存扣减,对应的路由分桶原本是通过缓存的⾃增序号来获取的。但是由于是同⼀个 key,⾼并发压⼒下,这个 key 的访问压⼒很⼤。进⽽部分请求出现阻塞,获取序列号的性能下降。

 

优化前的部分日志:

...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 1毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 0毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 1毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 1毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 1毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 2毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 3毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 1毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 6毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 4毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 1毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 1毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 1毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 1毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 1毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 1毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 2毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 1毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 1毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 2毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 238毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 258毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 1毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 3毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 2毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 1毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 1毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 1毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 1毫秒...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 0毫秒
复制代码


优化前的代码:

@Servicepublic class InventoryServiceImpl implements InventoryService {    @Resource    private TairCache tairCache;    ...
//获取对应售卖商品的扣减访问次数 private Integer getIncrementCount(InventoryRequest request) { String incrementKey = TairInventoryConstant.SELLER_SKU_STOCK_COUNT_PREFIX + request.getSellerId() + request.getSkuId(); Integer incrementCount = tairCache.incr(incrementKey); return incrementCount; } ...}
复制代码


(2)解决⽅案


使⽤号段的方案,每次⾃增获取⼀万个序列号。消费序列号过程中如序列号使⽤过快,则⾃动增⻓序列号的⻓度。并在使⽤过程中,提前⽣成⼀批新的序列号等待使⽤。

 

使⽤该⽅案负责⾃增序号的⽣成后,获取扣减分桶耗时稳定在 0 毫秒内。除了第⼀次不存在序号时初始化耗时会⾼⼀些,后续请求性能稳定。


@Servicepublic class InventoryServiceImpl implements InventoryService {    @Autowired    private SegmentNoGen segmentNoGen;    ...
//获取对应售卖商品的扣减访问次数 //这里考虑并发的时候自增导致性能过低,所以采取了批量获取一批序号,当这批序号被使用完以后才会再次获取一次 private Integer getIncrementCount(InventoryRequest request) { String incrementKey = TairInventoryConstant.SELLER_SKU_STOCK_COUNT_PREFIX + request.getSellerId() + request.getSkuId(); Long incrementCount = segmentNoGen.genNewNo(incrementKey); if (incrementCount > 0) { return incrementCount.intValue(); } //避免获取缓存的时候出现异常,当为负数的时候默认取第一个,分桶最少存在1个 return 0; } ...}
//号段ID生成器组件@Servicepublic class SegmentIDGenImpl implements SegmentIDGen { //下一次异步更新比率因子 public static final double NEXT_INIT_FACTOR = 0.9;
//最大步长不超过100,0000 private static final int MAX_STEP = 1000000;
//默认一个Segment会维持的时间为15分钟 //如果在15分钟内Segment就消耗完了,则步长要扩容一倍,但不能超过MAX_STEP //如果在超过15*2=30分钟才将Segment消耗完,则步长要缩容一倍,但不能低于MIN_STEP,MIN_STEP的值为数据库中初始的step字段值 private static final long SEGMENT_DURATION = 15 * 60 * 1000L;
//更新因子 //更新因子=2时,表示成倍扩容或者折半缩容 private static final int EXPAND_FACTOR = 2;
private final ExecutorService threadPoolExecutor = new ThreadPoolExecutor(1, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new UpdateThreadFactory());
@Autowired private LeafAllocNoRepository leafAllocNoRepository;
@Resource private SegmentIDCache cache;
//生成新的ID @Override public Long genNewNo(String bizTag) { if (!cache.isInitOk()) { throw new RuntimeException("not init"); } //如果没有,此时需要初始化一个 if (!cache.containsKey(bizTag)) { leafAllocNoRepository.insertLeadAlloc(bizTag); cache.updateCacheFromDb(bizTag); } SegmentBuffer buffer = cache.getValue(bizTag); if (!buffer.isInitOk()) { synchronized (buffer) { if (!buffer.isInitOk()) { try { updateSegmentFromDb(bizTag, buffer.getCurrent()); log.info("Init buffer. Update leafkey {} {} from db", bizTag, buffer.getCurrent()); buffer.setInitOk(true); } catch (Exception e) { log.warn("Init buffer {} exception", buffer.getCurrent(), e); throw new RuntimeException("init error:" + bizTag); } } } } return getIdFromSegmentBuffer(buffer); } ...}
复制代码


3.库存扣减明细消息异步发送到 MQ 优化


(1)优化原因


每次库存扣减,都需要发送消息来进行异步记录⼀条库存扣减明细。由于原来发送消息时是等待消息发送成功后才返回,这会导致⾼并发下消息的吞吐量上不去,从⽽影响整体库存扣减的性能。

 

优化前的代码:

@Servicepublic class InventoryServiceImpl implements InventoryService {    @Resource    private InventoryDetailProducer inventoryDetailProducer;    ...
//扣减商品库存 @Override public JsonResult deductProductStock(InventoryRequest request) { //1.验证入参是否合法 checkDeductProductStock(request); //2.构建扣减库存的上下文对象 BucketContext bucketContext = buildDeductProductStock(request); try { //3.获取是否已经有一条扣减明细记录 String repeatDeductInfo = getRepeatDeductInfo(bucketContext); if (!StringUtils.isEmpty(repeatDeductInfo)){ return JsonResult.buildSuccess(); } //4.执行库存扣减 deductInventory(bucketContext); //5.写入明细,如果已重复写入,则写入失败并回退库存 writeInventoryDetail(bucketContext); } catch (Exception e){ e.printStackTrace(); return JsonResult.buildError(e.getMessage()); } finally { //6.检测当前返回的库存数量是否触发扩容的阈值(回源比例),触发则发送通知进行异步扩容 checkInventoryBackSource(bucketContext); } return JsonResult.buildSuccess(); }
//将库存明细写入缓存(写入失败则代表已经被写入了,此时需要回退库存) private void writeInventoryDetail(BucketContext bucketContext) { //获取库存扣减的明细详情 InventoryDetail inventoryDetail = bucketContext.getInventoryDetail(); String key = TairInventoryConstant.SELLER_SKU_TRADE_DETAIL_PREFIX + bucketContext.getInventoryDetail().getSellerId(); //尝试写入明细记录,如果没有写入成功则说明库存需要回退 Integer count = tairCache.exhset(key, String.valueOf(inventoryDetail.getOrderId()), JSONObject.toJSONString(inventoryDetail)); if (count < 0){ //说明明细已经存在了,写入失败,需要将库存回退到对应的分桶上 tairCache.incr(inventoryDetail.getBucketNo(), inventoryDetail.getInventoryNum()); } else { //发送消息,异步写入库存扣减的明细到DB inventoryDetailProducer.sendInventoryDetail(bucketContext.getInventoryDetail()); } } ...}
@Componentpublic class InventoryDetailProducer { @Autowired private DefaultProducer defaultProducer;
//库存扣减明细 MQ生产 public void sendInventoryDetail(InventoryDetail inventoryDetail) { //发送库存扣减明细保存消息 defaultProducer.sendMessage(RocketMqConstant.INVENTORY_DETAIL_TOPIC, JSONObject.toJSONString(inventoryDetail), "库存扣减"); }}
@Componentpublic class DefaultProducer { private DefaultMQProducer producer;
@Autowired public DefaultProducer(RocketMQProperties rocketMQProperties) { producer = new DefaultMQProducer(RocketMqConstant.ORDER_DEFAULT_PRODUCER_GROUP); producer.setNamesrvAddr(rocketMQProperties.getNameServer()); start(); }
//对象在使用之前必须要调用一次,只能初始化一次 public void start() { try { this.producer.start(); } catch (MQClientException e) { log.error("producer start error", e); } } ...
//发送消息 public void sendMessage(String topic, String message, String type) { sendMessage(topic, message, -1, type); }
//发送消息,同步等待消息发送请求返回成功 public void sendMessage(String topic, String message, Integer delayTimeLevel, String type) { Message msg = new Message(topic, message.getBytes(StandardCharsets.UTF_8)); try { if (delayTimeLevel > 0) { msg.setDelayTimeLevel(delayTimeLevel); } SendResult send = producer.send(msg); if (SendStatus.SEND_OK == send.getSendStatus()) { log.info("发送MQ消息成功, type:{}, message:{}", type, message); } else { throw new ProductBizException(send.getSendStatus().toString()); } } catch (Exception e) { log.error("发送MQ消息失败:", e); throw new ProductBizException(CommonErrorCodeEnum.SEND_MQ_FAILED); } } ...}
复制代码


(2)解决⽅案


可以使⽤消息的异步发送,这样可以不用等待 Broker 返回结果。但是库存扣减明细消息是不允许丢失的,异步发送消息就可能发送失败。所以对于发送消息时返回发送失败的,可以进⾏重试处理。


@Servicepublic class InventoryServiceImpl implements InventoryService {    @Resource    private InventoryDetailProducer inventoryDetailProducer;    ...
//扣减商品库存 @Override public JsonResult deductProductStock(InventoryRequest request) { //1.验证入参是否合法 checkDeductProductStock(request); //2.构建扣减库存的上下文对象 BucketContext bucketContext = buildDeductProductStock(request);
try { //3.获取是否已经有一条扣减明细记录,检查该笔订单号是否已经在缓存中存在 String repeatDeductInfo = getRepeatDeductInfo(bucketContext); if (!StringUtils.isEmpty(repeatDeductInfo)) { return JsonResult.buildSuccess(); } //4.执行库存扣减 deductInventory(bucketContext); //5.写入明细,如果已重复写入失败,则回退库存 writeInventoryDetail(bucketContext); } catch (Exception e) { log.error("库存扣减失败", e); return JsonResult.buildError(e.getMessage()); } finally { //6.检测当前返回的库存数量是否触发扩容的阈值(回源比例),触发则发送通知进行异步扩容 checkInventoryBackSource(bucketContext); } return JsonResult.buildSuccess(); }
//将库存明细写入缓存(写入失败则代表已经被写入了,此时需要回退库存) private void writeInventoryDetail(BucketContext bucketContext) { //获取扣减明细信息 InventoryDetail inventoryDetail = bucketContext.getInventoryDetail();
//尝试写入明细记录,如果没有写入成功则说明库存需要回退 Integer count = tairCache.exhsetNx(bucketContext.getInventoryDetailKey(), String.valueOf(inventoryDetail.getOrderId()), JSONObject.toJSONString(inventoryDetail)); if (count < 0) { //说明明细已经存在了,写入失败,需要将库存回退到对应的分桶上 tairCache.incr(inventoryDetail.getBucketNo(), inventoryDetail.getInventoryNum()); } else { //发送消息,异步写入库存扣减的明细到DB inventoryDetailProducer.sendInventoryDetail(bucketContext.getInventoryDetail()); } } ...}
@Componentpublic class InventoryDetailProducer { @Autowired private DefaultProducer defaultProducer;
//库存扣减明细 MQ生产 public void sendInventoryDetail(InventoryDetail inventoryDetail) { //发送库存扣减 明细保存消息 defaultProducer.sendAsyncMessage(RocketMqConstant.INVENTORY_DETAIL_TOPIC, JSONObject.toJSONString(inventoryDetail), "库存扣减明细"); }}
@Componentpublic class DefaultProducer { private DefaultMQProducer producer;
@Autowired public DefaultProducer(RocketMQProperties rocketMQProperties) { producer = new DefaultMQProducer(RocketMqConstant.ORDER_DEFAULT_PRODUCER_GROUP); producer.setNamesrvAddr(rocketMQProperties.getNameServer()); start(); }
public DefaultMQProducer getProducer() { return this.producer; }
//对象在使用之前必须要调用一次,只能初始化一次 public void start() { try { this.producer.start(); } catch (MQClientException e) { log.error("producer start error", e); } } ...
//异步发送消息 public void sendAsyncMessage(String topic, String message, String type) { Message msg = new Message(topic, message.getBytes(StandardCharsets.UTF_8)); try { //2.异步发送 producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) {
}
@Override public void onException(Throwable e) { //对于失败的消息,要做重试处理 log.error("发送MQ消息失败, type:{}, message:{}", type, message, e); } }); } catch (Exception e) { log.error("发送MQ消息失败, type:{}, message:{}", type, message, e); throw new ProductBizException(CommonErrorCodeEnum.SEND_MQ_FAILED); } } ...}
复制代码


4.库存扣减明细 key 热点缓存打散优化


(1)优化原因


库存进⾏分桶后,库存扣减的并发请求会均匀打散到多个缓存分⽚上。但库存扣减明细的 key 并没有进行缓存分片,⾼并发下会导致库存扣减明细的热 key 都集中在同⼀个分⽚上,从⽽影响写⼊性能。而其它⼏个缓存分⽚的性能还没有压到极限,所以要提升库存的性能,还需处理库存扣减明细的的热点 key 问题。

 

优化前的代码:

@Servicepublic class InventoryServiceImpl implements InventoryService {    @Resource    private InventoryDetailProducer inventoryDetailProducer;    ...
//将库存明细写入缓存(写入失败则代表已经被写入了,此时需要回退库存) private void writeInventoryDetail(BucketContext bucketContext) { //获取库存扣减的明细详情 InventoryDetail inventoryDetail = bucketContext.getInventoryDetail(); String key = TairInventoryConstant.SELLER_SKU_TRADE_DETAIL_PREFIX + bucketContext.getInventoryDetail().getSellerId(); //尝试写入明细记录,如果没有写入成功则说明库存需要回退 Integer count = tairCache.exhset(key, String.valueOf(inventoryDetail.getOrderId()), JSONObject.toJSONString(inventoryDetail)); if (count < 0){ //说明明细已经存在了,写入失败,需要将库存回退到对应的分桶上 tairCache.incr(inventoryDetail.getBucketNo(), inventoryDetail.getInventoryNum()); } else { //发送消息,异步写入库存扣减的明细到DB inventoryDetailProducer.sendInventoryDetail(bucketContext.getInventoryDetail()); } } ...}
复制代码


(2)解决⽅案


⽣成库存分桶的同时,还需要⽣成⼀份用于库存扣减明细的 key。这样当发生库存扣减时,就可以对订单号 ID 进⾏Hash,然后与分桶数量进⾏取模。从而计算出要使⽤的库存明细的缓存 key,实现对库存明细缓存的写入按缓存 key 均匀打散到不同分⽚上。

 

注意如下代码中的:

BucketContext.setInventoryDetailKey() + getInventoryDetailKey();BucketLocalCache.setBucketDetailKeyList() + getBucketDetailKeyList();
复制代码


@Servicepublic class InventoryServiceImpl implements InventoryService {    @Resource    private InventoryDetailProducer inventoryDetailProducer;    ...
//填充扣减库存的分桶相关信息 private BucketContext buildDeductBucketList(InventoryRequest request) { BucketContext context = new BucketContext(); //获取本地缓存的分桶元数据 BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(request.getSellerId() + request.getSkuId()); //获取本地缓存的分桶列表 List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList();
if (!CollectionUtils.isEmpty(availableList)) { //获取本次扣减请求对应的扣减次数,用来定位具体使用的分桶库存 Integer incrementCount = getIncrementCount(request);
//通过运算得到本次访问所需要定位的分桶 int index = incrementCount % availableList.size();
//获取本次准备处理的分桶信息 BucketCacheBO bucketCacheBO = availableList.get(index); context.getAvailableList().add(bucketCacheBO);
//为了避免扣减失败(分桶已下线或者库存不足),可以多备份几个分桶 //全部分桶都作为备份,就是合并扣减的实现了 for (int i = 0; i < 2; i++) { //填充2个作为备份,如果超过下标则从0开始继续取 int num = index + i; if (num >= availableList.size()) { num = 0; } BucketCacheBO bucketCache = availableList.get(num); context.getAvailableList().add(bucketCache); } } else { //并发下,可能全部的分桶都下线了,这个时候使用中心桶进行库存扣减(因为其它分桶的下线库存回源会加到中心桶上) BucketCacheBO bucketCacheBO = new BucketCacheBO(); bucketCacheBO.setBucketNo(buildSellerInventoryKey(request.getSellerId(), request.getSkuId())); //中心桶无需扩容,但是出现这种场景属于高并发下,分桶全部被下线了,此时需要保证分桶本地元数据和远程保持一致,为了性能,分桶下线未上粒度较大得锁 //所以需要当遇到使用中心桶的时候,再次触发一次远程缓存和本地缓存同步的操作,并且需要保证远程缓存最少有一个可用分桶存在 bucketCacheBO.setBucketNum(0); context.getAvailableList().add(bucketCacheBO); //异步消息发送同步本地缓存的消息 bucketRefreshProducer.sendBucketOffline(request); }
Long index = InventorBucketUtil.createDetailBucketKey(request.getOrderId(), bucketLocalCache.getInventoryBucketConfig().getBucketNum()); String inventoryDetailKey = bucketLocalCache.getBucketDetailKeyList().get(Integer.valueOf(index + ""));
context.setInventoryBucketConfig(bucketLocalCache.getInventoryBucketConfig()); context.setInventoryDetailKey(inventoryDetailKey); inventoryBucketCache.threadLocalRemove(); return context; }
//将库存明细写入缓存(写入失败则代表已经被写入了,此时需要回退库存) private void writeInventoryDetail(BucketContext bucketContext) { //获取扣减明细信息 InventoryDetail inventoryDetail = bucketContext.getInventoryDetail();
//尝试写入明细记录,如果没有写入成功则说明库存需要回退 Integer count = tairCache.exhsetNx(bucketContext.getInventoryDetailKey(), String.valueOf(inventoryDetail.getOrderId()), JSONObject.toJSONString(inventoryDetail)); if (count < 0) { //说明明细已经存在了,写入失败,需要将库存回退到对应的分桶上 tairCache.incr(inventoryDetail.getBucketNo(), inventoryDetail.getInventoryNum()); } else { //发送消息,异步写入库存扣减的明细到DB inventoryDetailProducer.sendInventoryDetail(bucketContext.getInventoryDetail()); } } ...}
@Servicepublic class InventoryBucketServiceImpl implements InventoryBucketService { ... //构建缓存模型 //@param key //@param bucketNum 分桶数量 //@param inventoryNum 分桶分配的库存数量 //@param residueNum 剩余的未分配均匀的库存 //@param inventoryBucketConfig 分桶配置信息 private BucketLocalCache buildBucketCache(String key, Integer bucketNum, Integer inventoryNum, Integer residueNum, InventoryBucketConfigDO inventoryBucketConfig) { BucketLocalCache bucketLocalCache = new BucketLocalCache(); //先获取得到这个模板配置的对应可分槽位的均匀桶列表 List<String> bucketNoList = InventorBucketUtil.createBucketNoList(key, inventoryBucketConfig.getBucketNum()); List<BucketCacheBO> bucketCacheBOList = new ArrayList<>(bucketNum); List<BucketCacheBO> undistributedList = new ArrayList<>(bucketNum); //构建出多个分桶对象 for (int i = 0; i < bucketNum; i++) { //生成对应的分桶编号,方便定义到具体的分桶上 BucketCacheBO bucketCache = new BucketCacheBO(); String bucketNo = bucketNoList.get(i); bucketCache.setBucketNo(bucketNo); //最后一个分桶,分配剩余未除尽的库存+平均库存 if (i == bucketNum - 1) { bucketCache.setBucketNum(inventoryNum + residueNum); } else { bucketCache.setBucketNum(inventoryNum); } bucketCacheBOList.add(bucketCache); } //生成的分桶对象超过实际可分配的分桶对象,保留这批多余的分桶模型为不可用分桶,后续分桶上线可以选择使用 if (bucketNoList.size() > bucketNum) { for (int i = bucketNum; i < bucketNoList.size(); i++) { BucketCacheBO bucketCache = new BucketCacheBO(); String bucketNo = bucketNoList.get(i); bucketCache.setBucketNo(bucketNo); undistributedList.add(bucketCache); } } //生成缓存的明细key List<String> bucketDetailKeyList = InventorBucketUtil.createBucketNoList(key, inventoryBucketConfig.getBucketNum(), "%07d"); //设置分桶缓存明细的key bucketLocalCache.setBucketDetailKeyList(bucketDetailKeyList); //设置可用的分桶缓存列表 bucketLocalCache.setAvailableList(bucketCacheBOList); //设置不可用或者已下线的分桶缓存列表 bucketLocalCache.setUndistributedList(undistributedList); return bucketLocalCache; } ...}
public class InventorBucketUtil { private static final int MAX_SIZE = 100000;
//生成对应的槽位key,明细使用,多使用一位区分 //@param key 卖家Id+商品skuId //@param bucketNum 分桶配置数量 //@return 预先保留的槽位集合 public static List<String> createBucketNoList(String key, Integer bucketNum, String format) { Map<Long, String> cacheKey = new HashMap<>(bucketNum); //bucketNoList用来存放每个桶对应的hashKey List<String> bucketNoList = new ArrayList<>(bucketNum); //分配桶的编号 for (int i = 1; i <= MAX_SIZE; i++) { String serialNum = String.format(format, i); //卖家ID + 商品SKU ID + 序号 String hashKey = key + serialNum; //一致性哈希算法murmur long hash = HashUtil.murMurHash(hashKey.getBytes()); //对分桶数量进行取模运算 long c = (hash %= bucketNum); //确保被选中的hashKey都能哈希到不同的分桶 if (cacheKey.containsKey(c)) { continue; } cacheKey.put(c, hashKey); bucketNoList.add(hashKey); if (cacheKey.size() >= bucketNum) { break; } } return bucketNoList; } ...}
复制代码


文章转载自:东阳马生架构

原文链接:https://www.cnblogs.com/mjunz/p/18964961

体验地址:http://www.jnpfsoft.com/?from=001YH

用户头像

还未添加个人签名 2025-04-01 加入

还未添加个人简介

评论

发布
暂无评论
商品中心—库存分桶高并发的优化文档_Java_量贩潮汐·WholesaleTide_InfoQ写作社区