写点什么

【Nacos 源码之配置管理 七】服务端增删改配置数据之后如何通知集群中的其他机器

  • 2022 年 10 月 08 日
    江西
  • 本文字数:6080 字

    阅读完需:约 20 分钟

【Nacos源码之配置管理 七】服务端增删改配置数据之后如何通知集群中的其他机器

作者石臻臻,CSDN 博客之星 Top5Kafka Contributornacos Contributor华为云 MVP,腾讯云 TVP,滴滴 Kafka 技术专家 KnowStreaming


KnowStreaming 是滴滴开源的Kafka运维管控平台, 有兴趣一起参与参与开发的同学,但是怕自己能力不够的同学,可以联系我,当你导师带你参与开源!

在后台管理界面可以直接增删改查所有的配置数据,那么问题来了

  1. [x]增删改数据这个操作,除了落库,还做了哪些操作?

1 新增配置数据


打开后台新建一个配置


  1. 如果是新增先访问 Http 请求 ConfigController.getConfig()检验 dataId,group 等等是否已经存在,已经存在提示不让新增

  2. 不存在可以新增,则访问 ConfigController.publishConfig 方法发布配置数据;

ConfigController.publishConfig 发布数据

    /**     * 增加或更新非聚合数据。     *     * @throws NacosException     */    @RequestMapping(method = RequestMethod.POST)    @ResponseBody    public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response)        throws NacosException {        //部分省略....
        if (AggrWhitelist.isAggrDataId(dataId)) {            log.warn("[aggr-conflict] {} attemp to publish single data, {}, {}",                RequestUtil.getRemoteIp(request), dataId, group);            throw new NacosException(NacosException.NO_RIGHT, "dataId:" + dataId + " is aggr");        }
        final Timestamp time = TimeUtils.getCurrentTime();        String betaIps = request.getHeader("betaIps");        ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);        if (StringUtils.isBlank(betaIps)) {            if (StringUtils.isBlank(tag)) {                persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false);                EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));            } else {                persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false);                EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));            }        } else { // beta publish            persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false);            EventDispatcher.fireEvent(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));        }        ConfigTraceService.logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(),            LOCAL_IP, ConfigTraceService.PERSISTENCE_EVENT_PUB, content);
        return true;    }
复制代码
  1. persistService.insertOrUpdate 将配置信息持久化到数据库

  2. 发起配置数据有变化事件的通知


EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));

复制代码

关于事件通知不懂的可以查看这篇文章 原 【Nacos源码之配置管理 二】Nacos中的事件发布与订阅--观察者模式

ConfigDataChangeEvent 这个事件只有 AsyncNotifyService 监听了


那好,我们主要看 AsyncNotifyService 做了哪些事情;

AsyncNotifyService 异步通知服务

 @Override    public void onEvent(Event event) {
        // 并发产生 ConfigDataChangeEvent        if (event instanceof ConfigDataChangeEvent) {            ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;            long dumpTs = evt.lastModifiedTs;            String dataId = evt.dataId;            String group = evt.group;            String tenant = evt.tenant;            String tag = evt.tag;            List<?> ipList = serverListService.getServerList();
            // 其实这里任何类型队列都可以            Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();            for (int i = 0; i < ipList.size(); i++) {                queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, (String) ipList.get(i), evt.isBeta));            }            EXECUTOR.execute(new AsyncTask(httpclient, queue));        }    }
复制代码
  1. 通过 serverListService.getServerList(); 拿到所有的集群服务器列表.这个列表是配置的所有服务器列表,也包括不健康的服务器;PS:关于集群服务器列表的获取可以看文章 【Nacos 源码之配置管理 六】集群模式下服务器之间是如何感知的

  2. 遍历服务器列表 ipList,每个服务器(包括自己)组装成一个任务 NotifySingleTask,对象里面有一个属性 url; 组装 url 的代码,这个 Url 就是一会要请求的链接

 private static final String URL_PATTERN = "http://{0}{1}" + Constants.COMMUNICATION_CONTROLLER_PATH            + "/dataChange"            + "?dataId={2}&group={3}";this.url = MessageFormat.format(URL_PATTERN, target, RunningConfigUtils.getContextPath(), dataId,                    group);                    
复制代码

最终组装成的 url 形式 http://ip:port/nacos/communication/dataChange?dataId={2}&group={3}&tenant={4}&tag=tag3. 2 中组装成的所有 NotifySingleTask 放到 Queue 队列中, 然后传到 AsyncTask 中; AsyncTask 是一个 http 异步的任务; 它会 http 请求 NotifySingleTask 对象中的 url

AsyncTask 异步任务

class AsyncTask implements Runnable {
        public AsyncTask(CloseableHttpAsyncClient httpclient, Queue<NotifySingleTask> queue) {            this.httpclient = httpclient;            this.queue = queue;        }
        @Override        public void run() {            executeAsyncInvoke();        }
        private void executeAsyncInvoke() { while (!queue.isEmpty()) {                NotifySingleTask task = queue.poll();                String targetIp = task.getTargetIP();                if (serverListService.getServerList().contains(                    targetIp)) {                    // 启动健康检查且有不监控的ip则直接把放到通知队列,否则通知                    if (serverListService.isHealthCheck()                        && ServerListService.getServerListUnhealth().contains(targetIp)) {                        // target ip 不健康,则放入通知列表中                        ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,                            task.getLastModified(),                            LOCAL_IP, ConfigTraceService.NOTIFY_EVENT_UNHEALTH, 0, task.target);                        // get delay time and set fail count to the task                        asyncTaskExecute(task);                    } else {                        HttpGet request = new HttpGet(task.url);                        request.setHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED,                            String.valueOf(task.getLastModified()));                        request.setHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, LOCAL_IP);                        if (task.isBeta) {                            request.setHeader("isBeta", "true");                        }                        httpclient.execute(request, new AsyncNotifyCallBack(httpclient, task));                    }                }            }        }        private Queue<NotifySingleTask> queue;        private CloseableHttpAsyncClient httpclient;
    }

复制代码

这个任务的执行流程如下:

  1. 如果 targetIp 不在 serverListService.getServerList() 列表中就忽略( 出现这个原因的情况可可能是远程配置文件移除了这台服务器)

  2. 如果远程配置文件服务器是健康的但是 targetIp 是不健康的,就发起重试流程asyncTaskExecute(task); 这个重试任务会延迟执行

  3. 如果 targetIp 健康,则发起请求,请求链接就是之前组装的 url ;http://ip:port/nacos/communication/dataChange?dataId={2}&group={3}&tenant={4}&tag=tag

  4. 如果这个 url 请求失败,也会发起重试流程 asyncTaskExecute(task);

重试延迟执行时间

走入重试流程的话,会延迟一定的时间来执行;那么延迟多久呢?

    /**     * get delayTime and also set failCount to task;失败时间指数增加,以免断网场景不断重试无效任务,影响正常同步     *     * @param task notify task     * @return delay     */    private static int getDelayTime(NotifySingleTask task) {        int failCount = task.getFailCount();        int delay = MIN_RETRY_INTERVAL + failCount * failCount * INCREASE_STEPS;        if (failCount <= MAX_COUNT) {            task.setFailCount(failCount + 1);        }        return delay;    }
复制代码

失败时间指数增加,以免断网场景不断重试无效任务,影响正常同步

通知配置信息改变

上面第 3 步骤说发起一个 http 请求 url; 这个请求就是通知所有服务器有数据变更了(包括通知自己),这个通知的方法在 CommunicationController.notifyConfigInfo 最终执行的方法是 DumpService dump 方法


    dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true);

复制代码

关于 DumpService 可以看之前的文章【Nacos源码之配置管理 四】DumpService如何将配置文件全部Dump到磁盘中DumpService 的 dump 方法其实也是 new 一个 DumpTask 任务放到任务执行类 dumpTaskMgr 里面等待执行,这个执行类是单线程操作;

  public void dump(String dataId, String group, String tenant, long lastModified, String handleIp, boolean isBeta) {        String groupKey = GroupKey2.getKey(dataId, group, tenant);        dumpTaskMgr.addTask(groupKey, new DumpTask(groupKey, lastModified, handleIp, isBeta));    }
复制代码

这个 dumpTaskMgr 的执行器是哪个呢?


dumpTaskMgr = new TaskManager("com.alibaba.nacos.server.DumpTaskManager",new DumpProcessor(this));        
复制代码

最终的执行器是 DumpProcessor 那么最终执行的是 DumpProcessor 的 process 方法;

DumpProcessor ,Dump 指定的配置信息

【Nacos源码之配置管理 四】DumpService如何将配置文件全部Dump到磁盘中里面有详细讲解执行器 DumpAllProcessor 这个执行器是一次把所有的配置给 Dump 下来;而 DumpProcessor 是只 dump 指定的配置!

因为第四篇文章已经详细讲解了 Dump 流程,这里只简要说明一下

  1. 先数据库查询这个配置信息是否存在(因为有可能在其他机器上执行了删除操作),存在的话,将指定的配置文件 Dump 到服务器磁盘文件上; 如果没有查到就把本地的也删除并通知

  2. 更新服务器上配置数据的缓存

  3. 发送 LocalDataChangeEvent 事件,通知本地数据有变更;最终执行的是 DataChangeTask 任务;

DataChangeTask 数据变更任务

  1. 遍历所有的长轮询订阅者者(怎么订阅上的?后面会有文章介绍)

  2. 如果是 beta 发布且不在 beta 列表直接跳过

  3. 如果 tag 发布且不在 tag 列表直接跳过

  4. 发送 Http 响应(既然是响应,那什么时候请求的呢?后面介绍)通知所有未被上面 2、3 过滤掉的的订阅者最新的配置数据 ConfigInfo

2 总结


  1. 增删改数据库数据

  2. 请求所有的集群服务器列表的 http 请求/communication/dataChange

  3. 每个服务器收到请求之后就更新本地磁盘文件

  4. 每个服务器更新本地缓存

  5. 每个服务器轮询订阅自己的客户端

  6. 如果是 beta 灰度发布,但是这个客户端不在灰度 Ip 列表里面则忽略

  7. 如果是 tag 发布切不在 tag 列表直接忽略

  8. 发送 Http 响应通知所有未被上面 6、7 过滤掉的的订阅者最新的配置数据 ConfigInfo,这个数据只是 dataId,group 等等;没有返回 content 的内容

  9. 客户端收到响应之后可以调用服务端 http 请求 ConfigServletInner.doGetConfig 方法来获取最新数据;

  10. ConfigServletInner.doGetConfig 获取数据是获取服务器本地磁盘中的数据,是没有走数据库的;因为本地磁盘的文件就是最新的数据

3 问题


读完这篇文章我们知道了, 增删改配置之后做了哪些事情,相信看完这篇文章之后会有新的问题;

  • [ ] 客户端是如何订阅服务端的?

  • [ ] 服务端又是怎么通知到客户端数据变更的?

  • [ ] 客户端与服务端直接是长连接还是短连接?

  • [ ] 客户端与服务端是推还是拉?

以上问题,我会再后面的文章中一一分析;欢迎关注我的公众号 进击的老码农(jjdlmn)Nacos 源码系列会再公众号中首发;每日会推一些科技资讯、Java、面试题、源码系列;

发布于: 刚刚阅读数: 4
用户头像

关注公众号: 石臻臻的杂货铺 获取最新文章 2019.09.06 加入

进高质量滴滴技术交流群,只交流技术不闲聊 加 szzdzhp001 进群 20w字《Kafka运维与实战宝典》PDF下载请关注公众号:石臻臻的杂货铺

评论

发布
暂无评论
【Nacos源码之配置管理 七】服务端增删改配置数据之后如何通知集群中的其他机器_nacos_石臻臻的杂货铺_InfoQ写作社区