Nacos 配置中心源码 | 京东物流技术团队

客户端
入口
在引入配置中心 maven 依赖的 jar 文件中找到 spring-cloud-starter-alibaba-nacos-config-2.2.5.RELEASE.jar!/META-INF/spring.factories
,在该配置文件找到 NacosConfigBootstrapConfiguration 配置类,该类是 nacos 配置中心的入口类,类中注册了三个 bean。


NacosConfigProperties:属性配置类,对应配置文件中 spring.cloud.nacos.config 前缀的属性。
NacosConfigManager:管理 NacosConfigProperties 和 ConfigService。
NacosPropertySourceLocator:加载配置中心配置信息。
NacosConfigManager
在 NacosConfigManager 构造方法中,调用了 createConfigService 方法,该方法通过工厂类调用 ConfigService 实现类的构造方法创建 ConfigService 实例。


在 ConfigService 的实现类 NacosConfigService 的构造方法中会初始化 this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties));,该 agent 是用来像服务端发送请求的代理。


ServerHttpAgent 类中 NacosRestTemplate 属性是发送远程调用的工具类,会调用 HttpMethod.GET 方法调用服务端 rest 请求。

在回到 NacosConfigService#NacosConfigService 的方法中 this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties); 该属性是客户端工作线程类,在类的内部有两个线程池:
1. 只有一个线程的线程池 this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory()
用来执行定时任务,每隔 10ms 执行一次 checkConfigInfo(); 方法,按照每 3000 个配置项为一批次捞取待轮询的 cacheData 实例,将其包装成为一个 LongPollingTask 提交进入第二个线程池 executorService 处理。


2.线程数等于处理器个数的线程池,用来执行 ClientWorker.LongPollingRunnable#LongPollingRunnable#run,cacheMap 中缓存着需要刷新的配置,将 cacheMap 中数量以 3000 分一个组,分别创建一个 LongPollingRunnable 用来监听配置更新,在 LongPollingRunnable#run 方法中调用 checkLocalConfig(cacheData); 检查本地的配置,容错的处理;调用 checkUpdateDataIds(cacheDatas, inInitializingCacheList); 方法是向 nacos 服务端 发送一个长连接超时事件 30s,返回有更新的 dataids;调用 getServerConfig(dataId, group, tenant, 3000L); 方法是根据返回有变化的 dataids 调用服务端配置中心接口获取配置属性,并更新本地快照;调用 checkListenerMd5();方式,对有变化的配置添加监听处理;最后继续调用 executorService.execute(this); 方法轮询处理。



CacheData#checkListenerMd5


在 listener.receiveConfigInfo(contentTmp); 方法中会调用到 AbstractSharedListener#receiveConfigInfo 方法,会发布 RefreshEvent 事件。


对应的事件监听器为:RefreshEventListener, Spring Cloud 实现的,在该监听器里更新配置和刷新容器中标记了 @RefreshScope 的配置,在 onApplicationEvent 方法中监听 2 个事件,ApplicationReadyEvent(spring boot 事件,表示 application 应该初始化完成)、RefreshEvent。


RefreshEvent:this.handle((RefreshEvent)event);处理该事件,用来刷新容器中标记了 @RefreshScope 注解的配置,org.springframework.cloud.context.refresh.ContextRefresher#refresh


refreshEnvironment();中 extract(this.context.getEnvironment().getPropertySources()) 抽取除系统变量外的其他变量;addConfigFilesToEnvironment();把原有的 environment 里面的参数放到一个新建的 spring context 容器下重新加载,完事之后关闭新容器,这里就是获取参数的新值;


changes(before,extract(this.context.getEnvironment().getPropertySources())) 获取新的参数值,并和之前得进行比较找出改变得参数值。

this.context.publishEvent(new EnvironmentChangeEvent(this.context, keys)); 发布环境变更事件,并带上改变得参数值。
回到 ContextRefresher#refresh 方法,看下 this.scope.refreshAll(); 刷新标记 @RefreshScope 注解的 bean。


super.destroy(); 方法,清楚 scope 里面的缓存,下次就会重新从 BeanFactory 获取一个新的实例会使用新的配置。
this.context.publishEvent(new RefreshScopeRefreshedEvent()); 方法发布事件。
服务端
DumpService
DumpService 类是一个抽象类负责从存储中查询配置保存到磁盘上,它有两个子类,EmbeddedDumpService 嵌入式存储(DERBY)、ExternalDumpService 扩展数存储。

ExternalDumpService 实现类的 init 方法上 @PostConstruct 注解,在 spring 构建 bean 的过程中会执行带有 @PostConstruct 的初始化方法。

调用到抽象父类 DumpService#dumpOperate 的方法,调用到 dumpConfigInfo 方法,dumpConfigInfo 方法会判断是全量更新,还是追加更新。

如果 isAllDump 为 true 会走全量更新,会进行判断是否有快速更新配置、是否存在心跳检查文件、最后检查时间是否小于 6 小时,上述判断都满足就不走全量更新,否则走全量更新。

dumpAllProcessor.process(new DumpAllTask());将数据库中的所有 configInfo 配置信息查询出来,写入服务器端磁盘缓存。

persistService.findConfigMaxId(); 查询数据库中最大的主键,用于分页处理。

persistService.findAllConfigInfoFragment(lastMaxId, PAGE_SIZE); 从数据库中分页查询数据,每次查询 1000 条。


ConfigCacheService.dump(cf.getDataId(), cf.getGroup(), cf.getTenant(), cf.getContent(), cf.getLastModified(),cf.getType()); 写入磁盘

保存到文件中

updateMd5(groupKey, md5, lastModifiedTs); 缓存配置信息的 MD5 到内存中,并发布 LocalDataChangeEvent 事件。

事件监听器会在 NotifyCenter.registerSubscriber 调用。

获取配置
HttpMethod.GET /nacos/v1/cs/configs 获取服务端配置接口,ConfigController#getConfig。

在 getConfig 中调用了 inner.doGetConfig(request, response, dataId, group, tenant, tag, clientIp);
在 doGetConfig 方法中会调用 DiskUtil.targetBetaFile(dataId, group, tenant);方法,从本地磁盘上获取,不是从 mysql 中拉取,如果直接修改 mysql 数据不会生效的,需要发布 ConfigDataChangeEvent 事件,触发更新。

监听配置
HttpMethod.POST 请求调用 /nacos/v1/cs/configs/listener 轮询接口调用长连接。

longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize); 长连接轮询处理。

SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500); 最多处理 29.5s 需要保留 0.5s 来响应客户端,避免超时。

MD5Util.compareMd5(req, rsp, clientMd5Map); 比较客户端的 md5 与当前服务端的是否一致,不一致返回到 changedGroups。

有不一致数据直接响应 generateResponse(req, rsp, changedGroups);

线程池执行长连接任务 ConfigExecutor.executeLongPolling。

LongPollingService.ClientLongPolling#run 长轮询。

ConfigExecutor.scheduleLongPolling 延迟 29.5s 执行

延迟执行先删除队列中自己的任务 allSubs.remove(ClientLongPolling.this);

allSubs.add(this); 添加到队列

inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());

MD5Util.compareMd5(request, response, clientMd5Map); 和当前配置比较,返回有变更的配置
nacos 管理端变更配置
HttpMethod.POST /nacos/v1/cs/configs

persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, true); 持节化信息到数据库。




回到 ConfigController#publishConfig 看下 ConfigChangePublisher.notifyConfigChange 方法,触发 ConfigDataChangeEvent 事件。


ConfigDataChangeEvent 事件监听。

ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, queue)); 同步其他节点。

还有 LongPollingService 初始化的时候订阅了 LocalDataChangeEvent 事件,也会监听到。

ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
看下 LongPollingService.DataChangeTask#run,push 模式, 遍历 allSubs 把变化的 key 响应客户端。clientSub.sendResponse(Arrays.asList(groupKey));

作者:京东物流 张士欣
来源:京东云开发者社区 自猿其说 Tech 转载请注明来源
版权声明: 本文为 InfoQ 作者【京东科技开发者】的原创文章。
原文链接:【http://xie.infoq.cn/article/8dedcb45886d00e04f23387b7】。文章转载请联系作者。
评论