Kitex
如果你还不了解 Kitex,可以看看《[ CloudWeGo 微服务实践 - 01 ] 开篇》。
如果你想参与 CloudWeGo 社区,给 Kitex 和其他组件贡献代码,可以看《如何给 CloudWeGo 做贡献》。
CloudWeGo 源码解读
Kitex 是 CloudWeGo 微服务中间件集合的一部分,已经把所有文章汇总到了《CloudWeGo 源码解读》。
服务发现
服务注册与服务发现是 RPC 框架中最重要的组件,地位仅次于加解密与通信的实现。在 Kitex 中,注册中心已经支持了主流的各个组件,如 registry-etcd、registry-nacos、registry-zookeeper、registry-polaris、registry-eureka、registry-consul 等。
服务发现一般有两种做法:
在 RPC 框架中,服务发现一般都是在客户端做发现,选择服务节点,这种更常见一些。Kitex 即是如此。
用法
我们以 nacos 为例,代码如下:
https://github.com/kitex-contrib/registry-nacos/blob/main/example/basic/client/main.go#L28
func main() { // 通过默认配置获取一个服务发现组件的实例 // 方法内部有通过默认配置创建一个 nacos 客户端,方便后续与 nacos 交互 r, err := resolver.NewDefaultNacosResolver() if err != nil { panic(err) } newClient := hello.MustNewClient( "Hello", // 以 WithOption 的方式服务发现实例 client.WithResolver(r), client.WithRPCTimeout(time.Second*3), ) for { resp, err := newClient.Echo(context.Background(), &api.Request{Message: "Hello"}) if err != nil { log.Fatal(err) } log.Println(resp) time.Sleep(time.Second) }}
复制代码
探索过程
实验条件、环境准备均和上节《[ Kitex 源码解读 ] 服务注册》保持一致。
启动服务端
我们运行的案例是 custom-config 的 demo。这里的 nacos 客户端参数都可配置,方便调试。
go run example/custom-config/server/main.go
复制代码
启动客户端
go run example/custom-config/client/main.go
复制代码
有日志能看到,我们成功启动了客户端,并能够正确请求了目标服务。
源码分析
client.WithResolver
我们的着手点仍然是从一个 WithOption 的配置开始,client.WithResolver,我们能看到最后把一个服务发现实例放到了 client 的 options 结构体中:
再往下走,我们看到服务发现的解析器在 189 行的地方有用到(我们先不看 165 行 WithProxy 的分支)。
由此我们来到了 kClient.initLBCache 的地方,这里是在初始化一个服务的客户端时调用的。为了方便大家理解,接下来我再分两部分去分析:
NewClient 时,初始化时;
远程调用时,middleware 里的服务发现处理;
NewClient 初始化
NewClient 时,接收到 WithResolver 的设置,接下来做初始化相关工作:
initLBCache
type kClient struct { svcInfo *serviceinfo.ServiceInfo mws []endpoint.Middleware eps endpoint.Endpoint sEps endpoint.Endpoint opt *client.Options lbf *lbcache.BalancerFactory
inited bool closed bool}
复制代码
type BalancerFactory struct { Hookable opts Options cache sync.Map // key -> LoadBalancer resolver discovery.Resolver balancer loadbalance.Loadbalancer rebalancer loadbalance.Rebalancer sfg singleflight.Group}
复制代码
func (kc *kClient) initLBCache() error { resolver := kc.opt.Resolver if resolver == nil { // 生成一个携带 kerrors.ErrNoResolver 错误的 resolver }
balancer := kc.opt.Balancer
// 默认使用基于权重的负载均衡 // 参看 https://www.cloudwego.io/zh/docs/kitex/tutorials/basic-feature/loadbalance/ if balancer == nil { balancer = loadbalance.NewWeightedBalancer() }
cacheOpts := lbcache.Options{DiagnosisService: kc.opt.DebugService} if kc.opt.BalancerCacheOpt != nil { cacheOpts = *kc.opt.BalancerCacheOpt }
// 1、生成一个 BalancerFactory 对象; // 2、检查 cacheOpts 刷新时间间隔设置,若没有则默认 5s // 3、检查 cacheOpts 缓存过期时间设置,若没有则默认 15s // 4、异步协程定期标记服务已发现的信息过期,具体在在 BalancerFactory.cache 字段 // 5、最后将 resolver + balancer + cacheOpts.RefreshInterval + cacheOpts.ExpireInterval 为 key, // 缓存到 balancerFactories 的 sync.Map 上 kc.lbf = lbcache.NewBalancerFactory(resolver, balancer, cacheOpts)
// ... return nil}
func NewBalancerFactory(resolver discovery.Resolver, balancer loadbalance.Loadbalancer, opts Options) *BalancerFactory { // 检查刷新时间间隔,若没有则默认 5s // 检查缓存过期时间,若没有则默认 15s opts.check()
// 构建一个当前负载均衡规则的缓存 key uniqueKey := cacheKey(resolver.Name(), balancer.Name(), opts)
// 规则缓存是否存在 val, ok := balancerFactories.Load(uniqueKey) if ok { return val.(*BalancerFactory) }
// 若没有则缓存一次 val, _, _ = balancerFactoriesSfg.Do(uniqueKey, func() (interface{}, error) { b := &BalancerFactory{ opts: opts, resolver: resolver, balancer: balancer, } if rb, ok := balancer.(loadbalance.Rebalancer); ok { hrb := newHookRebalancer(rb) b.rebalancer = hrb b.Hookable = hrb } else { b.Hookable = noopHookRebalancer{} }
// 根据 opts.ExpireInterval 定期清理已发现服务的缓存 // 主要操作 BalancerFactory.cache go b.watcher()
balancerFactories.Store(uniqueKey, b) return b, nil })
return val.(*BalancerFactory)}
复制代码
initMiddlewares
// Middleware deal with input Endpoint and output Endpoint.type Middleware func(Endpoint) Endpoint
// 在这里,我们只需要知道 newResolveMWBuilder 返回了 endpoint.Middleware 的函数即可// 具体逻辑我们稍后再分析func (kc *kClient) initMiddlewares(ctx context.Context) { // ... if kc.opt.Proxy == nil { // 自定义服务发现 kc.mws = append(kc.mws, newResolveMWBuilder(kc.lbf)(ctx)) // ... } else { if kc.opt.Resolver != nil { // 自定义服务发现 kc.mws = append(kc.mws, newResolveMWBuilder(kc.lbf)(ctx)) } kc.mws = append(kc.mws, newProxyMW(kc.opt.Proxy)) } // ...}
// 经过以上初始化过程,是组装了一系列需要用到的 Middleware 到 kClient.mws 这个切片里
复制代码
buildInvokeChain
func (kc *kClient) buildInvokeChain() error { // 构建一个发送请求并接收响应的 Endpoint,主要与远端交互 innerHandlerEp, err := kc.invokeHandleEndpoint() if err != nil { return err }
// 把调用处理逻辑与上一步处理好的所有 Middleware 串联起来,放在 kClient.eps 里 kc.eps = endpoint.Chain(kc.mws...)(innerHandlerEp)
// ... return nil}
复制代码
小结
WithResolver 接收参数
初始化 NewBalancerFactory 赋值给 kClient.lbf,并异步定时标记缓存过期 BalancerFactory.cache
NewBalancerFactory 做的缓存更多是配置和相关对象的缓存,还没有做服务发现和服务节点选取
将 kClient.lbf 经过函数 newResolveMWBuilder 添加到 kClient.mws
将 kClient.mws 串联所有 endpoint.Endpoint 到 kClient.eps, 待后续 RPC 调用时顺序执行
middleware 服务发现
kClient.Call
我们接上边的内容,再初始化后,我们看到所有的处理都放在了 kClient.eps 里。我们可以从这里入手,看下这里的这个 endpoint.Endpoint 函数什么地方被调用了:
在截图里的 311 行和 334 行,我们分别点进去,其实都在一个函数里,是不同的分支:
走到这里 kClient.Call,其实如果往上追,也能发现,这里是被 kitex_gen 生成的客户端代码实际发起 RPC 调用的地方:
newResolveMWBuilder
在刚刚我们分析 kClient.initMiddlewares 的时候,我们看到那里有调用 newResolveMWBuilder 的函数生成了一个 endpoint.Middleware 的函数,其实也是放在 kClient.Call 的 kc.ops 里执行了。这里我们就单独看看到底执行了什么,我们把主要代码拿出来:
type RPCInfo interface { From() EndpointInfo To() EndpointInfo Invocation() Invocation Config() RPCConfig Stats() RPCStats}
type Balancer struct { b *BalancerFactory target string // a description returned from the resolver's Target method res atomic.Value // newest and previous discovery result expire int32 // 0 = normal, 1 = expire and collect next ticker sharedTicker *sharedTicker}
// 获取到 RPC 相关信息 rpcInfo := rpcinfo.GetRPCInfo(ctx) dest := rpcInfo.To() if dest == nil { return kerrors.ErrNoDestService }
remote := remoteinfo.AsRemoteInfo(dest) if remote == nil { err := fmt.Errorf("unsupported target EndpointInfo type: %T", dest) return kerrors.ErrInternalException.WithCause(err) } if remote.GetInstance() != nil { return next(ctx, request, response) }
// 1、从 BalancerFactory 获取目标服务对应的均衡器 Balancer 等信息 // 2、这里也是有缓存的,刷新机制是根据设置的 cacheOpts.RefreshInterval 时间定时刷新 // 参照 Balancer.refresh 的代码(https://github.com/cloudwego/kitex/blob/develop/pkg/loadbalance/lbcache/cache.go#L184) // 3、根据不同的 dest 可以得到不同的 Balancer lb, err := lbf.Get(ctx, dest) if err != nil { return kerrors.ErrServiceDiscovery.WithCause(err) }
// picker 是 BalancerFactory.balancer 设置的均衡器对应的 // picker 的 主要逻辑是从一组服务实例信息中选取其中一个,如默认的 WeightedRandom 基于权重的随机 picker := lb.GetPicker() if r, ok := picker.(internal.Reusable); ok { defer r.Recycle() } var lastErr error for { select { case <-ctx.Done(): return kerrors.ErrRPCTimeout default: } // 具体的选取其中一个节点,代码例如可以参考: randomPicker.Next // https://github.com/cloudwego/kitex/blob/develop/pkg/loadbalance/weighted_random.go#L56 ins := picker.Next(ctx, request) if ins == nil { return kerrors.ErrNoMoreInstance.WithCause(fmt.Errorf("last error: %w", lastErr)) } remote.SetInstance(ins)
if err = next(ctx, request, response); err != nil && retryable(err) { lastErr = err continue } return err }
复制代码
// 根据目标服务信息获取对应的解析策略func (b *BalancerFactory) Get(ctx context.Context, target rpcinfo.EndpointInfo) (*Balancer, error) { desc := b.resolver.Target(ctx, target) val, ok := b.cache.Load(desc) if ok { return val.(*Balancer), nil }
val, err, _ := b.sfg.Do(desc, func() (interface{}, error) { // 从注册中心获取服务实例列表 res, err := b.resolver.Resolve(ctx, desc) if err != nil { return nil, err } // 重命名发现结果的缓存 key renameResultCacheKey(&res, b.resolver.Name()) bl := &Balancer{ b: b, target: desc, } bl.res.Store(res)
// 定时刷新服务发现的结果,刷新逻辑在 sharedTicker.add 中开了一个协程 bl.sharedTicker = getSharedTicker(bl, b.opts.RefreshInterval)
b.cache.Store(desc, bl) return bl, nil }) if err != nil { return nil, err } return val.(*Balancer), nil}
复制代码
小结
kClient.Call 会顺序执行所有的 endpoint.Middleware,包括上边 newResolveMWBuilder 服务发现重要逻辑
lbf.Get(ctx, dest) 获取均衡策略和配置,并且得到一组服务实例
lb.GetPicker() 则是根据均衡规则获取一个实例选择器,如进行一些配置和赋值,并且已经拿到一组服务实例
Picker.Next 则是实例选择器具体的算法执行,拿到具体的一个服务实例
定制扩展
官网有服务发现扩展,如果想了解具体的实践,可以参考之前写过的《[ CloudWeGo 微服务实践 - 08 ] Nacos 服务发现扩展 (2)》
总结
以上,利用 IDE,一起梳理了 Kitex 关于服务发现的主干逻辑,还有一些细节处理也是值得揣摩的。如果你对以上内容有不同的认识和要讨论的问题,都可以留言评论。
评论 (2 条评论)