场景模拟
一般电子商务网站都会遇到如团购、秒杀、特价之类的活动,而这样的活动有一个共同的特点就是访问量激增、上千甚至上万人抢购一个商品。然而,作为活动商品,库存肯定是很有限的,如何控制库存不让出现超买,以防止造成不必要的损失是众多电子商务网站程序员头疼的问题,这同时也是最基本的问题。
在秒杀系统设计中,超卖是一个经典、常见的问题,任何商品都会有数量上限,如何避免成功下订单买到商品的人数不超过商品数量的上限,这是每个抢购活动都要面临的难点。
针对大量的并发请求,我们可以通过 Redis 来抗,也就是说对于库存直接请求 Redis 缓存,不直接请求数据库,如在 Redis 中有 50 个库存,如下:
但不管是缓存还是数据库,在不做任何处理的情况下,都会出现超买的问题,常见的处理方式就是在代码中通过 JVM 加锁的方式,如下:
server1
@RestController
public class SkillController {
@Autowired
private RedisTemplate redisTemplate;
// 秒杀接口
@RequestMapping("/deduct_stock")
public String deductStock() {
// 加锁
synchronized (this) {
int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock").toString());
if (stock > 0) {
// 库存 -1
int realStock = stock - 1;
// 扣减库存
redisTemplate.opsForValue().set("stock", realStock + "");
System.out.println("扣减成功,剩余库存:" + realStock);
} else {
System.out.println("扣减失败,库存不足");
}
}
return "8080";
}
}
复制代码
当然,在单机情况下确实没有任何问题,但现在绝大多数系统都是分布式系统,就算是 ERP 系统也会部署 2 台机器防止单点故障,所以一般情况下一个请求如下:
server2
server2 和 server1 代码基本相同,只是开启了 2 个 JVM 实例。
@RestController
public class SkillController {
@Autowired
private RedisTemplate redisTemplate;
// 秒杀接口
@RequestMapping("/deduct_stock")
public String deductStock() {
// 加锁
synchronized (this) {
int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock").toString());
if (stock > 0) {
// 库存 -1
int realStock = stock - 1;
// 扣减库存
redisTemplate.opsForValue().set("stock", realStock + "");
System.out.println("扣减成功,剩余库存:" + realStock);
} else {
System.out.println("扣减失败,库存不足");
}
}
return "8090";
}
}
复制代码
Nginx
一般来说,前端通过 nginx 请求转发并通过 upstream 实现负载均衡,其关键配置如下:
Jmeter
我们这里通过 Jmeter 来进行并发压测,不会用的参考 Jmeter 使用,然后这里提供下载链接:Jmeter 下载 (提取码:2hyo)。
并发请求:1 s 内 200 个请求(模拟高并发),循环 5 次,一共 1000 个总请求。
请求地址:秒杀的减库存接口。
JVM 锁
了解了上面的配置,然后启动 2 个实例,端口分别为 8080,8090,如下:
如过不知道如何启动 2 个实例的看下面:
注意:要修改启动端口。
使用 JVM 锁也就是同步代码块的方式存在问题,如上面测试的结果如下:
不仅 2 个服务同时存在相同的库存,甚至同一个服务也存在相同的值,很明显在高并发分布式场景下,JVM 层面的锁是不可行的。
Redis SETNX
SETNX
格式:setnx key value
将 key 的值设为 value ,当且仅当 key 不存在。
若给定的 key 已经存在,则 SETNX 不做任何动作。
SETNX 是『SET if Not eXists』(如果不存在,则 SET)的简写。
1、实现一个最简单的分布式锁
@RestController
public class SkillController {
@Autowired
private RedisTemplate redisTemplate;
@RequestMapping("/deduct_stock")
public String deductStock() {
// 商品 ID,具体应用中应该是请求传入的
String lockKey = "lock:product_01";
// SETNX 加锁
Boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey, "product");
// 如果为 false 说明这把锁存在,直接返回
if (!result) {
// 模拟返回业务
return "系统繁忙";
}
int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock").toString());
if (stock > 0) {
// 库存 -1
int realStock = stock - 1;
// 扣减库存 模拟其他更多业务操作
redisTemplate.opsForValue().set("stock", realStock + "");
System.out.println("扣减成功,剩余库存:" + realStock);
} else {
System.out.println("扣减失败,库存不足");
}
// 加锁后需要释放锁
redisTemplate.delete(lockKey);
return "8080";
}
}
复制代码
2、存在的问题
① 业务代码异常---死锁
在实际的场景中,一次秒杀过程涉及到很多的业务操作,如果在释放锁之前的某个业务操作抛异常,使得锁没有被释放,那么此时就会存在死锁问题。此时这个 key 永远存在于 redis 中,其它线程执行 SETNX 永远失败。
也就是说我们要保证释放锁得到执行,所以要把上面的业务代码放在 try catch
或者 try finally
中:
② Redis 宕机
但其实上面的代码并不一定能完全解决问题,如果 Redis 宕机或者被重启,同样会导致 finally 中的代码执行失败,结果就同上了。
所以一般我们需要给这个 key 设置过期时间,即:
Boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey, "product");
redisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);
复制代码
但上面的写法存在原子性问题,所以我们不能分开来写,得合成一条命令:
Boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey, "product", 10, TimeUnit.SECONDS);
复制代码
③ 高并发可能存在的问题
一般来说,并发量不大的情况下,上面的写法已经满足要求,但对于几千上万的并发,可能会导致接口的响应变慢,比如:
某个请求 A 执行完整个操作需要 15s ,而我们上面设置的超时时间为 10s ,所以此时请求 A 并没有执行完,但由于设置了过期时间把 key 给删掉了,然后这时再来一个请求 B,是可以加锁成功的,且在 8s 内执行完成,而 B 在执行过程中 A 同样也在执行,如果此时 A 可能先于 B 去执行 finally 中的代码把锁给删除了,但 A 删除的锁并不是它的,而是 B 加的锁,同理,当请求 C 加锁后又被请求 B 给释放了,也就是说,这把分布式锁直接无效了(尽管可能性很小),同样会出现超买问题。
这个问题的根本在于:自己加的锁被被别人释放了。
因此我们可以确定 value 值唯一性,如 UUID,如下:
但这种方式同样存在原子问题,也就是上图中 ② 处的代码,结果也会导致自己加的锁被被别人释放。
Redisson
针对上面的问题,我们可以通过 Redisson 来解决,其使用非常简单,和 JDK 中的 Lock 使用类似,如下:
@RestController
public class RedissonController {
@Autowired
private Redisson redisson;
@Autowired
private RedisTemplate redisTemplate;
@RequestMapping("/deduct_stock1")
public String deductStock() {
// 商品 ID,具体应用中应该是请求传入的
String lockKey = "lock:product_01";
// 获取锁
RLock lock = redisson.getLock(lockKey);
// 加锁
lock.lock();
try {
int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock").toString());
if (stock > 0) {
// 库存 -1
int realStock = stock - 1;
// 扣减库存
redisTemplate.opsForValue().set("stock", realStock + "");
System.out.println("扣减成功,剩余库存:" + realStock);
} else {
System.out.println("扣减失败,库存不足");
}
} finally {
// 释放锁
lock.unlock();
}
return "8080";
}
}
复制代码
再去测试就是正常的了:
Redisson 其原理如下:
Redisson 锁的加锁机制如上图所示,线程去获取锁,获取成功则执行保存数据到 redis 数据库。如果获取失败,则一直通过 while 循环尝试获取锁(可自定义等待时间,超时后返回失败),获取成功后,保存数据到 redis 数据库。
Redisson 提供的分布式锁是支持锁自动续期的(锁续命),也就是说,如果线程仍旧没有执行完,那么 Redisson 会自动给 redis 中的目标 key 延长超时时间,这在 Redisson 中称之为 Watch Dog(看门狗)机制。
那么 redisson 是怎么实现原子性的?
当然是 lua
。不管是加锁操作,还是看门狗机制都是通过 lua
来保证其原子性。
其加锁调用链路如下:
RedissonLock.lock()--->lockInterruptibly()--->tryAcquire()--->tryLockInnerAsync()
复制代码
关键代码就在 tryLockInnerAsync()
中:
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// 如果锁不存在,则通过hset设置它的值,并设置过期时间
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 如果锁已存在,其是当前线程,则通过hincrby给数值递增1,即锁的重入
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 如果锁已存在,不是当前线程,则返回过期时间 ttl
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
复制代码
那么对于锁自动续期如下:
RedissonLock.lock()--->lockInterruptibly()--->tryAcquire()--->scheduleExpirationRenewal()
复制代码
scheduleExpirationRenewal()
方法会开启一个子线程去执行自动延期的操作,当然也是执行 lua
代码,如下,截取关键部分:
// getName()就是当前锁的名字
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 判断这个锁 getName() 是否在redis中存在,如果存在就进行 pexpire 延期 默认30s
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
复制代码
他会判断这个锁 getName()
是否在 redis 中存在,如果存在就进行 pexpire
延期,默认lockWatchdogTimeout=30s
,且是每间隔lockWatchdogTimeout/3=10s
时间,去执行延时操作。
源码:https://gitee.com/javatv/redis.git
参考:redisson 中的看门狗机制
评论