写点什么

【Nacos 源码之配置管理 十】客户端长轮询监听服务端变更数据

  • 2022-10-11
    江西
  • 本文字数:5878 字

    阅读完需:约 1 分钟

【Nacos源码之配置管理 十】客户端长轮询监听服务端变更数据

作者石臻臻,CSDN 博客之星 Top5Kafka Contributornacos Contributor华为云 MVP,腾讯云 TVP,滴滴 Kafka 技术专家 KnowStreaming


KnowStreaming 是滴滴开源的Kafka运维管控平台, 有兴趣一起参与参与开发的同学,但是怕自己能力不够的同学,可以联系我,当你导师带你参与开源!

1 前言


还记得我们在 服务端增删改配置数据之后如何通知集群中的其他机器 中分析了服务端之间的相互通知修改数据, 当时结尾的时候,留下了以下问题,我们将在这篇文章中来给解析一下;

  • [x] 客户端是如何订阅服务端的?

  • [x] 客户端与服务端直接是长连接还是短连接?

  • [x] 客户端与服务端是推还是拉?

2ConfigService.addListener 订阅配置消息


【Nacos源码之配置管理 九】客户端获取配置数据的流程 中我们知道了客户端获取配置数据的整个流程;也知道了客户端的 Nacos 配置服务类 NacosConfigService ; 这个配置服务类在初始化的时候初始化了一些基本的属性,还有一些重要的实例 比如

  1. ServerHttpAgent :是一个请求集群 http 的代理类;它持有实例 ServerListManager(集群列表管理类,健康检查等等操作);

  2. ClientWorker: 客户端工作实例类; 它负责调用 ServerHttpAgent 来发起请求像服务端请求数据,已经监听配置数据的变更等等;

【Nacos源码之配置管理 九】客户端获取配置数据的流程 中只是说明了主动发起获取数据的流程; 但是客户端最重要的功能还是轮询和订阅功能;所以下面主要讲一下 客户端如何订阅服务端数据;

订阅指定配置数据的变更



代码中 addListener 表示对指定的配置监听,如果消息有变更了,则执行方法receiveConfigInfo ; 当然可以选择是否用异步执行的; getExecutor 方法是可以自定义线程池来执行方法 receiveConfigInfo ; 如果是返回 null 的话,那么就是用主线程同步执行的;

addTenantListeners


ConfigService.addListener 调用之后最终是执行了这里的方法,但是看这里的方法似乎只是将 listeners 放入到了配置数据的缓存CacheData 中; 那什么时候会通知到我们的监听器呢?那我们得看看哪里调用了我们这个 listeners 了;

ClientWork 客户端

看看 clientwork 初始化的代码

主要看看里面的checkConfigInfo() 最终执行的是


LongPollingRunnable 长轮询任务类


LongPollingRunnable 是一个长轮询的任务类,他负责不断的去对比服务端的配置数据的 MD5 与自身是否一致,如果不一致,则会发起请求去服务端获取最新数据来更新客户端的缓存数据;

我们看下主要代码:

 public void run() {//公众号: 进击的老码农  个人微信: jjdlmn_            List<CacheData> cacheDatas = new ArrayList<CacheData>();            List<String> inInitializingCacheList = new ArrayList<String>();            try {                // check failover config                for (CacheData cacheData : cacheMap.get().values()) {                    if (cacheData.getTaskId() == taskId) {                        cacheDatas.add(cacheData);                        try {                            checkLocalConfig(cacheData);                            if (cacheData.isUseLocalConfigInfo()) {                                cacheData.checkListenerMd5();                            }                        } catch (Exception e) {                            LOGGER.error("get local config info error", e);                        }                    }                }
                // check server config                List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
                for (String groupKey : changedGroupKeys) {                    String[] key = GroupKey.parseKey(groupKey);                    String dataId = key[0];                    String group = key[1];                    String tenant = null;                    if (key.length == 3) {                        tenant = key[2];                    }                    try {                        String content = getServerConfig(dataId, group, tenant, 3000L);                        CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));                        cache.setContent(content);                        LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}",                            agent.getName(), dataId, group, tenant, cache.getMd5(),                            ContentUtils.truncateContent(content));                    } catch (NacosException ioe) {                        String message = String.format(                            "[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",                            agent.getName(), dataId, group, tenant);                        LOGGER.error(message, ioe);                    }                }                for (CacheData cacheData : cacheDatas) {                    if (!cacheData.isInitializing() || inInitializingCacheList                        .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {                        cacheData.checkListenerMd5();                        cacheData.setInitializing(false);                    }                }                inInitializingCacheList.clear();
                executorService.execute(this);
            } catch (Throwable e) {
                // If the rotation training task is abnormal, the next execution time of the task will be punished                LOGGER.error("longPolling error : ", e);                executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);            }        }


复制代码

检查本地配置,使用本地配置直接回调监听类的方法

我们先看下面的做了什么


  1. 如果客户端没有使用本地配置(状态是useLocalConfigInfo=false;),但是本地配置的文件存在,则将客户端的缓存更新为本地配置的文件;(一般是初始化或者运行过程中创建了本地配置文件) ;状态变更为useLocalConfigInfo=true;

  2. 如果使用的是本地配置(状态是useLocalConfigInfo=true;)但是本地配置不存在了(不想用本地配置了,删除了),则将useLocalConfigInfo=false;这个时候还不会更新客户端的缓存;会等到后面去服务端请求数据获取;

  3. 如果使用的是本地配置(状态是useLocalConfigInfo=true;)并且本地配置文件也存在; 但是发现本地配置文件有更新;则将最新的本地配置文件缓存到内存中;(怎么判断文件有更新呢?通过path.lastModified() 文件的最后修改时间对比)

  4. 如果使用了本地配置(状态是useLocalConfigInfo=true;)则对有变化的配置数据发起通知; CacheData 中的 md5 与 wrap 中的 lastCallMd5 作对比判断是否数据有更新


  1. 如果数据有变化,则通知所有的监听类; 会调用监听类的receiveConfigInfo(content )方法,当然如果配置了getExecutor()返回不是空的话,就会用这个返回的线程池来执行;如果是 null;则用主线程同步调用;

checkUpdateDataIds 获取有变化的配置数据列表


这个方法只是将所有 不是使用本地配置的的 配置数据 group、dataid 拼接起来可能更新的字符串 probeUpdateString;真正做处理的还是checkUpdateConfigStr()方法;

//公众号: 进击的老码农 /**     * 从Server获取值变化了的DataID列表。返回的对象里只有dataId和group是有效的。 保证不返回NULL。     */    List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws IOException {
        List<String> params = Arrays.asList(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
        List<String> headers = new ArrayList<String>(2);        headers.add("Long-Pulling-Timeout");        headers.add("" + timeout);
        // told server do not hang me up if new initializing cacheData added in        if (isInitializingCacheList) {            headers.add("Long-Pulling-Timeout-No-Hangup");            headers.add("true");        }
        if (StringUtils.isBlank(probeUpdateString)) {            return Collections.emptyList();        }
        try {            HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params,                agent.getEncode(), timeout);
            if (HttpURLConnection.HTTP_OK == result.code) {                setHealthServer(true);                return parseUpdateDataIdResponse(result.content);            } else {                setHealthServer(false);                LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(), result.code);            }        } catch (IOException e) {            setHealthServer(false);            LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e);            throw e;        }        return Collections.emptyList();    }
复制代码

这个方法主要就是发起 http 请求,从而拿到哪些的 group+dataid 的配置项是发生了变化的;注意这里不是拿具体内容 content; 上面拼接的 probeUpdateString 是所有客户端监听的所有配置项(除了使用了本地配置);主要是拿到变更的 group+dataid 才好去发起获取 content 的请求

  1. 配置请求 Head 中的参数①. Long-Pulling-Timeout : 长轮询的超时时间,这个值默认 30 秒;可以通知配置文件中配置 configLongPollTimeout=30000来自定义设置; 最少时间是 10 秒;就算你设置了 9 秒,它的超时时间也是 10 秒;②. Long-Pulling-Timeout-No-Hangup = true/false : 如果有第一次获取数据的配置则设置 true; 表示不挂起! 立刻返回;(服务端如果发现数据没有变更,则会将请求挂起,等等 29.5 秒才会返回,后面会详细介绍)

  2. 配置请求 param 中的参数①、Listening-Configs = probeUpdateString 表示监听的配置项;

  3. 向服务端发起请求: currentServerAddr/v1/cs/configs/listener ;

  4. 请求成功HTTP_OK = 200 ;设置当前访问的服务端健康;并返回获取到的有变更后的数据 group、dataid;

  5. 请求失败, 设置当前访问的服务端不健康,下次发起请求会换一个集群中的服务端;

第 4 步骤看起来很简单,就是发起一个 http 请求获取有变更的配置项;但是这里其实很有文章,下面会介绍到

拿到变更配置项之后,发起获取配置数据 content 请求

上面的步骤让我们拿到了改变过的配置项changedGroupKeys ;这个时候是没有拿到具体的数据内容content的; 代码不贴了,概述一下流程

  1. 发起获取配置数据内容 getServerConfig(dataId, group, tenant, 3000L);这一步骤就是【Nacos源码之配置管理 九】客户端获取配置数据的流程 中讲的;就不详细说了;

  2. 拿到 content 之后更新缓存

  3. 回调监听类的接口 receiveConfigInfo(content )

executorService.execute(this);重新执行一遍


可以看到重新把这个任务放到线程池中取执行了; 那么我们就郁闷了,这不就是不停的去发情请求吗?这样不会有性能问题吗?话是这么说,这个就是长轮询的方式发起请求; 不停的去请求;但是也不是想象中 发起之后里面发起;因为发起一个请求,如果配置没有变更的话,服务端会将请求挂起,直到快到超时时间或者直到配置数据有变更才会返回数据并结束连接;

.

3 总结


  1. addListener 会将监听的 dataId, group 缓存在对象CacheData中;并且将 Listener 包装一下作为CacheData属性

  2. 长轮询任务LongPollingRunnable一直在执行;

  3. 检查是否使用本地配置,如果使用本地配置并且配置有修改;则更新缓存CacheData的值; 并且回调监听类的接口receiveConfigInfo

  4. 如果没有使用本地配置,则向服务端发起请求: currentServerAddr/v1/cs/configs/listener ;获取有变更过的数据 dataid、group 列表

  5. 拿到 4 中返回的数据去获取具体数据内容 content;

  6. 一直执行 3、4、5 中的数据

发布于: 2022-10-11阅读数: 28
用户头像

关注公众号: 石臻臻的杂货铺 获取最新文章 2019-09-06 加入

进高质量滴滴技术交流群,只交流技术不闲聊 加 szzdzhp001 进群 20w字《Kafka运维与实战宝典》PDF下载请关注公众号:石臻臻的杂货铺

评论

发布
暂无评论
【Nacos源码之配置管理 十】客户端长轮询监听服务端变更数据_nacos_石臻臻的杂货铺_InfoQ写作社区