写点什么

Soul 源码阅读 05|Http 长轮询同步数据分析

用户头像
哼干嘛
关注
发布于: 2021 年 01 月 28 日
原理图(来自官网)


http 长轮询机制如上所示,soul-web 网关请求 admin 的配置服务,读取超时时间为 90s,意味着网关层请求配置服务最多会等待 90s,这样便于 admin 配置服务及时响应变更数据,从而实现准实时推送。


http 请求到达 sou-admin 之后,并非立马响应数据,而是利用 Servlet3.0 的异步机制,异步响应数据。首先,将长轮询请求任务 LongPollingClient 扔到 BlocingQueue 中,并且开启调度任务,60s 后执行,这样做的目的是 60s 后将该长轮询请求移除队列,即便是这段时间内没有发生配置数据变更。因为即便是没有配置变更,也得让网关知道,总不能让其干等吧,而且网关请求配置服务时,也有 90s 的超时时间


首先,我们知道 Http 长轮询同步数据是要由网关主动发起请求的,所以我们先来看网关部分的代码。

通过阅读 org.dromara.soul.sync.data.http.HttpSyncDataService,我们可以得到以下信息:

  1. 配置数据被分成了 5 个组:APP_AUTH、PLUGIN、RULE、SELECTOR、META_DATA

  2. 首次运行时,会一次性获取所有组的配置,并创建一个线程池,去监听后续的数据变化

    private void start() {        // It could be initialized multiple times, so you need to control that.        if (RUNNING.compareAndSet(false, true)) {            // fetch all group configs.            this.fetchGroupConfig(ConfigGroupEnum.values());            int threadSize = serverList.size();            this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,                    new LinkedBlockingQueue<>(),                    SoulThreadFactory.create("http-long-polling", true));            // start long polling, each server creates a thread to listen for changes.            this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));        } else {            log.info("soul http long polling was started, executor=[{}]", executor);        }    }
复制代码
  1. doLongPolling 中通过请求 /configs/listener 可以获取到哪些组的配置数据发生了变化,然后会再通过 doFetchGroupConfig 方法去请求数据发生变化的组的全部信息。

private void doLongPolling(final String server) {  MultiValueMap<String, String> params = new LinkedMultiValueMap<>(8);  for (ConfigGroupEnum group : ConfigGroupEnum.values()) {    ConfigData<?> cacheConfig = factory.cacheConfigData(group);    String value = String.join(",", cacheConfig.getMd5(), String.valueOf(cacheConfig.getLastModifyTime()));    params.put(group.name(), Lists.newArrayList(value));  }  HttpHeaders headers = new HttpHeaders();  headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);  HttpEntity httpEntity = new HttpEntity(params, headers);  String listenerUrl = server + "/configs/listener";  log.debug("request listener configs: [{}]", listenerUrl);  JsonArray groupJson = null;  try {    String json = this.httpClient.postForEntity(listenerUrl, httpEntity, String.class).getBody();    log.debug("listener result: [{}]", json);    groupJson = GSON.fromJson(json, JsonObject.class).getAsJsonArray("data");  } catch (RestClientException e) {    String message = String.format("listener configs fail, server:[%s], %s", server, e.getMessage());    throw new SoulException(message, e);  }  if (groupJson != null) {    // fetch group configuration async.    ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, ConfigGroupEnum[].class);    if (ArrayUtils.isNotEmpty(changedGroups)) {      log.info("Group config changed: {}", Arrays.toString(changedGroups));      this.doFetchGroupConfig(server, changedGroups);    }  }}
复制代码


通过对网关代码的分析,可以知道网关是通过 soul-admin 提供的 /configs/listener/configs/fetch 这两个接口对配置数据进行监听变化和获取,我们来看看 soul-admin 中这两个接口是如何处理的。


  1. /configs/fetch 很简单,就是按组获取配置数据,然后返回结果

    @GetMapping("/fetch")    public SoulAdminResult fetchConfigs(@NotNull final String[] groupKeys) {        Map<String, ConfigData<?>> result = Maps.newHashMap();        for (String groupKey : groupKeys) {            ConfigData<?> data = longPollingListener.fetchConfig(ConfigGroupEnum.valueOf(groupKey));            result.put(groupKey, data);        }        return SoulAdminResult.success(SoulResultMessage.SUCCESS, result);    }
复制代码
  1. configs/listener

  • 请求时配置数据有更改:立即返回发生更改的组信息

  • 请求时配置数据无更改:阻塞并监听数据变化,直到数据有变化或达到超时时间

/**     * If the configuration data changes, the group information for the change is immediately responded.     * Otherwise, the client's request thread is blocked until any data changes or the specified timeout is reached.     *     * @param request  the request     * @param response the response     */public void doLongPolling(final HttpServletRequest request, final HttpServletResponse response) {
// compare group md5 List<ConfigGroupEnum> changedGroup = compareChangedGroup(request); String clientIp = getRemoteIp(request);
// response immediately. if (CollectionUtils.isNotEmpty(changedGroup)) { this.generateResponse(response, changedGroup); log.info("send response with the changed group, ip={}, group={}", clientIp, changedGroup); return; }
// listen for configuration changed. final AsyncContext asyncContext = request.startAsync();
// AsyncContext.settimeout() does not timeout properly, so you have to control it yourself asyncContext.setTimeout(0L);
// block client's thread. scheduler.execute(new LongPollingClient(asyncContext, clientIp, HttpConstants.SERVER_MAX_HOLD_TIMEOUT));}
复制代码


发布于: 2021 年 01 月 28 日阅读数: 19
用户头像

哼干嘛

关注

早日自由! 2018.09.30 加入

本职工作是后端开发,偶尔也写写前端和小程序

评论

发布
暂无评论
Soul 源码阅读 05|Http 长轮询同步数据分析