写点什么

图解漏桶(LeakyBucket)限流器的实现原理

作者:Go学堂
  • 2022-11-18
    北京
  • 本文字数:2861 字

    阅读完需:约 9 分钟

图解漏桶(LeakyBucket)限流器的实现原理

leaky bucket 也叫漏桶,就是将请求先放到一个桶中进行排队,然后按一个固定的速率来处理请求,即所谓的漏出。


桶具有一定的容量,即最大容量的请求数,当排队请求的数量超过桶的容量时,再进来的请求就直接过滤掉,不再被处理。换句话说就是请求先在桶中排队,系统或服务只以一个恒定的速度从桶中将请求取出进行处理。如果排队的请求数量超过桶能够容纳的最大数量,即桶装满了,则直接丢弃。


01 计数原理算法概述

算法的实现有很多种,本文基于计数原理的实现来进行讲解。计数原理实际上就是当请求到来时,基于当前时间和预订的处理速率来计算该请求应该等待多长时间才能被处理


该算法主要的属性字段有以下三个:

  • 处理请求的速率 rate。该值代表多久处理一个请求。实际上就是指处理完该请求后,要等待多久才能处理下一个请求。 比如我们初始化时指定服务每 100ms 处理一个请求,也就是每处理 1 个请求,需要等待 100ms 才能处理下一个请求。

  • 桶的最大容量 capacity。该值代表我们最多允许多少个请求排队,超过该值,就直接返回,不用等待了。这个在生活中有很多类似场景:有一次我们去公园排队坐游船,排了很长的队伍。管理员过来告诉我们,只有前 20 个人能排上号,20 号后面的就可以不用排了。

  • 桶中最后一个排队请求被处理的时间 last。该值有两个作用:

  • 第一个作用是当有新请求进来的时候,就可以计算出新请求需要被处理的时间:last+rate

  • 第二个作用是根据 last、当前时间 t 以及速率 rate 计算当前还有多少个请求等待被处理:


waitRequests = (last - t) / rate
复制代码


根据上面几个关键属性字段,我们就可以对 LeakBucket 定义了,如下:

type LeakyBucket struct {    rate int64 //处理请求的速率    capacity int64 //桶的最大容量    last time.Time //桶中最后一个排队请求被处理的时间}
复制代码


然后,LeakyBucket 还有一个函数 Limit 函数:

func (t *LeakyBucket) Limit() (time.Duration, error) {
}
复制代码


该函数的主要作用就是计算流入请求能够被处理的等待时间。针对该函数有以下两点说明:

  • 接收到的每个请求都需要调用该函数,每个调用一次就相当于有一个请求流入桶中。

  • 该函数返回值代表调用者要处理该请求需要等待的时长,调用者需要进行 time.Sleep 这么长时间才能进行处理,也就是通过 Sleep 控制了消耗的速度。


因此,我们可知请求流入和流出的流程如下:



02 请求是如何排队的?

假设现在 LeakyBucket 是一个空桶,按 100ms 处理一个请求的速率漏出,容量大小为 5。现在同时有 5 个请求流入桶中,我们看看每个请求经过 Limit 是如何计算各自的预计处理时间以及等待时间的。


  • 第一个请求进来,不用等待,直接就会被处理。处理的时间就是 last=当前时间 t

  • 第二个请求,因为第一个请求刚刚排队并被处理了,那么就需要按处理速率等待,那么被处理的时间就是第一个请求被处理的时间+rate,即 last=t+100ms

  • 第三个请求,因为第二个请求仍然在排队,所有应该在处理了第二个后,再等待 100ms 才能被处理,即 last=第二个的处理时间+100ms=t+300ms

  • 第四、五个请求依次类推,如下图:



我们将上图转换在时间轴上看着会更直观一些:



以上情况是我们假设在空桶的状态下,同时流入了 n 个请求,从第 2 个请求开始,处理时间都在前一个请求的处理时间上+rate。


但如果新的请求流入是在最后一个请求流入后的 50ms 流入的,即 last+50ms 的时间点。那么新流入的请求被处理的时间就不应该是 last+rate 了,这样该请求的处理时间距离上次处理请求的时间就是:


(last+rate) - (last+50ms)= rate+50ms
复制代码


这样计算的话就比 rate 多了 50ms。那该如何计算呢? 实际的被处理时间应该是先计算当前时间距离上次被处理时间的间隔,然后再跟 rate 进行比较,看看比 rate 差多少,然后补全该差值即可。如下

delta = 当前时间t - last
last = 当前时间t + (rate - delta)
复制代码


如下图所示:


03 如何计算桶满了?

当新请求流入后,发现桶已经满了,就不再排队了,而是直接被丢弃掉。那如何计算桶是否已经满了呢?其实就是根据当前时间 now,和桶中排队的最后一个请求被处理的时间 last 之间的差值,再除以速率 rate,就能计算出正在等待被处理的请求有多少个了,然后和 capacity 进行比较即可:


wait = (last - now) / rateif wait > capacity {    //表示桶已经满了}else {    //未满,还可以继续排队}
复制代码


这里桶的容量实际上是代表业务能够接受请求被处理的最大等待时间。比如一个 web 用户访问一个页面,最多愿意等 10 秒,那如果请求等待处理的时间是 11 秒,那即使等到被处理了,用户也已经流失了,也就失去了等待的意义。


好了,几个关键点介绍完了,下面我们直接贴上部分实现的代码,完整的代码可参考https://github.com/uber-go/ratelimithttps://github.com/mennanov/limiters/blob/master/leakybucket.go

type LeakyBucket struct {    rate int64 //处理请求的速率    capacity int64 //桶的最大容量    last time.Time //桶中最后一个排队请求被处理的时间    mu sync.Mutex}
func (t *LeakyBucket) Limit(ctx context.Context) (time.Duration, error) { //这里进行加锁,保证每个请求按顺序依次处理 t.mu.Lock() defer t.mu.Unlock()
now := time.Now().UnixNano() //当前时间的纳秒数 if now < t.last { // 说明已经有请求在排队了,那么新请求进来排队后被处理的时间就是rate后 t.last += t.rate } else { // 说明为桶为空,也许是初始状态,也许是所有的请求都被处理完了.
var offset int64 //代表等待处理该请求的时间需要等待多久 delta := now - state.Last // 代表当前时间距离上次处理请求的时间过了多久 if delta < t.rate { //说明还没有到下次处理请求的时间,则需要等待offset后才能到 offset = t.rate - delta } //如果delta > t.rate 说明当前时间距离上次处理请求的时间已经超过了rate,offset为0,新的请求就应该被马上处理 t.last = now + offset //更新该请求应该被处理的时间 }
wait := t.last - now //计算桶是否已经满了 if wait/t.rate > t.capacity { //桶满了,返回error,调用者根据需要是直接丢弃还是等待wait长的时间。一般是直接丢弃。
t.last = now - offset //因为这里要丢弃该请求,所有要保持新请求排队前的状态 return time.Duration(wait), ErrLimitExhausted }
//排队成功,返回要等待的时间给调用者,让调用者sleep进行阻塞就能实现按rate的速率处理请求了 return time.Duration(wait), nil}
复制代码


04 总结

LeakyBucket 的核心思想就是按固定的速率处理请求,所以不支持突增的流量。因为即使有再多的流量,也是按固定的速率被处理。算法的实现中本质上就是按固定的处理速率计算该请求能够被处理的时间以及需要等待的时间。


---特别推荐---

特别推荐:一个专注 go 项目实战、项目中踩坑经验及避坑指南、各种好玩的 go 工具的公众号。「Go 学堂」,专注实用性,非常值得大家关注。点击下方公众号卡片,直接关注。关注送《100 个 go 常见的错误》pdf 文档。

发布于: 刚刚阅读数: 5
用户头像

Go学堂

关注

关注「Go学堂」,学习更多编程知识 2019-08-06 加入

专注Go编程知识、案例、常见错误及原理分析。意在通过阅读更多优秀的代码,提高编程技能。同名公众号「Go学堂」期待你的关注

评论

发布
暂无评论
图解漏桶(LeakyBucket)限流器的实现原理_Go_Go学堂_InfoQ写作社区