写点什么

Dubbo 中 Zookeeper 注册中心原理分析

作者:小小怪下士
  • 2023-02-02
    湖南
  • 本文字数:10828 字

    阅读完需:约 36 分钟

本文通过分析 Dubbo 中 ZooKeeper 注册中心的实现 ZooKeeperResitry 的继承体系结构,自顶向下分析了 AbstractRegistry(提供了服务数据的本地缓存)、FailbackRegistry(服务注册订阅相关的异常重试)、CacheableFailbackRegistry(Dubbo 在 URL 推送模型做的优化)、ZooKeeperRegistry(ZooKeeper 注册中心实现原理)的源码,详细介绍了 Dubbo 中 ZooKeeper 注册中心的实现原理。


当服务提供者启动时,服务提供者将自己提供的服务信息注册到注册中心,注册中心将这些信息记录下来。服务消费者启动时,向注册中心发起订阅,将自己感兴趣的服务提供者的信息拉取到本地并缓存起来,用于后续远程调用。另外,注册中心能够感知到服务提供者新增或者下线,并更新自己的服务信息,同时通知服务消费者告知服务提供者新增/下线,从而消费者也可以动态感知到服务提供者的变化。


一、ZooKeeper 注册中心

ZooKeeper 是 Apache 的顶级项目。ZooKeeper 为分布式应用提供了高效且可靠的分布式协调服务,提供了诸如统一命名服务、配置管理和分布式锁等分布式的基础服务。在解决分布式数据一致性方面,ZooKeeper 并没有直接采用 Paxos 算法,而是采用了名为 ZAB 的一致性协议。

1.1 ZooKeeper 数据结构

ZooKeeper 的数据模型是一个树形结构的文件系统。Dubbo 服务在 ZooKeeper 中的数据结构(旧版模型,Dubbo2.7 版本后路由数据、配置数据不再写到该目录下)如下图所示。



ZooKeeper 中的节点分为持久节点、持久顺序节点、临时节点、临时顺序节点。Dubbo 使用 ZooKeeper 作为注册中心时,不关心节点的创建顺序,只会创建持久节点和临时节点。


  • 持久节点: 服务注册后保证节点不会丢失,注册中心重启也会存在 。

  • 临时节点: 服务注册后连接丢失或 session 超时,注册的节点会自动被移除 。

1.2 ZooKeeper 的 Watcher 机制

客户端和服务器维持数据交互通常有两种形式:


  1. 客户端定时向服务器轮询

  2. 服务器主动向客户端推送数据


ZooKeeper 采用的是方式 2,主动向客户端推送数据。客户端注册监听它关心的节点(注册 Watcher 监听指定节点),当节点状态发生变化(数据变化、子节点增减变化)时,则相应的 Watcher 被触发,ZooKeeper 服务会通知客户端,这就是 Watcher 机制。


Watcher 有以下特点:


  1. **主动推送:**Watcher 被触发时,由 ZooKeeper 主动将更新推送给客户端,而不需要客户端轮询。

  2. **一次性:**在节点上注册 Watcher 监听后,当节点状态发生变化时该 Watcher 只会被触发一次,如果客户端想再收到后续发生变化的通知,需要重新再注册一次 Watcher。

  3. **可见性:**如果一个客户端在读请求中附带 Watcher,Watcher 被触发的同时再次读取数据,客户端在得到 Watcher 消息之前肯定不可能看到更新后的数据。换句话说,更新通知先于更新结果。

  4. **顺序性:**如果多个更新触发了多个 Watcher ,那 Watcher 被触发的顺序与更新顺序一致。

1.3 ZooKeeper 会话机制

ZooKeeper 客户端通过 TCP 长连接连接到 ZooKeeper 服务集群。会话 (Session) 从第一次连接开始就已经建立,之后通过心跳检测机制来保持有效的会话状态。通过这个连接,客户端可以发送请求并接收响应,同时也可以接收到 Watch 事件的通知。一旦一台客户端与一台服务器建立连接,这台服务器会为这个客户端创建一个新的会话。


每个会话都会有一个超时时间。若由于服务器压力过大、网络故障等各种原因导致客户端连接断开时,只要在会话超时时间之内能够重新连接上 ZooKeeper 服务器,那么之前创建的会话仍然有效。若服务器在超时时间内没有收到任何请求,则相应会话被视为过期。一旦会话过期,就无法再重新打开,且任何与该会话相关的临时 节点都会被删除。


通常来说,会话应该长期存在,而这需要由客户端来保证。客户端可以通过心跳方式来保持会话不过期。

1.4 使用 ZooKeeper 作为注册中心

如下图所示,服务提供者(集成了 ZK 客户端)在服务启动时,会通过 ZK 客户端与 ZK 服务端建立连接,将服务提供者信息(提供者的 IP 地址、端口、服务接口信息等)注册到 ZooKeeper 服务端,这时会在 ZooKeeper 服务端生成一个临时节点来存储这些服务信息,这就是服务提供者的注册操作。


服务消费者(集成了 ZK 客户端)在服务启动时,会通过 ZK 客户端与 ZK 服务端建立连接,拉取自己感兴趣的服务提供者信息并缓存到本地,同时也会对自己感兴趣的节点(比如服务提供节点)注册 Watcher 监听。后续发起远程调用时,会从本地缓存的服务提供者信息通过负载均衡等策略选择一台服务提供者发起服务调用。


如果服务提供者扩容新增了机器,服务提供者会向 ZK 发起新的注册操作,在对应的目录下创建临时节点(代表这台新增的服务提供者信息),同时会触发之前服务消费者注册的 Watcher 监听,ZooKeeper 服务端会将变更信息推送到服务消费,服务消费者在接收到变更后会更新自己本地的服务提供者信息,这样就完成了服务的自动扩容。同样的,当某台服务提供者下线,它与 ZooKeeper 服务端的连接会断掉,因为服务注册时创建的是临时节点,因此当连接断掉后,该临时节点会被删除,同时会触发之前服务消费者注册的 Watcher 监听,相应的服务消费者会收到通知刷新自己本地的服务提供者信息缓存。


二、源码分析


Node,Dubbo 中用 Node 来表示抽象节点,Dubbo 中的核心概念 Registry、Invoker 等都实现了接口 Node。


RegistryService 定义了注册服务需要提供的基本能力,即增删改查。Registry 接口继承了 Node 和 Registry 接口,它表示拥有注册中心能力的节点。其中定义了 4 个默认方法,其中的 reExportRegister 和 reExportUnRegiser 方法都是委托给 RegisterService 的相应方法。


AbstractRegistry 实现了 Registry 接口中的方法,它在内存中实现了注册数据的读写改动,实现了本地缓存的功能。


FailbackRegistry 继承了 AbstractRegistry,结合时间轮,提供了失败重试的能力。


CacheableFailbackRegistry 针对 URL 推送模型做了优化,减少了 URL 的创建。


ZooKeeperRegistry 提供了基于 ZooKeeper 的注册中心实现。


下面重点分析 AbstractRegistry、FailbackRegistry、CacheableFailbackRegistry 和 ZooKeeperRegistry。

2.1 AbstractRegistry

AbstractRegistry 实现了 Registry 接口中的方法,它在内存中实现了注册数据的读写改动,从而可以就降低注册中心的压力。从前文继承体系结构可以看出,Dubbo 中的注册中心实现都继承了该类。


AbstractRegistry 的核心是通过 Properties 类型的 properties 与 File 类型的 file 字段来记录服务数据。properties 基于内存存储了服务相关的数据,file 则对应磁盘文件,两者的数据是同步的。在创建 AbstractRegistry 对象时,会根据传入的 URL 中的 file.cache 参数来决定是否开启本地缓存,默认开启。


如果开启,会将磁盘文件 file 中的数据加载到 properties 当中。当 properties 中的数据发生变化时,会同步到磁盘文件 file 中。如果传入的 URL 的 save.file 参数为 false(默认是 false),会通过线程池来异步同步 properties 的数据到 file,如果是 true 则是在当前线程同步。

2.2 FailbackRegistry

FailbackRegistry 处理的是注册中心相关的逻辑处理异常时如何处理,它会使用时间轮来处理异常重试。像 ZooKeeperRegistry、NacosRegistry 等注册中心实现,都继承了 FailbackRegistry,因此也都有了异常重试的能力。


FailbackRegistry 继承了 AbstractRegistry,复写了 register/unregister、subscribe/unsubscribe、notify 这 5 个核心方法,在这些方法中首先会调用父类 AbstractRegistry 对应的方法,然后真正的注册订阅等逻辑还是通过模板方法模式委托给了子类实现,重试逻辑则交由时间轮来处理。

2.2.1 核心字段

FailbackRegistry 的核心字段如下:


1)ConcurrentMap < URL,FailedRegisteredTask > failedRegistered,注册失败的 URL 集合。key 是注册失败的 URL,value 是 FailedRegisteredTask,也就是重试注册时要执行的逻辑,该注册重试逻辑会交给时间轮执行。当注册失败时,会调用方法 addFailedRegistered 添加注册失败的 URL。


2)ConcurrentMap < URL,FailedUnregisteredTask > failedUnregistered,取消注册时发生失败的 URL 集合。key 是取消注册失败的 URL,value 是 FailedUnregisteredTask,也就是重试取消注册时要执行的逻辑。


3)ConcurrentMap < Holder,FailedSubscribedTask > failedSubscribed,订阅失败的 URL 集合。key 是 Holder(封装了订阅失败的 URL 以及对应的监听器 NotifyListener),value 是 FailedSubscribedTask,也就是重试订阅时要执行的逻辑。


4)ConcurrentMap < Holder,FailedUnsubscribedTask >failedUnsubscribed,取消订阅发生失败的 URL 集合。key 是 Holder(封装了取消订阅失败的 URL 以及对应的监听器 NotifyListener),value 是 FailedUnsubscribedTask,也就是重试取消订阅时要执行的逻辑。


5)int retryPeriod,重试操作时间间隔。


6)HashedWheelTimer retryTimer,时间轮,用于执行重试操作。

2.2.2 核心方法

以 subscribe 为例来分析 FailbackRegistry 中是如何处理重试的(其余方法类似)。如下图所示,首先 FailbackRegistry 的 subscribe 方法会调用父类 AbstractRegistry 的 subcribe 方法,将订阅数据添加到内存中进行维护,接着会从订阅失败/取消订阅失败的集合中移除该 URL 相关的订阅数据。


然后调用子类的 doSubscribe 方法将真正的订阅逻辑交给子类实现,这是典型的模板方法设计模式。如果发生异常,会调用 getCacheUrls 方法获取缓存的服务提供者数据。如果缓存的服务数据非空,因为这边订阅失败了,所以需要手动触发下 notify 方法,回调相关的 NotifyListener 方法,刷新消费者本地的服务提供者列表、路由、配置的数据。如果缓存数据为空,则需要判断下,是否检测订阅失败以及订阅失败的异常是否可以跳过,根据这个来判断是否需要抛出异常还是忽略仅打印日志。


最后会调用 addFailedSubscribed 方法将订阅失败的信息添加到 failedSubscribed 集合,以及将任务添加到时间轮中,这样当时间到了,时间轮就可以处理该重试任务了。


这边有一点需要注意,如果任务已经存在 failedSubscribed 集合中,就不需要重复添加了。failedSubscribed 是 Map 结构,通过 key 来判断数据是否存在,因此这边的 Holder 作为 key 时,需要复写 hashCode 和 equals 方法。



@Overridepublic void subscribe(URL url, NotifyListener listener) {    // 调用父类方法, 在内存中记录下订阅相关的数据    super.subscribe(url, listener);    // 从订阅失败集合、取消订阅失败集合中移除该URL相关的数据, 如果存在需要取消相关任务的执行    removeFailedSubscribed(url, listener);    try {        // 模板方法设计模式, 委托给子类实现真正的订阅逻辑        doSubscribe(url, listener);    } catch (Exception e) {        Throwable t = e;        // 订阅发生了异常, 则使用本地缓存的服务提供者数据        List<URL> urls = getCacheUrls(url);        if (CollectionUtils.isNotEmpty(urls)) {            // 调用notify方法回调相关的listener方法, 通知消费者刷新自己本地            // 的服务提供者、路由、配置等数据            notify(url, listener, urls);        } else {            // 根据check以及是否是可忽略的异常来判断是否需要抛出异常            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)                && url.getParameter(Constants.CHECK_KEY, true);            boolean skipFailback = t instanceof SkipFailbackWrapperException;            if (check || skipFailback) {                if (skipFailback) {                    t = t.getCause();                }                // 抛异常            } else {                // 仅打印错误日志            }        }
// 添加订阅失败的URL信息 addFailedSubscribed(url, listener); }}
protected void addFailedSubscribed(URL url, NotifyListener listener) { // 封装成Holder Holder h = new Holder(url, listener); // 判断该Holder是否已经存在, 如果已经存在则直接返回, 不需要多次添加 FailedSubscribedTask oldOne = failedSubscribed.get(h); if (oldOne != null) { return; } // 创建订阅失败的重试任务 FailedSubscribedTask newTask = new FailedSubscribedTask(url, this, listener); // 向订阅失败的集合里面添加该任务, 如果返回null说明之前不存在该任务, // 这次需要往时间轮中注册该任务 oldOne = failedSubscribed.putIfAbsent(h, newTask); if (oldOne == null) { // 将该任务添加到时间轮中 retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS); }}
复制代码

2.3 CacheableFailbackRegistry

在 2.7.8 版本中并没有 CacheableFailbackRegistry 这个类。在 Dubbo3.0 中,针对服务数据的推送做了一系列的优化,CacheableFailbackRegistry 正是其中一个改动,下面来进行具体的讲解。

2.3.1 URL 推送模型

下图所示是 Dubbo2.7.8 中的 URL 推送模型,消费者启动后会向 ZooKeeper 服务端订阅感兴趣的服务,当有新的消费者感兴趣的服务提供者节点(提供者 3)加入服务时,该服务(提供者 3)会把自己的服务信息注册到 ZooKeeper 服务端,接着 ZooKeeper 服务端会把 providers 节点下的所有服务实例信息(提供者 1、2、3)全量推送给消费者,消费者收到后根据推送的数据全量生成 URL 列表,该 URL 列表后续会被消费者用来构建自己本地的可选的服务提供者列表数据,后续发起远程调用时,就可以从该服务提供者列表中选择一个节点。


可以看到,当服务提供者实例数量较小时,推送的数据量较小,消费者端构建 URL 的压力就小一些,但是当某个接口有大量的服务提供者时,当发生服务扩容/缩容,就会有大量的 URL 被创建。Dubbo3.0 中的对该 URL 推送模型做了一系列的优化,主要是对 URL 的类结构做了一定调整,引入了多级缓存,下面具体分析。


2.3.2 URL 结构变更及多级缓存

如下两图分别是 Dubbo2.7.8 和 Dubbo3.0.7 中的 URL 结构,可以看到在 3.0.7 中新增了 URLAddress 和 URLParam 类,原本 URL 中的 host、port 等信息移到了 URLAddress 中,原本 URL 的参数信息则移到了 URLParam 中。为什么要这么做?很大程度上是为了利用缓存,避免重复创建 URL 对象。比如将 URL 中的 host、port 等信息抽取到 URLAddress 对象中,当发生服务数据变更推送时,根据 host、port 等信息很大程度上能够从缓存中找到已有的 URLAddress 对象,这样就避免了一些不必要的 URL 创建。



2.3.3 核心方法

如下图所示,当消费者接收到 ZooKeeper 注册中心推送过来的全量服务提供者列表信息(providers)时,会调用到该方法进行 ServiceAddressURL 列表的创建。首先,会根据消费者 URL 尝试从缓存 stringUrls 中获取服务提供者信息对应的 Map(key 是服务提供者信息的字符串表示,value 是对应的 ServiceAddressURL)。如果该 Map 为空,则 providers 对应的 ServiceAddressURL 都需要创建。如果 Map 不为空,则先从该 Map 中根据 provider 的字符串 key 尝试获取缓存,如果存在则不需要调用 createURL 创建。


createURL 也是依次从缓存 stringAddress 和 stringParam 获取对应的 URLAddress 和 URLParam,如果缓存中不存在则创建。接着,利用刚刚获取到的 URLAddress 和 URLParam 以及 consumerURL 创建 ServiceAddressURL。


创建完成后会判断该 ServiceAddressURL 是否和当前消费者匹配(version、group 这些信息是否匹配,前文已经分析过)。如果不匹配则返回空,后续也不会被更新到 stringUrls 缓存中。


创建完成后会调用 evictURLCache,将待清理的 URL 信息放入 waitForRemove 中,cacheRemovalScheduler 会定时清理缓存数据。



protected List<URL> toUrlsWithoutEmpty(URL consumer, Collection<String> providers) {    // 根据消费者URL信息从stringUrls中获取对应的服务提供者信息    Map<String, ServiceAddressURL> oldURLs = stringUrls.get(consumer);    // create new urls    Map<String, ServiceAddressURL> newURLs;    // 去除consumerURL中的release、dubbo、methods、时间戳、dubbo.tag这些参数    URL copyOfConsumer = removeParamsFromConsumer(consumer);    // 如果缓存中没有, 肯定都是需要新建的    if (oldURLs == null) {        newURLs = new HashMap<>((int) (providers.size() / 0.75f + 1));        for (String rawProvider : providers) {            // 去除掉rawProvider中的时间戳、PID参数            rawProvider = stripOffVariableKeys(rawProvider);            // 调用createURL创建ServiceAddressURL            ServiceAddressURL cachedURL =                         createURL(rawProvider, copyOfConsumer, getExtraParameters());            // 如果创建的为空则忽略不放入newURLs            if (cachedURL == null) {                continue;            }            newURLs.put(rawProvider, cachedURL);        }    } else {        newURLs = new HashMap<>((int) (providers.size() / 0.75f + 1));        // maybe only default , or "env" + default        for (String rawProvider : providers) {            rawProvider = stripOffVariableKeys(rawProvider);            // 从缓存中获取, 如果缓存中存在就不需要再创建了            ServiceAddressURL cachedURL = oldURLs.remove(rawProvider);            if (cachedURL == null) {                cachedURL = createURL(rawProvider, copyOfConsumer, getExtraParameters());                if (cachedURL == null) {                    continue;                }            }            newURLs.put(rawProvider, cachedURL);        }    }
// 清除老的缓存数据 evictURLCache(consumer); // 更新stringUrls缓存 stringUrls.put(consumer, newURLs);
return new ArrayList<>(newURLs.values());}
protected ServiceAddressURL createURL(String rawProvider, URL consumerURL, Map<String, String> extraParameters) { boolean encoded = true;
int paramStartIdx = rawProvider.indexOf(ENCODED_QUESTION_MARK);
if (paramStartIdx == -1) { encoded = false; } // 解析rawProvider, 一分为二, 一个用来创建URLAddress、一个用来创建URLParam String[] parts = URLStrParser.parseRawURLToArrays(rawProvider, paramStartIdx); if (parts.length <= 1) { return DubboServiceAddressURL.valueOf(rawProvider, consumerURL); }
String rawAddress = parts[0]; String rawParams = parts[1]; boolean isEncoded = encoded;
// 从缓存stringAddress缓存中获取URLAddress, 不存在则创建 URLAddress address = stringAddress.computeIfAbsent(rawAddress, k -> URLAddress.parse(k, getDefaultURLProtocol(), isEncoded)); address.setTimestamp(System.currentTimeMillis());
// 从缓存stringParam缓存中获取URLParam, 不存在则创建 URLParam param = stringParam.computeIfAbsent(rawParams, k -> URLParam.parse(k, isEncoded, extraParameters)); param.setTimestamp(System.currentTimeMillis());
// 用获取到的URLAddress、URLParam直接new创建ServiceAddressURL ServiceAddressURL cachedURL = createServiceURL(address, param, consumerURL);
// 判断创建出来的ServiceAddressURL是否和当前消费者匹配, 不匹配返回null if (isMatch(consumerURL, cachedURL)) { return cachedURL; } return null;}
复制代码

2.4 ZooKeeperRegistry

2.4.1 注册

根据传入的 URL 生成该节点要在 ZooKeeper 服务端上注册的节点路径,值为如下形式:/dubbo/org.apache.dubbo.springboot.demo.DemoService/providers/服务信息,/dubbo 是根路径,接下来是服务的接口名,然后/providers 是类型信息(如果是消费者则是/consumers 节点,路由信息则是/routers),最后是 URL 的字符串表示。得到节点路径后,会根据 URL 的 dynamic 参数(默认是 true)决定在 ZooKeeper 服务端上创建的是临时节点,还是持久节点,默认是临时节点。


注册后数据结构如下图所示。


2.4.2 订阅

订阅的核心是通过 ZooKeeperClient 在指定的节点的添加 ChildListener,当该节点的子节点数据发生变化时,ZooKeeper 服务端会通知到该 ChildListener 的 notify 方法,然后调用到对应的 NotifyListener 方法,刷新消费者本地的服务提供者列表等信息。


doSubscribe 方法主要分为两个分支:URL 的 interface 参数明确指定了为*(订阅所有,通常实际使用中不会这么使用,Dubbo 的控制后台会订阅所有)或者就订阅某个服务接口,接下来分析订阅某个指定服务接口这个分支代码。这块涉及到三个监听器(它们是一对一的):



**1)NotifyListener,**Dubbo 中定义的通用的订阅相关的监听器。它是定义在 dubbo-registry-api 模块中的,不仅仅在 ZooKeeper 注册中心模块中使用。


**2)ChildListener,**Dubbo 中定义的针对 ZooKeeper 注册中心的监听器,用来监听指定节点子节点的数据变化。


**3)CuratorWatcher,**Curator 框架中的监听器,Curator 是常用的 ZooKeeper 客户端,如果 Dubbo 采用其它 ZooKeeper 客户端工具,这块就是其它相关的监听器逻辑了。


当订阅的服务数据发生变化时,最先会触发到 CuratorWatcher 的 process 方法,process 方法中会调用 ChildListener 的 childChanged 方法,在 childChanged 方法会继续触发调用到 ZooKeeperRegistry 的 notify 方法。这里有两点需要注意:


**1)因为 doSubscribe 方法中通过 ZooKeeperClient 添加 Watcher 监听器时,**使用的是 usingWatcher,这是一次性的,所以在 CuratorWatcher 的实现 CuratorWatcherImpl 的 process 方法中,当收到 ZooKeeper 的变更数据推送时,会再次在 path 上注册 Watcher。


**2)在 ChildListener 的实现 RegistryChildListenerImpl 的 doNotify 方法中,**会调用 ZooKeeperRegistry 的 toUrlsWithEmpty 将传入的字符串形式的服务提供者列表等数据转换成对应的 ServiceAddressURL 列表数据,以供后面使用。


明确了三个监听器的含义之后,接下来分析 doSubscribe 的逻辑就简单了。首先会调用 toCategoriesPath 方法获取三个 path 路径,分别是


  • /dubbo/org.apache.dubbo.demo.DemoService/providers

  • /dubbo/org.apache.dubbo.demo.DemoService/configurators

  • /dubbo/org.apache.dubbo.demo.DemoService/routers


表示当前消费者(url 参数代表)需要订阅这三个节点,当任意一个节点的数据发生变化时,ZooKeeper 服务端都会通知到当前注册中心客户端,更新本地的服务数据。依次遍历这三个 path 路径,分别在这三个 path 上注册监听器。


ZooKeeperRegistry 通过 zkListeners 这个 Map 维护了所有消费者(URL)的所有监听器数据,首先根据 url 参数获取当前消费者对应的监听器 listeners,这是一个 Map,key 是 NotifyListener,value 是对应的 ChildListener,如果不存在则需要创建 ChildListener 的实现 RegistryChildListenerImpl。当创建完成后,会在 ZooKeeper 服务端创建持久化的 path 路径,并且在该 path 路径上注册监听器。首次订阅注册监听器时,会获取到该路径下的所有服务数据的字符串形式,并调用 toUrlsWithEmpty 转成 URL 形式,接着调用 notify 方法刷新消费者本地的服务相关数据。


@Overridepublic void doSubscribe(final URL url, final NotifyListener listener) {
CountDownLatch latch = new CountDownLatch(1); try { List<URL> urls = new ArrayList<>(); // 获取三个路径, 分别是/providers、/routers、/configurators for (String path : toCategoriesPath(url)) {
// 获取该消费者URL对应的监听器Map ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
// 查找NotifyListener对应的ChildListener是否存在, 不存在则创建 ChildListener zkListener = listeners.computeIfAbsent(listener, k -> new RegistryChildListenerImpl(url, path, k, latch)); if (zkListener instanceof RegistryChildListenerImpl) { ((RegistryChildListenerImpl) zkListener).setLatch(latch); }
// 在ZooKeeper服务端创建持久节点 zkClient.create(path, false); // 添加ChildListener监听器, 并全量拉取下相关服务数据 List<String> children = zkClient.addChildListener(path, zkListener); if (children != null) { // 将字符串形式的服务数据转换成URL列表数据 urls.addAll(toUrlsWithEmpty(url, path, children)); } } // 首次订阅后触发下notify逻辑, 刷新消费者本地的服务提供者列表等数据 notify(url, listener, urls); } finally { latch.countDown(); }}
复制代码


接下来看下 AbstractZooKeeperClient 的 addChildListener 方法,逻辑也比较简单,在指定 path 路径上添加一次性的 Watcher。取消订阅的逻辑则是会将传入的 UR 和 NotifyListener 对应的 ChildListener 移除,不再监听。


public List<String> addChildListener(String path, final ChildListener listener) {    ConcurrentMap<ChildListener, TargetChildListener> listeners =                childListeners.computeIfAbsent(path, k -> new ConcurrentHashMap<>());
TargetChildListener targetListener = listeners.computeIfAbsent(listener, k -> createTargetChildListener(path, k)); return addTargetChildListener(path, targetListener);}
@Overridepublic List<String> addTargetChildListener(String path, CuratorWatcherImpl listener) { try { return client.getChildren().usingWatcher(listener).forPath(path); } catch (NoNodeException e) { return null; } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); }}
复制代码

三、总结

本文通过分析 Dubbo3.0 中 AbstractRegistry、FailbackRegistry、ZooKeeperRegistry,详细介绍了 Dubbo 中 ZooKeeper 注册中心的实现原理,包括服务数据本地缓存、服务注册订阅异常重试等。另外,网上大部分文章是基于 2.x 版本的,本文基于 Dubbo3.0,重点分析了 Dubbo3.0 中新引入的 CacheableFailbackRegistry,详细介绍了 Dubbo 在 URL 推送模型上所做的优化。

用户头像

还未添加个人签名 2022-09-04 加入

热衷于分享java技术,一起交流学习,探讨技术。 需要Java相关资料的可以+v:xiaoyanya_1

评论

发布
暂无评论
Dubbo 中 Zookeeper 注册中心原理分析_Java_小小怪下士_InfoQ写作社区