写点什么

源码分析:Redisson 分布式锁过程分析

发布于: 2021 年 03 月 31 日
源码分析:Redisson分布式锁过程分析

封面图来自:Redisson性能压测权威发布

一 摘要

Redisson 版本为 3.12.2,maven 引入依赖信息:

<dependency>    <groupId>org.redisson</groupId>    <artifactId>redisson</artifactId>    <version>3.12.2</version></dependency>
复制代码

二 锁过程源码

如下代码所示,是我们适用 Redisson 获取和释放分布式锁的一个 demo:

RedissonClient redisson = Redisson.create(); RLock lock = redisson.getLock("anyLock");lock.lock();// 其他代码....lock.unlock();
复制代码

其中,Redisson.create();是默认的创建方法,内容为:

public static RedissonClient create() {    Config config = new Config();    ((SingleServerConfig)config.useSingleServer().setTimeout(1000000)).setAddress("redis://127.0.0.1:6379");    return create(config);}
复制代码

可见,这里使用了本地的 redis 集群,和默认的 6379 端口。

这里重点分析加锁过程,也就是 lock.lock(); 方法部分,来看 Redisson 是怎样实现加锁,以及可能得锁续期等 watchdog 的动作,下面是 RedissonLock 类中的 lock()方法:

public void lock() {    try {        this.lock(-1L, (TimeUnit)null, false);    } catch (InterruptedException var2) {        throw new IllegalStateException();    }}
复制代码

这里继续向下调用了一个含参数的 lock()方法,设置了释放时间(默认设置了-1),TimeUnit(null),是否可中断(false),我们继续看这个方法:

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {    long threadId = Thread.currentThread().getId();    Long ttl = this.tryAcquire(leaseTime, unit, threadId);    if (ttl != null) {        RFuture<RedissonLockEntry> future = this.subscribe(threadId);        if (interruptibly) {            this.commandExecutor.syncSubscriptionInterrupted(future);        } else {            this.commandExecutor.syncSubscription(future);        }
try { while(true) { ttl = this.tryAcquire(leaseTime, unit, threadId); if (ttl == null) { return; }
if (ttl >= 0L) { try { ((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } catch (InterruptedException var13) { if (interruptibly) { throw var13; }
((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else if (interruptibly) { ((RedissonLockEntry)future.getNow()).getLatch().acquire(); } else { ((RedissonLockEntry)future.getNow()).getLatch().acquireUninterruptibly(); } } } finally { this.unsubscribe(future, threadId); } }}
复制代码

这一部分代码较长,我们按照步骤整理一下:

1、获取当前线程的线程 id;

2、tryAquire 尝试获取锁,并返回 ttl

3、如果 ttl 为空,则结束流程;否则进入后续逻辑;

4、this.subscribe(threadId)订阅当前线程,返回一个 RFuture;

5、下一步涉及是否可中断标记的判断,如果可中断,调用

this.commandExecutor.syncSubscriptionInterrupted(future);
复制代码

否则,调用:

this.commandExecutor.syncSubscription(future);
复制代码

6、通过 while(true)循环,一直尝试获取锁:ttl = this.tryAcquire(leaseTime, unit, threadId);

中止条件: 1)ttl == null;2)如果 ttl>=0,((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);

这个过程中,会判断 iterruptibly,为 true 时会处理中断

7、fially 代码块,会解除订阅

this.unsubscribe(future, threadId);
复制代码

三 详细分析


redisson watchdog 使用和原理这篇文章整理了一张加锁流程图,我们引用如下:

下面详细分析 Redisson 获取锁、锁等待、释放锁的详细实现过程。

3.1 获取锁

3.1.1 核心获取锁的方法-tryLockInnerAsync


 <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {    this.internalLockLeaseTime = unit.toMillis(leaseTime);    return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});}
复制代码

第二行的代码很长,我们对文本做一些换行处理:

return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then 	redis.call('hset', KEYS[1], ARGV[2], 1); 	redis.call('pexpire', KEYS[1], ARGV[1]); 	return nil; end; 
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);",
Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
复制代码

这样就比较容易看出,重点是一系列的 redis 命令。分析如下:

RedissonLock 类 tryLockInnerAsync 通过 eval 命令执行 Lua 代码完成加锁操作。KEYS[1]为锁在 redis 中的 key,key 对应 value 为 map 结构,ARGV[1]为锁超时时间,ARGV[2]为锁 value 中的 key。ARGV[2]由 UUID+threadId 组成,用来标记锁被谁持有。

1)第一个 If 判断 key 是否存在,不存在则完成加锁操作

redis.call('hset', KEYS[1], ARGV[2], 1);创建 key[1] map 中添加 key:ARGV[2] ,value:1;

redis.call('pexpire', KEYS[1], ARGV[1]);设置 key[1]过期时间,避免发生死锁。eval 命令执行 Lua 代码的时候,Lua 代码将被当成一个命令去执行,并且直到 eval 命令执行完成,Redis 才会执行其他命令。可避免第一条命令执行成功第二条命令执行失败导致死锁。

2)第二个 if 判断 key 存在且当前线程已经持有锁, 重入:

redis.call('hexists', KEYS[1], ARGV[2]);判断 redis 中锁的标记值是否与当前请求的标记值相同,相同代表该线程已经获取锁;

redis.call('hincrby', KEYS[1], ARGV[2], 1);记录同一线程持有锁之后累计加锁次数,实现锁重入;

redis.call('pexpire', KEYS[1], ARGV[1]); 重置锁超时时间。

(3)key 存在被其他线程获取的锁, 等待:

redis.call('pttl', KEYS[1]); 加锁失败返回锁过期时间。


其中 pexpire 语句的重置锁超时时间,实际上就是 Redisson 的 watch dog 机制

3.1.2 commandExecutor.evalWriteAsync

继续向下,commandExecutor.evalWriteAsync:

public <T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {    NodeSource source = this.getNodeSource(key);    return this.evalAsync(source, false, codec, evalCommandType, script, keys, params);}
复制代码

3.1.3 syncSubscriptionInterrupted


public void syncSubscriptionInterrupted(RFuture<?> future) throws InterruptedException {    MasterSlaveServersConfig config = this.connectionManager.getConfig();    int timeout = config.getTimeout() + config.getRetryInterval() * config.getRetryAttempts();    if (!future.await((long)timeout)) {        ((RPromise)future).tryFailure(new RedisTimeoutException("Subscribe timeout: (" + timeout + "ms). Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."));    }
future.sync();}
复制代码

关于超时时间的计算,使用的是 config 中的 Timeout 时间+重试周期 x 重试次数;当 RFuture 等待超时时,就会使用 tryFailure 抛出 RedisTimeoutException 的异常信息,提示订阅失败。


3.2 锁等待 lockInterruptibly

public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {        this.lock(leaseTime, unit, true);    }
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { long threadId = Thread.currentThread().getId(); Long ttl = this.tryAcquire(leaseTime, unit, threadId); if (ttl != null) { RFuture<RedissonLockEntry> future = this.subscribe(threadId); if (interruptibly) { this.commandExecutor.syncSubscriptionInterrupted(future); } else { this.commandExecutor.syncSubscription(future); }
try { while(true) { ttl = this.tryAcquire(leaseTime, unit, threadId); if (ttl == null) { return; }
if (ttl >= 0L) { try { ((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } catch (InterruptedException var13) { if (interruptibly) { throw var13; }
((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else if (interruptibly) { ((RedissonLockEntry)future.getNow()).getLatch().acquire(); } else { ((RedissonLockEntry)future.getNow()).getLatch().acquireUninterruptibly(); } } } finally { this.unsubscribe(future, threadId); } } }
复制代码

这段代码也很长,简单总结如下:

(1)步骤一:调用加锁操作;

(2)步骤二:步骤一中加锁操作失败,订阅消息,利用 redis 的 pubsub 提供一个通知机制来减少不断的重试,避免发生活锁。

注:

活锁:是指线程 1 可以使用资源,但它很礼貌,让其他线程先使用资源,线程 2 也可以使用资源,但它很绅士,也让其他线程先使用资源。这样你让我,我让你,最后两个线程都无法使用资源。

(3)步骤三:

getLath()获取 RedissionLockEntry 实例 latch 变量,由于 permits 为 0,所以调用 acquire()方法后线程阻塞。


3.3 释放锁 - unlockInnerAsync

protected RFuture<Boolean> unlockInnerAsync(long threadId) {    return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", Arrays.asList(this.getName(), this.getChannelName()), new Object[]{LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId)});}
复制代码

与获取锁代码类似,还是一个比较长的 redis 命令,我们把 redis 命令格式化整理后如下:

if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; 
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;
复制代码


(1)第一个 if 判断锁对应 key 存在,value 中是否存在当前要释放锁的标示,不存在返回 nil,确保锁只能被持有的线程释放;

(2)对应 key 存在,value 中存在当前要释放锁的标示,将锁标示对应值-1,第二个 if 判断锁标示对应的值是否大于 0,大于 0,表示有锁重入情况发生,重新设置锁过期时间;

(3)对应 key 存在,value 中存在当前要释放锁的标示,将锁标示对应值-1 后等于 0,调用 del 操作释放锁,并 publish 消息,将获取锁被阻塞的线程恢复重新获取锁;

这里的代码中,涉及了一个重要的类:LockPubSub,下面是释放时执行的 release()方法:

订阅者接收到 publish 消息后,执行 release 操作,调用 acquire 被阻塞的线程将继续执行获取锁操作。

3.4 其他-CommandSyncService

在命令执行时,我们可以看到 Redisson 是通过 this.commandExecutor 执行的,而这个是在 Redisson 的构造方法中做的初始化:

protected Redisson(Config config) {    this.config = config;    Config configCopy = new Config(config);    this.connectionManager = ConfigSupport.createConnectionManager(configCopy);    this.evictionScheduler = new EvictionScheduler(this.connectionManager.getCommandExecutor());    this.writeBehindService = new WriteBehindService(this.connectionManager.getCommandExecutor());}
复制代码


四 Redisson 与 Jedis 分布式锁实现对比

在某业务中,使用的是基于 Jedis 封装得分布式锁操作工具,虽然并非是 Jedis 提供的标准实现,但从中可以了解一下分布式锁的不同实现:

4.1 获取锁

下面是某业务封装得 jedis 获取分布式锁和释放的工具:

public static boolean tryGetDistributedLock(JedisPool jedisPool, String lockKey, String requestId, int expireTime) throws RedisToolException {    Jedis jedis = null;
boolean var6; try { jedis = jedisPool.getResource(); String result = jedis.set(lockKey, requestId, "NX", "PX", expireTime); var6 = "OK".equals(result); } catch (Throwable var10) { throw new RedisToolException(var10.getMessage()); } finally { if (jedis != null) { jedis.close(); }
}
return var6;}
复制代码

继续向下,查看 jedis.set 方法:

public String set(String key, String value, String nxxx, String expx, int time) {    this.checkIsInMultiOrPipeline();    this.client.set(key, value, nxxx, expx, time);    return this.client.getStatusCodeReply();}
复制代码

可见,同样是为了保证设置锁 key 和 设置超时时间两个动作的原子性,Redisson 是使用 lua 脚本,而 Jedis 是通过 Redis 提供的 set 命令。早期必须 lua 脚本来实现,是因为 redis 旧版本没有提供这个新的 set 命令,不支持一个命令中同时设置 key 和超时时间。

4.2 释放锁

public static boolean releaseDistributedLock(JedisPool jedisPool, String lockKey, String requestId) {    String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";    Jedis jedis = null;
boolean var6; try { jedis = jedisPool.getResource(); Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId)); var6 = RELEASE_SUCCESS.equals(result); } finally { if (jedis != null) { jedis.close(); }
}
return var6;}
复制代码

Jedis 在释放锁的时候,还是通过脚本来实现的判断和删除 key,保证操作的原子性。

4.3 锁续期支持

4.3.1 Redisson 的 watch dog

Redisson 提供了订阅和 watch dog 机制,当业务线程还在执行但锁超时时,如果开启了 watch dog,那么可以实现自动续期。但事实上,这个机制比较耗费资源,所以一般不建议开启,除非业务确实有较强的这方面需求。

官方文档对 watch dog 的描述:

lockWatchdogTimeout(监控锁的看门狗超时,单位:毫秒)默认值:30000监控锁的看门狗超时时间单位为毫秒。该参数只适用于分布式锁的加锁请求中未明确使用leaseTimeout参数的情况。如果该看门狗未使用lockWatchdogTimeout去重新调整一个分布式锁的lockWatchdogTimeout超时,那么这个锁将变为失效状态。这个参数可以用来避免由Redisson客户端节点宕机或其他原因造成死锁的情况。
复制代码

4.3.2 基于 Jedis 实现

除了 tryGetDistributedLock 之外,还提供了一种过期时间怕短的锁方法,当锁即将超时时,会抛出超时异常,这样业务在捕获异常后,可以选择继续获取锁、或回滚事务并释放锁等动作,把主动权交给业务方。示例代码如下:

public static void lock(JedisPool jedisPool, String lockKey, String requestId, int expireTime, int timeout) throws Exception {    if (timeout > expireTime) {        throw new Exception("timeout 必须大于 expireTime");    } else {        Random random = new Random();
while(timeout > 0) { boolean lock = tryGetDistributedLock(jedisPool, lockKey, requestId, expireTime); if (lock) { return; }
int applyTime = random.nextInt(100); timeout -= applyTime; Thread.sleep((long)applyTime); }
throw new LockTimeOutException("distributedLock timeout"); }}
复制代码

五 总结

本文基于 Redisson3.12.2 版本源码,对 Redisson 分布式锁过程进行了分析。从获取锁、释放锁的过程,可以大概了解 Redisson 的主要设计思想。此外,还对基于 Jedis 实现的一个分布式锁示例与 Redisson 进行对比,来看基于 Redis 的分布式锁的两种不同实现方式。


发布于: 2021 年 03 月 31 日阅读数: 16
用户头像

磨炼中成长,痛苦中前行 2017.10.22 加入

微信公众号【程序员架构进阶】。多年项目实践,架构设计经验。曲折中向前,分享经验和教训

评论

发布
暂无评论
源码分析:Redisson分布式锁过程分析