写点什么

☕️【Java 专题系列】「回顾 RateLimiter」针对于限流器的入门到精通(针对于源码分析介绍)

发布于: 5 小时前
☕️【Java专题系列】「回顾 RateLimiter」针对于限流器的入门到精通(针对于源码分析介绍)

Guava 包中限流实现分析

RateLimiter

之前的文章中已经介绍了常用的限流算法,而 google 在 Java 领域中使用 Guava 包中的限流工具进行服务限流。

回顾使用案例

Google 开源工具包 Guava 提供了限流工具类 RateLimiter,该类基于令牌桶算法实现流量限制,使用十分方便。


@Testpublic void  testSample() {    RateLimiter rateLimiter = RateLimiter.create(500)}
复制代码


以上示例,创建一个 RateLimiter,指定每秒放 500 个令牌(0.002 秒放 1 个令牌),其输出见下:


从输出结果可以看出,RateLimiter 具有预消费的能力:


  • 请求 1 时并没有任何等待直接预消费了 1 个令牌

  • 请求 2 时,由于之前预消费了 1 个令牌,故而等待了 2 秒,之后又预消费了 6 个令牌

  • 请求 3 时同理,由于之前预消费了 6 个令牌,故而等待了 12 秒


属于线性处理机制。


  • RateLimiter 通过限制后面请求的等待时间,来支持一定程度的突发请求(预消费)。

  • 但是某些情况下并不需要这种突发请求处理能力,如某 IM 厂商提供消息推送接口,但推送接口有严格的频率限制(600 次/30 秒),在调用该 IM 厂商推送接口时便不能预消费,否则,则可能出现推送频率超出限制而失败。

  • 其中 RateLimiter 类为限流的核心类,其为 public 的抽象类,RateLimiter 有一个实现类 SmoothRateLimiter,根据不同消耗令牌的策略 SmoothRateLimiter 又有两个具体实现类 SmoothBursty 和 SmoothWarmingUp。

  • 在实际使用过程中一般直接使用 RateLimiter 类,其他类对用户是透明的,RateLimiter 类的设计使用了类似 BUILDER 模式的小技巧,并做了一定的调整。

  • 通过 RateLimiter 类图可见,RateLimiter 类不仅承担了具体实现类的创建职责,同时也确定了被创建出的实际类可提供的方法。标准创建者模式 UML 图如下所示(引用自百度百科)

Guava 包中限流工具类

Guava 核心限流类介绍

  • RateLimiter 类为限流的核心类,其为 public 的抽象类,RateLimiter 有一个实现类 SmoothRateLimiter,根据不同消耗令牌的策略 SmoothRateLimiter 又有两个具体实现类 SmoothBursty 和 SmoothWarmingUp。

Guava 有两种限流模式

  • 一种为稳定模式(SmoothBursty:令牌生成速度恒定)

  • 一种为渐进模式(SmoothWarmingUp:令牌生成速度缓慢提升直到维持在一个稳定值)


两种模式实现思路类似,主要区别在等待时间的计算上,

Guava RateLimiter 核心类实现

  • 在实际使用过程中一般直接使用 RateLimiter 类,其他类对用户是透明的。RateLimiter 类的设计使用了类似 BUILDER 模式的小技巧,并做了一定的调整。

  • 通过 RateLimiter 类图可见,RateLimiter 类不仅承担了具体实现类的创建职责,同时也确定了被创建出的实际类可提供的方法。


RateLimiter 类即承担了 builder 的职责,也承担了 Product 的职责。

SmoothBursty
  • Guava 包 RateLimiter 类的说明文档,首先使用 create 函数创建限流器,指定每秒生成 2 个令牌,在需要调用服务时使用 acquire 函数或取令牌。


create 函数分析


  • create 函数具有两个个重载,根据不同的重载可能创建不同的 RateLimiter 具体实现子类。

  • 目前可返回的实现子类包括 SmoothBursty 及 SmoothWarmingUp 两种,具体不同下文详细分析。

  • 在调用 create 接口时,实际实例化的为 SmoothBursty 类


public static RateLimiter create(double permitsPerSecond) {    return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());}static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {    RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);    rateLimiter.setRate(permitsPerSecond);    return rateLimiter;}
复制代码


在解析 SmoothBursty 原理前,重点解释下 SmoothBursty 中几个属性的含义


/** * The currently stored permits. * 当前存储令牌数 */double storedPermits;/** * The maximum number of stored permits. * 最大存储令牌数 */double maxPermits;/** * The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits * per second has a stable interval of 200ms. * 添加令牌时间间隔 */double stableIntervalMicros;/** * The time when the next request (no matter its size) will be granted. After granting a request, * this is pushed further in the future. Large requests push this further than small requests. * 下一次请求可以获取令牌的起始时间 * 由于RateLimiter允许预消费,上次请求预消费令牌后 * 下次请求需要等待相应的时间到nextFreeTicketMicros时刻才可以获取令牌 */private long nextFreeTicketMicros = 0L;// could be either in the past or future
复制代码


tryAcquire 函数实现机制
  • 就非常容易理解 RateLimiter 暴露出来的接口


@CanIgnoreReturnValuepublic double acquire() {  return acquire(1);}
@CanIgnoreReturnValuepublic double acquire(int permits) { long microsToWait = reserve(permits); stopwatch.sleepMicrosUninterruptibly(microsToWait); return 1.0 * microsToWait / SECONDS.toMicros(1L);}
final long reserve(int permits) { checkPermits(permits); synchronized (mutex()) { return reserveAndGetWaitLength(permits, stopwatch.readMicros()); }}
复制代码


  • acquire 函数主要用于获取 permits 个令牌,并计算需要等待多长时间,进而挂起等待,并将该值返回


public boolean tryAcquire(int permits) {  return tryAcquire(permits, 0, MICROSECONDS);}
public boolean tryAcquire() { return tryAcquire(1, 0, MICROSECONDS);}
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) { long timeoutMicros = max(unit.toMicros(timeout), 0); checkPermits(permits); long microsToWait; synchronized (mutex()) { long nowMicros = stopwatch.readMicros(); if (!canAcquire(nowMicros, timeoutMicros)) { return false; } else { microsToWait = reserveAndGetWaitLength(permits, nowMicros); } } stopwatch.sleepMicrosUninterruptibly(microsToWait); return true;}
private boolean canAcquire(long nowMicros, long timeoutMicros) { return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;}
@Overridefinal long queryEarliestAvailable(long nowMicros) { return nextFreeTicketMicros;}
复制代码


  • acquire 函数主要用于获取 permits 个令牌,并计算需要等待多长时间,进而挂起等待,并将该值返回

  • tryAcquire 函数可以尝试在 timeout 时间内获取令牌,如果可以则挂起等待相应时间并返回 true,否则立即返回 false

  • canAcquire 用于判断 timeout 时间内是否可以获取令牌

resync 函数

该函数会在每次获取令牌之前调用,其实现思路为,若当前时间晚于 nextFreeTicketMicros,则计算该段时间内可以生成多少令牌,将生成的令牌加入令牌桶中并更新数据。这样一来,只需要在获取令牌时计算一次即可。


/** * Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time. */void resync(long nowMicros) {    // if nextFreeTicket is in the past, resync to now    if (nowMicros > nextFreeTicketMicros) {      double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();      storedPermits = min(maxPermits, storedPermits + newPermits);      nextFreeTicketMicros = nowMicros;    }}
复制代码


acquire 函数分析


acquire 函数也具有两个重载类,但分析过程仅仅需要关系具有整形参数的函数重载即可,无参数的函数仅仅是 acquire(1)的简便写法。


在 acquire(int permits)函数中主要完成三件事:


  • 预分配授权数量,此函数返回需要等待的时间,可能为 0;

  • 根据等待时间进行休眠;

  • 以秒为单位,返回获取授权消耗的时间。


完成以上工作的过程中,RateLimiter 类确定了获取授权的过程骨架并且实现了一些通用的方法,这些通用方法中会调用为实现的抽象方法,开发人员根据不同的算法需求可实现特定子类对抽象方法进行覆盖。


其调用流程如下图:

其中橙色块中 reserveEarliestAvailable 方法即为需要子类进行实现的,下文以该函数为核心,分析 RateLimiter 类的子类是如何实现该方法的。


final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {  resync(nowMicros);  long returnValue = nextFreeTicketMicros; // 返回的是上次计算的nextFreeTicketMicros  double storedPermitsToSpend = min(requiredPermits, this.storedPermits); // 可以消费的令牌数  double freshPermits = requiredPermits - storedPermitsToSpend; // 还需要的令牌数  long waitMicros =      storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)          + (long) (freshPermits * stableIntervalMicros); // 根据freshPermits计算需要等待的时间
this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros); // 本次计算的nextFreeTicketMicros不返回 this.storedPermits -= storedPermitsToSpend; return returnValue;}
复制代码


  • 该函数用于获取 requiredPermits 个令牌,并返回需要等待到的时间点

  • 其中,storedPermitsToSpend 为桶中可以消费的令牌数,freshPermits 为还需要的(需要补充的)令牌数,根据该值计算需要等待的时间,追加并更新到 nextFreeTicketMicros

  • 需要注意的是,该函数的返回是更新前的(上次请求计算的)nextFreeTicketMicros,而不是本次更新的 nextFreeTicketMicros,通俗来讲,本次请求需要为上次请求的预消费行为埋单,这也是 RateLimiter 可以预消费(处理突发)的原理所在。若需要禁止预消费,则修改此处返回更新后的 nextFreeTicketMicros 值。

SmoothBursty 的构造函数
SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {  super(stopwatch);  this.maxBurstSeconds = maxBurstSeconds; // 最大存储maxBurstSeconds秒生成的令牌}
@Overridevoid doSetRate(double permitsPerSecond, double stableIntervalMicros) { double oldMaxPermits = this.maxPermits; maxPermits = maxBurstSeconds * permitsPerSecond; // 计算最大存储令牌数 if (oldMaxPermits == Double.POSITIVE_INFINITY) { // if we don't special-case this, we would get storedPermits == NaN, below storedPermits = maxPermits; } else { storedPermits = (oldMaxPermits == 0.0) ? 0.0 // initial state : storedPermits * maxPermits / oldMaxPermits; }}
复制代码


  • 桶中可存放的最大令牌数由 maxBurstSeconds 计算而来,其含义为最大存储 maxBurstSeconds 秒生成的令牌。

  • 该参数的作用在于,可以更为灵活地控制流量。如,某些接口限制为 300 次/20 秒,某些接口限制为 50 次/45 秒等。

抽象函数分析

在以上文代码分析中出现了两个抽象函数 coolDownIntervalMicros 及 storedPermitsToWaitTime,现分析这两个抽象函数。


coolDownIntervalMicros 函数


**主要含义为生成一个令牌需要消耗的时间,该函数主要应用于计算当前时间可产生的令牌数。根据上文的 UML 图 SmoothRateLimiter 类有两个子类 SmoothBursty 及 SmoothWarmingUp。 **


SmoothBursty 类中对于 coolDownIntervalMicros 函数的实现如下:


@Overridedouble coolDownIntervalMicros() {  return stableIntervalMicros;}
复制代码


可见实现非常简单,仅仅只是返回 stableIntervalMicros 属性,即产生两个令牌需要的时间间隔。


SmoothWarmingUp 类中对于 coolDownIntervalMicros 函数的实现如下:


@Overridedouble coolDownIntervalMicros() {  return warmupPeriodMicros / maxPermits;}
复制代码


  • 其中 maxPermits 属性上文已经出现过,表示当前令牌桶的最大容量。

  • warmupPeriodMicros 属性属于 SmoothWarmingUp 类的特有属性,表示令牌桶中令牌从 0 到 maxPermits 需要经过的时间,故 warmupPeriodMicros / maxPermits 表示在令牌数量达到 maxPermits 之前的令牌产生时间间隔。

storedPermitsToWaitTime 函数

主要表示消耗存储在令牌桶中的令牌需要的时间。


SmoothBursty 类中对于 storedPermitsToWaitTime 函数的实现如下:


@Overridelong storedPermitsToWaitTime(double storedPermits, double permitsToTake) {  return 0L;}
复制代码


直接返回 0,表示消耗令牌不需要时间。


SmoothBursty 类中对于 storedPermitsToWaitTime 函数的实现如下:


@Overridelong storedPermitsToWaitTime(double storedPermits, double permitsToTake) {  double availablePermitsAboveThreshold = storedPermits - thresholdPermits;  long micros = 0;  // measuring the integral on the right part of the function (the climbing line)  if (availablePermitsAboveThreshold > 0.0) {    double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);    // TODO(cpovirk): Figure out a good name for this variable.    double length =        permitsToTime(availablePermitsAboveThreshold)            + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);    micros = (long) (permitsAboveThresholdToTake * length / 2.0);    permitsToTake -= permitsAboveThresholdToTake;  }  // measuring the integral on the left part of the function (the horizontal line)  micros += (long) (stableIntervalMicros * permitsToTake);  return micros;}
复制代码


  • 实现较为复杂,其核心思想在于计算消耗当前存储令牌时需要根据预热设置区别对待。其中涉及到新变量 thresholdPermits,该变量为令牌阈值,当当前存储的令牌数大于该值时,消耗(storedPermits-thresholdPermits)范围的令牌需要有预热的过程(即消耗每个令牌的间隔时间慢慢减小),而消耗 0~thresholdPermits 个数的以存储令牌,每个令牌消耗时间为固定值,即 stableIntervalMicros。

  • 而 thresholdPermits 取值需要考虑预热时间及令牌产生速度两个属性,即 thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;。可见阈值为预热时间中能够产生的令牌数的一半,并且根据注释计算消耗阈值以上的令牌的时间可以转换为计算预热图的梯形面积(实际为积分),本处不详细展开。

  • 使用此种设计可以保证在上次请求间隔时间较长时,令牌桶中存储了较多的令牌,当消耗这些令牌时,最开始的令牌消耗时间较长,后续时间慢慢缩短直到达到 stableIntervalMicros 的状态,产生预热的效果。

实现总结

  • 根据令牌桶算法,桶中的令牌是持续生成存放的,有请求时需要先从桶中拿到令牌才能开始执行,谁来持续生成令牌存放呢?

  • 一种解法是,开启一个定时任务,由定时任务持续生成令牌。这样的问题在于会极大的消耗系统资源,如,某接口需要分别对每个用户做访问频率限制,假设系统中存在 6W 用户,则至多需要开启 6W 个定时任务来维持每个桶中的令牌数,这样的开销是巨大的。

  • 在实现限流器的过程中,基于令牌桶的思想,并且增加了带有预热器的令牌桶限流器实现。被限流的线程使用其自带的 SleepingStopwatch 工具类,最终使用的是 Thread.sleep(ms, ns);方法,而线程使用 sleep 休眠时其持有的锁并不会释放,在多线程编程时此处需要注意。

  • 最后,限流器触发算法采用的是预定令牌的方式,即当前请求需要的令牌数不会对当前请求的等待时间造成影响,而是会影响下一次请求的等待时间。

发布于: 5 小时前阅读数: 6
用户头像

🏆 2021年InfoQ写作平台-签约作者 🏆 2020.03.25 加入

👑【酷爱计算机技术、醉心开发编程、喜爱健身运动、热衷悬疑推理的”极客狂人“】 🏅 【Java技术领域,MySQL技术领域,APM全链路追踪技术及微服务、分布式方向的技术体系等】 “任何足够先进的技术都是魔法“

评论

发布
暂无评论
☕️【Java专题系列】「回顾 RateLimiter」针对于限流器的入门到精通(针对于源码分析介绍)