写点什么

Nacos 配置中心之服务端长轮询处理机制

作者:周杰伦本人
  • 2022 年 8 月 05 日
  • 本文字数:3937 字

    阅读完需:约 13 分钟

Nacos 配置中心之服务端长轮询处理机制

接着上回说,客户端进行长轮询其实是使用定时线程来定时调用/v1/cs/configs/listener 接口,那么服务端的这个接口实现逻辑又是怎样的呢?今天我们就看看这一块的内容。


客户端发起 http 请求进行长轮询,调用接口/listner 服务端在 nacos-config 模块中的 ConfigController 类中:

ConfigController 的 listener()方法

ConfigController 的 listener()方法:


/** * 比较MD5 */@PostMapping("/listener")public void listener(HttpServletRequest request, HttpServletResponse response)    throws ServletException, IOException {    request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);    String probeModify = request.getParameter("Listening-Configs");    if (StringUtils.isBlank(probeModify)) {        throw new IllegalArgumentException("invalid probeModify");    }
probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);
Map<String, String> clientMd5Map; try { clientMd5Map = MD5Util.getClientMd5Map(probeModify); } catch (Throwable e) { throw new IllegalArgumentException("invalid probeModify"); }
// do long-polling inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());}
复制代码


  1. 获取客户端需要监听的可能发送变化的配置,计算 MD5 值

  2. inner.doPollingConfig 执行长轮询请求

doPollingConfig()方法

/** * 轮询接口 */public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,                              Map<String, String> clientMd5Map, int probeRequestSize)    throws IOException {
// 长轮询 if (LongPollingService.isSupportLongPolling(request)) { longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize); return HttpServletResponse.SC_OK + ""; }
// else 兼容短轮询逻辑 List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);
// 兼容短轮询result String oldResult = MD5Util.compareMd5OldResult(changedGroups); String newResult = MD5Util.compareMd5ResultString(changedGroups);
String version = request.getHeader(Constants.CLIENT_VERSION_HEADER); if (version == null) { version = "2.0.0"; } int versionNum = Protocol.getVersionNumber(version);
/** * 2.0.4版本以前, 返回值放入header中 */ if (versionNum < START_LONGPOLLING_VERSION_NUM) { response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult); response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult); } else { request.setAttribute("content", newResult); }
// 禁用缓存 response.setHeader("Pragma", "no-cache"); response.setDateHeader("Expires", 0); response.setHeader("Cache-Control", "no-cache,no-store"); response.setStatus(HttpServletResponse.SC_OK); return HttpServletResponse.SC_OK + "";}
复制代码


  1. 判断当前请求是否为长轮询,如果是,调用LongPollingService的 addLongPollingClient()方法

  2. 不是长轮询,就立即返回结果

LongPollingService 的 addLongPollingClient()

这个方法主要把客户端的长轮询请求封装成 ClientLongPolling 交给 scheduler 执行


public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,                                 int probeRequestSize) {
String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER); String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER); String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER); String tag = req.getHeader("Vipserver-Tag"); int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500); /** * 提前500ms返回响应,为避免客户端超时 @qiaoyi.dingqy 2013.10.22改动 add delay time for LoadBalance */ long timeout = Math.max(10000, Long.parseLong(str) - delayTime); if (isFixedPolling()) { timeout = Math.max(10000, getFixedPollingInterval()); // do nothing but set fix polling timeout } else { long start = System.currentTimeMillis(); List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map); if (changedGroups.size() > 0) { generateResponse(req, rsp, changedGroups); LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant", RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize, changedGroups.size()); return; } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) { LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup", RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize, changedGroups.size()); return; } } String ip = RequestUtil.getRemoteIp(req); // 一定要由HTTP线程调用,否则离开后容器会立即发送响应 final AsyncContext asyncContext = req.startAsync(); // AsyncContext.setTimeout()的超时时间不准,所以只能自己控制 asyncContext.setTimeout(0L);
scheduler.execute( new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));}
复制代码


  1. 获取客户端请求的超时时间,减去 500ms 后赋值给 timeout 变量。

  2. 判断 isFixedPolling,如果为 true,定时任务将会在 30s 后开始执行,否则在 29.5s 后开始执行

  3. 和服务端的数据进行 MD5 对比,如果发送变化则直接返回

  4. scheduler.execute 执行 ClientLongPolling 线程

ClientLongPolling

ClientLongPolling 是一个线程,run 方法如下:


@Overridepublic void run() {    asyncTimeoutFuture = scheduler.schedule(new Runnable() {        @Override        public void run() {            try {                getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());                /**                 * 删除订阅关系                 */                allSubs.remove(ClientLongPolling.this);
if (isFixedPolling()) { LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "fix", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()), "polling", clientMd5Map.size(), probeRequestSize); List<String> changedGroups = MD5Util.compareMd5( (HttpServletRequest)asyncContext.getRequest(), (HttpServletResponse)asyncContext.getResponse(), clientMd5Map); if (changedGroups.size() > 0) { sendResponse(changedGroups); } else { sendResponse(null); } } else { LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "timeout", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()), "polling", clientMd5Map.size(), probeRequestSize); sendResponse(null); } } catch (Throwable t) { LogUtil.defaultLog.error("long polling error:" + t.getMessage(), t.getCause()); }
}
}, timeoutTime, TimeUnit.MILLISECONDS);
allSubs.add(this);}
复制代码


  1. 通过 scheduler.schedule 启动一个定时任务,并延时时间为 29.5s

  2. 将 ClientLongPolling 实例本身添加到 allSubs 队列中,它主要维护一个长轮询的订阅关系。

  3. 定时任务执行后,先把 ClientLongPolling 实例本身从 allSubs 队列中移除。

  4. 通过 MD5 比较客户端请求的 groupKeys 是否发生变更,并将变更结果通过 response 返回给客户端


所谓长轮询就是服务端收到请求之后,不立即返回,而是在延 29.5s 才把请求结果返回给客户端,这使得客户端和服务端之间在 30s 之内数据没有发生变化的情况下一直处于连接状态。

总结

通过对服务端/v1/cs/configs/listener 接口的分析,我们知道这个接口处理客户端的长轮询,使用定时线程,29.5s 之后查看是否有信息变更,有的话再返回给客户端信息,保证 30s 内有数据变化内察觉到。

发布于: 刚刚阅读数: 3
用户头像

还未添加个人签名 2020.02.29 加入

公众号《盼盼小课堂》,多平台优质博主

评论

发布
暂无评论
Nacos配置中心之服务端长轮询处理机制_8月月更_周杰伦本人_InfoQ写作社区