限流
限流是一种保护 server 的措施,防止上游某个 client 流量突增导致 server 端过载。由此我们也可以知道限流是在服务端做的一种自我保护机制。
重要说明
Kitex 源码中在关于 server.WithLimit 的 option 中已经在注释中解释了“该选项用法不稳定,将来可能改变或移出,我们不承诺在未来的版本中兼容该选项”。所以还请特别留意这里。
// WithLimit sets the limitation of concurrent connections or max QPS.
// IMPORTANT: this option is not stable, and will be changed or removed in the future!!!
// We don't promise compatibility for this option in future versions!!!
func WithLimit(lim *limit.Option) Option {
return Option{F: func(o *internal_server.Options, di *utils.Slice) {
di.Push(fmt.Sprintf("WithLimit(%+v)", lim))
o.Limits = lim
}}
}
复制代码
基础用法
目前 Kitex 支持限制最大连接数和最大 QPS,在初始化 server 的时候,增加一个 Option,举例:
import "github.com/cloudwego/kitex/pkg/limit"
func main() {
svr := xxxservice.NewServer(handler, server.WithLimit(&limit.Option{MaxConnections: 10000, MaxQPS: 1000}))
svr.Run()
}
复制代码
参数说明:
MaxConnections
表示最大连接数
MaxQPS
表示最大 QPS
验证
条件
我们以 kitex-example 的一个代码示例为基础进行改造。
kitex v0.3.1
MacOS 12.4
Goland
目的
测试下 QPS 限制。
服务端
func main() {
// 我们在此 MaxQPS 设置 1
svr := echo.NewServer(new(EchoImpl), server.WithLimit(&limit.Option{MaxConnections: 100, MaxQPS: 1}))
if err := svr.Run(); err != nil {
log.Println("server stopped with error:", err)
} else {
log.Println("server stopped")
}
}
复制代码
客户端
func main() {
client, err := echo.NewClient("echo", client.WithHostPorts("[::1]:8888"))
if err != nil {
log.Fatal(err)
}
// 定时器,观察这一个周期就退出,这里设置了 1s
timer := time.Tick(time.Second)
// 退出标志
needStop := false
// 请求成功数
successNum := 0
go func() {
select {
case <-timer:
needStop = true
}
}()
for !needStop {
req := &api.Request{Message: "my request"}
_, err = client.Echo(context.Background(), req)
if err != nil {
continue
}
successNum++
}
log.Printf("success num: %d", successNum)
}
复制代码
运行效果
启动服务端
启动客户端
我看到在这一个周期内,成功数也是我们设定的 1。
看看服务端日志
源码分析
这里我们分两方面入手,第一是配置写入,第二是限流的关键原理。
配置
server.WithLimit
,进到该方法的实现
server.WithLimit(&limit.Option{MaxConnections: 100, MaxQPS: 1}
复制代码
在该函数中,我们点击 o.Limits
Options.Limits
,我们能看到最终 server 包接收的所有可选配置
依次查看字段被使用的地方
此时,我们可以鼠标放在 Limits
字段,然后 Ctrl + 单击 查看其被使用的地方,留意右上角的小方块
点击这个方块,可以在下方窗口预览
我们发现,有 6 处读该字段的地方,有 1 处写入的,并且写入就是我们启动服务,配置 server.WithLimit
的地方。
6 处读取全部在 server.buildLimiterWithOpt
的地方,我们把焦点也转移到这里。
server.buildLimiterWithOpt
对配置的处理
这里的代码,我们这里这样理解
func (s *server) buildLimiterWithOpt() (connLimit limiter.ConcurrencyLimiter, qpsLimit limiter.RateLimiter, ok bool) {
// 如果外部没有设置则不处理
if s.opt.Limits == nil {
return
}
// 有配置最大连接数,则初始化一个 concurrencyLimiter,设置其限制数
if s.opt.Limits.MaxConnections > 0 {
connLimit = limiter.NewConcurrencyLimiter(s.opt.Limits.MaxConnections)
} else {
connLimit = &limiter.DummyConcurrencyLimiter{}
}
// 有设置最大 QPS,则初始化一个 qpsLimiter,设置其对应限制数
if s.opt.Limits.MaxQPS > 0 {
interval := time.Millisecond * 100 // FIXME: should not care this implementation-specific parameter
qpsLimit = limiter.NewQPSLimiter(interval, s.opt.Limits.MaxQPS)
} else {
qpsLimit = &limiter.DummyRateLimiter{}
}
// 支持动态更新限流配置,先不关注这里
if s.opt.Limits.UpdateControl != nil {
updater := limiter.NewLimiterWrapper(connLimit, qpsLimit)
s.opt.Limits.UpdateControl(updater)
}
ok = true
// 最后我们看到返回了三个参数:
// 并发限制器(limiter.ConcurrencyLimiter), QPS 限制器(limiter.RateLimiter),是否要限制 ok
return
}
复制代码
server.addBoundHandlers
,第④的地方会被 这里调用,我们看到这都是在 server.Run() 处理的
此时,我们也进入到了 Kitex 中与对端交互的一个重要模块 remote.bound
。
要说明的是,我们看到 bound.NewServerLimiterHandler
其实是实现了 remote.InboundHandler
,构造了一个服务端限制处理处理,放在了 remote.Option
的结构体中,用来控制入站请求。但是至于 inbound
和 outbound
的处理及关于 TransPipeline
模块,我们先不关注这里。等下我们重点关注实现。
模块层级位置
经过我们上边的配置,去追踪代码,我们也能发现限流在整个框架的大概位置:即 Transport Pipeline
的位置,并且是在 codec 之前。
关键原理
ConcurrencyLimiter
limiter_inbound.go
文件 kitex/pkg
/
remote
/
bound
/
limiter_inbound.go
中
在 bound.NewServerLimiterHandler
,我们能看到如下:
这里我们只需要留意 OnActive
OnInactive
对应到 concurrency_limiter.go
的处理方法。
当然如果你想看 OnInactive
是如何被调用的,你也能一路向上追溯到 netpoll
:Connection.AddCloseCallback 接口,如下描述:
// AddCloseCallback can add hangup callback for a connection, which will be called when connection closing.
// This is very useful for cleaning up idle connections. For instance, you can use callbacks to clean up
// the local resources, which bound to the idle connection, when hangup by the peer. No need another goroutine
// to polling check connection status.
AddCloseCallback(callback CloseCallback) error
复制代码
concurrency_limiter.go
文件 kitex
/
pkg
/
remote
/
concurrency_limiter.go
中
这里是并发限制的关键,使用了计数器算法:
// concurrencyLimiter implements ConcurrencyLimiter.
type concurrencyLimiter struct {
lim int32 // 限制并发数
now int32 // 当前并发数
tmp int32 // 并发临时计数器
}
// NewConcurrencyLimiter returns a new ConcurrencyLimiter with the given limit.
func NewConcurrencyLimiter(lim int) ConcurrencyLimiter {
return &concurrencyLimiter{int32(lim), 0, 0}
}
// Acquire tries to increase the concurrency by 1.
// Acquire 尝试将并发数加 1
// The return value indicates whether the operation is allowed under the concurrency limitation.
// 返回值表明在并发限制下是否允许该操作
func (ml *concurrencyLimiter) Acquire(ctx context.Context) bool {
// 先将并发临时计数器 +1
x := atomic.AddInt32(&ml.tmp, 1)
// 如果算上当前连接小于等于限制数,则放行
if x <= atomic.LoadInt32(&ml.lim) {
atomic.AddInt32(&ml.now, 1)
return true
}
// 如果已经超过了并发限制,则将临时计数器 -1
atomic.AddInt32(&ml.tmp, -1)
return false
}
// Release decrease the concurrency by 1.
// Release 当连接释放时,并发数 -1
func (ml *concurrencyLimiter) Release(ctx context.Context) {
atomic.AddInt32(&ml.now, -1)
atomic.AddInt32(&ml.tmp, -1)
}
// UpdateLimit updates the limit.
// 支持动态更新并发数
func (ml *concurrencyLimiter) UpdateLimit(lim int) {
atomic.StoreInt32(&ml.lim, int32(lim))
}
// Status returns the current status.
// 返回限制器当前状态:限制数(limit)、已占用并发数(occupied)
func (ml *concurrencyLimiter) Status(ctx context.Context) (limit, occupied int) {
limit = int(atomic.LoadInt32(&ml.lim))
occupied = int(atomic.LoadInt32(&ml.now))
return
}
复制代码
RateLimiter
limiter_inbound.go
文件 kitex
/
pkg
/
remote
/
bound
/
limiter_inbound.go
中
在 bound.NewServerLimiterHandler
,我们能看到如下:
这里我们只需要留意 OnRead
OnMessage
对应到 qps_limiter.go
的处理方法。
至于什么时候 OnRead
,什么时候 OnMessage
,可以参考这里:
kitex/internal/server/option.go:100
qps_limiter.go
文件 kitex/pkg/qps_limiter.go
中
这里是算法的核心,是典型的令牌桶算法:
package limiter
import (
"context"
"sync/atomic"
"time"
)
// 固定窗口时间 1s
var fixedWindowTime = time.Second
// qpsLimiter implements the RateLimiter interface.
type qpsLimiter struct {
limit int32 // 限制数
tokens int32 // 当前令牌数
interval time.Duration // 放令牌的时间间隔
once int32 // 每次放的令牌数
ticker *time.Ticker // 周期性的定时器,为了匀速的往桶里放令牌
tickerDone chan bool // 标记定时器是否已完成
}
// NewQPSLimiter creates qpsLimiter.
// interval time.Duration 去桶里放令牌的时间间隔
// limit int 时间窗口内的限制数
func NewQPSLimiter(interval time.Duration, limit int) RateLimiter {
// 计算单次放令牌的个数
once := calcOnce(interval, limit)
l := &qpsLimiter{
limit: int32(limit),
interval: interval,
tokens: once,
once: once,
}
// 异步,匀速往桶里放令牌
go l.startTicker(interval)
return l
}
// UpdateLimit update limitation of QPS. It is **not** concurrent-safe.
// 动态更新 QPS
func (l *qpsLimiter) UpdateLimit(limit int) {
once := calcOnce(l.interval, limit)
atomic.StoreInt32(&l.limit, int32(limit))
atomic.StoreInt32(&l.once, once)
l.resetTokens(once)
}
// UpdateQPSLimit update the interval and limit. It is **not** concurrent-safe.
// 动态更新放令牌的间隔及 QPS
func (l *qpsLimiter) UpdateQPSLimit(interval time.Duration, limit int) {
// 重新计算单次放的令牌数
once := calcOnce(interval, limit)
// 设置 QPS 阈值
atomic.StoreInt32(&l.limit, int32(limit))
// 设置单次令牌数
atomic.StoreInt32(&l.once, once)
// 剩余令牌数大于 once 的情况下,要重置桶内剩余数
l.resetTokens(once)
// 时间间隔要变
if interval != l.interval {
// 时间间隔赋值
l.interval = interval
// 停掉原有定时器,通过 tickerDone 发送信号
l.stopTicker()
// 重启新的定时器,异步
go l.startTicker(interval)
}
}
// Acquire one token.
// Acquire 获取令牌,令牌数 -1
// 返回值表示是否放行
func (l *qpsLimiter) Acquire(ctx context.Context) bool {
if atomic.LoadInt32(&l.tokens) <= 0 {
return false
}
return atomic.AddInt32(&l.tokens, -1) >= 0
}
// Status returns the current status.
// 返回当前状态
func (l *qpsLimiter) Status(ctx context.Context) (max, cur int, interval time.Duration) {
max = int(atomic.LoadInt32(&l.limit))
cur = int(atomic.LoadInt32(&l.tokens))
interval = l.interval
return
}
// startTicker
// 启动定时器,周期性的放令牌
func (l *qpsLimiter) startTicker(interval time.Duration) {
l.ticker = time.NewTicker(interval)
defer l.ticker.Stop()
l.tickerDone = make(chan bool, 1)
tc := l.ticker.C
td := l.tickerDone
// ticker and tickerDone can be reset, cannot use l.ticker or l.tickerDone directly
for {
select {
case <-tc:
// 匀速往桶里放令牌的关键点
l.updateToken()
case <-td:
// 收到停止信号
return
}
}
}
// stopTicker 停止定时器
func (l *qpsLimiter) stopTicker() {
if l.tickerDone == nil {
return
}
select {
case l.tickerDone <- true:
default:
}
}
// Some deviation is allowed here to gain better performance.
// 更新令牌,关键代码,周期性触发这里,尽量保证每次都放 once 个令牌
func (l *qpsLimiter) updateToken() {
// 令牌足够,无需更新
if atomic.LoadInt32(&l.limit) < atomic.LoadInt32(&l.tokens) {
return
}
// 单次要放的令牌数
once := atomic.LoadInt32(&l.once)
// 当前规则最多能放的令牌数
delta := atomic.LoadInt32(&l.limit) - atomic.LoadInt32(&l.tokens)
// 如果最多放的令牌大于我们应该匀速放的 once 个数,我们还按匀速来
if delta > once || delta < 0 {
delta = once
}
// 放入令牌
newTokens := atomic.AddInt32(&l.tokens, delta)
// 因为并发,在上一行的执行期间可能 -n,当前令牌数不足 once 的,要放够
if newTokens < once {
atomic.StoreInt32(&l.tokens, once)
}
}
// 计算匀速要放的令牌数
func calcOnce(interval time.Duration, limit int) int32 {
if interval > time.Second {
interval = time.Second
}
once := int32(float64(limit) / (fixedWindowTime.Seconds() / interval.Seconds()))
if once < 0 {
once = 0
}
return once
}
func (l *qpsLimiter) resetTokens(once int32) {
if atomic.LoadInt32(&l.tokens) > once {
atomic.StoreInt32(&l.tokens, once)
}
}
复制代码
进阶用法
限流定义了 LimitReporter
接口,用于限流状态监控,例如当前连接数过多、QPS 过大等。
如有需求,用户需要自行实现该接口,并通过 WithLimitReporter
注入。
// 实现这个接口
type LimitReporter interface {
ConnOverloadReport()
QPSOverloadReport()
}
// 例如
type myLimitReporter struct{}
func (m *myLimitReporter) ConnOverloadReport() {
// 并发限制处理逻辑
}
func (m *myLimitReporter) QPSOverloadReport() {
// QPS 超过限制处理逻辑
}
// 最后,我们在服务启动时设置,比如:
svr := echo.NewServer(new(EchoImpl), server.WithLimitReporter(&myLimitReporter{}), server.WithLimit(&limit.Option{MaxConnections: 10, MaxQPS: 100}))
if err := svr.Run(); err != nil {
klog.Error("server stopped with error:", err)
}
复制代码
重磅:支持自定义限流规则
Kitex 6 月 2 日发布了 v0.3.2
的版本,新增扩展支持了外部限流器的实现,在代码中可通过WithConcurrencyLimiter
和 WithQPSLimiter
传入限流器。可以参看 issues #431 的代码提交。发布这项功能也是为方便 Kitex 用户上云,计划对接腾讯的开源服务治理平台 Polaris 的一项支持。另外对于普通用户的意义就是我们可以自定义方法级别的限流,甚至可以做分布式限流器。
文件 kitex/server/option.go
中
扩展方式如下:
func (m *myQPSLimiter) Acquire(ctx context.Context) bool {
// 通过 rpcinfo.GetRPCInfo(ctx) 获取调用信息
// ...
return true
}
func (m *myQPSLimiter) Status(ctx context.Context) (max, current int, interval time.Duration) {
// ...
return 1, 0, time.Millisecond
}
svr := echo.NewServer(new(EchoImpl), server.WithQPSLimiter(&myQPSLimiter{}))
if err := svr.Run(); err != nil {
klog.Error("server stopped with error:", err)
}
复制代码
参考
限流 | CloudWeGo
golang.org/x/time
qpsLimiter.updateToken() 方法的误差可能过大
feat: extend outside limiter implementation and fix problems of rate limiter of multiplexed server
Integrate Polaris go sdk to support their service governance ability.
评论