写点什么

[ kitex 源码解读 ] 限流

作者:baiyutang
  • 2022 年 5 月 18 日
  • 本文字数:6557 字

    阅读完需:约 22 分钟

[ kitex 源码解读 ] 限流

限流

限流是一种保护 server 的措施,防止上游某个 client 流量突增导致 server 端过载。由此我们也可以知道限流是在服务端做的一种自我保护机制

重要说明

Kitex 源码中在关于 server.WithLimit 的 option 中已经在注释中解释了“该选项用法不稳定,将来可能改变或移出,我们不承诺在未来的版本中兼容该选项”。所以还请特别留意这里。

// WithLimit sets the limitation of concurrent connections or max QPS.// IMPORTANT: this option is not stable, and will be changed or removed in the future!!!// We don't promise compatibility for this option in future versions!!!func WithLimit(lim *limit.Option) Option {	return Option{F: func(o *internal_server.Options, di *utils.Slice) {		di.Push(fmt.Sprintf("WithLimit(%+v)", lim))
o.Limits = lim }}}
复制代码

基础用法

目前 Kitex 支持限制最大连接数和最大 QPS,在初始化 server 的时候,增加一个 Option,举例:

import "github.com/cloudwego/kitex/pkg/limit"
func main() { svr := xxxservice.NewServer(handler, server.WithLimit(&limit.Option{MaxConnections: 10000, MaxQPS: 1000})) svr.Run()}
复制代码

参数说明:

  • MaxConnections 表示最大连接数

  • MaxQPS 表示最大 QPS

验证

条件

我们以 kitex-example 的一个代码示例为基础进行改造。

  • kitex v0.3.1

  • MacOS 12.4

  • Goland


目的

测试下 QPS 限制。

服务端

func main() {  // 我们在此 MaxQPS 设置 1  svr := echo.NewServer(new(EchoImpl), server.WithLimit(&limit.Option{MaxConnections: 100, MaxQPS: 1}))  if err := svr.Run(); err != nil {    log.Println("server stopped with error:", err)  } else {    log.Println("server stopped")  }}
复制代码

客户端


func main() { client, err := echo.NewClient("echo", client.WithHostPorts("[::1]:8888")) if err != nil { log.Fatal(err) } // 定时器,观察这一个周期就退出,这里设置了 1s timer := time.Tick(time.Second) // 退出标志 needStop := false // 请求成功数 successNum := 0

go func() { select { case <-timer: needStop = true } }()
for !needStop { req := &api.Request{Message: "my request"} _, err = client.Echo(context.Background(), req) if err != nil { continue } successNum++ } log.Printf("success num: %d", successNum)}
复制代码

运行效果

  1. 启动服务端

  1. 启动客户端

我看到在这一个周期内,成功数也是我们设定的 1


  1. 看看服务端日志

源码分析

这里我们分两方面入手,第一是配置写入,第二是限流的关键原理。

配置

  1. server.WithLimit,进到该方法的实现

server.WithLimit(&limit.Option{MaxConnections: 100, MaxQPS: 1}
复制代码



在该函数中,我们点击 o.Limits


  1. Options.Limits,我们能看到最终 server 包接收的所有可选配置


  1. 依次查看字段被使用的地方

此时,我们可以鼠标放在 Limits 字段,然后 Ctrl + 单击 查看其被使用的地方,留意右上角的小方块


点击这个方块,可以在下方窗口预览


我们发现,有 6 处读该字段的地方,有 1 处写入的,并且写入就是我们启动服务,配置 server.WithLimit 的地方。


6 处读取全部在 server.buildLimiterWithOpt 的地方,我们把焦点也转移到这里。


  1. server.buildLimiterWithOpt 对配置的处理


这里的代码,我们这里这样理解

func (s *server) buildLimiterWithOpt() (connLimit limiter.ConcurrencyLimiter, qpsLimit limiter.RateLimiter, ok bool) {	// 如果外部没有设置则不处理	if s.opt.Limits == nil {		return	}
// 有配置最大连接数,则初始化一个 concurrencyLimiter,设置其限制数 if s.opt.Limits.MaxConnections > 0 { connLimit = limiter.NewConcurrencyLimiter(s.opt.Limits.MaxConnections) } else { connLimit = &limiter.DummyConcurrencyLimiter{} }
// 有设置最大 QPS,则初始化一个 qpsLimiter,设置其对应限制数 if s.opt.Limits.MaxQPS > 0 { interval := time.Millisecond * 100 // FIXME: should not care this implementation-specific parameter qpsLimit = limiter.NewQPSLimiter(interval, s.opt.Limits.MaxQPS) } else { qpsLimit = &limiter.DummyRateLimiter{} }
// 支持动态更新限流配置,先不关注这里 if s.opt.Limits.UpdateControl != nil { updater := limiter.NewLimiterWrapper(connLimit, qpsLimit) s.opt.Limits.UpdateControl(updater) } ok = true
// 最后我们看到返回了三个参数: // 并发限制器(limiter.ConcurrencyLimiter), QPS 限制器(limiter.RateLimiter),是否要限制 ok return}
复制代码
  1. server.addBoundHandlers,第④的地方会被 这里调用,我们看到这都是在 server.Run() 处理的


此时,我们也进入到了 Kitex 中与对端交互的一个重要模块 remote.bound

要说明的是,我们看到 bound.NewServerLimiterHandler 其实是实现了 remote.InboundHandler,构造了一个服务端限制处理处理,放在了 remote.Option 的结构体中,用来控制入站请求。但是至于 inboundoutbound 的处理及关于 TransPipeline 模块,我们先不关注这里。等下我们重点关注实现。






模块层级位置

经过我们上边的配置,去追踪代码,我们也能发现限流在整个框架的大概位置:即 Transport Pipeline 的位置,并且是在 codec 之前。



关键原理

ConcurrencyLimiter

limiter_inbound.go

文件 kitex/pkg/remote/bound/limiter_inbound.go

bound.NewServerLimiterHandler ,我们能看到如下:


这里我们只需要留意 OnActive OnInactive 对应到 concurrency_limiter.go 的处理方法。

当然如果你想看 OnInactive 是如何被调用的,你也能一路向上追溯到 netpoll :Connection.AddCloseCallback 接口,如下描述:

	// AddCloseCallback can add hangup callback for a connection, which will be called when connection closing.	// This is very useful for cleaning up idle connections. For instance, you can use callbacks to clean up	// the local resources, which bound to the idle connection, when hangup by the peer. No need another goroutine	// to polling check connection status.	AddCloseCallback(callback CloseCallback) error
复制代码



concurrency_limiter.go

文件 kitex/pkg/remote/concurrency_limiter.go

这里是并发限制的关键,使用了计数器算法:

// concurrencyLimiter implements ConcurrencyLimiter.type concurrencyLimiter struct {	lim int32 // 限制并发数	now int32 // 当前并发数	tmp int32 // 并发临时计数器}
// NewConcurrencyLimiter returns a new ConcurrencyLimiter with the given limit.func NewConcurrencyLimiter(lim int) ConcurrencyLimiter { return &concurrencyLimiter{int32(lim), 0, 0}}
// Acquire tries to increase the concurrency by 1.// Acquire 尝试将并发数加 1// The return value indicates whether the operation is allowed under the concurrency limitation.// 返回值表明在并发限制下是否允许该操作func (ml *concurrencyLimiter) Acquire(ctx context.Context) bool { // 先将并发临时计数器 +1 x := atomic.AddInt32(&ml.tmp, 1) // 如果算上当前连接小于等于限制数,则放行 if x <= atomic.LoadInt32(&ml.lim) { atomic.AddInt32(&ml.now, 1) return true } // 如果已经超过了并发限制,则将临时计数器 -1 atomic.AddInt32(&ml.tmp, -1) return false}
// Release decrease the concurrency by 1.// Release 当连接释放时,并发数 -1func (ml *concurrencyLimiter) Release(ctx context.Context) { atomic.AddInt32(&ml.now, -1) atomic.AddInt32(&ml.tmp, -1)}
// UpdateLimit updates the limit.// 支持动态更新并发数func (ml *concurrencyLimiter) UpdateLimit(lim int) { atomic.StoreInt32(&ml.lim, int32(lim))}
// Status returns the current status.// 返回限制器当前状态:限制数(limit)、已占用并发数(occupied)func (ml *concurrencyLimiter) Status(ctx context.Context) (limit, occupied int) { limit = int(atomic.LoadInt32(&ml.lim)) occupied = int(atomic.LoadInt32(&ml.now)) return}
复制代码

RateLimiter

limiter_inbound.go

文件 kitex/pkg/remote/bound/limiter_inbound.go

bound.NewServerLimiterHandler ,我们能看到如下:


这里我们只需要留意 OnRead OnMessage 对应到 qps_limiter.go 的处理方法。

至于什么时候 OnRead,什么时候 OnMessage,可以参考这里:

kitex/internal/server/option.go:100


qps_limiter.go

文件 kitex/pkg/qps_limiter.go

这里是算法的核心,是典型的令牌桶算法

package limiter
import ( "context" "sync/atomic" "time")
// 固定窗口时间 1svar fixedWindowTime = time.Second
// qpsLimiter implements the RateLimiter interface.type qpsLimiter struct { limit int32 // 限制数 tokens int32 // 当前令牌数 interval time.Duration // 放令牌的时间间隔 once int32 // 每次放的令牌数 ticker *time.Ticker // 周期性的定时器,为了匀速的往桶里放令牌 tickerDone chan bool // 标记定时器是否已完成}
// NewQPSLimiter creates qpsLimiter.// interval time.Duration 去桶里放令牌的时间间隔// limit int 时间窗口内的限制数func NewQPSLimiter(interval time.Duration, limit int) RateLimiter { // 计算单次放令牌的个数 once := calcOnce(interval, limit)
l := &qpsLimiter{ limit: int32(limit), interval: interval, tokens: once, once: once, } // 异步,匀速往桶里放令牌 go l.startTicker(interval)
return l}
// UpdateLimit update limitation of QPS. It is **not** concurrent-safe.// 动态更新 QPSfunc (l *qpsLimiter) UpdateLimit(limit int) { once := calcOnce(l.interval, limit) atomic.StoreInt32(&l.limit, int32(limit)) atomic.StoreInt32(&l.once, once) l.resetTokens(once)}
// UpdateQPSLimit update the interval and limit. It is **not** concurrent-safe.// 动态更新放令牌的间隔及 QPSfunc (l *qpsLimiter) UpdateQPSLimit(interval time.Duration, limit int) { // 重新计算单次放的令牌数 once := calcOnce(interval, limit) // 设置 QPS 阈值 atomic.StoreInt32(&l.limit, int32(limit)) // 设置单次令牌数 atomic.StoreInt32(&l.once, once)
// 剩余令牌数大于 once 的情况下,要重置桶内剩余数 l.resetTokens(once)
// 时间间隔要变 if interval != l.interval { // 时间间隔赋值 l.interval = interval // 停掉原有定时器,通过 tickerDone 发送信号 l.stopTicker() // 重启新的定时器,异步 go l.startTicker(interval) }}
// Acquire one token.// Acquire 获取令牌,令牌数 -1// 返回值表示是否放行func (l *qpsLimiter) Acquire(ctx context.Context) bool { if atomic.LoadInt32(&l.tokens) <= 0 { return false } return atomic.AddInt32(&l.tokens, -1) >= 0}
// Status returns the current status.// 返回当前状态func (l *qpsLimiter) Status(ctx context.Context) (max, cur int, interval time.Duration) { max = int(atomic.LoadInt32(&l.limit)) cur = int(atomic.LoadInt32(&l.tokens)) interval = l.interval return}
// startTicker// 启动定时器,周期性的放令牌func (l *qpsLimiter) startTicker(interval time.Duration) { l.ticker = time.NewTicker(interval) defer l.ticker.Stop() l.tickerDone = make(chan bool, 1) tc := l.ticker.C td := l.tickerDone // ticker and tickerDone can be reset, cannot use l.ticker or l.tickerDone directly for { select { case <-tc: // 匀速往桶里放令牌的关键点 l.updateToken() case <-td: // 收到停止信号 return } }}
// stopTicker 停止定时器func (l *qpsLimiter) stopTicker() { if l.tickerDone == nil { return } select { case l.tickerDone <- true: default: }}
// Some deviation is allowed here to gain better performance.// 更新令牌,关键代码,周期性触发这里,尽量保证每次都放 once 个令牌func (l *qpsLimiter) updateToken() { // 令牌足够,无需更新 if atomic.LoadInt32(&l.limit) < atomic.LoadInt32(&l.tokens) { return } // 单次要放的令牌数 once := atomic.LoadInt32(&l.once) // 当前规则最多能放的令牌数 delta := atomic.LoadInt32(&l.limit) - atomic.LoadInt32(&l.tokens) // 如果最多放的令牌大于我们应该匀速放的 once 个数,我们还按匀速来 if delta > once || delta < 0 { delta = once } // 放入令牌 newTokens := atomic.AddInt32(&l.tokens, delta) // 因为并发,在上一行的执行期间可能 -n,当前令牌数不足 once 的,要放够 if newTokens < once { atomic.StoreInt32(&l.tokens, once) }}
// 计算匀速要放的令牌数func calcOnce(interval time.Duration, limit int) int32 { if interval > time.Second { interval = time.Second } once := int32(float64(limit) / (fixedWindowTime.Seconds() / interval.Seconds())) if once < 0 { once = 0 } return once}
func (l *qpsLimiter) resetTokens(once int32) { if atomic.LoadInt32(&l.tokens) > once { atomic.StoreInt32(&l.tokens, once) }}
复制代码

进阶用法

限流定义了 LimitReporter 接口,用于限流状态监控,例如当前连接数过多、QPS 过大等。

如有需求,用户需要自行实现该接口,并通过 WithLimitReporter 注入。

// 实现这个接口type LimitReporter interface {    ConnOverloadReport()    QPSOverloadReport()}
// 例如type myLimitReporter struct{}
func (m *myLimitReporter) ConnOverloadReport() { // 并发限制处理逻辑}
func (m *myLimitReporter) QPSOverloadReport() { // QPS 超过限制处理逻辑}
// 最后,我们在服务启动时设置,比如:svr := echo.NewServer(new(EchoImpl), server.WithLimitReporter(&myLimitReporter{}), server.WithLimit(&limit.Option{MaxConnections: 10, MaxQPS: 100}))if err := svr.Run(); err != nil { klog.Error("server stopped with error:", err)}
复制代码


重磅:支持自定义限流规则

Kitex 6 月 2 日发布了 v0.3.2 的版本,新增扩展支持了外部限流器的实现,在代码中可通过WithConcurrencyLimiterWithQPSLimiter 传入限流器。可以参看 issues #431 的代码提交。发布这项功能也是为方便 Kitex 用户上云,计划对接腾讯的开源服务治理平台 Polaris 的一项支持。另外对于普通用户的意义就是我们可以自定义方法级别的限流,甚至可以做分布式限流器


文件 kitex/server/option.go


扩展方式如下:

func (m *myQPSLimiter) Acquire(ctx context.Context) bool {	// 通过 rpcinfo.GetRPCInfo(ctx) 获取调用信息	// ...	return true}
func (m *myQPSLimiter) Status(ctx context.Context) (max, current int, interval time.Duration) { // ... return 1, 0, time.Millisecond}

svr := echo.NewServer(new(EchoImpl), server.WithQPSLimiter(&myQPSLimiter{}))if err := svr.Run(); err != nil { klog.Error("server stopped with error:", err)}
复制代码

参考

  1. 限流 | CloudWeGo

  2. golang.org/x/time

  3. qpsLimiter.updateToken() 方法的误差可能过大

  4. feat: extend outside limiter implementation and fix problems of rate limiter of multiplexed server

  5. Integrate Polaris go sdk to support their service governance ability.

发布于: 2022 年 05 月 18 日阅读数: 113
用户头像

baiyutang

关注

广州 2017.12.13 加入

Microservices | Golang | Cloud Nitive | “Smart work,Not hard”

评论

发布
暂无评论
[ kitex 源码解读 ] 限流_Go_baiyutang_InfoQ写作社区