写点什么

如何用 Redis 实现一个分布式锁

作者:Ayue、
  • 2022 年 6 月 23 日
  • 本文字数:4877 字

    阅读完需:约 16 分钟

如何用 Redis 实现一个分布式锁

场景模拟

一般电子商务网站都会遇到如团购、秒杀、特价之类的活动,而这样的活动有一个共同的特点就是访问量激增、上千甚至上万人抢购一个商品。然而,作为活动商品,库存肯定是很有限的,如何控制库存不让出现超买,以防止造成不必要的损失是众多电子商务网站程序员头疼的问题,这同时也是最基本的问题。


在秒杀系统设计中,超卖是一个经典、常见的问题,任何商品都会有数量上限,如何避免成功下订单买到商品的人数不超过商品数量的上限,这是每个抢购活动都要面临的难点。


针对大量的并发请求,我们可以通过 Redis 来抗,也就是说对于库存直接请求 Redis 缓存,不直接请求数据库,如在 Redis 中有 50 个库存,如下:



但不管是缓存还是数据库,在不做任何处理的情况下,都会出现超买的问题,常见的处理方式就是在代码中通过 JVM 加锁的方式,如下:


server1


@RestControllerpublic class SkillController {
@Autowired private RedisTemplate redisTemplate;
// 秒杀接口 @RequestMapping("/deduct_stock") public String deductStock() {
// 加锁 synchronized (this) { int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock").toString()); if (stock > 0) { // 库存 -1 int realStock = stock - 1; // 扣减库存 redisTemplate.opsForValue().set("stock", realStock + ""); System.out.println("扣减成功,剩余库存:" + realStock); } else { System.out.println("扣减失败,库存不足"); } }
return "8080"; }}
复制代码


当然,在单机情况下确实没有任何问题,但现在绝大多数系统都是分布式系统,就算是 ERP 系统也会部署 2 台机器防止单点故障,所以一般情况下一个请求如下:



server2


server2 和 server1 代码基本相同,只是开启了 2 个 JVM 实例。


@RestControllerpublic class SkillController {
@Autowired private RedisTemplate redisTemplate;
// 秒杀接口 @RequestMapping("/deduct_stock") public String deductStock() {
// 加锁 synchronized (this) { int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock").toString()); if (stock > 0) { // 库存 -1 int realStock = stock - 1; // 扣减库存 redisTemplate.opsForValue().set("stock", realStock + ""); System.out.println("扣减成功,剩余库存:" + realStock); } else { System.out.println("扣减失败,库存不足"); } }
return "8090"; }}
复制代码


Nginx


一般来说,前端通过 nginx 请求转发并通过 upstream 实现负载均衡,其关键配置如下:



Jmeter


我们这里通过 Jmeter 来进行并发压测,不会用的参考 Jmeter 使用,然后这里提供下载链接:Jmeter 下载 (提取码:2hyo)。


并发请求:1 s 内 200 个请求(模拟高并发),循环 5 次,一共 1000 个总请求。



请求地址:秒杀的减库存接口。


JVM 锁

了解了上面的配置,然后启动 2 个实例,端口分别为 8080,8090,如下:



如过不知道如何启动 2 个实例的看下面:



注意:要修改启动端口。


使用 JVM 锁也就是同步代码块的方式存在问题,如上面测试的结果如下:



不仅 2 个服务同时存在相同的库存,甚至同一个服务也存在相同的值,很明显在高并发分布式场景下,JVM 层面的锁是不可行的。

Redis SETNX

SETNX

格式:setnx key value

将 key 的值设为 value ,当且仅当 key 不存在。

若给定的 key 已经存在,则 SETNX 不做任何动作。

SETNX 是『SET if Not eXists』(如果不存在,则 SET)的简写。


1、实现一个最简单的分布式锁


@RestControllerpublic class SkillController {
@Autowired private RedisTemplate redisTemplate;
@RequestMapping("/deduct_stock") public String deductStock() {
// 商品 ID,具体应用中应该是请求传入的 String lockKey = "lock:product_01"; // SETNX 加锁 Boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey, "product"); // 如果为 false 说明这把锁存在,直接返回 if (!result) { // 模拟返回业务 return "系统繁忙"; } int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock").toString()); if (stock > 0) { // 库存 -1 int realStock = stock - 1; // 扣减库存 模拟其他更多业务操作 redisTemplate.opsForValue().set("stock", realStock + ""); System.out.println("扣减成功,剩余库存:" + realStock); } else { System.out.println("扣减失败,库存不足"); }
// 加锁后需要释放锁 redisTemplate.delete(lockKey); return "8080"; }}
复制代码


2、存在的问题


① 业务代码异常---死锁


在实际的场景中,一次秒杀过程涉及到很多的业务操作,如果在释放锁之前的某个业务操作抛异常,使得锁没有被释放,那么此时就会存在死锁问题。此时这个 key 永远存在于 redis 中,其它线程执行 SETNX 永远失败。


也就是说我们要保证释放锁得到执行,所以要把上面的业务代码放在 try catch 或者 try finally 中:



② Redis 宕机


但其实上面的代码并不一定能完全解决问题,如果 Redis 宕机或者被重启,同样会导致 finally 中的代码执行失败,结果就同上了。


所以一般我们需要给这个 key 设置过期时间,即:


Boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey, "product");
redisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);
复制代码


但上面的写法存在原子性问题,所以我们不能分开来写,得合成一条命令:


Boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey, "product", 10, TimeUnit.SECONDS);
复制代码


③ 高并发可能存在的问题


一般来说,并发量不大的情况下,上面的写法已经满足要求,但对于几千上万的并发,可能会导致接口的响应变慢,比如:


某个请求 A 执行完整个操作需要 15s ,而我们上面设置的超时时间为 10s ,所以此时请求 A 并没有执行完,但由于设置了过期时间把 key 给删掉了,然后这时再来一个请求 B,是可以加锁成功的,且在 8s 内执行完成,而 B 在执行过程中 A 同样也在执行,如果此时 A 可能先于 B 去执行 finally 中的代码把锁给删除了,但 A 删除的锁并不是它的,而是 B 加的锁,同理,当请求 C 加锁后又被请求 B 给释放了,也就是说,这把分布式锁直接无效了(尽管可能性很小),同样会出现超买问题。


这个问题的根本在于:自己加的锁被被别人释放了


因此我们可以确定 value 值唯一性,如 UUID,如下:



但这种方式同样存在原子问题,也就是上图中 ② 处的代码,结果也会导致自己加的锁被被别人释放

Redisson

针对上面的问题,我们可以通过 Redisson 来解决,其使用非常简单,和 JDK 中的 Lock 使用类似,如下:


@RestControllerpublic class RedissonController {
@Autowired private Redisson redisson;
@Autowired private RedisTemplate redisTemplate;
@RequestMapping("/deduct_stock1") public String deductStock() {
// 商品 ID,具体应用中应该是请求传入的 String lockKey = "lock:product_01";
// 获取锁 RLock lock = redisson.getLock(lockKey); // 加锁 lock.lock();
try { int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock").toString()); if (stock > 0) { // 库存 -1 int realStock = stock - 1; // 扣减库存 redisTemplate.opsForValue().set("stock", realStock + ""); System.out.println("扣减成功,剩余库存:" + realStock); } else { System.out.println("扣减失败,库存不足"); } } finally { // 释放锁 lock.unlock(); }
return "8080"; }}
复制代码


再去测试就是正常的了:



Redisson 其原理如下:



Redisson 锁的加锁机制如上图所示,线程去获取锁,获取成功则执行保存数据到 redis 数据库。如果获取失败,则一直通过 while 循环尝试获取锁(可自定义等待时间,超时后返回失败),获取成功后,保存数据到 redis 数据库。


Redisson 提供的分布式锁是支持锁自动续期的(锁续命),也就是说,如果线程仍旧没有执行完,那么 Redisson 会自动给 redis 中的目标 key 延长超时时间,这在 Redisson 中称之为 Watch Dog(看门狗)机制。


那么 redisson 是怎么实现原子性的


当然是 lua。不管是加锁操作,还是看门狗机制都是通过 lua来保证其原子性。


其加锁调用链路如下:


RedissonLock.lock()--->lockInterruptibly()--->tryAcquire()--->tryLockInnerAsync()
复制代码


关键代码就在 tryLockInnerAsync() 中:


<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {    internalLockLeaseTime = unit.toMillis(leaseTime);    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,          // 如果锁不存在,则通过hset设置它的值,并设置过期时间              "if (redis.call('exists', KEYS[1]) == 0) then " +                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +                  "return nil; " +              "end; " +              // 如果锁已存在,其是当前线程,则通过hincrby给数值递增1,即锁的重入              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +                  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +                  "return nil; " +              "end; " +              // 如果锁已存在,不是当前线程,则返回过期时间 ttl              "return redis.call('pttl', KEYS[1]);",                Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));}
复制代码


那么对于锁自动续期如下:


RedissonLock.lock()--->lockInterruptibly()--->tryAcquire()--->scheduleExpirationRenewal()
复制代码


scheduleExpirationRenewal() 方法会开启一个子线程去执行自动延期的操作,当然也是执行 lua代码,如下,截取关键部分:


// getName()就是当前锁的名字 RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,        // 判断这个锁 getName() 是否在redis中存在,如果存在就进行 pexpire 延期 默认30s        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +            "redis.call('pexpire', KEYS[1], ARGV[1]); " +            "return 1; " +        "end; " +        "return 0;",          Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
复制代码


他会判断这个锁 getName() 是否在 redis 中存在,如果存在就进行 pexpire 延期,默认lockWatchdogTimeout=30s,且是每间隔lockWatchdogTimeout/3=10s时间,去执行延时操作。


源码:https://gitee.com/javatv/redis.git


参考:redisson 中的看门狗机制

发布于: 刚刚阅读数: 3
用户头像

Ayue、

关注

🏆 InfoQ写作平台-签约作者 🏆 2019.10.16 加入

个人站点:javatv.net | 学习知识,目光坚毅

评论

发布
暂无评论
如何用 Redis 实现一个分布式锁_redis_Ayue、_InfoQ写作社区