写点什么

Redis「8」实现分布式限流与延时队列

作者:Samson
  • 2022 年 5 月 19 日
  • 本文字数:2728 字

    阅读完需:约 9 分钟

Redis「8」实现分布式限流与延时队列

01-分布式限流

01.1-Redis 中执行 Lua 脚本

Redis 天然的支持 Lua 脚本,使其除了可作为分布式缓存外,还可以实现其他的功能,例如分布式限流。


在 Redis 7 之前,Lua 脚本只能通过EVAL命令执行。


// key [key ...] 可以在 Lua 脚本中通过 table KEYS 访问,例如 KEYS[1] 表示第一个 key
// arg [arg ...] 可以在 Lua 脚本中通过 table ARGV 访问,例如 ARGV[1] 表示第一个 arg
EVAL script numkeys key [key ...] arg [arg ...]
复制代码


关于 Lua 的语法可以参考 lua.org/user-manual。更多关于EVAL命令的信息可以参考 redis.io

01.2-分布式限流器实现

限流器主要思路是在 Redis 中维护一个计数器,若计数器超限则进行限流。限流器的 Lua 脚本如下:


-- filter.lualocal c--查看 Redis 中计数器是否超限,计数器为 KEYS[1] ,阈值为 ARGV[1]c = redis.call('incr', KEYS[1])--超限if c and tonumber(c) > tonumber(ARGV[1]) thenreturn c;end--若是第一次调用限流,设置计数器的过期时间,过期时间由 ARGV[2] 指定if tonumber(c) == 1 thenredis.call('expire', KEYS[1], ARGV[2])endreturn c;
复制代码

01.3-在 Spring-data-redis 中调用 Lua 脚本

Spring-data-redis 中,Lua 脚本被抽象化为RedisScript对象。


@Bean(name = "filter_lua")public RedisScript<Long> filterLuaScript() {    return RedisScript.of(new ClassPathResource("lua/filter.lua"), Long.class);}
复制代码


在需要执行 Lua 脚本的对象中,只需要从容器中获取RedisScript对象,然后使用RedisTemplate执行即可。


// this.filterLua 即上面的 RedisScript 对象// key 为 Redis 中的计数器对应的键// threshold ttl 是 filter.lua 中需要的参数,分别表示阈值和计数器持续时间long count = (Long) this.redisTemplate.execute(this.filterLua,                 singletonList(key),                 threshold, ttl);if (count > threshold) {  // 说明超限了} else {  // 未超限}
复制代码

01.4-在 Redisson 中调用 Lua 脚本

Redisson 中也提供了执行 Lua 脚本的接口。与 Spring-data-redis 不同的是,Redisson 中定义了RScript来表示 Lua 脚本。


// redisson 是 RedissonClient 对象long count = (Long) redisson.getScript(StringCodec.INSTANCE).eval(                RScript.Mode.READ_WRITE,                script,                      // lua 脚本                RScript.ReturnType.INTEGER,                singletonList(key),                threshold, "30");
复制代码


需要注意的是,script是一个字符串。与 Sping-data-redis 从文件中读取 Lua 脚本的方式相比,在 Java 代码中拼接 Lua 脚本显然更麻烦,而且更不容易维护。


关于上述两种方式的完整代码,可以参考我的 gitee

02-延时队列

Redis 有时也会被用来实现延时队列功能。与延时队列功能相关的数据结构为 zset,相关命令如下:


  • zadd key score member [score memeber ...],向有序集合中加入元素及分数

  • zrange key min max [withscores],按照下标查询 [min, max] 范围内的元素

  • zrem key member [member ...],从有序集合中移除元素


实现延时队列的思路如下:


  • 生产者将需要延迟的消息 id 添加到 zset 中,其分数设置为“当前时间 + 需要延时的时间”

  • 消费者不断轮训有序集合中的第一个元素与当前时间的大小,若超过当前时间,则认为延时已经满足,消费掉消息。


基于 Redisson 的实现代码如下:


// 生产者线程,负责向延迟队列中添加消息// 获取 zsetString key = "example:delay:queue";RScoredSortedSet<String> delayQueue = this.redisson.getScoredSortedSet(key);
// 每次向 zset 中添加 5 条消息,消息是一个随机的 UUID,score 为当前时间 + 延时int d = Integer.parseInt(delay);for (int i = 0; i < 5; ++i) { String member = UUID.randomUUID().toString().replace("-", ""); long score = System.currentTimeMillis() / 1000 + d;
boolean result = delayQueue.add(score, member); if (result) { LOGGER.info("插入一个消息:[{}]({})", member, score); } else { LOGGER.warn("插入消息失败:[{}]({})", member, score); }}
复制代码


// 消费者线程while (true) {  // 获取 zset  RScoredSortedSet<String> delayQueue = this.redisson.getScoredSortedSet(key);  // 如果 Redis 中没有延时队列,或延时队列中没有消息,则轮训等待  if (delayQueue == null || delayQueue.isEmpty()) {      continue;  }  // a. 查看队列头的元素分数是否满足延时  long score = delayQueue.firstScore().longValue();  if (score <= (System.currentTimeMillis() / 1000)) {      // b. 消费消息      String message = delayQueue.pollFirst();      LOGGER.info("{} ms 消费了一个消息,消息ID {}, 线程ID {}", System.currentTimeMillis(), message, Thread.currentThread().getName());  }}
复制代码


消费者中的代码在多线程情形下是非线程安全的,有些线程会在 b.拿到 null,主要原因是步骤 a.和 b.是非原子的。解决方式:要么加锁,要么通过 Lua 脚本使上述两步称为原子的。加锁会降低并发的性能,这里我们主要通过 Lua 脚本来解决非原子性问题。


从延时队列中检查并消费一个消息的 Lua 脚本如下:


-- consume.lualocal entry = redis.call('zrange', KEYS[1], 0, 0, 'WITHSCORES')
if entry then if entry[2] and tonumber(entry[2]) <= tonumber(ARGV[1]) then redis.call('zrem', KEYS[1], entry[1]) return entry[1] endend
return nil;
复制代码


生产者线程不需要改变,消费者线程中的用法改成:


while (true) {    long now = System.currentTimeMillis() / 1000;    final String message = this.redisTemplate.execute(this.consumeScriptLua, Collections.singletonList(key), now + "");    if (null != message) {        LOGGER.info("{} ms 消费了一个消息,消息ID {}, 线程ID {}", System.currentTimeMillis(), message, Thread.currentThread().getName());    }    try {        TimeUnit.MILLISECONDS.sleep(10);    } catch (InterruptedException ignored) {}}
复制代码


完整的示例代码可以参考我的 gitee


我们来分析一下这种实现延时队列的方式的缺点:


  • 首先,使用轮训的方式,无疑是浪费 CPU 资源的

  • 其次,不是十分的精准,存在一定的误差。


历史文章推荐

Redis「7」实现分布式锁

Redis「6」实现消息队列

Redis「5」事件处理模型与键过期策略

Redis「4」Redis 在秒杀系统中的应用

Redis「3」持久化

Redis「2」缓存一致性与异常处理

Redis「1」流水线、事务、Lua 脚本

发布于: 刚刚阅读数: 2
用户头像

Samson

关注

还未添加个人签名 2019.07.22 加入

还未添加个人简介

评论

发布
暂无评论
Redis「8」实现分布式限流与延时队列_redis_Samson_InfoQ写作社区