写点什么

[ Kitex 源码解读 ] 服务发现

作者:baiyutang
  • 2022 年 7 月 26 日
  • 本文字数:5164 字

    阅读完需:约 17 分钟

[ Kitex 源码解读 ] 服务发现

Kitex

如果你还不了解 Kitex,可以看看《[ CloudWeGo 微服务实践 - 01 ] 开篇》。

如果你想参与 CloudWeGo 社区,给 Kitex 和其他组件贡献代码,可以看《如何给 CloudWeGo 做贡献》。

CloudWeGo 源码解读

Kitex 是 CloudWeGo 微服务中间件集合的一部分,已经把所有文章汇总到了《CloudWeGo 源码解读》。

服务发现

服务注册与服务发现是 RPC 框架中最重要的组件,地位仅次于加解密与通信的实现。在 Kitex 中,注册中心已经支持了主流的各个组件,如 registry-etcdregistry-nacosregistry-zookeeperregistry-polarisregistry-eurekaregistry-consul 等。


服务发现一般有两种做法:

  • 客户端做,由客户端从注册中心获取一个或一组(再按照算法选择其一);

  • 在服务端做,客户端直接请求一个地址,再做负载均衡转发到其中一个服务节点上,例如 k8s 的 service 。


在 RPC 框架中,服务发现一般都是在客户端做发现,选择服务节点,这种更常见一些。Kitex 即是如此。

用法

我们以 nacos 为例,代码如下:

https://github.com/kitex-contrib/registry-nacos/blob/main/example/basic/client/main.go#L28


func main() { // 通过默认配置获取一个服务发现组件的实例 // 方法内部有通过默认配置创建一个 nacos 客户端,方便后续与 nacos 交互 r, err := resolver.NewDefaultNacosResolver() if err != nil { panic(err) } newClient := hello.MustNewClient( "Hello", // 以 WithOption 的方式服务发现实例 client.WithResolver(r), client.WithRPCTimeout(time.Second*3), ) for { resp, err := newClient.Echo(context.Background(), &api.Request{Message: "Hello"}) if err != nil { log.Fatal(err) } log.Println(resp) time.Sleep(time.Second) }}
复制代码


探索过程

实验条件、环境准备均和上节《[ Kitex 源码解读 ] 服务注册》保持一致。

启动服务端

我们运行的案例是 custom-config 的 demo。这里的 nacos 客户端参数都可配置,方便调试。

 go run example/custom-config/server/main.go
复制代码



启动客户端

 go run example/custom-config/client/main.go
复制代码


有日志能看到,我们成功启动了客户端,并能够正确请求了目标服务。

源码分析

client.WithResolver

我们的着手点仍然是从一个 WithOption 的配置开始,client.WithResolver,我们能看到最后把一个服务发现实例放到了 client 的 options 结构体中:


再往下走,我们看到服务发现的解析器在 189 行的地方有用到(我们先不看 165 行 WithProxy 的分支)。



由此我们来到了 kClient.initLBCache 的地方,这里是在初始化一个服务的客户端时调用的。为了方便大家理解,接下来我再分两部分去分析:

  1. NewClient 时,初始化时;

  2. 远程调用时,middleware 里的服务发现处理;

NewClient 初始化

NewClient 时,接收到 WithResolver 的设置,接下来做初始化相关工作:


initLBCache

type kClient struct {	svcInfo *serviceinfo.ServiceInfo	mws     []endpoint.Middleware	eps     endpoint.Endpoint	sEps    endpoint.Endpoint	opt     *client.Options	lbf     *lbcache.BalancerFactory
inited bool closed bool}
复制代码


type BalancerFactory struct {	Hookable	opts       Options	cache      sync.Map // key -> LoadBalancer	resolver   discovery.Resolver	balancer   loadbalance.Loadbalancer	rebalancer loadbalance.Rebalancer	sfg        singleflight.Group}
复制代码


func (kc *kClient) initLBCache() error {	resolver := kc.opt.Resolver	if resolver == nil {		// 生成一个携带 kerrors.ErrNoResolver 错误的 resolver	}
balancer := kc.opt.Balancer
// 默认使用基于权重的负载均衡 // 参看 https://www.cloudwego.io/zh/docs/kitex/tutorials/basic-feature/loadbalance/ if balancer == nil { balancer = loadbalance.NewWeightedBalancer() }
cacheOpts := lbcache.Options{DiagnosisService: kc.opt.DebugService} if kc.opt.BalancerCacheOpt != nil { cacheOpts = *kc.opt.BalancerCacheOpt }
// 1、生成一个 BalancerFactory 对象; // 2、检查 cacheOpts 刷新时间间隔设置,若没有则默认 5s // 3、检查 cacheOpts 缓存过期时间设置,若没有则默认 15s // 4、异步协程定期标记服务已发现的信息过期,具体在在 BalancerFactory.cache 字段 // 5、最后将 resolver + balancer + cacheOpts.RefreshInterval + cacheOpts.ExpireInterval 为 key, // 缓存到 balancerFactories 的 sync.Map 上 kc.lbf = lbcache.NewBalancerFactory(resolver, balancer, cacheOpts)
// ... return nil}

func NewBalancerFactory(resolver discovery.Resolver, balancer loadbalance.Loadbalancer, opts Options) *BalancerFactory { // 检查刷新时间间隔,若没有则默认 5s // 检查缓存过期时间,若没有则默认 15s opts.check()
// 构建一个当前负载均衡规则的缓存 key uniqueKey := cacheKey(resolver.Name(), balancer.Name(), opts)
// 规则缓存是否存在 val, ok := balancerFactories.Load(uniqueKey) if ok { return val.(*BalancerFactory) }
// 若没有则缓存一次 val, _, _ = balancerFactoriesSfg.Do(uniqueKey, func() (interface{}, error) { b := &BalancerFactory{ opts: opts, resolver: resolver, balancer: balancer, } if rb, ok := balancer.(loadbalance.Rebalancer); ok { hrb := newHookRebalancer(rb) b.rebalancer = hrb b.Hookable = hrb } else { b.Hookable = noopHookRebalancer{} }
// 根据 opts.ExpireInterval 定期清理已发现服务的缓存 // 主要操作 BalancerFactory.cache go b.watcher()
balancerFactories.Store(uniqueKey, b) return b, nil })
return val.(*BalancerFactory)}
复制代码

initMiddlewares

// Middleware deal with input Endpoint and output Endpoint.type Middleware func(Endpoint) Endpoint
// 在这里,我们只需要知道 newResolveMWBuilder 返回了 endpoint.Middleware 的函数即可// 具体逻辑我们稍后再分析func (kc *kClient) initMiddlewares(ctx context.Context) { // ... if kc.opt.Proxy == nil { // 自定义服务发现 kc.mws = append(kc.mws, newResolveMWBuilder(kc.lbf)(ctx)) // ... } else { if kc.opt.Resolver != nil { // 自定义服务发现 kc.mws = append(kc.mws, newResolveMWBuilder(kc.lbf)(ctx)) } kc.mws = append(kc.mws, newProxyMW(kc.opt.Proxy)) } // ...}
// 经过以上初始化过程,是组装了一系列需要用到的 Middleware 到 kClient.mws 这个切片里
复制代码

buildInvokeChain

func (kc *kClient) buildInvokeChain() error {	// 构建一个发送请求并接收响应的 Endpoint,主要与远端交互	innerHandlerEp, err := kc.invokeHandleEndpoint()	if err != nil {		return err	}
// 把调用处理逻辑与上一步处理好的所有 Middleware 串联起来,放在 kClient.eps 里 kc.eps = endpoint.Chain(kc.mws...)(innerHandlerEp)
// ... return nil}
复制代码

小结

  1. WithResolver 接收参数

  2. 初始化 NewBalancerFactory 赋值给 kClient.lbf,并异步定时标记缓存过期 BalancerFactory.cache

  3. NewBalancerFactory 做的缓存更多是配置和相关对象的缓存,还没有做服务发现和服务节点选取

  4. kClient.lbf 经过函数 newResolveMWBuilder 添加到 kClient.mws

  5. kClient.mws 串联所有 endpoint.Endpoint kClient.eps, 待后续 RPC 调用时顺序执行

middleware 服务发现

kClient.Call

我们接上边的内容,再初始化后,我们看到所有的处理都放在了 kClient.eps 里。我们可以从这里入手,看下这里的这个 endpoint.Endpoint 函数什么地方被调用了:

在截图里的 311 行和 334 行,我们分别点进去,其实都在一个函数里,是不同的分支:


走到这里 kClient.Call,其实如果往上追,也能发现,这里是被 kitex_gen 生成的客户端代码实际发起 RPC 调用的地方:

newResolveMWBuilder

在刚刚我们分析 kClient.initMiddlewares 的时候,我们看到那里有调用 newResolveMWBuilder 的函数生成了一个 endpoint.Middleware 的函数,其实也是放在 kClient.Callkc.ops 里执行了。这里我们就单独看看到底执行了什么,我们把主要代码拿出来:

type RPCInfo interface {	From() EndpointInfo	To() EndpointInfo	Invocation() Invocation	Config() RPCConfig	Stats() RPCStats}
type Balancer struct { b *BalancerFactory target string // a description returned from the resolver's Target method res atomic.Value // newest and previous discovery result expire int32 // 0 = normal, 1 = expire and collect next ticker sharedTicker *sharedTicker}
// 获取到 RPC 相关信息 rpcInfo := rpcinfo.GetRPCInfo(ctx) dest := rpcInfo.To() if dest == nil { return kerrors.ErrNoDestService }
remote := remoteinfo.AsRemoteInfo(dest) if remote == nil { err := fmt.Errorf("unsupported target EndpointInfo type: %T", dest) return kerrors.ErrInternalException.WithCause(err) } if remote.GetInstance() != nil { return next(ctx, request, response) }
// 1、从 BalancerFactory 获取目标服务对应的均衡器 Balancer 等信息 // 2、这里也是有缓存的,刷新机制是根据设置的 cacheOpts.RefreshInterval 时间定时刷新 // 参照 Balancer.refresh 的代码(https://github.com/cloudwego/kitex/blob/develop/pkg/loadbalance/lbcache/cache.go#L184) // 3、根据不同的 dest 可以得到不同的 Balancer lb, err := lbf.Get(ctx, dest) if err != nil { return kerrors.ErrServiceDiscovery.WithCause(err) }
// picker 是 BalancerFactory.balancer 设置的均衡器对应的 // picker 的 主要逻辑是从一组服务实例信息中选取其中一个,如默认的 WeightedRandom 基于权重的随机 picker := lb.GetPicker() if r, ok := picker.(internal.Reusable); ok { defer r.Recycle() } var lastErr error for { select { case <-ctx.Done(): return kerrors.ErrRPCTimeout default: } // 具体的选取其中一个节点,代码例如可以参考: randomPicker.Next // https://github.com/cloudwego/kitex/blob/develop/pkg/loadbalance/weighted_random.go#L56 ins := picker.Next(ctx, request) if ins == nil { return kerrors.ErrNoMoreInstance.WithCause(fmt.Errorf("last error: %w", lastErr)) } remote.SetInstance(ins)
if err = next(ctx, request, response); err != nil && retryable(err) { lastErr = err continue } return err }
复制代码


// 根据目标服务信息获取对应的解析策略func (b *BalancerFactory) Get(ctx context.Context, target rpcinfo.EndpointInfo) (*Balancer, error) {	desc := b.resolver.Target(ctx, target)	val, ok := b.cache.Load(desc)	if ok {		return val.(*Balancer), nil	}
val, err, _ := b.sfg.Do(desc, func() (interface{}, error) { // 从注册中心获取服务实例列表 res, err := b.resolver.Resolve(ctx, desc) if err != nil { return nil, err } // 重命名发现结果的缓存 key renameResultCacheKey(&res, b.resolver.Name()) bl := &Balancer{ b: b, target: desc, } bl.res.Store(res)
// 定时刷新服务发现的结果,刷新逻辑在 sharedTicker.add 中开了一个协程 bl.sharedTicker = getSharedTicker(bl, b.opts.RefreshInterval)
b.cache.Store(desc, bl) return bl, nil }) if err != nil { return nil, err } return val.(*Balancer), nil}
复制代码

小结

  1. kClient.Call 会顺序执行所有的 endpoint.Middleware,包括上边 newResolveMWBuilder 服务发现重要逻辑

  2. lbf.Get(ctx, dest) 获取均衡策略和配置,并且得到一组服务实例

  3. lb.GetPicker() 则是根据均衡规则获取一个实例选择器,如进行一些配置和赋值,并且已经拿到一组服务实例

  4. Picker.Next 则是实例选择器具体的算法执行,拿到具体的一个服务实例

定制扩展

官网有服务发现扩展,如果想了解具体的实践,可以参考之前写过的《[ CloudWeGo 微服务实践 - 08 ] Nacos 服务发现扩展 (2)

总结

以上,利用 IDE,一起梳理了 Kitex 关于服务发现的主干逻辑,还有一些细节处理也是值得揣摩的。如果你对以上内容有不同的认识和要讨论的问题,都可以留言评论。


发布于: 2022 年 07 月 26 日阅读数: 67
用户头像

baiyutang

关注

广州 2017.12.13 加入

CloudWeGo Committer | Microservices | Golang | Cloud Nitive | “Smart work,Not hard” 开源爱好者

评论 (2 条评论)

发布
用户头像
恭喜您!AS 深圳站话题讨论活动的获奖者!联系文字君领奖哦!文字君的微信是infoqwriter
2022 年 07 月 27 日 11:07
回复
救救我这篇文章,好像被风控了
2022 年 07 月 27 日 16:12
回复
没有更多了
[ Kitex 源码解读 ] 服务发现_Go_baiyutang_InfoQ写作社区