Kitex
如果你还不了解 Kitex,可以看看《[ CloudWeGo 微服务实践 - 01 ] 开篇》。
如果你想参与 CloudWeGo 社区,给 Kitex 和其他组件贡献代码,可以看《如何给 CloudWeGo 做贡献》。
CloudWeGo 源码解读
Kitex 是 CloudWeGo 微服务中间件集合的一部分,已经把所有文章汇总到了《CloudWeGo 源码解读》。
服务发现
服务注册与服务发现是 RPC 框架中最重要的组件,地位仅次于加解密与通信的实现。在 Kitex 中,注册中心已经支持了主流的各个组件,如 registry-etcd、registry-nacos、registry-zookeeper、registry-polaris、registry-eureka、registry-consul 等。
服务发现一般有两种做法:
在 RPC 框架中,服务发现一般都是在客户端做发现,选择服务节点,这种更常见一些。Kitex 即是如此。
用法
我们以 nacos 为例,代码如下:
https://github.com/kitex-contrib/registry-nacos/blob/main/example/basic/client/main.go#L28
func main() {
// 通过默认配置获取一个服务发现组件的实例
// 方法内部有通过默认配置创建一个 nacos 客户端,方便后续与 nacos 交互
r, err := resolver.NewDefaultNacosResolver()
if err != nil {
panic(err)
}
newClient := hello.MustNewClient(
"Hello",
// 以 WithOption 的方式服务发现实例
client.WithResolver(r),
client.WithRPCTimeout(time.Second*3),
)
for {
resp, err := newClient.Echo(context.Background(), &api.Request{Message: "Hello"})
if err != nil {
log.Fatal(err)
}
log.Println(resp)
time.Sleep(time.Second)
}
}
复制代码
探索过程
实验条件、环境准备均和上节《[ Kitex 源码解读 ] 服务注册》保持一致。
启动服务端
我们运行的案例是 custom-config 的 demo。这里的 nacos 客户端参数都可配置,方便调试。
go run example/custom-config/server/main.go
复制代码
启动客户端
go run example/custom-config/client/main.go
复制代码
有日志能看到,我们成功启动了客户端,并能够正确请求了目标服务。
源码分析
client.WithResolver
我们的着手点仍然是从一个 WithOption 的配置开始,client.WithResolver,我们能看到最后把一个服务发现实例放到了 client 的 options 结构体中:
再往下走,我们看到服务发现的解析器在 189 行的地方有用到(我们先不看 165 行 WithProxy 的分支)。
由此我们来到了 kClient.initLBCache 的地方,这里是在初始化一个服务的客户端时调用的。为了方便大家理解,接下来我再分两部分去分析:
NewClient 时,初始化时;
远程调用时,middleware 里的服务发现处理;
NewClient 初始化
NewClient 时,接收到 WithResolver 的设置,接下来做初始化相关工作:
initLBCache
type kClient struct {
svcInfo *serviceinfo.ServiceInfo
mws []endpoint.Middleware
eps endpoint.Endpoint
sEps endpoint.Endpoint
opt *client.Options
lbf *lbcache.BalancerFactory
inited bool
closed bool
}
复制代码
type BalancerFactory struct {
Hookable
opts Options
cache sync.Map // key -> LoadBalancer
resolver discovery.Resolver
balancer loadbalance.Loadbalancer
rebalancer loadbalance.Rebalancer
sfg singleflight.Group
}
复制代码
func (kc *kClient) initLBCache() error {
resolver := kc.opt.Resolver
if resolver == nil {
// 生成一个携带 kerrors.ErrNoResolver 错误的 resolver
}
balancer := kc.opt.Balancer
// 默认使用基于权重的负载均衡
// 参看 https://www.cloudwego.io/zh/docs/kitex/tutorials/basic-feature/loadbalance/
if balancer == nil {
balancer = loadbalance.NewWeightedBalancer()
}
cacheOpts := lbcache.Options{DiagnosisService: kc.opt.DebugService}
if kc.opt.BalancerCacheOpt != nil {
cacheOpts = *kc.opt.BalancerCacheOpt
}
// 1、生成一个 BalancerFactory 对象;
// 2、检查 cacheOpts 刷新时间间隔设置,若没有则默认 5s
// 3、检查 cacheOpts 缓存过期时间设置,若没有则默认 15s
// 4、异步协程定期标记服务已发现的信息过期,具体在在 BalancerFactory.cache 字段
// 5、最后将 resolver + balancer + cacheOpts.RefreshInterval + cacheOpts.ExpireInterval 为 key,
// 缓存到 balancerFactories 的 sync.Map 上
kc.lbf = lbcache.NewBalancerFactory(resolver, balancer, cacheOpts)
// ...
return nil
}
func NewBalancerFactory(resolver discovery.Resolver, balancer loadbalance.Loadbalancer, opts Options) *BalancerFactory {
// 检查刷新时间间隔,若没有则默认 5s
// 检查缓存过期时间,若没有则默认 15s
opts.check()
// 构建一个当前负载均衡规则的缓存 key
uniqueKey := cacheKey(resolver.Name(), balancer.Name(), opts)
// 规则缓存是否存在
val, ok := balancerFactories.Load(uniqueKey)
if ok {
return val.(*BalancerFactory)
}
// 若没有则缓存一次
val, _, _ = balancerFactoriesSfg.Do(uniqueKey, func() (interface{}, error) {
b := &BalancerFactory{
opts: opts,
resolver: resolver,
balancer: balancer,
}
if rb, ok := balancer.(loadbalance.Rebalancer); ok {
hrb := newHookRebalancer(rb)
b.rebalancer = hrb
b.Hookable = hrb
} else {
b.Hookable = noopHookRebalancer{}
}
// 根据 opts.ExpireInterval 定期清理已发现服务的缓存
// 主要操作 BalancerFactory.cache
go b.watcher()
balancerFactories.Store(uniqueKey, b)
return b, nil
})
return val.(*BalancerFactory)
}
复制代码
initMiddlewares
// Middleware deal with input Endpoint and output Endpoint.
type Middleware func(Endpoint) Endpoint
// 在这里,我们只需要知道 newResolveMWBuilder 返回了 endpoint.Middleware 的函数即可
// 具体逻辑我们稍后再分析
func (kc *kClient) initMiddlewares(ctx context.Context) {
// ...
if kc.opt.Proxy == nil {
// 自定义服务发现
kc.mws = append(kc.mws, newResolveMWBuilder(kc.lbf)(ctx))
// ...
} else {
if kc.opt.Resolver != nil { // 自定义服务发现
kc.mws = append(kc.mws, newResolveMWBuilder(kc.lbf)(ctx))
}
kc.mws = append(kc.mws, newProxyMW(kc.opt.Proxy))
}
// ...
}
// 经过以上初始化过程,是组装了一系列需要用到的 Middleware 到 kClient.mws 这个切片里
复制代码
buildInvokeChain
func (kc *kClient) buildInvokeChain() error {
// 构建一个发送请求并接收响应的 Endpoint,主要与远端交互
innerHandlerEp, err := kc.invokeHandleEndpoint()
if err != nil {
return err
}
// 把调用处理逻辑与上一步处理好的所有 Middleware 串联起来,放在 kClient.eps 里
kc.eps = endpoint.Chain(kc.mws...)(innerHandlerEp)
// ...
return nil
}
复制代码
小结
WithResolver
接收参数
初始化 NewBalancerFactory
赋值给 kClient.lbf
,并异步定时标记缓存过期 BalancerFactory.cache
NewBalancerFactory
做的缓存更多是配置和相关对象的缓存,还没有做服务发现和服务节点选取
将 kClient.lbf
经过函数 newResolveMWBuilder
添加到 kClient.mws
将 kClient.mws
串联所有 endpoint.Endpoint
到 kClient.eps
, 待后续 RPC 调用时顺序执行
middleware 服务发现
kClient.Call
我们接上边的内容,再初始化后,我们看到所有的处理都放在了 kClient.eps
里。我们可以从这里入手,看下这里的这个 endpoint.Endpoint
函数什么地方被调用了:
在截图里的 311 行和 334 行,我们分别点进去,其实都在一个函数里,是不同的分支:
走到这里 kClient.Call,其实如果往上追,也能发现,这里是被 kitex_gen 生成的客户端代码实际发起 RPC 调用的地方:
newResolveMWBuilder
在刚刚我们分析 kClient.initMiddlewares
的时候,我们看到那里有调用 newResolveMWBuilder
的函数生成了一个 endpoint.Middleware
的函数,其实也是放在 kClient.Call
的 kc.ops
里执行了。这里我们就单独看看到底执行了什么,我们把主要代码拿出来:
type RPCInfo interface {
From() EndpointInfo
To() EndpointInfo
Invocation() Invocation
Config() RPCConfig
Stats() RPCStats
}
type Balancer struct {
b *BalancerFactory
target string // a description returned from the resolver's Target method
res atomic.Value // newest and previous discovery result
expire int32 // 0 = normal, 1 = expire and collect next ticker
sharedTicker *sharedTicker
}
// 获取到 RPC 相关信息
rpcInfo := rpcinfo.GetRPCInfo(ctx)
dest := rpcInfo.To()
if dest == nil {
return kerrors.ErrNoDestService
}
remote := remoteinfo.AsRemoteInfo(dest)
if remote == nil {
err := fmt.Errorf("unsupported target EndpointInfo type: %T", dest)
return kerrors.ErrInternalException.WithCause(err)
}
if remote.GetInstance() != nil {
return next(ctx, request, response)
}
// 1、从 BalancerFactory 获取目标服务对应的均衡器 Balancer 等信息
// 2、这里也是有缓存的,刷新机制是根据设置的 cacheOpts.RefreshInterval 时间定时刷新
// 参照 Balancer.refresh 的代码(https://github.com/cloudwego/kitex/blob/develop/pkg/loadbalance/lbcache/cache.go#L184)
// 3、根据不同的 dest 可以得到不同的 Balancer
lb, err := lbf.Get(ctx, dest)
if err != nil {
return kerrors.ErrServiceDiscovery.WithCause(err)
}
// picker 是 BalancerFactory.balancer 设置的均衡器对应的
// picker 的 主要逻辑是从一组服务实例信息中选取其中一个,如默认的 WeightedRandom 基于权重的随机
picker := lb.GetPicker()
if r, ok := picker.(internal.Reusable); ok {
defer r.Recycle()
}
var lastErr error
for {
select {
case <-ctx.Done():
return kerrors.ErrRPCTimeout
default:
}
// 具体的选取其中一个节点,代码例如可以参考: randomPicker.Next
// https://github.com/cloudwego/kitex/blob/develop/pkg/loadbalance/weighted_random.go#L56
ins := picker.Next(ctx, request)
if ins == nil {
return kerrors.ErrNoMoreInstance.WithCause(fmt.Errorf("last error: %w", lastErr))
}
remote.SetInstance(ins)
if err = next(ctx, request, response); err != nil && retryable(err) {
lastErr = err
continue
}
return err
}
复制代码
// 根据目标服务信息获取对应的解析策略
func (b *BalancerFactory) Get(ctx context.Context, target rpcinfo.EndpointInfo) (*Balancer, error) {
desc := b.resolver.Target(ctx, target)
val, ok := b.cache.Load(desc)
if ok {
return val.(*Balancer), nil
}
val, err, _ := b.sfg.Do(desc, func() (interface{}, error) {
// 从注册中心获取服务实例列表
res, err := b.resolver.Resolve(ctx, desc)
if err != nil {
return nil, err
}
// 重命名发现结果的缓存 key
renameResultCacheKey(&res, b.resolver.Name())
bl := &Balancer{
b: b,
target: desc,
}
bl.res.Store(res)
// 定时刷新服务发现的结果,刷新逻辑在 sharedTicker.add 中开了一个协程
bl.sharedTicker = getSharedTicker(bl, b.opts.RefreshInterval)
b.cache.Store(desc, bl)
return bl, nil
})
if err != nil {
return nil, err
}
return val.(*Balancer), nil
}
复制代码
小结
kClient.Call
会顺序执行所有的 endpoint.Middleware
,包括上边 newResolveMWBuilder
服务发现重要逻辑
lbf.Get(ctx, dest)
获取均衡策略和配置,并且得到一组服务实例
lb.GetPicker()
则是根据均衡规则获取一个实例选择器,如进行一些配置和赋值,并且已经拿到一组服务实例
Picker.Next
则是实例选择器具体的算法执行,拿到具体的一个服务实例
定制扩展
官网有服务发现扩展,如果想了解具体的实践,可以参考之前写过的《[ CloudWeGo 微服务实践 - 08 ] Nacos 服务发现扩展 (2)》
总结
以上,利用 IDE,一起梳理了 Kitex 关于服务发现的主干逻辑,还有一些细节处理也是值得揣摩的。如果你对以上内容有不同的认识和要讨论的问题,都可以留言评论。
评论 (2 条评论)