写点什么

阿里 P6 面试官:Redis 如何实现分布式锁?锁过期了怎么办?

发布于: 7 小时前

Redis 实现分布式锁的原理

前面讲了 Redis 在实际业务场景中的应用,那么下面再来了解一下 Redisson 功能性场景的应用,也就是大家经常使用的分布式锁的实现场景。


关于分布式锁的概念,本文就不做描述。


•引入 redisson 依赖


<dependency>      <groupId>org.redisson</groupId>      <artifactId>redisson</artifactId>      <version>3.16.0</version>  </dependency>
复制代码


•编写简单的测试代码


public class RedissonTest {      private static RedissonClient redissonClient;      static {          Config config=new Config();          config.useSingleServer().setAddress("redis://192.168.221.128:6379");          redissonClient=Redisson.create(config);      }      public static void main(String[] args) throws InterruptedException {          RLock rLock=redissonClient.getLock("updateOrder");          //最多等待100秒、上锁10s以后自动解锁          if(rLock.tryLock(100,10,TimeUnit.SECONDS)){              System.out.println("获取锁成功");          }          Thread.sleep(2000);          rLock.unlock();          redissonClient.shutdown();      }  }
复制代码


Redisson 分布式锁的实现原理

你们会发现,通过 redisson,非常简单就可以实现我们所需要的功能,当然这只是 redisson 的冰山一角,redisson 最强大的地方就是提供了分布式特性的常用工具类。使得原本作为协调单机多线程并发程序的并发程序的工具包获得了协调分布式多级多线程并发系统的能力,降低了程序员在分布式环境下解决分布式问题的难度,下面分析一下 RedissonLock 的实现原理


RedissonLock.tryLock

@Overridepublic boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {    long time = unit.toMillis(waitTime);    long current = System.currentTimeMillis();    long threadId = Thread.currentThread().getId();    //通过tryAcquire方法尝试获取锁    Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);    // lock acquired    if (ttl == null) { //表示成功获取到锁,直接返回        return true;    }    //省略部分代码....}
复制代码


tryAcquire

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {    RFuture<Long> ttlRemainingFuture;    //leaseTime就是租约时间,就是redis key的过期时间。    if (leaseTime != -1) { //如果设置过期时间        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);    } else {//如果没设置了过期时间,则从配置中获取key超时时间,默认是30s过期        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,                                               TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);    }    //当tryLockInnerAsync执行结束后,触发下面回调    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {        if (e != null) { //说明出现异常,直接返回            return;        }        // lock acquired        if (ttlRemaining == null) { //表示第一次设置锁键            if (leaseTime != -1) { //表示设置过超时时间,更新internalLockLeaseTime,并返回                internalLockLeaseTime = unit.toMillis(leaseTime);            } else { //leaseTime=-1,启动Watch Dog                scheduleExpirationRenewal(threadId);            }        }    });    return ttlRemainingFuture;}
复制代码


tryLockInnerAsync

通过 lua 脚本来实现加锁的操作


1.判断 lock 键是否存在,不存在直接调用 hset 存储当前线程信息并且设置过期时间,返回 nil,告诉客户端直接获取到锁。2.判断 lock 键是否存在,存在则将重入次数加 1,并重新设置过期时间,返回 nil,告诉客户端直接获取到锁。3.被其它线程已经锁定,返回锁有效期的剩余时间,告诉客户端需要等待。


<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,                          "if (redis.call('exists', KEYS[1]) == 0) then " +                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +                          "return nil; " +                          "end; " +                          "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +                          "return nil; " +                          "end; " +                          "return redis.call('pttl', KEYS[1]);",                          Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));}
复制代码


关于 Lua 脚本,我们稍后再解释。


unlock 释放锁流程

释放锁的流程,脚本看起来会稍微复杂一点


1.如果 lock 键不存在,通过publish指令发送一个消息表示锁已经可用。2.如果锁不是被当前线程锁定,则返回 nil3.由于支持可重入,在解锁时将重入次数需要减 14.如果计算后的重入次数>0,则重新设置过期时间 5.如果计算后的重入次数<=0,则发消息说锁已经可用


protected RFuture<Boolean> unlockInnerAsync(long threadId) {    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,                          "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +                          "return nil;" +                          "end; " +                          "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +                          "if (counter > 0) then " +                          "redis.call('pexpire', KEYS[1], ARGV[2]); " +                          "return 0; " +                          "else " +                          "redis.call('del', KEYS[1]); " +                          "redis.call('publish', KEYS[2], ARGV[1]); " +                          "return 1; " +                          "end; " +                          "return nil;",                          Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));}
复制代码


RedissonLock 有竞争的情况

有竞争的情况在 redis 端的 lua 脚本是相同的,只是不同的条件执行不同的 redis 命令。当通过 tryAcquire 发现锁被其它线程申请时,需要进入等待竞争逻辑中


1.this.await 返回 false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败 2.this.await 返回 true,进入循环尝试获取锁。


继续看 RedissonLock.tryLock 后半部分代码如下:


public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {//省略部分代码        time -= System.currentTimeMillis() - current;        if (time <= 0) {            acquireFailed(waitTime, unit, threadId);            return false;        }        current = System.currentTimeMillis();       // 订阅监听redis消息,并且创建RedissonLockEntry        RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);      // 阻塞等待subscribe的future的结果对象,如果subscribe方法调用超过了time,说明已经超过了客户端设置的最大wait time,则直接返回false,取消订阅,不再继续申请锁了。        if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {            if (!subscribeFuture.cancel(false)) { //取消订阅                subscribeFuture.onComplete((res, e) -> {                    if (e == null) {                        unsubscribe(subscribeFuture, threadId);                    }                });            }            acquireFailed(waitTime, unit, threadId); //表示抢占锁失败            return false; //返回false        }        try {            //判断是否超时,如果等待超时,返回获的锁失败            time -= System.currentTimeMillis() - current;            if (time <= 0) {                acquireFailed(waitTime, unit, threadId);                return false;            }            //通过while循环再次尝试竞争锁            while (true) {                 long currentTime = System.currentTimeMillis();                ttl = tryAcquire(waitTime, leaseTime, unit, threadId); //竞争锁,返回锁超时时间                // lock acquired                if (ttl == null) { //如果超时时间为null,说明获得锁成功                    return true;                }                //判断是否超时,如果超时,表示获取锁失败                time -= System.currentTimeMillis() - currentTime;                if (time <= 0) {                    acquireFailed(waitTime, unit, threadId);                    return false;                }                // 通过信号量(共享锁)阻塞,等待解锁消息.  (减少申请锁调用的频率)                // 如果剩余时间(ttl)小于wait time ,就在 ttl 时间内,从Entry的信号量获取一个许可(除非被中断或者一直没有可用的许可)。                // 否则就在wait time 时间范围内等待可以通过信号量                currentTime = System.currentTimeMillis();                if (ttl >= 0 && ttl < time) {                    subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);                } else {                    subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);                }                // 更新等待时间(最大等待时间-已经消耗的阻塞时间)                time -= System.currentTimeMillis() - currentTime;                if (time <= 0) { //获取锁失败                    acquireFailed(waitTime, unit, threadId);                    return false;                }            }        } finally {            unsubscribe(subscribeFuture, threadId); //取消订阅        }//        return get(tryLockAsync(waitTime, leaseTime, unit));    }
复制代码


锁过期了怎么办?

一般来说,我们去获得分布式锁时,为了避免死锁的情况,我们会对锁设置一个超时时间,但是有一种情况是,如果在指定时间内当前线程没有执行完,由于锁超时导致锁被释放,那么其他线程就会拿到这把锁,从而导致一些故障。


为了避免这种情况,Redisson 引入了一个 Watch Dog 机制,这个机制是针对分布式锁来实现锁的自动续约,简单来说,如果当前获得锁的线程没有执行完,那么 Redisson 会自动给 Redis 中目标 key 延长超时时间。


默认情况下,看门狗的续期时间是 30s,也可以通过修改 Config.lockWatchdogTimeout 来另行指定。


@Overridepublic boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {    return tryLock(waitTime, -1, unit);  //leaseTime=-1}
复制代码


实际上,当我们通过 tryLock 方法没有传递超时时间时,默认会设置一个 30s 的超时时间,避免出现死锁的问题。


private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {    RFuture<Long> ttlRemainingFuture;    if (leaseTime != -1) {         ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);    } else { //当leaseTime为-1时,leaseTime=internalLockLeaseTime,默认是30s,表示当前锁的过期时间。        //this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,                                               TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);    }    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {        if (e != null) { //说明出现异常,直接返回            return;        }        // lock acquired        if (ttlRemaining == null) { //表示第一次设置锁键            if (leaseTime != -1) { //表示设置过超时时间,更新internalLockLeaseTime,并返回                internalLockLeaseTime = unit.toMillis(leaseTime);            } else { //leaseTime=-1,启动Watch Dog                scheduleExpirationRenewal(threadId);            }        }    });    return ttlRemainingFuture;}
复制代码


由于默认设置了一个 30s 的过期时间,为了防止过期之后当前线程还未执行完,所以通过定时任务对过期时间进行续约。


•首先,会先判断在 expirationRenewalMap 中是否存在了 entryName,这是个 map 结构,主要还是判断在这个服务实例中的加锁客户端的锁 key 是否存在,•如果已经存在了,就直接返回;主要是考虑到 RedissonLock 是可重入锁。


protected void scheduleExpirationRenewal(long threadId) {    ExpirationEntry entry = new ExpirationEntry();    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);    if (oldEntry != null) {        oldEntry.addThreadId(threadId);    } else {// 第一次加锁的时候会调用,内部会启动WatchDog        entry.addThreadId(threadId);        renewExpiration();    }}
复制代码


定义一个定时任务,该任务中调用renewExpirationAsync方法进行续约。


private void renewExpiration() {    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());    if (ee == null) {        return;    }    //用到了时间轮机制    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {        @Override        public void run(Timeout timeout) throws Exception {            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());            if (ent == null) {                return;            }            Long threadId = ent.getFirstThreadId();            if (threadId == null) {                return;            }            // renewExpirationAsync续约租期            RFuture<Boolean> future = renewExpirationAsync(threadId);            future.onComplete((res, e) -> {                if (e != null) {                    log.error("Can't update lock " + getRawName() + " expiration", e);                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());                    return;                }                if (res) {                    // reschedule itself                    renewExpiration();                }            });        }    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);//每次间隔租期的1/3时间执行    ee.setTimeout(task);}
复制代码


执行 Lua 脚本,对指定的 key 进行续约。


protected RFuture<Boolean> renewExpirationAsync(long threadId) {    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,                          "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +                          "return 1; " +                          "end; " +                          "return 0;",                          Collections.singletonList(getRawName()),                          internalLockLeaseTime, getLockName(threadId));}
复制代码


Lua 脚本

Lua 是一个高效的轻量级脚本语言(和 JavaScript 类似),用标准 C 语言编写并以源代码形式开放, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。Lua 在葡萄牙语中是“月亮”的意思,它的 logo 形式卫星,寓意是 Lua 是一个“卫星语言”,能够方便地嵌入到其他语言中使用;其实在很多常见的框架中,都有嵌入 Lua 脚本的功能,比如 OpenResty、Redis 等。


使用 Lua 脚本的好处:


1.减少网络开销,在 Lua 脚本中可以把多个命令放在同一个脚本中运行 2.原子操作,redis 会将整个脚本作为一个整体执行,中间不会被其他命令插入。换句话说,编写脚本的过程中无需担心会出现竞态条件 3.复用性,客户端发送的脚本会永远存储在 redis 中,这意味着其他客户端可以复用这一脚本来完成同样的逻辑


Lua 的下载和安装

Lua 是一个独立的脚本语言,所以它有专门的编译执行工具,下面简单带大家安装一下。


•下载 Lua 源码包: https://www.lua.org/download.htmlhttps://www.lua.org/ftp/lua-5.4.3.tar.gz•安装步骤如下


tar -zxvf lua-5.4.3.tar.gz  cd lua-5.4.3  make linux  make install
复制代码


如果报错,说找不到 readline/readline.h, 可以通过 yum 命令安装


yum -y install readline-devel ncurses-devel
复制代码


最后,直接输入lua命令即可进入 lua 的控制台。Lua 脚本有自己的语法、变量、逻辑运算符、函数等,这块我就不在这里做过多的说明,用过 JavaScript 的同学,应该只需要花几个小时就可以全部学完,简单演示两个案例如下。


array = {"Lua", "mic"}for i= 0, 2 do   print(array[i])endarray = {"mic", "redis"}for key,value in ipairs(array)do   print(key, value)end
复制代码


Redis 与 Lua

Redis 中集成了 Lua 的编译和执行器,所以我们可以在 Redis 中定义 Lua 脚本去执行。同时,在 Lua 脚本中,可以直接调用 Redis 的命令,来操作 Redis 中的数据。


redis.call(‘set’,'hello','world')local value=redis.call(‘get’,’hello’)
复制代码


redis.call 函数的返回值就是 redis 命令的执行结果,前面我们介绍过 redis 的 5 中类型的数据返回的值的类型也都不一样,redis.call 函数会将这 5 种类型的返回值转化对应的 Lua 的数据类型


在很多情况下我们都需要脚本可以有返回值,毕竟这个脚本也是一个我们所编写的命令集,我们可以像调用其他 redis 内置命令一样调用我们自己写的脚本,所以同样 redis 会自动将脚本返回值的 Lua 数据类型转化为 Redis 的返回值类型。 在脚本中可以使用 return 语句将值返回给 redis 客户端,通过 return 语句来执行,如果没有执行 return,默认返回为 nil。


Redis 中执行 Lua 脚本相关的命令

编写完脚本后最重要的就是在程序中执行脚本。Redis 提供了 EVAL 命令可以使开发者像调用其他 Redis 内置命令一样调用脚本。


EVAL 命令-执行脚本

[EVAL] [脚本内容] [key 参数的数量] [key …] [arg …]


可以通过 key 和 arg 这两个参数向脚本中传递数据,他们的值可以在脚本中分别使用 KEYS ARGV 这两个类型的全局变量访问。


比如我们通过脚本实现一个 set 命令,通过在 redis 客户端中调用,那么执行的语句是:


eval "return redis.call('set',KEYS[1],ARGV[1])" 1 lua hello
复制代码


上述脚本相当于使用 Lua 脚本调用了 Redis 的set命令,存储了一个 key=lua,value=hello 到 Redis 中。


EVALSHA 命令

考虑到我们通过 eval 执行 lua 脚本,脚本比较长的情况下,每次调用脚本都需要把整个脚本传给 redis,比较占用带宽。为了解决这个问题,redis 提供了 EVALSHA 命令允许开发者通过脚本内容的 SHA1 摘要来执行脚本。该命令的用法和 EVAL 一样,只不过是将脚本内容替换成脚本内容的 SHA1 摘要


1.Redis 在执行 EVAL 命令时会计算脚本的 SHA1 摘要并记录在脚本缓存中 2.执行 EVALSHA 命令时 Redis 会根据提供的摘要从脚本缓存中查找对应的脚本内容,如果找到了就执行脚本,否则返回“NOSCRIPT No matching script,Please use EVAL”


# 将脚本加入缓存并生成sha1命令script load "return redis.call('get','lua')"# ["13bd040587b891aedc00a72458cbf8588a27df90"]# 传递sha1的值来执行该命令evalsha "13bd040587b891aedc00a72458cbf8588a27df90" 0
复制代码


Redisson 执行 Lua 脚本

通过 lua 脚本来实现一个访问频率限制功能。


思路,定义一个 key,key 中包含 ip 地址。 value 为指定时间内的访问次数,比如说是 10 秒内只能访问 3 次。


•定义 Lua 脚本。


local times=redis.call('incr',KEYS[1])  -- 如果是第一次进来,设置一个过期时间  if times == 1 then     redis.call('expire',KEYS[1],ARGV[1])  end  -- 如果在指定时间内访问次数大于指定次数,则返回0,表示访问被限制  if times > tonumber(ARGV[2]) then     return 0  end  -- 返回1,允许被访问  return 1
复制代码


•定义 controller,提供访问测试方法


@RestController  public class RedissonController {      @Autowired      RedissonClient redissonClient;      private final String LIMIT_LUA=          "local times=redis.call('incr',KEYS[1])\n" +          "if times == 1 then\n" +          "   redis.call('expire',KEYS[1],ARGV[1])\n" +          "end\n" +          "if times > tonumber(ARGV[2]) then\n" +          "   return 0\n" +          "end\n" +          "return 1";      @GetMapping("/lua/{id}")      public String lua(@PathVariable("id") Integer id) throws ExecutionException, InterruptedException {          List<Object> keys= Arrays.asList("LIMIT:"+id);          RFuture<Object> future=redissonClient.getScript().              evalAsync(RScript.Mode.READ_WRITE,LIMIT_LUA, RScript.ReturnType.INTEGER,keys,10,3);          return future.get().toString();      }  }
复制代码


需要注意,上述脚本执行的时候会有问题,因为 redis 默认的序列化方式导致 value 的值在传递到脚本中时,转成了对象类型,需要修改redisson.yml文件,增加 codec 的序列化方式。


application.yml


spring:    redis:      redisson:        file: classpath:redisson.yml
复制代码


redisson.yml


singleServerConfig:    address: redis://192.168.221.128:6379  codec: !<org.redisson.codec.JsonJacksonCodec> {}
复制代码


Lua 脚本的原子性

redis 的脚本执行是原子的,即脚本执行期间 Redis 不会执行其他命令。所有的命令必须等待脚本执行完以后才能执行。为了防止某个脚本执行时间过程导致 Redis 无法提供服务。Redis 提供了 lua-time-limit 参数限制脚本的最长运行时间。默认是 5 秒钟。

非事务性操作

当脚本运行时间超过这个限制后,Redis 将开始接受其他命令但不会执行(以确保脚本的原子性),而是返回 BUSY 的错误,下面演示一下这种情况。


打开两个客户端窗口,在第一个窗口中执行 lua 脚本的死循环


eval "while true do end" 0
复制代码


在第二个窗口中运行get lua,会得到如下的异常。


(error) BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.
复制代码


我们会发现执行结果是 Busy, 接着我们通过 script kill 的命令终止当前执行的脚本,第二个窗口的显示又恢复正常了。

存在事务性操作

如果当前执行的 Lua 脚本对 Redis 的数据进行了修改(SET、DEL 等),那么通过 SCRIPT KILL 命令是不能终止脚本运行的,因为要保证脚本运行的原子性,如果脚本执行了一部分终止,那就违背了脚本原子性的要求。最终要保证脚本要么都执行,要么都不执行


同样打开两个窗口,第一个窗口运行如下命令


eval "redis.call('set','name','mic') while true do end" 0
复制代码


在第二个窗口运行


get lua
复制代码


结果一样,仍然是 busy,但是这个时候通过 script kill 命令,会发现报错,没办法 kill。


(error) UNKILLABLE Sorry the script already executed write commands against the dataset. You can either wait the script termination or kill the server in a hard way using the SHUTDOWN NOSAVE command.
复制代码


遇到这种情况,只能通过 shutdown nosave 命令来强行终止 redis。


shutdown nosave 和 shutdown 的区别在于 shutdown nosave 不会进行持久化操作,意味着发生在上一次快照后的数据库修改都会丢失。

Redisson 的 Lua 脚本

了解了 lua 之后,我们再回过头来看看 Redisson 的 Lua 脚本,就不难理解了。


<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,                          "if (redis.call('exists', KEYS[1]) == 0) then " +                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +                          "return nil; " +                          "end; " +                          "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +                          "return nil; " +                          "end; " +                          "return redis.call('pttl', KEYS[1]);",                          Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));}
复制代码


Redis 中的 Pub/Sub 机制

下面是 Redisson 中释放锁的代码,在代码中我们发现一个 publish 的指令redis.call('publish', KEYS[2], ARGV[1]),这个指令是干啥的呢?


protected RFuture<Boolean> unlockInnerAsync(long threadId) {    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,                          "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +                          "return nil;" +                          "end; " +                          "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +                          "if (counter > 0) then " +                          "redis.call('pexpire', KEYS[1], ARGV[2]); " +                          "return 0; " +                          "else " +                          "redis.call('del', KEYS[1]); " +                          "redis.call('publish', KEYS[2], ARGV[1]); " +                          "return 1; " +                          "end; " +                          "return nil;",                          Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));}
复制代码


Redis 提供了一组命令可以让开发者实现“发布/订阅”模式(publish/subscribe) . 该模式同样可以实现进程间的消息传递,它的实现原理是:


•发布/订阅模式包含两种角色,分别是发布者和订阅者。订阅者可以订阅一个或多个频道,而发布者可以向指定的频道发送消息,所有订阅此频道的订阅者都会收到该消息•发布者发布消息的命令是 PUBLISH, 用法是


PUBLISH channel message
复制代码


比如向 channel.1 发一条消息:hello


PUBLISH channel.1 “hello”
复制代码


这样就实现了消息的发送,该命令的返回值表示接收到这条消息的订阅者数量。因为在执行这条命令的时候还没有订阅者订阅该频道,所以返回为 0. 另外值得注意的是消息发送出去不会持久化,如果发送之前没有订阅者,那么后续再有订阅者订阅该频道,之前的消息就收不到了


订阅者订阅消息的命令是:


SUBSCRIBE channel [channel …]
复制代码


该命令同时可以订阅多个频道,比如订阅 channel.1 的频道:SUBSCRIBE channel.1,执行 SUBSCRIBE 命令后客户端会进入订阅状态。


一般情况下,我们不会用 pub/sub 来做消息发送机制,毕竟有这么多 MQ 技术在。


分享一些资深架构师录制的视频录像:有 Spring,MyBatis,Netty 源码分析,高并发、高性能、分布式、微服务架构的原理,JVM 性能优化这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多:

免费获取方式:点赞这篇文章+关注我,需要的朋友 点击这里 即可获取 希望对你有帮助~~


用户头像

还未添加个人签名 2018.11.21 加入

还未添加个人简介

评论

发布
暂无评论
阿里P6面试官:Redis如何实现分布式锁?锁过期了怎么办?