写点什么

Nacos 源码—Nacos 集群高可用分析(四)

  • 2025-05-08
    福建
  • 本文字数:11162 字

    阅读完需:约 37 分钟

8.Nacos 实现的 Raft 协议是如何选举 Leader 节点的


(1)初始化 RaftCore 实例时会开启两个异步任务


在 RaftCore 的 init()方法中,会开启两个异步任务。第一个异步任务的作用是选举 Leader 节点,第二个异步任务的作用是发送心跳同步数据。


@Deprecated@DependsOn("ProtocolManager")@Componentpublic class RaftCore implements Closeable {    ...    //Init raft core.    @PostConstruct    public void init() throws Exception {        Loggers.RAFT.info("initializing Raft sub-system");        final long start = System.currentTimeMillis();        //从本地文件中加载数据        raftStore.loadDatums(notifier, datums);        setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));        Loggers.RAFT.info("cache loaded, datum count: {}, current term: {}", datums.size(), peers.getTerm());        initialized = true;        Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start));
//开启一个异步任务选举Leader节点 masterTask = GlobalExecutor.registerMasterElection(new MasterElection()); //开启一个异步任务通过心跳同步数据 heartbeatTask = GlobalExecutor.registerHeartbeat(new HeartBeat()); versionJudgement.registerObserver(isAllNewVersion -> { stopWork = isAllNewVersion; if (stopWork) { try { shutdown(); raftListener.removeOldRaftMetadata(); } catch (NacosException e) { throw new NacosRuntimeException(NacosException.SERVER_ERROR, e); } } }, 100);
//给NotifyCenter注册一个监听PersistentNotifier NotifyCenter.registerSubscriber(notifier); Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}", GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS); } ...}
public class GlobalExecutor { //线程池的线程数是可用线程的一半 private static final ScheduledExecutorService NAMING_TIMER_EXECUTOR = ExecutorFactory.Managed.newScheduledExecutorService( ClassUtils.getCanonicalName(NamingApp.class), Runtime.getRuntime().availableProcessors() * 2, new NameThreadFactory("com.alibaba.nacos.naming.timer") ); ... public static ScheduledFuture registerMasterElection(Runnable runnable) { //以固定的频率来执行某项任务,它不受任务执行时间的影响,到时间就会执行任务 return NAMING_TIMER_EXECUTOR.scheduleAtFixedRate(runnable, 0, TICK_PERIOD_MS, TimeUnit.MILLISECONDS); } public static ScheduledFuture registerHeartbeat(Runnable runnable) { //以相对固定的频率来执行某项任务,即只有等这一次任务执行完了(不管执行了多长时间),才能执行下一次任务 return NAMING_TIMER_EXECUTOR.scheduleWithFixedDelay(runnable, 0, TICK_PERIOD_MS, TimeUnit.MILLISECONDS); } ...}
复制代码


(2)选举 Leader 节点的 MasterElection 异步任务


MasterElection 的 run()方法就体现了 Raft 协议进行 Leader 选举的第一步。即每个节点会进行休眠,如果时间没到则返回然后重新执行异步任务。等休眠时间到了才会调用 MasterElection 的 sendVote()方法发起投票。

 

一旦执行 MasterElection 的 sendVote()方法发起投票:会先把选举周期+1,然后投票给自己,接着修改节点状态为 Candidate。做完这些准备工作后,才会以 HTTP 形式向其他节点发送投票请求。

 

其他节点返回投票信息时,会调用 RaftPeerSet 的 decideLeader()方法处理。这个方法会处理其他节点返回的投票信息,具体逻辑如下:

 

首先用一个 Map 记录每个节点返回的投票信息,然后遍历这个 Map 去统计投票数量,最后比较当前节点的累计票数,是否已超过集群节点半数。如果超过,则把当前节点的状态修改为 Leader。


@Deprecated@DependsOn("ProtocolManager")@Componentpublic class RaftCore implements Closeable {    private RaftPeerSet peers;    ...        public class MasterElection implements Runnable {        @Override        public void run() {            try {                if (stopWork) {                    return;                }                if (!peers.isReady()) {                    return;                }                //随机休眠                RaftPeer local = peers.local();                local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;                //休眠时间没到就直接返回                if (local.leaderDueMs > 0) {                    return;                }                            //reset timeout                local.resetLeaderDue();                local.resetHeartbeatDue();
//发起投票 sendVote(); } catch (Exception e) { Loggers.RAFT.warn("[RAFT] error while master election {}", e); } } private void sendVote() { RaftPeer local = peers.get(NetUtils.localServer()); Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}", JacksonUtils.toJson(getLeader()), local.term); peers.reset();
//选举周期+1 local.term.incrementAndGet(); //投票给自己 local.voteFor = local.ip; //自己成为候选者,设置当前节点状态为Candidate状态 local.state = RaftPeer.State.CANDIDATE; Map<String, String> params = new HashMap<>(1); params.put("vote", JacksonUtils.toJson(local));
//遍历其他集群节点 for (final String server : peers.allServersWithoutMySelf()) { final String url = buildUrl(server, API_VOTE); try { //发送HTTP的投票请求 HttpClient.asyncHttpPost(url, null, params, new Callback<String>() { @Override public void onReceive(RestResult<String> result) { if (!result.ok()) { Loggers.RAFT.error("NACOS-RAFT vote failed: {}, url: {}", result.getCode(), url); return; } RaftPeer peer = JacksonUtils.toObj(result.getData(), RaftPeer.class); Loggers.RAFT.info("received approve from peer: {}", JacksonUtils.toJson(peer)); //处理返回的投票结果 peers.decideLeader(peer); } @Override public void onError(Throwable throwable) { Loggers.RAFT.error("error while sending vote to server: {}", server, throwable); } @Override public void onCancel() { } }); } catch (Exception e) { Loggers.RAFT.warn("error while sending vote to server: {}", server); } } } } ...}
@Deprecated@Component@DependsOn("ProtocolManager")public class RaftPeerSet extends MemberChangeListener implements Closeable { private volatile Map<String, RaftPeer> peers = new HashMap<>(8); ... //Calculate and decide which peer is leader. If has new peer has more than half vote, change leader to new peer. public RaftPeer decideLeader(RaftPeer candidate) { //记录本次投票结果 peers.put(candidate.ip, candidate); SortedBag ips = new TreeBag(); int maxApproveCount = 0; String maxApprovePeer = null; //统计累计票数 for (RaftPeer peer : peers.values()) { if (StringUtils.isEmpty(peer.voteFor)) { continue; } ips.add(peer.voteFor); if (ips.getCount(peer.voteFor) > maxApproveCount) { maxApproveCount = ips.getCount(peer.voteFor); maxApprovePeer = peer.voteFor; } } //判断投票数量是否超过半数,如果已超半数则把自己节点的状态改为Leader if (maxApproveCount >= majorityCount()) { RaftPeer peer = peers.get(maxApprovePeer); //设置当前节点的状态为Leader状态 peer.state = RaftPeer.State.LEADER; if (!Objects.equals(leader, peer)) { leader = peer; ApplicationUtils.publishEvent(new LeaderElectFinishedEvent(this, leader, local())); Loggers.RAFT.info("{} has become the LEADER", leader.ip); } } return leader; } ...}
复制代码


9.Nacos 实现的 Raft 协议是如何同步数据的


(1)Leader 节点如何发送心跳来同步数据


RaftCore 的 init()方法会开启另外一个异步任务 HeartBeat。HeartBeat 的 run()方法会调用 HeartBeat 的 sendBeat()方法来发送心跳请求。

 

其中只有 Leader 节点才会发送心跳请求。Leader 在调用 HeartBeat 的 sendBeat()方法发送心跳同步数据请求时,会将 Instance 的 key 作为心跳的参数发送给其他 Follower 节点。Follower 节点接收到 Leader 的心跳请求后,会比较请求中的数据与自身数据的差异,如果存在差异则向 Leader 同步。

 

HeartBeat 的 sendBeat()方法主要包括三部分:

 

第一部分:判断当前节点是不是 Leader,如果不是 Leader 则不能发送心跳。

 

第二部分:组装发送心跳包的参数。只会把 datum.key 放入进去,并不会把整个 Instance 信息传输过去。Follower 节点拿到心跳包中的 key 之后,发现部分 key 在自身节点是不存在的,那么这时 Follower 节点就会根据这些 key 向 Leader 节点获取 Instance 的详细信息进行同步。

 

第三部分:向其他 Follower 节点发送心跳数据,是通过 HTTP 的方式来发起心跳请求的,请求地址为:/v1/ns/raft/beat。


@Deprecated@DependsOn("ProtocolManager")@Componentpublic class RaftCore implements Closeable {    ...    public class HeartBeat implements Runnable {        @Override        public void run() {            try {                if (stopWork) {                    return;                }                if (!peers.isReady()) {                    return;                }                            RaftPeer local = peers.local();                local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;                if (local.heartbeatDueMs > 0) {                    return;                }                local.resetHeartbeatDue();                //发送心跳同步数据                sendBeat();            } catch (Exception e) {                Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);            }        }            private void sendBeat() throws IOException, InterruptedException {            RaftPeer local = peers.local();            //第一部分:判断当前节点是不是Leader,如果不是Leader则不能发送心跳            if (EnvUtil.getStandaloneMode() || local.state != RaftPeer.State.LEADER) {                return;            }            if (Loggers.RAFT.isDebugEnabled()) {                Loggers.RAFT.debug("[RAFT] send beat with {} keys.", datums.size());            }                    local.resetLeaderDue();                    //build data            ObjectNode packet = JacksonUtils.createEmptyJsonNode();            packet.replace("peer", JacksonUtils.transferToJsonNode(local));                    ArrayNode array = JacksonUtils.createEmptyArrayNode();                    if (switchDomain.isSendBeatOnly()) {                Loggers.RAFT.info("[SEND-BEAT-ONLY] {}", switchDomain.isSendBeatOnly());            }                    if (!switchDomain.isSendBeatOnly()) {                //第二部分:组装发送心跳包的参数                //组装数据,只会把datum.key放入进去,并不会把整个Instance信息传输过去                for (Datum datum : datums.values()) {                    ObjectNode element = JacksonUtils.createEmptyJsonNode();                    if (KeyBuilder.matchServiceMetaKey(datum.key)) {                        //只放入key的信息                        element.put("key", KeyBuilder.briefServiceMetaKey(datum.key));                    } else if (KeyBuilder.matchInstanceListKey(datum.key)) {                        element.put("key", KeyBuilder.briefInstanceListkey(datum.key));                    }                    element.put("timestamp", datum.timestamp.get());                    array.add(element);                }            }            packet.replace("datums", array);            //broadcast            Map<String, String> params = new HashMap<String, String>(1);            params.put("beat", JacksonUtils.toJson(packet));                    String content = JacksonUtils.toJson(params);                    ByteArrayOutputStream out = new ByteArrayOutputStream();            GZIPOutputStream gzip = new GZIPOutputStream(out);            gzip.write(content.getBytes(StandardCharsets.UTF_8));            gzip.close();                    byte[] compressedBytes = out.toByteArray();            String compressedContent = new String(compressedBytes, StandardCharsets.UTF_8);                    if (Loggers.RAFT.isDebugEnabled()) {                Loggers.RAFT.debug("raw beat data size: {}, size of compressed data: {}", content.length(), compressedContent.length());            }            //第三部分:向其他Follower节点发送心跳数据,通过HTTP的方式来发起心跳,请求地址为:/v1/ns/raft/beat            //发送心跳 + 同步数据            for (final String server : peers.allServersWithoutMySelf()) {                try {                    final String url = buildUrl(server, API_BEAT);                    if (Loggers.RAFT.isDebugEnabled()) {                        Loggers.RAFT.debug("send beat to server " + server);                    }                    //通过HTTP发送心跳                    HttpClient.asyncHttpPostLarge(url, null, compressedBytes, new Callback<String>() {                        @Override                        public void onReceive(RestResult<String> result) {                            if (!result.ok()) {                                Loggers.RAFT.error("NACOS-RAFT beat failed: {}, peer: {}", result.getCode(), server);                                MetricsMonitor.getLeaderSendBeatFailedException().increment();                                return;                            }                            peers.update(JacksonUtils.toObj(result.getData(), RaftPeer.class));                            if (Loggers.RAFT.isDebugEnabled()) {                                Loggers.RAFT.debug("receive beat response from: {}", url);                            }                        }                                                @Override                        public void onError(Throwable throwable) {                            Loggers.RAFT.error("NACOS-RAFT error while sending heart-beat to peer: {} {}", server, throwable);                            MetricsMonitor.getLeaderSendBeatFailedException().increment();                        }                                                @Override                        public void onCancel() {                                            }                    });                } catch (Exception e) {                    Loggers.RAFT.error("error while sending heart-beat to peer: {} {}", server, e);                    MetricsMonitor.getLeaderSendBeatFailedException().increment();                }            }        }    }    ...}
复制代码


(2)Follower 节点如何处理心跳来同步数据


Follower 节点收到 Leader 节点发送过来的 HTTP 请求"/v1/ns/raft/beat"时,会执行 RaftController 类中的 beat()方法,接着会执行 RaftCore 的 receivedBeat()方法来进行具体的心跳处理。

 

RaftCore.receivedBeat()方法的具体逻辑如下:

 

一.首先会进行一些判断

第一个判断:Follower 节点接收到的心跳请求如果不是 Leader 节点发出的会直接抛出异常。

 

第二个判断:Follower 节点的 term 只会小于等于 Leader 节点的 term,如果大于,则直接抛出异常。

 

第三个判断:如果自身节点的状态不是 Follower,需要把状态改为 Follower。因为有可能自身节点之前是 Leader,但因为网络原因出现了脑裂问题。等网络恢复后,自身节点收到新 Leader 发来的心跳,新 Leader 的 term 比自身节点要大,那么它就需要切换成 Follower 节点。

 

二.然后对自身节点的 datums 中的 key 和心跳请求中的 key 进行比对

如果发现自身节点数据缺少了,那么就会记录到 batch 中,然后把 batch 中的 key 进行拆分包装成请求参数,最后通过 HTTP 方式向 Leader 节点查询这些 key 对应的 Instance 详细信息。

 

Follower 节点拿到 Leader 节点返回的 Instance 服务实例信息后,会继续调用 RaftStore.write()、PersistentNotifier.notify()这两个方法,一个将数据持久化到本地文件、一个将数据同步到内存注册表,从而最终完成以 Leader 节点为准的心跳请求同步数据的流程。


@Deprecated@RestController@RequestMapping({UtilsAndCommons.NACOS_NAMING_CONTEXT + "/raft", UtilsAndCommons.NACOS_SERVER_CONTEXT + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/raft"})public class RaftController {    private final RaftCore raftCore;    ...        @PostMapping("/beat")    public JsonNode beat(HttpServletRequest request, HttpServletResponse response) throws Exception {        if (versionJudgement.allMemberIsNewVersion()) {            throw new IllegalStateException("old raft protocol already stop");        }        String entity = new String(IoUtils.tryDecompress(request.getInputStream()), StandardCharsets.UTF_8);        String value = URLDecoder.decode(entity, "UTF-8");        value = URLDecoder.decode(value, "UTF-8");        JsonNode json = JacksonUtils.toObj(value);               //处理心跳        RaftPeer peer = raftCore.receivedBeat(JacksonUtils.toObj(json.get("beat").asText()));        return JacksonUtils.transferToJsonNode(peer);    }    ...}
@Deprecated@DependsOn("ProtocolManager")@Componentpublic class RaftCore implements Closeable { public final PersistentNotifier notifier; ... //Received beat from leader. // TODO split method to multiple smaller method. public RaftPeer receivedBeat(JsonNode beat) throws Exception { ... //第一个判断:如果发送心跳不是Leader,则直接抛出异常 if (remote.state != RaftPeer.State.LEADER) { Loggers.RAFT.info("[RAFT] invalid state from master, state: {}, remote peer: {}", remote.state, JacksonUtils.toJson(remote)); throw new IllegalArgumentException("invalid state from master, state: " + remote.state); } //第二个判断:如果本身节点的term还大于Leader的term,也直接抛出异常 if (local.term.get() > remote.term.get()) { Loggers.RAFT.info("[RAFT] out of date beat, beat-from-term: {}, beat-to-term: {}, remote peer: {}, and leaderDueMs: {}", remote.term.get(), local.term.get(), JacksonUtils.toJson(remote), local.leaderDueMs); throw new IllegalArgumentException("out of date beat, beat-from-term: " + remote.term.get() + ", beat-to-term: " + local.term.get()); } //第三个判断:自己的节点状态是不是FOLLOWER,如果不是则需要更改为FOLLOWER if (local.state != RaftPeer.State.FOLLOWER) { Loggers.RAFT.info("[RAFT] make remote as leader, remote peer: {}", JacksonUtils.toJson(remote)); //mk follower local.state = RaftPeer.State.FOLLOWER; local.voteFor = remote.ip; } ... //遍历Leader传输过来的Instance key,和本地的Instance进行对比 for (Object object : beatDatums) { ... try { if (datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp && processedCount < beatDatums.size()) { continue; } if (!(datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp)) { //记录需要同步的datumKey batch.add(datumKey); } //到达一定数量才进行批量数据同步 if (batch.size() < 50 && processedCount < beatDatums.size()) { continue; } ... //使用batch组装参数 String url = buildUrl(remote.ip, API_GET); Map<String, String> queryParam = new HashMap<>(1); queryParam.put("keys", URLEncoder.encode(keys, "UTF-8")); //发送HTTP请求给Leader,根据keys参数获取Instance详细信息 HttpClient.asyncHttpGet(url, null, queryParam, new Callback<String>() { @Override public void onReceive(RestResult<String> result) { if (!result.ok()) { return; } //序列化result结果 List<JsonNode> datumList = JacksonUtils.toObj(result.getData(), new TypeReference<List<JsonNode>>() { }); //遍历Leader返回的Instance详细信息 for (JsonNode datumJson : datumList) { Datum newDatum = null; OPERATE_LOCK.lock(); try { ... //Raft写本地数据 raftStore.write(newDatum); //同步内存数据 datums.put(newDatum.key, newDatum); //和服务实例注册时的逻辑一样,最终会调用listener.onChange()方法 notifier.notify(newDatum.key, DataOperation.CHANGE, newDatum.value); } catch (Throwable e) { Loggers.RAFT.error("[RAFT-BEAT] failed to sync datum from leader, datum: {}", newDatum, e); } finally { OPERATE_LOCK.unlock(); } } ... return; } ... }); batch.clear(); } catch (Exception e) { Loggers.RAFT.error("[NACOS-RAFT] failed to handle beat entry, key: {}", datumKey); } } ... } ...}
复制代码


10.Nacos 如何实现 Raft 协议的简版总结


Nacos 实现的 Raft 协议主要包括三部分内容:

一.Nacos 集群如何使用 Raft 协议写入数据

二.Nacos 集群如何选举 Leader 节点

三.Nacos 集群如何让 Leader 实现心跳请求同步数据

 

Nacos 早期版本实现的只是 Raft 协议的简化版本,并没有两阶段提交的处理。而是 Leader 节点处理数据完成后,直接就去同步给其他集群节点。哪怕集群节点同步失败或没有过半节点成功,Leader 的数据也不会回滚而只抛出异常。所以,Nacos 早期版本的 Raft 实现,后期也会废弃使用。

 

如下是 Nacos 实现的 Raft 协议在注册服务实例时集群处理数据的流程:



如下是 Nacos 实现的 Raft 协议处理 Leader 选举和通过心跳同步数据的流程:



文章转载自:东阳马生架构

原文链接:https://www.cnblogs.com/mjunz/p/18863383

体验地址:http://www.jnpfsoft.com/?from=001YH

用户头像

还未添加个人签名 2023-06-19 加入

还未添加个人简介

评论

发布
暂无评论
Nacos源码—Nacos集群高可用分析(四)_Java_不在线第一只蜗牛_InfoQ写作社区