01-分布式限流
01.1-Redis 中执行 Lua 脚本
Redis 天然的支持 Lua 脚本,使其除了可作为分布式缓存外,还可以实现其他的功能,例如分布式限流。
在 Redis 7 之前,Lua 脚本只能通过EVAL
命令执行。
// key [key ...] 可以在 Lua 脚本中通过 table KEYS 访问,例如 KEYS[1] 表示第一个 key
// arg [arg ...] 可以在 Lua 脚本中通过 table ARGV 访问,例如 ARGV[1] 表示第一个 arg
EVAL script numkeys key [key ...] arg [arg ...]
复制代码
关于 Lua 的语法可以参考 lua.org/user-manual。更多关于EVAL
命令的信息可以参考 redis.io。
01.2-分布式限流器实现
限流器主要思路是在 Redis 中维护一个计数器,若计数器超限则进行限流。限流器的 Lua 脚本如下:
-- filter.lua
local c
--查看 Redis 中计数器是否超限,计数器为 KEYS[1] ,阈值为 ARGV[1]
c = redis.call('incr', KEYS[1])
--超限
if c and tonumber(c) > tonumber(ARGV[1]) then
return c;
end
--若是第一次调用限流,设置计数器的过期时间,过期时间由 ARGV[2] 指定
if tonumber(c) == 1 then
redis.call('expire', KEYS[1], ARGV[2])
end
return c;
复制代码
01.3-在 Spring-data-redis 中调用 Lua 脚本
Spring-data-redis 中,Lua 脚本被抽象化为RedisScript
对象。
@Bean(name = "filter_lua")
public RedisScript<Long> filterLuaScript() {
return RedisScript.of(new ClassPathResource("lua/filter.lua"), Long.class);
}
复制代码
在需要执行 Lua 脚本的对象中,只需要从容器中获取RedisScript
对象,然后使用RedisTemplate
执行即可。
// this.filterLua 即上面的 RedisScript 对象
// key 为 Redis 中的计数器对应的键
// threshold ttl 是 filter.lua 中需要的参数,分别表示阈值和计数器持续时间
long count = (Long) this.redisTemplate.execute(this.filterLua,
singletonList(key),
threshold, ttl);
if (count > threshold) {
// 说明超限了
} else {
// 未超限
}
复制代码
01.4-在 Redisson 中调用 Lua 脚本
Redisson 中也提供了执行 Lua 脚本的接口。与 Spring-data-redis 不同的是,Redisson 中定义了RScript
来表示 Lua 脚本。
// redisson 是 RedissonClient 对象
long count = (Long) redisson.getScript(StringCodec.INSTANCE).eval(
RScript.Mode.READ_WRITE,
script, // lua 脚本
RScript.ReturnType.INTEGER,
singletonList(key),
threshold, "30");
复制代码
需要注意的是,script
是一个字符串。与 Sping-data-redis 从文件中读取 Lua 脚本的方式相比,在 Java 代码中拼接 Lua 脚本显然更麻烦,而且更不容易维护。
关于上述两种方式的完整代码,可以参考我的 gitee。
02-延时队列
Redis 有时也会被用来实现延时队列功能。与延时队列功能相关的数据结构为 zset,相关命令如下:
zadd key score member [score memeber ...],向有序集合中加入元素及分数
zrange key min max [withscores],按照下标查询 [min, max] 范围内的元素
zrem key member [member ...],从有序集合中移除元素
实现延时队列的思路如下:
基于 Redisson 的实现代码如下:
// 生产者线程,负责向延迟队列中添加消息
// 获取 zset
String key = "example:delay:queue";
RScoredSortedSet<String> delayQueue = this.redisson.getScoredSortedSet(key);
// 每次向 zset 中添加 5 条消息,消息是一个随机的 UUID,score 为当前时间 + 延时
int d = Integer.parseInt(delay);
for (int i = 0; i < 5; ++i) {
String member = UUID.randomUUID().toString().replace("-", "");
long score = System.currentTimeMillis() / 1000 + d;
boolean result = delayQueue.add(score, member);
if (result) {
LOGGER.info("插入一个消息:[{}]({})", member, score);
} else {
LOGGER.warn("插入消息失败:[{}]({})", member, score);
}
}
复制代码
// 消费者线程
while (true) {
// 获取 zset
RScoredSortedSet<String> delayQueue = this.redisson.getScoredSortedSet(key);
// 如果 Redis 中没有延时队列,或延时队列中没有消息,则轮训等待
if (delayQueue == null || delayQueue.isEmpty()) {
continue;
}
// a. 查看队列头的元素分数是否满足延时
long score = delayQueue.firstScore().longValue();
if (score <= (System.currentTimeMillis() / 1000)) {
// b. 消费消息
String message = delayQueue.pollFirst();
LOGGER.info("{} ms 消费了一个消息,消息ID {}, 线程ID {}", System.currentTimeMillis(), message, Thread.currentThread().getName());
}
}
复制代码
消费者中的代码在多线程情形下是非线程安全的,有些线程会在 b.拿到 null,主要原因是步骤 a.和 b.是非原子的。解决方式:要么加锁,要么通过 Lua 脚本使上述两步称为原子的。加锁会降低并发的性能,这里我们主要通过 Lua 脚本来解决非原子性问题。
从延时队列中检查并消费一个消息的 Lua 脚本如下:
-- consume.lua
local entry = redis.call('zrange', KEYS[1], 0, 0, 'WITHSCORES')
if entry then
if entry[2] and tonumber(entry[2]) <= tonumber(ARGV[1]) then
redis.call('zrem', KEYS[1], entry[1])
return entry[1]
end
end
return nil;
复制代码
生产者线程不需要改变,消费者线程中的用法改成:
while (true) {
long now = System.currentTimeMillis() / 1000;
final String message = this.redisTemplate.execute(this.consumeScriptLua, Collections.singletonList(key), now + "");
if (null != message) {
LOGGER.info("{} ms 消费了一个消息,消息ID {}, 线程ID {}", System.currentTimeMillis(), message, Thread.currentThread().getName());
}
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException ignored) {}
}
复制代码
完整的示例代码可以参考我的 gitee 。
我们来分析一下这种实现延时队列的方式的缺点:
首先,使用轮训的方式,无疑是浪费 CPU 资源的
其次,不是十分的精准,存在一定的误差。
历史文章推荐
Redis「7」实现分布式锁
Redis「6」实现消息队列
Redis「5」事件处理模型与键过期策略
Redis「4」Redis 在秒杀系统中的应用
Redis「3」持久化
Redis「2」缓存一致性与异常处理
Redis「1」流水线、事务、Lua 脚本
评论