写点什么

Go 分布式令牌桶限流 + 兜底策略

作者:万俊峰Kevin
  • 2022 年 1 月 12 日
  • 本文字数:3009 字

    阅读完需:约 10 分钟

Go 分布式令牌桶限流 + 兜底策略

上篇文章提到固定时间窗口限流无法处理突然请求洪峰情况,本文讲述的令牌桶线路算法则可以比较好的处理此场景。

工作原理

  1. 单位时间按照一定速率匀速的生产 token 放入桶内,直到达到桶容量上限。

  2. 处理请求,每次尝试获取一个或多个令牌,如果拿到则处理请求,失败则拒绝请求。

优缺点

优点

可以有效处理瞬间的突发流量,桶内存量 token 即可作为流量缓冲区平滑处理突发流量。

缺点

实现较为复杂。

代码实现

core/limit/tokenlimit.go

分布式环境下考虑使用 redis 作为桶和令牌的存储容器,采用 lua 脚本实现整个算法流程。

redis lua 脚本

-- 每秒生成token数量即token生成速度local rate = tonumber(ARGV[1])-- 桶容量local capacity = tonumber(ARGV[2])-- 当前时间戳local now = tonumber(ARGV[3])-- 当前请求token数量local requested = tonumber(ARGV[4])-- 需要多少秒才能填满桶local fill_time = capacity/rate-- 向下取整,ttl为填满时间的2倍local ttl = math.floor(fill_time*2)-- 当前时间桶容量local last_tokens = tonumber(redis.call("get", KEYS[1]))-- 如果当前桶容量为0,说明是第一次进入,则默认容量为桶的最大容量if last_tokens == nil thenlast_tokens = capacityend-- 上一次刷新的时间local last_refreshed = tonumber(redis.call("get", KEYS[2]))-- 第一次进入则设置刷新时间为0if last_refreshed == nil thenlast_refreshed = 0end-- 距离上次请求的时间跨度local delta = math.max(0, now-last_refreshed)-- 距离上次请求的时间跨度,总共能生产token的数量,如果超多最大容量则丢弃多余的tokenlocal filled_tokens = math.min(capacity, last_tokens+(delta*rate))-- 本次请求token数量是否足够local allowed = filled_tokens >= requested-- 桶剩余数量local new_tokens = filled_tokens-- 允许本次token申请,计算剩余数量if allowed thennew_tokens = filled_tokens - requestedend-- 设置剩余token数量redis.call("setex", KEYS[1], ttl, new_tokens)-- 设置刷新时间redis.call("setex", KEYS[2], ttl, now)
return allowed
复制代码

令牌桶限流器定义

type TokenLimiter struct {    // 每秒生产速率    rate int    // 桶容量    burst int    // 存储容器    store *redis.Redis    // redis key    tokenKey       string    // 桶刷新时间key    timestampKey   string    // lock    rescueLock     sync.Mutex    // redis健康标识    redisAlive     uint32    // redis故障时采用进程内 令牌桶限流器    rescueLimiter  *xrate.Limiter    // redis监控探测任务标识    monitorStarted bool}
func NewTokenLimiter(rate, burst int, store *redis.Redis, key string) *TokenLimiter {    tokenKey := fmt.Sprintf(tokenFormat, key)    timestampKey := fmt.Sprintf(timestampFormat, key)
    return &TokenLimiter{        rate:          rate,        burst:         burst,        store:         store,        tokenKey:      tokenKey,        timestampKey:  timestampKey,        redisAlive:    1,        rescueLimiter: xrate.NewLimiter(xrate.Every(time.Second/time.Duration(rate)), burst),    }}
复制代码

获取令牌


func (lim *TokenLimiter) reserveN(now time.Time, n int) bool {    // 判断redis是否健康    // redis故障时采用进程内限流器    // 兜底保障    if atomic.LoadUint32(&lim.redisAlive) == 0 {        return lim.rescueLimiter.AllowN(now, n)    }    // 执行脚本获取令牌    resp, err := lim.store.Eval(        script,        []string{            lim.tokenKey,            lim.timestampKey,        },        []string{            strconv.Itoa(lim.rate),            strconv.Itoa(lim.burst),            strconv.FormatInt(now.Unix(), 10),            strconv.Itoa(n),        })    // redis allowed == false    // Lua boolean false -> r Nil bulk reply    // 特殊处理key不存在的情况    if err == redis.Nil {        return false    } else if err != nil {        logx.Errorf("fail to use rate limiter: %s, use in-process limiter for rescue", err)        // 执行异常,开启redis健康探测任务        // 同时采用进程内限流器作为兜底        lim.startMonitor()        return lim.rescueLimiter.AllowN(now, n)    }
    code, ok := resp.(int64)    if !ok {        logx.Errorf("fail to eval redis script: %v, use in-process limiter for rescue", resp)        lim.startMonitor()        return lim.rescueLimiter.AllowN(now, n)    }
    // redis allowed == true    // Lua boolean true -> r integer reply with value of 1    return code == 1}
复制代码

redis 故障时兜底策略

兜底策略的设计考虑得非常细节,当 redis 不可用的时候,启动单机版的 ratelimit 做备用限流,确保基本的限流可用,服务不会被冲垮。

// 开启redis健康探测func (lim *TokenLimiter) startMonitor() {    lim.rescueLock.Lock()    defer lim.rescueLock.Unlock()    // 防止重复开启    if lim.monitorStarted {        return    }
    // 设置任务和健康标识    lim.monitorStarted = true    atomic.StoreUint32(&lim.redisAlive, 0)    // 健康探测    go lim.waitForRedis()}
// redis健康探测定时任务func (lim *TokenLimiter) waitForRedis() {    ticker := time.NewTicker(pingInterval)    // 健康探测成功时回调此函数    defer func() {        ticker.Stop()        lim.rescueLock.Lock()        lim.monitorStarted = false        lim.rescueLock.Unlock()    }()
    for range ticker.C {        // ping属于redis内置健康探测命令        if lim.store.Ping() {            // 健康探测成功,设置健康标识            atomic.StoreUint32(&lim.redisAlive, 1)            return        }    }}
复制代码

项目地址

https://github.com/zeromicro/go-zero

欢迎使用 go-zerostar 支持我们!

发布于: 刚刚阅读数: 2
用户头像

保持简单 2017.10.24 加入

go-zero作者

评论

发布
暂无评论
Go 分布式令牌桶限流 + 兜底策略