写点什么

商品中心—自研缓存框架的技术文档

  • 2025-06-18
    福建
  • 本文字数:11094 字

    阅读完需:约 36 分钟

1.商品 C 端系统监听商品变更及刷新缓存


FlushRedisCache 的 flushRedisStringData()方法刷新缓存的逻辑是:首先从 DB 查询最新的数据 -> 然后删除旧缓存 -> 最后更新缓存。


@Configurationpublic class ConsumerBeanConfig {    //配置内容对象    @Autowired    private RocketMQProperties rocketMQProperties;        //监听商品修改的MQ消息    @Bean("productUpdateTopic")    public DefaultMQPushConsumer productUpdateTopic(ProductUpdateListener productUpdateListener) throws MQClientException {        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketMqConstant.PRODUCT_UPDATE_CONSUMER_GROUP);        consumer.setNamesrvAddr(rocketMQProperties.getNameServer());        consumer.subscribe(RocketMqConstant.PRODUCT_UPDATE_TOPIC, "*");        consumer.registerMessageListener(productUpdateListener);        consumer.start();        return consumer;    }}
//商品变更时的缓存处理@Componentpublic class ProductUpdateListener implements MessageListenerConcurrently { @DubboReference(version = "1.0.0") private TableDataUpdateApi tableDataUpdateApi; @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { for (MessageExt messageExt : list) { //消息处理这里,涉及到sku的缓存更新以及对应的整个商品明细的缓存更新 String msg = new String(messageExt.getBody()); log.info("执行商品缓存数据更新逻辑,消息内容:{}", msg); TableDataChangeDTO tableDataChangeMessage = JsonUtil.json2Object(msg, TableDataChangeDTO.class); //更新sku对应的商品缓存信息 tableDataUpdateApi.tableDataChange(tableDataChangeMessage); //发送回调消息通知 tableDataUpdateApi.sendCallbackMessage(tableDataChangeMessage); } } catch (Exception e) { log.error("consume error, 商品缓存更新失败", e); //本次消费失败,下次重新消费 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }}
//商品信息变更服务@DubboService(version = "1.0.0", interfaceClass = TableDataUpdateApi.class, retries = 0)public class TableDataUpdateApiImpl implements TableDataUpdateApi { @Resource private FlushRedisCache flushRedisCache; private ExecutorService executorService = Executors.newFixedThreadPool(10); //商品表数据变更逆向更新缓存 @Override public JsonResult tableDataChange(TableDataChangeDTO tableDataChangeDTO) { executorService.execute(() -> { try { //刷新缓存数据 flushRedisCache.flushRedisStringData(false, tableDataChangeDTO.getTableName(), Sets.newHashSet(tableDataChangeDTO.getKeyId())); } catch (Exception e) { log.error("刷新string类型缓存异常,入参为tableDataChangeDTO={}", tableDataChangeDTO, e); } try { flushRedisCache.flushRedisSetData(false, tableDataChangeDTO.getTableName(), Sets.newHashSet(tableDataChangeDTO.getKeyId())); } catch (Exception e) { log.error("刷新set类型缓存异常,入参为tableDataChangeDTO={}", tableDataChangeDTO, e); } try { flushRedisCache.flushRedisSortedSetData(false, tableDataChangeDTO.getTableName(), Sets.newHashSet(tableDataChangeDTO.getKeyId())); } catch (Exception e) { log.error("刷新sortedset类型缓存异常,入参为tableDataChangeDTO={}", tableDataChangeDTO, e); } }); return JsonResult.buildSuccess(); } ...}
//数据变更—刷新缓存@Componentpublic class FlushRedisCache { //继承了AbstractRedisStringCache的缓存实例会被注入到abstractRedisStringCacheMap这个map中 //例如CategoryBaseCache、FrontCategoryCache、ItemCollectCache、ProductDetailCache、SkuCollectCache等 @Autowired private Map<String, AbstractRedisStringCache> abstractRedisStringCacheMap; //更新string类型缓存 //@param flushAll 是否全量刷新 //@param tableName 表名 //@param idSet 主键ID集合 public void flushRedisStringData(boolean flushAll, String tableName, Set<Long> idSet) { for (Map.Entry<String, AbstractRedisStringCache> entry : abstractRedisStringCacheMap.entrySet()) { AbstractRedisStringCache stringCache = entry.getValue(); if (flushAll) { stringCache.flushRedisStringDataByTableUpdateData(); continue; } //继承AbstractRedisStringCache的每个缓存实例都指定来表名,如下用于匹配出对应的缓存实例 if (stringCache.getTableName().contains(tableName)) { stringCache.flushRedisStringDataByTableAndIdSet(tableName, idSet); } } } ...}
//Redis(string)缓存抽象类public abstract class AbstractRedisStringCache<DO, BO> { @Resource private RedisReadWriteManager redisReadWriteManager; ... //刷新缓存—根据主表ID集合(关联表变更需要查询主表) public void flushRedisStringDataByTableAndIdSet(String tableName, Set<Long> idSet) { Optional<Set<Long>> idSetOpt = getStringDatabase().getTableIdSetByRelationTableIdSet(tableName, idSet); if (!idSetOpt.isPresent()) { return; } flushRedisStringDataByIdSet(idSetOpt.get()); } //刷新缓存—根据主键ID集合 //@param idSet 数据表主键ID private void flushRedisStringDataByIdSet(Set<Long> idSet) { //根据id集合从数据库中查询出数据 Optional<RedisStringCache<DO>> stringSourceOpt = getStringDatabase().listTableDataByIdSet(idSet, queryType()); if (!stringSourceOpt.isPresent()) { return; } RedisStringCache<DO> redisStringCache = stringSourceOpt.get(); if (!CollectionUtils.isEmpty(redisStringCache.getDeleteSet())) { //通过缓存读写组件删除缓存 redisReadWriteManager.delete(redisStringCache.getDeleteSet().stream().map(this::getRedisKey).collect(toSet()).toArray(new String[]{})); } if (CollectionUtils.isEmpty(redisStringCache.getAddList())) { return; } List<BO> boList = convertDO2BO(redisStringCache.getAddList()); Map<String, BO> redisMap = convertBO2Map(boList); //通过缓存读写组件写入缓存 redisReadWriteManager.batchWriteRedisString(redisMap); } ...}
//缓存读写管理@Servicepublic class RedisReadWriteManager { @Resource private RedisCache redisCache; ... //删除指定的key public void delete(String... keys) { Arrays.asList(keys).stream().forEach(key -> redisCache.delete(key)); } //批量添加string缓存 public <T> void batchWriteRedisString(Map<String, T> redisMap) { List<Map.Entry<String, T>> list = Lists.newArrayList(redisMap.entrySet()); try { for (List<Map.Entry<String, T>> entries : Lists.partition(list, PAGE_SIZE_100)) { for (Map.Entry<String, T> entry : entries) { redisCache.setex(true, entry.getKey(), JSON.toJSONString(entry.getValue()), RedisKeyUtils.redisKeyRandomTime(INT_EXPIRED_SEVEN_DAYS)); } try { Thread.sleep(SLEEP_100); } catch (InterruptedException e) { e.printStackTrace(); } } } catch (Exception e) { log.error("批量添加string缓存异常 redisMap={}", redisMap, e); } } ...}
复制代码


2.自研缓存框架的数据表缓存组件


(1)自研缓存框架的目录结构



(2)通过注解实现继承同一个抽象类的数据表缓存组件实例自动注入 Map


继承 AbstractRedisStringCache 的数据表缓存组件实例会以 Bean 名称为 key 被自动注入到 Map 中,之后遍历该 Map 并比对各实例声明的表名,便可以匹配出对应的数据表缓存组件实例。数据表缓存组件实例中会封装获取 DB 操作组件、获取缓存 key 模板等方法,而其继承的抽象类会提供读取缓存(缓存未命中时回源查询 DB)、刷新缓存等模板方法。


@Componentpublic class FlushRedisCache {    //继承了AbstractRedisStringCache的缓存实例会被注入到abstractRedisStringCacheMap这个map中    //例如CategoryBaseCache、FrontCategoryCache、ItemCollectCache、ProductDetailCache、SkuCollectCache等    @Autowired    private Map<String, AbstractRedisStringCache> abstractRedisStringCacheMap;
//继承了AbstractRedisSortedSetCache的缓存实例会被注入到abstractRedisSortedSetCacheMap这个map中 //例如SkuSellerCache @Autowired private Map<String, AbstractRedisSortedSetCache> abstractRedisSortedSetCacheMap; ...}
@Service("productDetailCache")public class ProductDetailCache extends AbstractRedisStringCache<ProductDetailDO, ProductDetailBO> { //DB操作组件 @Resource private ProductDetailStringDatabase productDetailStringDatabase; @Resource private ProductDetailConverter productDetailConverter; @Override protected RedisStringDatabase<ProductDetailDO> getStringDatabase() { return productDetailStringDatabase; } @Override protected String getPendingRedisKey() { return AbstractRedisKeyConstants.PRODUCT_SKU_INFO_STRING; } @Override protected List<ProductDetailBO> convertDO2BO(Collection<ProductDetailDO> doList) { return productDetailConverter.converterDetailList(doList); } @Override protected Map<String, Object> getTableFieldsMap(String key) { Map<String, Object> dbInputMap = Maps.newHashMap(); dbInputMap.put(SkuCollectStringDatabase.SKU_ID, key); return dbInputMap; } @Override protected Class<ProductDetailBO> getBOClass() { return ProductDetailBO.class; } ...}
//Redis(string)缓存抽象类//@param <DO> 数据对象//@param <BO> 缓存对象public abstract class AbstractRedisStringCache<DO, BO> { @Resource private RedisReadWriteManager redisReadWriteManager; //获取redis key //@param key 需要替换的关键字 protected String getRedisKey(String key) { return String.format(getPendingRedisKey(), key); } //单个获取数据库表名 public String getTableName() { return getStringDatabase().getTableName(); } //获取DB读取对象 protected abstract RedisStringDatabase<DO> getStringDatabase(); //获取待处理的Redis key protected abstract String getPendingRedisKey(); //DO转BO protected abstract List<BO> convertDO2BO(Collection<DO> doList); //关联表字段值 protected abstract Map<String, Object> getTableFieldsMap(String key); //获取BO对象的class protected abstract Class<BO> getBOClass(); ... //模版方法:根据关键字批量获取数据 //@param useLocalCache 是否使用本地缓存 //@param keyList 入参关键字 public Optional<List<BO>> listRedisStringData(Boolean useLocalCache, List<String> keyList) { if (CollectionUtils.isEmpty(keyList)) { return Optional.empty(); } Optional<List<BO>> boListOpt = redisReadWriteManager.listRedisStringDataByCache(useLocalCache, keyList, getBloomKey(), getStringDatabase()::getTableSingleFiled, getStringDatabase().getBloomField(), getBOClass(), this::getRedisKey, (key) -> { Map<String, Object> tableFieldsMap = getTableFieldsMap(key); Optional<DO> doOpt; try { doOpt = getStringDatabase().getTableData(tableFieldsMap, queryType()); } catch (Exception e) { log.error("根据关键字批量获取数据异常 key={},paramMap={}", key, tableFieldsMap, e); return Optional.empty(); } if (!doOpt.isPresent()) { return Optional.empty(); } List<BO> boList = convertDO2BO(Arrays.asList(doOpt.get())); if (CollectionUtils.isEmpty(boList)) { return Optional.empty(); } return Optional.of(boList.get(0)); } ); return boListOpt; } //模版方法:根据多关键字批量获取集合数据 //@param keyList 入参关键字 //@param requestMap key 数据库表字段,value 字段值,该map中的字段不要与getTableFieldsMap(key)获取的字段重复 public Optional<List<BO>> listRedisStringData(List<String> keyList, Map<String, Object> requestMap) { if 
(CollectionUtils.isEmpty(keyList)) { return Optional.empty(); } Optional<List<BO>> boListOpt = redisReadWriteManager.listRedisStringDataByBatchCache(keyList, getBOClass(), this::getRedisKey, (key) -> { Map<String, Object> tableFieldsMap = getTableFieldsMap(key); if (MapUtils.isNotEmpty(requestMap)) { tableFieldsMap.putAll(requestMap); } Optional<List<DO>> doOpt; try { doOpt = getStringDatabase().listTableData(tableFieldsMap, queryType()); } catch (Exception e) { log.error("根据关键字批量获取数据异常 key={},paramMap={}", key, tableFieldsMap, e); return Optional.empty(); } if (!doOpt.isPresent()) { return Optional.empty(); } List<BO> boList = convertDO2BO(doOpt.get()); if (CollectionUtils.isEmpty(boList)) { return Optional.empty(); } return Optional.of(boList); }); return boListOpt; } //模版方法:刷新缓存—根据主表ID集合(关联表变更需要查询主表) //@param tableName 关联表名 //@param idSet 关联表主键集合 public void flushRedisStringDataByTableAndIdSet(String tableName, Set<Long> idSet) { Optional<Set<Long>> idSetOpt = getStringDatabase().getTableIdSetByRelationTableIdSet(tableName, idSet); if (!idSetOpt.isPresent()) { return; } flushRedisStringDataByIdSet(idSetOpt.get()); } //模版方法:刷新缓存—根据表变更数据ID集合 public void flushRedisStringDataByTableUpdateData() { Optional<Set<Long>> updateIdSetOpt = getStringDatabase().getTableUpdateIdSet(); if (!updateIdSetOpt.isPresent()) { return; } flushRedisStringDataByIdSet(updateIdSetOpt.get()); } //模版方法:刷新缓存—根据主键ID集合 //@param idSet 数据表主键ID private void flushRedisStringDataByIdSet(Set<Long> idSet) { //根据id集合从数据库中查询出数据 Optional<RedisStringCache<DO>> stringSourceOpt = getStringDatabase().listTableDataByIdSet(idSet, queryType()); if (!stringSourceOpt.isPresent()) { return; } RedisStringCache<DO> redisStringCache = stringSourceOpt.get(); if (!CollectionUtils.isEmpty(redisStringCache.getDeleteSet())) { //通过缓存读写组件删除缓存 redisReadWriteManager.delete(redisStringCache.getDeleteSet().stream().map(this::getRedisKey).collect(toSet()).toArray(new String[]{})); } if 
(CollectionUtils.isEmpty(redisStringCache.getAddList())) { return; } List<BO> boList = convertDO2BO(redisStringCache.getAddList()); Map<String, BO> redisMap = convertBO2Map(boList); //通过缓存读写组件写入缓存 redisReadWriteManager.batchWriteRedisString(redisMap); } ...}
复制代码


3.自研缓存框架的通用缓存读写组件与 DB 操作组件


每个数据表缓存组件都会有至少两个必备组件:一个通用缓存读写组件 + 一个数据表的 DB 操作组件。

 

(1)通用缓存读写组件


通用缓存读写组件也包含两个必备组件:一个操作缓存的 RedisCache 组件 + 一个操作分布式锁的 RedisLock 组件。通用缓存读写组件封装了大量基础的缓存读写操作,这些操作在实现时结合了回源读库与常见缓存问题的解决方案,例如:缓存未命中时先获取分布式锁再回源读库、DB 无数据时缓存空对象以应对缓存穿透、写缓存时使用随机过期时间。


//缓存读写管理@Servicepublic class RedisReadWriteManager {    @Resource    private RedisCache redisCache;        @Resource    private RedisLock redisLock;    ...        //删除指定的key    public void delete(String... keys) {        Arrays.asList(keys).stream().forEach(key -> redisCache.delete(key));    }        //批量添加string缓存    public <T> void batchWriteRedisString(Map<String, T> redisMap) {        List<Map.Entry<String, T>> list = Lists.newArrayList(redisMap.entrySet());        try {            for (List<Map.Entry<String, T>> entries : Lists.partition(list, PAGE_SIZE_100)) {                for (Map.Entry<String, T> entry : entries) {                    redisCache.setex(true, entry.getKey(), JSON.toJSONString(entry.getValue()), RedisKeyUtils.redisKeyRandomTime(INT_EXPIRED_SEVEN_DAYS));                }                try {                    Thread.sleep(SLEEP_100);                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        } catch (Exception e) {            log.error("批量添加string缓存异常 redisMap={}", redisMap, e);        }    }        //批量获取多缓存数据    public <T> Optional<List<T>> listRedisStringDataByBatchCache(List<String> keyList, Class<T> clazz, Function<String, String> getRedisKeyFunction, Function<String, Optional<List<T>>> getDbFuction) {        try {            List<T> list = Lists.newArrayList();            List<String> pendingKeyList = keyList.stream().distinct().collect(toList());            List<String> redisKeyList = pendingKeyList.stream().map(getRedisKeyFunction).distinct().collect(toList());            List<String> cacheList = redisCache.mget(redisKeyList);            for (int i = 0; i < cacheList.size(); i++) {                String cache = cacheList.get(i);                //过滤无效缓存                if (EMPTY_OBJECT_STRING.equals(cache)) {                    continue;                }                if (StringUtils.isNotBlank(cache)) {                    List<T> tList = JSON.parseArray(cache, clazz); 
                   list.addAll(tList);                    continue;                }                //缓存没有则读库                Optional<List<T>> optional = listRedisStringDataByDb(pendingKeyList.get(i), getRedisKeyFunction, getDbFuction);                if (optional.isPresent()) {                    list.addAll(optional.get());                }            }            return CollectionUtils.isEmpty(list) ? Optional.empty() : Optional.of(list);        } catch (Exception e) {            log.error("批量获取缓存数据异常 keyList={},clazz={}", keyList, clazz, e);            throw e;        }    }        //读取数据库表多数据赋值到Redis    public <T> Optional<List<T>> listRedisStringDataByDb(String key, Function<String, String> getRedisKeyFunction, Function<String, Optional<List<T>>> getDbFuction) {        if (StringUtils.isEmpty(key) || Objects.isNull(getDbFuction)) {            return Optional.empty();        }        try {            if (!redisLock.lock(key)) {                return Optional.empty();            }            String redisKey = getRedisKeyFunction.apply(key);            Optional<List<T>> optional = getDbFuction.apply(key);            if (!optional.isPresent()) {                //把空对象暂存到redis                redisCache.setex(true, redisKey, EMPTY_OBJECT_STRING, RedisKeyUtils.redisKeyRandomTime(INT_EXPIRED_ONE_DAY, TimeUnit.HOURS, NUMBER_24));                log.warn("发生缓存穿透 redisKey={}", redisKey);                return optional;            }            //把表数据对象存到redis            redisCache.setex(true, redisKey, JSON.toJSONString(optional.get()), RedisKeyUtils.redisKeyRandomTime(INT_EXPIRED_SEVEN_DAYS));            log.info("表数据对象存到Redis redisKey={}, data={}", redisKey, optional.get());            return optional;        } finally {            redisLock.unlock(key);        }    }    ...}
复制代码


(2)数据表的 DB 操作组件


该组件主要提供对数据表数据的查询方法。


@Service("productDetailStringDatabase")public class ProductDetailStringDatabase extends AbstractRedisStringDatabase<ProductDetailDO> {    public static final String SKU_ID = "skuId";    private static final String TABLE_NAME = "sku_info";        @Resource    private ProductRepository productRepository;        @Override    public String getTableName() {        return TABLE_NAME;    }        @Override    public Set<String> getTableNameSet() {        return Sets.newHashSet(getTableName());    }        @Override    public Optional<ProductDetailDO> getTableData(Map<String, Object> tableFieldsMap, String queryType) {        if (tableFieldsMap.containsKey(SKU_ID)) {            String skuId = (String) tableFieldsMap.get(SKU_ID);            ProductDetailDO productDetailDO = productRepository.queryProductDetail(skuId);            if (!Objects.isNull(productDetailDO) && DelFlagEnum.EFFECTIVE.getCode().equals(productDetailDO.getDelFlag())) {                return Optional.of(productDetailDO);            }            return Optional.empty();        }        return Optional.empty();    }        @Override    public Optional<List<ProductDetailDO>> listTableData(Map<String, Object> tableFieldsMap, String queryType) {        return Optional.empty();    }        @Override    public Optional<RedisStringCache<ProductDetailDO>> listTableDataByIdSet(Set<Long> idSet, String queryType) {        RedisStringCache redisStringCache = new RedisStringCache();        List<ProductDetailDO> addList = new ArrayList<>();        for (Long skuId : idSet) {            ProductDetailDO productDetailDO = productRepository.queryProductDetail(String.valueOf(skuId));            if (!Objects.isNull(productDetailDO)) {                addList.add(productDetailDO);            }        }        redisStringCache.setAddList(addList);        return Optional.of(redisStringCache);    }        @Override    public Optional<RedisStringCache<ProductDetailDO>> listTableDataByIdSet(List<Long> idSet, String queryType) {       
 Long skuId = idSet.get(0);        ProductDetailDO productDetailDO = productRepository.queryProductDetail(String.valueOf(skuId));        RedisStringCache redisStringCache = new RedisStringCache();        redisStringCache.setAddList(Arrays.asList(productDetailDO));        return Optional.of(redisStringCache);    }}
复制代码


文章转载自:东阳马生架构

原文链接:https://www.cnblogs.com/mjunz/p/18933087

体验地址:http://www.jnpfsoft.com/?from=001YH

用户头像

还未添加个人签名 2023-06-19 加入

还未添加个人简介

评论

发布
暂无评论
商品中心—自研缓存框架的技术文档_不在线第一只蜗牛_InfoQ写作社区