写点什么

限流与 Guava RateLimiter 原理解析

用户头像
千珏
关注
发布于: 2021 年 05 月 12 日
限流与Guava RateLimiter原理解析

背景

在高并发场景或者对某些操作(如 IO)速率有要求时,我们可以使用一些流控方案,比如令牌桶,漏斗等,Guava 开源的 RateLimiter 提供了在多线程环境下很好的流控解决方案

通俗解释

如果要我们自己实现一个这样的流控方案,会怎么实现呢?我们从最简单开始,一步步新增特性。

假设给定 QPS = 5,每秒可以获取 5 个令牌,那么就是每 200ms 可以获取一个令牌,我们只需要一个变量记录上次成功获取令牌的时间就行啦。比如用 lastAcquireTime 记录,初始时 lastAcquireTime=0,当获取 1 个令牌时,如果当前时间 now-lastAcquireTime < 200ms,那么就不能获取(一次获取 n 个令牌同理,now-lastAcquireTime < n * 200ms),阻塞等待 200ms - (now - lastAcquireTime)这么长时间,然后获取成功,同时更新 lastAcquireTime 就行啦。

上述简单情况我们只记录了上次获取的时间,那么如果自从上次获取令牌之后很长一段时间没有获取令牌了,这段时间内没有使用的令牌是否应该保存下来,等到下一个获取令牌的请求到来时,直接获取成功呢?这里肯定是需要的,那么保存多少令牌,是不是一下子全部发放出去就得看具体的业务了。

比如限流是为了不那么快的写缓冲区,那么停了一段时间之后,缓冲区已经是空的了,我们自然是希望下次写缓冲区的时候,可以一次性拿到和缓冲区大小一致的令牌,然后立刻写满缓冲区,而不是还要等。

另一个例子是,限流是为了保护后面的服务,假设是访问缓存,经过一段时间不妨问缓存之后,如果突然大量的请求同时访问缓存,可能会造成缓存雪崩,我们应该是平缓增大的方式去获取令牌来访问缓存。

因此我们用一个变量 storedPermits 记录我们可以保存的令牌数,用 strategy 变量表示利用 storedPermits 的策略,是立即使用所有还是平滑使用,对应了 Guava RateLimiter 的两种实现SmoothBurstySmoothWarmingUp

我们以SmoothBursty为例来说明 RateLimiter 的原理

基本使用

//创建并初始化一个RateLimiter,qps=5RateLimiter rateLimiter = RateLimiter.create(5);//阻塞的方式获取一个令牌rateLimiter.acquire(1);//非阻塞的方式获取一个令牌,如果在指定的时间范围内无法获取,则立即返回false,否则阻塞一定的时间,并获取成功rateLimiter.tryAcquire(1, 1, TimeUnit.SECONDS);//修改速率rateLimiter.setRate(2.0);
复制代码

类图


流程

1、调用RateLimiter.create(5)之后的状态

//默认最多可以储备maxBurstSeconds * qps个令牌maxPermits = 5//初始时并没有储备令牌storedPermits = 0//由于我们设置了qps=5,因此固定每200ms可以填充一个令牌,这里用微妙表示stableIntervalMicros = 200000//这里初始化时其实是程序调用setRate的时间,为了方便,直接初始化为0nextFreeTicketMicros = 0//默认只能保存最多1秒的令牌,可以这么理解这个字段,比如在一段时间里面没有客户端来获取令牌,那么这段时间内的令牌不应该被累积起来,但是SmoothBursty策略允许累计最大maxBurstSeconds时间内的令牌,后续请求到来时,直接获取成功而不需要等待maxBurstSeconds = 1
复制代码

2、假设创建完 RateLimiter 之后,立即调用 acquire()方法(不考虑程序执行的时间差),则本次 acquire 之后状态如下


maxPermits = 5 //不变,只跟qps有关storedPermits = 0 //由于我们假设create和acquire没有时间差,这里自然就没有保存令牌,因此还是0stableIntervalMicros = 200000 //不变,只跟qps有关nextFreeTicketMicros = 200000 //注意,由于我们获取了一个令牌,而令牌恢复时间stableIntervalMicros是200ms,因此只能等到200ms后再继续获取令牌了maxBurstSeconds = 1 //不变,初始化之后就不变
复制代码

3、由于步骤 2 没有阻塞,立即返回,因此我们立即调用 acquire 进行下一次获取,由于 nextFreeTicketMicros 已经是 200ms 之后了,因此当前线程会 sleep 200ms,然后返回,此时的状态如下


maxPermits = 5 //不变,只跟qps有关storedPermits = 0 //上次获取到目前并没有时间差,因此还是0stableIntervalMicros = 200000 //不变,只跟qps有关nextFreeTicketMicros = 400000 //注意,这个时间是一个时间戳,代表RateLimiter初始化以来经过的微妙数maxBurstSeconds = 1 //不变,初始化之后就不变
复制代码

4、假设经过步骤 3 之后,客户端有一段时间(大于 1 秒)没有来获取令牌了,当客户端再次获取令牌时的状态如下(只展示变化的)


获取前storedPermits = 5 //由于已经积压了超过1秒,此值=maxPermitsnextFreeTicketMicros = 0 //我们以相对时间来讨论,假设相对当前时间是0
获取后storedPermits = 4 //消耗掉了一个令牌nextFreeTicketMicros = 0 //注意这里还是0,因为使用的是历史积压的令牌,并没有消耗未来补充的令牌,这也就意味着后续连续4次调用都不会阻塞
复制代码

理解

根据上述流程,我们知道了 RateLimiter 实现最重要的是 nextFreeTicketMicros 这个时间,如果当前时间小于该时间,则阻塞到该时间,并更新 nextFreeTicketMicros = nextFreeTicketMicros + stableIntervalMicros,否则直接返回,若消耗的是积压的令牌,则不更新该时间,否则更新 nextFreeTicketMicros = nextFreeTicketMicros + stableIntervalMicros

思考:在多线程环境下这样使用会出现什么问题呢?

我们使用如下代码测试:

//模拟获取令牌的任务,并打印获取到令牌阻塞的时间static class Task implements Runnable {    String name;    RateLimiter limiter;
Task(String name, RateLimiter limiter){ this.name = name; this.limiter = limiter; }
@Override public void run() { double ws = limiter.acquire(); System.out.println("thread " + name + " wait " + ws + " seconds"); } }
//测试类public class DemoRateLimit { public static void main(String[] args) throws InterruptedException { //创建一个1qps的RateLimiter RateLimiter limiter = RateLimiter.create(1); Thread[] threads = new Thread[10]; //创建并启动10个线程 for (int i = 0; i < 10; i++){ threads[i] = new Thread(new Task(String.valueOf(i), limiter)); threads[i].start(); } //等待输出结果 for (int i = 0; i < 10; i++){ threads[i].join(); } }}
//输出结果如下thread 0 wait 0.0 secondsthread 1 wait 0.995701 secondsthread 7 wait 1.995693 secondsthread 6 wait 2.995672 secondsthread 5 wait 3.995657 secondsthread 4 wait 4.995537 secondsthread 3 wait 5.995497 secondsthread 2 wait 6.995434 secondsthread 8 wait 7.993244 secondsthread 9 wait 8.993134 seconds
复制代码

可以看到,acquire 采取了预分配的策略,就是该操作一定会成功,但是你得等很久,而且不保证线程的公平性(取决于 synchronized)

在高并发环境下面容易长时间阻塞线程,因此不应该这样获取,应该使用 tryAcquire

tryAcquire 的原理是,判断给定的超时时间是否大于 nextFreeTicketMicros,如果大于,则直接返回获取失败,否则获取成功,并阻塞相对应的时间


对比 SmoothBursty 与 SmoothWarmingUp

SmoothWarmingUp 里面主要是维护了一个函数,实现了在 warmupPeriodMicros 时间范围里面缓慢释放已保存令牌的过程,我们用一个测试来对比一下

//统一2qps,warmup时间为2秒public class RateLimiterTest {
public static void main(String[] args) throws InterruptedException { testBursty(); testWarmUp(); }
static void testBursty() throws InterruptedException { RateLimiter limiter = RateLimiter.create(2); acquire(limiter); }
static void testWarmUp() throws InterruptedException { RateLimiter limiter = RateLimiter.create(2, 2, TimeUnit.SECONDS); acquire(limiter); }

static void acquire(RateLimiter limiter) throws InterruptedException { long start = System.currentTimeMillis(); for (int i = 0; i < 5; i++) { limiter.acquire(1); long now = System.currentTimeMillis(); System.out.println("acquire at " + (now - start) + " ms"); } TimeUnit.SECONDS.sleep(2); for (int i = 0; i < 5; i++) { limiter.acquire(1); long now = System.currentTimeMillis(); System.out.println("acquire at " + (now - start) + " ms"); } }}//testBursty的输出如下,前面5次获取遵循500ms一次,后面由于休眠了2秒,积压了2个令牌(最大积压为1秒 * qps),因此6,7次立即获取到了,第8次由于前面讲过是面向未来预分配的,因此也是立即获取,后面遵循500ms一次acquire at 1 msacquire at 506 msacquire at 1001 msacquire at 1504 msacquire at 2001 msacquire at 4005 msacquire at 4005 msacquire at 4006 msacquire at 4507 msacquire at 5007 ms//testWarmUp的输出如下,前面2秒预热期内(1,2,3次获取)并不是按照500ms间隔的,而是以预热的速率获取,后面2秒后遵循500ms一次,但是我们休眠2秒后,从第6次获取开始,由于我们保存了一些令牌,因此不必要经过完整的2秒预热器啦,而是快速消耗了我们保存的令牌(这个速率比预热速率更快,但不是一瞬间释放),然后进入正常的500ms一次acquire at 2 msacquire at 1253 msacquire at 2004 msacquire at 2506 msacquire at 3005 msacquire at 5006 msacquire at 5767 msacquire at 6266 msacquire at 6765 msacquire at 7264 ms
复制代码

总结

如果我们限流后面的是类似缓冲区这种,可以使用 SmoothBursty,如果是服务调用,建议使用 SmoothWarmingUp,尽可能使用 tryAcquire 加上超时时间获取,否则线程会一直阻塞

发布于: 2021 年 05 月 12 日阅读数: 24
用户头像

千珏

关注

还未添加个人签名 2017.12.20 加入

还未添加个人简介

评论

发布
暂无评论
限流与Guava RateLimiter原理解析