写点什么

Nacos 源码系列—订阅机制的前因后果 (上)

作者:牧小农
  • 2022 年 5 月 14 日
  • 本文字数:8806 字

    阅读完需:约 29 分钟

Nacos源码系列—订阅机制的前因后果(上)

点赞再看,养成习惯,微信搜索【牧小农】关注我获取更多资讯,风里雨里,小农等你,很高兴能够成为你的朋友。项目源码地址:公众号回复 nacos,即可免费获取源码

前因

我们在了解 Nacos 订阅机制之前,首先来了解一下前因——Nacos 客户端的“服务发现”,我们先通过下面一张图来直观的看一下,有人可能就说这也叫直观,明明很曲折,小农想说的是,这样才能让你们印象更加深刻(手动狗头)。


读者内心:我信你个鬼。



对于 Naocs 客户端“服务发现” 主要是有 NamingService 获取服务列表、组装参数,调用服务接口等等,上图中只是一个大致的流程,在其中还有获取服务列表中的通信流程协议(Grpc/http),订阅流程以及后果(故障转移流程),下面我们就来详细讲解一下,客户端服务发现的基本流程。


首先我们先从一个入口类 Client 项目下的NamingTest开始看起


@Ignorepublic class NamingTest {        @Test    public void testServiceList() throws Exception {                Properties properties = new Properties();        //服务IP        properties.put(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848");        //用户名        properties.put(PropertyKeyConst.USERNAME, "nacos");        //密码        properties.put(PropertyKeyConst.PASSWORD, "nacos");                Instance instance = new Instance();        //实例IP        instance.setIp("1.1.1.1");        //实例端口        instance.setPort(800);        //配置权重        instance.setWeight(2);        Map<String, String> map = new HashMap<String, String>();        map.put("netType", "external");        map.put("version", "2.0");        instance.setMetadata(map);
//关键代码 创建自己的实例 NamingService namingService = NacosFactory.createNamingService(properties); namingService.registerInstance("nacos.test.1", instance); ThreadUtils.sleep(5000L); List<Instance> list = namingService.getAllInstances("nacos.test.1"); System.out.println(list); ThreadUtils.sleep(30000L); }}
复制代码


在前几篇章节中,我们讲解了registerInstance()方法,今天我们需要来看一下getAllInstances()方法的具体逻辑,这个就是我们需要观察的入口


@Overridepublic List<Instance>  getAllInstances(String serviceName, String groupName, List<String> clusters,        boolean subscribe) throws NacosException {    ServiceInfo serviceInfo;    String clusterString = StringUtils.join(clusters, ",");    if (subscribe) {        serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);        if (null == serviceInfo || !clientProxy.isSubscribed(serviceName, groupName, clusterString)) {            serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);        }    } else {        serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);    }    List<Instance> list;    if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {        return new ArrayList<Instance>();    }    return list;}
复制代码


在上面具体方法中,会经过几轮重载方法的调用,在重载方法调用的过程中已经设置了默认值,例如(默认分组(DEFAULT_GROUP),集群列表(空)、是否订阅(是)等等)


/** * * @param serviceName 服务名称 * @param groupName   分组名称(DEFAULT_GROUP) * @param clusters    集群数量(默认为空) * @param subscribe   是否订阅服务(是) * @return * @throws NacosException */@Overridepublic List<Instance>  getAllInstances(String serviceName, String groupName, List<String> clusters,        boolean subscribe) throws NacosException {    ServiceInfo serviceInfo;    String clusterString = StringUtils.join(clusters, ",");    //是否为订阅模式    if (subscribe) {        //从客户端缓存中获取服务信息        serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);        if (null == serviceInfo || !clientProxy.isSubscribed(serviceName, groupName, clusterString)) {            //如果缓存中服务信息不存在,进行订阅            serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);        }    } else {        //未订阅,从服务器获取        serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);    }    //获取实例列表    List<Instance> list;    if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {        return new ArrayList<Instance>();    }    return list;}
复制代码


如果是订阅模式,直接从本都缓存中获取服务信息,然后从中获取实例列表,订阅机制会自动同步服务器实例信息到本地,如果缓存中没有,说明是首次调用,进行订阅后获取服务信息,具体流程如下:



订阅处理流程在上面的流程中,我们讲到了订阅的逻辑,接下来我们就来看一看订阅里面到底做了哪些事情,首先我们已经知道服务在哪里订阅了,我们只需要点进去找对应的方法。


serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);


下面是具体的方法,这里 clientProxy 类型为 NamingClientProxyDelegate,实例化 NacosNamingService 时该类被实例化


@Overridepublic ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {    NAMING_LOGGER.info("[SUBSCRIBE-SERVICE] service:{}, group:{}, clusters:{} ", serviceName, groupName, clusters);    String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);    String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);    //定时调度UpdateTask    serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);    //获取缓存中的serviceInfo对象    ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);    if (null == result || !isSubscribed(serviceName, groupName, clusters)) {        //判断如果为空,进行订阅逻辑处理(Grpc协议)        result = grpcClientProxy.subscribe(serviceName, groupName, clusters);    }    //ServiceInfo本地缓存处理    serviceInfoHolder.processServiceInfo(result);    return result;}
复制代码


在上述代码中,可以看到我们在获取服务器列表中,进行了订阅逻辑的扩展。


  1. 在订阅方法中首先开启定时任务,用来定时同步服务端的实例信息,进行本地缓存的更新等操作,如果是首次直接返回,去判断是否有本地缓存

  2. 如果本地缓存中存在 serviceInfo 信息,直接返回 serviceInfo 信息,如果不存在,默认采用 Grpc 协议进行订阅,然后在返回 serviceInfo 信息

  3. 通过grpcClientProxy.subscribe()直接向服务器发送一个订阅请求,并返回结果

  4. servieInfo 本地缓存处理,并且会将获取的最新的 serviceInfo 和本地的 serviceInfo 进行比较,进行更新操作。


如下图所示:


订阅

在上面我们讲解了,Nacos 是如何进行服务器发现,以及订阅的入口和大体逻辑,接下来我们就来详细讲一讲 Nacos 的订阅机制的核心,首先 Nacos 客户端会通过定时任务,进行轮询,每间隔 6 秒从 Nacos 注册中心获取服务实例列表,如果检测实例发生变化,发布变更事件,订阅者进行对应的逻辑处理(更新缓存和实例信息),我们先从一张图,来了解一下订阅机制主要的流程。


定时任务


订阅其实本身也是服务发现的一种实现方式,就是在服务发现的时候执行订阅方法,然后通过定时任务定时拉取服务端信息。我们找到 NacosNamingService.subscribe(),会发现里面有好几个···subscribe()```方法,这几个方法在重载的过程中,会帮我们添加一些默认参数(默认分组、空集合列表),最终我们对定位到下面这个方法:


@Overridepublic void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)        throws NacosException {    if (null == listener) {        return;    }    String clusterString = StringUtils.join(clusters, ",");    changeNotifier.registerListener(groupName, serviceName, clusterString, listener);    clientProxy.subscribe(serviceName, groupName, clusterString);}
复制代码


在这里我们先来看 clientProxy.subscribe(),这个方法实际上就是我们上面讲到的NamingClientProxyDelegate.subscribe()方法,在这里主要是对服务列表的信息进行查询,所有我们可以知道不管是查询还是订阅都是用的同一个方法。在这里我们就不做过多的描述。


在这里我们主要关注的是这个方法里面一个定时调度的方法ServiceInfoUpdateService.scheduleUpdateIfAbsent();,这个方法里面构建了 serviceKey,通过 key 来判断是否重复,最后添加到 updateTask,而addTask()就是添加任务并且发起一个定时任务


public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {    String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);    if (futureMap.get(serviceKey) != null) {        return;    }    synchronized (futureMap) {        if (futureMap.get(serviceKey) != null) {            return;        }        //主要关注点,添加定时任务        ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));        futureMap.put(serviceKey, future);    }}
复制代码


默认定时延迟一秒执行:


private static final long DEFAULT_DELAY = 1000L;private synchronized ScheduledFuture<?> addTask(UpdateTask task) {    return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);}
复制代码


在这个定时任务里面封装了订阅机制的核心业务逻辑,位于UpdateTask.run()方法。


@Overridepublic void run() {    long delayTime = DEFAULT_DELAY;
try { //判断 服务是否订阅过并且没有开启定时任务 操作过不再执行 if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey( serviceKey)) { NAMING_LOGGER.info("update task is stopped, service:{}, clusters:{}", groupedServiceName, clusters); isCancel = true; return; }
//获取缓存的service信息 ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey); //缓存不存在 if (serviceObj == null) { //根据serviceName等信息获取service信息 serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false); //进行本地缓存处理 serviceInfoHolder.processServiceInfo(serviceObj); lastRefTime = serviceObj.getLastRefTime(); return; }
//如果服务最后的更新时间<=缓存刷新时间,从注册中心重新查询 if (serviceObj.getLastRefTime() <= lastRefTime) { serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false); //本地缓存处理 serviceInfoHolder.processServiceInfo(serviceObj); } //刷新最后更新的时间 lastRefTime = serviceObj.getLastRefTime(); if (CollectionUtils.isEmpty(serviceObj.getHosts())) { incFailCount(); return; } // TODO multiple time can be configured. //下一次更新缓存时间设置(6秒) delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE; //设置失败数量为0 resetFailCount(); } catch (Throwable e) { incFailCount(); NAMING_LOGGER.warn("[NA] failed to update serviceName: {}", groupedServiceName, e); } finally { //没有服务订阅过并且开启定时任务 if (!isCancel) { // 下次调度刷新时间,下次执行的时间与failCount有关,failCount=0,则下次调度时间为6秒,最长为1分钟 // 无异常情况下缓存实例的刷新时间是6秒 executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS); } }}
复制代码


通过定时任务执行UpdateTask,默认间隔时间为 6 秒,当发生异常时会延长,但不会超过 1 分钟。该方法会比较本地是否存在缓存,以及是否过期,当不存在或者过期的时候,会去查询注册中心,获取最新实例,更新最后获取时间,处理服务信息,在最后会计算任务时间,循环执行流程。



业务逻辑在最后会计算下一次定时任务的执行时间,通过 delayTime 来延迟执行,delayTime 默认为 1000*6(6 秒),在 finally 里面发起下一次定时任务,当我们程序出现异常的时候,执行时间和错误次数成正比,最长时间不超过一分钟


到这里我们已经对于 Nacos 客户端定于的核心流程讲解了一遍,Nacos 客户端通过一个定时任务,每间隔 6 秒从注册中心获取实例列表,当发现实例发生变化的时候,发布变更事件,订阅者进行业务处理,然后更新内存中和本地缓存中的实例。接下来我们就来讲一讲,定时任务获取到最新实例列表之后,整个时间机制是如何处理的。


我们在第一步调用subscribe()方法的时候,会订阅一个EventListener事件,而在定时任务 UpdateTask 定时获取实例列表之后,会调用ServiceInfoHolder.processServiceInfo方法对 ServiceInfo 进行本地处理,这其中就包括事件处理。


在 subscribe 方法中,通过下面的代码我们进行监听事件的注册


@Overridepublic void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)        throws NacosException {    if (null == listener) {        return;    }    String clusterString = StringUtils.join(clusters, ",");    changeNotifier.registerListener(groupName, serviceName, clusterString, listener);    clientProxy.subscribe(serviceName, groupName, clusterString);}
复制代码


在上述代码中,我们主要关注的是changeNotifier.registerListener,这个监听就是进行具体事件注册逻辑,在下述代码中,主要是将EventListener存储在listenerMapmap 结构中,key 为服务实例信息的拼接,value 为监听事件的集合


public void registerListener(String groupName, String serviceName, String clusters, EventListener listener) {    String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);    ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);    if (eventListeners == null) {        synchronized (lock) {            eventListeners = listenerMap.get(key);            if (eventListeners == null) {                eventListeners = new ConcurrentHashSet<EventListener>();                //将EventListener缓存到listenerMap中                listenerMap.put(key, eventListeners);            }        }    }    eventListeners.add(listener);}
复制代码


关于 serviceInfo 的处理在 updateTask 获取到最新的实例信息后会进行本地化处理,我们需要看的是ServiceInfoUpdateService.run()下的serviceInfoHolder.processServiceInfo(serviceObj);本地缓存方法


public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {      //判断服务key是否为空      String serviceKey = serviceInfo.getKey();      if (serviceKey == null) {          return null;      }      ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());      if (isEmptyOrErrorPush(serviceInfo)) {          //empty or error push, just ignore          return oldService;      }      //将缓存信息放置到map中      serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);      //判断实例信息是否发生改变      boolean changed = isChangedServiceInfo(oldService, serviceInfo);      if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {          serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));      }      //监控服务缓存map的大小      MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());      if (changed) {          NAMING_LOGGER.info("current ips:({}) service: {} -> {}", serviceInfo.ipCount(), serviceInfo.getKey(),                  JacksonUtils.toJson(serviceInfo.getHosts()));          //添加实例变更事件,被订阅者执行          NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),                  serviceInfo.getClusters(), serviceInfo.getHosts()));          //写入本地文件          DiskCache.write(serviceInfo, cacheDir);      }      return serviceInfo;  }   
复制代码


首先我们判断最新的 ServiceInfo 数据是否正确,有没有发生变化,如果数据格式正确且发生变化,会发布一个变更事件(InstancesChangeEvent),同时讲 serviceinfo 写入缓存中



对于服务信息的变更,Nacos 是如何做的呢,别急我们往下看,当我们调用InstancesChangeEvent()方法以后,变更事件会由NotifyCenter进行发布,我们来瞅一瞅


首先事件追踪的核心流程主要分为,根据事件类型获取-》获取事件发布者-》发布事件,详细如下所示:


private static final NotifyCenter INSTANCE = new NotifyCenter();
private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) { if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) { return INSTANCE.sharePublisher.publish(event); } //根据时间类型,获取对应的CanonicalName final String topic = ClassUtils.getCanonicalName(eventType); //从NotifyCenter.publisherMap中获取对应时间发布中 EventPublisher publisher = INSTANCE.publisherMap.get(topic); if (publisher != null) { //事件发布者publisher发布事件 return publisher.publish(event); } LOGGER.warn("There are no [{}] publishers for this event, please register", topic); return false;}
复制代码


在这个源码中,其实 INSTANCE是单例实现的,在这里publisherMap键值对是什么时候建立的?其实在是我们NacosNamingService.init()调用初始化方法的时候进行绑定的


private void init(Properties properties) throws NacosException {    ......    //建立InstancesChangeEvent和EnvenPublisher的关系    NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);   ......}
复制代码


当我们从上面方法进去的时候,会发现他默认使用的是DEFAULT_PUBLISHER_FACTORY来进行构建,而在NotifyCenter代码块中,会发现 DEFAULT_PUBLISHER_FACTORY 默认构建的 EventPublisher 为 DefaultPublisher


public static EventPublisher registerToPublisher(final Class<? extends Event> eventType, final int queueMaxSize) {//主要关注DEFAULT_PUBLISHER_FACTORY  return registerToPublisher(eventType, DEFAULT_PUBLISHER_FACTORY, queueMaxSize);}
if (iterator.hasNext()) { clazz = iterator.next().getClass(); } else { clazz = DefaultPublisher.class; }
DEFAULT_PUBLISHER_FACTORY = (cls, buffer) -> { try { EventPublisher publisher = clazz.newInstance(); publisher.init(cls, buffer); return publisher; } catch (Throwable ex) { LOGGER.error("Service class newInstance has error : ", ex); throw new NacosRuntimeException(SERVER_ERROR, ex); } };
复制代码


由此我们看出在NotifyCenter类中维护了事件名称和事件发布者的关系,而默认的时间发布中为 DefaultPublisher。

闲言

到这里,我们 Nacos 订阅机制的前半章我们就讲完了,因为整体服务订阅的事件机制还是比较复杂,篇幅太长,所以分成了两部分,今天这个章节我们主要讲解了,客户端服务发现的原理以及订阅机制中定时器的运行逻辑和 NotifyCenter 发布 InstancesChangeEvent 事件的流程


如果您对文章的有疑问的地方,欢迎在下方留言,小农看到了第一时间回复大家。


如果觉得文章对您有帮忙,记得点赞关注,您的支持是我创作的最大动力!

发布于: 2022 年 05 月 14 日阅读数: 3
用户头像

牧小农

关注

业精于勤荒于嬉,行成于思毁于随。 2019.02.13 加入

公众号【牧小农】

评论

发布
暂无评论
Nacos源码系列—订阅机制的前因后果(上)_源码_牧小农_InfoQ写作社区