写点什么

Dubbo-go Client 端调用服务过程

发布于: 2020 年 11 月 11 日
Dubbo-go Client端调用服务过程

导读:


有了上一篇文章《Dubbo-go Server 端开启服务过程》的铺垫,可以类比客户端启动于服务端的启动过程。其中最大的区别是服务端通过 zk 注册服务,发布自己的 ivkURL 并订阅事件开启监听;而客户应该是通过 zk 注册组件,拿到需要调用的 serviceURL,更新 invoker 并重写用户的 RPCService,从而实现对远程过程调用细节的封装。


1. 配置文件和客户端源码


1.1 client 配置文件

helloworld 提供的 demo:profiles/client.yaml


registries :  "demoZk":    protocol: "zookeeper"    timeout : "3s"    address: "127.0.0.1:2181"    username: ""    password: ""references:  "UserProvider":    # 可以指定多个registry,使用逗号隔开;不指定默认向所有注册中心注册    registry: "demoZk"    protocol : "dubbo"    interface : "com.ikurento.user.UserProvider"    cluster: "failover"    methods :    - name: "GetUser"      retries: 3
复制代码


可看到配置文件与之前讨论过的 server 端非常类似,其 refrences 部分字段就是对当前服务要主调的服务的配置,其中详细说明了调用协议、注册协议、接口 id、调用方法、集群策略等,这些配置都会在之后与注册组件交互,重写 ivk、调用的过程中使用到。


1.2 客户端使用框架源码

// file: user.gofunc init() {    config.SetConsumerService(userProvider)    hessian.RegisterPOJO(&User{})}
复制代码


// file: main.gofunc main() {    hessian.RegisterPOJO(&User{})    config.Load()    time.Sleep(3e9)    println("\n\n\nstart to test dubbo")    user := &User{}    err := userProvider.GetUser(context.TODO(), []interface{}{"A001"}, user)    if err != nil {      panic(err)    }    println("response result: %v\n", user)    initSignal()}
复制代码

官网提供的 helloworld demo 的源码。可看到与服务端类似,在 user.go 内注册了 rpc-service,以及需要 rpc 传输的结构体 user。


在 main 函数中,同样调用了 config.Load()函数,之后就可以直接通过实现好的 rpc-service:userProvider 直接调用对应的功能函数,即可实现 rpc 调用。


可以猜到,从 hessian 注册结构、SetConsumerService,到调用函数.GetUser()期间,用户定义的 rpc-service 也就是 userProvider 对应的函数被重写,重写后的 GetUser 函数已经包含了实现了远程调用逻辑的 invoker。


接下来,就要通过阅读源码,看看 dubbo-go 是如何做到的。

2. 实现远程过程调用

2.1 加载配置文件


// file:config/config_loader.go :Load()
// Load Dubbo Initfunc Load() { // init router initRouter() // init the global event dispatcher extension.SetAndInitGlobalDispatcher(GetBaseConfig().EventDispatcherType) // start the metadata report if config set if err := startMetadataReport(GetApplicationConfig().MetadataType, GetBaseConfig().MetadataReportConfig); err != nil { logger.Errorf("Provider starts metadata report error, and the error is {%#v}", err) return } // reference config loadConsumerConfig()
复制代码


在 main 函数中调用的 config.Load()函数,进而调用了 loadConsumerConfig,类似于之前讲到的 server 端配置读入函数。


在 loadConsumerConfig 函数中,进行了三步操作:


// file: config/config_loader.go
func loadConsumerConfig() { // 1 init other consumer config conConfigType := consumerConfig.ConfigType for key, value := range extension.GetDefaultConfigReader() {} checkApplicationName(consumerConfig.ApplicationConfig) configCenterRefreshConsumer() checkRegistries(consumerConfig.Registries, consumerConfig.Registry) // 2 refer-implement-reference for key, ref := range consumerConfig.References { if ref.Generic { genericService := NewGenericService(key) SetConsumerService(genericService) } rpcService := GetConsumerService(key) ref.id = key ref.Refer(rpcService) ref.Implement(rpcService) }
// 3 wait for invoker is available, if wait over default 3s, then panic for {}}
复制代码


1. 检查配置文件并将配置写入内存

2. 在 for 循环内部,依次引用(refer)并且实例化(implement)每个被调 reference。

3. 等待三秒钟所有 invoker 就绪


其中重要的就是 for 循环里面的引用和实例化,两步操作,会在接下来展开讨论。

至此,配置已经被写入了框架。


2.2 获取远程 Service URL,实现可供调用的 invoker


上述的 ref.Refer 完成的就是这部分的操作。



图(一)

2.2.1 构造注册 url

和 server 端类似,存在注册 url 和服务 url,dubbo 习惯将服务 url 作为注册 url 的 sub。


// file: config/reference_config.go: Refer()
func (c *ReferenceConfig) Refer(_ interface{}) { //(一)配置url参数(serviceUrl),将会作为sub cfgURL := common.NewURLWithOptions( common.WithPath(c.id), common.WithProtocol(c.Protocol), common.WithParams(c.getUrlMap()), common.WithParamsValue(constant.BEAN_NAME_KEY, c.id), ) ... // (二)注册地址可以通过url格式给定,也可以通过配置格式给定 // 这一步的意义就是配置->提取信息生成URL if c.Url != "" {// 用户给定url信息,可以是点对点的地址,也可以是注册中心的地址 // 1. user specified URL, could be peer-to-peer address, or register center's address. urlStrings := gxstrings.RegSplit(c.Url, "\\s*[;]+\\s*") for _, urlStr := range urlStrings { serviceUrl, err := common.NewURL(urlStr) ... } } else {// 配置读入注册中心的信息 // assemble SubURL from register center's configuration mode // 这是注册url,protocol = registry,包含了zk的用户名、密码、ip等等 c.urls = loadRegistries(c.Registry, consumerConfig.Registries, common.CONSUMER) ... // set url to regUrls for _, regUrl := range c.urls { regUrl.SubURL = cfgURL// regUrl的subURl存当前配置url } } //至此,无论通过什么形式,已经拿到了全部的regURL // (三)获取registryProtocol实例,调用其Refer方法,传入新构建好的regURL if len(c.urls) == 1 { // 这一步访问到registry/protocol/protocol.go registryProtocol.Refer // 这里是registry c.invoker = extension.GetProtocol(c.urls[0].Protocol).Refer(*c.urls[0]) } else { // 如果有多个注册中心,即有多个invoker,则采取集群策略 invokers := make([]protocol.Invoker, 0, len(c.urls)) ... }
复制代码

这个函数中,已经处理完从 Register 配置到 RegisterURL 的转换,即图(一)中部分:

接下来,已经拿到的 url 将被传递给 RegistryProtocol,进一步 refer。

2.2.2 registryProtocol 获取到 zkRegistry 实例,进一步 Refer


// file: registry/protocol/protocol.go: Refer
// Refer provider service from registry center// 拿到的是配置文件registries的url,他能够生成一个invoker = 指向目的addr,以供客户端直接调用。func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker { var registryUrl = url // 这里拿到的是referenceConfig,serviceUrl里面包含了Reference的所有信息,包含interfaceName、method等等 var serviceUrl = registryUrl.SubURL if registryUrl.Protocol == constant.REGISTRY_PROTOCOL {// registryUrl.Proto = "registry" protocol := registryUrl.GetParam(constant.REGISTRY_KEY, "") registryUrl.Protocol = protocol//替换成了具体的值,比如"zookeeper" } // 接口对象 var reg registry.Registry // (一)实例化接口对象,缓存策略 if regI, loaded := proto.registries.Load(registryUrl.Key()); !loaded { // 缓存中不存在当前registry,新建一个reg reg = getRegistry(&registryUrl) // 缓存起来 proto.registries.Store(registryUrl.Key(), reg) } else { reg = regI.(registry.Registry) } // 到这里,获取到了reg实例 zookeeper的registry //(二)根据Register的实例zkRegistry和传入的regURL新建一个directory // 这一步存在复杂的异步逻辑,从注册中心拿到了目的service的真实addr,获取了invoker并放入directory, // 这一步将在下面详细给出步骤 // new registry directory for store service url from registry directory, err := extension.GetDefaultRegistryDirectory(&registryUrl, reg) if err != nil { logger.Errorf("consumer service %v create registry directory error, error message is %s, and will return nil invoker!", serviceUrl.String(), err.Error()) return nil } // (三)DoRegister 在zk上注册当前client service err = reg.Register(*serviceUrl) if err != nil { logger.Errorf("consumer service %v register registry %v error, error message is %s", serviceUrl.String(), registryUrl.String(), err.Error()) } // (四)new cluster invoker,将directory写入集群,获得具有集群策略的invoker cluster := extension.GetCluster(serviceUrl.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER)) invoker := cluster.Join(directory) // invoker保存 proto.invokers = append(proto.invokers, invoker) return invoker}
复制代码


可详细阅读上述注释,这个函数完成了从 url 到 invoker 的全部过程


(一)首先获得 Registry 对象,默认是之前实例化的 zkRegistry,和之前 server 获取 Registry 的处理很类似。


(二)通过构造一个新的 directory,异步拿到之前在 zk 上注册的 server 端信息,生成 invoker


(三)在 zk 上注册当前 service


(四)集群策略,获得最终 invoker


这一步完成了图(一)中所有余下的绝大多数操作,接下来就需要详细的查看 directory 的构造过程:

2.2.3 构造 directory(包含较复杂的异步操作)


图(二)


上述的 extension.GetDefaultRegistryDirectory(&registryUrl, reg)函数,本质上调用了已经注册好的NewRegistryDirectory函数:


// file: registry/directory/directory.go: NewRegistryDirectory()
// NewRegistryDirectory will create a new RegistryDirectory// 这个函数作为default注册在extension上面// url为注册url,reg为zookeeper registryfunc NewRegistryDirectory(url *common.URL, registry registry.Registry) (cluster.Directory, error) { if url.SubURL == nil { return nil, perrors.Errorf("url is invalid, suburl can not be nil") } dir := &RegistryDirectory{ BaseDirectory: directory.NewBaseDirectory(url), cacheInvokers: []protocol.Invoker{}, cacheInvokersMap: &sync.Map{}, serviceType: url.SubURL.Service(), registry: registry, } dir.consumerConfigurationListener = newConsumerConfigurationListener(dir) go dir.subscribe(url.SubURL) return dir, nil}
复制代码

首先构造了一个注册 directory,开启协程调用其 subscribe 函数,传入 serviceURL。


这个 directory 目前包含了对应的 zkRegistry,以及传入的 URL,他 cacheInvokers 的部分是空的。


进入 dir.subscribe(url.SubURL)这个异步函数:


// file: registry/directory/directory.go: subscribe()
// subscribe from registryfunc (dir *RegistryDirectory) subscribe(url *common.URL) { // 增加两个监听, dir.consumerConfigurationListener.addNotifyListener(dir) dir.referenceConfigurationListener = newReferenceConfigurationListener(dir, url) // subscribe调用 dir.registry.Subscribe(url, dir)}
复制代码

重点来了,他调用了 zkRegistry 的 Subscribe 方法,与此同时将自己作为 ConfigListener 传入


我认为这种传入 listener 的设计模式非常值得学习,而且很有 java 的味道。


针对等待 zk 返回订阅信息这样的异步操作,需要传入一个 Listener,这个 Listener 需要实现 Notify 方法,进而在作为参数传入内部之后,可以被异步地调用 Notify,将内部触发的异步事件“传递出来”,再进一步处理加工。


层层的 Listener 事件链,能将传入的原始 serviceURL 通过 zkConn 发送给 zk 服务,获取到服务端注册好的 url 对应的二进制信息。


而 Notify 回调链,则将这串 byte[]一步一步解析、加工;以事件的形式向外传递,最终落到 directory 上的时候,已经是成型的 newInvokers 了。


具体细节不再以源码形式展示,可参照上图查阅源码。


至此已经拿到了 server 端注册好的真实 invoker。


完成了图(一)中的部分:


2.2.4 构造带有集群策略的 clusterinvoker


经过上述操作,已经拿到了 server 端 Invokers,放入了 directory 的 cacheinvokers 数组里面缓存。


后续的操作对应本文 2.2.2 的第四步,由 directory 生成带有特性集群策略的 invoker


// (四)new cluster invoker,将directory写入集群,获得具有集群策略的invoker    cluster := extension.GetCluster(serviceUrl.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER))    invoker := cluster.Join(directory)123
复制代码

Join 函数的实现就是如下函数:


// file: cluster/clusterimpl/failovercluster_invokers.go: newFailoverClusterInvoker()
func newFailoverClusterInvoker(directory cluster.Directory) protocol.Invoker { return &failoverClusterInvoker{ baseClusterInvoker: newBaseClusterInvoker(directory), }}
复制代码

dubbo-go 框架默认选择 failover 策略,既然返回了一个 invoker,我们查看一下 failoverClusterInvoker 的 Invoker 方法,看他是如何将集群策略封装到 Invoker 函数内部的:


// file: cluster/cluster_impl/failover_cluster_invokers.go: Invoker()
// Invoker 函数func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { ... //调用List方法拿到directory缓存的所有invokers invokers := invoker.directory.List(invocation) if err := invoker.checkInvokers(invokers, invocation); err != nil {// 检查是否可以实现调用 return &protocol.RPCResult{Err: err} } // 获取来自用户方向传入的 methodName := invocation.MethodName() retries := getRetries(invokers, methodName) loadBalance := getLoadBalance(invokers[0], invocation) for i := 0; i <= retries; i++ { // 重要!这里是集群策略的体现,失败后重试! //Reselect before retry to avoid a change of candidate `invokers`. //NOTE: if `invokers` changed, then `invoked` also lose accuracy. if i > 0 { if err := invoker.checkWhetherDestroyed(); err != nil { return &protocol.RPCResult{Err: err} } invokers = invoker.directory.List(invocation) if err := invoker.checkInvokers(invokers, invocation); err != nil { return &protocol.RPCResult{Err: err} } } // 这里是负载均衡策略的体现!选择特定ivk进行调用。 ivk := invoker.doSelect(loadBalance, invocation, invokers, invoked) if ivk == nil { continue } invoked = append(invoked, ivk) //DO INVOKE result = ivk.Invoke(ctx, invocation) if result.Error() != nil { providers = append(providers, ivk.GetUrl().Key()) continue } return result } ...}
复制代码

看了很多 Invoke 函数的实现,所有类似的 Invoker 函数都包含两个方向,一个是用户方向的 invcation,一个是函数方向的底层 invokers。


而集群策略的 invoke 函数本身作为接线员,把 invocation 一步步解析,根据调用需求和集群策略,选择特定的 invoker 来执行


proxy 函数也是这样,一个是用户方向的 ins[] reflect.Type, 一个是函数方向的 invoker。


proxy 函数负责将 ins 转换为 invocation,调用对应 invoker 的 invoker 函数,实现连通。


而出于这样的设计,可以在一步步 Invoker 封装的过程中,每个 Invoker 只关心自己负责操作的部分,从而使整个调用栈解耦。


秒啊!!!


至此,我们理解了 failoverClusterInvoker 的 Invoke 函数实现,也正是和这个集群策略 Invoker 被返回,接受来自上方的调用。


已完成图(一)中的:



2.2.5 在 zookeeper 上注册当前 client


拿到 invokers 后,可以回到这个函数了:


// file: config/refrence_config.go: Refer()
if len(c.urls) == 1 { // 这一步访问到registry/protocol/protocol.go registryProtocol.Refer c.invoker = extension.GetProtocol(c.urls[0].Protocol).Refer(*c.urls[0]) // (一)拿到了真实的invokers } else { // 如果有多个注册中心,即有多个invoker,则采取集群策略 invokers := make([]protocol.Invoker, 0, len(c.urls)) ... cluster := extension.GetCluster(hitClu) // If 'zone-aware' policy select, the invoker wrap sequence would be: // ZoneAwareClusterInvoker(StaticDirectory) -> // FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker c.invoker = cluster.Join(directory.NewStaticDirectory(invokers)) } // (二)create proxy,为函数配置代理 if c.Async { callback := GetCallback(c.id) c.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetAsyncProxy(c.invoker, callback, cfgURL) } else { // 这里c.invoker已经是目的addr了 c.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetProxy(c.invoker, cfgURL) }
复制代码

我们有了可以打通的 invokers,但还不能直接调用,因为 invoker 的入参是 invocation,而调用函数使用的是具体的参数列表。需要通过一层 proxy 来规范入参和出参。


接下来新建一个默认 proxy,放置在 c.proxy 内,以供后续使用


至此,完成了图(一)中最后的操作


2.3 将调用逻辑以代理函数的形式写入 rpc-service

上面完成了 config.Refer 操作

回到

config/config_loader.go: loadConsumerConfig()

下一个重要的函数是 Implement,他完的操作较为简单:旨在使用上面生成的 c.proxy 代理,链接用户自己定义的 rpcService 到 clusterInvoker 的信息传输。


函数较长,只选取了重要的部分:


// file: common/proxy/proxy.go: Implement()
// Implement// proxy implement// In consumer, RPCService like:// type XxxProvider struct {// Yyy func(ctx context.Context, args []interface{}, rsp *Zzz) error// }// Implement 实现的过程,就是proxy根据函数名和返回值,通过调用invoker 构造出拥有远程调用逻辑的代理函数// 将当前rpc所有可供调用的函数注册到proxy.rpc内func (p *Proxy) Implement(v common.RPCService) { // makeDubboCallProxy 这是一个构造代理函数,这个函数的返回值是func(in []reflect.Value) []reflect.Value 这样一个函数 // 这个被返回的函数是请求实现的载体,由他来发起调用获取结果 makeDubboCallProxy := func(methodName string, outs []reflect.Type) func(in []reflect.Value) []reflect.Value { return func(in []reflect.Value) []reflect.Value { // 根据methodName和outs的类型,构造这样一个函数,这个函数能将in 输入的value转换为输出的value // 这个函数具体的实现如下: ... // 目前拿到了 methodName、所有入参的interface和value,出参数reply // (一)根据这些生成一个 rpcinvocation inv = invocation_impl.NewRPCInvocationWithOptions( invocation_impl.WithMethodName(methodName), invocation_impl.WithArguments(inIArr), invocation_impl.WithReply(reply.Interface()), invocation_impl.WithCallBack(p.callBack), invocation_impl.WithParameterValues(inVArr)) for k, value := range p.attachments { inv.SetAttachments(k, value) } // add user setAttachment atm := invCtx.Value(constant.AttachmentKey) // 如果传入的ctx里面有attachment,也要写入inv if m, ok := atm.(map[string]string); ok { for k, value := range m { inv.SetAttachments(k, value) } } // 至此构造inv完毕 // (二)触发Invoker 之前已经将cluster_invoker放入proxy,使用Invoke方法,通过getty远程过程调用 result := p.invoke.Invoke(invCtx, inv) // 如果有attachment,则加入 if len(result.Attachments()) > 0 { invCtx = context.WithValue(invCtx, constant.AttachmentKey, result.Attachments()) } ... } } numField := valueOfElem.NumField() for i := 0; i < numField; i++ { t := typeOf.Field(i) methodName := t.Tag.Get("dubbo") if methodName == "" { methodName = t.Name } f := valueOfElem.Field(i) if f.Kind() == reflect.Func && f.IsValid() && f.CanSet() { // 针对于每个函数 outNum := t.Type.NumOut() // 规定函数输出只能有1/2个 if outNum != 1 && outNum != 2 { logger.Warnf("method %s of mtype %v has wrong number of in out parameters %d; needs exactly 1/2", t.Name, t.Type.String(), outNum) continue } // The latest return type of the method must be error. // 规定最后一个返回值一定是error if returnType := t.Type.Out(outNum - 1); returnType != typError { logger.Warnf("the latest return type %s of method %q is not error", returnType, t.Name) continue } // 获取到所有的出参类型,放到数组里 var funcOuts = make([]reflect.Type, outNum) for i := 0; i < outNum; i++ { funcOuts[i] = t.Type.Out(i) } // do method proxy here: // (三)调用make函数,传入函数名和返回值,获得能调用远程的proxy,将这个proxy替换掉原来的函数位置 f.Set(reflect.MakeFunc(f.Type(), makeDubboCallProxy(methodName, funcOuts))) logger.Debugf("set method [%s]", methodName) } } ...}
复制代码


正如之前所说,proxy 的作用是将用户定义的函数参数列表,转化为抽象的 invocation 传入 Invoker,进行调用。


其中已标明有三处较为重要的地方:


  1. 在代理函数中实现由参数列表生成 Invocation 的逻辑

  2. 在代理函数实现调用 Invoker 的逻辑

  3. 将代理函数替换为原始 rpc-service 对应函数


至此,也就解决了一开始的问题:


// file: client.go: main()
config.Load()user := &User{}err := userProvider.GetUser(context.TODO(), []interface{}{"A001"}, user)
复制代码


这里直接调用用户定义的 rpcService 的函数 GetUser,这里实际调用的是经过重写入的函数代理,所以就能实现远程调用了。


3. 从 client 到 server 的 invoker 嵌套链 - 小结

在阅读 dubbo-go 源码的过程中,我能发现一条清晰的 invoker-proxy 嵌套链,我希望通过图的形式来展现:



发布于: 2020 年 11 月 11 日阅读数: 664
用户头像

dubbogo社区 2019.08.25 加入

dubbogo社区官方账号,发布 github.com/apache/dubbo-go 各种最新技术趋势、项目实战和最新版本特性等技术干货。

评论

发布
暂无评论
Dubbo-go Client端调用服务过程