写点什么

Nacos 源码—Nacos 配置中心实现分析(二)

  • 2025-05-09
    福建
  • 本文字数:21600 字

    阅读完需:约 71 分钟

4.客户端如何感知远程配置数据的变更


(1)ConfigService 对象使用介绍


ConfigService 是一个接口,定义了获取配置、发布配置、移除配置等方法。ConfigService 只有一个实现类 NacosConfigService,Nacos 配置中心源码的核心其实就是这个 NacosConfigService 对象。

 

步骤一:手动创建 ConfigService 对象


首先定义好基本的 Nacos 信息,然后利用 NacosFactory 工厂类来创建 ConfigService 对象。


public class Demo {    public static void main(String[] args) throws Exception {        //步骤一:配置信息        String serverAddr = "124.223.102.236:8848";        String dataId = "stock-service-test.yaml";        String group = "DEFAULT_GROUP";
Properties properties = new Properties(); properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);
//步骤一:获取配置中心服务 ConfigService configService = NacosFactory.createConfigService(properties); }}
复制代码


步骤二:获取配置、发布配置


创建好 ConfigService 对象后,就可以使用 ConfigService 对象的 getConfig()方法来获取配置信息,还可以使用 ConfigService 对象的 publishConfig()方法来发布配置信息。

 

如下 Demo 先获取一次配置数据,然后发布新配置,紧接着重新获取数据。发现第二次获取的配置数据已发生变化,从而也说明发布配置成功了。


public class Demo {    public static void main(String[] args) throws Exception {        //步骤一:配置信息        String serverAddr = "124.223.102.236:8848";        String dataId = "stock-service-test.yaml";        String group = "DEFAULT_GROUP";
Properties properties = new Properties(); properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);
//步骤一:获取配置中心服务 ConfigService configService = NacosFactory.createConfigService(properties); //步骤二:从配置中心获取配置 String content = configService.getConfig(dataId, group, 5000); System.out.println("发布配置前" + content); //步骤二:发布配置 configService.publishConfig(dataId, group, "userName: userName被修改了", ConfigType.PROPERTIES.getType());
Thread.sleep(300L); //步骤二:从配置中心获取配置 content = configService.getConfig(dataId, group, 5000); System.out.println("发布配置后" + content); }}
复制代码


步骤三:添加监听器


可以使用 ConfigService 对象的 addListener()方法来添加监听器。通过 dataId + group 这两个参数,就可以注册一个监听器。当 dataId + group 对应的配置在服务端发生改变时,客户端的监听器就可以马上感知并对配置数据进行刷新。


public class Demo {    public static void main(String[] args) throws Exception {        //步骤一:配置信息        String serverAddr = "124.223.102.236:8848";        String dataId = "stock-service-test.yaml";        String group = "DEFAULT_GROUP";
Properties properties = new Properties(); properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);
//步骤一:获取配置中心服务 ConfigService configService = NacosFactory.createConfigService(properties); //步骤二:从配置中心获取配置 String content = configService.getConfig(dataId, group, 5000); System.out.println("发布配置前" + content); //步骤二:发布配置 configService.publishConfig(dataId, group, "userName: userName被修改了", ConfigType.PROPERTIES.getType());
Thread.sleep(300L); //步骤二:从配置中心获取配置 content = configService.getConfig(dataId, group, 5000); System.out.println("发布配置后" + content); //步骤三:注册监听器 configService.addListener(dataId, group, new Listener() { @Override public void receiveConfigInfo(String configInfo) { System.out.println("感知配置变化:" + configInfo); }
@Override public Executor getExecutor() { return null; } });
//阻断进程关闭 Thread.sleep(Integer.MAX_VALUE); }}
复制代码


(2)客户端注册监听器的源码


Nacos 客户端是什么时候为 dataId + group 注册监听器的?

 

在 nacos-config 下的 spring.factories 文件中,有一个自动装配的配置类 NacosConfigAutoConfiguration,在该配置类中定义了一个 NacosContextRefresher 对象,而 NacosContextRefresher 对象会监听 ApplicationReadyEvent 事件。

 

在 NacosContextRefresher 的 onApplicationEvent()方法中,会执行 registerNacosListenersForApplications()方法,这个方法中会遍历每一个 dataId + group 注册 Nacos 监听器。

 

对于每一个 dataId + group,则通过调用 registerNacosListener()方法来进行 Nacos 监听器的注册,也就是最终调用 ConfigService 对象的 addListener()方法来注册监听器。


@Configuration(proxyBeanMethods = false)@ConditionalOnProperty(name = "spring.cloud.nacos.config.enabled", matchIfMissing = true)public class NacosConfigAutoConfiguration {    ...    @Bean    public NacosContextRefresher nacosContextRefresher(NacosConfigManager nacosConfigManager, NacosRefreshHistory nacosRefreshHistory) {        return new NacosContextRefresher(nacosConfigManager, nacosRefreshHistory);    }    ...}
public class NacosContextRefresher implements ApplicationListener<ApplicationReadyEvent>, ApplicationContextAware { private final ConfigService configService; ... @Override public void onApplicationEvent(ApplicationReadyEvent event) { //many Spring context if (this.ready.compareAndSet(false, true)) { this.registerNacosListenersForApplications(); } } //register Nacos Listeners. private void registerNacosListenersForApplications() { if (isRefreshEnabled()) { //获取全部的配置 for (NacosPropertySource propertySource : NacosPropertySourceRepository.getAll()) { //判断当前配置是否需要刷新 if (!propertySource.isRefreshable()) { continue; } String dataId = propertySource.getDataId(); //注册监听器 registerNacosListener(propertySource.getGroup(), dataId); } } } private void registerNacosListener(final String groupKey, final String dataKey) { String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey); Listener listener = listenerMap.computeIfAbsent(key, lst -> new AbstractSharedListener() { @Override public void innerReceive(String dataId, String group, String configInfo) { //监听器的回调方法处理逻辑 refreshCountIncrement(); //记录刷新历史 nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo); //发布RefreshEvent刷新事件 applicationContext.publishEvent(new RefreshEvent(this, null, "Refresh Nacos config")); if (log.isDebugEnabled()) { log.debug(String.format("Refresh Nacos config group=%s,dataId=%s,configInfo=%s", group, dataId, configInfo)); } } }); try { //注册监听器 configService.addListener(dataKey, groupKey, listener); } catch (NacosException e) { log.warn(String.format("register fail for nacos listener ,dataId=[%s],group=[%s]", dataKey, groupKey), e); } } ...}
复制代码


(3)回调监听器的方法的源码


给每一个 dataId + group 注册 Nacos 监听器后,当 Nacos 服务端的配置文件发生变更时,就会回调监听器的方法,也就是会触发调用 AbstractSharedListener 的 innerReceive()方法。然后调用 applicationContext.publishEvent()发布 RefreshEvent 刷新事件,而发布的 RefreshEvent 刷新事件会被 RefreshEventListener 类来处理。

 

RefreshEventListener 类不是 Nacos 中的类了,而是 SpringCloud 的类。它在处理刷新事件时,会销毁被 @RefreshScope 注解修饰的类的 Bean,也就是会调用添加了 @RefreshScope 注解的类的 destroy()方法。把 Bean 实例销毁后,后面需要用到这个 Bean 时才重新进行创建。重新进行创建的时候,就会获取最新的配置文件,从而完成刷新效果。

 

(4)总结


客户端注册 Nacos 监听器,服务端修改配置后,客户端刷新配置的流程:


 

5.集群架构下节点间如何同步配置数据


(1)Nacos 控制台的配置管理模块


在这个模块中,可以通过配置列表维护我们的配置文件,可以通过历史版本找到配置的发布记录,并且支持回滚操作。当编辑配置文件时,客户端可以及时感知变化并刷新其配置文件。当服务端通知客户端配置变更时,也会通知集群节点进行数据同步。



当用户在 Nacos 控制台点击确认发布按钮时,Nacos 会大概进行如下处理:

一.修改配置文件数据

二.保存配置发布历史

三.通知并触发客户端监听事件进行配置文件变更

四.通知集群对配置文件进行变更

 

点击确认发布按钮时,会发起 HTTP 请求,地址为"/nacos/v1/cs/configs"。通过请求地址可知处理入口是 ConfigController 的 publishConfig()方法。

 

(2)变更配置数据时的源码


ConfigController 的 publishConfig()方法中的两行核心代码是:一.新增或修改配置数据的 PersistService 的 insertOrUpdate()方法,二.发布配置变更事件的 ConfigChangePublisher 的 notifyConfigChange()方法。

 

一.新增或者修改配置数据


其中 PersistService 有两个实现类:一是 EmbeddedStoragePersistServiceImpl,它是 Nacos 内置的 Derby 数据库。二是 ExternalStoragePersistServiceImpl,它是 Nacos 外置数据库如 MySQL。

 

在 ExternalStoragePersistServiceImpl 的 insertOrUpdate()方法中,如果执行 ExternalStoragePersistServiceImpl 的 updateConfigInfo()方法,那么会先查询对应的配置,然后更新配置,最后保存配置历史。


@RestController@RequestMapping(Constants.CONFIG_CONTROLLER_PATH)public class ConfigController {    private final PersistService persistService;    ...        @PostMapping    @Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)    public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,            @RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group,            @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,            @RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag,            @RequestParam(value = "appName", required = false) String appName,            @RequestParam(value = "src_user", required = false) String srcUser,            @RequestParam(value = "config_tags", required = false) String configTags,            @RequestParam(value = "desc", required = false) String desc,            @RequestParam(value = "use", required = false) String use,            @RequestParam(value = "effect", required = false) String effect,            @RequestParam(value = "type", required = false) String type,            @RequestParam(value = "schema", required = false) String schema) throws NacosException {            final String srcIp = RequestUtil.getRemoteIp(request);        final String requestIpApp = RequestUtil.getAppName(request);        srcUser = RequestUtil.getSrcUserName(request);        //check type        if (!ConfigType.isValidType(type)) {            type = ConfigType.getDefaultType().getType();        }        //check tenant        ParamUtils.checkTenant(tenant);        ParamUtils.checkParam(dataId, group, "datumId", content);        ParamUtils.checkParam(tag);        Map<String, Object> configAdvanceInfo = new HashMap<String, Object>(10);        MapUtils.putIfValNoNull(configAdvanceInfo, "config_tags", configTags);        MapUtils.putIfValNoNull(configAdvanceInfo, "desc", desc);        MapUtils.putIfValNoNull(configAdvanceInfo, "use", use);        MapUtils.putIfValNoNull(configAdvanceInfo, "effect", effect);        MapUtils.putIfValNoNull(configAdvanceInfo, "type", type);        MapUtils.putIfValNoNull(configAdvanceInfo, "schema", schema);        ParamUtils.checkParam(configAdvanceInfo);            if (AggrWhitelist.isAggrDataId(dataId)) {            LOGGER.warn("[aggr-conflict] {} attemp to publish single data, {}, {}", RequestUtil.getRemoteIp(request), dataId, group);            throw new NacosException(NacosException.NO_RIGHT, "dataId:" + dataId + " is aggr");        }            final Timestamp time = TimeUtils.getCurrentTime();        String betaIps = request.getHeader("betaIps");        ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);        configInfo.setType(type);        if (StringUtils.isBlank(betaIps)) {            if (StringUtils.isBlank(tag)) {                //新增配置或者修改配置                persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, true);                //发布配置改变事件                ConfigChangePublisher.notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));            } else {                persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, true);                //发布配置改变事件                ConfigChangePublisher.notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));            }        } else {            //beta publish            persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, true);            //发布配置改变事件            ConfigChangePublisher.notifyConfigChange(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));        }        ConfigTraceService.logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(), InetUtils.getSelfIP(), ConfigTraceService.PERSISTENCE_EVENT_PUB, content);        return true;    }    ...}
//External Storage Persist Service.@SuppressWarnings(value = {"PMD.MethodReturnWrapperTypeRule", "checkstyle:linelength"})@Conditional(value = ConditionOnExternalStorage.class)@Componentpublic class ExternalStoragePersistServiceImpl implements PersistService { private DataSourceService dataSourceService; ... @Override public void insertOrUpdate(String srcIp, String srcUser, ConfigInfo configInfo, Timestamp time, Map<String, Object> configAdvanceInfo, boolean notify) { try { addConfigInfo(srcIp, srcUser, configInfo, time, configAdvanceInfo, notify); } catch (DataIntegrityViolationException ive) { // Unique constraint conflict updateConfigInfo(configInfo, srcIp, srcUser, time, configAdvanceInfo, notify); } } @Override public void updateConfigInfo(final ConfigInfo configInfo, final String srcIp, final String srcUser, final Timestamp time, final Map<String, Object> configAdvanceInfo, final boolean notify) { boolean result = tjt.execute(status -> { try { //查询已存在的配置数据 ConfigInfo oldConfigInfo = findConfigInfo(configInfo.getDataId(), configInfo.getGroup(), configInfo.getTenant()); String appNameTmp = oldConfigInfo.getAppName(); if (configInfo.getAppName() == null) { configInfo.setAppName(appNameTmp); } //更新配置数据 updateConfigInfoAtomic(configInfo, srcIp, srcUser, time, configAdvanceInfo); String configTags = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("config_tags"); if (configTags != null) { // delete all tags and then recreate removeTagByIdAtomic(oldConfigInfo.getId()); addConfigTagsRelation(oldConfigInfo.getId(), configTags, configInfo.getDataId(), configInfo.getGroup(), configInfo.getTenant()); } //保存到发布配置历史表 insertConfigHistoryAtomic(oldConfigInfo.getId(), oldConfigInfo, srcIp, srcUser, time, "U"); } catch (CannotGetJdbcConnectionException e) { LogUtil.FATAL_LOG.error("[db-error] " + e.toString(), e); throw e; } return Boolean.TRUE; }); } @Override public ConfigInfo findConfigInfo(final String dataId, final String group, final String tenant) { final String tenantTmp = StringUtils.isBlank(tenant) ? StringUtils.EMPTY : tenant; try { return this.jt.queryForObject("SELECT ID,data_id,group_id,tenant_id,app_name,content,md5,type FROM config_info WHERE data_id=? AND group_id=? AND tenant_id=?", new Object[] {dataId, group, tenantTmp}, CONFIG_INFO_ROW_MAPPER); } catch (EmptyResultDataAccessException e) { // Indicates that the data does not exist, returns null. return null; } catch (CannotGetJdbcConnectionException e) { LogUtil.FATAL_LOG.error("[db-error] " + e.toString(), e); throw e; } } ...}
复制代码


二.发布配置变更事件


执行 ConfigChangePublisher 的 notifyConfigChange()方法发布配置变更事件时,最终会把事件添加到 DefaultPublisher.queue 阻塞队列中,完成事件发布。

 

NotifyCenter 在其静态方法中,会创建 DefaultPublisher 并进行初始化。在执行 DefaultPublisher 的 init()方法时,就会开启一个异步任务。该异步任务便会不断从阻塞队列 DefaultPublisher.queue 中获取事件,然后调用 DefaultPublisher 的 receiveEvent()方法处理配置变更事件。

 

在 DefaultPublisher 的 receiveEvent()方法中,会循环遍历事件订阅者。其中就会包括来自客户端,以及来自集群节点的两个订阅者。前者会通知客户端发生了配置变更事件,后者会通知各集群节点发生了配置变更事件。而且进行事件通知时,都会调用 DefaultPublisher 的 notifySubscriber()方法。该方法会异步执行订阅者的监听逻辑,也就是 subscriber.onEvent()方法。

 

具体的 subscriber 订阅者有:用来通知集群节点进行数据同步的订阅者 AsyncNotifyService,用来通知客户端处理配置文件变更的订阅者 LongPollingService。

 

事件发布机制的实现简单总结:发布者需要一个 Set 存放注册的订阅者,发布者发布事件时,需要遍历调用订阅者处理事件的方法。


public class ConfigChangePublisher {    //Notify ConfigChange.    public static void notifyConfigChange(ConfigDataChangeEvent event) {        if (PropertyUtil.isEmbeddedStorage() && !EnvUtil.getStandaloneMode()) {            return;        }        NotifyCenter.publishEvent(event);    }}
//Unified Event Notify Center.public class NotifyCenter { static { ... try { // Create and init DefaultSharePublisher instance. INSTANCE.sharePublisher = new DefaultSharePublisher(); INSTANCE.sharePublisher.init(SlowEvent.class, shareBufferSize); } catch (Throwable ex) { LOGGER.error("Service class newInstance has error : {}", ex); } ThreadUtils.addShutdownHook(new Runnable() { @Override public void run() { shutdown(); } }); } //注册订阅者 public static <T> void registerSubscriber(final Subscriber consumer) { ... addSubscriber(consumer, subscribeType); } private static void addSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType) { ... EventPublisher publisher = INSTANCE.publisherMap.get(topic); //执行DefaultPublisher.addSubscriber()方法 publisher.addSubscriber(consumer); } ... //Request publisher publish event Publishers load lazily, calling publisher. Start () only when the event is actually published. public static boolean publishEvent(final Event event) { try { return publishEvent(event.getClass(), event); } catch (Throwable ex) { LOGGER.error("There was an exception to the message publishing : {}", ex); return false; } } //Request publisher publish event Publishers load lazily, calling publisher. private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) { if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) { return INSTANCE.sharePublisher.publish(event); } final String topic = ClassUtils.getCanonicalName(eventType); EventPublisher publisher = INSTANCE.publisherMap.get(topic); if (publisher != null) { //执行DefaultPublisher.publish()方法 return publisher.publish(event); } LOGGER.warn("There are no [{}] publishers for this event, please register", topic); return false; } ...}
//The default event publisher implementation.public class DefaultPublisher extends Thread implements EventPublisher { protected final ConcurrentHashSet<Subscriber> subscribers = new ConcurrentHashSet<Subscriber>(); private BlockingQueue<Event> queue; ... @Override public void addSubscriber(Subscriber subscriber) { //注册事件订阅者 subscribers.add(subscriber); } @Override public boolean publish(Event event) { checkIsStart(); //将事件添加到阻塞队列,则表示已完成事件发布 boolean success = this.queue.offer(event); if (!success) { LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event); receiveEvent(event); return true; } return true; } @Override public void init(Class<? extends Event> type, int bufferSize) { setDaemon(true); setName("nacos.publisher-" + type.getName()); this.eventType = type; this.queueMaxSize = bufferSize; this.queue = new ArrayBlockingQueue<Event>(bufferSize); start(); } @Override public synchronized void start() { if (!initialized) { //执行线程的run()方法,start just called once super.start(); if (queueMaxSize == -1) { queueMaxSize = ringBufferSize; } initialized = true; } } @Override public void run() { openEventHandler(); } void openEventHandler() { try { //This variable is defined to resolve the problem which message overstock in the queue. int waitTimes = 60; //To ensure that messages are not lost, enable EventHandler when waiting for the first Subscriber to register for (; ;) { if (shutdown || hasSubscriber() || waitTimes <= 0) { break; } ThreadUtils.sleep(1000L); waitTimes--; } for (; ;) { if (shutdown) { break; } final Event event = queue.take(); receiveEvent(event); UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence())); } } catch (Throwable ex) { LOGGER.error("Event listener exception : {}", ex); } } //Receive and notifySubscriber to process the event. void receiveEvent(Event event) { final long currentEventSequence = event.sequence(); //循环遍历事件的订阅者 for (Subscriber subscriber : subscribers) { // Whether to ignore expiration events if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) { LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire", event.getClass()); continue; } //通知事件订阅者 notifySubscriber(subscriber, event); } } @Override public void notifySubscriber(final Subscriber subscriber, final Event event) { LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber); final Runnable job = new Runnable() { @Override public void run() { //异步执行订阅者的监听逻辑 subscriber.onEvent(event); } }; final Executor executor = subscriber.executor(); if (executor != null) { executor.execute(job); } else { try { job.run(); } catch (Throwable e) { LOGGER.error("Event callback exception : {}", e); } } } ...}
复制代码


(3)集群节点间的配置数据变更同步


核心处理方法便是 AsyncNotifyService 的 onEvent()方法。该方法首先会获取集群节点列表,然后遍历集群列表构造通知任务 NotifySingleTask,接着把通知任务 NotifySingleTask 添加到队列 queue 当中,最后根据通知任务队列 queue 封装一个异步任务提交到线程池去处理,也就是异步任务 AsyncTask 的 run()方法会处理通知任务 NotifySingleTask。

 

在异步任务 AsyncTask 的 run()方法中,会一直从 queue 中获取通知任务,以便将配置数据同步到对应的集群节点。具体就是在 while 循环中,首先获得通知任务中对应的集群节点的 IP 地址。然后判断该集群节点的 IP 是否在当前节点的配置中,并且是否是健康状态。如果该集群节点不健康,则放入队列并将队列提交给异步任务来延迟处理。如果该集群节点是健康状态,则通过 HTTP 方式发起配置数据的同步,地址是"/v1/cs/communication/dataChange"。


@Servicepublic class AsyncNotifyService {    ...    @Autowired    public AsyncNotifyService(ServerMemberManager memberManager) {        this.memberManager = memberManager;        //Register ConfigDataChangeEvent to NotifyCenter.        NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize);        //Register A Subscriber to subscribe ConfigDataChangeEvent.        NotifyCenter.registerSubscriber(new Subscriber() {            @Override            public void onEvent(Event event) {                //配置中心数据变更,同步其他集群节点数据                if (event instanceof ConfigDataChangeEvent) {                    ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;                    long dumpTs = evt.lastModifiedTs;                    String dataId = evt.dataId;                    String group = evt.group;                    String tenant = evt.tenant;                    String tag = evt.tag;                    //获取集群节点列表                    Collection<Member> ipList = memberManager.allMembers();                                      Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();                    //遍历集群列表构造通知任务NotifySingleTask去同步数据                    for (Member member : ipList) {                        //把通知任务NotifySingleTask添加到队列queue当中                        queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(), evt.isBeta));                    }                    //根据通知任务队列Queue<NotifySingleTask>,封装一个异步任务AsyncTask,提交到线程池执行                    ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, queue));                }            }                        @Override            public Class<? extends Event> subscribeType() {                return ConfigDataChangeEvent.class;            }        });    }    ...    class AsyncTask implements Runnable {        private Queue<NotifySingleTask> queue;        private NacosAsyncRestTemplate restTemplate;                public AsyncTask(NacosAsyncRestTemplate restTemplate, Queue<NotifySingleTask> queue) {            this.restTemplate = restTemplate;            this.queue = queue;        }            @Override        public void run() {            executeAsyncInvoke();        }            private void executeAsyncInvoke() {            while (!queue.isEmpty()) {                //一直从queue队列中获取通知任务,以便将配置数据同步到对应的集群节点                NotifySingleTask task = queue.poll();                //获取通知任务中对应的集群节点的IP地址                String targetIp = task.getTargetIP();                if (memberManager.hasMember(targetIp)) {                    //start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notify                    //判断该集群节点的ip是否在当前节点的配置中,并且是否是健康状态                    boolean unHealthNeedDelay = memberManager.isUnHealth(targetIp);                    if (unHealthNeedDelay) {                        //target ip is unhealthy, then put it in the notification list                        //如果该集群节点不健康,则放入另外一个队列,同样会将队列提交给异步任务,然后延迟处理                        ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,                            task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,                            0, task.target);                        //get delay time and set fail count to the task                        asyncTaskExecute(task);                    } else {                        //如果该集群节点是健康状态,则通过HTTP方式发起配置数据的同步                        Header header = Header.newInstance();                        header.addParam(NotifyService.NOTIFY_HEADER_LAST_MODIFIED, String.valueOf(task.getLastModified()));                        header.addParam(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, InetUtils.getSelfIP());                        if (task.isBeta) {                            header.addParam("isBeta", "true");                        }                        AuthHeaderUtil.addIdentityToHeader(header);                        //通过HTTP方式发起配置数据的同步,请求的HTTP地址:/v1/cs/communication/dataChange                        restTemplate.get(task.url, header, Query.EMPTY, String.class, new AsyncNotifyCallBack(task));                    }                }            }        }    }        private void asyncTaskExecute(NotifySingleTask task) {        int delay = getDelayTime(task);        Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();        queue.add(task);        AsyncTask asyncTask = new AsyncTask(nacosAsyncRestTemplate, queue);        //提交异步任务给线程池延迟执行        ConfigExecutor.scheduleAsyncNotify(asyncTask, delay, TimeUnit.MILLISECONDS);    }}
复制代码


当集群节点处理"/v1/cs/communication/dataChange"这个 HTTP 请求时,会调用 CommunicationController 的 notifyConfigInfo()方法,接着调用 DumpService 的 dump()方法将请求包装成 DumpTask 同步数据任务,然后调用 TaskManager 的 addTask()方法将 DumpTask 同步数据任务放入 map。

 

TaskManager 的父类 NacosDelayTaskExecuteEngine 在初始化时,会开启一个异步任务执行 ProcessRunnable 的 run()方法,也就是会不断从 map 中取出 DumpTask 同步数据任务,然后调用 DumpProcessor 的 process()方法处理具体的配置数据同步逻辑。也就是查询数据库最新的配置,然后持久化配置数据到磁盘上,从而完成集群之间配置数据的同步。


@RestController@RequestMapping(Constants.COMMUNICATION_CONTROLLER_PATH)public class CommunicationController {    private final DumpService dumpService;    ...    @GetMapping("/dataChange")    public Boolean notifyConfigInfo(HttpServletRequest request, @RequestParam("dataId") String dataId,            @RequestParam("group") String group,            @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,            @RequestParam(value = "tag", required = false) String tag) {        dataId = dataId.trim();        group = group.trim();        String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED);        long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified);        String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP);        String isBetaStr = request.getHeader("isBeta");        if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) {            dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true);        } else {            dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp);        }        return true;    }    ...}
public abstract class DumpService { private TaskManager dumpTaskMgr; public DumpService(PersistService persistService, ServerMemberManager memberManager) { ... this.processor = new DumpProcessor(this); this.dumpTaskMgr = new TaskManager("com.alibaba.nacos.server.DumpTaskManager"); this.dumpTaskMgr.setDefaultTaskProcessor(processor); ... } ... public void dump(String dataId, String group, String tenant, long lastModified, String handleIp, boolean isBeta) { String groupKey = GroupKey2.getKey(dataId, group, tenant); dumpTaskMgr.addTask(groupKey, new DumpTask(groupKey, lastModified, handleIp, isBeta)); } ...}
public final class TaskManager extends NacosDelayTaskExecuteEngine implements TaskManagerMBean { ... @Override public void addTask(Object key, AbstractDelayTask newTask) { super.addTask(key, newTask); MetricsMonitor.getDumpTaskMonitor().set(tasks.size()); } ...}
public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractDelayTask> { protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;//任务池 public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) { super(logger); tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity); processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name)); //开启延时任务 processingExecutor.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS); } ... @Override public void addTask(Object key, AbstractDelayTask newTask) { lock.lock(); try { AbstractDelayTask existTask = tasks.get(key); if (null != existTask) { newTask.merge(existTask); } //最后放入到ConcurrentHashMap中 tasks.put(key, newTask); } finally { lock.unlock(); } } ... private class ProcessRunnable implements Runnable { @Override public void run() { try { processTasks(); } catch (Throwable e) { getEngineLog().error(e.toString(), e); } } } @Override public Collection<Object> getAllTaskKeys() { Collection<Object> keys = new HashSet<Object>(); lock.lock(); try { keys.addAll(tasks.keySet()); } finally { lock.unlock(); } return keys; } protected void processTasks() { //获取tasks中所有的任务,然后进行遍历 Collection<Object> keys = getAllTaskKeys(); for (Object taskKey : keys) { //通过任务key,获取具体的任务,并且从任务池中移除掉 AbstractDelayTask task = removeTask(taskKey); if (null == task) { continue; } //DumpService在初始化时会设置TaskManager的默认processor是DumpProcessor //根据taskKey获取NacosTaskProcessor延迟任务处理器:DumpProcessor NacosTaskProcessor processor = getProcessor(taskKey); if (null == processor) { getEngineLog().error("processor not found for task, so discarded. " + task); continue; } try { //ReAdd task if process failed //调用DumpProcessor.process()方法 if (!processor.process(task)) { //如果失败了,会重试添加task回tasks这个map中 retryFailedTask(taskKey, task); } } catch (Throwable e) { getEngineLog().error("Nacos task execute error : " + e.toString(), e); retryFailedTask(taskKey, task); } } }}
复制代码


(4)服务端通知客户端配置数据已变更


服务端通知客户端配置文件变更的方法是 LongPollingService.onEvent()。

 

由前面客户端如何感知远程配置数据的变更可知,Nacos 客户端启动时:会调用 ConfigService 的 addListener()方法为每个 dataId + group 添加一个监听器。而 NacosConfigService 初始化时会创建 ClientWorker 对象,此时会开启多个长连接任务即执行 LongPollingRunnable 的 run()方法。

 

执行 LongPollingRunnable 的 run()方法时,会触发执行 ClientWorker 的 checkUpdateDataIds()方法,该方法最后会调用服务端的"/v1/cs/configs/listener"接口,将当前客户端添加到 LongPollingService 的 allSubs 属性中。

 

这样当以后 dataId + group 的配置发生变更时,服务端会触发执行 LongPollingService 的 onEvent()方法,然后遍历 LongPollingService.allSubs 属性通知客户端配置已变更。

 

客户端收到变更事件通知后,会将最新的配置刷新到容器中,同时将 @RefreshScope 注解修饰的 Bean 从缓存中删除。这样再次访问这些 Bean,就会重新创建 Bean,从而读取到最新的配置。


public class NacosConfigService implements ConfigService {    //long polling.    private final ClientWorker worker;    ...    public NacosConfigService(Properties properties) throws NacosException {        ...        this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties);    }    ...}
//Long polling.public class ClientWorker implements Closeable { public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) { ... this.executor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { checkConfigInfo(); } catch (Throwable e) { LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e); } } }, 1L, 10L, TimeUnit.MILLISECONDS); } //Check config info. public void checkConfigInfo() { //Dispatch taskes. int listenerSize = cacheMap.size(); //Round up the longingTaskCount. int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize()); if (longingTaskCount > currentLongingTaskCount) { for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) { //The task list is no order.So it maybe has issues when changing. //执行长连接任务:LongPollingRunnable.run() executorService.execute(new LongPollingRunnable(i)); } currentLongingTaskCount = longingTaskCount; } } ... class LongPollingRunnable implements Runnable { ... @Override public void run() { ... //check server config List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList); ... } } //Fetch the dataId list from server. List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws Exception { ... return checkUpdateConfigStr(sb.toString(), isInitializingCacheList); } //Fetch the updated dataId list from server. List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws Exception { ... try { //In order to prevent the server from handling the delay of the client's long task, increase the client's read timeout to avoid this problem. long readTimeoutMs = timeout + (long) Math.round(timeout >> 1); //发起HTTP请求:/v1/cs/configs/listener,将客户端添加到LongPollingService.allSubs属性中 HttpRestResult<String> result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(), readTimeoutMs); ... } catch (Exception e) { ... } return Collections.emptyList(); }}
@RestController@RequestMapping(Constants.CONFIG_CONTROLLER_PATH)public class ConfigController { private final ConfigServletInner inner; ... @PostMapping("/listener") @Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class) public void listener(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { ... //do long-polling inner.doPollingConfig(request, response, clientMd5Map, probeModify.length()); } ...}
@Servicepublic class ConfigServletInner { ... //轮询接口. public String doPollingConfig(HttpServletRequest request, HttpServletResponse response, Map<String, String> clientMd5Map, int probeRequestSize) throws IOException { //Long polling. if (LongPollingService.isSupportLongPolling(request)) { longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize); return HttpServletResponse.SC_OK + ""; } ... } ...}
@Servicepublic class LongPollingService { //客户端长轮询订阅者 final Queue<ClientLongPolling> allSubs; ... public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map, int probeRequestSize) { ... //添加订阅者 ConfigExecutor.executeLongPolling(new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag)); } class ClientLongPolling implements Runnable { @Override public void run() { ... allSubs.add(this); } ... } ... public LongPollingService() { allSubs = new ConcurrentLinkedQueue<ClientLongPolling>(); ... NotifyCenter.registerSubscriber(new Subscriber() { @Override public void onEvent(Event event) { if (isFixedPolling()) { // Ignore. } else { if (event instanceof LocalDataChangeEvent) { LocalDataChangeEvent evt = (LocalDataChangeEvent) event; //触发执行DataChangeTask.run()方法 ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps)); } } } ... }); } class DataChangeTask implements Runnable { @Override public void run() { try { ConfigCacheService.getContentBetaMd5(groupKey); //遍历订阅了配置变更事件的客户端 for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) { ClientLongPolling clientSub = iter.next(); if (clientSub.clientMd5Map.containsKey(groupKey)) { ... getRetainIps().put(clientSub.ip, System.currentTimeMillis()); iter.remove(); // Delete subscribers' relationships. //发送服务端数据变更的响应给客户端 clientSub.sendResponse(Arrays.asList(groupKey)); } } } catch (Throwable t) { LogUtil.DEFAULT_LOG.error("data change error: {}", ExceptionUtil.getStackTrace(t)); } } } ...}
复制代码


(5)总结


一.配置中心数据变更同步集群节点的整体逻辑


当在 Nacos 后台变更配置数据后:首先自身节点会把最新的配置数据更新到数据库中,并且添加变更历史。然后利用事件发布订阅机制来通知订阅者,其中订阅者 AsyncNotifyService 会通过 HTTP 方式来通知其他集群节点。当其他集群节点收到通知后,会重新查询数据库最新的配置数据。然后持久化到磁盘上,因为获取配置数据的接口是直接读磁盘文件的。集群节点的配置数据同步完成后,还要通知客户端配置数据已变更。

 

二.服务端通知客户端配置数据已变更


在客户端给 dataId + group 添加监听器后,会和服务端建立一个长轮询,所以另外一个订阅者 LongPollingService 会通过长轮询通知客户端。也就是会遍历每一个客户端,通过长轮询向客户端进行响应。最终会调用到客户端监听器的回调方法,从而去刷新客户端的配置 Bean。


文章转载自:东阳马生架构

原文链接:https://www.cnblogs.com/mjunz/p/18865296

体验地址:http://www.jnpfsoft.com/?from=001YH

用户头像

还未添加个人签名 2025-04-01 加入

还未添加个人简介

评论

发布
暂无评论
Nacos源码—Nacos配置中心实现分析(二)_Java_量贩潮汐·WholesaleTide_InfoQ写作社区