写点什么

🚀【Guava 技术指南】「RateLimiter 类」服务请求流控实现方案

发布于: 3 小时前
🚀【Guava技术指南】「RateLimiter类」服务请求流控实现方案

背景介绍

后台服务的,都会接触到流控,一般的场景就是在流量异常,比如遭受攻击的时候,保障服务不过载,在可支持的范围内提供稳定的服务。比如我们的服务支持 100QPS,当一下子来了 1000 个请求的时候,我们在可服务的范围内,每秒处理 100 个请求,这样在牺牲一些响应时效性的时候,可以保证服务不会 crash。

限流算法

常用的限流算法有漏桶算法和令牌桶算法,guava 的 RateLimiter 使用的是令牌桶算法,也就是以固定的频率向桶中放入令牌,例如一秒钟 10 枚令牌,实际业务在每次响应请求之前都从桶中获取令牌,只有取到令牌的请求才会被成功响应,获取的方式有两种:阻塞等待令牌或者取不到立即返回失败。

实现原理

简单来说,就是当有大量请求进来的时候,限制请求的频率,维持其在一个稳定的区间。而其具体的方法,简单来说就是,根据上次处理的时间戳和允许的每秒允许的请求,来决定下次可以执行的时间。而 RateLimiter 主要是利用了一个令牌桶的算法,如下:

系统以恒定的速率产生令牌(permit),当来一个请求的时候,会请求一个或者多个令牌,当且仅当系统有这么多个令牌的时候,请求才被允许执行,否则就一直等待令牌的生成。

技术选项

Guava 给我们提供了好用的流控工具,场景是外部接收请求并且在处理请求时候,从桶中申请令牌,申请到了就成功响应,申请不到时直接返回失败。

RateLimiter 从概念上来讲,速率限制器会在可配置的速率下分配许可证。如果必要的话,每个 acquire()会阻塞当前线程直到许可证可用后获取该许可证,一旦获取到许可证,不需要再释放许可证。

RateLimiter 使用的是一种叫令牌桶的流控算法,RateLimiter 会按照一定的频率往桶里扔令牌,线程拿到令牌才能执行,希望自己的应用程序 QPS 不要超过 1000,那么 RateLimiter 设置 1000 的速率后,就会每秒往桶里扔 1000 个令牌。

Maven 依赖

<dependency>  <groupId>com.google.guava</groupId>  <artifactId>guava</artifactId>  <version>22.0</version></dependency>
复制代码

实战案例

AccessLimitService.java 限流服务封装到一个类中 AccessLimitService,提供 tryAcquire()方法,用来尝试获取令牌,返回 true 表示获取到

@Servicepublic class AccessLimitService {
//每秒只发出5个令牌 RateLimiter rateLimiter = RateLimiter.create(5.0);
/** * 尝试获取令牌 * @return */ public boolean tryAcquire(){ return rateLimiter.tryAcquire(); }}
复制代码

每次收到请求的时候都尝试去获取令牌,获取成功和失败打印不同的信息

@Controllerpublic class HelloController {
private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Autowired private AccessLimitService accessLimitService;
@RequestMapping("/access") @ResponseBody public String access(){ //尝试获取令牌 if(accessLimitService.tryAcquire()){ //模拟业务执行500毫秒 try { Thread.sleep(500); }catch (InterruptedException e){ e.printStackTrace(); } return "aceess success [" + sdf.format(new Date()) + "]"; }else{ return "aceess limit [" + sdf.format(new Date()) + "]"; } }}
复制代码

测试:十个线程并发访问接口


public class AccessClient { ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);
/** * get请求 * @param realUrl * @return */ public static String sendGet(URL realUrl) { String result = ""; BufferedReader in = null; try { // 打开和URL之间的连接 URLConnection connection = realUrl.openConnection(); // 设置通用的请求属性 connection.setRequestProperty("accept", "*/*"); connection.setRequestProperty("connection", "Keep-Alive"); connection.setRequestProperty("user-agent", "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;SV1)"); // 建立实际的连接 connection.connect();
// 定义 BufferedReader输入流来读取URL的响应 in = new BufferedReader(new InputStreamReader( connection.getInputStream())); String line; while ((line = in.readLine()) != null) { result += line; } } catch (Exception e) { System.out.println("发送GET请求出现异常!" + e); e.printStackTrace(); } // 使用finally块来关闭输入流 finally { try { if (in != null) { in.close(); } } catch (Exception e2) { e2.printStackTrace(); } } return result; }
public void access() throws Exception{ final URL url = new URL("http://localhost:8080/guavalimitdemo/access");
for(int i=0;i<10;i++) { fixedThreadPool.submit(new Runnable() { public void run() { System.out.println(sendGet(url)); } }); }
fixedThreadPool.shutdown(); fixedThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); }
public static void main(String[] args) throws Exception{ AccessClient accessClient = new AccessClient(); accessClient.access(); }}
复制代码

部分请求由于获取的令牌可以成功执行,其余请求没有拿到令牌,我们可以根据实际业务来做区分处理。还有一点要注意,我们通过 RateLimiter.create(5.0)配置的是每一秒 5 枚令牌,但是限流的时候发出的是 6 枚,改用其他值验证,也是实际的比配置的大 1。

以上就是快速实现限流的实战过程,仅是单进程服务的限流,而实际的分布式服务中会考虑更多因素,会复杂很多。

举例来说明如何使用 RateLimiter,想象下我们需要处理一个任务列表,但我们不希望每秒的任务提交超过两个:

//速率是每秒两个许可final RateLimiter rateLimiter = RateLimiter.create(2.0);void submitTasks(List tasks, Executor executor) {    for (Runnable task : tasks) {        rateLimiter.acquire(); // 也许需要等待        executor.execute(task);    }}
复制代码

应用方法

  • acquire():从 RateLimiter 获取一个许可,该方法会被阻塞直到获取到请求。

  • acquire(int permits):从 RateLimiter 获取指定许可数,该方法会被阻塞直到获取到请求。

  • create(double permitsPerSecond):根据指定的稳定吞吐率创建 RateLimiter,这里的吞吐率是指每秒多少许可数(通常是指 QPS,每秒多少查询)。

  • create(double permitsPerSecond, long warmupPeriod, TimeUnit unit):根据指定的稳定吞吐率和预热期来创建 RateLimiter,这里的吞吐率是指每秒多少许可数(通常是指 QPS,每秒多少个请求量),在这段预热时间内,RateLimiter 每秒分配的许可数会平稳地增长直到预热期结束时达到其最大速率。(只要存在足够请求数来使其饱和)

    getRate():返回 RateLimiter 配置中的稳定速率,该速率单位是每秒多少许可数。

    setRate(double permitsPerSecond):更新 RateLimite 的稳定速率,参数 permitsPerSecond 由构造 RateLimiter 的工厂方法提供。

    tryAcquire():从 RateLimiter 获取许可,如果该许可可以在无延迟下的情况下立即获取得到的话

    tryAcquire(int permits)从 RateLimiter 获取许可数,如果该许可数可以在无延迟下的情况下立即获取得到的话

    tryAcquire(int permits, long timeout, TimeUnit unit):从 RateLimiter 获取指定许可数如果该许可数可以在不超过 timeout 的时间内获取得到的话,或者如果无法在 timeout 过期之前获取得到许可数的话,那么立即返回 false (无需等待)

    tryAcquire(long timeout, TimeUnit unit):从 RateLimiter 获取许可如果该许可可以在不超过 timeout 的时间内获取得到的话,或者如果无法在 timeout 过期之前获取得到许可的话,那么立即返回 false(无需等待)


RateLimiter 经常用于限制对一些物理资源或者逻辑资源的访问速率。与 Semaphore 相比,Semaphore 限制了并发访问的数量而不是使用速率。

源码分析

相关源码基于 guava-28.0-jre 的版本相关的核心类均在com.google.common.util.concurrent里面,可见这些方法都是线程安全的,具体有如下几个

  • RateLimiter流控主类,也是一个抽象类

  • SmoothRateLimiter平滑流控类,这是 Guava 默认实现的一种流控方式,保障服务器已稳定的速率处理请求或者获取资源

  • SmoothWarmingUp该类实现一个热启动的功能,即流量由低到高,然后达到一个稳定的状态

  • SmoothBursty该类支持突发请求的状况,支持一下子来很多请求(但是在可控范围内)的情况

  • SleepingStopwatch实现一个不可中断的 sleep 的操作

下面简单介绍下几个类的关系,UML 类图关系如下

限速器
com.google.common.util.concurrent.RateLimiter@ThreadSafe@Betapublicabstract class RateLimiter extends Object
复制代码
原理
  • 保持分发的速率,以一定速率分发令牌,比如我们设置permitsPerSecond为 500 的话,则每 2 毫秒产生一个令牌。

  • 令牌会存储,若一定时间没有请求,可用令牌会存储下来,当然会有一个上限值,当下次来请求的时候,优先使用现有的存储的令牌

  • 会有一个nextFreeTicketMicros来记录下次有可用令牌的时间戳,在这个时间之前,所有的请求均不能通过。

核心方法
  • public static RateLimiter create(double permitsPerSecond):该方法会创建一个 RateLimiter 实例,其每秒产生 permitsPerSecond 个令牌

  • public double acquire(int permits):该方法是用于获取 N 个令牌的方法,如果系统内令牌不够,则一直等待直到有足够令牌可用

  public double acquire(int permits) {    long microsToWait = reserve(permits);    stopwatch.sleepMicrosUninterruptibly(microsToWait);    return 1.0 * microsToWait / SECONDS.toMicros(1L);  }
复制代码


  • 根据所需令牌计算等待时间

  • 执行等待的动作

  • 返回等待的毫秒数

计算等待时间的函数是reserve,其相关的实现

  final long reserve(int permits) {    checkPermits(permits);    synchronized (mutex()) {      return reserveAndGetWaitLength(permits, stopwatch.readMicros());    }  }
复制代码


  final long reserveAndGetWaitLength(int permits, long nowMicros) {    long momentAvailable = reserveEarliestAvailable(permits, nowMicros);    return max(momentAvailable - nowMicros, 0);  }
复制代码


 final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {    resync(nowMicros);//若当前时间大于nextFreeTicketMicros,则需要将storedPermits的值同步    long returnValue = nextFreeTicketMicros;    double storedPermitsToSpend = min(requiredPermits, this.storedPermits);//获取可使用的storedPermits    double freshPermits = requiredPermits - storedPermitsToSpend;    long waitMicros =        storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)            + (long) (freshPermits * stableIntervalMicros);//如若需要新的令牌,则计算需要等待的时间
this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);//重新计算下次令牌可用的时间 this.storedPermits -= storedPermitsToSpend; return returnValue; }
复制代码


  • public boolean tryAcquire(int permits, Duration timeout):该方法用户获取另外,如果在 timeout 时间内可以获取到足够的令牌,则等待,否则直接返回 false


发布于: 3 小时前阅读数: 6
用户头像

🏆2021年InfoQ写作平台-签约作者 🏆 2020.03.25 加入

👑【酷爱计算机技术、醉心开发编程、喜爱健身运动、热衷悬疑推理的”极客狂人“】 🏅 【Java技术领域,MySQL技术领域,APM全链路追踪技术及微服务、分布式方向的技术体系等】 我们始于迷惘,终于更高水平的迷惘

评论

发布
暂无评论
🚀【Guava技术指南】「RateLimiter类」服务请求流控实现方案