假设有这样一个场景,在一个购票软件上买一张票,但是此时剩余票数只有一张或几张,这个时候有几十个人都在同时使用这个软件购票。在不考虑任何影响下,正常的逻辑是首先判断当前是否还有剩余的票,如果有,那么就进行购买并扣减库存数,否则就会提示票数不足,购买失败。伪代码如下:
void buyTicket() {
int stockNum = byTicketMapper.selectStockNum();
if(stockNum>0){
//TODO 买票流程....
byTicketMapper.reduceStock(); // 扣减库存
}else{
log.info("=====>票卖完了<====");
}
}
复制代码
这段代码在逻辑上没有问题,但是在并发场景下,可能会存在一个严重的问题。当剩余票数为 1 时,有 A,B 两个用户同时点击了购买按钮,A 用户通过了库存大于 0 的校验并开始执行购票逻辑,但是由于一些原因造成 A 用户的购票线程有短暂的阻塞。
而在这个阻塞的过程中,用户 B 发起了购买请求,并且也通过了库存大于 0 的校验,直到整个购买流程执行完成并且扣减了库存。那么这个时候剩余库存刚好为 0,不会再有用户发起购买请求,这时用户 A 的购买请求阻塞被唤醒,因为在此之前已经校验过库存大于 0,所以执行完购买流程后,库存还会被扣减一次。那么此时的库存为-1,这就是常听到的超卖问题。
为了避免这个问题,我们可以通过加锁了方式,来保证并发的安全性。像 JVM 提供的内置锁 synchronized,JUC 提供的重入锁 ReentrantLock,但是这两种锁只能保证单机环境下并发安全问题,一般在实际工作中很少会部署单节点的项目,通常都是多节点集群部署,这两个锁就失去了意义。这个时候就可以借助 redis 来实现分布式锁。
分布式锁 setnx
在集群部署的情况下,通常使用 redis 来实现分布式锁。其中 redis 提供了 setnx 命令,标识只有 key 不存在时才能设值成功,从而达到加锁的效果。
下面通过 redis 来改造上述的代码,其方式是购票线程首先获取锁,如果获取锁成功,那么继续执行购票业务流程,直到所有流程执行完成并扣减库存后,最终在释放锁。如果获取锁失败,那么就给出一个友好的系统提示。
void buyTicket() {
// 获取锁
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "1");
if (lock) {
int stockNum = byTicketMapper.selectStockNum();
if(stockNum>0){
//TODO 买票流程....
byTicketMapper.reduceStock(); // 扣减库存
}else{
log.info("=====>票卖完了<====");
}
// 释放锁
redisTemplate.delete("lock");
} else {
log.info("=====>系统繁忙,请稍后!<====");
}
}
复制代码
问题 1:死锁问题
通过上面的一顿梭哈,你以为这样就可以了吗,其实不然。设想一下,如果线程 A 在获取锁成功后,在执行购票的逻辑中出现了异常,那么这个时候就会造成锁得不到释放,其他线程始终获取不到锁,这就造成严重的死锁问题。
为了避免死锁问题的出现,我们可以对异常进行捕获,在 finally 中去释放锁,这样不管业务执行成功或失败,最后都会去释放锁。
void buyTicket() {
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "1");
if (lock) {
try {
int stockNum = byTicketMapper.selectStockNum();
if (stockNum > 0) {
//TODO 买票流程....
byTicketMapper.reduceStock(); // 扣减库存
} else {
log.info("=====>票卖完了<====");
}
}finally {
redisTemplate.delete("lock"); // 释放锁
}
} else {
log.info("=====>系统繁忙,请稍后!<====");
}
}
复制代码
你以为这就结束了吗?死锁就不会发生了吗?如果你认为这样就能避免死锁的发生,那你就太不细心啦。如果在程序刚想像执行释放锁的逻辑时,redis 服务突然宕机了,那么这时锁释放就失败了。在将 redis 服务重启后,加锁的数据又被恢复了,这样又出现了死锁的现象。
为了避免这个问题,可以为锁设置一个过期时间,这样即使 redis 重启恢复数据后,也会很快的过期掉。不过需要注意的是,在设置锁的过期时间时,一定要保证原子性操作,不然还是会出现死锁问题。
//不是原子操作,会出现死锁问题
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "1");
//如果刚要执行该语句时,redis宕机了。上面的锁无法释放
redisTemplate.expire("lock",Duration.ofSeconds(5L));
//原子操作
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "1", Duration.ofSeconds(5L));
复制代码
问题 2:锁被其他线程释放问题
经过上面的又一顿梭哈,死锁的问题可以避免了,这样在高并发的情况下就能安全的执行了吗。如果锁的过期时间设置了 5 秒,当 A 线程发起购票请求并获取到了锁,但是 A 线程在执行购票流程时花费了 6 秒,此时线程 A 的锁已经过期。
这时线程 B 重新获取了锁并且也开始执行购票流程,但是 A 线程要比 B 线程执行的要快,当 A 线程释放锁时,问题就出现了。由于 A 线程执行的过程锁已经过期了,那么在执行释放锁的流程时,最终被释放的是线程 B 的锁,这就导致 B 的锁被 A 线程释放问题。
对于这个现象,可以给每个锁设置一个唯一标识,比如像 UUID,线程 ID。在释放锁时,校验一下这个锁的标识是否为需要删除的锁,如果是,在进行锁的释放。
public void buyTicket() {
String uuid = UUID.randomUUID().toString();
// 为锁设置一个唯一标识
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, Duration.ofSeconds(5L));
if (lock) {
try {
int stockNum = byTicketMapper.selectStockNum();
if (stockNum > 0) {
//TODO 买票流程....
byTicketMapper.reduceStock(); // 扣减库存
} else {
log.info("=====>票卖完了<====");
}
}finally {
String lockValue = redisTemplate.opsForValue().get("lock");
if(lockValue.equals(uuid)){ //校验标识,通过则释放锁
redisTemplate.delete("lock");
}
}
} else {
log.info("=====>系统繁忙,请稍后!<====");
}
}
复制代码
问题 3:锁续期问题
使用 setnx 命令做分布式锁时,无法避免的一个问题就是:线程尚未执行完成,但是锁已经过期。在解决锁被其他线程误删的代码中,并不是 100%能解决的,问题点在于下面这段代码。
如果线程 A 已经执行到了 if 语句并且通过了判断,当刚要执行释放锁的逻辑时,线程 A 的锁过期了并且线程 B 重新获取到了锁,那么线程 A 在释放锁时,释放的是 B 的锁。
为了完全能够解决这个问题,可以采用锁续期的方式,其实现方式是单独开一个线程用来定时监听线程的锁是否还被持有,如果还持有,那么就给这把锁延长一些过期时间,这样就不会出现上述问题了。目前市面上已经为我们提供了锁自动续期的中间件,比如 redisson
String lockValue = redisTemplate.opsForValue().get("lock");
if(lockValue.equals(uuid)){ // 线程A的锁过期
redisTemplate.delete("lock"); // 线程A删除了线程B的锁
}
复制代码
Redisson
redisson 一般使用最多的场景就是分布式锁了,它不仅保证了并发场景下线程安全的问题,也解决了锁续期的问题。使用方式也比较简单,以 3.5.7 版本为例,首先需要配置 redisson 信息,根据自己的 redis 集群模式自由选择配置。在配置完成后,再来改造上面的购票方法。
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
// 单机配置
config.useSingleServer().setAddress("redis://127.0.0.1:3306").setDatabase(0);
// 主从配置
// config.useMasterSlaveServers().setMasterAddress("").addSlaveAddress("","");
// 哨兵配置
// config.useSentinelServers().addSentinelAddress("").setMasterName("");
// Cluster配置
//config.useClusterServers().addNodeAddress("");
return Redisson.create(config);
}
复制代码
对于 redisson 使用起来也非常简单,通过 getLock 方法获取到 RLock 对象。通过 RLock 的 tryLock 或 lock 方法来进行加锁(底层都是通过 Lua 脚本来实现的)。当获取到锁并且扣减库存后,可以使用 unlock 方法进行锁释放。
void buyTicket() {
RLock lock = redissonClient.getLock("lock");
if (lock.tryLock()) { // 获取锁
try {
int stockNum = byTicketMapper.selectStockNum();
if (stockNum > 0) {
//TODO 买票流程....
byTicketMapper.reduceStock(); // 扣减库存
} else {
log.info("=====>票卖完了<====");
}
} finally {
lock.unlock(); //释放锁
}
} else {
log.info("=====>系统繁忙,请稍后!<====");
}
}
复制代码
Watch Dog 机制
那 redisson 是如何做到锁续期的呢?其实在 redisson 内部有一个看 watch dog 机制(看门狗机制), 但是看门狗机制并不是在加锁时就能启动的。
需要注意的是在加锁时,如果使用tryLock(long t1,long t2, TimeUnit unit)或lock(long t1,long t2, TimeUnit unit)
方法并且将 t2 参数值设为了一个不为-1 的值,那么看门狗将无法生效。
看门狗在启动后会监听主线程还在执行,如果还在执行那么将会通过 Lua 脚本每 10 秒给锁续期 30 秒。watchlog 的延时时间默认为 30 秒,这个值可以在配置 config 时自己定义。
private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1L) { // 如果leaseTime不是-1,那么将无法使用看门狗
return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
} else {
RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.addListener(new FutureListener<Boolean>() {
public void operationComplete(Future<Boolean> future) throws Exception {
if (future.isSuccess()) {
Boolean ttlRemaining = (Boolean)future.getNow();
if (ttlRemaining) {
// 看门口机制
RedissonLock.this.scheduleExpirationRenewal(threadId);
}
}
}
});
return ttlRemainingFuture;
}
}
private long lockWatchdogTimeout = 30000L; //默认30秒
private void scheduleExpirationRenewal(final long threadId) {
if (!expirationRenewalMap.containsKey(this.getEntryName())) {
// 每10秒执行续期
Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
// 通过LUA脚本为锁续期
RFuture<Boolean> future = RedissonLock.this.commandExecutor.evalWriteAsync(RedissonLock.this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(RedissonLock.this.getName()), new Object[]{RedissonLock.this.internalLockLeaseTime, RedissonLock.this.getLockName(threadId)});
future.addListener(new FutureListener<Boolean>() {
public void operationComplete(Future<Boolean> future) throws Exception {
RedissonLock.expirationRenewalMap.remove(RedissonLock.this.getEntryName());
if (!future.isSuccess()) {
RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", future.cause());
} else {
if ((Boolean)future.getNow()) {
RedissonLock.this.scheduleExpirationRenewal(threadId);
}
}
}
});
}
}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS); // 每10秒执行一次
if (expirationRenewalMap.putIfAbsent(this.getEntryName(), task) != null) {
task.cancel();
}
}
}
复制代码
问题 4:主从切换导致锁丢失问题
虽然 redisson 帮助我们解决了锁续期的问题,但是在 redis 集群架构中,由于主从复制具有一定的延时,那么在极端情况下就会出现这样一个问题:当一个线程获取锁成功,并且成功向主节点保存了锁信息,当主节点还未向从节点同步锁信息时,主节点宕机了,那么当发生故障转移从节点切换为主节点时,线程加的锁就丢失了。
为了解决这个问题,redis 引入了红锁 RedLock,RedLock 与大多数中间件的选举机制类似,采用过半的方式来决定操作成功还是不成功。
RedLock
RedLock 在工作中,并不接受 redis 的集群架构,无论是主从,哨兵还是 Cluster。每台 redis 服务都是独立的,都是一台独立的 Master 节点。
在加锁的过程中,RedLock 会记录开始加锁时的时间以及加锁成功后的时间,这两个时间差就是一台机器加锁成功所需要的时间。比如启动了 5 个 redis 服务,线程 A 设置锁的超时时间为 5 秒,当向第一台 redis 服务加锁成功后花费了 1 秒,向第二台服务加锁成功后也花费了一秒。
这个时候加到第二台机器时,已经花费了两秒的时间,但是加锁数并未过半,还需要加锁一台才能完全算加锁成功,这个时候第三台机器加锁成功又花费了 1 秒。那么总的加锁时间就是 3 秒,锁的实际过期时间就为 2 秒。
特别需要注意的是,在向 redis 服务建立网络连接时,要设置一个超时时间,避免 redis 服务宕机时,客户端还在傻傻的等待回应,这里超时时间官方给到建议是 5-50 毫秒之间,当连接超时时,客户端会继续向下一个节点发起连接。
如果因为某些原因,获取锁失败(加锁没有超半数或者取锁时间已经超过了有效时间),客户端应该在所有的 Redis 实例上进行解锁,即便某些 Redis 实例根本就没有加锁成功。
在并发场景下,RedLock 会出现这样一个问题,比如有三个线程同时去获取了同一张票的锁,此时 A 线程已经成功给 redis-1 和 reids-2 加上了锁,线程 B 已经成功给 redis-3,reids-4 加上了锁,线程 C 成功的给 reids-5 加上了锁,这个时候三个线程再去加锁时,没有机器可加了,发现加锁成功数都未过半,那么就导致客户端始终获取不到锁。
当客户端无法取到锁时,应该在随机延迟一定时间,然后进行重试, 防止多个客户端在同时抢夺同一资源的锁。
释放锁比较简单,向所有的 Redis 实例发送释放锁命令即可,不用关心之前有没有从 Redis 实例成功获取到锁.
在了解了 RedLock 后,最后再来改造购票的代码逻辑。首先需要根据 redis 的实例数来定义对应的 Bean 实例,redis 的实例最少要有三台。
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
// 单机配置
config.useSingleServer().setAddress("redis://192.168.36.128:3306").setDatabase(0);
return Redisson.create(config);
}
@Bean
public RedissonClient redissonClient2() {
Config config = new Config();
// 单机配置
config.useSingleServer().setAddress("redis://192.168.36.130:3306").setDatabase(0);
return Redisson.create(config);
}
@Bean
public RedissonClient redissonClient3() {
Config config = new Config();
// 单机配置
config.useSingleServer().setAddress("redis://192.168.36.131:3306").setDatabase(0);
return Redisson.create(config);
}
复制代码
在配置完成后,为每台实例都设置同一把锁,最后在调用 RedissonRedLock 提供的 tryLock 和 unlock 进行加锁和解锁。
void buyTicket(){
RLock lock = redissonClient.getLock("lock");
RLock lock2 = redissonClient2.getLock("lock");
RLock lock3 = redissonClient3.getLock("lock");
RedissonRedLock redLock = new RedissonRedLock(lock,lock2,lock3); // 分别向三台实例加锁
if (redLock.tryLock()) {
try {
int stockNum = byTicketMapper.selectStockNum();
if (stockNum > 0) {
//TODO 买票流程....
byTicketMapper.reduceStock(); // 扣减库存
} else {
log.info("=====>票卖完了<====");
}
} finally {
redLock.unlock(); //释放锁
}
} else {
log.info("=====>系统繁忙,请稍后!<====");
}
}
复制代码
总结
在使用 reids 做分布式锁时,并没有想象中的那么简单,高并发场景下容易出现死锁,锁被其他线程误删,锁续期,锁丢失等问题,在实际开发中应该考虑到这些问题并根据相应的解决办法来解决这些问题,从而保证系统的安全性。本文中可能会存在一些遗漏或错误,后续会继续跟进。
文章来源
评论