写点什么

Nacos 源码—Nacos 集群高可用分析(二)

  • 2025-05-07
    福建
  • 本文字数:13566 字

    阅读完需:约 45 分钟

4.集群节点的健康状态变动时的数据同步


(1)Nacos 后台管理的集群管理模块介绍


在集群管理模块下,可以看到每个节点的状态和元数据。节点 IP 就是节点的 IP 地址以及端口,节点状态就是标识当前节点是否可用,节点元数据就是相关的 Raft 信息。



其中节点元数据示例如下:


{    // 最后刷新时间    "lastRefreshTime": 1674093895774,    // raft 元信息    "raftMetaData": {        "metaDataMap": {            "naming_persistent_service": {                // leader IP 地址                "leader": "10.0.16.3:7849",                // raft 分组节点                "raftGroupMember": [                    "10.0.16.3:7850",                    "10.0.16.3:7848",                    "10.0.16.3:7849"                ],                "term": 1            }        }    },    // raft 端口    "raftPort": "7849",    // Nacos 版本    "version": "1.4.1"}
复制代码


(2)集群节点启动时开启节点健康检查任务的源码


因为 ServerMemberManager 这个 Bean 会监听 WebServerInitializedEvent 事件,所以 Spring 启动时会执行 ServerMemberManager 的 onApplicationEvent()方法。该方法会在集群模式下开启一个集群节点的健康检查任务,也就是会执行 MemberInfoReportTask 的 run()方法,即执行 Task 的 run()方法。

 

由于 MemberInfoReportTask 类继承了使用模版设计模式的抽象父类 Task,所以执行 Task 的 run()方法时:会先执行 MemberInfoReportTask 的 executeBody()方法,然后会执行 MemberInfoReportTask 的 after()方法。

 

在 MemberInfoReportTask 的 executeBody()方法中:首先会获取除自身以外的其他集群节点 List,然后通过对 cursor 变量自增后取模,来选出本次请求的目标节点 Member,最后通过 HTTP 方式(/v1/core/cluster/report)对目标节点 Member 发起请求。如果目标节点返回成功,则执行 MemberUtil 的 onSuccess()方法。如果目标节点返回失败,则执行 MemberUtil 的 onFail()方法,并且把目标节点 Member 的 state 属性修改为 DOWN。

 

最后在 MemberInfoReportTask 的 after()方法中:又会重新提交这个 MemberInfoReportTask 健康检查任务,反复执行。


@Component(value = "serverMemberManager")public class ServerMemberManager implements ApplicationListener<WebServerInitializedEvent> {    private final NacosAsyncRestTemplate asyncRestTemplate = HttpClientBeanHolder.getNacosAsyncRestTemplate(Loggers.CORE);    //Address information for the local node.    private String localAddress;    //Broadcast this node element information task.    private final MemberInfoReportTask infoReportTask = new MemberInfoReportTask();    ...    //监听Spring启动时发布的WebServerInitializedEvent事件    @Override    public void onApplicationEvent(WebServerInitializedEvent event) {        //设置当前集群节点的状态为默认状态        getSelf().setState(NodeState.UP);        //集群模式下才启动集群节点的健康检查任务        if (!EnvUtil.getStandaloneMode()) {            //开启一个延时任务,执行MemberInfoReportTask.run()方法            GlobalExecutor.scheduleByCommon(this.infoReportTask, 5_000L);        }        EnvUtil.setPort(event.getWebServer().getPort());        EnvUtil.setLocalAddress(this.localAddress);        Loggers.CLUSTER.info("This node is ready to provide external services");    }    ...        class MemberInfoReportTask extends Task {        private final GenericType<RestResult<String>> reference = new GenericType<RestResult<String>>() { };        private int cursor = 0;                @Override        protected void executeBody() {            //获取除自身节点外的其他集群节点            List<Member> members = ServerMemberManager.this.allMembersWithoutSelf();            if (members.isEmpty()) {                return;            }            //轮询请求:每执行一次executeBody()方法,cursor就加1,然后根据cursor去获取对应的某集群节点Member            this.cursor = (this.cursor + 1) % members.size();            Member target = members.get(cursor);            Loggers.CLUSTER.debug("report the metadata to the node : {}", target.getAddress());            //获取URL参数:/v1/core/cluster/report            final String url = HttpUtils.buildUrl(false, target.getAddress(), EnvUtil.getContextPath(), Commons.NACOS_CORE_CONTEXT, "/cluster/report");            try {                //通过HTTP发起请求,向某集群节点Member发起健康检查请求                asyncRestTemplate.post(url, Header.newInstance().addParam(Constants.NACOS_SERVER_HEADER, VersionUtils.version),                    Query.EMPTY, getSelf(), reference.getType(), new Callback<String>() {                        @Override                        public void onReceive(RestResult<String> result) {                            if (result.getCode() == HttpStatus.NOT_IMPLEMENTED.value() || result.getCode() == HttpStatus.NOT_FOUND.value()) {                                Loggers.CLUSTER.warn("{} version is too low, it is recommended to upgrade the version : {}", target, VersionUtils.version);                                return;                            }                            if (result.ok()) {                                //如果请求成功,则设置集群节点Member的状态为NodeState.UP                                MemberUtil.onSuccess(ServerMemberManager.this, target);                            } else {                                //如果请求失败,则设置集群节点Member的状态为NodeState.DOWN                                Loggers.CLUSTER.warn("failed to report new info to target node : {}, result : {}", target.getAddress(), result);                                MemberUtil.onFail(ServerMemberManager.this, target);                            }                        }                                                @Override                        public void onError(Throwable throwable) {                            Loggers.CLUSTER.error("failed to report new info to target node : {}, error : {}", target.getAddress(), ExceptionUtil.getAllExceptionMsg(throwable));                            //如果请求失败,则设置集群节点Member的状态为NodeState.DOWN                            MemberUtil.onFail(ServerMemberManager.this, target, throwable);                        }                                                @Override                        public void onCancel() {                        }                    }                );            } catch (Throwable ex) {                Loggers.CLUSTER.error("failed to report new info to target node : {}, error : {}", target.getAddress(), ExceptionUtil.getAllExceptionMsg(ex));            }        }                @Override        protected void after() {            //重新提交这个节点健康检查的异步任务,从而实现反复执行            GlobalExecutor.scheduleByCommon(this, 2_000L);        }    }}
//Task使用了模版方法public abstract class Task implements Runnable { protected volatile boolean shutdown = false; @Override public void run() { if (shutdown) { return; } try { //执行异步任务的核心逻辑,这个方法是一个抽象方法,交给子类去具体实现 executeBody(); } catch (Throwable t) { Loggers.CORE.error("this task execute has error : {}", ExceptionUtil.getStackTrace(t)); } finally { if (!shutdown) { after(); } } } protected abstract void executeBody(); protected void after() { } public void shutdown() { shutdown = true; }}
public class MemberUtil { ... //Successful processing of the operation on the node. public static void onSuccess(final ServerMemberManager manager, final Member member) { final NodeState old = member.getState(); manager.getMemberAddressInfos().add(member.getAddress()); member.setState(NodeState.UP); member.setFailAccessCnt(0); if (!Objects.equals(old, member.getState())) { manager.notifyMemberChange(); } } public static void onFail(final ServerMemberManager manager, final Member member) { onFail(manager, member, ExceptionUtil.NONE_EXCEPTION); } //Failure processing of the operation on the node. public static void onFail(final ServerMemberManager manager, final Member member, Throwable ex) { manager.getMemberAddressInfos().remove(member.getAddress()); final NodeState old = member.getState(); member.setState(NodeState.SUSPICIOUS); member.setFailAccessCnt(member.getFailAccessCnt() + 1); int maxFailAccessCnt = EnvUtil.getProperty("nacos.core.member.fail-access-cnt", Integer.class, 3); if (member.getFailAccessCnt() > maxFailAccessCnt || StringUtils.containsIgnoreCase(ex.getMessage(), TARGET_MEMBER_CONNECT_REFUSE_ERRMSG)) { member.setState(NodeState.DOWN); } if (!Objects.equals(old, member.getState())) { manager.notifyMemberChange(); } } ...}
public class GlobalExecutor { private static final ScheduledExecutorService COMMON_EXECUTOR = ExecutorFactory.Managed.newScheduledExecutorService( ClassUtils.getCanonicalName(GlobalExecutor.class), 4, new NameThreadFactory("com.alibaba.nacos.core.common") ); ... public static void scheduleByCommon(Runnable runnable, long delayMs) { if (COMMON_EXECUTOR.isShutdown()) { return; } //在指定的延迟后执行某项任务 COMMON_EXECUTOR.schedule(runnable, delayMs, TimeUnit.MILLISECONDS); } ...}
public final class ExecutorFactory { ... public static final class Managed { private static final String DEFAULT_NAMESPACE = "nacos"; private static final ThreadPoolManager THREAD_POOL_MANAGER = ThreadPoolManager.getInstance(); ... //Create a new scheduled executor service with input thread factory and register to manager. public static ScheduledExecutorService newScheduledExecutorService(final String group, final int nThreads, final ThreadFactory threadFactory) { ScheduledExecutorService executorService = Executors.newScheduledThreadPool(nThreads, threadFactory); THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executorService); return executorService; } ... } ...}
复制代码


(3)集群节点收到健康检查请求后的数据同步源码


集群节点收到某集群节点发来的"/v1/core/cluster/report"请求后,会调用 NacosClusterController 的 report()方法来处理请求。在 report()方法中,会把发起请求的来源节点状态直接设置成 UP 状态,然后调用 ServerMemberManager 的 update()方法来更新来源节点属性。在 update()方法中,会把存放在 serverList 中对应的节点 Member 进行更新,也就是通过 MemberUtil 的 copy()方法覆盖老对象的属性来实现更新。

 

注意:因为 serverList 属性在集群中的每个节点都存在一份,所以节点收到健康检查请求后,要对其 serverList 属性中的节点进行更新。


@RestController@RequestMapping(Commons.NACOS_CORE_CONTEXT + "/cluster")public class NacosClusterController {    private final ServerMemberManager memberManager;    ...    //Other nodes return their own metadata information.    @PostMapping(value = {"/report"})    public RestResult<String> report(@RequestBody Member node) {        if (!node.check()) {            return RestResultUtils.failedWithMsg(400, "Node information is illegal");        }        LoggerUtils.printIfDebugEnabled(Loggers.CLUSTER, "node state report, receive info : {}", node);        //能够正常请求到该接口的集群节点肯定是健康的,所以直接设置其节点状态为UP        node.setState(NodeState.UP);        node.setFailAccessCnt(0);        //修改集群节点        boolean result = memberManager.update(node);        return RestResultUtils.success(Boolean.toString(result));    }    ...}
@Component(value = "serverMemberManager")public class ServerMemberManager implements ApplicationListener<WebServerInitializedEvent> { //Cluster node list. private volatile ConcurrentSkipListMap<String, Member> serverList; ... //member information update. public boolean update(Member newMember) { Loggers.CLUSTER.debug("member information update : {}", newMember); String address = newMember.getAddress(); if (!serverList.containsKey(address)) { return false; } //更新serverList中的数据 serverList.computeIfPresent(address, (s, member) -> { //如果服务状态不健康,则直接移除 if (NodeState.DOWN.equals(newMember.getState())) { memberAddressInfos.remove(newMember.getAddress()); } //对比信息是否有做改变 boolean isPublishChangeEvent = MemberUtil.isBasicInfoChanged(newMember, member); //修改lastRefreshTime为当前时间 newMember.setExtendVal(MemberMetaDataConstants.LAST_REFRESH_TIME, System.currentTimeMillis()); //属性覆盖 MemberUtil.copy(newMember, member); if (isPublishChangeEvent) { //member basic data changes and all listeners need to be notified //如果有做改变,需要发布相关事件通知 notifyMemberChange(); } return member; }); return true; } ...}
复制代码


(4)总结


在 Nacos 集群架构下,集群节点间的健康状态如何进行同步。简单来说,集群节点间是会相互进行通信的。如果通信失败,那么就会把通信节点的状态属性修改为 DOWN。


 

5.集群新增节点时如何同步已有服务实例数据

 

(1)节点启动时加载服务实例数据的异步任务


Nacos 服务端会有一个 DistroProtocol 类,它是一个 Bean 对象,在 Spring 项目启动时会创建这个 DistroProtocol 类型的 Bean。

 

创建 DistroProtocol 类型的 Bean 时,会执行 DistroProtocol 的构造方法,从而调用 DistroProtocol 的 startLoadTask()方法开启一个加载数据的异步任务。

 

在 DistroProtocol 的 startLoadTask()方法中,会提交一个异步任务,并且会通过传入一个回调方法来标志是否已初始化成功。其中提交的任务类型是 DistroLoadDataTask,所以会执行 DistroLoadDataTask 的 run()方法,接着会执行 DistroLoadDataTask 的 load()方法,然后执行该任务类的 loadAllDataSnapshotFromRemote()方法,从而获取其他集群节点上的全部服务实例数据并更新本地注册表。

 

在 loadAllDataSnapshotFromRemote()方法中,首先会遍历除自身节点外的其他集群节点。然后调用 DistroHttpAgent 的 getDatumSnapshot()方法,通过 HTTP 请求"/v1/ns/distro/datums"获取目标节点的全部服务实例数据。接着再调用 DistroConsistencyServiceImpl 的 processSnapshot()方法,将获取到的全部服务实例数据写入到本地注册表中。其中只要有一个集群节点数据同步成功,那么这个方法就结束。否则就继续遍历下一个集群节点,获取全部服务实例数据然后同步本地。

 

Nacos 服务端在处理服务实例注册时,采用的是内存队列 + 异步任务。异步任务会调用 listener 的 onChange()方法利用写时复制来更新本地注册表。而 processSnapshot()方法也会调用 listener 的 onChange()方法来更新注册表,其中 listener 的 onChange()方法对应的实现其实就是 Service 的 onChange()方法。


@Componentpublic class DistroProtocol {    ...    public DistroProtocol(ServerMemberManager memberManager, DistroComponentHolder distroComponentHolder, DistroTaskEngineHolder distroTaskEngineHolder, DistroConfig distroConfig) {        this.memberManager = memberManager;        this.distroComponentHolder = distroComponentHolder;        this.distroTaskEngineHolder = distroTaskEngineHolder;        this.distroConfig = distroConfig;        //开启一个异步任务        startDistroTask();    }        private void startDistroTask() {        if (EnvUtil.getStandaloneMode()) {            isInitialized = true;            return;        }        startVerifyTask();        //提交一个加载数据的异步任务        startLoadTask();    }        private void startLoadTask() {        //加载数据的回调方法,修改isInitialized属性,标识是否初始化成功        DistroCallback loadCallback = new DistroCallback() {            @Override            public void onSuccess() {                isInitialized = true;            }                        @Override            public void onFailed(Throwable throwable) {                isInitialized = false;            }        };        //提交异步任务        GlobalExecutor.submitLoadDataTask(new DistroLoadDataTask(memberManager, distroComponentHolder, distroConfig, loadCallback));    }    ...}
//Distro load data task.public class DistroLoadDataTask implements Runnable { ... @Override public void run() { try { load(); if (!checkCompleted()) { GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis()); } else { loadCallback.onSuccess(); Loggers.DISTRO.info("[DISTRO-INIT] load snapshot data success"); } } catch (Exception e) { loadCallback.onFailed(e); Loggers.DISTRO.error("[DISTRO-INIT] load snapshot data failed. ", e); } } private void load() throws Exception { while (memberManager.allMembersWithoutSelf().isEmpty()) { Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init..."); TimeUnit.SECONDS.sleep(1); } while (distroComponentHolder.getDataStorageTypes().isEmpty()) { Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register..."); TimeUnit.SECONDS.sleep(1); } for (String each : distroComponentHolder.getDataStorageTypes()) { if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) { loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each)); } } } private boolean loadAllDataSnapshotFromRemote(String resourceType) { DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType); DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType); if (null == transportAgent || null == dataProcessor) { Loggers.DISTRO.warn("[DISTRO-INIT] Can't find component for type {}, transportAgent: {}, dataProcessor: {}", resourceType, transportAgent, dataProcessor); return false; } //遍历除自身节点外的其他节点 for (Member each : memberManager.allMembersWithoutSelf()) { try { Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {}", resourceType, each.getAddress()); //调用DistroHttpAgent.getDatumSnapshot()方法,通过HTTP方式获取其他集群节点的数据 DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress()); //调用DistroConsistencyServiceImpl.processSnapshot()方法,同步返回结果到自身节点的内存注册表 boolean result = dataProcessor.processSnapshot(distroData); Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {} result: {}", resourceType, each.getAddress(), result); //只要有一个集群节点返回全部数据并同步成功则结束 if (result) { return true; } } catch (Exception e) { Loggers.DISTRO.error("[DISTRO-INIT] load snapshot {} from {} failed.", resourceType, each.getAddress(), e); } } return false; } ...}
public class DistroHttpAgent implements DistroTransportAgent { ... @Override public DistroData getDatumSnapshot(String targetServer) { try { //通过NamingProxy发起HTTP请求 byte[] allDatum = NamingProxy.getAllData(targetServer); return new DistroData(new DistroKey("snapshot", KeyBuilder.INSTANCE_LIST_KEY_PREFIX), allDatum); } catch (Exception e) { throw new DistroException(String.format("Get snapshot from %s failed.", targetServer), e); } } ...}
public class NamingProxy { ... //获取目标节点的全部数据 public static byte[] getAllData(String server) throws Exception { Map<String, String> params = new HashMap<>(8); RestResult<String> result = HttpClient.httpGet( "http://" + server + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + ALL_DATA_GET_URL, new ArrayList<>(), params ); if (result.ok()) { return result.getData().getBytes(); } throw new IOException("failed to req API: " + "http://" + server + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + ALL_DATA_GET_URL + ". code: " + result.getCode() + " msg: " + result.getMessage()); } ...}
@DependsOn("ProtocolManager")@org.springframework.stereotype.Service("distroConsistencyService")public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor { ... @Override public boolean processSnapshot(DistroData distroData) { try { return processData(distroData.getContent()); } catch (Exception e) { return false; } } private boolean processData(byte[] data) throws Exception { if (data.length > 0) { //序列化成对象 Map<String, Datum<Instances>> datumMap = serializer.deserializeMap(data, Instances.class);

//创建空的Service for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) { ... } for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) { if (!listeners.containsKey(entry.getKey())) { // Should not happen: Loggers.DISTRO.warn("listener of {} not found.", entry.getKey()); continue; } try { //更新本地注册表 for (RecordListener listener : listeners.get(entry.getKey())) { listener.onChange(entry.getKey(), entry.getValue().value); } } catch (Exception e) { Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}", entry.getKey(), e); continue; } //Update data store if listener executed successfully: dataStore.put(entry.getKey(), entry.getValue()); } } return true; } ...}
复制代码


总结:Nacos 服务端集群节点启动时,会创建一个 DistroProtocol 类型的 Bean 对象,在这个 DistroProtocol 类型的 Bean 对象的构造方法会开启一个异步任务。该异步任务的主要逻辑是通过 HTTP 方式从其他集群节点获取服务数据,然后把获取到的服务实例数据更新到本地的内存注册表,完成数据同步。而且只要成功从某一个集群节点完成数据同步,那整个任务逻辑就结束。

 

此外,向某个集群节点获取全部服务实例数据时,是向"/v1/ns/distro/datums"接口发起 HTTP 请求来进行获取的。

 

(2)节点处理获取全部服务实例数据请求的源码


Nacos 集群节点收到"/v1/ns/distro/datums"的 HTTP 请求后,便会执行 DistroController 的 getAllDatums()方法。也就是调用 DistroProtocol 的 onSnapshot()方法获取数据,然后直接返回。接着会调用 DistroDataStorageImpl 的 getDatumSnapshot()方法。

 

getDatumSnapshot()方法会从 DataStore 的 getDataMap()方法获取结果。进行服务实例注册时,会把服务实例信息存一份放在 DataStore 的 Map 中。进行服务实例同步时,也会把服务实例信息存放到 DataStore 的 Map 中。所以在 DataStore 里,会包含整个服务实例信息的数据。这里获取全部服务实例数据的接口,也是利用 DataStore 来实现的,而不是从内存注册表中获取。


@RestController@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/distro")public class DistroController {    @Autowired    private DistroProtocol distroProtocol;    ...    //Get all datums.    @GetMapping("/datums")    public ResponseEntity getAllDatums() {        DistroData distroData = distroProtocol.onSnapshot(KeyBuilder.INSTANCE_LIST_KEY_PREFIX);        return ResponseEntity.ok(distroData.getContent());    }    ...}
@Componentpublic class DistroProtocol { ... //Query all datum snapshot. public DistroData onSnapshot(String type) { DistroDataStorage distroDataStorage = distroComponentHolder.findDataStorage(type); if (null == distroDataStorage) { Loggers.DISTRO.warn("[DISTRO] Can't find data storage for received key {}", type); return new DistroData(new DistroKey("snapshot", type), new byte[0]); } //调用DistroDataStorageImpl.getDatumSnapshot()方法 return distroDataStorage.getDatumSnapshot(); } ...}
public class DistroDataStorageImpl implements DistroDataStorage { private final DataStore dataStore; ... @Override public DistroData getDatumSnapshot() { Map<String, Datum> result = dataStore.getDataMap(); //对服务实例数据进行序列化 byte[] dataContent = ApplicationUtils.getBean(Serializer.class).serialize(result); DistroKey distroKey = new DistroKey("snapshot", KeyBuilder.INSTANCE_LIST_KEY_PREFIX); //封装一个DistroData对象并返回 return new DistroData(distroKey, dataContent); } ...}
//Store of data. 用于存储所有已注册的服务实例数据@Componentpublic class DataStore { private Map<String, Datum> dataMap = new ConcurrentHashMap<>(1024); public void put(String key, Datum value) { dataMap.put(key, value); } public Datum remove(String key) { return dataMap.remove(key); } public Set<String> keys() { return dataMap.keySet(); } public Datum get(String key) { return dataMap.get(key); } public boolean contains(String key) { return dataMap.containsKey(key); } public Map<String, Datum> batchGet(List<String> keys) { Map<String, Datum> map = new HashMap<>(128); for (String key : keys) { Datum datum = dataMap.get(key); if (datum == null) { continue; } map.put(key, datum); } return map; } ... public Map<String, Datum> getDataMap() { return dataMap; }}
复制代码


注意:DataStore 数据最后还是存到内存的。通过使用 DataStore,可以实现以下功能和好处:

 

一.数据持久化


DataStore 可将节点数据持久化到磁盘或其他介质,以确保数据的持久性。这样即使系统重启或发生故障,节点数据也能够得到恢复和保留。毕竟 Datum 的 key 是 ServiceName、value 是 Instance 实例列表,而 Instance 实例中又会包含所属的 ClusterName、IP 和 Port,所以根据 DataStore 可以恢复完整的内存注册表。


Map<string, map> serviceMap;Map(namespace, Map(group::serviceName, Service));
复制代码


二.数据同步


DataStore 可以协调和同步节点数据的访问和更新。当多个节点同时注册或更新数据时,DataStore 可确保数据的一致性和正确性,避免数据冲突和不一致的情况。

 

三.数据管理


DataStore 提供了对节点数据的管理功能,包括增加、更新、删除等操作。通过使用适当的数据结构和算法,可以高效地管理大量的节点数据,并支持快速的数据访问和查询。

 

四.数据访问控制


DataStore 可以实现对节点数据的访问控制和权限管理,只有具有相应权限的节点或用户才能访问和修改特定的节点数据,提高数据的安全性和保密性。

 

DataStore 在 Nacos 中充当了节点数据的中央存储和管理器。通过提供持久化 + 同步 + 管理 + 访问控制等功能,确保节点数据的可靠性 + 一致性 + 安全性,是实现节点数据存储和操作的核心组件之一。

 

(3)总结


Nacos 集群架构下新增一个集群节点时,新节点会如何进行服务数据同步:

 

首先利用了 DistroProtocol 类的 Bean 对象的构造方法开启异步任务,通过 HTTP 方式去请求其他集群节点的全部数据。

 

当新节点获取全部数据后,会调用 Service 的 onChange()方法,然后利用写时复制机制更新本地内存注册表。

 

Nacos 集群节点在处理获取全部服务实例数据的请求时,并不是从内存注册表中获取的,而是通过 DataStore 来获取。



文章转载自:东阳马生架构

原文链接:https://www.cnblogs.com/mjunz/p/18860669

体验地址:http://www.jnpfsoft.com/?from=001YH

用户头像

还未添加个人签名 2023-06-19 加入

还未添加个人简介

评论

发布
暂无评论
Nacos源码—Nacos集群高可用分析(二)_不在线第一只蜗牛_InfoQ写作社区