写点什么

商品中心—商品可采可补可售的技术文档

  • 2025-06-12
    福建
  • 本文字数:32600 字

    阅读完需:约 107 分钟

1.可采可补可售业务的数据库建模设计


(1)可采可补可售


可售:配置了卖家组(售卖区)的商品,在该卖家组下的区域是可售状态可补:可售商品,微仓是否可补,指⼤仓向微仓补货可采:可售商品,⼤仓是否可采,指⼤仓采购
复制代码


(2)可采业务表


一.商品与卖家组关系表


CREATE TABLE `sku_seller_relation` (    `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主健',    `item_id` varchar(40) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT 'itemId',    `sku_id` varchar(40) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT 'skuId',    `seller_group_id` varchar(50) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '卖家组ID',    `seller_type` int(10) DEFAULT '0' COMMENT '卖家类型(1-⾃营,2-POP)',    `relation_type` tinyint(3) DEFAULT '0' COMMENT '关系类型(1-可售,2-屏蔽)',    `del_flag` tinyint(1) DEFAULT '1' COMMENT '删除标记(1-有效,0-删除)',    `create_user` int(10) DEFAULT '0' COMMENT '创建⼈',    `create_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',    `update_user` int(10) DEFAULT '0' COMMENT '更新⼈',    `update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',    PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8 COMMENT='商品与卖家组关系表';
复制代码


二.组套商品与 SKU 关系表


CREATE TABLE `stack_sku_relation` (    `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',    `sku_id` varchar(40) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '售卖sku(组套商品)',    `stack_sku_id` varchar(40) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '普通商品skuId或原料商品skuId',    `stack_num` int(10) NOT NULL DEFAULT '0' COMMENT '数量',    `channel` tinyint(3) NOT NULL DEFAULT '0' COMMENT '渠道(1-每日⽣鲜、2-美团、3-饿了么、4-淘鲜达、5-招商银⾏)',    `seller_type` int(10) NOT NULL DEFAULT '0' COMMENT '卖家类型(1-⾃营,2-POP)',    `features` varchar(500) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '扩展字段',    `version_id` int(10) NOT NULL DEFAULT '0' COMMENT '版本号',    `del_flag` tinyint(1) NOT NULL DEFAULT '0' COMMENT '删除标记(1-有效,0-删除)',    `create_user` int(10) NOT NULL DEFAULT '0' COMMENT '创建⼈',    `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',    `update_user` int(10) NOT NULL DEFAULT '0' COMMENT '更新⼈',    `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更改时间',    PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='组套商品与SKU关系表';
复制代码


三.商品 ITEM 表


CREATE TABLE `item_info` (    `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',    `item_id` varchar(40) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '商品ID',    `item_name` varchar(255) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '商品名称',    `recommend` varchar(255) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '推荐语',    `item_type` int(10) NOT NULL DEFAULT '0' COMMENT '商品类型',    `channel` tinyint(3) NOT NULL DEFAULT '0' COMMENT '渠道(1-每日⽣鲜、2-美团、3-饿了么、4-淘鲜达、5-招商银⾏)',    `seller_type` int(10) NOT NULL DEFAULT '0' COMMENT '卖家类型(1-⾃营,2-POP)',    `producing_area_id` int(10) NOT NULL DEFAULT '0' COMMENT '产地ID',    `item_status` int(10) NOT NULL DEFAULT '0' COMMENT '商品状态',    `brand_id` int(10) NOT NULL DEFAULT '0' COMMENT '品牌ID',    `shelf_life` int(10) NOT NULL DEFAULT '0' COMMENT '保质期(⼩时)',    `store_condition_type` int(10) NOT NULL DEFAULT '0' COMMENT '存储条件',    `category_id` int(10) NOT NULL DEFAULT '0' COMMENT '末级品类ID',    `first_category_id` int(10) NOT NULL DEFAULT '0' COMMENT '⼀级品类ID',    `second_category_id` int(10) NOT NULL DEFAULT '0' COMMENT '⼆级品类ID',    `third_category_id` int(10) NOT NULL DEFAULT '0' COMMENT '三级品类ID',    `item_specs_value` varchar(2048) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '规格值([{"颜⾊":"⾦⾊", "内存":"128g"},{"颜⾊":"银⾊", "内存":"256g"}])',    `features` varchar(500) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '扩展字段',    `version_id` int(10) NOT NULL DEFAULT '0' COMMENT '版本号',    `del_flag` tinyint(1) NOT NULL DEFAULT '0' COMMENT '删除标记(1-有效,0-删除)',    `create_user` int(10) NOT NULL DEFAULT '0' COMMENT '创建⼈',    `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',    `update_user` int(10) NOT NULL DEFAULT '0' COMMENT '更新⼈',    `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',    PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=12 DEFAULT CHARSET=utf8 COMMENT='商品ITEM表';
复制代码


四.商品 SKU 表


CREATE TABLE `sku_info` (    `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',    `sku_id` varchar(40) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT 'skuId',    `item_id` varchar(40) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT 'itemId',    `sku_name` varchar(255) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT 'sku名称',    `sku_type` int(10) NOT NULL DEFAULT '0' COMMENT 'sku类型(与item保持⼀致)',    `base_price` int(10) NOT NULL DEFAULT '0' COMMENT '商城价格(单位:分)',    `vip_price` int(10) NOT NULL DEFAULT '0' COMMENT '会员价格(单位:分)',    `sku_grade` int(10) NOT NULL DEFAULT '0' COMMENT '商品分级(ABC标签,运营归类处理)',    `channel` tinyint(3) NOT NULL DEFAULT '0' COMMENT '渠道(1-每日⽣鲜、2-美团、3-饿了么、4-淘鲜达、5-招商银⾏)',    `seller_type` int(10) NOT NULL DEFAULT '0' COMMENT '卖家类型(1-⾃营,2-POP)',    `sku_specs_value` varchar(2048) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '规格值({"颜⾊":"⾦⾊", "内存":"128g"})',    `features` varchar(500) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '扩展字段',    `version_id` int(10) NOT NULL DEFAULT '0' COMMENT '版本号',    `del_flag` tinyint(1) NOT NULL DEFAULT '0' COMMENT '删除标记(1-有效,0-删除)',    `create_user` int(10) NOT NULL DEFAULT '0' COMMENT '创建⼈',    `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',    `update_user` int(10) NOT NULL DEFAULT '0' COMMENT '更新⼈',    `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更改时间',    PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=15 DEFAULT CHARSET=utf8 COMMENT='商品SKU表';
复制代码


五.商品⽆需采购配置表


CREATE TABLE `item_procurement_config` (    `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',    `category_id` int(10) NOT NULL DEFAULT '0' COMMENT '品类ID',    `procurement_type` tinyint(1) NOT NULL DEFAULT '0' COMMENT '是否需要采购(1-需要,0-⽆需)',    `version_id` int(10) NOT NULL DEFAULT '0' COMMENT '版本号',    `del_flag` tinyint(1) NOT NULL DEFAULT '0' COMMENT '删除标记(1-有效,0-删除)',    `create_user` int(10) NOT NULL DEFAULT '0' COMMENT '创建⼈',    `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',    `update_user` int(10) NOT NULL DEFAULT '0' COMMENT '更新⼈',    `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更改时间',    PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=15 DEFAULT CHARSET=utf8 COMMENT='商品⽆需采购配置表';
复制代码


(3)可补业务表


一.商品 ITEM 表


CREATE TABLE `item_info` (    `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',    `item_id` varchar(40) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '商品ID',    `item_name` varchar(255) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '商品名称',    `recommend` varchar(255) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '推荐语',    `item_type` int(10) NOT NULL DEFAULT '0' COMMENT '商品类型',    `channel` tinyint(3) NOT NULL DEFAULT '0' COMMENT '渠道(1-每日⽣鲜、2-美团、3-饿了么、4-淘鲜达、5-招商银⾏)',    `seller_type` int(10) NOT NULL DEFAULT '0' COMMENT '卖家类型(1-⾃营,2-POP)',    `producing_area_id` int(10) NOT NULL DEFAULT '0' COMMENT '产地ID',    `item_status` int(10) NOT NULL DEFAULT '0' COMMENT '商品状态',    `brand_id` int(10) NOT NULL DEFAULT '0' COMMENT '品牌ID',    `shelf_life` int(10) NOT NULL DEFAULT '0' COMMENT '保质期(⼩时)',    `store_condition_type` int(10) NOT NULL DEFAULT '0' COMMENT '存储条件',    `category_id` int(10) NOT NULL DEFAULT '0' COMMENT '末级品类ID',    `first_category_id` int(10) NOT NULL DEFAULT '0' COMMENT '⼀级品类ID',    `second_category_id` int(10) NOT NULL DEFAULT '0' COMMENT '⼆级品类ID',    `third_category_id` int(10) NOT NULL DEFAULT '0' COMMENT '三级品类ID',    `item_specs_value` varchar(2048) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '规格值([{"颜⾊":"⾦⾊", "内存":"128g"},{"颜⾊":"银⾊", "内存":"256g"}])',    `features` varchar(500) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '扩展字段',    `version_id` int(10) NOT NULL DEFAULT '0' COMMENT '版本号',    `del_flag` tinyint(1) NOT NULL DEFAULT '0' COMMENT '删除标记(1-有效,0-删除)',    `create_user` int(10) NOT NULL DEFAULT '0' COMMENT '创建⼈',    `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',    `update_user` int(10) NOT NULL DEFAULT '0' COMMENT '更新⼈',    `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',    PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=12 DEFAULT CHARSET=utf8 COMMENT='商品ITEM表';
复制代码


二.组套商品与 SKU 关系表


CREATE TABLE `stack_sku_relation` (    `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',    `sku_id` varchar(40) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '售卖sku(组套商品)',    `stack_sku_id` varchar(40) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '普通商品skuId或原料商品skuId',    `stack_num` int(10) NOT NULL DEFAULT '0' COMMENT '数量',    `channel` tinyint(3) NOT NULL DEFAULT '0' COMMENT '渠道(1-每日⽣鲜、2-美团、3-饿了么、4-淘鲜达、5-招商银⾏)',    `seller_type` int(10) NOT NULL DEFAULT '0' COMMENT '卖家类型(1-⾃营,2-POP)',    `features` varchar(500) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '扩展字段',    `version_id` int(10) NOT NULL DEFAULT '0' COMMENT '版本号',    `del_flag` tinyint(1) NOT NULL DEFAULT '0' COMMENT '删除标记(1-有效,0-删除)',    `create_user` int(10) NOT NULL DEFAULT '0' COMMENT '创建⼈',    `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',    `update_user` int(10) NOT NULL DEFAULT '0' COMMENT '更新⼈',    `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更改时间',    PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='组套商品与SKU关系表';
复制代码


三.商品与卖家组关系表


CREATE TABLE `sku_seller_relation` (    `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主健',    `item_id` varchar(40) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT 'itemId',    `sku_id` varchar(40) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT 'skuId',    `seller_group_id` varchar(50) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '卖家组ID',    `seller_type` int(10) DEFAULT '0' COMMENT '卖家类型(1-⾃营,2-POP)',    `relation_type` tinyint(3) DEFAULT '0' COMMENT '关系类型(1-可售,2-屏蔽)',    `del_flag` tinyint(1) DEFAULT '1' COMMENT '删除标记(1-有效,0-删除)',    `create_user` int(10) DEFAULT '0' COMMENT '创建⼈',    `create_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',    `update_user` int(10) DEFAULT '0' COMMENT '更新⼈',    `update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',    PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8 COMMENT='商品与卖家组关系表';
复制代码


四.商品属性扩展表


CREATE TABLE `attribute_extend` (    `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',    `participate_id` varchar(40) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '参与ID(ITEM_ID或SKU_ID)',    `participate_type` int(10) NOT NULL DEFAULT '0' COMMENT '参与类型(1-ITEM,2-SKU)',    `attribute_content` varchar(2048) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '属性内容',    `feature` varchar(500) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '扩展字段',    `version_id` int(10) NOT NULL DEFAULT '0' COMMENT '版本号',    `del_flag` tinyint(1) NOT NULL DEFAULT '0' COMMENT '删除标记(1-有效,0-删除)',    `create_user` int(10) NOT NULL DEFAULT '0' COMMENT '创建⼈',    `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',    `update_user` int(10) NOT NULL DEFAULT '0' COMMENT '更新⼈',    `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',    PRIMARY KEY (`ID`) ) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8 COMMENT='商品属性扩展表';
复制代码


(4)可售业务表


一.商品与卖家组关系表


CREATE TABLE `sku_seller_relation` (    `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主健',    `item_id` varchar(40) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT 'itemId',    `sku_id` varchar(40) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT 'skuId',    `seller_group_id` varchar(50) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '卖家组ID',    `seller_type` int(10) DEFAULT '0' COMMENT '卖家类型(1-⾃营,2-POP)',    `relation_type` tinyint(3) DEFAULT '0' COMMENT '关系类型(1-可售,2-屏蔽)',    `del_flag` tinyint(1) DEFAULT '1' COMMENT '删除标记(1-有效,0-删除)',    `create_user` int(10) DEFAULT '0' COMMENT '创建⼈',    `create_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',    `update_user` int(10) DEFAULT '0' COMMENT '更新⼈',    `update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',    PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8 COMMENT='商品与卖家组关系表';
复制代码


二.商品卖家库存关系表


CREATE TABLE `sku_stock_seller_relation` (    `id` int(10) UNSIGNED NOT NULL AUTO_INCREMENT,    `sku_id` varchar(40) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '商品ID',    `seller_type` tinyint(3) NULL DEFAULT NULL COMMENT '卖家类型(1-⾃营,2-POP)',    `seller_id` bigint(20) NULL DEFAULT NULL COMMENT '卖家ID',    `stock_num` bigint(20) NULL DEFAULT NULL COMMENT '库存数量',    `stock_unit` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '库存单位',    `del_flag` tinyint(1) NULL DEFAULT NULL COMMENT '删除标记(1-有效,0-删除)',    `create_user` int(11) NULL DEFAULT NULL COMMENT '创建⼈',    `create_time` datetime NULL DEFAULT NULL COMMENT '创建时间',    `update_user` int(11) NULL DEFAULT NULL COMMENT '更新⼈',    `update_time` datetime NULL DEFAULT NULL COMMENT '更新时间',    PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB AUTO INCREMENT = 1 DEFAULT CHARSET=utf8 COMMENT='商品卖家库存关系表';
复制代码


(5)基础配置表


leaf⾃增序列表 leaf_alloc


CREATE TABLE `leaf_alloc` (    `biz_tag` varchar(128) NOT NULL DEFAULT '' COMMENT '业务key',    `max_id` bigint(20) NOT NULL DEFAULT '1' COMMENT '当前已经分配了的最⼤id',    `step` int(11) NOT NULL COMMENT '初始步⻓,也是动态调整的最⼩步⻓',    `description` varchar(256) DEFAULT NULL COMMENT '业务key的描述',    `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '数据库维护的更新时间',    PRIMARY KEY (`biz_tag`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='leaf⾃增序列表';
复制代码


2.定时同步可采商品

 

(1)定时调度任务定时同步可采商品


使用场景:定时任务调度,可以按不同的卖家类型分任务进⾏执⾏同步结果。


@Componentpublic class RecoverableJobHandler {    @DubboReference(version = "1.0.0")    private RecoverableApi recoverableApi;
@XxlJob("syncRecoverableProduct") public void syncAvailableProduct(RecoverableRequest request) { XxlJobHelper.log("sync available product job starting..."); JsonResult result = recoverableApi.syncRecoverableProduct(request); XxlJobHelper.log("sync available product job end, result:{}", result); }}
@DubboService(version = "1.0.0", interfaceClass = RecoverableApi.class, retries = 0)public class RecoverableApiImpl implements RecoverableApi { @Autowired private RecoverableService recoverableService;
//同步可采商品 @Override public JsonResult syncRecoverableProduct(RecoverableRequest request) { try { return recoverableService.syncRecoverableProduct(request); } catch (ProductBizException e) { log.error("biz error: request={}", JSON.toJSONString(request), e); return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg()); } catch (Exception e) { log.error("system error: request={}", JSON.toJSONString(request), e); return JsonResult.buildError(e.getMessage()); } }}
//商品可采业务实现类@Servicepublic class RecoverableServiceImpl implements RecoverableService { ... //同步可采的数据入缓存 @Override public JsonResult syncRecoverableProduct(RecoverableRequest request) { Integer pageNo = 1; //获取卖家类型对应的卖家组信息 List<SellerGroupResponse> sellerGroupResponses = querySellerGroupList(pageNo, request.getSellerGroupType()); while (!CollectionUtils.isEmpty(sellerGroupResponses)) { //1.过滤卖家组的非有效状态信息数据 List<SellerGroupResponse> sellerGroupResponseList = sellerGroupFilter(sellerGroupResponses); //2.根据卖家组获取卖家支持的可售商品列表 List<SkuSellerRelationDO> sellerRelationDOList = queryAvailableProduct(sellerGroupResponseList); //3.查询商品信息,并过滤非自营的商品 List<ProductDetailDO> productDetailDOList = queryProductDetailList(sellerRelationDOList); //4.进行item级别的商品过滤(无需采购和生命周期) List<ProductDetailDO> itemFilterList = itemFilter(productDetailDOList); //5.进行组套商品的商品过滤(无需采购和生命周期) List<ProductDetailDO> suitFilterList = suitFilter(itemFilterList); //6.将详情的商品sku信息绑定到卖家上 List<ProductSellerRelationBO> productSellerRelationBOList = buildBinding(sellerRelationDOList, suitFilterList); //7.读取历史的缓存信息,对已经存在的缓存进行diff处理并刷入缓存 diffRecoverableCache(productSellerRelationBOList, sellerGroupResponses); pageNo++; sellerGroupResponses = querySellerGroupList(pageNo, request.getSellerGroupType()); } return JsonResult.buildSuccess(); } ...}
复制代码


具体实现:


一.调⽤查询卖家信息接⼝,传⼊卖家类型,返回对应的卖家组信息列表二.过滤掉返回的卖家组列表中⾮有效状态的卖家组三.根据卖家组ID列表获取这些卖家组所⽀持售卖的商品四.根据商品sku表sku_info查询得到⾃营类型的商品五.根据商品⽆需采购配置表获取得到不需要采购的品类信息六.根据sku列表批量查询⽣命周期的可采结果,并过滤不可采的商品信息
七.进⾏组套商品验证,先通过sku批量查询组套商品与sku关系表获取得到每个sku下的原料以及普通商品信息对每个商品进⾏⽆需采购以及⽣命周期的验证当组合商品下的sku都满⾜条件则可补,否则过滤组套商品与SKU关系表是stack_sku_relation
八.对已经存在的缓存数据和当前这次同步处理后的数据进⾏差集处理,发⽣变化的数据才需要刷⼊缓存(新增的或者⽆效的数据)九.构建缓存模型,对可补的商品数据进⾏缓存,缓存的模型对象为:key为'前缀标识+卖家组ID',value为'可采sku+品类'
复制代码


时序图:



流程图:



(2)定时同步可采商品时的卖家组数据分页查询


@Servicepublic class RecoverableServiceImpl implements RecoverableService {    @Autowired    private SellerRemote sellerRemote;    ...
//分页查询卖家组(售卖区)信息 public List<SellerGroupResponse> querySellerGroupList(Integer pageNo, Integer sellerGroupType) { PageResult<SellerGroupResponse> sellerGroupPage = sellerRemote.getSellerGroupList(pageNo, CompensationConstants.SELLER_PAGE_SIZE, sellerGroupType); return sellerGroupPage.getContent(); }
//同步可采的数据入缓存 @Override public JsonResult syncRecoverableProduct(RecoverableRequest request) { Integer pageNo = 1; //获取卖家类型对应的卖家组信息 List<SellerGroupResponse> sellerGroupResponses = querySellerGroupList(pageNo, request.getSellerGroupType()); while (!CollectionUtils.isEmpty(sellerGroupResponses)) { //1.过滤卖家组的非有效状态信息数据 List<SellerGroupResponse> sellerGroupResponseList = sellerGroupFilter(sellerGroupResponses); //2.根据卖家组获取卖家支持的可售商品列表 List<SkuSellerRelationDO> sellerRelationDOList = queryAvailableProduct(sellerGroupResponseList); //3.查询商品信息,并过滤非自营的商品 List<ProductDetailDO> productDetailDOList = queryProductDetailList(sellerRelationDOList); //4.进行item级别的商品过滤(无需采购和生命周期) List<ProductDetailDO> itemFilterList = itemFilter(productDetailDOList); //5.进行组套商品的商品过滤(无需采购和生命周期) List<ProductDetailDO> suitFilterList = suitFilter(itemFilterList); //6.将详情的商品sku信息绑定到卖家上 List<ProductSellerRelationBO> productSellerRelationBOList = buildBinding(sellerRelationDOList, suitFilterList); //7.读取历史的缓存信息,对已经存在的缓存进行diff处理并刷入缓存 diffRecoverableCache(productSellerRelationBOList, sellerGroupResponses); pageNo++; sellerGroupResponses = querySellerGroupList(pageNo, request.getSellerGroupType()); } return JsonResult.buildSuccess(); } ...}
复制代码


(3)过滤无效的卖家组与查询卖家组支持的可售商品


@Servicepublic class RecoverableServiceImpl implements RecoverableService {    ...    //对卖家组状态非有效的进行过滤    private List<SellerGroupResponse> sellerGroupFilter(List<SellerGroupResponse> sellerGroupResponses) {        //过滤无效的卖家组信息        return sellerGroupResponses.stream()            .filter(sellerGroupResponse -> SellerGroupStatusEnum.EFFECTIVE_STATUS.getCode().equals(sellerGroupResponse.getSellerGroupStatus()))            .collect(Collectors.toList());    }
//返回卖家组支持的可售商品列表 private List<SkuSellerRelationDO> queryAvailableProduct(List<SellerGroupResponse> sellerGroupResponses) { if (!CollectionUtils.isEmpty(sellerGroupResponses)) { //转换为所有的卖家组ID List<Long> sellerGroupIds = sellerGroupResponses.stream().map(SellerGroupResponse::getSellerGroupId).collect(Collectors.toList()); //分页批量查询卖家组支持的可售商品列表 return productRelationRepository.pageQueryAvailableProduct(sellerGroupIds); } return new ArrayList<>(); } ...}
复制代码


(4)卖家组支持的可售商品十万级数据量查询实现


一个 SkuSellerRelationDO 对象大概就 50Byte,10 万个 SkuSellerRelationDO 对象大概就是 500 万 Byte=4MB。


@Repositorypublic class ProductRelationRepository {    ...    //分页查询 卖家组ID 可售商品信息    public List<SkuSellerRelationDO> pageQueryAvailableProduct(List<Long> sellerGroupIds) {        //默认集合初始大小为50000,避免扩容次数频繁,一般商品的数量最多也就是几万-几十万        List<SkuSellerRelationDO> sellerRelationAllList = new ArrayList<>(RecoverableConstants.SKU_INIT_NUM);        //一次最大查询10 卖家组ID,多个分页查询,这里做数据切割        List<List<Long>> splitList = DataCuttingUtil.dataCuttingString(sellerGroupIds, RecoverableConstants.SELLER_ID_LIMIT_NUM);        for (List<Long> sellerGroupIdList : splitList) {            List<SkuSellerRelationDO> productDetailDOList = queryAvailableProduct(sellerGroupIdList);            if (!CollectionUtils.isEmpty(productDetailDOList)) {                sellerRelationAllList.addAll(productDetailDOList);            }        }        return sellerRelationAllList;    }
//根据卖家组ID 批量查询 可售商品信息 public List<SkuSellerRelationDO> queryAvailableProduct(List<Long> sellerGroupIds) { //获取卖家组对应的可售商品 LambdaQueryWrapper<SkuSellerRelationDO> queryWrapper = Wrappers.lambdaQuery(); queryWrapper.in(SkuSellerRelationDO::getSellerGroupId, sellerGroupIds) .eq(SkuSellerRelationDO::getRelationType, SkuSellerRelationTypeEnum.RELATION_TYPE_YES.getCode()) .eq(SkuSellerRelationDO::getDelFlag, DelFlagEnum.EFFECTIVE.getCode()); return skuSellerRelationMapper.selectList(queryWrapper); } ...}
public class DataCuttingUtil { //对集合数据进行切割,切割的数量按传入切割大小计算 public static <T> List<List<T>> dataCuttingString(List<T> splitList, Integer splitSize) { Integer size = splitList.size(); //计算出可以切分出多少个集合对象 int limit = (size + splitSize - 1) / splitSize; return Stream.iterate(0, n -> n + 1) .limit(limit) .parallel() .map(a -> splitList.stream().skip((long) a * splitSize) .limit(splitSize) .parallel() .collect(Collectors.toList())) .collect(Collectors.toList() ); } ...}
复制代码


(5)根据卖家组支持的可售商品查询商品信息


@Servicepublic class RecoverableServiceImpl implements RecoverableService {    @Autowired    private ProductRepository productRepository;    ...
//根据卖家组支持的可售商品,查询商品信息并过滤非自营的商品 private List<ProductDetailDO> queryProductDetailList(List<SkuSellerRelationDO> sellerRelationDOList) { if (!CollectionUtils.isEmpty(sellerRelationDOList)) { //先获取到对应的skuID列表 Set<String> skuIdList = sellerRelationDOList.stream().map(SkuSellerRelationDO::getSkuId).collect(Collectors.toSet()); //一次最大查询1000skuId,多个分页查询,这里做数据切割 List<ProductDetailDO> productDetailDOList = productRepository.pageQueryProductInfoList(skuIdList); if (!CollectionUtils.isEmpty(productDetailDOList)) { List<ProductDetailDO> productDetailDOS = productDetailDOList.stream() .filter(productDetailDO -> productDetailDO.getSellerType().equals(SellerTypeEnum.SELF.getCode())) .collect(Collectors.toList()); return productDetailDOS; } } return new ArrayList<>(); } ...}
@Repositorypublic class ProductRepository { ... //分页查询sku信息 public List<ProductDetailDO> pageQueryProductInfoList(Set<String> skuIdList) { //默认集合初始大小为50000,避免扩容次数频繁,一般商品的数量最多也就是几万-几十万 List<ProductDetailDO> productDetailAllList = new ArrayList<>(RecoverableConstants.SKU_INIT_NUM); //一次最大查询1000skuId,多个分页查询,这里做数据切割 List<Set<String>> splitList = DataCuttingUtil.dataCuttingString(skuIdList, RecoverableConstants.SKU_LIMIT_NUM); for (Set<String> skuIds : splitList) { List<ProductDetailDO> productDetailDOList = queryProductInfoList(skuIds); if (!CollectionUtils.isEmpty(productDetailDOList)) { productDetailAllList.addAll(productDetailDOList); } } return productDetailAllList; }
//批量查询商品的详情信息 public List<ProductDetailDO> queryProductInfoList(Set<String> skuId) { return skuInfoMapper.queryProductInfoList(skuId); } ...}
public class DataCuttingUtil { //对集合数据进行切割,切割的数量按传入切割大小计算 public static <T> List<List<T>> dataCuttingString(List<T> splitList, Integer splitSize) { Integer size = splitList.size(); //计算出可以切分出多少个集合对象 int limit = (size + splitSize - 1) / splitSize; return Stream.iterate(0, n -> n + 1) .limit(limit) .parallel() .map(a -> splitList.stream().skip((long) a * splitSize) .limit(splitSize) .parallel() .collect(Collectors.toList())) .collect(Collectors.toList() ); } ...}
复制代码


(6)对可售商品进行无需采购以及生命周期过滤


@Servicepublic class RecoverableServiceImpl implements RecoverableService {    ...    //进行item级别的商品过滤(无需采购和生命周期)    private List<ProductDetailDO> itemFilter(List<ProductDetailDO> productDetailDOList) {        if (!CollectionUtils.isEmpty(productDetailDOList)) {            //过滤掉无需采购的商品列表            List<ProductDetailDO> productDetailList = filterPurchaseProduct(productDetailDOList);            //商品进行生命周期的查询检查,过滤不符合条件的商品            return filterLifeCycle(productDetailList);        }        return productDetailDOList;    }
//过滤无需采购的item列表信息 private List<ProductDetailDO> filterPurchaseProduct(List<ProductDetailDO> suitItemDOList) { //获取配置的无需采购的列表 List<ItemProcurementConfigDO> itemProcurementConfigList = productConfigRepository.queryProcurementConfigList(); //空集合判断 if (CollectionUtils.isEmpty(suitItemDOList)) { return new ArrayList(); } if (CollectionUtils.isEmpty(itemProcurementConfigList)) { return suitItemDOList; } //集合转map,方便验证是否命中 Map<Integer, ItemProcurementConfigDO> procurementConfigMap = itemProcurementConfigList.stream() .collect(Collectors.toMap(ItemProcurementConfigDO::getCategoryId, Function.identity())); //开始过滤无需采购的商品信息 List<ProductDetailDO> productDetailDOList = suitItemDOList.stream() .filter(productDetail -> !procurementConfigMap.containsKey(productDetail.getCategoryId())) .collect(Collectors.toList()); //返回过滤后的商品信息 return productDetailDOList; }
//进行商品生命周期的过滤 private List<ProductDetailDO> filterLifeCycle(List<ProductDetailDO> productDetailList) { if (!CollectionUtils.isEmpty(productDetailList)) { //构建调用生命周期的接口入参模型 Set<String> itemIdList = productDetailList.stream().map(ProductDetailDO::getItemId).collect(Collectors.toSet()); List<ItemExpriResultDTO> skuExpriResultDTOS = itemPeriodStageRemote.queryByItemIds(itemIdList); //集合转map,方便验证是否命中 Map<String, ItemExpriResultDTO> itemExpriResultMap = skuExpriResultDTOS.stream().collect(Collectors.toMap(ItemExpriResultDTO::getItemId, Function.identity())); //过滤不符合条件的商品 List<ProductDetailDO> productDetailDOList = productDetailList.stream().filter(productDetail -> { //没有命中不保留 if (!itemExpriResultMap.containsKey(productDetail.getItemId())) { return false; } ItemExpriResultDTO itemExpriResultDTO = itemExpriResultMap.get(productDetail.getItemId()); //可采的状态返回true return itemExpriResultDTO.getPurchaseStatus().equals(ItemExpriEnum.RECOVERABLE_STATUS_YES.getCode()); }).collect(Collectors.toList()); //返回过滤后的商品信息 return productDetailDOList; } return productDetailList; } ...}
复制代码


(7)对卖家组支持的可售商品进行过滤组套商品



@Servicepublic class RecoverableServiceImpl implements RecoverableService { ... //进行组套商品的商品过滤(无需采购和生命周期) private List<ProductDetailDO> suitFilter(List<ProductDetailDO> productDetailList) { if (!CollectionUtils.isEmpty(productDetailList)) { //1.查询哪些是组套商品,并且反向寻找到组套商品归属的item信息 List<ProductDetailDO> suitItemDOList = querySuitList(productDetailList); //2.过滤掉无需采购的商品列表 List<ProductDetailDO> productDetailDOList = filterSuitPurchaseProduct(suitItemDOList); //3.商品进行生命周期的查询检查,过滤不符合条件的商品 return filterSuitLifeCycle(productDetailDOList); } return productDetailList; }
//通过商品明细,查询出哪些是组套商品,并输出对应的归属item数据结构 private List<ProductDetailDO> querySuitList(List<ProductDetailDO> productDetailDOList) { //根据skuId批量查询是否组套商品 List<StackSkuRelationDO> stackSkuRelationAllList = queryStackSkuListByIds(productDetailDOList); //没有组套商品的直接返回 if (CollectionUtils.isEmpty(stackSkuRelationAllList)) { return productDetailDOList; } //返回组套商品的详情集合信息(包含item信息) List<ProductDetailDO> itemProductList = queryProductInfoList(stackSkuRelationAllList); //将组套商品绑定到上级对应商品下 return buildSuitBinding(itemProductList, productDetailDOList, stackSkuRelationAllList); }
//根据skuId批量查询是否组套商品 private List<StackSkuRelationDO> queryStackSkuListByIds(List<ProductDetailDO> productDetailDOList) { //先获取到对应的skuID列表 Set<String> skuIdList = productDetailDOList.stream().map(ProductDetailDO::getSkuId).collect(Collectors.toSet()); return productRelationRepository.pageQueryStackSkuListByIds(skuIdList); }
//获取组套商品的详情集合信息 private List<ProductDetailDO> queryProductInfoList(List<StackSkuRelationDO> stackSkuRelationAllList) { //获取组套商品的sku列表,通过sku列表联合查询得到对应的item信息 Set<String> stackSkuIdList = stackSkuRelationAllList.stream().map(StackSkuRelationDO::getStackSkuId).collect(Collectors.toSet()); //查询组套商品详情集合信息 return productRepository.pageQueryProductInfoList(stackSkuIdList); }
//将组套商品绑定上 private List<ProductDetailDO> buildSuitBinding(List<ProductDetailDO> itemProductList, List<ProductDetailDO> productDetailDOList, List<StackSkuRelationDO> stackSkuRelationAllList) { List<ProductDetailDO> productSellerRelationBOList = recoverableConverter.converterProductList(productDetailDOList); //sku对应的组套商品详情(对应组套商品的stackSkuId) Map<String, ProductDetailDO> productDetailMap = itemProductList.stream().collect(Collectors.toMap(ProductDetailDO::getSkuId, Function.identity())); //组套的商品集合(一个商品下多个物料sku组装而成) Map<String, List<StackSkuRelationDO>> stackSkuRelationMap = stackSkuRelationAllList.stream().collect(Collectors.groupingBy(StackSkuRelationDO::getSkuId)); for (ProductDetailDO productDetailDO : productSellerRelationBOList) { //命中到了对应的组套商品 if (stackSkuRelationMap.containsKey(productDetailDO.getSkuId())) { List<ProductDetailDO> productDetailDOS = new ArrayList<>(); //标记为组套商品 productDetailDO.setProductType(2); //同时存储对应的组套商品详情信息 List<StackSkuRelationDO> stackSkuRelationList = stackSkuRelationMap.get(productDetailDO.getSkuId()); for (StackSkuRelationDO stackSkuRelationDO : stackSkuRelationList) { if (productDetailMap.containsKey(stackSkuRelationDO.getStackSkuId())) { ProductDetailDO productDetail = productDetailMap.get(stackSkuRelationDO.getStackSkuId()); productDetailDOS.add(productDetail); } } productDetailDO.setProductDetailList(productDetailDOS); } } return productSellerRelationBOList; }
//过滤无需采购的组套商品 private List<ProductDetailDO> filterSuitPurchaseProduct(List<ProductDetailDO> suitItemDOList) { //获取配置的无需采购的列表 List<ItemProcurementConfigDO> itemProcurementConfigList = productConfigRepository.queryProcurementConfigList(); //空集合判断 if (CollectionUtils.isEmpty(suitItemDOList)) { return new ArrayList(); } if (CollectionUtils.isEmpty(itemProcurementConfigList)) { return suitItemDOList; } //集合转map,方便验证是否命中 Map<Integer, ItemProcurementConfigDO> procurementConfigMap = itemProcurementConfigList.stream().collect(Collectors.toMap(ItemProcurementConfigDO::getCategoryId, Function.identity())); List<ProductDetailDO> productDetailDOList = new ArrayList<>(suitItemDOList.size()); for (ProductDetailDO productDetailDO : suitItemDOList) { if (productDetailDO.getProductType().equals(2)) { List<ProductDetailDO> productDetailList = productDetailDO.getProductDetailList(); //组套商品有商品不满足条件 if (checkSuitPurchaseProduct(productDetailList, procurementConfigMap)) { continue; } } productDetailDOList.add(productDetailDO); } return productDetailDOList; }
//进行组套商品生命周期的过滤 private List<ProductDetailDO> filterSuitLifeCycle(List<ProductDetailDO> productDetailList) { List<ProductDetailDO> productDetailDOAllList = new ArrayList<>(productDetailList.size()); if (!CollectionUtils.isEmpty(productDetailList)) { //查询返回生命周期的数据模型 Map<String, ItemExpriResultDTO> itemExpriResultMap = queryItemExpriMap(productDetailList); for (ProductDetailDO productDetailDO : productDetailList) { if (productDetailDO.getProductType().equals(2)) { List<ProductDetailDO> productDetailDOList = productDetailDO.getProductDetailList(); if (checkSuitLifeCycle(productDetailDOList, itemExpriResultMap)) { continue; } } productDetailDOAllList.add(productDetailDO); } } return productDetailDOAllList; } ...}
复制代码


(8)将过滤后的商品数据与卖家组进行关联


@Servicepublic class RecoverableServiceImpl implements RecoverableService {    ...    //同步可采的数据入缓存    @Override    public JsonResult syncRecoverableProduct(RecoverableRequest request) {        Integer pageNo = 1;        //获取卖家类型对应的卖家组信息        List<SellerGroupResponse> sellerGroupResponses = querySellerGroupList(pageNo, request.getSellerGroupType());        while (!CollectionUtils.isEmpty(sellerGroupResponses)) {            //1.过滤卖家组的非有效状态信息数据            List<SellerGroupResponse> sellerGroupResponseList = sellerGroupFilter(sellerGroupResponses);            //2.根据卖家组获取卖家支持的可售商品列表            List<SkuSellerRelationDO> sellerRelationDOList = queryAvailableProduct(sellerGroupResponseList);            //3.查询商品信息,并过滤非自营的商品            List<ProductDetailDO> productDetailDOList = queryProductDetailList(sellerRelationDOList);            //4.进行item级别的商品过滤(无需采购和生命周期)            List<ProductDetailDO> itemFilterList = itemFilter(productDetailDOList);            //5.进行组套商品的商品过滤(无需采购和生命周期)            List<ProductDetailDO> suitFilterList = suitFilter(itemFilterList);            //6.将详情的商品sku信息绑定到卖家上            List<ProductSellerRelationBO> productSellerRelationBOList = buildBinding(sellerRelationDOList, suitFilterList);            //7.读取历史的缓存信息,对已经存在的缓存进行diff处理并刷入缓存            diffRecoverableCache(productSellerRelationBOList, sellerGroupResponses);            pageNo++;            sellerGroupResponses = querySellerGroupList(pageNo, request.getSellerGroupType());        }        return JsonResult.buildSuccess();    }
//将过滤后的商品明细绑定到卖家组上 private List<ProductSellerRelationBO> buildBinding(List<SkuSellerRelationDO> sellerRelationDOList, List<ProductDetailDO> productDetailDOList) { //先转换集合为map,key为商品的skuId标识 Map<String, ProductDetailDO> skuInfoDOMap = productDetailDOList.stream().collect(Collectors.toMap(ProductDetailDO::getSkuId, Function.identity())); //按卖家组ID进行分组,把下属的商品合并到一个上 Map<Long, List<SkuSellerRelationDO>> skuSellerRelationMap = sellerRelationDOList.stream().collect(Collectors.groupingBy(SkuSellerRelationDO::getSellerGroupId)); //返回的绑定卖家组和商品明细的集合对象 List<ProductSellerRelationBO> sellerRelationBOList = new ArrayList<>(sellerRelationDOList.size());
//遍历卖家和商品关系集合,开始填充商品sku信息绑定到卖家组上 for (Map.Entry<Long, List<SkuSellerRelationDO>> entry : skuSellerRelationMap.entrySet()) { List<SkuSellerRelationDO> skuSellerRelationDOS = entry.getValue(); Long sellerGroupId = entry.getKey(); //循环绑定卖家组下的商品关系 ProductSellerRelationBO productSellerRelationBO = new ProductSellerRelationBO(); productSellerRelationBO.setSellerId(sellerGroupId); List<ProductDetailDO> productDetailList = new ArrayList<>(); //遍历卖家组下的可售商品列表 for (SkuSellerRelationDO sellerRelationDO : skuSellerRelationDOS) { //查询的数据集合中 存在这个商品数据 if (skuInfoDOMap.containsKey(sellerRelationDO.getSkuId())) { ProductDetailDO productDetailDO = skuInfoDOMap.get(sellerRelationDO.getSkuId()); //绑定数据到卖家组上 productDetailList.add(productDetailDO); } } productSellerRelationBO.setProductDetailList(productDetailList); sellerRelationBOList.add(productSellerRelationBO); } return sellerRelationBOList; } ...}
复制代码


(9)读取缓存与刷入缓存的逻辑


@Servicepublic class RecoverableServiceImpl implements RecoverableService {    @Autowired    private RedisReadWriteManager redisReadWriteManager;    ...
//对已经缓存的历史数据进行diff处理,处理缓存变更 private void diffRecoverableCache(List<ProductSellerRelationBO> productSellerRelationBOList, List<SellerGroupResponse> sellerGroupResponses) { //1.获取卖家组ID集合 Set<Long> sellerGroupIdList = sellerGroupResponses.stream().map(SellerGroupResponse::getSellerGroupId).collect(Collectors.toSet()); //2.数据差集比较,并刷入缓存差异信息 diffCache(sellerGroupIdList, productSellerRelationBOList); }
//开始进行数据差集的处理 private void diffCache(Set<Long> sellerIdList, List<ProductSellerRelationBO> productSellerRelationBOList) { //1.批量查询缓存 Map<Long, List<String>> redisSetMap = redisReadWriteManager.getRedisSortedSet(sellerIdList, AbstractRedisKeyConstants::getSellerTypePurchaseSkuZsetKey); //转换缓存的值为具体的对象 Map<Long, Map<String, ProductDetailDO>> productSellerRelationBOMap = redisManagerRepository.converterProductSellerCache(redisSetMap); //进行数据差集处理 Map<String, RedisSortedSetCache> diffSortedSetCache = redisManagerRepository.diffProduct(productSellerRelationBOList, productSellerRelationBOMap, AbstractRedisKeyConstants::getSellerTypePurchaseSkuZsetKey); //执行数据缓存更新 redisReadWriteManager.flushIncrSortedSetMap(diffSortedSetCache); } ...}
@Servicepublic class RedisReadWriteManager { ... //批量获取Sorted Set public <T> Map<T, List<String>> getRedisSortedSet(Collection<T> keys, Function<T, String> getRedisKeyFunction) { if (CollectionUtils.isEmpty(keys)) { return Maps.newHashMap(); } Map<T, List<String>> responseMap = Maps.newHashMap(); for (T key : keys) { String redisKey = getRedisKeyFunction.apply(key); responseMap.put(key, allRedisSortedSet(redisKey)); } return responseMap; }
//获取Sorted Set的所有数据 private List<String> allRedisSortedSet(String redisKey) { Set<String> strings = redisCache.zrangeByScore(redisKey, 0L, Long.MAX_VALUE, 0L, Long.MAX_VALUE); List<String> sortedList = CollectionUtils.isEmpty(strings) ? new ArrayList<>() : new LinkedList<>(strings); return sortedList; }
//刷新有序缓存 public void flushIncrSortedSetMap(Map<String, RedisSortedSetCache> sortedSetSourceMap) { for (Map.Entry<String, RedisSortedSetCache> entry : sortedSetSourceMap.entrySet()) { //获取到缓存的key标志信息 String key = entry.getKey(); //缓存操作对象,每个卖家缓存一份 RedisSortedSetCache sortedSetSource = entry.getValue(); if (sortedSetSource.getDeleteKey()) { redisCache.delete(key); continue; } if (MapUtils.isNotEmpty(sortedSetSource.getAddMap())) { addSortedSet(sortedSetSource.getAddMap(), key); } if (!CollectionUtils.isEmpty(sortedSetSource.getDeleteMemberSet())) { removeSortedSet(key, sortedSetSource.getDeleteMemberSet()); } } }
//添加Sorted Set public void addSortedSet(Map<String, Double> addMap, String redisKey) { if (MapUtils.isEmpty(addMap)) { return; } for (Map.Entry<String, Double> entry : addMap.entrySet()) { String product = entry.getKey(); Double score = entry.getValue(); redisCache.zadd(redisKey, product, score); } }
//删除Sorted Set public void removeSortedSet(String redisKey, Set<String> memberSet) { redisCache.zremove(redisKey, memberSet.toArray(new String[]{})); } ...}
复制代码


(10)卖家组支持的可采商品与缓存的 diff 逻辑


@Repositorypublic class RedisManagerRepository {    @Autowired    private SegmentIDGen segmentIDGen;    ...
//将缓存的数据转换为实体对象 public Map<Long, Map<String, ProductDetailDO>> converterProductSellerCache(Map<Long, List<String>> redisSetMap) { Map<Long, Map<String, ProductDetailDO>> productSellerRelationBOMap = new HashMap<>(redisSetMap.size()); if (!CollectionUtils.isEmpty(redisSetMap)) { for (Map.Entry<Long, List<String>> entry : redisSetMap.entrySet()) { List<String> productSellerList = entry.getValue(); Map<String, ProductDetailDO> productDetailMap = new HashMap<>(productSellerList.size()); for (String content : productSellerList) { ProductDetailDO productDetailDO = JSONObject.parseObject(content, ProductDetailDO.class); productDetailMap.put(productDetailDO.getSkuId(), productDetailDO); } productSellerRelationBOMap.put(entry.getKey(), productDetailMap); } } return productSellerRelationBOMap; }
//对缓存差集的数据进行处理 public <T> Map<String, RedisSortedSetCache> diffProduct(List<ProductSellerRelationBO> productSellerRelationBOList, Map<Long, Map<String, ProductDetailDO>> productSellerCacheMap, Function<Long, String> getRedisKeyFunction) { Map<String, RedisSortedSetCache> redisSortedSetCacheMap = new HashMap<>(); //1.处理缓存中需要新增的数据,删除数据 Map<Long, ProductSellerRelationBO> productSellerRelationResidueMap = diffCacheAddOrDelete(productSellerRelationBOList, productSellerCacheMap, redisSortedSetCacheMap, getRedisKeyFunction); //2.处理缓存中不存在的卖家数据,新增处理 diffAddSellerCache(productSellerRelationResidueMap, redisSortedSetCacheMap, getRedisKeyFunction); //3.处理缓存中存在的卖家数据,结果不存在,删除处理 diffDeleteSellerCache(productSellerCacheMap, redisSortedSetCacheMap, getRedisKeyFunction); return redisSortedSetCacheMap; }
//处理缓存中需要新增的数据,删除数据 private <T> Map<Long, ProductSellerRelationBO> diffCacheAddOrDelete(List<ProductSellerRelationBO> productSellerRelationBOList, Map<Long, Map<String, ProductDetailDO>> productSellerCacheMap, Map<String, RedisSortedSetCache> redisSortedSetCacheMap, Function<Long, String> getRedisKeyFunction) { Map<Long, ProductSellerRelationBO> productSellerRelationResidueMap = new HashMap<>(productSellerRelationBOList.size()); for (ProductSellerRelationBO productSellerRelation : productSellerRelationBOList) { RedisSortedSetCache redisSortedSetCache = new RedisSortedSetCache(); Long sellerId = productSellerRelation.getSellerId();
String redisKey = getRedisKeyFunction.apply(sellerId); //命中了缓存的数据,开始进行差集处理 if (productSellerCacheMap.containsKey(sellerId)) { //获取当前这个卖家下的缓存商品信息,key为商品sku Map<String, ProductDetailDO> productDetailDOMap = productSellerCacheMap.get(sellerId); //卖家的处理后的可售商品列表 List<ProductDetailDO> productDetailList = productSellerRelation.getProductDetailList();
//处理返回差异的数据结果,需新增或者删除的数据集合 for (ProductDetailDO productDetailDO : productDetailList) { //命中了则说明无差异,不处理,从集合中移除 if (!CollectionUtils.isEmpty(productDetailDOMap) && productDetailDOMap.containsKey(productDetailDO.getSkuId())) { productDetailDOMap.remove(productDetailDO.getSkuId()); continue; } //未命中,说明这个数据缓存中不存在,做新增处理 redisSortedSetCache.getAddMap().put(JSONObject.toJSON(productDetailDO).toString(), getSortedSetScore(redisKey)); } //遍历处理完成之后,缓存中还有多余的对象都属于无效数据,需要删除 if (!CollectionUtils.isEmpty(productDetailDOMap)) { for (Map.Entry<String, ProductDetailDO> entry : productDetailDOMap.entrySet()) { redisSortedSetCache.getDeleteMemberSet().add(JSONObject.toJSON(entry.getValue()).toString()); } } //设置到需要处理得缓存对象中 redisSortedSetCacheMap.put(redisKey, redisSortedSetCache);
//删除处理掉的缓存对象 productSellerCacheMap.remove(sellerId); } else { //未命中缓存的 都是新增数据 productSellerRelationResidueMap.put(sellerId, productSellerRelation); } } return productSellerRelationResidueMap; }
//处理缓存中不存在的卖家数据,新增处理 private void diffAddSellerCache(Map<Long, ProductSellerRelationBO> productSellerRelationMap, Map<String, RedisSortedSetCache> redisSortedSetCacheMap, Function<Long, String> getRedisKeyFunction) { Iterator<Map.Entry<Long, ProductSellerRelationBO>> iterator = productSellerRelationMap.entrySet().iterator(); //对处理缓存差集后,还剩余的未处理数据做新增处理 while (iterator.hasNext()) { Map.Entry<Long, ProductSellerRelationBO> entrys = iterator.next(); ProductSellerRelationBO productSellerRelation = entrys.getValue(); RedisSortedSetCache redisSortedSetCache = new RedisSortedSetCache(); Long key = entrys.getKey(); String redisKey = getRedisKeyFunction.apply(key);
//卖家的处理后的可售商品列表 List<ProductDetailDO> productDetailList = productSellerRelation.getProductDetailList(); for (ProductDetailDO productDetailDO : productDetailList) { redisSortedSetCache.getAddMap().put(JSONObject.toJSON(productDetailDO).toString(), getSortedSetScore(redisKey)); } //设置到需要处理得缓存对象中 redisSortedSetCacheMap.put(redisKey, redisSortedSetCache); } }
//处理缓存中存在的卖家数据,结果不存在,删除处理 private void diffDeleteSellerCache(Map<Long, Map<String, ProductDetailDO>> productSellerCacheMap, Map<String, RedisSortedSetCache> redisSortedSetCacheMap, Function<Long, String> getRedisKeyFunction) { //当可售的列表中,不存在缓存已经存在的数据列表,说明缓存已经无效,需要删除该对应的key下的缓存信息 if (!CollectionUtils.isEmpty(productSellerCacheMap)) { for (Map.Entry<Long, Map<String, ProductDetailDO>> entry : productSellerCacheMap.entrySet()) { Long key = entry.getKey(); Map<String, ProductDetailDO> value = entry.getValue(); if (CollectionUtils.isEmpty(value)) { continue; } String redisKey = getRedisKeyFunction.apply(key); RedisSortedSetCache redisSortedSetCache = new RedisSortedSetCache(); redisSortedSetCache.setDeleteKey(Boolean.TRUE); //设置到需要处理得缓存对象中 redisSortedSetCacheMap.put(redisKey, redisSortedSetCache); } } } ...
//新增缓存时需要获取每个缓存key存储的权重值 public Double getSortedSetScore(String cacheKey) { //新增可采商品缓存时基于db的分段发号 Long autoNo = segmentIDGen.genNewNo(cacheKey); return Double.valueOf(autoNo); }}
复制代码


(11)基于 DB 的分段发号器组件


一.号段内存缓冲组件 SegmentBuffer


//号段内存缓冲组件@Data@Accessors(chain = true)public class SegmentBuffer {    //线程是否在运行中    private final AtomicBoolean threadRunning;    private final ReadWriteLock lock;    private String bizTag;    //双buffer    private Segment[] segments;    //当前的使用的segment的index    private volatile int currentPos;    //下一个segment是否处于可切换状态    private volatile boolean nextReady;    //是否初始化完成    private volatile boolean initOk;    private volatile int step;    private volatile int minStep;    private volatile long updateTimestamp;
public SegmentBuffer() { segments = new Segment[]{new Segment(this), new Segment(this)}; currentPos = 0; nextReady = false; initOk = false; threadRunning = new AtomicBoolean(false); lock = new ReentrantReadWriteLock(); }
public Segment getCurrent() { return segments[currentPos]; }
public Lock rLock() { return lock.readLock(); }
public Lock wLock() { return lock.writeLock(); }
public int nextPos() { return (currentPos + 1) % 2; }
public void switchPos() { currentPos = nextPos(); } ...}
复制代码


@Componentpublic class SegmentIDCache implements ApplicationListener<ContextRefreshedEvent> {    private final Map<String, SegmentBuffer> cache = new ConcurrentHashMap<>();
@Resource private LeafAllocNoMapper leafAllocNoMapper;
private volatile boolean initOk = false;
@Override public void onApplicationEvent(ContextRefreshedEvent event) { checkAndInit(); }
//初始化数据 private void checkAndInit() { if (!initOk) { synchronized (this) { if (!initOk) { log.info("Init ..."); //确保加载到kv后才初始化成功 updateCacheFromDb(); initOk = true; log.info("Init Ok ..."); } } } }
public boolean isInitOk() { return initOk; }
public boolean containsKey(String bizCode) { checkAndInit(); return cache.containsKey(bizCode); }
public SegmentBuffer getValue(String bizCode) { checkAndInit(); return cache.get(bizCode); }
//单个初始化加载 public void updateCacheFromDb(String bizCode) { log.info("update cache from db"); try { LeadAllocDO dbBizCodes = leafAllocNoMapper.findByBizTag(bizCode); if (Objects.isNull(dbBizCodes)) { return; } SegmentBuffer buffer = new SegmentBuffer(); buffer.setBizTag(bizCode); Segment segment = buffer.getCurrent(); segment.setValue(new AtomicLong(0)); segment.setMax(0); segment.setStep(0); cache.put(bizCode, buffer); log.info("Add bizCode {} from db to IdCache, SegmentBuffer {}", bizCode, buffer); } catch (Exception e) { log.warn("update cache from db exception", e); } finally { log.info("updateCacheFromDb,cost:{}", 0); } }
//更新缓存key private void updateCacheFromDb() { log.info("update cache from db"); try { List<String> dbBizCodes = leafAllocNoMapper.listAllBizTag(); if (CollectionUtils.isEmpty(dbBizCodes)) { return; } List<String> cacheBiz = new ArrayList<>(cache.keySet()); Set<String> insertBizSet = new HashSet<>(dbBizCodes); Set<String> removeBizSet = new HashSet<>(cacheBiz); //db中新加的tags灌进cache for (String tmp : cacheBiz) { insertBizSet.remove(tmp); } for (String bizCode : insertBizSet) { SegmentBuffer buffer = new SegmentBuffer(); buffer.setBizTag(bizCode); Segment segment = buffer.getCurrent(); segment.setValue(new AtomicLong(0)); segment.setMax(0); segment.setStep(0); cache.put(bizCode, buffer); log.info("Add bizCode {} from db to IdCache, SegmentBuffer {}", bizCode, buffer); } for (String tmp : dbBizCodes) { removeBizSet.remove(tmp); } for (String tag : removeBizSet) { cache.remove(tag); log.info("Remove tag {} from IdCache", tag); } } catch (Exception e) { log.warn("update cache from db exception", e); } finally { log.info("updateCacheFromDb,cost:{}", 0); } }}
复制代码


二.号段 ID 生成器组件 SegmentIDGen


//号段ID生成器组件@Servicepublic class SegmentIDGenImpl implements SegmentIDGen {    //下一次异步更新比率因子    public static final double NEXT_INIT_FACTOR = 0.9;
//最大步长不超过100,0000 private static final int MAX_STEP = 1000000;
//默认一个Segment会维持的时间为15分钟 //如果在15分钟内Segment就消耗完了,则步长要扩容一倍,但不能超过MAX_STEP //如果在超过15*2=30分钟才将Segment消耗完,则步长要缩容一倍,但不能低于MIN_STEP,MIN_STEP的值为数据库中初始的step字段值 private static final long SEGMENT_DURATION = 15 * 60 * 1000L;
//更新因子 //更新因子=2时,表示成倍扩容或者折半缩容 private static final int EXPAND_FACTOR = 2;
private final ExecutorService threadPoolExecutor = new ThreadPoolExecutor(1, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new UpdateThreadFactory());
@Autowired private LeafAllocNoRepository leafAllocNoRepository;
@Resource private SegmentIDCache cache;
//生成新的ID @Override public Long genNewNo(String bizTag) { if (!cache.isInitOk()) { throw new RuntimeException("not init"); } //如果没有,此时需要初始化一个 if (!cache.containsKey(bizTag)) { leafAllocNoRepository.insertLeadAlloc(bizTag); cache.updateCacheFromDb(bizTag); } SegmentBuffer buffer = cache.getValue(bizTag); if (!buffer.isInitOk()) { synchronized (buffer) { if (!buffer.isInitOk()) { try { updateSegmentFromDb(bizTag, buffer.getCurrent()); log.info("Init buffer. Update leafkey {} {} from db", bizTag, buffer.getCurrent()); buffer.setInitOk(true); } catch (Exception e) { log.warn("Init buffer {} exception", buffer.getCurrent(), e); throw new RuntimeException("init error:" + bizTag); } } } } return getIdFromSegmentBuffer(buffer); }
//从segment缓冲中获取id private Long getIdFromSegmentBuffer(SegmentBuffer buffer) { while (true) { buffer.rLock().lock(); try { final Segment segment = buffer.getCurrent(); if (!buffer.isNextReady() && (segment.getIdle() < NEXT_INIT_FACTOR * segment.getStep()) && buffer.getThreadRunning().compareAndSet(false, true)) { asyncUpdate(buffer); } long value = segment.getValue().getAndIncrement(); if (value < segment.getMax()) { return value; } } finally { buffer.rLock().unlock(); }
//获取的value,大于max,则等待其他线程更新完毕。最多等待100s waitAndSleep(buffer); buffer.wLock().lock(); try { final Segment segment = buffer.getCurrent(); long value = segment.getValue().getAndIncrement(); if (value < segment.getMax()) { return value; } if (buffer.isNextReady()) { buffer.switchPos(); buffer.setNextReady(false); } else { log.error("Both two segments in {} are not ready!", buffer); throw new RuntimeException("next not ready"); } } finally { buffer.wLock().unlock(); } } }
//异步更新初始化 private void asyncUpdate(SegmentBuffer buffer) { long submitTime = System.currentTimeMillis(); threadPoolExecutor.execute(() -> { long executeTime = System.currentTimeMillis(); Segment next = buffer.getSegments()[buffer.nextPos()]; boolean updateOk = false; try { updateSegmentFromDb(buffer.getBizTag(), next); updateOk = true; } catch (Exception e) { log.warn("{} updateSegmentFromDb exception", buffer.getBizTag(), e); } finally { long finishTime = System.currentTimeMillis(); log.info("update segment {} from db {}。st:{}, et:{}, ft:{}", buffer.getBizTag(), next, submitTime, executeTime, finishTime); if (updateOk) { buffer.wLock().lock(); buffer.setNextReady(true); buffer.getThreadRunning().set(false); buffer.wLock().unlock(); } else { buffer.getThreadRunning().set(false); } } }); }
//自旋10000次之后,睡眠10毫秒 private void waitAndSleep(SegmentBuffer buffer) { int roll = 0; while (buffer.getThreadRunning().get()) { roll += 1; if (roll > 10000) { try { TimeUnit.MILLISECONDS.sleep(10); break; } catch (InterruptedException e) { log.warn("Thread {} Interrupted", Thread.currentThread().getName()); break; } } } }
//从db中更新号段 public void updateSegmentFromDb(String bizTag, Segment segment) { SegmentBuffer buffer = segment.getBuffer(); LeadAllocDO leadAllocDO; if (!buffer.isInitOk()) { leadAllocDO = leafAllocNoRepository.updateMaxIdAndGet(bizTag); buffer.setStep(leadAllocDO.getStep()); buffer.setMinStep(leadAllocDO.getStep()); } else if (buffer.getUpdateTimestamp() == 0) { leadAllocDO = leafAllocNoRepository.updateMaxIdAndGet(bizTag); buffer.setUpdateTimestamp(System.currentTimeMillis()); buffer.setStep(leadAllocDO.getStep()); buffer.setMinStep(leadAllocDO.getStep()); } else { int nextStep = calculateNextStep(bizTag, buffer); leadAllocDO = leafAllocNoRepository.updateMaxIdByDynamicStepAndGet(bizTag, nextStep); buffer.setUpdateTimestamp(System.currentTimeMillis()); buffer.setStep(nextStep); buffer.setMinStep(leadAllocDO.getStep()); } // must set value before set max long value = leadAllocDO.getMaxId() - buffer.getStep(); segment.getValue().set(value); segment.setMax(leadAllocDO.getMaxId()); segment.setStep(buffer.getStep()); log.info("updateSegmentFromDb, bizTag: {}, cost:0, segment:{}", bizTag, segment); }
//计算新的步长 private int calculateNextStep(String bizCode, SegmentBuffer buffer) { long duration = System.currentTimeMillis() - buffer.getUpdateTimestamp(); int nextStep = buffer.getStep(); if (duration < SEGMENT_DURATION) { nextStep = Math.min(MAX_STEP, nextStep * EXPAND_FACTOR); } else if (duration < SEGMENT_DURATION * EXPAND_FACTOR) { // do nothing with nextStep } else { nextStep = Math.max(buffer.getMinStep(), nextStep / EXPAND_FACTOR); } log.info("leafKey[{}], step[{}], duration[{}mins], nextStep[{}]", bizCode, buffer.getStep(), String.format("%.2f", ((double) duration / (1000 * 60))), nextStep); return nextStep; }
public static class UpdateThreadFactory implements ThreadFactory { private static int threadInitNumber = 0;
private static synchronized int nextThreadNum() { return threadInitNumber++; }
@Override public Thread newThread(Runnable r) { return new Thread(r, "Thread-Segment-Update-" + nextThreadNum()); } }}
复制代码


文章转载自:东阳马生架构

原文链接:https://www.cnblogs.com/mjunz/p/18924638

体验地址:http://www.jnpfsoft.com/?from=001YH

用户头像

还未添加个人签名 2023-06-19 加入

还未添加个人简介

评论

发布
暂无评论
商品中心—商品可采可补可售的技术文档_Java_不在线第一只蜗牛_InfoQ写作社区