写点什么

Redis 缓存穿透 / 击穿 / 雪崩以及数据一致性的解决方案

作者:Java你猿哥
  • 2023-04-11
    湖南
  • 本文字数:12187 字

    阅读完需:约 40 分钟

无论是在开发过程中还是在准备跑路的面试过程中,有关 redis 相关的,难免会涉及到四个特殊场景:缓存穿透、缓存雪崩、缓存击穿以及数据一致性。如果在开发中不注意这些场景的话,在高并发场景下有可能会导致系统崩溃,数据错乱等情况。现在,结合实际的业务场景来复现并解决这些问题。


相关技术:springboot2.2.2+mybatisplus3.1+redis5.0+hutool5.8


缓存穿透缓存穿透是指查询缓存和数据库中都不存在的数据,导致所有的查询压力全部给到了数据库。


比如查询一篇文章信息并对其进行缓存,一般的逻辑是先查询缓存中是否存在该文章,如果存在则直接返回,否则再查询数据库并将查询结果进行缓存。


@Slf4j@Servicepublic class DocumentInfoServiceImpl extends ServiceImpl<DocumentInfoMapper, DocumentInfo> implements DocumentInfoService {


@Resourceprivate StringRedisTemplate stringRedisTemplate;
@Overridepublic DocumentInfo getDocumentDetail(int docId) { String redisKey = "doc::info::" + docId; String obj = stringRedisTemplate.opsForValue().get(redisKey); DocumentInfo documentInfo = null; if (StrUtil.isNotEmpty(obj)) { //缓存命中 log.info("==== select from cache ===="); documentInfo = JSONUtil.toBean(obj, DocumentInfo.class); } else { log.info("==== select from db ===="); documentInfo = this.lambdaQuery().eq(DocumentInfo::getId, docId).one(); if (ObjectUtil.isNotNull(documentInfo)) { // 缓存结果 stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo), 5L, TimeUnit.SECONDS); } } return documentInfo;}
复制代码


}复制代码 @GetMapping("/doc/queryById")public Result<DocumentInfo> queryById(@RequestParam(name = "docId") Integer docId) {return Result.success(documentInfoService.getDocumentDetail(docId));}复制代码如果项目的并发量不大,这样写的话几乎没啥问题。如果项目的并发量很大,那么这就存在一个隐藏问题,如果在访问了一个不存在的文章(这个文章已经被分享出去,但是在后台可能是被删除或者下线状态),那么就会导致所有的请求全部需要到数据库中进行查询,从而给数据库造成压力,甚至造成宕机。http://127.0.0.1:8081/doc/queryById?docId=不存在的 id


2023-01-05 10:18:57.954 INFO 19692 --- [nio-8081-exec-8] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ====2023-01-05 10:18:58.121 INFO 19692 --- [nio-8081-exec-5] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ====2023-01-05 10:18:58.350 INFO 19692 --- [io-8081-exec-10] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ====2023-01-05 10:18:58.519 INFO 19692 --- [nio-8081-exec-3] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ====2023-01-05 10:18:58.661 INFO 19692 --- [nio-8081-exec-6] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ====2023-01-05 10:18:58.859 INFO 19692 --- [nio-8081-exec-4] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ====2023-01-05 10:18:59.012 INFO 19692 --- [nio-8081-exec-9] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ====2023-01-05 10:18:59.154 INFO 19692 --- [nio-8081-exec-7] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ====复制代码解决方案一:缓存空对象针对缓存穿透问题缓存空对象可以有效避免所产生的影响,当查询一条不存在的数据时,在缓存中存储一个空对象并设置一个过期时间(设置过期时间是为了避免出现数据库中存在了数据但是缓存中仍然是空数据现象),这样可以避免所有请求全部查询数据库的情况。


    // 查询对象不存在    if(StrUtil.equals(obj,"")){        log.info("==== select from cache , data not available ====");        return null;    }    if (StrUtil.isNotEmpty(obj)) {        log.info("==== select from cache ====");        documentInfo = JSONUtil.toBean(obj, DocumentInfo.class);    } else {        log.info("==== select from db ====");        documentInfo = this.lambdaQuery().eq(DocumentInfo::getId, docId).one();        //如果数据不存在,则缓存一个空对象并设置过期时间        stringRedisTemplate.opsForValue().set(redisKey, ObjectUtil.isNotNull(documentInfo)?JSONUtil.toJsonStr(documentInfo):"", 5L, TimeUnit.SECONDS);
复制代码


// if (ObjectUtil.isNotNull(documentInfo)) {// stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo), 5L, TimeUnit.SECONDS);// }}复制代码 2023-01-05 13:15:01.057 INFO 16600 --- [nio-8081-exec-3] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ====


2023-01-05 13:15:01.214 INFO 16600 --- [nio-8081-exec-4] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from cache , data not available ====2023-01-05 13:15:01.384 INFO 16600 --- [nio-8081-exec-5] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from cache , data not available ====2023-01-05 13:15:01.540 INFO 16600 --- [nio-8081-exec-6] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from cache , data not available ====2023-01-05 13:15:01.720 INFO 16600 --- [nio-8081-exec-7] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from cache , data not available ====复制代码解决方案二:布隆过滤器


缓存空对象的缺点在于无论数据存不存在都需要查询一次数据库,并且 redis 中存储了大量的空数据,这个时候可以采用布隆过滤器来解决。布隆过滤器可以简单的理解为由一个很长的二进制数组结合 n 个 hash 算法计算出 n 个数组下标,将这些数据下标置为 1。在查找数据时,再次通过 n 个 hash 算法计算出数组下标,如果这些下标的值为 1,表示该值可能存在(存在 hash 冲突的原因),如果为 0,则表示该值一定不存在。


/**


  • 布隆过滤器添加元素伪代码*/BitArr[] bit = new BitArr[10000]; // 新建一个二进制数组 List<String> insertData = Arrays.asList("A", "B", "C"); // 待添加元素 for (String insertDatum : insertData) {for (int i=1;i<=3;i++){ // 使用 3 中 hash 算法计算出 3 个数组下标 int bitIdx = hash_i(insertDatum); //hash1(insertDatum),hash2(insertDatum),hash3(insertDatum)bit[bitIdx]=1; // 将下标元素置为 1}}复制代码


/**


  • 布隆过滤器查找元素伪代码*/BitArr[] bit = new BitArr[10000];for (int i=1;i<=3;i++){int bitIdx = hash_i("E"); //计算 E 的数组下标 if(bit[bitIdx]==0){ //如果对应的元素为 0,则一定不存在 return false;}}return true;复制代码


布隆过滤器的实现在使用布隆过滤器时有两个核心参数,分别是预估的数据量 size 以及期望的误判率 fpp,这两个参数我们可以根据自己的业务场景和数据量进行自主设置。在实现布隆过滤器时,有两个核心问题,分别是 hash 函数的选取个数 n 以及确定 bit 数组的大小 len。


根据预估数据量 size 和误判率 fpp,可以计算出 bit 数组的大小 len。


根据预估数据量 size 和 bit 数组的长度大小 len,可以计算出所需要的 hash 函数个数 n。单机版布隆过滤器


目前单机版的布隆过滤器实现方式有很多,比如 Guava 提供的 BloomFilter,Hutool 工具包中提供的 BitMapBloomFilter 等。以 Guava 为例,需要引入对应的依赖包,在 BloomFilter 类中提供了 create 方法来进行布隆过滤器的创建。


<dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>21.0</version></dependency>复制代码 public static BloomFilter<Integer> localBloomFilter = BloomFilter.create(Funnels.integerFunnel(),10000L,0.01);复制代码创建完成后,将需要筛选的数据同步到过滤器中。


/**


  • 单机版布隆过滤器数据初始化*/@PostConstructpublic void initDocumentDataLocal(){List<DocumentInfo> documentInfos = documentInfoService.lambdaQuery().select(DocumentInfo::getId).list();if(CollUtil.isNotEmpty(documentInfos)){documentInfos.stream().map(DocumentInfo::getId).forEach(e->{BloomFilterUtil.localBloomFilter.put(e);});}}复制代码在业务代码中,可以直接调用 BloomFilter 提供的 mightContain 方法,判断目标 docId 是否可能存在于过滤器中,如果可能存在,那么继续向下执行业务逻辑,否则直接中断执行。


@Overridepublic DocumentInfo getDocumentDetail(int docId) {//布隆过滤器拦截 boolean mightContain = BloomFilterUtil.localBloomFilter.mightContain(docId);if(!mightContain){ //是否有可能存在于布隆过滤器中 log.info("==== select from bloomFilter , data not available ====");return null;}String redisKey = "doc::info::" + docId;String obj = stringRedisTemplate.opsForValue().get(redisKey);DocumentInfo documentInfo = null;if (StrUtil.isNotEmpty(obj)) {log.info("==== select from cache ====");documentInfo = JSONUtil.toBean(obj, DocumentInfo.class);} else {log.info("==== select from db ====");documentInfo = this.lambdaQuery().eq(DocumentInfo::getId, docId).one();if(ObjectUtil.isNotNull(documentInfo)){stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo), 5L, TimeUnit.SECONDS);


}}return documentInfo;}复制代码自定义分布式版布隆过滤器


自定义分布式布隆过滤器的存储依赖于 redis 的 bitmap 数据结构来实现,另外还需要定义四个参数,分别为预估数据量 size,误判率 fpp,数组大小 bitNum 以及 hash 函数个数 hashNum 其中预估数据量和误判率需要配置在 yml 文件中。


@Resourceprivate StringRedisTemplate stringRedisTemplate;
@Value("${bloom.filter.size}")private long size; // 预估数据量
@Value("${bloom.filter.fpp}")private double fpp; // 误判率
private long bitNum; //数组大小len
private int hashNum; // hash函数个数size
复制代码


复制代码


根据上面的两个公式,计算出相应的数组长度以及所需的 hash 函数个数,并在 redis 设置一个布隆过滤器的 key。


/** 计算出数组长度,hash函数个数并初始化数组*/@PostConstructprivate void initBloom() {    this.bitNum = getNumOfBits(size, fpp);    this.hashNum = getNumOfHashFun(size, bitNum);    //借助redis的bitmap来实现二进制数组    stringRedisTemplate.opsForValue().setBit("bloom::filter", bitNum, false);}
/** * 计算bit数组大小 * * @param size * @param fpp * @return */private long getNumOfBits(long size, double fpp) { return (long) (-size * Math.log(fpp) / (Math.log(2) * Math.log(2)));}
/** * 计算所需的hash个数 * * @param size * @param numOfBits * @return */private int getNumOfHashFun(long size, long numOfBits) { return Math.max(1, (int) Math.round((double) numOfBits / size * Math.log(2)));}
复制代码


复制代码另外,需要提供两个方法,分别为添加元素的 putBloomFilterRedis 方法和判断元素是否有可能存在的方法 existBloomFilterRedis,其中的实现方式参考了 guava。


/*** 像自定义布隆过滤器中添加元素* @param key/public void putBloomFilterRedis(String key) {long hash64 = HashUtil.metroHash64(key.getBytes());int hash1 = (int) hash64;int hash2 = (int) (hash64 >>> 32);for (int i = 1; i <= hashNum; i++) {/** 上面不是说,要使用 n 个 hash 函数吗??为啥这里直接用一个动态变量取乘积了呢???* 不用担心,请看《Less Hashing, Same Performance: Building a Better Bloom Filter》,* 里面论述了这种操作不会影响布隆过滤器的性能,毕竟 hash 的代价还是很大的,这算是个有效的优化手段吧:* A standard technique from the hashing literature is to use two hash* functions h(x) and h(x) to simulate additional hash functions of the form g(x) = h(x) + ih(x) .*/int combinedHash = hash1 + i * hash2;if (combinedHash < 0) {//如果为负数,则取反(保证结果为正数)combinedHash = ~combinedHash;}// 计算出数组下标,并将下标值置为 1int bitIdx = (int) (combinedHash % bitNum);stringRedisTemplate.opsForValue().setBit("bloom::filter", bitIdx, true);}}


/** * 判断自定义布隆过滤器中元素是否有可能存在 * @param key * @return */public boolean existBloomFilterRedis(String key) {    long hash64 = HashUtil.metroHash64(key.getBytes());    int hash1 = (int) hash64;    int hash2 = (int) (hash64 >>> 32);    for (int i = 1; i <= hashNum; i++) {        int combinedHash = hash1 + i * hash2;        if (combinedHash < 0) {            combinedHash = ~combinedHash;        }        int bitIdx = (int) (combinedHash % bitNum);        //判断下标值是否为1,如果不为1直接返回false        Boolean bit = stringRedisTemplate.opsForValue().getBit("bloom::filter", bitIdx);        if (!bit) {            return false;        }    }    return true;}
复制代码


复制代码方法实现后,将所有的 key 值数据同步到 redis 中。


@Componentpublic class BloomFilterInitData {


@Resourceprivate BloomFilterUtil bloomFilterUtil;
@Resourceprivate DocumentInfoService documentInfoService;
@PostConstructpublic void initDocumentData(){ List<DocumentInfo> documentInfos = documentInfoService.lambdaQuery().select(DocumentInfo::getId).list(); if(CollUtil.isNotEmpty(documentInfos)){ documentInfos.stream().map(m -> { return "doc::info::" + m.getId().intValue(); }).forEach(e->{ bloomFilterUtil.putBloomFilterRedis(e); }); }}
复制代码


}复制代码上面全部搞定后,启动项目并测试结果是否有效,在启动前先在数据表中搞几条测试数据。


@Overridepublic DocumentInfo getDocumentDetail(int docId) {String redisKey = "doc::info::" + docId;// 布隆过滤器中是否有可能存在这个 keyboolean b = bloomFilterUtil.existBloomFilterRedis(redisKey);if(!b){// 如果不存在,直接返回空 log.info("==== select from bloomFilter , data not available ====");return null;}String obj = stringRedisTemplate.opsForValue().get(redisKey);DocumentInfo documentInfo = null;if (StrUtil.isNotEmpty(obj)) {log.info("==== select from cache ====");documentInfo = JSONUtil.toBean(obj, DocumentInfo.class);} else {log.info("==== select from db ====");documentInfo = this.lambdaQuery().eq(DocumentInfo::getId, docId).one();if(ObjectUtil.isNotNull(documentInfo)){stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo), 5L, TimeUnit.SECONDS);


}}return documentInfo;}复制代码查询存在的数据。


查询不存在的数据。


查看两次请求打印的 log,不存在的数据成功被拦截掉了,避免再去查询数据库,即使存在一定的误判率,也几乎不会有啥影响,最多就是查询一次数据库。


虽然布隆过滤器可以有效的解决缓存穿透问题,并且实现的算法查找效率也很快。但是,也存在一定的缺点,由于存在 hash 冲突的原因,一方面存在一定的误判率(某个在过滤器中并不存在的 key,但是通过 hash 计算出来的下标值都为 1)。另一方面,删除比较困难(如果将一个数组位置为 0,那么这个位置有可能也代表其他 key 的值,会影响到其他的 key)。


缓存击穿缓存击穿是指访问某个热点数据时,缓存中并不存在该数据或者缓存过期了,这个时候全部的请求压力给到了数据库。


基于上面的代码,来模拟短时间内进行并发请求,看看会不会将请求全部打到数据库。


public static void main(String[] args) throws InterruptedException {/**


  • 短时间内并发请求接口,并访问同一个数据*/ExecutorService executorService = Executors.newFixedThreadPool(1000);CountDownLatch countDownLatch = new CountDownLatch(1000);for (int i = 0; i < 1000; i++) {executorService.execute(() -> {HttpResponse response = HttpUtil.createGet("http://127.0.0.1:8081/doc/queryById?docId=1").execute();System.out.println(response.body());countDownLatch.countDown();});}countDownLatch.await();executorService.shutdown();}复制代码根据日志输出结果显示,请求确实给到了数据库。


针对缓存击穿问题,有两种解决方案,一种是对热点数据不设置过期时间,另一种是采用互斥锁的方式。


解决方案一:热点数据不设置过期时间热点数据不设置过期时间,当后台更新热点数据数需要同步更新缓存中的数据,这种解决方式适用于不严格要求缓存一致性的场景。


解决方案二:使用互斥锁如果是单机部署的环境下可以使用 synchronized 或 lock 来处理,保证同时只能有一个线程来查询数据库,其他线程可以等待数据缓存成功后在被唤醒,从而直接查询缓存即可。如果是分布式部署,可以采用分布式锁来实现互斥。


@Componentpublic class RedisLockUtil {


@Resourceprivate StringRedisTemplate stringRedisTemplate;
/** * 模拟互斥锁 * @param key * @param value * @param exp * @return */public boolean tryLock(String key, String value, long exp) { Boolean absent = stringRedisTemplate.opsForValue().setIfAbsent(key, value, exp, TimeUnit.SECONDS); if (absent) { return true; } return tryLock(key, value, exp); //如果线程没有获取锁,则在此处循环获取}
/** * 释放锁 * @param key * @param value */public void unLock(String key, String value) { String s = stringRedisTemplate.opsForValue().get(key); if (StrUtil.equals(s, value)) { //避免锁被其他线程误删 stringRedisTemplate.delete(key); }}
复制代码


}复制代码有了上面的两个方法,可以对业务代码进行改造,在查询数据库前进行加锁,读取完成后在释放锁。


@Overridepublic DocumentInfo getDocumentDetail(int docId) {String redisKey = "doc::info::" + docId;boolean b = bloomFilterUtil.existBloomFilterRedis(redisKey);if (!b) {log.info("==== select from bloomFilter , data not available ====");return null;}String obj = stringRedisTemplate.opsForValue().get(redisKey);DocumentInfo documentInfo = null;if (StrUtil.isNotEmpty(obj)) {log.info("==== select from cache ====");documentInfo = JSONUtil.toBean(obj, DocumentInfo.class);} else {String s = UUID.randomUUID().toString(); //给锁加个标识,避免误删 String lockKey = redisKey+"::lock";boolean lock = redisLockUtil.tryLock(lockKey, s, 60); //尝试加锁 if (lock) {try {//如果加锁成功,先再次查询缓存,有可能上一个线程查询并添加到缓存了 obj = stringRedisTemplate.opsForValue().get(redisKey);if (StrUtil.isNotEmpty(obj)) {log.info("==== select from cache ====");documentInfo = JSONUtil.toBean(obj, DocumentInfo.class);} else {log.info("==== select from db ====");documentInfo = this.lambdaQuery().eq(DocumentInfo::getId, docId).one();if (ObjectUtil.isNotNull(documentInfo)) {stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo), 5L, TimeUnit.SECONDS);}}} finally {redisLockUtil.unLock(lockKey, s); //释放锁}}}return documentInfo;}复制代码一顿梭哈后,再次模拟并发查询,看看最终效果,理想的结果状态应该是查询一次数据库,后面的查询直接通过缓存获取。通过日志输出可以看出来,击穿问题被有效解决啦。


缓存雪崩缓存雪崩是指对热点数据设置了相同的过期时间,在同一时间这些热点数据 key 大批量发生过期,请求全部转发到数据库,从而导致数据库压力骤增,甚至宕机。与缓存击穿不同的是,缓存击穿是单个热点数据过期,而缓存雪崩是大批量热点数据过期。


针对缓存雪崩问题,常见的解决方案有多种,比如设置随机的过期时间或者不设置过期时间,搭建高可用的缓存架构避免 redis 服务宕机,服务降级等。


解决方案一:设置随机的过期时间将 key 的过期时间后面加上一个随机数,这个随机数值的范围可以根据自己的业务情况自行设定,这样可以让 key 均匀的失效,避免大批量的同时失效。


if (ObjectUtil.isNotNull(documentInfo)) {//生成一个随机数 int randomInt = RandomUtil.randomInt(2, 10);//过期时间+随机数 stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo), 5L+randomInt, TimeUnit.SECONDS);}复制代码解决方案二:不设置过期时间不设置过期时间时,需要注意的是,在更新数据库数据时,同时也需要更新缓存数据,否则数据会出现不一致的情况。这种方式比较适用于不严格要求缓存一致性的场景。


if (ObjectUtil.isNotNull(documentInfo)) {stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo));}复制代码解决方案三:搭建高可用集群缓存服务故障时,也会触发缓存雪崩,为了避免因服务故障而发生的雪崩,推荐使用高可用的服务集群,这样即使发生故障,也可以进行故障转移。集群相关的在之前也有介绍过 Redis 高可用架构搭建到原理分析


数据一致性通常情况下,使用缓存的直接目的是为了提高系统的查询效率,减轻数据库的压力。一般情况下使用缓存是下面这几步骤:


查询缓存,数据是否存在如果数据存在,直接返回如果数据不存在,再查询数据库如果数据库中数据存在,那么将该数据存入缓存并返回。如果不存在,返回空。


这么搞好像看上去并没有啥问题,那么会有一个细节问题:当一条数据存入缓存后,立刻又被修改了,那么这个时候缓存该如何更新呢。不更新肯定不行,这样导致了缓存中的数据与数据库中的数据不一致。一般情况下对于缓存更新有下面这几种情况:


先更新缓存,再更新数据库先更新数据库,再更新缓存先删除缓存,再更新数据库先更新数据库,再删除缓存先更新缓存,再更新数据库先更新缓存,再更新数据库这种情况下,如果业务执行正常,不出现网络等问题,这么操作不会有啥问题,两边都可以更新成功。但是,如果缓存更新成功了,但是当更新数据库时或者在更新数据库之前出现了异常,导致数据库无法更新。这种情况下,缓存中的数据变成了一条实际不存在的假数据。


比如,在更新文章详情时,先修改了 redis 中的数据,在更新数据库前抛出一个异常来模拟数据库更新失败的场景。


public boolean updateDocument(DocumentInfo documentInfo) {String redisKey = "doc::info::" + documentInfo.getId();// 先更新缓存 stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo));// 模拟更新数据库前出现异常 int i = 1 / 0;// 在更新数据库 boolean b = this.updateById(documentInfo);return b;}复制代码 @PostMapping("/doc/updateDocument")public Result<Boolean> updateDocument(@RequestBody DocumentInfo documentInfo) {return Result.success(documentInfoService.updateDocument(documentInfo));}复制代码先调用更新的接口,在调用查询的接口,结果发现查询的接口返回的并不是数据库中的实际数据,这个时候就造成了缓存与数据库数据不一致的情况。


先更新数据库,再更新缓存先更新数据库,再更新缓存和先更新缓存,再更新数据库的情况基本一致,如果失败,会导致数据库中是最新的数据,缓存中是旧数据。还有一种极端情况,在高并发情况下容易出现数据覆盖的现象:A 线程更新完数据库后,在要执行更新缓存的操作时,线程被阻塞了,这个时候线程 B 更新了数据库并成功更新了缓存,当 B 执行完成后线程 A 继续向下执行,那么最终线程 B 的数据会被覆盖。


@Overridepublic boolean updateDocument(DocumentInfo documentInfo) {String redisKey = "doc::info::" + documentInfo.getId();// 更新数据库 boolean b = this.updateById(documentInfo);//以标题为标识,模拟线程阻塞。当一个请求的标题为‘模拟数据覆盖’时,线程停 4 秒 if(StrUtil.equals(documentInfo.getTitle(),"模拟数据覆盖")){try {Thread.sleep(4000);}catch (Exception e){


    }}// 更新缓存stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo));// 模拟更新数据库前出现异常return b;
复制代码


}复制代码先删除缓存,再更新数据库先删除缓存,再更新数据库这种情况,如果并发量不大用起来不会有啥问题。但是在并发场景下会有这样的问题:线程 A 在删除缓存后,在写入数据库前发生了阻塞。这时线程 B 查询了这条数据,发现缓存中不存在,继而向数据库发起查询请求,并将查询结果缓存到了 redis。当线程 B 执行完成后,线程 A 继续向下执行更新了数据库,那么这时缓存中的数据为旧数据,与数据库中的值不一致。


先更新数据库,再删除缓存先更新数据库,再删除缓存也并不是绝对安全的,在高并发场景下,如果线程 A 查询一条在缓存中不存在的数据(这条数据有可能过期被删除了),查询数据库后在要将查询结果缓存到 redis 时发生了阻塞。这个时候线程 B 发起了更新请求,先更新了数据库,再次删除了缓存。当线程 B 执行成功后,线程 A 继续向下执行,将查询结果缓存到了 redis 中,那么此时缓存中的数据与数据库中的数据发生了不一致。


解决数据不一致方案延时双删延时双删,即在写数据库之前删除一次,写完数据库后,再删除一次,在第二次删除时,并不是立即删除,而是等待一定时间在做删除。


这个延时的功能可以使用 mq 来实现,这里为了省事,偷个懒,本地测试使用的延时队列来模拟 mq 达到延时效果。首先需要定义一个队列元素对象 DoubleDeleteTask。


@Datapublic class DoubleDeleteTask implements Delayed {


private String key; // 需要删除的key
private long time; //需要延迟的时间
public DoubleDeleteTask(String key, long time) { this.key = key; this.time = time + System.currentTimeMillis();}
@Overridepublic long getDelay(TimeUnit unit) { return unit.convert(time - System.currentTimeMillis(), TimeUnit.MILLISECONDS);}
@Overridepublic int compareTo(Delayed o) { return Long.compare(time, ((DoubleDeleteTask) o).time);}
复制代码


}复制代码然后定义一个队列并交给 spring 管理。


@Configurationpublic class DoubleDeleteQueueConfig {


@Bean(name = "doubleDeleteQueue")public DelayQueue<DoubleDeleteTask> doubleDeleteQueue() {   return new DelayQueue<DoubleDeleteTask>();}
复制代码


}复制代码设置一个独立线程,特意用来处理延时的任务。如果数据删除失败,可以自定义重试次数以保证数据的一致性,但是也会带来一定的性能影响,如果在实际项目中,建议还是以异步的方式来实现重试。


@Slf4j@Componentpublic class DoubleDeleteTaskRunner implements CommandLineRunner {


@Resourceprivate DelayQueue<DoubleDeleteTask> doubleDeleteQueue;
@Resourceprivate StringRedisTemplate stringRedisTemplate;
private static final int retryCount = 3; //失败重试次数
@Overridepublic void run(String... args) throws Exception { new Thread(() -> { try { while (true) { DoubleDeleteTask take = doubleDeleteQueue.take(); //取出队列元素 String key = take.getKey(); try { stringRedisTemplate.delete(key); log.info("====延时删除key:{}====",key); } catch (Exception e) { //失败重试 int count = 1; for (int i = 1; i <= retryCount; i++) { if (count <= retryCount) { log.info("====延时删除key:{},失败重试次数:{}====",key,count); Boolean r = stringRedisTemplate.delete(key); if (r) { break; } else { count++; } }else break; } } } } catch (Exception e) { e.printStackTrace(); } }, "double-delete-task").start();}
复制代码


}复制代码使用延时队列,处理延时双删。最终测试,通过日志的打印可以确定实现了延时删除的功能。


@Overridepublic boolean updateDocument(DocumentInfo documentInfo) {String redisKey = "doc::info::" + documentInfo.getId();// 更新缓存 stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo));// 更新数据库 boolean b = this.updateById(documentInfo);// 再次延时删除缓存 doubleDeleteQueue.add(new DoubleDeleteTask(redisKey,2000L));return b;}复制代码


最后在高并发的场景下,使用 reids 还是存在很多坑的,稍不注意就会出现缓存穿透,缓存雪崩等情况,严重的话可以直接造成服务宕机,所以在以后的开发中需要注意(如果项目没啥并发量的话,可以不用考虑)。

用户头像

Java你猿哥

关注

一只在编程路上渐行渐远的程序猿 2023-03-09 加入

关注我,了解更多Java、架构、Spring等知识

评论

发布
暂无评论
Redis缓存穿透/击穿/雪崩以及数据一致性的解决方案_redis_Java你猿哥_InfoQ写作社区