写点什么

Nacos 配置中心之客户端长轮询

作者:周杰伦本人
  • 2022 年 8 月 04 日
  • 本文字数:6738 字

    阅读完需:约 22 分钟

Nacos 配置中心之客户端长轮询

客户端长轮询定时任务是在 NacosFactory 的 createConfigService 构建 ConfigService 对象实例的时候启动的

createConfigService

public static ConfigService createConfigService(String serverAddr) throws NacosException {    return ConfigFactory.createConfigService(serverAddr);}
复制代码


public class ConfigFactory {
/** * Create Config * * @param properties init param * @return ConfigService * @throws NacosException Exception */ public static ConfigService createConfigService(Properties properties) throws NacosException { try { Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService"); Constructor constructor = driverImplClass.getConstructor(Properties.class); ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties); return vendorImpl; } catch (Throwable e) { throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e); } }
/** * Create Config * * @param serverAddr serverList * @return Config * @throws ConfigService Exception */ public static ConfigService createConfigService(String serverAddr) throws NacosException { Properties properties = new Properties(); properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr); return createConfigService(properties); }
}
复制代码


  1. 通过 Class.forName 加载 NacosConfigService 类

  2. 使用反射来完成 NacosConfigService 类的实例化

NacosConfigService 构造

NacosConfigService 构造方法:


public NacosConfigService(Properties properties) throws NacosException {    String encodeTmp = properties.getProperty("encode");    if (StringUtils.isBlank(encodeTmp)) {        this.encode = "UTF-8";    } else {        this.encode = encodeTmp.trim();    }
this.initNamespace(properties); this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties)); this.agent.start(); this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties);}
复制代码


它的构造方法中:


  1. 初始化 HttpAgent,使用了装饰器模式,实际工作的类是 ServerHttpAgent,MetricsHttpAgent 内部也调用了 ServerHttpAgent 的方法,增加监控统计信息

  2. ClientWorker 是客户端的工作类,agent 作为参数传入 ClientWorker,用 agent 做一些远程调用

ClientWorker 构造

ClientWorker 的构造函数:


@SuppressWarnings("PMD.ThreadPoolCreationRule")    public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {        this.agent = agent;        this.configFilterChainManager = configFilterChainManager;
// Initialize the timeout parameter
init(properties);
executor = Executors.newScheduledThreadPool(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker." + agent.getName()); t.setDaemon(true); return t; } });
executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName()); t.setDaemon(true); return t; } });
executor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { checkConfigInfo(); } catch (Throwable e) { LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e); } } }, 1L, 10L, TimeUnit.MILLISECONDS); }
复制代码


构造方法中:


  1. 构建定时调度的线程池,第一个线程池 executor 只拥有一个核心线程,每隔 10s 执行一次 checkConfigInfo()方法,功能就是每 10ms 检查一次配置信息

  2. 第二个线程池 executorService 只完成了初始化,后续用于客户端的定时长轮询功能。

checkConfigInfo()方法:

public void checkConfigInfo() {    int listenerSize = cacheMap.get().size();    int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());    if (longingTaskCount > currentLongingTaskCount) {        for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {                       executorService.execute(new LongPollingRunnable(i));        }        currentLongingTaskCount = longingTaskCount;    }}
复制代码


这个方法的主要功能就是检查配置信息是否发送变化,


  1. 获取监听个数

  2. 分配长轮询任务数,向上取整

  3. 判断长轮询任务数是否比当前长轮询任务数大,如果大的话创建指定就创建线程达到所需的任务数的线程数量,如果不比当前任务数就把求得长轮询任务数赋值给当前长轮询任务数


cacheMap 用来存储监听变更的缓存集合,key 是根据 dataID/group/tenant 拼接的值。Value 是对应的存储在 Nacos 服务器上的配置文件的内容。


默认情况下每个长轮询 LongPollingRunnable 任务处理 3000 个监听配置集,超过 3000 个启动多个 LongPollingRunnable 执行。

LongPollingRunnable

LongPollingRunnable 是一个线程,我们可以直接找到 LongPollingRunnable 里面的 run 方法


class LongPollingRunnable implements Runnable {    private int taskId;
public LongPollingRunnable(int taskId) { this.taskId = taskId; }
@Override public void run() {
List<CacheData> cacheDatas = new ArrayList<CacheData>(); List<String> inInitializingCacheList = new ArrayList<String>(); try { // check failover config for (CacheData cacheData : cacheMap.get().values()) { if (cacheData.getTaskId() == taskId) { cacheDatas.add(cacheData); try { checkLocalConfig(cacheData); if (cacheData.isUseLocalConfigInfo()) { cacheData.checkListenerMd5(); } } catch (Exception e) { LOGGER.error("get local config info error", e); } } }
// check server config List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
for (String groupKey : changedGroupKeys) { String[] key = GroupKey.parseKey(groupKey); String dataId = key[0]; String group = key[1]; String tenant = null; if (key.length == 3) { tenant = key[2]; } try { String content = getServerConfig(dataId, group, tenant, 3000L); CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant)); cache.setContent(content); LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}", agent.getName(), dataId, group, tenant, cache.getMd5(), ContentUtils.truncateContent(content)); } catch (NacosException ioe) { String message = String.format( "[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s", agent.getName(), dataId, group, tenant); LOGGER.error(message, ioe); } } for (CacheData cacheData : cacheDatas) { if (!cacheData.isInitializing() || inInitializingCacheList .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) { cacheData.checkListenerMd5(); cacheData.setInitializing(false); } } inInitializingCacheList.clear();
executorService.execute(this);
} catch (Throwable e) {
// If the rotation training task is abnormal, the next execution time of the task will be punished LOGGER.error("longPolling error : ", e); executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS); } }}
复制代码


LongPollingRunnable 类的 run()方法中:


  1. 遍历 CacheData,检查本地配置,根据 taskId 对 cacheMap 进行数据分割,通过 checkLocalConfig 方法检查本地配置,本地在 ${user}\naocs\config\目录下缓存一份服务端的配置信息,checkLocalConfig 将内存中的数据和本地磁盘数据比较,不一致说明数据发生了变化,需要触发事件通知。

  2. 执行 checkUpdateDataIds 方法在服务端建立长轮询机制,通过长轮询检查数据变更。

  3. 遍历变更数据集合 changedGroupKeys,调用 getServerConfig 方法,根据 dataId,group,tenant 去服务端读取对应的配置信息并保存到本地文件中。

  4. 继续定时执行当前线程

checkUpdateDataIds()方法

checkUpdateDataIds()方法基于长连接方式监听服务端配置的变化,最后根据变化数据的 key 去服务端获取最新数据。


checkUpdateDataIds 中调用 checkUpdateConfigStr


/** *  */List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws IOException {
List<String> params = Arrays.asList(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
List<String> headers = new ArrayList<String>(2); headers.add("Long-Pulling-Timeout"); headers.add("" + timeout);
// told server do not hang me up if new initializing cacheData added in if (isInitializingCacheList) { headers.add("Long-Pulling-Timeout-No-Hangup"); headers.add("true"); }
if (StringUtils.isBlank(probeUpdateString)) { return Collections.emptyList(); }
try { HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(), timeout);
if (HttpURLConnection.HTTP_OK == result.code) { setHealthServer(true); return parseUpdateDataIdResponse(result.content); } else { setHealthServer(false); LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(), result.code); } } catch (IOException e) { setHealthServer(false); LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e); throw e; } return Collections.emptyList();}
复制代码


这个方法的作用就是从 Server 获取值变化了的 DataID 列表。返回的对象里只有 dataId 和 group 是有效的,保证不返回 NULLcheckUpdateConfigStr()方法中通过 agent.httpPost 调用/v1/cs/configs/listener 接口实现长轮询请求。长轮询请求是实现层面只是设置了一个比较长的超时时间,默认 30s。如果服务端的数据发生变更,客户端会收到 HttpResult。服务端返回的是存在数据变更的 dataId, group, tenant。获得这些信息后,在 LongPollingRunnable 的 run 方法中调用 getServerConfig 方法从 Nacos 服务器中读取具体的配置内容。

getServerConfig

从 Nacos 服务器中读取具体的配置内容:


public String getServerConfig(String dataId, String group, String tenant, long readTimeout)    throws NacosException {    if (StringUtils.isBlank(group)) {        group = Constants.DEFAULT_GROUP;    }
HttpResult result = null; try { List<String> params = null; if (StringUtils.isBlank(tenant)) { params = Arrays.asList("dataId", dataId, "group", group); } else { params = Arrays.asList("dataId", dataId, "group", group, "tenant", tenant); } result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout); } catch (IOException e) { String message = String.format( "[%s] [sub-server] get server config exception, dataId=%s, group=%s, tenant=%s", agent.getName(), dataId, group, tenant); LOGGER.error(message, e); throw new NacosException(NacosException.SERVER_ERROR, e); }
switch (result.code) { case HttpURLConnection.HTTP_OK: LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.content); return result.content; case HttpURLConnection.HTTP_NOT_FOUND: LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, null); return null; case HttpURLConnection.HTTP_CONFLICT: { LOGGER.error( "[{}] [sub-server-error] get server config being modified concurrently, dataId={}, group={}, " + "tenant={}", agent.getName(), dataId, group, tenant); throw new NacosException(NacosException.CONFLICT, "data being modified, dataId=" + dataId + ",group=" + group + ",tenant=" + tenant); } case HttpURLConnection.HTTP_FORBIDDEN: { LOGGER.error("[{}] [sub-server-error] no right, dataId={}, group={}, tenant={}", agent.getName(), dataId, group, tenant); throw new NacosException(result.code, result.content); } default: { LOGGER.error("[{}] [sub-server-error] dataId={}, group={}, tenant={}, code={}", agent.getName(), dataId, group, tenant, result.code); throw new NacosException(result.code, "http error, code=" + result.code + ",dataId=" + dataId + ",group=" + group + ",tenant=" + tenant); } }}
复制代码

总结

现在我们知道 Nacos 配置中心的客户端做了哪些事了,客户端创建 NacosConfigService 实例,它的构造方法中创建了 ClientWorker 对象,ClientWorker 中就设定了定时线程每隔 10 秒执行一次 checkConfigInfo()方法来检查配置信息是否变更,使用的线程是 LongPollingRunnable,它的 run()方法中的逻辑就是调用 checkUpdateDataIds()方法检查是否数据变更,本质是调用服务端的/v1/cs/configs/listener 接口来实现的

发布于: 刚刚阅读数: 4
用户头像

还未添加个人签名 2020.02.29 加入

公众号《盼盼小课堂》,多平台优质博主

评论

发布
暂无评论
Nacos配置中心之客户端长轮询_8月月更_周杰伦本人_InfoQ写作社区