前提
最近公司在做有需求在做分布式限流,调研的限流框架大概有
1、spring cloud gateway 集成 redis 限流,但属于网关层限流
2、阿里 Sentinel,功能强大、带监控平台
3、srping cloud hystrix,属于接口层限流,提供线程池与信号量两种方式
4、其他:redission、手撸代码
实际需求情况属于业务端限流,redission 更加方便,使用更加灵活,下面介绍下 redission 分布式限流如何使用及原理:
一、使用
使用很简单、如下
// 1、 声明一个限流器RRateLimiter rateLimiter = redissonClient.getRateLimiter(key);// 2、 设置速率,5秒中产生3个令牌rateLimiter.trySetRate(RateType.OVERALL, 3, 5, RateIntervalUnit.SECONDS);// 3、试图获取一个令牌,获取到返回truerateLimiter.tryAcquire(1)
复制代码
二、原理
1、getRateLimiter
// 声明一个限流器 名称 叫keyredissonClient.getRateLimiter(key)
复制代码
2、trySetRate
trySetRate 方法跟进去底层实现如下:
@Overridepublic RFuture<Boolean> trySetRateAsync(RateType type, long rate, long rateInterval, RateIntervalUnit unit) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "redis.call('hsetnx', KEYS[1], 'rate', ARGV[1]);" + "redis.call('hsetnx', KEYS[1], 'interval', ARGV[2]);" + "return redis.call('hsetnx', KEYS[1], 'type', ARGV[3]);", Collections.<Object>singletonList(getName()), rate, unit.toMillis(rateInterval), type.ordinal());}
复制代码
举个例子,更容易理解:
比如下面这段代码,5 秒中产生 3 个令牌,并且所有实例共享( RateType.OVERALL 所有实例共享、 RateType.CLIENT 单实例端共享)
trySetRate(RateType.OVERALL, 3, 5, RateIntervalUnit.SECONDS);
复制代码
那么 redis 中就会设置 3 个参数:
hsetnx,key,rate,3hsetnx,key,interval,5hsetnx,key,type,0
复制代码
接着看 tryAcquire(1) 方法:底层源码如下
private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "local rate = redis.call('hget', KEYS[1], 'rate');" //1 + "local interval = redis.call('hget', KEYS[1], 'interval');" //2 + "local type = redis.call('hget', KEYS[1], 'type');" //3 + "assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')" //4 + "local valueName = KEYS[2];" //5 + "if type == 1 then " + "valueName = KEYS[3];" //6 + "end;" + "local currentValue = redis.call('get', valueName); " //7 + "if currentValue ~= false then " + "if tonumber(currentValue) < tonumber(ARGV[1]) then " //8 + "return redis.call('pttl', valueName); " + "else " + "redis.call('decrby', valueName, ARGV[1]); " //9 + "return nil; " + "end; " + "else " //10 + "redis.call('set', valueName, rate, 'px', interval); " + "redis.call('decrby', valueName, ARGV[1]); " + "return nil; " + "end;", Arrays.<Object>asList(getName(), getValueName(), getClientValueName()), value, commandExecutor.getConnectionManager().getId().toString());}
复制代码
第 1、2、3 备注行是获取上一步 set 的 3 个值: rate、interval、type ,如果这 3 个值没有设置,直接返回 rateLimiter 没有被初始化。
第 5 备注行声明一个变量叫 valueName 值为 KEYS[2],KEYS[2] 对应的值是 getValueName() 方法, getValueName() 返回的就是上面第一步 getRateLimiter 我们设置的 key;如果 type=1,表示全局共享,那么 valueName 的值改为取 KEYS[3] , KEYS[3] 对应的值为 getClientValueName() ,查看 getClientValueName() 源码:
String getClientValueName() { return suffixName(getValueName(), commandExecutor.getConnectionManager().getId().toString()); }
复制代码
ConnectionManager().getId() 如下:
public interface ConnectionManager { UUID getId(); 省略...}
复制代码
这个 getId()是每个客户端初始化的时候生成的 UUID,即每个客户端的 getId 是唯一的,这也就验证了 trySetRate 方法中 RateType.ALL 与 RateType.PER_CLIENT 的作用。
接着看第 7 标准行,获取 valueName 对应的值 currentValue ;首次获取肯定为空,那么看第 10 标准行 else 的逻辑
set valueName 3 px 5 ,设置 key=valueName value=3 过期时间为 5 秒
decrby valueName 1 ,将上面 valueName 的值减 1
那么如果第二次访问,第 7 标注行返回的值存在,将会走第 8 标注行,紧接着走如下判断
如果当前 valueName 的值也就是 3,小于要获得的令牌数量( tryAcquire 方法中的入参),那么说明当前时间内(key 的有效期 5 秒内),令牌的数量已经被用完,返回 pttl(key 的剩余过期时间);反之说明桶中有足够的令牌,获取之后将会把桶中的令牌数量减 1,至此结束。
总结
redission 分布式限流采用令牌桶思想和固定时间窗口, trySetRate 方法设置桶的大小,利用 redis key 过期机制达到时间窗口目的,控制固定时间窗口内允许通过的请求量。
评论