写点什么

【Java 技术开发专题】系列之「Guava RateLimiter」针对于限流器的入门到实战(含源码分析介绍)

作者:浩宇天尚
  • 2021 年 12 月 12 日
  • 本文字数:3725 字

    阅读完需:约 12 分钟

【Java技术开发专题】系列之「Guava RateLimiter」针对于限流器的入门到实战(含源码分析介绍)

限流器的思路和算法

如果让你来造一个限流器,有啥想法?

漏桶算法

  • 用一个固定大小的队列。比如设置限流为 5qps,1s 可以接受 5 个请求;那我们就造一个大小为 5 的队列,如果队列为满了,就拒绝请求;如果队列未满,就往队列添加请求。

令牌算法

令牌听起来挺酷的。以固定的速率往桶里发放令牌。然后消费者每次要取到令牌(acquire)才可以响应请求。


由于令牌是固定间隔发放的,假设还是 5qps,如果我有 1s 内没有请求,我的令牌桶就满了,可以一瞬间响应 5 个请求(一次过取 5 个令牌),也就是可以应对瞬时流量。


那么这里也涉及一个固定间隔发放的问题,难道也是需要定时任务往”桶里“放令牌吗?


那我们来看下 Guava 怎么搞的,假设限流为 2qps,那么固定发放令牌的时间 stableIntervalMicros 就是 500ms,初始化的 storedPermits 当前桶里的令牌数是 0。


RateLimiter 限流器

RateLimiter 从概念上来讲,速率限制器会在可配置的速率下分配许可证。如果必要的话,每个 acquire() 会阻塞当前线程直到许可证可用后获取该许可证。一旦获取到许可证,不需要再释放许可证。


RateLimiter 使用的是一种叫令牌桶的流控算法,RateLimiter 会按照一定的频率往桶里扔令牌,线程拿到令牌才能执行,比如:你希望自己的应用程序 QPS 不要超过 1000,那么 RateLimiter 设置 1000 的速率后,就会每秒往桶里扔 1000 个令牌。



com.google.common.util.concurrent.RateLimiter@ThreadSafe@Betapublic abstract class RateLimiter extends Object
复制代码

RateLimiter 的作用

RateLimiter 经常用于限制对一些物理资源或者逻辑资源的访问速率。与 Semaphore 相比,Semaphore 限制了并发访问的数量而不是使用速率。(注意尽管并发性和速率是紧密相关的,比如参考Little定律


通过设置许可证的速率来定义 RateLimiter。在默认配置下,许可证会在固定的速率下被分配,速率单位是每秒多少个许可证。为了确保维护配置的速率,许可会被平稳地分配,许可之间的延迟会做调整。


可能存在配置一个拥有预热期的 RateLimiter 的情况,在这段时间内,每秒分配的许可数会稳定地增长直到达到稳定的速率。


举例来说明如何使用 RateLimiter,想象下我们需要处理一个任务列表,但我们不希望每秒的任务提交超过两个:


//速率是每秒两个许可final RateLimiter rateLimiter = RateLimiter.create(2.0);void submitTasks(List tasks, Executor executor) {    for (Runnable task : tasks) {        rateLimiter.acquire(); // 也许需要等待        executor.execute(task);    }}
复制代码


再举另外一个例子,想象下我们制造了一个数据流,并希望以每秒 5kb 的速率处理它。可以通过要求每个字节代表一个许可,然后指定每秒 5000 个许可来完成:


// 每秒5000个许可final RateLimiter rateLimiter = RateLimiter.create(5000.0);void submitPacket(byte[] packet) {    rateLimiter.acquire(packet.length);    networkService.send(packet);}
复制代码


有一点很重要,那就是请求的许可数从来不会影响到请求本身的限制(调用 acquire(1) 和调用 acquire(1000) 将得到相同的限制效果,如果存在这样的调用的话),但会影响下一次请求的限制。


如果一个高开销的任务抵达一个空闲的 RateLimiter,它会被马上许可,但是下一个请求会经历额外的限制,从而来偿付高开销任务。注意:RateLimiter 并不提供公平性的保证。

create 方法

  • create(double permitsPerSecond, long warmupPeriod, TimeUnit unit):根据指定的稳定吞吐率和预热期来创建 RateLimiter,这里的吞吐率是指每秒多少许可数(通常是指 QPS,每秒多少个请求量),在这段预热时间内,RateLimiter 每秒分配的许可数会平稳地增长直到预热期结束时达到其最大速率。(只要存在足够请求数来使其饱和)

  • create(double permitsPerSecond):根据指定的稳定吞吐率创建 RateLimiter,这里的吞吐率是指每秒多少许可数(通常是指 QPS,每秒多少查询)

acquire 方法

  • acquire():从 RateLimiter 获取一个许可,该方法会被阻塞直到获取到请求


public double acquire()
复制代码


从 RateLimiter 获取一个许可,该方法会被阻塞直到获取到请求。如果存在等待的情况的话,告诉调用者获取到该请求所需要的睡眠时间。该方法等同于 acquire(1)。


  • 返回值


执行速率的所需要的睡眠时间,单位为妙;如果没有则返回 0


  • acquire(int permits):从 RateLimiter 获取指定许可数,该方法会被阻塞直到获取到请求。


public double acquire(int permits)
复制代码


从 RateLimiter 获取指定许可数,该方法会被阻塞直到获取到请求数。如果存在等待的情况的话,告诉调用者获取到这些请求数所需要的睡眠时间。


  • 参数:permits – 需要获取的许可数

  • 返回:执行速率的所需要的睡眠时间,单位为妙;如果没有则返回 0

  • 抛出:IllegalArgumentException – 如果请求的许可数为负数或者为 0

tryAcquire 方法


  • tryAcquire():从 RateLimiter 获取许可,如果该许可可以在无延迟下的情况下立即获取得到的话

  • tryAcquire(int permits):从 RateLimiter 获取许可数,如果该许可数可以在无延迟下的情况下立即获取得到的话

  • tryAcquire(int permits, long timeout, TimeUnit unit):从 RateLimiter 获取指定许可数如果该许可数可以在不超过 timeout 的时间内获取得到的话,或者如果无法在 timeout 过期之前获取得到许可数的话,那么立即返回 false (无需等待)

  • tryAcquire(long timeout, TimeUnit unit):从 RateLimiter 获取许可如果该许可可以在不超过 timeout 的时间内获取得到的话,或者如果无法在 timeout 过期之前获取得到许可的话,那么立即返回 false(无需等待)

详细分析

public static RateLimiter create(double permitsPerSecond)
复制代码


根据指定的稳定吞吐率创建 RateLimiter,这里的吞吐率是指每秒多少许可数(通常是指 QPS,每秒多少查询)。


返回的 RateLimiter 确保了在平均情况下,每秒发布的许可数不会超过 permitsPerSecond,每秒钟会持续发送请求。


当传入请求速率超过 permitsPerSecond,速率限制器会每秒释放一个许可(1.0 / permitsPerSecond 这里是指设定了 permitsPerSecond 为 1.0) 。


当速率限制器闲置时,允许许可数暴增到 permitsPerSecond,随后的请求会被平滑地限制在稳定速率 permitsPerSecond 中。

参数

  • permitsPerSecond – 返回的 RateLimiter 的速率,意味着每秒有多少个许可变成有效。

抛出异常

  • IllegalArgumentException – 如果 permitsPerSecond 为负数或者为 0


public static RateLimiter create(double permitsPerSecond,long warmupPeriod,TimeUnit unit)
复制代码


根据指定的稳定吞吐率和预热期来创建 RateLimiter,这里的吞吐率是指每秒多少许可数(通常是指 QPS,每秒多少查询),在这段预热时间内,RateLimiter 每秒分配的许可数会平稳地增长直到预热期结束时达到其最大速率(只要存在足够请求数来使其饱和)。


同样地,如果 RateLimiter 在 warmupPeriod 时间内闲置不用,它将会逐步地返回冷却状态。


它会像它第一次被创建般经历同样的预热期。返回的 RateLimiter 主要用于那些需要预热期的资源,这些资源实际上满足了请求(比如一个远程服务),而不是在稳定(最大)的速率下可以立即被访问的资源。返回的 RateLimiter 在冷却状态下启动(即预热期将会紧跟着发生),并且如果被长期闲置不用,它将回到冷却状态。

参数

  • permitsPerSecond – 返回的 RateLimiter 的速率,意味着每秒有多少个许可变成有效。

  • warmupPeriod – 在这段时间内 RateLimiter 会增加它的速率,在抵达它的稳定速率或者最大速率之前

  • unit – 参数 warmupPeriod 的时间单位

抛出异常

  • IllegalArgumentException – 如果 permitsPerSecond 为负数或者为 0

实践案例

第 1 次获取 10 个令牌
  • nowMicro 是刚开始运行的时间,是一个很小的数,约等于 0;

  • resync(nowMicro),更新令牌数,由于 nowMicro 约等于 0,其实令牌数不会更新((0-0)/5000 = 0),令牌数还是 0(约等于 0)

  • storedPermitsToSpend,其实当前并没有令牌,所以取 min,约等于 0;

  • freshPermits,需要预支付 10 个令牌,约等于 10;

  • 预支付之后需要等待 10 * interval = 10 * 500 ,约等于 5000ms,5000000 微秒

  • this.nextFreeTicketMicros 需要加上 waitMicros 也就是 下一次可以获得令牌的时间是 5000ms 之后。

  • 所以我们看到输出信息的第一行在第 0s 获取了 10 个令牌之后,下一次再想获取 1 个令牌需要等待 5000ms 也就是 5s。

第 2 次获取 1 个令牌

然后再一次想获取 1 个令牌,当前时间还是约等于 0,这时候 resync,nowMicros(0)比 nextFreeTicketMicros(5000)小,令牌不更新。


returnValue=5000,storedPermitsToSpend=0,freshPermits=1,需要再等 waitMicros=1 * 500ms,然后 nextFreeTicketMicros 更新为 5000+500=5500,返回 returnValue=5000;外层函数睡眠 5000ms,返回 5000(输出打印获取 1 个 token,约 5s)

第 3 次获取 10 个令牌

上面说的,睡了 5000ms,当前时间 nowMicros=5000;


resync,nowMicros(5000)比 nextFreeTicketMicros(5500)小,令牌不更新,还是欠费状态,只能预支付。


returnValue=5500, storedPermitsToSpend=0,freshPermits=10,需要预支付 10 个令牌, waitMicros=10 * 500ms = 5000,然后 nextFreeTicketMicros 更新为 5500+5000=10500,返回 returnValue=5500;外层函数睡眠 5500-5000=500ms,返回 500(输出打印获取 10 个 token,约 0.5s)

资料参考

https://zhuanlan.zhihu.com/p/60979444


https://blog.csdn.net/waltonhuang/article/details/106763512

发布于: 1 小时前阅读数: 6
用户头像

浩宇天尚

关注

🏆 InfoQ写作平台-签约作者 🏆 2020.03.25 加入

【个人简介】酷爱计算机技术、醉心开发编程、喜爱健身运动、热衷悬疑推理的”极客狂人“ 【技术格言】任何足够先进的技术都与魔法无异 【技术范畴】Java领域、Spring生态、MySQL专项、APM专题及微服务/分布式体系等

评论

发布
暂无评论
【Java技术开发专题】系列之「Guava RateLimiter」针对于限流器的入门到实战(含源码分析介绍)