写点什么

理论 + 算法 + 实战,教你如何实现亿级流量下的分布式限流

  • 2022 年 2 月 17 日
  • 本文字数:10562 字

    阅读完需:约 35 分钟

本文分享自华为云社区《【高并发】如何实现亿级流量下的分布式限流?这些理论你必须掌握!!》,作者:冰 河。


在互联网应用中,高并发系统会面临一个重大的挑战,那就是大量流高并发访问,比如:天猫的双十一、京东 618、秒杀、抢购促销等,这些都是典型的大流量高并发场景。

高并发系统

限流短时间内巨大的访问流量,我们如何让系统在处理高并发的同时还能保证自身系统的稳定性?有人会说,增加机器就可以了,因为我的系统是分布式的,所以可以只需要增加机器就可以解决问题了。但是,如果你通过增加机器还是不能解决这个问题怎么办呢?而且这种情况下又不能无限制的增加机器,服务器的硬件资源始终都是有限的,在有限的资源下,我们要应对这种大流量高并发的访问,就不得不采取一些其他的措施来保护我们的后端服务系统了,比如:缓存、异步、降级、限流、静态化等。


这里,我们先说说如何实现限流。

什么是限流?

在高并发系统中,限流通常指的是:对高并发访问或者请求进行限速或者对一个时间内的请求进行限速来保护我们的系统,一旦达到系统的限速规则(比如系统限制的请求速度),则可以采用下面的方式来处理这些请求。


  • 拒绝服务(友好提示或者跳转到错误页面)。

  • 排队或等待(比如秒杀系统)。

  • 服务降级(返回默认的兜底数据)。


其实,就是对请求进行限速,比如 10r/s,即每秒只允许 10 个请求,这样就限制了请求的速度。从某种意义上说,限流,其实就是在一定频率上进行量的限制。


限流一般用来控制系统服务请求的速率,比如:天猫双十一的限流,京东 618 的限流,12306 的抢票等。

限流有哪些使用场景?

这里,我们来举一个例子,假设你做了一个商城系统,某个节假日的时候,突然发现提交订单的接口请求比平时请求量突然上涨了将近 50 倍,没多久提交订单的接口就超时并且抛出了异常,几乎不可用了。而且,因为订单接口超时不可用,还导致了系统其它服务出现故障。


我们该如何应对这种大流量场景呢?一种典型的处理方案就是限流。当然了,除了限流之外,还有其他的处理方案,我们这篇文章就主要讲限流。


对稀缺资源的秒杀、抢购;对数据库的高并发读写操作,比如提交订单,瞬间往数据库插入大量的数据;限流可以说是处理高并发问题的利器,有了限流就可以不用担心瞬间高峰流量压垮系统服务或者服务雪崩,最终做到有损服务而不是不服务。


使用限流同样需要注意的是:限流要评估好,测试好,否则会导致正常的访问被限流。

计数器

计数器法

限流算法中最简单粗暴的一种算法,例如,某一个接口 1 分钟内的请求不超过 60 次,我们可以在开始时设置一个计数器,每次请求时,这个计数器的值加 1,如果这个这个计数器的值大于 60 并且与第一次请求的时间间隔在 1 分钟之内,那么说明请求过多;如果该请求与第一次请求的时间间隔大于 1 分钟,并且该计数器的值还在限流范围内,那么重置该计数器。


使用计数器还可以用来限制一定时间内的总并发数,比如数据库连接池、线程池、秒杀的并发数;计数器限流只要一定时间内的总请求数超过设定的阀值则进行限流,是一种简单粗暴的总数量限流,而不是平均速率限流。


这个方法有一个致命问题:临界问题——当遇到恶意请求,在 0:59 时,瞬间请求 100 次,并且在 1:00 请求 100 次,那么这个用户在 1 秒内请求了 200 次,用户可以在重置节点突发请求,而瞬间超过我们设置的速率限制,用户可能通过算法漏洞击垮我们的应用。


这个问题我们可以使用滑动窗口解决。

滑动窗口


在上图中,整个红色矩形框是一个时间窗口,在我们的例子中,一个时间窗口就是 1 分钟,然后我们将时间窗口进行划分,如上图我们把滑动窗口划分为 6 格,所以每一格代表 10 秒,每超过 10 秒,我们的时间窗口就会向右滑动一格,每一格都有自己独立的计数器,例如:一个请求在 0:35 到达, 那么 0:30 到 0:39 的计数器会+1,那么滑动窗口是怎么解决临界点的问题呢?如上图,0:59 到达的 100 个请求会在灰色区域格子中,而 1:00 到达的请求会在红色格子中,窗口会向右滑动一格,那么此时间窗口内的总请求数共 200 个,超过了限定的 100,所以此时能够检测出来触发了限流。回头看看计数器算法,会发现,其实计数器算法就是窗口滑动算法,只不过计数器算法没有对时间窗口进行划分,所以是一格。


由此可见,当滑动窗口的格子划分越多,限流的统计就会越精确。

漏桶算法

算法的思路就是水(请求)先进入到漏桶里面,漏桶以恒定的速度流出,当水流的速度过大就会直接溢出,可以看出漏桶算法能强行限制数据的传输速率。如下图所示。


漏桶算法不支持突发流量。

令牌桶算法


从上图中可以看出,令牌算法有点复杂,桶里存放着令牌 token。桶一开始是空的,token 以固定的速率 r 往桶里面填充,直到达到桶的容量,多余的 token 会被丢弃。每当一个请求过来时,就会尝试着移除一个 token,如果没有 token,请求无法通过。


令牌桶算法支持突发流量。

令牌桶算法实现

Guava 框架提供了令牌桶算法的实现,可直接使用这个框架的 RateLimiter 类创建一个令牌桶限流器,比如:每秒放置的令牌桶的数量为 5,那么 RateLimiter 对象可以保证 1 秒内不会放入超过 5 个令牌,并且以固定速率进行放置令牌,达到平滑输出的效果。

平滑流量示例

这里,我写了一个使用 Guava 框架实现令牌桶算法的示例,如下所示。

package io.binghe.limit.guava;
import com.google.common.util.concurrent.RateLimiter;
/** * @author binghe * @version 1.0.0 * @description 令牌桶算法 */public class TokenBucketLimiter { public static void main(String[] args){ //每秒钟生成5个令牌 RateLimiter limiter = RateLimiter.create(5);
//返回值表示从令牌桶中获取一个令牌所花费的时间,单位是秒 System.out.println(limiter.acquire(1)); System.out.println(limiter.acquire(1)); System.out.println(limiter.acquire(1)); System.out.println(limiter.acquire(1)); System.out.println(limiter.acquire(1)); System.out.println(limiter.acquire(1)); System.out.println(limiter.acquire(1)); System.out.println(limiter.acquire(1)); System.out.println(limiter.acquire(1)); System.out.println(limiter.acquire(1)); }}
复制代码

代码的实现非常简单,就是使用 Guava 框架的 RateLimiter 类生成了一个每秒向桶中放入 5 个令牌的对象,然后不断从桶中获取令牌。我们先来运行下这段代码,输出的结果信息如下所示。

0.00.1972940.1912780.199970.1993050.2004720.2001840.1994170.2001110.199759
复制代码

从输出结果可以看出:第一次从桶中获取令牌时,返回的时间为 0.0,也就是没耗费时间。之后每次从桶中获取令牌时,都会耗费一定的时间,这是为什么呢?按理说,向桶中放入了 5 个令牌后,再从桶中获取令牌也应该和第一次一样并不会花费时间啊!


因为在 Guava 的实现是这样的:我们使用 RateLimiter.create(5)创建令牌桶对象时,表示每秒新增 5 个令牌,1 秒等于 1000 毫秒,也就是每隔 200 毫秒向桶中放入一个令牌。


当我们运行程序时,程序运行到 RateLimiter limiter = RateLimiter.create(5);时,就会向桶中放入一个令牌,当程序运行到第一个 System.out.println(limiter.acquire(1));时,由于桶中已经存在一个令牌,直接获取这个令牌,并没有花费时间。然而程序继续向下执行时,由于程序会每隔 200 毫秒向桶中放入一个令牌,所以,获取令牌时,花费的时间几乎都是 200 毫秒左右。

突发流量示例

我们再来看一个突发流量的示例,代码示例如下所示。

package io.binghe.limit.guava;
import com.google.common.util.concurrent.RateLimiter;
/** * @author binghe * @version 1.0.0 * @description 令牌桶算法 */public class TokenBucketLimiter { public static void main(String[] args){ //每秒钟生成5个令牌 RateLimiter limiter = RateLimiter.create(5);
//返回值表示从令牌桶中获取一个令牌所花费的时间,单位是秒 System.out.println(limiter.acquire(50)); System.out.println(limiter.acquire(5)); System.out.println(limiter.acquire(5)); System.out.println(limiter.acquire(5)); System.out.println(limiter.acquire(5)); }}
复制代码

上述代码表示的含义为:每秒向桶中放入 5 个令牌,第一次从桶中获取 50 个令牌,也就是我们说的突发流量,后续每次从桶中获取 5 个令牌。接下来,我们运行上述代码看下效果。

0.09.9984090.991091.0001480.999752
复制代码

运行代码时,会发现当命令行打印出 0.0 后,会等很久才会打印出后面的输出结果。


程序每秒钟向桶中放入 5 个令牌,当程序运行到 RateLimiter limiter = RateLimiter.create(5); 时,就会向桶中放入令牌。当运行到 System.out.println(limiter.acquire(50)); 时,发现很快就会获取到令牌,花费了 0.0 秒。接下来,运行到第一个 System.out.println(limiter.acquire(5));时,花费了 9.998409 秒。小伙们可以思考下,为什么这里会花费 10 秒中的时间呢?


这是因为我们使用 RateLimiter limiter = RateLimiter.create(5);代码向桶中放入令牌时,一秒钟放入 5 个,而 System.out.println(limiter.acquire(50));需要获取 50 个令牌,也就是获取 50 个令牌需要花费 10 秒钟时间,这是因为程序向桶中放入 50 个令牌需要 10 秒钟。程序第一次从桶中获取令牌时,很快就获取到了。而第二次获取令牌时,花费了将近 10 秒的时间。


Guava 框架支持突发流量,但是在突发流量之后再次请求时,会被限速,也就是说:在突发流量之后,再次请求时,会弥补处理突发请求所花费的时间。所以,我们的突发示例程序中,在一次从桶中获取 50 个令牌后,再次从桶中获取令牌,则会花费 10 秒左右的时间。

Guava 令牌桶算法的特点

  • RateLimiter 使用令牌桶算法,会进行令牌的累积,如果获取令牌的频率比较低,则不会导致等待,直接获取令牌。

  • RateLimiter 由于会累积令牌,所以可以应对突发流量。也就是说如果同时请求 5 个令牌,由于此时令牌桶中有累积的令牌,能够快速响应请求。

  • RateLimiter 在没有足够的令牌发放时,采用的是滞后的方式进行处理,也就是前一个请求获取令牌所需要等待的时间由下一次请求来承受和弥补,也就是代替前一个请求进行等待。(这里,小伙伴们要好好理解下)

HTTP 接口限流实战

这里,我们实现 Web 接口限流,具体方式为:使用自定义注解封装基于令牌桶限流算法实现接口限流。

不使用注解实现接口限流

搭建项目

这里,我们使用 SpringBoot 项目来搭建 Http 接口限流项目,SpringBoot 项目本质上还是一个 Maven 项目。所以,小伙伴们可以直接创建一个 Maven 项目,我这里的项目名称为 mykit-ratelimiter-test。接下来,在 pom.xml 文件中添加如下依赖使项目构建为一个 SpringBoot 项目。

<parent>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-parent</artifactId>        <version>2.2.6.RELEASE</version>    </parent>
<modelVersion>4.0.0</modelVersion><groupId>io.mykit.limiter</groupId><artifactId>mykit-ratelimiter-test</artifactId><version>1.0.0-SNAPSHOT</version><packaging>jar</packaging><name>mykit-ratelimiter-test</name>
<properties> <guava.version>28.2-jre</guava.version></properties>
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-tomcat</artifactId> </exclusion> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </exclusion> </exclusions> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-undertow</artifactId> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
<dependency> <groupId>org.aspectj</groupId> <artifactId>aspectjweaver</artifactId> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency>
<dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>${guava.version}</version> </dependency></dependencies>
<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version><!--$NO-MVN-MAN-VER$--> <configuration> <source>${java.version}</source> <target>${java.version}</target> </configuration> </plugin> </plugins></build>
复制代码


可以看到,我在项目中除了引用了 SpringBoot 相关的 Jar 包外,还引用了 guava 框架,版本为 28.2-jre。

创建核心类

这里,我主要是模拟一个支付接口的限流场景。首先,我们定义一个 PayService 接口和 MessageService 接口。PayService 接口主要用于模拟后续的支付业务,MessageService 接口模拟发送消息。接口的定义分别如下所示。


  • PayService

package io.mykit.limiter.service;import java.math.BigDecimal;/** * @author binghe * @version 1.0.0 * @description 模拟支付 */public interface PayService {    int pay(BigDecimal price);}
复制代码


  • MessageService

package io.mykit.limiter.service;/** * @author binghe * @version 1.0.0 * @description 模拟发送消息服务 */public interface MessageService {    boolean sendMessage(String message);}
复制代码

接下来,创建二者的实现类,分别如下。

  • MessageServiceImpl

package io.mykit.limiter.service.impl;import io.mykit.limiter.service.MessageService;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Service;/** * @author binghe * @version 1.0.0 * @description 模拟实现发送消息 */@Servicepublic class MessageServiceImpl implements MessageService {    private final Logger logger = LoggerFactory.getLogger(MessageServiceImpl.class);    @Override    public boolean sendMessage(String message) {        logger.info("发送消息成功===>>" + message);        return true;    }}
复制代码
  • PayServiceImpl

package io.mykit.limiter.service.impl;import io.mykit.limiter.service.PayService;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Service;import java.math.BigDecimal;/** * @author binghe * @version 1.0.0 * @description 模拟支付 */@Servicepublic class PayServiceImpl implements PayService {    private final Logger logger = LoggerFactory.getLogger(PayServiceImpl.class);    @Override    public int pay(BigDecimal price) {        logger.info("支付成功===>>" + price);        return 1;    }}
复制代码

由于是模拟支付和发送消息,所以,我在具体实现的方法中打印出了相关的日志,并没有实现具体的业务逻辑。


接下来,就是创建我们的 Controller 类 PayController,在 PayController 类的接口 pay()方法中使用了限流,每秒钟向桶中放入 2 个令牌,并且客户端从桶中获取令牌,如果在 500 毫秒内没有获取到令牌的话,我们可以则直接走服务降级处理。


PayController 的代码如下所示。

package io.mykit.limiter.controller;import com.google.common.util.concurrent.RateLimiter;import io.mykit.limiter.service.MessageService;import io.mykit.limiter.service.PayService;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import java.math.BigDecimal;import java.util.concurrent.TimeUnit;
/** * @author binghe * @version 1.0.0 * @description 测试接口限流 */@RestControllerpublic class PayController { private final Logger logger = LoggerFactory.getLogger(PayController.class); /** * RateLimiter的create()方法中传入一个参数,表示以固定的速率2r/s,即以每秒2个令牌的速率向桶中放入令牌 */ private RateLimiter rateLimiter = RateLimiter.create(2);
@Autowired private MessageService messageService; @Autowired private PayService payService; @RequestMapping("/boot/pay") public String pay(){ //记录返回接口 String result = ""; //限流处理,客户端请求从桶中获取令牌,如果在500毫秒没有获取到令牌,则直接走服务降级处理 boolean tryAcquire = rateLimiter.tryAcquire(500, TimeUnit.MILLISECONDS); if (!tryAcquire){ result = "请求过多,降级处理"; logger.info(result); return result; } int ret = payService.pay(BigDecimal.valueOf(100.0)); if(ret > 0){ result = "支付成功"; return result; } result = "支付失败,再试一次吧..."; return result; }}
复制代码

最后,我们来创建 mykit-ratelimiter-test 项目的核心启动类,如下所示。

package io.mykit.limiter;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;
/** * @author binghe * @version 1.0.0 * @description 项目启动类 */@SpringBootApplicationpublic class MykitLimiterApplication {
public static void main(String[] args){ SpringApplication.run(MykitLimiterApplication.class, args); }}
复制代码

至此,我们不使用注解方式实现限流的 Web 应用就基本完成了。

运行项目

项目创建完成后,我们来运行项目,运行 SpringBoot 项目比较简单,直接运行 MykitLimiterApplication 类的 main()方法即可。


项目运行成功后,我们在浏览器地址栏输入链接:http://localhost:8080/boot/pay。页面会输出“支付成功”的字样,说明项目搭建成功了。如下所示。


此时,我只访问了一次,并没有触发限流。接下来,我们不停的刷浏览器,此时,浏览器会输出“支付失败,再试一次吧…”的字样,如下所示。


在 PayController 类中还有一个 sendMessage()方法,模拟的是发送消息的接口,同样使用了限流操作,具体代码如下所示。


@RequestMapping("/boot/send/message")public String sendMessage(){    //记录返回接口    String result = "";    //限流处理,客户端请求从桶中获取令牌,如果在500毫秒没有获取到令牌,则直接走服务降级处理    boolean tryAcquire = rateLimiter.tryAcquire(500, TimeUnit.MILLISECONDS);    if (!tryAcquire){        result = "请求过多,降级处理";        logger.info(result);        return result;    }    boolean flag = messageService.sendMessage("恭喜您成长值+1");    if (flag){        result = "消息发送成功";        return result;    }    result = "消息发送失败,再试一次吧...";    return result;}
复制代码


sendMessage()方法的代码逻辑和运行效果与 pay()方法相同,我就不再浏览器访问 http://localhost:8080/boot/send/message 地址的访问效果了,小伙伴们可以自行验证。

不使用注解实现限流缺点

通过对项目的编写,我们可以发现,当在项目中对接口进行限流时,不使用注解进行开发,会导致代码出现大量冗余,每个方法中几乎都要写一段相同的限流逻辑,代码十分冗余。

如何解决代码冗余的问题呢?我们可以使用自定义注解进行实现。

使用注解实现接口限流

使用自定义注解,我们可以将一些通用的业务逻辑封装到注解的切面中,在需要添加注解业务逻辑的方法上加上相应的注解即可。针对我们这个限流的实例来说,可以基于自定义注解实现。

实现自定义注解

实现,我们来创建一个自定义注解,如下所示。

package io.mykit.limiter.annotation;import java.lang.annotation.*;/** * @author binghe * @version 1.0.0 * @description 实现限流的自定义注解 */@Target(value = ElementType.METHOD)@Retention(RetentionPolicy.RUNTIME)@Documentedpublic @interface MyRateLimiter {    //向令牌桶放入令牌的速率    double rate();    //从令牌桶获取令牌的超时时间    long timeout() default 0;}
复制代码

自定义注解切面实现

接下来,我们还要实现一个切面类 MyRateLimiterAspect,如下所示。

package io.mykit.limiter.aspect;
import com.google.common.util.concurrent.RateLimiter;import io.mykit.limiter.annotation.MyRateLimiter;import org.aspectj.lang.ProceedingJoinPoint;import org.aspectj.lang.annotation.Around;import org.aspectj.lang.annotation.Aspect;import org.aspectj.lang.annotation.Pointcut;import org.aspectj.lang.reflect.MethodSignature;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;
import javax.servlet.http.HttpServletResponse;import java.io.IOException;import java.io.PrintWriter;import java.util.concurrent.TimeUnit;
/** * @author binghe * @version 1.0.0 * @description 一般限流切面类 */@Aspect@Componentpublic class MyRateLimiterAspect {
private RateLimiter rateLimiter = RateLimiter.create(2);
@Pointcut("execution(public * io.mykit.limiter.controller.*.*(..))") public void pointcut(){
}
/** * 核心切面方法 */ @Around("pointcut()") public Object process(ProceedingJoinPoint proceedingJoinPoint) throws Throwable{ MethodSignature signature = (MethodSignature) proceedingJoinPoint.getSignature();
//使用反射获取方法上是否存在@MyRateLimiter注解 MyRateLimiter myRateLimiter = signature.getMethod().getDeclaredAnnotation(MyRateLimiter.class); if(myRateLimiter == null){ //程序正常执行,执行目标方法 return proceedingJoinPoint.proceed(); } //获取注解上的参数 //获取配置的速率 double rate = myRateLimiter.rate(); //获取客户端等待令牌的时间 long timeout = myRateLimiter.timeout();
//设置限流速率 rateLimiter.setRate(rate);
//判断客户端获取令牌是否超时 boolean tryAcquire = rateLimiter.tryAcquire(timeout, TimeUnit.MILLISECONDS); if(!tryAcquire){ //服务降级 fullback(); return null; } //获取到令牌,直接执行 return proceedingJoinPoint.proceed();
}
/** * 降级处理 */ private void fullback() { response.setHeader("Content-type", "text/html;charset=UTF-8"); PrintWriter writer = null; try { writer = response.getWriter(); writer.println("出错了,重试一次试试?"); writer.flush();; } catch (IOException e) { e.printStackTrace(); }finally { if(writer != null){ writer.close(); } } }}
复制代码

自定义切面的功能比较简单,我就不细说了,大家有啥问题可以关注【冰河技术】微信公众号来进行提问。

接下来,我们改造下 PayController 类中的 sendMessage()方法,修改后的方法片段代码如下所示。

@MyRateLimiter(rate = 1.0, timeout = 500)@RequestMapping("/boot/send/message")public String sendMessage(){    //记录返回接口    String result = "";    boolean flag = messageService.sendMessage("恭喜您成长值+1");    if (flag){        result = "消息发送成功";        return result;    }    result = "消息发送失败,再试一次吧...";    return result;}
复制代码

运行部署项目

部署项目比较简单,只需要运行 MykitLimiterApplication 类下的 main()方法即可。这里,为了简单,我们还是从浏览器中直接输入链接地址来进行访问

效果如下所示。

接下来,我们不断的刷新浏览器。会出现“消息发送失败,再试一次吧…”的字样,说明已经触发限流操作。


点击关注,第一时间了解华为云新鲜技术~

发布于: 刚刚阅读数: 2
用户头像

提供全面深入的云计算技术干货 2020.07.14 加入

华为云开发者社区,提供全面深入的云计算前景分析、丰富的技术干货、程序样例,分享华为云前沿资讯动态,方便开发者快速成长与发展,欢迎提问、互动,多方位了解云计算! 传送门:https://bbs.huaweicloud.com/

评论

发布
暂无评论
理论+算法+实战,教你如何实现亿级流量下的分布式限流