Kitex
如果你还不了解 Kitex,可以看看《[ CloudWeGo 微服务实践 - 01 ] 开篇》。
如果你想参与 CloudWeGo 社区,给 Kitex 和其他组件贡献代码,可以看《如何给 CloudWeGo 做贡献》。
CloudWeGo 源码解读
Kitex 是 CloudWeGo 微服务中间件集合的一部分,已经把所有文章汇总到了《CloudWeGo 源码解读》。
熔断
在进行 RPC 调用时,下游服务难免会出错;
当下游出现问题时,如果上游继续对其进行调用,既妨碍了下游的恢复,也浪费了上游的资源;
为了解决这个问题,使用熔断器,自动化的解决这个问题。
这里是一篇更详细的熔断器介绍。
比较出名的熔断器当属 hystrix 了,这里是它的设计文档。
以上是官方文档《熔断器》,一些介绍,文档中也比较详细的描述了其作用和统计方法。
补充说明下:熔断器是在客户端的,在发现下游服务端返回较多错误时,对下游服务采取的主动保护策略。
探索过程
实验条件
我们以 kitex-example/basic 的代码示例进行改造。
代码改造
basic/client
func main() {
c, err := echo.NewClient(
"echo",
client.WithHostPorts("[::1]:8888"),
// genKey 传 nil 即可,内部判断会使用默认策略,并且这里是 服务粒度的熔断器
client.WithCircuitBreaker(circuitbreak.NewCBSuite(nil)),
)
if err != nil {
log.Fatal(err)
}
for {
req := &api.Request{Message: "my request"}
resp, err := c.Echo(context.Background(), req)
if err != nil {
klog.Error(err)
}
if resp != nil {
klog.Info(resp)
}
}
}
复制代码
basic/server
var _ api.Echo = &EchoImpl{}
// EchoImpl implements the last service interface defined in the IDL.
type EchoImpl struct{}
// Echo implements the Echo interface.
func (s *EchoImpl) Echo(ctx context.Context, req *api.Request) (resp *api.Response, err error) {
return nil, errors.New("something is wrong")
}
func main() {
svr := echo.NewServer(new(EchoImpl))
if err := svr.Run(); err != nil {
log.Println("server stopped with error:", err)
} else {
log.Println("server stopped")
}
}
复制代码
启动服务端
go run basic/server/main.go
复制代码
启动客户端
go run basic/client/main.go
复制代码
从日志上也能大概看出,在启动大概 10s 后,熔断器起作用了。
那么到底这里是怎么实现的呢?我们尝试去找到代码。
源码分析
服务粒度熔断器
client.WithCircuitBreaker
我们的着手点仍然是从一个 WithOption 的配置开始,client.WithCircuitBreaker:
initMiddlewares
通过上一步的 option 可选参数的传入,真正初始化一个客户端连接时需要再初始化一下,然后构建一个熔断器的 middleware
。
具体的处理链路:NewClient -> kClient.init -> kClient.initCircuitBreaker -> kClient.initMiddlewares
initCircuitBreaker
这里是一个小的代码分支,主要是订阅服务发现的事件,如果熔断器设置了按照实例粒度熔断,发现有服务实例被下线删除了,对应的熔断器也删除。
func (kc *kClient) initCircuitBreaker() error {
if kc.opt.CBSuite != nil {
kc.opt.CBSuite.SetEventBusAndQueue(kc.opt.Bus, kc.opt.Events)
}
return nil
}
// SetEventBusAndQueue 订阅服务发现变更事件
func (s *CBSuite) SetEventBusAndQueue(bus event.Bus, events event.Queue) {
s.events = events
if bus != nil {
bus.Watch(discovery.ChangeEventName, s.discoveryChangeHandler)
}
}
// discoveryChangeHandler 服务发现变更事件对应熔断器的处理逻辑
func (s *CBSuite) discoveryChangeHandler(e *event.Event) {
if s.instancePanel == nil {
return
}
extra := e.Extra.(*discovery.Change)
for i := range extra.Removed {
instCBKey := extra.Removed[i].Address().String()
s.instancePanel.RemoveBreaker(instCBKey)
}
}
复制代码
initMiddlewares
经过熔断器自身的初始化工作,就来到把熔断器构建到 middleware
里
我们也看到上述选中的一行代码,从命名上我们也能猜到这里是要做服务粒度的熔断器:
kc.opt.CBSuite.ServiceCBMW()
复制代码
CBSuite.ServiceCBMW
接下来 CBSuite.ServiceCBMW
是我们重点关注的点,我们来看看这里是初始化构建服务粒度的熔断器:
// ServiceCBMW 构建服务粒度熔断器的中间件
func (s *CBSuite) ServiceCBMW() endpoint.Middleware {
if s == nil {
return endpoint.DummyMiddleware
}
// 初始化服务粒度熔断器
s.initServiceCB()
// 构建熔断器中间件
return NewCircuitBreakerMW(*s.serviceControl, s.servicePanel)
}
// initServiceCB 初始化服务粒度熔断器
func (s *CBSuite) initServiceCB() {
if s.servicePanel != nil && s.serviceControl != nil {
return
}
// 服务粒度熔断器缓存 key,默认 RPCInfo2Key
if s.genServiceCBKey == nil {
s.genServiceCBKey = RPCInfo2Key
}
// circuitbreaker 是在 github.com/bytedance/gopkg 仓库下对熔断器算法封装的一个包
// gopkg 是字节开源的 Go 工具集
// 所以也可以理解核心算法其实是在 gopkg 仓库里
opts := circuitbreaker.Options{
ShouldTripWithKey: s.svcTripFunc,
}
// 用户操作面板
s.servicePanel, _ = circuitbreaker.NewPanel(s.onServiceStateChange, opts)
// 缓存下改统计key的配置,并且该配置可以通过 UpdateServiceCBConfig 函数动态修改
svcKey := func(ctx context.Context, request interface{}) (serviceCBKey string, enabled bool) {
ri := rpcinfo.GetRPCInfo(ctx)
serviceCBKey = s.genServiceCBKey(ri)
// Enable: true, ErrRate: 0.5, MinSample: 200
cbConfig, _ := s.serviceCBConfig.LoadOrStore(serviceCBKey, defaultCBConfig)
enabled = cbConfig.(CBConfig).Enable
return
}
// 熔断器控制策略
s.serviceControl = &Control{
GetKey: svcKey,
// 判断错误类型
GetErrorType: ErrorTypeOnServiceLevel,
// 修饰后的错误
DecorateError: func(ctx context.Context, request interface{}, err error) error {
return kerrors.ErrServiceCircuitBreak
},
}
}
// NewCircuitBreakerMW 根据给定参数构建一个熔断器中间件
// 该函数作为 middleware,在服务启动时初始化被串联起来,在 RPC 调用时会指定到这里逻辑
func NewCircuitBreakerMW(control Control, panel circuitbreaker.Panel) endpoint.Middleware {
return func(next endpoint.Endpoint) endpoint.Endpoint {
return func(ctx context.Context, request, response interface{}) (err error) {
// 根据控制策略生成 key,并获取配置,判断是否可用
key, enabled := control.GetKey(ctx, request)
if !enabled {
return next(ctx, request, response)
}
// 该统计 key,是否被允许,不允许则报错拦截
// forbidden by circuitbreaker
if !panel.IsAllowed(key) {
return control.DecorateError(ctx, request, kerrors.ErrCircuitBreak)
}
// 允许就进入到下一个中间件
err = next(ctx, request, response)
// 记录统计
RecordStat(ctx, request, response, err, key, &control, panel)
return
}
}
}
复制代码
实例粒度熔断器
代码改造
要对实例粒度熔断器单独进行验证,要通过 WithInstanceMW
的方式使用,改三个地方:
basic/client
func main() {
// 这里只是为了演示,所以借用了 suite
cbs := circuitbreak.NewCBSuite(nil)
c, err := echo.NewClient(
"echo",
client.WithHostPorts("[::1]:8888"),
// 按照官方文档的说法,这里也要用 WithInstanceMW,这里的中间件执行会放在服务发现之后
client.WithInstanceMW(cbs.InstanceCBMW()),
)
if err != nil {
log.Fatal(err)
}
for {
req := &api.Request{Message: "my request"}
resp, err := c.Echo(context.Background(), req)
if err != nil {
klog.Error(err)
}
if resp != nil {
klog.Info(resp)
}
}
}
复制代码
go.mod
我们把 kitex-example 项目依赖的 kitex 包替换成当前目录的,方便改下源码测试,因为实例的服务粒度,默认是连接上的错误才会触发,这种情况我们不太好模拟。
replace github.com/cloudwego/kitex => ../kitex
复制代码
ErrorTypeOnInstanceLevel
在 kitex 仓库中全局搜 ErrorTypeOnInstanceLevel
关键词,能发现如下代码,按照下面代码的说明改造:
// ErrorTypeOnInstanceLevel determines the error type with a instance level criteria.
// Basically, it treats only the connection error as failure.
func ErrorTypeOnInstanceLevel(ctx context.Context, request, response interface{}, err error) ErrorType {
// 原来代码处理:是建连问题才触发一次统计
// if errors.Is(err, kerrors.ErrGetConnection) {
// return TypeFailure
// }
// 改造后:任何一种错误都触发统计,方便我们复现
if err != nil {
return TypeFailure
}
return TypeSuccess
}
复制代码
重新启动客户端
go run basic/client/main.go
复制代码
从日志上也能看出来,在报错一段时间后即触发了熔断。
处理流程
实例粒度熔断器和服务粒度的大部分代码一致,先过一下大致流程:
CBSuite.InstanceCBMW
// InstanceCBMW return a new instance level CircuitBreakerMW.
func (s *CBSuite) InstanceCBMW() endpoint.Middleware {
if s == nil {
return endpoint.DummyMiddleware
}
s.initInstanceCB()
// 后端的构建中间件是一样的
return NewCircuitBreakerMW(*s.instanceControl, s.instancePanel)
}
func (s *CBSuite) initInstanceCB() {
if s.instancePanel != nil && s.instanceControl != nil {
return
}
s.instanceCBConfig = instanceCBConfig{CBConfig: defaultCBConfig}
opts := circuitbreaker.Options{
ShouldTripWithKey: s.insTripFunc,
}
s.instancePanel, _ = circuitbreaker.NewPanel(s.onInstanceStateChange, opts)
// 对比服务粒度的 key,我们记得服务的默认用的 RPCInfo2Key 生成(当然也可以自定义)
instanceKey := func(ctx context.Context, request interface{}) (instCBKey string, enabled bool) {
ri := rpcinfo.GetRPCInfo(ctx)
instCBKey = ri.To().Address().String()
s.instanceCBConfig.RLock()
enabled = s.instanceCBConfig.Enable
s.instanceCBConfig.RUnlock()
return
}
s.instanceControl = &Control{
GetKey: instanceKey,
// 还有这里,这里 ErrorTypeOnInstanceLevel 默认只处理建连失败的才是算到统计里
GetErrorType: ErrorTypeOnInstanceLevel,
DecorateError: func(ctx context.Context, request interface{}, err error) error {
return kerrors.ErrInstanceCircuitBreak
},
}
}
复制代码
client.WithInstanceMW
// WithInstanceMW 添加中间件,在服务发现和负载均衡之后的处理请求
func WithInstanceMW(mw endpoint.Middleware) Option {
imwb := func(ctx context.Context) endpoint.Middleware {
return mw
}
return Option{F: func(o *client.Options, di *utils.Slice) {
di.Push(fmt.Sprintf("WithInstanceMW(%+v)", utils.GetFuncName(mw)))
o.IMWBs = append(o.IMWBs, imwb)
}}
}
IMWBs []endpoint.MiddlewareBuilder
复制代码
initMiddlewares
func (kc *kClient) initMiddlewares(ctx context.Context) {
builderMWs := richMWsWithBuilder(ctx, kc.opt.MWBs)
kc.mws = append(kc.mws, kc.opt.CBSuite.ServiceCBMW(), rpcTimeoutMW(ctx))
kc.mws = append(kc.mws, builderMWs...)
kc.mws = append(kc.mws, acl.NewACLMiddleware(kc.opt.ACLRules))
if kc.opt.Proxy == nil {
// 服务发现和负载均衡,kc.lbf
kc.mws = append(kc.mws, newResolveMWBuilder(kc.lbf)(ctx))
// 这里要说明的是如果是用 CBSuite 构建才会在这里生效
kc.mws = append(kc.mws, kc.opt.CBSuite.InstanceCBMW())
// 这里才是我们上一步 WithInstanceMW 构建的中间件合并到 mws 的地方
kc.mws = append(kc.mws, richMWsWithBuilder(ctx, kc.opt.IMWBs)...)
} else {
if kc.opt.Resolver != nil { // customized service discovery
kc.mws = append(kc.mws, newResolveMWBuilder(kc.lbf)(ctx))
}
kc.mws = append(kc.mws, newProxyMW(kc.opt.Proxy))
}
kc.mws = append(kc.mws, newIOErrorHandleMW(kc.opt.ErrHandle))
}
复制代码
框架自动重试
文档提到 注意,框架自动重试的前提是需要通过 WithInstanceMW 添加,WithInstanceMW 添加的 middleware 会在负载均衡后执行
,这些又是如何处理的呢?
我们知道熔断器和其他处理逻辑都是使用 endpoint.Middleware
的形式,在 resolver 的 mw 里判断的重试错误,如果是可重试的错误,会继续 pick 其他实例重试:
newResolveMWBuilder
我们留意到 picker.Next
之后执行下一个 endpoint.Middleware
,发现了报错,并且报错是 retryable(err)
,就会就如 continue
,重新走到 ins := picker.Next(ctx, request)
选取一个服务实例。
// newResolveMWBuilder
retryable := func(err error) bool {
return errors.Is(err, kerrors.ErrGetConnection) || errors.Is(err, kerrors.ErrCircuitBreak)
}
// NewCircuitBreakerMW
if !panel.IsAllowed(key) {
return control.DecorateError(ctx, request, kerrors.ErrCircuitBreak)
}
复制代码
两类熔断器的异同
相同点
都是主动对下游服务采取的保护措施,发生报错一定频率就不再请求。
最终都复用了 NewCircuitBreakerMW
构建中间件的函数,最后 endpoint.Middleware
其实是同一个函数(但配置不同)。
熔断器关键算法是维护在了 github.com/bytedance/gopkg/circuitbreaker
。
不同点
熔断触发的维度不同:一个是实例粒度,报错 ErrGetConnection
;一个是服务粒度,比如服务端返回了明确的错误,具体处理可以参考 ErrorTypeOnServiceLevel
函数。
用法上:在使用 CBSuite
构建参数时,NewCBSuite
函数指定的 genKey 是默认给服务粒度熔断器用的;client.WithCircuitBreaker
默认会内置服务粒度熔断器和实例粒度熔断器。想要单独使用或自定义配置需要我们自己做参数构建 endpoint.Middleware
。
用法上的不同映射到内部的参数也是不同的,比如 CBSuite
结构体的分组:
endpoint.Middleware
及对应的 endpoint.Endpoint
的执行顺序不同:服务粒度熔断器的判断处理逻辑在服务发现之前;实例粒度熔断器的判断出护理逻辑在服务发现之后;
如果要自定义熔断器,服务粒度熔断器要 client.WithMiddleware(cbMW)
,实例粒度熔断器要 client.WithInstanceMW(cbMW)
。
重要说明
client.WithCircuitBreaker
默认会内置服务粒度熔断器和实例粒度熔断器,两种都会有:默认的熔断阈值是 ErrRate: 0.5, MinSample: 200,错误率达到 50% 触发熔断,同时要求统计量 >200
。
熔断阈值可动态更新:UpdateServiceCBConfig
和 UpdateInstanceCBConfig
。
可通过 circuitbreak.NewCircuitBreakerMW
完全自定义,不过稍微繁琐,kitex-examples/governance/client/circuitbreak 也有例子可参考,需要手动配置 bytedance/gopkg/cloud/circuitbreaker 相关参数。
github.com/bytedance/gopkg/cloud/circuitbreaker 其实是算法关键,但限于篇幅,这里只介绍和 Kitex 实现相关代码逻辑,如果有必要再对其单独成篇进行分析。
总结
经过上述分析,感觉如果要自定义熔断阈值对大多数框架使用者来说还是有一定理解成本,不过可以先参考 kitex-examples/governance/client/circuitbreak 这里的例子,如果还是不是很理解,或者用不好,可以联系社区。如果对这里的实现有好的建议也欢迎留言。
评论