写点什么

四种常用限流算法对比

作者:菜皮日记
  • 2023-09-09
    北京
  • 本文字数:5415 字

    阅读完需:约 18 分钟

Leaky Bucket 漏桶

漏桶可理解为是一个限定容量的请求队列。

想象有一个桶,有水(指请求或数据)从上面流进来,水从桶下面的一个孔流出来。水流进桶的速度可以是随机的,但是水流出桶的速度是恒定的。当水流进桶的速度较慢,桶不会被填满,请求就可以被处理。当水流进桶的速度过快时,桶会逐渐被填满,当水超过桶的容量就会溢出,即被丢弃。



class LeakyBucketRateLimiter(object):    def __init__(self, capacity, leak_rate):        # 桶容量        self.capacity = capacity        # 水流出的速率 每秒n个单位        self.rate = leak_rate        # 上一次漏水的时间,用来跟本次时间做差值,乘以水流速率计算出这段时间内流出了多少水        self.last_leak_time = int(time.time())        # 桶中当前剩余的水        self.remain_water = 0
    def allow_request(self, require_units=1):        now = int(time.time())        leaked_water = (now - self.last_leak_time) * self.rate        self.remain_water = max(0, self.remain_water - leaked_water)        self.last_leak_time = now
        print(f"刚刚流出{leaked_water}, 桶容量已达{self.remain_water}")
        if self.remain_water + require_units <= self.capacity:            self.remain_water += require_units            return True
        return False
if __name__ == "__main__":    leaky_bucket = LeakyBucketRateLimiter(3, 1)    for i in range(10):        print(f"限流结果:", leaky_bucket.allow_request())        print("----")        s = random.randint(1, 5) / 10        time.sleep(s)
复制代码

结果

刚刚流出0, 桶容量已达0限流结果: True----刚刚流出0, 桶容量已达1限流结果: True----刚刚流出1, 桶容量已达1限流结果: True----刚刚流出0, 桶容量已达2限流结果: True----刚刚流出1, 桶容量已达2限流结果: True----刚刚流出0, 桶容量已达3限流结果: False----刚刚流出0, 桶容量已达3限流结果: False----刚刚流出1, 桶容量已达2限流结果: True----刚刚流出0, 桶容量已达3限流结果: False----刚刚流出0, 桶容量已达3限流结果: False----
复制代码

Token Bucket 令牌桶

在令牌桶算法中,系统会以一个固定的速率向桶中添加令牌。

当有请求(或数据包)到来时,会从桶中删除一定数量的令牌。如果桶中有足够的令牌,请求就可以立即处理;如果桶中没有足够的令牌,请求就会被阻塞或丢弃,具体行为取决于具体的实现。桶有容量限制,如果添加令牌时桶已满,新的令牌就会被丢弃。


class TokenBucketRateLimiter(object):    def __init__(self, capacity, refill_rate):        # 容量        self.capacity = capacity        # 产生令牌的速率        self.refill_rate = refill_rate        # 当前已有的可用令牌数        self.current_tokens = capacity        # 上一次产生令牌的时间,用来与当前时间计算并乘以速率,得出这段时间内产生了多少令牌        self.last_refill_time = int(time.time())
    def allow_request(self, token_needs=1):        now = int(time.time())        new_tokens = (now - self.last_refill_time) * self.refill_rate        self.current_tokens += min(self.capacity, new_tokens)        self.last_refill_time = now
        print(f"刚刚新产生{new_tokens}, 剩余{self.current_tokens}")
        if self.current_tokens >= token_needs:            self.current_tokens -= token_needs            return True
        return False
if __name__ == "__main__":    token_bucket = TokenBucketRateLimiter(3, 1)    for i in range(10):        print(f"限流结果:", token_bucket.allow_request())        print("----")        s = random.randint(1, 5) / 10        time.sleep(s)
复制代码

结果

刚刚新产生0, 剩余3限流结果: True----刚刚新产生0, 剩余2限流结果: True----刚刚新产生0, 剩余1限流结果: True----刚刚新产生1, 剩余1限流结果: True----刚刚新产生0, 剩余0限流结果: False----刚刚新产生0, 剩余0限流结果: False----刚刚新产生1, 剩余1限流结果: True----刚刚新产生0, 剩余0限流结果: False----刚刚新产生1, 剩余1限流结果: True----刚刚新产生0, 剩余0限流结果: False----
复制代码

Fixed Window 固定窗口

在一个固定的时间窗口内,只允许一定数量的请求。

如果在这个时间窗口内的请求已经达到了限制,那么新的请求就会被拒绝,过了当前时间窗口后,会进入下一个时间窗口,并重置窗口内的请求数量,重新计算。


class FixedWindowRateLimiter(object):    def __init__(self, max_requests, window_size):        # 单个窗口大小 单位秒        self.window_size = window_size        # 单个窗口中最大可处理的请求数量        self.max_requests = max_requests        # 当前窗口已处理请求        self.current_request = 0        # 窗口计算起始点        self.last_window_start = int(time.time())
    def allow_request(self):        now = int(time.time())
        if now - self.last_window_start >= self.window_size:            print("窗口刷新")            self.last_window_start = now            self.current_request = 0
        print(f"当前窗口已处理请求 {self.current_request} 个")
        if self.current_request < self.max_requests:            self.current_request += 1            return True
        return False
if __name__ == "__main__":    limiter = FixedWindowRateLimiter(3, 2)    for i in range(10):        print(f"限流结果:", limiter.allow_request())        print("----")        s = random.randint(1, 5) / 10        time.sleep(s)
复制代码

结果

当前窗口已处理请求 0 个限流结果: True----当前窗口已处理请求 1 个限流结果: True----当前窗口已处理请求 2 个限流结果: True----当前窗口已处理请求 3 个限流结果: False----当前窗口已处理请求 3 个限流结果: False----当前窗口已处理请求 3 个限流结果: False----当前窗口已处理请求 3 个限流结果: False----当前窗口已处理请求 3 个限流结果: False----当前窗口已处理请求 3 个限流结果: False----窗口刷新当前窗口已处理请求 0 个限流结果: True----
复制代码

Sliding Window 滑动窗口

在固定窗口限流算法中,如果大量请求在一个时间窗口的边界附近到达,可能会造成瞬时的流量突增。滑动窗口随着时间的推移,动态统计请求量,避免了在窗口边界附近的流量突增。

class SlidingWindowRateLimiter(object):    def __init__(self, max_requests, window_size):        # 最大请求量        self.max_requests = max_requests        # 窗口大小        self.window_size = window_size        # 存放每个请求的时间        self.requests_list = collections.deque()
    def allow_request(self):        now = int(time.time())        while self.requests_list and self.requests_list[0] <= (now - self.window_size):            self.requests_list.popleft()
        print(f"当前窗口已处理请求 {len(self.requests_list)} 个")
        if len(self.requests_list) < self.max_requests:            self.requests_list.append(now)            return True
        return False
if __name__ == "__main__":    limiter = SlidingWindowRateLimiter(3, 2)    for i in range(10):        print(f"限流结果:", limiter.allow_request())        print('----')        s = random.randint(1, 5) / 10        time.sleep(s)
复制代码

结果

当前窗口已处理请求 0 个限流结果: True----当前窗口已处理请求 1 个限流结果: True----当前窗口已处理请求 2 个限流结果: True----当前窗口已处理请求 3 个限流结果: False----当前窗口已处理请求 3 个限流结果: False----当前窗口已处理请求 2 个限流结果: True----当前窗口已处理请求 3 个限流结果: False----当前窗口已处理请求 3 个限流结果: False----当前窗口已处理请求 1 个限流结果: True----当前窗口已处理请求 2 个限流结果: True----
复制代码

还有一种方式,不需要记录具体每个请求的时间点,而通过计算滑动窗口与固定窗口之间时间的偏移,估算出滑动窗口中请求量,这个方式不太准确,但节省了请求时间点的存储成本。

图中所示,已知设窗口大小为 1 小时即 60 分钟,每个窗口内可处理 100 请求,当前时间为 1:15。

在 [12:00–1:00) 的整个绿色窗口中一共有 84 个请求,在 [1:00 to 1:15) 的黄色窗口中,15 分钟内已经处理了 36 个请求,如何计算当前窗口剩余容量?

通过计算滑动窗口与前一窗口重叠部分占比,来估算前一窗口中被占用的容量,(60分钟-15分钟)/60分钟 表示滑动窗口减去黄色当前窗口后,与绿色窗口的重叠,占绿色窗口整体的百分比,也就估算出重叠部分的请求量在总共 84 个请求的占比,得出 63 个请求,再加上当前窗口的 36 请求,一共是 99 请求,那么当前滑动窗口的剩余容量就是 100 - 99 = 1 个请求容量。

class SlidingWindowRateLimiterByEstimate(object):    def __init__(self, max_requests, window_size):        self.max_requests = max_requests        self.window_size = window_size
        # 当前窗口起始时间        self.current_window_start = time.time()        # 前一个窗口请求量        self.pre_count = 0        # 当前窗口请求量        self.current_count = 0
    def allow_request(self):        now = time.time()        if (now - self.current_window_start) > self.window_size:            # 滑过完整的一个窗口,重置            self.current_window_start = now            self.pre_count = self.current_count            self.current_count = 0
        # 通过计算滑动窗口与前一个窗口重叠部分,占整个窗口的占比,估算重叠部分的请求量        # 再加上当前窗口的请求量        # 得出滑动窗口中的请求量        estimate_count = (self.pre_count * (self.window_size - (now - self.current_window_start)) / self.window_size) + self.current_count        print(f"估算请求量:{estimate_count}")
        if estimate_count > self.max_requests:            return False
        self.current_count += 1        return True
if __name__ == "__main__":    limiter = SlidingWindowRateLimiterByEstimate(3, 2)    for i in range(10):        print(f"限流结果:", limiter.allow_request())        print('----')        s = random.randint(1, 5) / 10        time.sleep(s)
复制代码

结果

估算请求量:0.0限流结果: True----估算请求量:1.0限流结果: True----估算请求量:2.0限流结果: True----估算请求量:3.0限流结果: True----估算请求量:4.0限流结果: False----估算请求量:4.0限流结果: False----估算请求量:4.0限流结果: False----估算请求量:3.3901939392089844限流结果: False----估算请求量:2.775090217590332限流结果: True----估算请求量:3.366755485534668限流结果: False----
复制代码


用户头像

菜皮日记

关注

全干程序员 2018-08-08 加入

个人站点:菜皮日记 lipijin

评论

发布
暂无评论
四种常用限流算法对比_限流_菜皮日记_InfoQ写作社区