写点什么

老夫带你深度剖析 Redisson 实现分布式锁的原理

  • 2021 年 11 月 12 日
  • 本文字数:6738 字

    阅读完需:约 22 分钟

});


return ttlRemainingFuture;


}

tryLockInnerAsync

通过 lua 脚本来实现加锁的操作


  1. 判断 lock 键是否存在,不存在直接调用 hset 存储当前线程信息并且设置过期时间,返回 nil,告诉客户端直接获取到锁。

  2. 判断 lock 键是否存在,存在则将重入次数加 1,并重新设置过期时间,返回 nil,告诉客户端直接获取到锁。

  3. 被其它线程已经锁定,返回锁有效期的剩余时间,告诉客户端需要等待。


<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {


return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,


"if (redis.call('exists', KEYS[1]) == 0) then " +


"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +


"redis.call('pexpire', KEYS[1], ARGV[1]); " +


"return nil; " +


"end; " +


"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +


"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +


"redis.call('pexpire', KEYS[1], ARGV[1]); " +


"return nil; " +


"end; " +


"return redis.call('pttl', KEYS[1]);",


Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));


}


关于 Lua 脚本,我们稍后再解释。

unlock 释放锁流程

释放锁的流程,脚本看起来会稍微复杂一点


  1. 如果 lock 键不存在,通过publish指令发送一个消息表示锁已经可用。

  2. 如果锁不是被当前线程锁定,则返回 nil

  3. 由于支持可重入,在解锁时将重入次数需要减 1

  4. 如果计算后的重入次数>0,则重新设置过期时间

  5. 如果计算后的重入次数<=0,则发消息说锁已经可用


protected RFuture<Boolean> unlockInnerAsync(long threadId) {


return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,


"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +


"return nil;" +


"end; " +


"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +


"if (counter > 0) then " +


"redis.call('pexpire', KEYS[1], ARGV[2]); " +


"return 0; " +


"else " +


"redis.call('del', KEYS[1]); " +


"redis.call('publish', KEYS[2], ARGV[1]); " +


"return 1; " +


"end; " +


"return nil;",


Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));


}


RedissonLock 有竞争的情况




有竞争的情况在 redis 端的 lua 脚本是相同的,只是不同的条件执行不同的 redis 命令。当通过 tryAcquire 发现锁被其它线程申请时,需要进入等待竞争逻辑中


  1. this.await 返回 false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败

  2. this.await 返回 true,进入循环尝试获取锁。


继续看 RedissonLock.tryLock 后半部分代码如下:


public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {


//省略部分代码


time -= System.currentTimeMillis() - current;


if (time <= 0) {


acquireFailed(waitTime, unit, threadId);


return false;


}


current = System.currentTimeMillis();


// 订阅监听 redis 消息,并且创建 RedissonLockEntry


RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);


// 阻塞等待 subscribe 的 future 的结果对象,如果 subscribe 方法调用超过了 time,说明已经超过了客户端设置的最大 wait time,则直接返回 false,取消订阅,不再继续申请锁了。


if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {


if (!subscribeFuture.cancel(false)) { //取消订阅


subscribeFuture.onComplete((res, e) -> {


if (e == null) {


unsubscribe(subscribeFuture, threadId);


}


});


}


acquireFailed(waitTime, unit, threadId); //表示抢占锁失败


return false; //返回 false


}


try {


//判断是否超时,如果等待超时,返回获的锁失败


time -= System.currentTimeMillis() - current;


if (time <= 0) {


acquireFailed(waitTime, unit, threadId);


return false;


}


//通过 while 循环再次尝试竞争锁


while (true) {


long currentTime = System.currentTimeMillis();


ttl = tryAcquire(waitTime, leaseTime, unit, threadId); //竞争锁,返回锁超时时间


// lock acquired


if (ttl == null) { //如果超时时间为 null,说明获得锁成功


return true;


}


//判断是否超时,如果超时,表示获取锁失败


time -= System.currentTimeMillis() - currentTime;


if (time <= 0) {


acquireFailed(waitTime, unit, threadId);


return false;


}


// 通过信号量(共享锁)阻塞,等待解锁消息. (减少申请锁调用的频率)


// 如果剩余时间(ttl)小于 wait time ,就在 ttl 时间内,从 Entry 的信号量获取一个许可(除非被中断或者一直没有可用的许可)。


// 否则就在 wait time 时间范围内等待可以通过信号量


currentTime = System.currentTimeMillis();


if (ttl >= 0 && ttl < time) {


subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);


} else {


subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);


}


// 更新等待时间(最大等待时间-已经消耗的阻塞时间)


time -= System.currentTimeMillis() - currentTime;


if (time <= 0) { //获取锁失败


acquireFailed(waitTime, unit, threadId);


return false;


}


}


} finally {


unsubscribe(subscribeFuture, threadId); //取消订阅


}


// return get(tryLockAsync(waitTime, leaseTime, unit));


}


锁过期了怎么办?




一般来说,我们去获得分布式锁时,为了避免死锁的情况,我们会对锁设置一个超时时间,但是有一种情况是,如果在指定时间内当前线程没有执行完,由于锁超时导致锁被释放,那么其他线程就会拿到这把锁,从而导致一些故障。


为了避免这种情况,Redisson 引入了一个 Watch Dog 机制,这个机制是针对分布式锁来实现锁的自动续约,简单来说,如果当前获得锁的线程没有执行完,那么 Redisson 会自动给 Redis 中目标 key 延长超时时间。


默认情况下,看门狗的续期时间是 30s,也可以通过修改 Config.lockWatchdogTimeout 来另行指定。


@Override


public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {


return tryLock(waitTime, -1, unit); //leaseTime=-1


}


实际上,当我们通过 tryLock 方法没有传递超时时间时,默认会设置一个 30s 的超时时间,避免出现死锁的问题。


private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {


RFuture<Long> ttlRemainingFuture;


if (leaseTime != -1) {


ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);


} else { //当 leaseTime 为-1 时,leaseTime=internalLockLeaseTime,默认是 30s,表示当前锁的过期时间。


//this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();


ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,


TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);


}


ttlRemainingFuture.onComplete((ttlRemaining, e) -> {


if (e != null) { //说明出现异常,直接返回


return;


}


// lock acquired


if (ttlRemaining == null) { //表示第一次设置锁键


if (leaseTime != -1) { //表示设置过超时时间,更新 internalLockLeaseTime,并返回


internalLockLeaseTime = unit.toMillis(leaseTime);


} else { //leaseTime=-1,启动 Watch Dog


scheduleExpirationRenewal(threadId);


}


}


});


return ttlRemainingFuture;


}


由于默认设置了一个 30s 的过期时间,为了防止过期之后当前线程还未执行完,所以通过定时任务对过期时间进行续约。


  • 首先,会先判断在 expirationRenewalMap 中是否存在了 entryName,这是个 map 结构,主要还是判断在这个服务实例中的加锁客户端的锁 key 是否存在,

  • 如果已经存在了,就直接返回;主要是考虑到 RedissonLock 是可重入锁。


protected void scheduleExpirationRenewal(long threadId) {


ExpirationEntry entry = new ExpirationEntry();


ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);


if (oldEntry != null) {


oldEntry.addThreadId(threadId);


} else {// 第一次加锁的时候会调用,内部会启动 WatchDog


entry.addThreadId(threadId);


renewExpiration();


}


}


定义一个定时任务,该任务中调用renewExpirationAsync方法进行续约。


private void renewExpiration() {


ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());


if (ee == null) {


return;


}


//用到了时间轮机制


Timeout task = commandExecutor.getConnectionM


【一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


anager().newTimeout(new TimerTask() {


@Override


public void run(Timeout timeout) throws Exception {


ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());


if (ent == null) {


return;


}


Long threadId = ent.getFirstThreadId();


if (threadId == null) {


return;


}


// renewExpirationAsync 续约租期


RFuture<Boolean> future = renewExpirationAsync(threadId);


future.onComplete((res, e) -> {


if (e != null) {


log.error("Can't update lock " + getRawName() + " expiration", e);


EXPIRATION_RENEWAL_MAP.remove(getEntryName());


return;


}


if (res) {


// reschedule itself


renewExpiration();


}


});


}


}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);//每次间隔租期的 1/3 时间执行


ee.setTimeout(task);


}


执行 Lua 脚本,对指定的 key 进行续约。


protected RFuture<Boolean> renewExpirationAsync(long threadId) {


return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,


"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +


"redis.call('pexpire', KEYS[1], ARGV[1]); " +


"return 1; " +


"end; " +


"return 0;",


Collections.singletonList(getRawName()),


internalLockLeaseTime, getLockName(threadId));


}


Lua 脚本


=====


Lua 是一个高效的轻量级脚本语言(和 JavaScript 类似),用标准 C 语言编写并以源代码形式开放, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。Lua 在葡萄牙语中是“月亮”的意思,它的 logo 形式卫星,寓意是 Lua 是一个“卫星语言”,能够方便地嵌入到其他语言中使用;其实在很多常见的框架中,都有嵌入 Lua 脚本的功能,比如 OpenResty、Redis 等。


使用 Lua 脚本的好处:


  1. 减少网络开销,在 Lua 脚本中可以把多个命令放在同一个脚本中运行

  2. 原子操作,redis 会将整个脚本作为一个整体执行,中间不会被其他命令插入。换句话说,编写脚本的过程中无需担心会出现竞态条件

  3. 复用性,客户端发送的脚本会永远存储在 redis 中,这意味着其他客户端可以复用这一脚本来完成同样的逻辑


Lua 的下载和安装




Lua 是一个独立的脚本语言,所以它有专门的编译执行工具,下面简单带大家安装一下。



https://www.lua.org/ftp/lua-5.4.3.tar.gz


  • 安装步骤如下


tar -zxvf lua-5.4.3.tar.gz


cd lua-5.4.3


make linux


make install


如果报错,说找不到 readline/readline.h, 可以通过 yum 命令安装


yum -y install readline-devel ncurses-devel


最后,直接输入lua命令即可进入 lua 的控制台。Lua 脚本有自己的语法、变量、逻辑运算符、函数等,这块我就不在这里做过多的说明,用过 JavaScript 的同学,应该只需要花几个小时就可以全部学完,简单演示两个案例如下。


array = {"Lua", "mic"}


for i= 0, 2 do


print(array[i])


end


array = {"mic", "redis"}


for key,value in ipairs(array)


do


print(key, value)


end


Redis 与 Lua




Redis 中集成了 Lua 的编译和执行器,所以我们可以在 Redis 中定义 Lua 脚本去执行。同时,在 Lua 脚本中,可以直接调用 Redis 的命令,来操作 Redis 中的数据。


redis.call(‘set’,'hello','world')


local value=redis.call(‘get’,’hello’)


redis.call 函数的返回值就是 redis 命令的执行结果,前面我们介绍过 redis 的 5 中类型的数据返回的值的类型也都不一样,redis.call 函数会将这 5 种类型的返回值转化对应的 Lua 的数据类型


在很多情况下我们都需要脚本可以有返回值,毕竟这个脚本也是一个我们所编写的命令集,我们可以像调用其他 redis 内置命令一样调用我们自己写的脚本,所以同样 redis 会自动将脚本返回值的 Lua 数据类型转化为 Redis 的返回值类型。 在脚本中可以使用 return 语句将值返回给 redis 客户端,通过 return 语句来执行,如果没有执行 return,默认返回为 nil。


Redis 中执行 Lua 脚本相关的命令




编写完脚本后最重要的就是在程序中执行脚本。Redis 提供了 EVAL 命令可以使开发者像调用其他 Redis 内置命令一样调用脚本。

EVAL 命令-执行脚本

[EVAL] [脚本内容] [key 参数的数量] [key …] [arg …]


可以通过 key 和 arg 这两个参数向脚本中传递数据,他们的值可以在脚本中分别使用 KEYS ARGV?这两个类型的全局变量访问。


比如我们通过脚本实现一个 set 命令,通过在 redis 客户端中调用,那么执行的语句是:


eval "return redis.call('set',KEYS[1],ARGV[1])" 1 lua hello


上述脚本相当于使用 Lua 脚本调用了 Redis 的set命令,存储了一个 key=lua,value=hello 到 Redis 中。

EVALSHA 命令

考虑到我们通过 eval 执行 lua 脚本,脚本比较长的情况下,每次调用脚本都需要把整个脚本传给 redis,比较占用带宽。为了解决这个问题,redis 提供了 EVALSHA 命令允许开发者通过脚本内容的 SHA1 摘要来执行脚本。该命令的用法和 EVAL 一样,只不过是将脚本内容替换成脚本内容的 SHA1 摘要


  1. Redis 在执行 EVAL 命令时会计算脚本的 SHA1 摘要并记录在脚本缓存中

  2. 执行 EVALSHA 命令时 Redis 会根据提供的摘要从脚本缓存中查找对应的脚本内容,如果找到了就执行脚本,否则返回“NOSCRIPT No matching script,Please use EVAL”

将脚本加入缓存并生成 sha1 命令

script load "return redis.call('get','lua')"

["13bd040587b891aedc00a72458cbf8588a27df90"]

传递 sha1 的值来执行该命令

evalsha "13bd040587b891aedc00a72458cbf8588a27df90" 0

Redisson 执行 Lua 脚本

通过 lua 脚本来实现一个访问频率限制功能。


思路,定义一个 key,key 中包含 ip 地址。 value 为指定时间内的访问次数,比如说是 10 秒内只能访问 3 次。


  • 定义 Lua 脚本。


local times=redis.call('incr',KEYS[1])


-- 如果是第一次进来,设置一个过期时间


if times == 1 then


redis.call('expire',KEYS[1],ARGV[1])


end


-- 如果在指定时间内访问次数大于指定次数,则返回 0,表示访问被限制


if times > tonumber(ARGV[2]) then


return 0


end


-- 返回 1,允许被访问


return 1


  • 定义 controller,提供访问测试方法


@RestController


public class RedissonController {


@Autowired


RedissonClient redissonClient;


private final String LIMIT_LUA=


"local times=redis.call('incr',KEYS[1])\n" +


"if times == 1 then\n" +


" redis.call('expire',KEYS[1],ARGV[1])\n" +


"end\n" +


"if times > tonumber(ARGV[2]) then\n" +


" return 0\n" +


"end\n" +


"return 1";


@GetMapping("/lua/{id}")


public String lua(@PathVariable("id") Integer id) throws ExecutionException, InterruptedException {


List<Object> keys= Arrays.asList("LIMIT:"+id);


RFuture<Object> future=redissonClient.getScript().


evalAsync(RScript.Mode.READ_WRITE,LIMIT_LUA, RScript.ReturnType.INTEGER,keys,10,3);


return future.get().toString();


}


}


需要注意,上述脚本执行的时候会有问题,因为 redis 默认的序列化方式导致 value 的值在传递到脚本中时,转成了对象类型,需要修改redisson.yml文件,增加 codec 的序列化方式。


  • application.yml


spring:


redis:


redisson:


file: classpath:redisson.yml


  • redisson.yml


singleServerConfig:


address: redis://192.168.221.128:6379


codec: !<org.redisson.codec.JsonJacksonCodec> {}


Lua 脚本的原子性




redis 的脚本执行是原子的,即脚本执行期间 Redis 不会执行其他命令。所有的命令必须等待脚本执行完以后才能执行。为了防止某个脚本执行时间过程导致 Redis 无法提供服务。Redis 提供了 lua-time-limit 参数限制脚本的最长运行时间。默认是 5 秒钟。

非事务性操作

当脚本运行时间超过这个限制后,Redis 将开始接受其他命令但不会执行(以确保脚本的原子性),而是返回 BUSY 的错误,下面演示一下这种情况。


打开两个客户端窗口,在第一个窗口中执行 lua 脚本的死循环


eval "while true do end" 0


在第二个窗口中运行get lua,会得到如下的异常。


(error) BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.

评论

发布
暂无评论
老夫带你深度剖析Redisson实现分布式锁的原理