写点什么

Java 培训 - 怎样通过 Bucket4j 提供速率限制

作者:@零度
  • 2022 年 4 月 08 日
  • 本文字数:5985 字

    阅读完需:约 20 分钟

以下文章来源于终码一生

时不时地,我们所有人都面临着限制我们的外部 API 的问题——出于多种原因,我们应该限制对我们 API 的调用的某些功能。

1.哪里需要?

  1. 欺诈检测(保护机器人):例如,我们有一个论坛,当有人试图发送消息或发布超出限制的帖子时,我们希望防止来自客户的垃圾邮件。为了我们自己的安全,我们必须防止这种行为。

  2. 从业务逻辑来看,通常是用来实现“API 业务模型”的:比如我们需要为我们的外部 API 引入关税功能,我们要创建一些关税,例如 START、STANDARD、BUSINESS . 对于每种资费,我们设置了每小时的通话次数限制(但您也可以将通话次数限制为每分钟、每秒、毫秒一次,也可以设置为每分钟一次。此外,您甚至可以设置多个限制限制 - 这称为“带宽管理”)。

  • START - 每小时最多 100 个电话

  • 标准 - 每小时最多 10000 个

  • 商业 - 高达每小时 100000

在我们的项目中使用速率限制还有很多其他原因_java培训

为了实现限速,我们可以使用很多流行的算法,如下所示。

最受欢迎的:

  • 令牌桶

  • 漏桶

最不受欢迎的:

  • 固定窗口计数器

  • 滑动窗口日志

  • 滑动窗口计数器

在本文中,我们将讨论“令牌桶”算法。

2.“令牌桶”算法的解释

让我们在下一个例子中考虑这个算法。



  • Bucket:如你所见,他的 token 数量是固定的(如果你在我们的 bucket 中设置了 1000 个 token,这就是 volume 的最大值)。

  • Refiller:根据带宽管理定期将丢失的 token 填充到 Bucket(每次 Consume 前调用)。

  • Consume:从我们的 Bucket 中取出 tokens(取出 1 个 token 或多个 token——通常取决于调用 consume 方法的权重,它是一个可定制且灵活的变量,但在 99%的情况下,我们只需要消费一个令牌)。

下面,您可以看到一个使用带宽管理以每分钟刷新令牌的填充器示例:



加油机与带宽管理一起工作。消费(作为行动)从桶中拿走代币。

存储桶用于存储当前的令牌计数、最大可能的令牌计数以及生成新令牌的刷新时间。

Token Bucket 算法有固定的内存用于存储 Bucket,它由以下变量组成:

  • Bucket 的体积(最大可能的令牌数) - 8 个字节

  • 桶中令牌的当前计数 - 8 个字节

  • 生成新令牌的纳秒计数 - 8 个字节

  • 对象头:16 字节

总共:40 字节

例如,在 1 GB 中,我们可以存储 2500 万个桶。了解这一点非常重要,因为通常我们会将有关存储桶的信息存储在缓存中,然后存储到 RAM(随机存取存储器)中。

3.算法的缺点

不幸的是,该算法并不完美。Token Bucket 算法的主要问题被称为“Burst”。

我将用一个完美的例子来说明这个问题,以简短地解释这个想法:

  1. 在某些时候,我们的存储桶包含 100 个令牌。

  2. 同时,我们消耗了 100 个代币。

  3. 一秒钟后,填充器再次填充 100 个令牌。

  4. 同时,我们消耗了 100 个代币。

大约 1 秒,我们消耗了 200 个代币,因此,我们超过了限制 x2 倍!

但是,有问题吗?没有!如果我们要使用 Bucket 进行长期距离,问题就不是问题了。

如果我们只使用我们的 Bucket 1 秒,我们会过度消耗令牌 x2 倍(200 个令牌),但是如果我们使用我们的 Bucket 60 秒,则该桶的消耗大约等于 6100 秒,因为 Burst 问题只发生了一次. 您使用铲斗的次数越多,其准确性就越好。java培训机构当准确性在速率限制中很重要时,这是一种非常罕见的情况。

最重要的是消耗内存,因为我们有一个与“Burst”相关的问题。一个 bucket 有固定内存大小的要求(在 Token Bucket 算法的情况下 - 40 字节),我们面临“Burst”的问题,因为要创建 Bucket 我们需要 2 个变量:生成新令牌的纳秒计数(refill)和 bucket 的体积(容量)——因此,我们无法实现 Token Bucket 的准确性合约

4.通过 Bucket4j 实现 Rate-Limiter

让我们考虑一下 Bucket4j 库实现的 Token Bucket 算法。

Bucket4j 是 Java 世界中用于实现速率限制功能的最流行的库。每个月,Bucket4j 从 Maven Central 下载多达 200,000 次,并包含在 GitHub 上的 3500 个依赖项中。

让我们考虑几个简单的例子(我们将使用 Maven 作为软件项目管理和理解工具)。

对于第一个,我们需要在 pom.xml 中添加一个依赖项:

<dependency><groupId>com.github.vladimir-bukhtoyarov</groupId><artifactId>bucket4j-core</artifactId><version>7.0.0</version></dependency>

创建 Example.java:

import io.github.bucket4j.Bandwidth;import io.github.bucket4j.Bucket;import io.github.bucket4j.Bucket4j;import io.github.bucket4j.ConsumptionProbe;import java.time.Duration;public class Example {public static void main(String args[]) {//Create the Bandwidth to set the rule - one token per minuteBandwidth oneCosumePerMinuteLimit = Bandwidth.simple(1, Duration.ofMinutes(1));//Create the Bucket and set the Bandwidth which we created aboveBucket bucket = Bucket.builder().addLimit(oneCosumePerMinuteLimit).build();//Call method tryConsume to set count of Tokens to take from the Bucket,//returns boolean, if true - consume successful and the Bucket had enough Tokens inside Bucket to execute method tryConsumeSystem.out.println(bucket.tryConsume(1)); //return true//Call method tryConsumeAndReturnRemaining and set count of Tokens to take from the Bucket//Returns ConsumptionProbe, which include much more information than tryConsume, such as the//isConsumed - is method consume successful performed or not, if true - is successful//getRemainingTokens - count of remaining Tokens//getNanosToWaitForRefill - Time in nanoseconds to refill Tokens in our BucketConsumptionProbe consumptionProbe = bucket.tryConsumeAndReturnRemaining(1);System.out.println(consumptionProbe.isConsumed()); //return false since we have already called method tryConsume, but Bandwidth has a limit with rule - one token per one minuteSystem.out.println(consumptionProbe.getRemainingTokens()); //return 0, since we have already consumed all of the TokensSystem.out.println(consumptionProbe.getNanosToWaitForRefill()); //Return around 60000000000 nanoseconds}

好的,我认为它看起来简单易懂!

让我们考虑一个更困难的例子。让我们想象一种情况,您需要考虑通过对某个 RESTful API 方法的请求计数来限制(需要通过来自某个用户对某个控制器的请求调用计数来限制,每个 Y 周期不超过 X 次)。但是,我们的系统是分布式的,我们在一个集群中有很多笔记;我们使用 Hazelcast(但它可以是任何 JSR107 缓存、DynamoDB、Redis 或其他东西)。

让我们基于 Spring 框架来实现我们的示例。

首先,我们需要在 pom.xml 中添加一些依赖项:

<dependency><groupId>com.github.vladimir-bukhtoyarov</groupId><artifactId>bucket4j-hazelcast</artifactId><version>7.0.0</version></dependency><dependency><groupId>javax.cache</groupId><artifactId>cache-api</artifactId><version>1.0.0</version></dependency><dependency><groupId>com.hazelcast</groupId><artifactId>hazelcast</artifactId><version>4.0.2</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.20</version><scope>provided</scope></dependency>

对于下一步,我们应该考虑在将来在控制器级别上使用注释:

@Retention(RetentionPolicy.RUNTIME)@Target(ElementType.METHOD)@Documentedpublic @interface RateLimiter {TimeUnit timeUnit() default TimeUnit.MINUTES;long timeValue();long restriction();}

此外,注释将分组 RateLimiter 注释(如果我们需要为每个控制器使用多个带宽)。

import java.lang.annotation.*;@Retention(RetentionPolicy.RUNTIME)@Target(ElementType.METHOD)@Documentedpublic @interface RateLimiters {RateLimiter[] value();}

另外,需要添加新的数据类型:

public enum TimeUnit {MINUTES, HOURS}

而且,现在,我们需要创建一个类,它将进行注释处理。由于将在控制器级别设置注释,因此该类应从 HandlerInterceptorAdapter 扩展:

public class RateLimiterAnnotationHandlerInterceptorAdapter extends HandlerInterceptorAdapter {//You should have already realized class, which returns Authentication context to getting userIdprivate AuthenticationUtil authenticationUtil;private final ProxyManager<RateLimiterKey> proxyManager;@Autowiredpublic RateLimiterAnnotationHandlerInterceptorAdapter(AuthenticationUtil authenticationUtil, HazelcastInstance hazelcastInstance) {this.authenticationUtil = authenticationUtil;//To start work with Hazelcast, you also should create HazelcastInstance beanIMap<RateLimiterKey, byte[]> bucketsMap = hazelcastInstance.getMap(HazelcastFrontConfiguration.RATE_LIMITER_BUCKET);proxyManager = new HazelcastProxyManager<>(bucketsMap);}@Overridepublic boolean preHandle(HttpServletRequest request,HttpServletResponse response,Object handler) throws Exception {if (handler instanceof HandlerMethod) {HandlerMethod handlerMethod = (HandlerMethod) handler;//if into handlerMethod is present RateLimiter or RateLimiters annotation, we get it, if not, we get empty OptionalOptional<List<RateLimiter>> rateLimiters = RateLimiterUtils.getRateLimiters(handlerMethod);if (rateLimiters.isPresent()) {//Get path from RequestMapping annotation(respectively we can get annotations such: GetMapping, PostMapping, PutMapping, DeleteMapping, because all of than annotations are extended from RequestMapping)RequestMapping requestMapping = handlerMethod.getMethodAnnotation(RequestMapping.class);//To get unique key we use bundle of 2-x values: path from RequestMapping and user idRateLimiterKey key = new RateLimiterKey(authenticationUtil.getPersonId(), requestMapping.value());//Further we set key in proxy to get Bucket from cache or create a new BucketBucket bucket = proxyManager.builder().build(key, () -> RateLimiterUtils.rateLimiterAnnotationsToBucketConfiguration(rateLimiters.get()));//Try to consume token, if we don’t do that, we return 429 HTTP codeif (!bucket.tryConsume(1)) {response.setStatus(429);return false;}}}return true;}

要使用 Hazelcast,我们需要创建一个必须可序列化的自定义键:

@Data@AllArgsConstructorpublic class RateLimiterKey implements Serializable {private String userId;private String[] uri;}

此外,不要忘记名为 RateLimiterUtils 的特殊实用程序类,用于与 RateLimiterAnnotationHandlerInterceptorAdapter 一起工作(Spring 名称约定样式 - 将您的类或方法命名为必须易于理解,即使以您的名义包含 10 个单词。这是我的目标风格)。

public final class RateLimiterUtils {public static BucketConfiguration rateLimiterAnnotationsToBucketConfiguration(List<RateLimiter> rateLimiters) {ConfigurationBuilder configBuilder = Bucket4j.configurationBuilder();rateLimiters.stream().forEach(limiter -> configBuilder.addLimit(buildBandwidth(limiter)));return configBuilder.build();}public static Optional<List<RateLimiter>> getRateLimiters(HandlerMethod handlerMethod) {RateLimiters rateLimitersAnnotation = handlerMethod.getMethodAnnotation(RateLimiters.class);if(rateLimitersAnnotation != null) {return Optional.of(Arrays.asList(rateLimitersAnnotation.value()));}RateLimiter rateLimiterAnnotation = handlerMethod.getMethodAnnotation(RateLimiter.class);if(rateLimiterAnnotation != null) {return Optional.of(Arrays.asList(rateLimiterAnnotation));}return Optional.empty();}private static final Bandwidth buildBandwidth(RateLimiter rateLimiter) {TimeUnit timeUnit = rateLimiter.timeUnit();long timeValue = rateLimiter.timeValue();long restriction = rateLimiter.restriction();if (TimeUnit.MINUTES.equals(timeUnit)) {return Bandwidth.simple(restriction, Duration.ofMinutes(timeValue));} else if (TimeUnit.HOURS.equals(timeUnit)) {return Bandwidth.simple(restriction, Duration.ofHours(timeValue));} else {return Bandwidth.simple(5000, Duration.ofHours(1));}}}

还有一件事; 我们需要在扩展自 WebMvcConfigurerAdapter 的 Context 中注册我们的自定义拦截器:

import org.springframework.context.annotation.Configuration;import org.springframework.web.servlet.config.annotation.InterceptorRegistry;import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter;@Configurationpublic class ContextConfig extends WebMvcConfigurerAdapter {@Overridepublic void addInterceptors(InterceptorRegistry registry) {registry.addInterceptor(new RateLimiterAnnotationHandlerInterceptorAdapter());}}

现在,为了测试我们的机制,我们将创建 ExampleController 并在控制器的方法上方设置 RateLimiter 以检查它是否正常工作:

import com.nibado.example.customargumentspring.component.RateLimiter;import com.nibado.example.customargumentspring.component.RateLimiters;import com.nibado.example.customargumentspring.component.TimeUnit;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.PathVariable;import org.springframework.web.bind.annotation.RestController;@RestControllerpublic class ExampleController {@RateLimiters({@RateLimiter(timeUnit = TimeUnit.MINUTES, timeValue = 1, restriction = 2), @RateLimiter(timeUnit = TimeUnit.HOURS, timeValue = 1, restriction = 5)})@GetMapping("/example/{id}")public String example(@PathVariable("id") String id) {return "ok";}}

在 @RateLimiters 中,我们设置了两个限制:

  • @RateLimiter(timeUnit = TimeUnit.MINUTES, timeValue = 1,restriction = 2) — 每分钟不超过 2 个请求。

  • @RateLimiter(timeUnit = TimeUnit.HOURS, timeValue = 1,restriction = 5) — 每小时不超过 5 个请求。

这只是 Bucket4j 库的一小部分。如果你觉得这个库不错的话,可以去学习更多 API。

用户头像

@零度

关注

关注尚硅谷,轻松学IT 2021.11.23 加入

IT培训 www.atguigu.com

评论

发布
暂无评论
Java培训-怎样通过 Bucket4j 提供速率限制_JAVA开发_@零度_InfoQ写作平台