写点什么

【Nacos 源码之配置管理 十一】服务端 LongPollingService 推送变更数据到客户端

  • 2022 年 10 月 11 日
    江西
  • 本文字数:4846 字

    阅读完需:约 16 分钟

作者石臻臻,CSDN 博客之星 Top5Kafka Contributornacos Contributor华为云 MVP,腾讯云 TVP,滴滴 Kafka 技术专家 KnowStreaming


KnowStreaming 是滴滴开源的Kafka运维管控平台, 有兴趣一起参与参与开发的同学,但是怕自己能力不够的同学,可以联系我,当你导师带你参与开源!

1 前言


上一篇 【Nacos源码之配置管理 十】客户端长轮询监听服务端变更数据 介绍了客户端会像服务端发起长轮询来获取变更数据, 其实在客户端发起长轮询的请求相当于向服务端发起了一个订阅; 因为服务端接受到客户端的请求之后如果没有查询到变更数据是不会返回的;而是会等待 29.5s(当然时间可配),在这个 29.5s 时间内,服务端如果检测到有数据变更,会立马像客户端发起响应请求,因为这个时间内服务端还是有 hold 住客户端发过来的请求,所以能发回响应数据; hold 住 request 是用的 AsyncContext 异步这边文章就具体来讲一讲

  • [x] 服务端是怎么通知到客户端数据变更的

  • [x] 如何以 拉模式 长轮询服务端

2LongPollingService


LongPollingService 是一个长轮询服务,但是它是处理客户端的长轮询;LongPollingService 还处理服务端本地数据变更之后的事情

服务端数据变更事件

LongPollingService 实现了 AbstractEventListener 的 onEvent 方法; 这是一个发布订阅模式; 可以看 【Nacos源码之配置管理 二】Nacos中的事件发布与订阅--观察者模式;


之前在文章 Nacos源码之配置管理 七】服务端增删改配置数据之后如何通知集群中的其他机器 中有介绍修改数据之后的一些流程,就有讲到这里;这个是一个本地数据变更事件 ;其中讲到 DataChangeTask 的时候留下了 2 个问号,前面挖的坑现在是填的时候了;

这次我们好好讲一讲数据变更任务 DataChangeTask

DataChangeTask 服务端数据变更任务


配置数据有变更的时候执行这个方法; 遍历allSubs.iterator();得到对象 ClientLongPolling ;这是一个客户端长轮询的对象;里面保存了一些例如 ip、clientMd5Map、asyncContext 等等还有其他一些数据;

asyncContext :Servlet 3.0 新增了异步处理, 这个对象持有客户端的 request response; 就是通过这个对象,在服务端有了数据变更的情况下,能够里面的将变更数据返回响应给客户端; AsyncContext异步请求的用法

那么就剩下一个很重要的问题就是 allSubs 是什么时候订阅上的?

客户端发起长轮询


上一篇文章 【Nacos源码之配置管理 十】客户端长轮询监听服务端变更数据 分析了客户端发起长轮询的请求;如下

那么看看服务端这个listener做了什么

客户端发起了请求,高版本是支持长轮询,同时也兼容了老版本的短轮询; 短轮询就不分析了;

  public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,                                     int probeRequestSize) {
        String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);        String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);        String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);        String tag = req.getHeader("Vipserver-Tag");        int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);        /**         * 提前500ms返回响应,为避免客户端超时 @qiaoyi.dingqy 2013.10.22改动  add delay time for LoadBalance         */        long timeout = Math.max(10000, Long.parseLong(str) - delayTime);        if (isFixedPolling()) {            timeout = Math.max(10000, getFixedPollingInterval());            // do nothing but set fix polling timeout        } else {            long start = System.currentTimeMillis();            List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);            if (changedGroups.size() > 0) {                generateResponse(req, rsp, changedGroups);                LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",                    System.currentTimeMillis() - start, "instant", RequestUtil.getRemoteIp(req), "polling",                    clientMd5Map.size(), probeRequestSize, changedGroups.size());                return;            } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {                LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",                    RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,                    changedGroups.size());                return;            }        }        String ip = RequestUtil.getRemoteIp(req);        // 一定要由HTTP线程调用,否则离开后容器会立即发送响应        final AsyncContext asyncContext = req.startAsync();        // AsyncContext.setTimeout()的超时时间不准,所以只能自己控制        asyncContext.setTimeout(0L);
        scheduler.execute(            new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));    }
复制代码
  1. 获取客户端的请求参数;①.str: 长轮询的超时时间,默认 30s;详细可见上一篇文章②.noHangUpFlag:不挂起标识,这个标识为 false 的时候,会把客户端的请求挂起;等待超时或者数据变更通知;如果客户端监听的数据是首次初始化,这个标识为 true;③.delayTime:延时时间;为了避免客户端请求超时,需要提前这个时间返回响应;这个数据是在配置管理中配置的,默认 500 毫秒,详细见 【Nacos源码之配置管理 四】DumpService如何将配置文件全部Dump到磁盘中



  1. 对比客户端和服务端 MD5 是否相同,有不同则直接返回不同的 dataid+group 响应;

  2. 如果 2 中没有不同,并且如果 noHangUpFlag=true 则直接返回,不挂起请求;

  3. 2 和 3 都不满足,则使用 AsyncContext,将请求异步化,直接挂起; 超时时间为str-delayTIme,str 是客户端设置的时间如下所示,delayTime 默认 500 毫秒,可以在管理后台配置(见上面具体如何配置),如果都不主动配置,那么超时时间是 30000-500=29500; 29.5 秒;


  1. 执行 ClientLongPolling 任务

ClientLongPolling 任务

这个类有如下属性

asyncContext 中持有客户端的请求;clientMd5Map 包含了客户端所要监听的数据的 MD5;

ClientLongPolling 这个任务类执行的时候,是把 一个任务延迟了 timeoutTime 之后再执行的,并且返回 asyncTimeoutFuture,这个 timeoutTime 就是上面说到的超时时间,例如 29.5s;最重要的是,把这个当前实例放到了allSubs中; 等待有数据变更的时候,可以通知到这个客户端,因为当前实例有asyncContext ,可以相应客户端的请求;

29.5s 之后做了什么事情

看看 timeoutTime 之后执行的方法体做了什么


isFixedPolling()下面再讲,暂时忽略;我们看到最终执行的是

  1. 删除订阅关系,为啥要删除,因为这个allSubs在配置数据有变更的时候会遍历这个来进行通知,这里相当于本次请求要结束了,所以删除,不让通知了

  2. 执行sendResponse(null);方法;


看完方法之后了解到,超时时间到了之后,服务端会直接完成客户端的本次请求;客户端没有获取到变更数据,然后会又立马进行下一次请求;重复这个过程;

timeoutTime 超时时间到了之后,服务端会直接结束本次请求;然后客户端又会立马重新发起新一轮请求,重复这个过程;相当于是说客户端每隔 timeoutTime 时间之后,就发起一次请求判断服务端是否有变更数据;

那么问题来了,如果只是这样的话,那么就是服务端纯粹的使用拉模式, 并没有服务端的推模式呀?

服务端变更数据使用推模式推送数据

还记得文章一开头就说到一个事件吗,LocalDataChangeEvent 事件,服务端中修改了配置数据之后,就通知这个事件,这个事件最终会执行DataChangeTask任务;这个任务上面已经分析过了,我们再回头分析一下关键点;

   /**下面删除了部分代码;保留关键点**/  class DataChangeTask implements Runnable {        @Override        public void run() {            try {                for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {                    ClientLongPolling clientSub = iter.next();                    if (clientSub.clientMd5Map.containsKey(groupKey)) {                                          iter.remove(); // 删除订阅关系                                           clientSub.sendResponse(Arrays.asList(groupKey));                    }                }            } catch (Throwable t) {                        }        }

复制代码
  1. 遍历的allSubs; 这个allSubs是上面介绍过,客户端发起长轮询的请求的时候注册上的;

  2. 比较客户端订阅的配置数据 MD5 与当前是否一致,这个时候基本是不一致的,因为有修改嘛;

  3. 做了一些过滤操作之后,sendResponse(Arrays.asList(groupKey));这个操作,就是向客户端发送已经变更的配置项;发送了之后,本次请求也就结束了;客户端又会重新再发起新的一轮请求;

客户端拉+服务端推

上面分析完了之后,我们总结一下;

  1. 客户端发起订阅请求;

  2. 服务端接收到请求之后,立马去查询一次数据是否变更;①.如果有变更立马返回;然后客户端又回到步骤 1;②.如果没有变更,则把当前请求 hold 住一定的时间(默认 29.5s)

  3. ①.如果这期间客户端所监听的数据都一直没有变更,怎时间到达之后,结束客户端的本次请求;客户端又回到步骤 1;②.如果期间有变更; 服务端会轮询所有监听了这个变更了配置项的客户端;然后立马返回响应变更了的配置项;本次请求也结束;客户端又回到步骤 1;

就是这样一个不停的轮询的过程; 但是注意,服务端返回的只是哪些配置项有变更(只返回 dataid+group 等等,并没有返回 content),客户端拿到这些变更配置项之后,还有主动请求配置项的 content,来更新自己的缓存

isFixedPolling 固定长轮询

上面在介绍的时候,我们选择性忽略了这个固定长轮询;现在来介绍一下固定长轮询并不是上介绍的 拉+推 的模式;而是客户端纯粹的拉客户端发起长轮询,服务端立马查询是否有变更,若没有变更会挂起请求; 等待时间到达之后,会再次查询一次;然后返回结果; 如果期间有配置变更;是不会立马推送给客户端的,所以客户端是有一定的延迟的; 默认情况这个时间是 10s;

如何设置固定长轮询

新增一个元数据配置项, DataId 是 com.alibaba.nacos.meta.switch ; Group 是DEFAULT_GROUP ;注意这个配置是内置的,一定要这样配置;

在这里插入图片描述

  1. 将配置isFixedPolling=true 打开固定长轮询

  2. fixedPollingInertval=10000;固定长轮询的间隔时间

  3. fixedDelayTime=500 延迟时间; 例如间隔时间是 10s,延迟时间是 0.5 秒; 那么每隔 9.5s 执行一次轮询;0.5s 的时间是为了防止请求超时的;

两种模式的比较

拉+推 的模式具有时效性;纯粹的拉 会有一定的延迟;推荐使用 拉+推模式

3 总结

在这里插入图片描述

发布于: 刚刚阅读数: 3
用户头像

关注公众号: 石臻臻的杂货铺 获取最新文章 2019.09.06 加入

进高质量滴滴技术交流群,只交流技术不闲聊 加 szzdzhp001 进群 20w字《Kafka运维与实战宝典》PDF下载请关注公众号:石臻臻的杂货铺

评论

发布
暂无评论
【Nacos源码之配置管理 十一】服务端LongPollingService推送变更数据到客户端_nacos_石臻臻的杂货铺_InfoQ写作社区