商品中心—库存分桶初始化的技术文档
- 2025-06-27 福建
本文字数:28156 字
阅读完需:约 92 分钟
1.库存分桶缓存初始化时涉及的数据表
(1)库存分桶配置表
CREATE TABLE `inventory_bucket_config` (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`bucket_num` int(10) NOT NULL DEFAULT '0' COMMENT '分桶数量',
`max_depth_num` int(10) NOT NULL DEFAULT '0' COMMENT '最⼤库存深度,即分桶的最大库存容量',
`min_depth_num` int(10) NOT NULL DEFAULT '0' COMMENT '最⼩库存深度,即分桶的最小库存容量',
`threshold_value` int(10) NOT NULL DEFAULT '0' COMMENT '分桶下线阈值,当某个分桶的库存数小于阈值时就需要将该分桶下线了',
`back_source_proportion` int(10) NOT NULL DEFAULT '0' COMMENT '回源⽐例,从1-100设定⽐例',
`back_source_step` int(10) NOT NULL DEFAULT '0' COMMENT '回源步⻓,桶扩容的时候默认每次分配的库存⼤⼩',
`template_name` varchar(64) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '模板名称',
`is_default` tinyint(1) NOT NULL DEFAULT '0' COMMENT '是否默认模板,只允许⼀个,1为默认模板',
`version_id` int(10) NOT NULL DEFAULT '0' COMMENT '版本号',
`del_flag` tinyint(1) NOT NULL DEFAULT '0' COMMENT '删除标记(1-有效,0-删除)',
`create_user` int(10) NOT NULL DEFAULT '0' COMMENT '创建⼈',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',
`update_user` int(10) NOT NULL DEFAULT '0' COMMENT '更新⼈',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`ID`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='库存分桶配置模板表';
(2)库存分配记录表
CREATE TABLE `inventory_allot_detail` (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`sku_id` varchar(40) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT 'skuId',
`inventor_no` varchar(32) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '库存申请业务编号',
`seller_id` varchar(64) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '卖家ID',
`inventor_num` int(10) NOT NULL DEFAULT '0' COMMENT '库存变更数量',
`version_id` int(10) NOT NULL DEFAULT '0' COMMENT '版本号',
`del_flag` tinyint(1) NOT NULL DEFAULT '0' COMMENT '删除标记(1-有效,0-删除)',
`create_user` int(10) NOT NULL DEFAULT '0' COMMENT '创建⼈',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',
`update_user` int(10) NOT NULL DEFAULT '0' COMMENT '更新⼈',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `inde_unique_inventor_no` (`inventor_no`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=265 DEFAULT CHARSET=utf8 COMMENT='库存分配记录表';
2.库存分桶架构的初始化 + 扣减 + 上下线 + 扩容 + 下线 + 预警补货流程
分桶上线:指的是初始化时设置了可用的分桶或者增加库存时将下线的分桶设置可用。
分桶下线:指的是可用的分桶在扣减库存后触发了下线阈值,导致该分桶需要下线。当某个可用的分桶拥有的库存数较少时,如果将该分桶下线,那么可以提高库存扣减的效率。
分桶扩容:指的是当可用的分桶剩余库存少于回源比例的库存时,而且中心桶的剩余库存大于 0 时,就会触发当前分桶的异步扩容(即加库存)。
中心桶和分桶:每个商品库存都会对应一个中心桶缓存 + 多个分桶缓存。例如对 10000 库存分配到 8 个分桶中,每个分桶最多 1000 个库存,此时就有 2000 个库存剩下需要放入中心桶缓存中。中心桶的出现,是由于退款返还库存时可用于临时存放返还的库存。
Tair 缓存:商品 SKU 库存的各个分桶会对应不同的分桶编号,这些分桶编号会对应于 Tair 缓存的各个 key,从而可以应对高并发请求。这样就可以利用 Redis Cluster 的特性,通过把 key 路由到不同机器来处理。
一.分配库存的流程

二.扣减库存 + 分桶扩容 + 分桶下线 + 库存预警的流程

3.商品库存⼊桶流程概览
(1)使用入口
库存后台对商品初始化库存或增加库存时:
@RestController
@RequestMapping("/product/inventory")
public class InventoryController {
@Autowired
private InventoryBucketService inventoryBucketService;
@Autowired
private InventoryBucketCache cache;
@Resource
private TairCache tairCache;
...
//初始化库存
@RequestMapping("/init")
public void inventoryInit(@RequestBody InventorBucketRequest request) {
//清除本地缓存数据
cache.getCache().invalidateAll();
//清除tair中的数据,扫描卖家ID+SKU的ID的key会比较耗时
Set<String> keys = tairCache.getJedis().keys("*" + request.getSellerId() + request.getSkuId() + "*");
if (!CollectionUtils.isEmpty(keys)) {
tairCache.mdelete(Lists.newArrayList(keys));
}
//这里模拟指定本次的库存业务单号,实际接口需要外部传入
request.setInventorCode(SnowflakeIdWorker.getCode());
//初始化库存信息
inventoryBucketService.inventorBucket(request);
}
//增加库存
@RequestMapping("/inventorBucket")
public void inventorBucket(@RequestBody InventorBucketRequest request) {
//这里模拟指定本次的库存业务单号,实际接口需要外部传入
request.setInventorCode(SnowflakeIdWorker.getCode());
//增加库存
inventoryBucketService.inventorBucket(request);
}
...
}
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {
@Resource
private TairCache tairCache;
@Resource
private TairLock tairLock;
@Resource
private InventoryBucketCache inventoryBucketCache;
...
//商品库存入桶分配
@Override
@Transactional(rollbackFor = Exception.class)
public void inventorBucket(InventorBucketRequest request) {
//1.验证入参必填项
checkInventorParams(request);
//锁key = 卖家ID + SKU的ID
String key = buildBucketLockKey(request.getSellerId(), request.getSkuId());
String value = SnowflakeIdWorker.getCode();
//注意这里需要锁定中心桶库存
boolean lock = tairLock.tryLock(key, value);
//分配库存时,这个卖家的sku是不允许其他相关操作的
if (lock) {
try {
//2.插入库存入库的记录信息
//由于申请的库存业务编号是一个唯一key,所以可以避免重复请求
//也就是会校验库存单号是否已经存在了,保证⼀次库存变更⾏为只能执⾏⼀次
inventoryRepository.saveInventoryAllotDetail(request);
//3.将库存数据写入缓存
inventoryBucketCache(request);
} catch (Exception e) {
e.printStackTrace();
} finally {
tairLock.unlock(key, value);
}
} else {
throw new BaseBizException("请求繁忙,稍后重试!");
}
}
//将库存数据写入缓存
private void inventoryBucketCache(InventorBucketRequest request) {
//获取中心桶库存的key
String key = buildSellerInventoryKey(request.getSellerId(), request.getSkuId());
//1.先验证是否已缓存分桶元数据信息,先查本地缓存,再查远程缓存
BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(request.getSellerId() + request.getSkuId());
//缓存不存在,则进行初始化
if (Objects.isNull(bucketLocalCache)) {
//2.获取库存分桶的配置模板
InventoryBucketConfigDO inventoryBucketConfig = inventoryRepository.getInventoryBucketConfig(request.getTemplateId());
//初始化分桶库存
initInventoryBucket(request, inventoryBucketConfig);
} else {
//3.缓存已存在,直接把库存加到中心桶里面,并返回中心桶库存
Integer residueNum = tairCache.incr(key, request.getInventoryNum());
//4.尝试将库存分配到新的分桶上(注意,先将中心桶的库存加上去)
InventorOnlineRequest onlineRequest = inventoryConverter.converterRequest(request);
//5.构建新的分桶元数据信息并写入
//分桶元数据中包含了商品库存应该如何构建分桶缓存的详细信息
writeBucketCache(onlineRequest, residueNum);
}
}
//获取锁的key
private String buildBucketLockKey(String sellerId, String skuId) {
return TairInventoryConstant.SELLER_INVENTORY_LOCK_PREFIX + sellerId + skuId;
}
//获取中心桶库存的key
private String buildSellerInventoryKey(String sellerId, String skuId) {
return TairInventoryConstant.SELLER_INVENTORY_PREFIX + sellerId + skuId;
}
...
}
(2)场景一初始化分桶库存
步骤一:检验入参必填项
步骤二:插入库存入库的记录信息
步骤三:获取库存分桶对应的配置模板,开始初始化分桶库存
步骤四:初始化分桶库存时首先计算出本次库存入库的具体分桶信息
步骤五:根据计算好的分桶数 + 每个分桶分配的库存数构建缓存数据模型
步骤六:计算完分桶的数据信息后则将这些信息写入远程缓存和本地缓存
步骤一:检验入参必填项
步骤二:插入库存入库的记录信息。由于申请的库存业务编号是一个唯一 key,所以可以避免重复请求。即会校验库存单号是否已经存在,保证⼀次库存变更⾏为只执⾏⼀次。
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {
...
//验证入参必填字段
private void checkInventorParams(InventorBucketRequest request) {
if (Objects.isNull(request)) {
throw new BaseBizException(InventoryExceptionCode.PARAM_CHECK_ERROR, InventoryExceptionCode.PARAM_CHECK_ERROR.getErrorCode());
}
if (Objects.isNull(request.getInventorCode()) || Objects.isNull(request.getInventoryNum())
|| Objects.isNull(request.getSellerId()) || Objects.isNull(request.getSkuId())) {
throw new BaseBizException(InventoryExceptionCode.PARAM_CHECK_ERROR, InventoryExceptionCode.PARAM_CHECK_ERROR.getErrorCode());
}
if (request.getInventoryNum() <= 0) {
throw new BaseBizException(InventoryExceptionCode.PARAM_CHECK_ERROR, InventoryExceptionCode.PARAM_CHECK_ERROR.getErrorCode());
}
}
...
}
@Repository
public class InventoryRepository {
...
//存储每次库存入库的申请记录
//校验库存单号是否已经存在了(⼀次库存变更⾏为只能执⾏⼀次)
public void saveInventoryAllotDetail(InventorBucketRequest request) {
InventoryAllotDetailDO inventoryAllotDetailDO = inventoryConverter.converterDO(request);
inventoryAllotDetailDO.initCommon();
int count = inventoryAllotDetailMapper.insert(inventoryAllotDetailDO);
if (count <= 0) {
throw new BaseBizException(InventoryExceptionCode.INVENTORY_SQL);
}
}
...
}
步骤三:获取库存分桶对应的配置模板,开始初始化分桶库存。由于是第⼀次初始化库存,所以库存的本地缓存默认是空的。
通过InventoryBucketCache的getBucketLocalCache()方法获取本地缓存
通过inventoryRepository的getInventoryBucketConfig()方法获取分桶配置模版
通过initInventoryBucket()方法执行初始化分桶库存
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {
@Resource
private InventoryBucketCache inventoryBucketCache;
...
//将库存数据写入缓存
private void inventoryBucketCache(InventorBucketRequest request) {
//获取中心桶库存的key
String key = buildSellerInventoryKey(request.getSellerId(), request.getSkuId());
//1.先验证是否已缓存分桶元数据信息,先查本地缓存,再查远程缓存
BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(request.getSellerId() + request.getSkuId());
//缓存不存在,则进行初始化
if (Objects.isNull(bucketLocalCache)) {
//2.获取库存分桶的配置模板
InventoryBucketConfigDO inventoryBucketConfig = inventoryRepository.getInventoryBucketConfig(request.getTemplateId());
//初始化分桶库存
initInventoryBucket(request, inventoryBucketConfig);
} else {
//3.缓存已存在,直接把库存加到中心桶里面,并返回中心桶库存
Integer residueNum = tairCache.incr(key, request.getInventoryNum());
//4.尝试将库存分配到新的分桶上(注意,先将中心桶的库存加上去)
InventorOnlineRequest onlineRequest = inventoryConverter.converterRequest(request);
//5.构建新的分桶信息数据信息并写入
writeBucketCache(onlineRequest, residueNum);
}
}
...
}
//库存分桶配置
@Data
@TableName("inventory_bucket_config")
public class InventoryBucketConfigDO extends BaseEntity {
//分桶数量
private Integer bucketNum;
//最大库存深度
private Integer maxDepthNum;
//最小库存深度
private Integer minDepthNum;
//分桶下线阈值
private Integer thresholdValue;
//分桶下线阈值动态比例
private Integer thresholdProportion;
//回源比例,从1-100设定比例
private Integer backSourceProportion;
//回源步长,桶扩容的时候默认每次分配的库存大小
private Integer backSourceStep;
//模板名称
private String templateName;
//是否默认模板,只允许一个,1为默认模板
private Integer isDefault;
}
步骤四:初始化分桶库存时需要计算出本次库存入库的具体分桶信息。也就是执行 initInventoryBucket()方法:⾸先获取本次需要入库的库存数量,然后根据配置的每个分桶的最⼤(小)分配数量 * 分桶数量,得出配置的所有分桶的最大(小)库存容量。接着根据需要入库数与最大库存数,获取本次最多可放入分桶的库存数量。
情况一:如果本次需要入库的库存数量大于配置的所有分桶的最大库存容量,那么说明⼀次分桶还⽆法分配完全部的库存。此时会面向所有分桶进行库存分配,分桶数量就是配置的分桶数量,并且每个分桶所分配的库存数就是配置的最⼤库存容量。
情况二:如果本次需要入库的库存数量小于配置的所有分桶的最大库存容量,但是大于配置的所有分桶的最小库存容量。则根据配置的分桶数,计算每个分桶具体要分配多少库存即可。
情况三:如果本次需要入库的库存数量小于配置的所有分桶的最大库存容量,并且小于配置的所有分桶的最小库存容量。则计算此时要分配的分桶数 = 本次库存入库数 / 每个分桶的最小库存容量。
情况三(一):如果得出的要分配的分桶数大于 0,那么就根据计算出的分桶数,重新计算每个分桶应该分配多少库存,也就是进行分配大于或等于最小库存容量的库存。
情况三(二):如果得出的要分配的分桶数等于 0,那么此时只需要一个分桶即可。
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {
...
//初始化分桶库存
private void initInventoryBucket(InventorBucketRequest request, InventoryBucketConfigDO inventoryBucketConfig) {
//计算出分桶的数据信息
BucketLocalCache bucketLocalCache = calcInventoryBucket(request, inventoryBucketConfig);
//写入远程缓存以及本地缓存
writeCache(bucketLocalCache);
}
//计算出当前库存入库的具体分桶信息
private BucketLocalCache calcInventoryBucket(InventorBucketRequest request, InventoryBucketConfigDO inventoryBucketConfig) {
//构建出当前分桶的数据模型,库存数量大于最大容量时,通过最大容量来构建模型
return buildBucketInfo(request, inventoryBucketConfig);
}
//构建分桶的数据模型
//@param request 请求入参
//@param inventoryBucketConfig 分桶配置信息
private BucketLocalCache buildBucketInfo(InventorBucketRequest request, InventoryBucketConfigDO inventoryBucketConfig) {
//分桶配置模版中默认的分桶数量
Integer bucketNum = inventoryBucketConfig.getBucketNum();
//获取本次需要入库的库存数量
Integer inventorNum = request.getInventoryNum();
//配置模版中所有分桶的最大库存容量
Integer maxBucketNum = bucketNum * inventoryBucketConfig.getMaxDepthNum();
//配置模版中所有分桶的最小库存容量
//如果需要放入分桶的库存数量低于这个值,那么只会分配给部分分桶,此时就需要重新计算分桶
Integer minBucketNum = bucketNum * inventoryBucketConfig.getMinDepthNum();
//本次最多可以放入分桶的库存数量
int countBucketNum = Math.min(inventorNum, maxBucketNum);
//当库存数量小于最小分桶深度 * 分桶数量,就需要减少分配的分桶数
//此时要分配的分桶数量 bucketNum = 本次库存入库的数量 / 每个分桶的最小库存容量
if (minBucketNum > countBucketNum) {
bucketNum = countBucketNum / inventoryBucketConfig.getMinDepthNum();
//如果库存数量不足一个分桶的最小深度,但是大于0,则上线一个分桶
if (bucketNum == 0 && countBucketNum % inventoryBucketConfig.getMinDepthNum() > 0) {
bucketNum++;
}
}
//获取每个分桶分配的库存数量
Integer bucketInventorNum = countBucketNum / bucketNum;
//剩余库存数量,可能为0或者大于0,补到最后一个分桶上
Integer residueNum = countBucketNum - bucketInventorNum * bucketNum;
//构建缓存数据模型时,以卖家ID + 商品skuId为唯一标识
String key = request.getSellerId() + request.getSkuId();
//构建缓存数据模型
BucketLocalCache bucketLocalCache = buildBucketCache(key, bucketNum, bucketInventorNum, residueNum, inventoryBucketConfig);
//标记到具体的数据上
bucketLocalCache.setSellerId(request.getSellerId());
bucketLocalCache.setSkuId(request.getSkuId());
bucketLocalCache.setInventoryNum(inventorNum);
//中心桶剩余库存 = 本次需要入库的库存数量 - 本次最多可以放入分桶的库存数量
bucketLocalCache.setResidueNum(inventorNum - countBucketNum);
bucketLocalCache.setInventoryBucketConfig(inventoryBucketConfig);
return bucketLocalCache;
}
...
}
步骤五:根据计算好的分桶数 + 每个分桶分配的库存数构建缓存数据模型
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {
...
//构建缓存模型
//@param bucketNum 分桶数量
//@param inventorNum 分桶分配的库存数量
//@param residueNum 剩余的未分配均匀的库存
//@param inventoryBucketConfig 分桶配置信息
private BucketLocalCache buildBucketCache(String key, Integer bucketNum, Integer inventorNum, Integer residueNum, InventoryBucketConfigDO inventoryBucketConfig) {
BucketLocalCache bucketLocalCache = new BucketLocalCache();
//先获取得到这个模板配置的对应可分槽位的均匀桶列表
List<String> bucketNoList = InventorBucketUtil.createBucketNoList(key, inventoryBucketConfig.getBucketNum());
List<BucketCacheBO> bucketCacheBOList = new ArrayList<>(bucketNum);
List<BucketCacheBO> undistributedList = new ArrayList<>(bucketNum);
//构建出多个分桶对象
for (int i = 0; i < bucketNum; i++) {
//生成对应的分桶编号,方便定义到具体的分桶上
BucketCacheBO bucketCache = new BucketCacheBO();
String bucketNo = bucketNoList.get(i);
bucketCache.setBucketNo(bucketNo);
//最后一个分桶,分配剩余未除尽的库存 + 平均库存
if (i == bucketNum - 1) {
bucketCache.setBucketNum(inventorNum + residueNum);
} else {
bucketCache.setBucketNum(inventorNum);
}
bucketCacheBOList.add(bucketCache);
}
//生成的分桶对象超过实际可分配的分桶对象,保留这批多余的分桶模型为不可用分桶,后续分桶上线可以选择使用
if (bucketNoList.size() > bucketNum) {
for (int i = bucketNum; i < bucketNoList.size(); i++) {
BucketCacheBO bucketCache = new BucketCacheBO();
bucketCache.setBucketNo(bucketNoList.get(i));
undistributedList.add(bucketCache);
}
}
//设置可用的分桶缓存列表
bucketLocalCache.setAvailableList(bucketCacheBOList);
//设置不可用或者已下线的分桶缓存列表
bucketLocalCache.setUndistributedList(undistributedList);
return bucketLocalCache;
}
...
}
//库存工具方法
public class InventorBucketUtil {
private static final int MAX_SIZE = 100000;
//生成对应的槽位key
//@param key 卖家Id + 商品skuId
//@param bucketNum 分桶配置数量
//@return 预先保留的槽位集合
public static List<String> createBucketNoList(String key, Integer bucketNum) {
Map<Long, String> cacheKey = new HashMap<>(bucketNum);
//bucketNoList用来存放每个桶对应的hashKey
List<String> bucketNoList = new ArrayList<>(bucketNum);
for (int i = 1; i <= MAX_SIZE; i++) {
String serialNum = String.format("%06d", i);
//卖家ID + 商品SKU ID + 序号
String hashKey = key + serialNum;
//一致性哈希算法murmur
long hash = HashUtil.murMurHash(hashKey.getBytes());
//对分桶数量进行取模运算
long c = (hash %= bucketNum);
//确保被选中的hashKey都能哈希到不同的分桶
if (cacheKey.containsKey(c)) {
continue;
}
cacheKey.put(c, hashKey);
bucketNoList.add(hashKey);
if (cacheKey.size() >= bucketNum) {
break;
}
}
return bucketNoList;
}
}
步骤六:计算完分桶的数据信息后则将这些信息写入远程缓存和本地缓存
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {
@Resource
private InventoryBucketCache inventoryBucketCache;
@Resource
private TairCache tairCache;
...
//初始化分桶库存
private void initInventoryBucket(InventorBucketRequest request, InventoryBucketConfigDO inventoryBucketConfig) {
//计算出分桶的数据信息
BucketLocalCache bucketLocalCache = calcInventoryBucket(request, inventoryBucketConfig);
//写入远程缓存以及本地缓存
writeCache(bucketLocalCache);
}
//将数据写入远程缓存以及本地缓存
private void writeCache(BucketLocalCache bucketLocalCache) {
//卖家ID + 商品skuId标识
String key = bucketLocalCache.getSellerId() + bucketLocalCache.getSkuId();
//1.写入中心桶的库存信息:中心桶存放的是剩余库存 = 本次需要入库的库存数量 - 本次最多可以放入分桶的库存数量
log.info("中心桶中存放的是剩余库存:{}", bucketLocalCache.getResidueNum());
tairCache.set(TairInventoryConstant.SELLER_INVENTORY_PREFIX + key, bucketLocalCache.getResidueNum().toString(), 0);
//2.写入数据到对应的缓存上,计算出的分桶库存写入Tair上的分桶缓存
List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList();
for (BucketCacheBO bucketCacheBO : availableList) {
log.info("bucketNo: {}, inventoryNum: {}", bucketCacheBO.getBucketNo(), bucketCacheBO.getBucketNum());
tairCache.set(bucketCacheBO.getBucketNo(), JSONObject.toJSONString(bucketCacheBO.getBucketNum()), 0);
}
//3.将数据存储到本地缓存列表
inventoryBucketCache.setBucketLocalCache(key, bucketLocalCache);
//4.维护分桶的元数据信息到缓存上
tairCache.set(TairInventoryConstant.SELLER_BUCKET_PREFIX + key, JSONObject.toJSONString(bucketLocalCache), 0);
log.info("元数据信息: {}", JSONObject.toJSONString(bucketLocalCache));
}
...
}
@Component
@Data
public class InventoryBucketCache {
//本地缓存
@Autowired
private Cache cache;
@Resource
private TairCache tairCache;
//本地存储分桶元数据信息
public void setBucketLocalCache(String bucketKey, BucketLocalCache bucketLocalCache) {
log.info("local cache set key:{}, value:{}", bucketKey, JSON.toJSONString(bucketLocalCache));
cache.put(bucketKey, bucketLocalCache);
}
...
}
@Component
public class TairCache {
private JedisPool jedisPool;
public TairCache(JedisPool jedisPool) {
this.jedisPool = jedisPool;
}
public Jedis getJedis() {
return jedisPool.getResource();
}
public TairString createTairString(Jedis jedis) {
return new TairString(jedis);
}
...
//缓存存储
public Boolean set(String key, String value, int seconds) {
log.info("tairString set key:{}, value:{}, seconds:{}", key, value, seconds);
try (Jedis jedis = getJedis()) {
TairString tairString = createTairString(jedis);
String result;
if (seconds > 0) {
result = tairString.exset(key, value, new ExsetParams().ex(seconds));
} else {
result = tairString.exset(key, value);
}
return "OK".equals(result);
}
}
...
}
(3)场景二增加库存
步骤一:检验入参必填项
步骤二:插入库存入库的记录信息
步骤三:缓存已存在,先把库存加到中心桶上
步骤四:构建新的分桶元数据信息并写入
步骤五:获取本地存储的分桶元数据信息
步骤六:获取当前可上线的分桶列表信息以及具体上线库存
步骤七:如果当前可上线分桶列表信息不空则构建新的分桶元数据模型
步骤八:写入数据到远程缓存中并更新本地缓存的分桶元数据信息
步骤一:检验入参必填项
步骤二:插入库存入库的记录信息。由于申请的库存业务编号是一个唯一 key,所以可以避免重复请求。即会校验库存单号是否已经存在,保证⼀次库存变更⾏为只执⾏⼀次。
步骤三:缓存已存在,先把库存加到中心桶上
步骤四:构建新的分桶元数据信息并写入
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {
@Resource
private InventoryBucketCache inventoryBucketCache;
...
//将库存数据写入缓存
private void inventoryBucketCache(InventorBucketRequest request) {
//获取中心桶库存的key
String key = buildSellerInventoryKey(request.getSellerId(), request.getSkuId());
//1.先验证是否已缓存分桶元数据信息,先查本地缓存,再查远程缓存
BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(request.getSellerId() + request.getSkuId());
//缓存不存在,则进行初始化
if (Objects.isNull(bucketLocalCache)) {
//2.获取库存分桶的配置模板
InventoryBucketConfigDO inventoryBucketConfig = inventoryRepository.getInventoryBucketConfig(request.getTemplateId());
//初始化分桶库存
initInventoryBucket(request, inventoryBucketConfig);
} else {
//3.缓存已存在,直接把库存加到中心桶里面,并返回中心桶库存
Integer residueNum = tairCache.incr(key, request.getInventoryNum());
InventorOnlineRequest onlineRequest = inventoryConverter.converterRequest(request);
//4.构建新的分桶元数据信息并写入
writeBucketCache(onlineRequest, residueNum);
}
}
//构建新的分桶元数据信息
//@param request 分桶上线对象
//@param residueNum 中心桶剩余库存
private void writeBucketCache(InventorOnlineRequest request, Integer residueNum) {
String key = request.getSellerId() + request.getSkuId();
//5.获取本地存储的分桶元数据信息
BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(key);
if (!Objects.isNull(bucketLocalCache)) {
//6.获取当前可上线的分桶列表信息以及具体上线库存
List<BucketCacheBO> bucketCacheBOList = buildBucketList(
request.getBucketNoList(),
bucketLocalCache.getAvailableList(),
bucketLocalCache.getUndistributedList(),
bucketLocalCache.getInventoryBucketConfig(),
residueNum
);
//当前可上线的分桶为空,直接返回
if (CollectionUtils.isEmpty(bucketCacheBOList)) {
return;
}
//7.构建返回新的分桶元数据模型返回
buildBucketLocalCache(bucketLocalCache, bucketCacheBOList, residueNum);
//8.写入数据到远程缓存中并更新本地缓存的分桶元数据信息
writeBucketLocalCache(bucketLocalCache, bucketCacheBOList);
}
}
...
}
步骤五:获取本地存储的分桶元数据信息
@Component
@Data
public class InventoryBucketCache {
//本地缓存
@Autowired
private Cache cache;
@Autowired
private TairCache tairCache;
//获取本地存储的分桶元数据信息
public BucketLocalCache getBucketLocalCache(String bucketKey) {
//先查本地缓存
BucketLocalCache bucketLocalCache = (BucketLocalCache) cache.getIfPresent(bucketKey);
log.info("local cache get key:{}, value:{}", bucketKey, JSON.toJSONString(bucketLocalCache));
if (Objects.isNull(bucketLocalCache)) {
//再查远程缓存
synchronized (bucketKey.intern()) {
String bucketCache = tairCache.get(TairInventoryConstant.SELLER_BUCKET_PREFIX + bucketKey);
if (!StringUtils.isEmpty(bucketCache)) {
bucketLocalCache = JsonUtil.json2Object(bucketCache, BucketLocalCache.class);
cache.put(bucketKey, bucketLocalCache);
}
}
}
return bucketLocalCache;
}
}
步骤六:获取当前可上线的分桶列表信息以及具体上线库存
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {
...
//获取当前可上线的分桶列表信息以及具体上线库存
//@param bucketNoList 上线分桶编号列表
//@param availableList 上线正在使用的分桶编号列表
//@param undistributedList 下线或者未使用的分桶编号列表
//@param inventoryBucketConfigDO 当前分桶的配置模板信息
//@param residueNum 中心桶的剩余可分配库存
//@return 当前可上线的分桶列表以及具体分桶库存
private List<BucketCacheBO> buildBucketList(List<String> bucketNoList, List<BucketCacheBO> availableList, List<BucketCacheBO> undistributedList, InventoryBucketConfigDO inventoryBucketConfigDO, Integer residueNum) {
//1.如果入参选择了上线的分桶编号列表,则从缓存中配置的未使用分桶列表进行比对处理
List<String> bucketCacheList = null;
if (!CollectionUtils.isEmpty(bucketNoList)) {
Map<String, BucketCacheBO> bucketCacheMap = undistributedList.stream().collect(Collectors.toMap(BucketCacheBO::getBucketNo, Function.identity()));
//过滤返回可用的分桶编号
bucketCacheList = bucketNoList.stream().filter(bucketNo -> bucketCacheMap.containsKey(bucketNo)).collect(Collectors.toList());
} else {
//直接返回:下线的不可用分桶列表
bucketCacheList = undistributedList.stream().map(BucketCacheBO::getBucketNo).collect(Collectors.toList());
}
//可上线的分桶列表为空
if (CollectionUtils.isEmpty(bucketCacheList)) {
return Lists.newArrayList();
}
//2.根据中心桶的可分配库存,处理返回具体上线的分桶配置信息
return calcOnlineBucket(availableList, bucketCacheList, residueNum, inventoryBucketConfigDO);
}
//构建上线的分桶库存模型
//@param availableList 上线正在使用的分桶编号列表
//@param bucketCacheList 预上线的分桶列表
//@param residueNum 中心桶剩余库存容量
//@param inventoryBucketConfig 当前分桶配置信息
private List<BucketCacheBO> calcOnlineBucket(List<BucketCacheBO> availableList, List<String> bucketCacheList, Integer residueNum, InventoryBucketConfigDO inventoryBucketConfig) {
List<BucketCacheBO> bucketCacheBOList = new ArrayList<>();
//获取已上线分桶 + 准备上线的分桶数量
Integer sumBucketSize = availableList.size() + bucketCacheList.size();
//获取得到已上线的分桶分配库存深度总和
int sumBucketNum = availableList.stream().mapToInt(BucketCacheBO::getBucketNum).sum();
//获取总的库存深度 + 中心桶的库存,得到平均的分桶实际可分配库存深度
Integer averageNum = (sumBucketNum + residueNum) / sumBucketSize;
//当前准备分桶上线的数量(一般都是未使用的下线分桶)
Integer bucketNum = bucketCacheList.size();
//计算一下平均分桶的库存 是否小于最小深度,如小于则以最小深度为准进行分桶
Integer minBucketNum = bucketNum * averageNum;
//当库存数量小于最小分桶深度*分桶数量,减少可分配的分桶数量, 最后一个分桶分配剩余的全部库存(避免少量的库存分桶直接触发阈值下线)
if (minBucketNum > residueNum) {
bucketNum = residueNum / inventoryBucketConfig.getMinDepthNum();
averageNum = inventoryBucketConfig.getMinDepthNum();
}
//如果库存深度超过最大库存深度,则只存放最大深度
if (averageNum > inventoryBucketConfig.getMaxDepthNum()) {
averageNum = inventoryBucketConfig.getMinDepthNum();
}
//当前没有准备分桶上线的数量
if (bucketNum == 0) {
return Lists.newArrayList();
}
//开始填充每个分桶的具体上线库存
for (int i = 0; i < bucketNum; i++) {
BucketCacheBO bucketCache = new BucketCacheBO();
//这里上线的分桶数量不会超过实际能够上线的分桶数量,所以bucketCacheList.get(i)不会数组越界
bucketCache.setBucketNo(bucketCacheList.get(i));
bucketCache.setBucketNum(averageNum);
bucketCache.setAllotNum(bucketCache.getBucketNum());
bucketCacheBOList.add(bucketCache);
}
return bucketCacheBOList;
}
...
}
步骤七:如果当前可上线分桶列表信息不空则构建新的分桶元数据模型
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {
...
//构建新的分桶元数据模型
//@param bucketLocalCache 本地分桶元数据信息
//@param bucketCacheBOList 上线的分桶数据列表
//@param residueNum 中心桶剩余库存
private void buildBucketLocalCache(BucketLocalCache bucketLocalCache, List<BucketCacheBO> bucketCacheBOList, Integer residueNum) {
//获取本次上线的库存信息
Integer inventoryNum = 0;
for (BucketCacheBO bucketCacheBO : bucketCacheBOList) {
inventoryNum = inventoryNum + bucketCacheBO.getBucketNum();
}
//填充中心桶剩余库存
residueNum = residueNum - inventoryNum;
bucketLocalCache.setResidueNum(residueNum);
//添加新上线的分桶列表
bucketLocalCache.getAvailableList().addAll(bucketCacheBOList);
Map<String, BucketCacheBO> bucketCacheMap = bucketCacheBOList.stream().collect(Collectors.toMap(BucketCacheBO::getBucketNo, Function.identity()));
List<BucketCacheBO> undistributedList = bucketLocalCache.getUndistributedList().stream().filter(bucketCacheBO ->
//在上线的分桶列表,需要移除掉
!bucketCacheMap.containsKey(bucketCacheBO.getBucketNo())).collect(Collectors.toList()
);
//从不可用的分桶列表重移除
bucketLocalCache.setUndistributedList(undistributedList);
}
...
}
步骤八:写入数据到远程缓存中并更新本地缓存的分桶元数据信息
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {
...
//写入数据到远程缓存中并更新本地缓存的分桶元数据信息
//@param bucketLocalCache 分桶元数据信息
//@param bucketCacheBOList 上线的分桶信息
private void writeBucketLocalCache(BucketLocalCache bucketLocalCache, List<BucketCacheBO> bucketCacheBOList) {
String key = bucketLocalCache.getSellerId() + bucketLocalCache.getSkuId();
//中心桶被扣减掉的库存(上线的分桶库存总和)
Integer centerInventoryNum = bucketCacheBOList.stream().mapToInt(BucketCacheBO::getBucketNum).sum();
//中心桶的库存扣减信息
tairCache.decr(TairInventoryConstant.SELLER_INVENTORY_PREFIX + key, centerInventoryNum);
//1.先更新分桶的上线缓存处理操作
for (BucketCacheBO bucketCacheBO : bucketCacheBOList) {
tairCache.incr(bucketCacheBO.getBucketNo(), bucketCacheBO.getBucketNum());
}
//2.处理分桶列表的更新,待中心桶库存以及上线分桶库存更新完成,更新远程和本地的分桶列表
inventoryBucketCache.setBucketLocalCache(key, bucketLocalCache);
tairCache.set(TairInventoryConstant.SELLER_BUCKET_PREFIX + key, JSONObject.toJSONString(bucketLocalCache), 0);
}
...
}
4.商品库存分桶缓存初始化请求处理
首先通过 Guava Cache 清除本地的缓存数据,然后通过 Tair Cache 清除 Tair 的数据,最后传入请求参数模型到 inventorBucket()方法来初始化库存。注意:请求参数模型中会带上本次的库存业务单号。
@RestController
@RequestMapping("/product/inventory")
public class InventoryController {
@Autowired
private InventoryBucketService inventoryBucketService;
@Autowired
private InventoryBucketCache cache;
@Resource
private TairCache tairCache;
//初始化库存
@RequestMapping("/init")
public void inventoryInit(@RequestBody InventorBucketRequest request) {
//1.清除本地缓存数据,cache.getCache()获取的是Guava Cache
cache.getCache().invalidateAll();
//2.清除tair中的数据,此时扫描卖家ID+SKU的ID的key会比较耗时
Set<String> keys = tairCache.getJedis().keys("*" + request.getSellerId() + request.getSkuId() + "*");
if (!CollectionUtils.isEmpty(keys)) {
tairCache.mdelete(Lists.newArrayList(keys));
}
//3.这里模拟指定本次的库存业务单号,实际接口需要外部传入
request.setInventorCode(SnowflakeIdWorker.getCode());
//4.初始化库存信息
inventoryBucketService.inventorBucket(request);
}
...
}
//商品库存入库
@Data
public class InventorBucketRequest implements Serializable {
//商品skuID
private String skuId;
//卖家ID
private String sellerId;
//库存变更数量
private Integer inventoryNum;
//库存变更申请业务单号
private String inventorCode;
//选择分配的分桶模板ID(非必传,不传使用默认)
private Long templateId;
}
@Component
public class TairCache {
private JedisPool jedisPool;
public TairCache(JedisPool jedisPool) {
this.jedisPool = jedisPool;
}
public Jedis getJedis() {
return jedisPool.getResource();
}
...
}
5.商品库存分桶缓存初始化的加分布式锁处理 + 插入库存变更记录
执行 InventoryBucketService 的 inventorBucket()方法来初始化分桶缓存。
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {
@Resource
private TairLock tairLock;
@Resource
private InventoryRepository inventoryRepository;
...
//商品库存入桶分配
@Override
@Transactional(rollbackFor = Exception.class)
public void inventorBucket(InventorBucketRequest request) {
//1.验证入参必填
checkInventorParams(request);
//锁key = 卖家ID + SKU的ID
String key = TairInventoryConstant.SELLER_INVENTORY_LOCK_PREFIX + request.getSellerId() + request.getSkuId();
String value = SnowflakeIdWorker.getCode();
//注意这里需要锁定中心桶库存
boolean lock = tairLock.tryLock(key, value);
//分配库存时,这个卖家的sku是不允许其他相关操作的
if (lock) {
try {
//2.插入库存入库的记录信息
//由于申请的库存业务编号是一个唯一key,所以可以避免重复请求
//也就是会校验库存单号是否已经存在了,保证⼀次库存变更⾏为只能执⾏⼀次
inventoryRepository.saveInventoryAllotDetail(request);
//3.将库存数据写入缓存
inventoryBucketCache(request);
} catch (Exception e) {
log.error(e.getStackTrace().toString());
} finally {
tairLock.unlock(key, value);
}
} else {
throw new BaseBizException("请求繁忙,稍后再重试!");
}
}
...
}
@Component
public class TairLock {
//锁过期时间10s
private static final int LOCK_EXPIRE_TIME = 10;
private static final int lockWaitTimeOut = 10000;
private static final int retryTime = 100;
private TairCache tairCache;
public TairLock(TairCache tairCache) {
this.tairCache = tairCache;
}
//加锁
public boolean tryLock(String cacheKey, String randomValue) {
if (StringUtils.isEmpty(cacheKey)) {
return true;
}
int waitTimeOut = lockWaitTimeOut;
try {
while (waitTimeOut > 0) {
//如果put成功说明加锁成功
if (this.lock(cacheKey, randomValue)) {
return true;
}
waitTimeOut -= retryTime;
sleep(retryTime);
}
//程序走到这里说明锁等待一定的时间,所以这里释放这个锁
unlock(cacheKey, randomValue);
} catch (Exception ex) {
log.error("getCacheLock exception key:{}", cacheKey, ex);
}
return true;
}
public boolean lock(String resourceKey, String randomValue) {
try (Jedis jedis = tairCache.getJedis()) {
String result = jedis.set(resourceKey, randomValue, new SetParams().nx().ex(LOCK_EXPIRE_TIME));
return "OK".equals(result);
}
}
//解锁
public boolean unlock(String resourceKey, String randomValue) {
try (Jedis jedis = tairCache.getJedis()) {
//CAD的意思是Compare And Delete,CAS的意思是Compare And Swap
jedis.getClient().sendCommand(TairCommand.CAD, resourceKey, randomValue);
Long ret = jedis.getClient().getIntegerReply();
return 1 == ret;
}
}
...
}
@Repository
public class InventoryRepository {
...
//存储每次库存入库的申请记录
//校验库存单号是否已经存在了(⼀次库存变更⾏为只能执⾏⼀次)
public void saveInventoryAllotDetail(InventorBucketRequest request) {
InventoryAllotDetailDO inventoryAllotDetailDO = inventoryConverter.converterDO(request);
inventoryAllotDetailDO.initCommon();
int count = inventoryAllotDetailMapper.insert(inventoryAllotDetailDO);
if (count <= 0) {
throw new BaseBizException(InventoryExceptionCode.INVENTORY_SQL);
}
}
...
}
6.商品库存分桶元数据本地 + 远程缓存查询
执行 inventoryBucketCache()方法将缓存写入分桶时,首先会查询是否已经存在了库存缓存,即先尝试获取分桶元数据。分桶元数据其实就是分桶的数据信息 BucketLocalCache,分桶元数据中包含了商品库存应该如何构建分桶缓存的详细信息。查询缓存时,会先查本地缓存 Guava Cache,再查远程缓存 Tair Cache。
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {
@Resource
private InventoryBucketCache inventoryBucketCache;
...
//操作数据入缓存
private void inventoryBucketCache(InventorBucketRequest request) {
String key = TairInventoryConstant.SELLER_INVENTORY_PREFIX + request.getSellerId() + request.getSkuId();
//1.先验证是否已缓存分桶元数据信息,先查本地缓存,再查远程缓存
BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(request.getSellerId() + request.getSkuId());
//缓存不存在,则进行初始化
if (Objects.isNull(bucketLocalCache)) {
//2.获取库存分桶的配置模板
InventoryBucketConfigDO inventoryBucketConfig = inventoryRepository.getInventoryBucketConfig(request.getTemplateId());
//初始化分桶库存
initInventoryBucket(request, inventoryBucketConfig);
} else {
//3.缓存已存在,直接把库存加到中心桶里面,并返回中心桶库存
Integer residueNum = tairCache.incr(key, request.getInventoryNum());
//4.尝试将库存分配到新的分桶上(注意,先将中心桶的库存加上去)
InventorOnlineRequest onlineRequest = inventoryConverter.converterRequest(request);
//5.构建新的分桶元数据信息并写入
//分桶元数据中包含了商品库存应该如何构建分桶缓存的详细信息
writeBucketCache(onlineRequest, residueNum);
}
}
...
}
@Component
@Data
public class InventoryBucketCache {
//本地缓存
@Autowired
private Cache cache;
@Autowired
private TairCache tairCache;
...
//获取本地的分桶元数据信息
public BucketLocalCache getBucketLocalCache(String bucketKey) {
//先查本地缓存
BucketLocalCache bucketLocalCache = (BucketLocalCache) cache.getIfPresent(bucketKey);
log.info("local cache get key:{}, value:{}", bucketKey, JSON.toJSONString(bucketLocalCache));
if (Objects.isNull(bucketLocalCache)) {
//再查远程缓存
synchronized (bucketKey.intern()) {
String bucketCache = tairCache.get(TairInventoryConstant.SELLER_BUCKET_PREFIX + bucketKey);
if (!StringUtils.isEmpty(bucketCache)) {
bucketLocalCache = JsonUtil.json2Object(bucketCache, BucketLocalCache.class);
cache.put(bucketKey, bucketLocalCache);
}
}
}
return bucketLocalCache;
}
}
//本地分桶缓存相关信息 = 分桶元数据
//分桶元数据中包含了商品库存应该如何构建分桶缓存的详细信息
@Data
public class BucketLocalCache {
//商品skuID
private String skuId;
//卖家ID
private String sellerId;
//中心桶库存
private Integer inventoryNum;
//中心桶剩余库存
private Integer residueNum;
//本地元数据对应的版本号
private String version;
//操作类型,0上线,1下线
private Integer operationType;
//当前分桶的配置信息
private InventoryBucketConfigDO inventoryBucketConfig;
//分桶明细缓存key
private List<String> bucketDetailKeyList;
//可用分桶缓存列表
private List<BucketCacheBO> availableList;
//未分配或者已下线的分桶缓存列表,不可用的分桶缓存列表
private List<BucketCacheBO> undistributedList;
//默认的缓存对象
public String getBucketLocalKey() {
return sellerId + skuId;
}
}
//分桶缓存相关信息
@Data
public class BucketCacheBO {
//分桶编号
private String bucketNo;
//分桶分配的库存
private Integer bucketNum = 0;
//分桶分配的总库存
private Integer allotNum = 0;
public BucketCacheBO() {
}
public BucketCacheBO(String bucketNo) {
this.bucketNo = bucketNo;
}
}
7.商品库存动态分桶算法实现
⾸先获取本次需要入库的库存数量,然后根据配置的每个分桶最⼤(小)分配数量 * 分桶数量得出配置的所有分桶的最大(小)库存容量,接着根据需要入库数与最大库存数获取本次最多可放入分桶的库存数量。
情况一:如果本次需要入库的库存数量大于配置的所有分桶的最大库存容量,那么说明⼀次分桶还⽆法分配完全部的库存。此时会面向所有分桶进行库存分配,分桶数量就是配置的分桶数量,并且每个分桶所分配的库存数就是配置的最⼤库存容量。
情况二:如果本次需要入库的库存数量小于配置的所有分桶的最大库存容量,但是大于配置的所有分桶的最小库存容量,则根据配置的分桶数,计算每个分桶具体要分配多少库存即可。
情况三:如果本次需要入库的库存数量小于配置的所有分桶的最小库存容量,并且小于配置的所有分桶的最小库存容量,则计算此时要分配的分桶数 = 本次库存入库数 / 每个分桶的最小库存容量。
情况三(一):如果得出的要分配的分桶数大于 0,那么就根据计算出的分桶数,重新计算每个分桶应该分配多少库存,也就是进行分配大于或等于最小库存容量的库存。
情况三(二):如果得出的要分配的分桶数等于 0,那么此时只需要一个分桶即可。
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {
...
//初始化分桶库存
private void initInventoryBucket(InventorBucketRequest request, InventoryBucketConfigDO inventoryBucketConfig) {
//计算出分桶的数据信息
BucketLocalCache bucketLocalCache = calcInventoryBucket(request, inventoryBucketConfig);
//写入远程缓存以及本地缓存
writeCache(bucketLocalCache);
}
//计算出当前库存入库的具体分桶信息
private BucketLocalCache calcInventoryBucket(InventorBucketRequest request, InventoryBucketConfigDO inventoryBucketConfig) {
//构建出当前分桶的数据模型,库存数量大于最大容量时,通过最大容量来构建模型
return buildBucketInfo(request, inventoryBucketConfig);
}
//构建分桶的数据模型
//@param request 请求入参
//@param inventoryBucketConfig 分桶配置信息
private BucketLocalCache buildBucketInfo(InventorBucketRequest request, InventoryBucketConfigDO inventoryBucketConfig) {
//分桶配置模版中默认的分桶数量
Integer bucketNum = inventoryBucketConfig.getBucketNum();
//获取本次需要入库的库存数量
Integer inventorNum = request.getInventoryNum();
//配置模版中所有分桶的最大库存容量
Integer maxBucketNum = bucketNum * inventoryBucketConfig.getMaxDepthNum();
//配置模版中所有分桶的最小库存容量
//如果需要放入分桶的库存数量低于这个值,那么只会分配给部分分桶,此时就需要重新计算分桶
Integer minBucketNum = bucketNum * inventoryBucketConfig.getMinDepthNum();
//本次最多可以放入分桶的库存数量
int countBucketNum = Math.min(inventorNum, maxBucketNum);
//当库存数量小于最小分桶深度 * 分桶数量,就需要减少分配的分桶数
//此时要分配的分桶数量 bucketNum = 本次库存入库的数量 / 每个分桶的最小库存容量
if (minBucketNum > countBucketNum) {
bucketNum = countBucketNum / inventoryBucketConfig.getMinDepthNum();
//如果库存数量不足一个分桶的最小深度,但是大于0,则上线一个分桶
if (bucketNum == 0 && countBucketNum % inventoryBucketConfig.getMinDepthNum() > 0) {
bucketNum++;
}
}
//获取每个分桶分配的库存数量
Integer bucketInventorNum = countBucketNum / bucketNum;
//剩余库存数量,可能为0或者大于0,补到最后一个分桶上
Integer residueNum = countBucketNum - bucketInventorNum * bucketNum;
//构建缓存数据模型时,以卖家ID + 商品skuId为唯一标识
String key = request.getSellerId() + request.getSkuId();
//构建缓存数据模型
BucketLocalCache bucketLocalCache = buildBucketCache(key, bucketNum, bucketInventorNum, residueNum, inventoryBucketConfig);
//标记到具体的数据上
bucketLocalCache.setSellerId(request.getSellerId());
bucketLocalCache.setSkuId(request.getSkuId());
bucketLocalCache.setInventoryNum(inventorNum);
//中心桶剩余库存 = 本次需要入库的库存数量 - 本次最多可以放入分桶的库存数量
bucketLocalCache.setResidueNum(inventorNum - countBucketNum);
bucketLocalCache.setInventoryBucketConfig(inventoryBucketConfig);
return bucketLocalCache;
}
...
}
8.基于分桶算法结果构建库存分桶元数据
库存分桶元数据 BucketLocalCache 中包含了商品库存应该如何构建分桶缓存的详细信息。
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {
...
//构建缓存模型
//@param bucketNum 分桶数量
//@param inventorNum 分桶分配的库存数量
//@param residueNum 剩余的未分配均匀的库存
//@param inventoryBucketConfig 分桶配置信息
private BucketLocalCache buildBucketCache(String key, Integer bucketNum, Integer inventorNum, Integer residueNum, InventoryBucketConfigDO inventoryBucketConfig) {
BucketLocalCache bucketLocalCache = new BucketLocalCache();
//先获取得到这个模板配置的对应可分槽位的均匀桶列表
List<String> bucketNoList = InventorBucketUtil.createBucketNoList(key, inventoryBucketConfig.getBucketNum());
List<BucketCacheBO> bucketCacheBOList = new ArrayList<>(bucketNum);
List<BucketCacheBO> undistributedList = new ArrayList<>(bucketNum);
//构建出多个分桶对象
for (int i = 0; i < bucketNum; i++) {
//生成对应的分桶编号,方便定义到具体的分桶上
BucketCacheBO bucketCache = new BucketCacheBO();
String bucketNo = bucketNoList.get(i);
bucketCache.setBucketNo(bucketNo);
//最后一个分桶,分配剩余未除尽的库存 + 平均库存
if (i == bucketNum - 1) {
bucketCache.setBucketNum(inventorNum + residueNum);
} else {
bucketCache.setBucketNum(inventorNum);
}
bucketCacheBOList.add(bucketCache);
}
//生成的分桶对象超过实际可分配的分桶对象,保留这批多余的分桶模型为不可用分桶,后续分桶上线可以选择使用
if (bucketNoList.size() > bucketNum) {
for (int i = bucketNum; i < bucketNoList.size(); i++) {
BucketCacheBO bucketCache = new BucketCacheBO();
bucketCache.setBucketNo(bucketNoList.get(i));
undistributedList.add(bucketCache);
}
}
//设置可用的分桶缓存列表
bucketLocalCache.setAvailableList(bucketCacheBOList);
//设置不可用或者已下线的分桶缓存列表
bucketLocalCache.setUndistributedList(undistributedList);
return bucketLocalCache;
}
...
}
//库存工具方法
public class InventorBucketUtil {
private static final int MAX_SIZE = 100000;
//生成对应的槽位key
//@param key 卖家Id + 商品skuId
//@param bucketNum 分桶配置数量
//@return 预先保留的槽位集合
public static List<String> createBucketNoList(String key, Integer bucketNum) {
Map<Long, String> cacheKey = new HashMap<>(bucketNum);
//bucketNoList用来存放每个桶对应的hashKey
List<String> bucketNoList = new ArrayList<>(bucketNum);
for (int i = 1; i <= MAX_SIZE; i++) {
String serialNum = String.format("%06d", i);
//卖家ID + 商品SKU ID + 序号
String hashKey = key + serialNum;
//一致性哈希算法murmur
long hash = HashUtil.murMurHash(hashKey.getBytes());
//对分桶数量进行取模运算
long c = (hash %= bucketNum);
//确保被选中的hashKey都能哈希到不同的分桶
if (cacheKey.containsKey(c)) {
continue;
}
cacheKey.put(c, hashKey);
bucketNoList.add(hashKey);
if (cacheKey.size() >= bucketNum) {
break;
}
}
return bucketNoList;
}
}
//本地分桶缓存相关信息 = 分桶元数据
//分桶元数据中包含了商品库存应该如何构建分桶缓存的详细信息
@Data
public class BucketLocalCache {
//商品skuID
private String skuId;
//卖家ID
private String sellerId;
//中心桶库存
private Integer inventoryNum;
//中心桶剩余库存
private Integer residueNum;
//本地元数据对应的版本号
private String version;
//操作类型,0上线,1下线
private Integer operationType;
//当前分桶的配置信息
private InventoryBucketConfigDO inventoryBucketConfig;
//分桶明细缓存key
private List<String> bucketDetailKeyList;
//可用分桶缓存列表
private List<BucketCacheBO> availableList;
//未分配或者已下线的分桶缓存列表,不可用的分桶缓存列表
private List<BucketCacheBO> undistributedList;
//默认的缓存对象
public String getBucketLocalKey() {
return sellerId + skuId;
}
}
9.剩余库存写入中心桶缓存 + 分桶库存写入分桶缓存 + 分桶元数据写入本地缓存
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {
@Resource
private InventoryBucketCache inventoryBucketCache;
@Resource
private TairCache tairCache;
...
//初始化分桶库存
private void initInventoryBucket(InventorBucketRequest request, InventoryBucketConfigDO inventoryBucketConfig) {
//计算出分桶的数据信息
BucketLocalCache bucketLocalCache = calcInventoryBucket(request, inventoryBucketConfig);
//写入远程缓存以及本地缓存
writeCache(bucketLocalCache);
}
//将数据写入远程缓存以及本地缓存
private void writeCache(BucketLocalCache bucketLocalCache) {
//卖家ID + 商品skuId标识
String key = bucketLocalCache.getSellerId() + bucketLocalCache.getSkuId();
//1.写入中心桶的库存信息:中心桶存放的是剩余库存 = 本次需要入库的库存数量 - 本次最多可以放入分桶的库存数量
log.info("中心桶中存放的是剩余库存:{}", bucketLocalCache.getResidueNum());
tairCache.set(TairInventoryConstant.SELLER_INVENTORY_PREFIX + key, bucketLocalCache.getResidueNum().toString(), 0);
//2.写入数据到对应的缓存上,计算出的分桶库存写入Tair上的分桶缓存
List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList();
for (BucketCacheBO bucketCacheBO : availableList) {
log.info("bucketNo: {}, inventoryNum: {}", bucketCacheBO.getBucketNo(), bucketCacheBO.getBucketNum());
tairCache.set(bucketCacheBO.getBucketNo(), JSONObject.toJSONString(bucketCacheBO.getBucketNum()), 0);
}
//3.将数据存储到本地缓存列表
inventoryBucketCache.setBucketLocalCache(key, bucketLocalCache);
//4.维护分桶的元数据信息到缓存上
tairCache.set(TairInventoryConstant.SELLER_BUCKET_PREFIX + key, JSONObject.toJSONString(bucketLocalCache), 0);
log.info("元数据信息: {}", JSONObject.toJSONString(bucketLocalCache));
}
...
}
@Component
@Data
public class InventoryBucketCache {
//本地缓存
@Autowired
private Cache cache;
@Resource
private TairCache tairCache;
//本地存储分桶元数据信息
public void setBucketLocalCache(String bucketKey, BucketLocalCache bucketLocalCache) {
log.info("local cache set key:{}, value:{}", bucketKey, JSON.toJSONString(bucketLocalCache));
cache.put(bucketKey, bucketLocalCache);
}
...
}
@Component
public class TairCache {
private JedisPool jedisPool;
public TairCache(JedisPool jedisPool) {
this.jedisPool = jedisPool;
}
public Jedis getJedis() {
return jedisPool.getResource();
}
public TairString createTairString(Jedis jedis) {
return new TairString(jedis);
}
...
//缓存存储
public Boolean set(String key, String value, int seconds) {
log.info("tairString set key:{}, value:{}, seconds:{}", key, value, seconds);
try (Jedis jedis = getJedis()) {
TairString tairString = createTairString(jedis);
String result;
if (seconds > 0) {
result = tairString.exset(key, value, new ExsetParams().ex(seconds));
} else {
result = tairString.exset(key, value);
}
return "OK".equals(result);
}
}
...
}
文章转载自:东阳马生架构

电子尖叫食人鱼
还未添加个人签名 2025-04-01 加入
还未添加个人简介
评论