写点什么

zk 源码—单机和集群通信原理(二)

  • 2025-04-10
    福建
  • 本文字数:28909 字

    阅读完需:约 95 分钟

2.集群版的 zk 服务端的启动过程


什么是集群:


集群是由网络中不同的机器组成的一个系统,集群中的工作是通过集群中调度者服务器来协同完成的。

 

集群中的调度者服务器:


调度者的工作就是在集群收到客户端请求后,根据集群中机器的使用情况,决定将此次客户端请求交给集群中哪一台服务器或网络节点进行处理。

 

zk 中的集群模式:


zk 集群会将服务器分成 Leader、Follower、Observer 三种角色的服务器;在集群运行期间这三种角色的服务器所负责的工作各不相同。

 

一.Leader 角色服务器(处理事务性请求 + 管理其他服务器)


负责处理事务性请求,以及管理集群中的其他服务器。Leader 服务器是集群中工作的分配和调度者。

 

二.Follower 服务器(处理非事务性请求 + 选举 Leader 服务器)


负责处理非事务性请求,以及选举出 Leader 服务器。发生 Leader 选举时,系统会从 Follow 服务器中,根据过半投票原则选举出一个 Follower 作为 Leader 服务器。

 

三.Observer 服务器(处理非事务性请求 + 不参与选举和被选举)


负责处理非事务性请求,不参与 Leader 服务器的选举,也不会作为候选者被选举为 Leader 服务器。

 

zk 服务端整体架构图如下:



集群版 zk 服务端的启动分为四个阶段:


预启动阶段、初始化阶段、Leader 选举阶段、Leader 和 Follower 启动阶段

 

集群版 zk 服务端的启动流程图如下:



接下来介绍集群版的 zk 服务端是如何从初始化到对外提供服务的。

 

(1)预启动阶段


在 zk 服务端进行初始化之前,首先要对配置文件等信息进行解析和载入,而 zk 服务端的预启动阶段的主要工作流程如下:

 

一.QuorumPeerMain 启动程序

二.解析 zoo.cfg 配置文件

三.创建和启动历史文件清理器

四.根据配置判断是集群模式还是单机模式

 

首先 zk 服务端会调用 QuorumPeerMain 类中的 main()方法,然后在 QuorumPeerMain 的 initializeAndRun()方法里解析 zoo.cfg 配置文件。接着继续在 initializeAndRun()方法中创建和启动历史文件清理器,以及根据配置文件和启动参数,即 args 参数和 config.isDistributed()方法,来判断 zk 服务端的启动方式是集群模式还是单机模式。如果配置参数中配置了相关的配置项,并且已经指定了集群模式运行,那么在服务启动时就会调用 runFromConfig()方法完成集群模式的初始化。


public class QuorumPeerMain {    protected QuorumPeer quorumPeer;    ...    //1.启动程序入口    public static void main(String[] args) {        QuorumPeerMain main = new QuorumPeerMain();        try {            //启动程序            main.initializeAndRun(args);        } catch (IllegalArgumentException e) {            ...        }        LOG.info("Exiting normally");        System.exit(0);    }        protected void initializeAndRun(String[] args) {        QuorumPeerConfig config = new QuorumPeerConfig();        if (args.length == 1) {            //2.解析配置文件            config.parse(args[0]);        }        //3.创建和启动历史文件清理器        DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config.getDataDir(), config.getDataLogDir(), config.getSnapRetainCount(), config.getPurgeInterval());        purgeMgr.start();        //4.根据配置判断是集群模式还是单机模式        if (args.length == 1 && config.isDistributed()) {            //集群模式            runFromConfig(config);        } else {            //单机模式            ZooKeeperServerMain.main(args);        }    }    ...}
复制代码


(2)初始化阶段


一.创建网络连接工厂实例 ServerCnxnFactory

二.初始化网络连接工厂实例 ServerCnxnFactory

三.创建集群版服务器实例 QuorumPeer

四.创建数据持久化工具 FileTxnSnapLog 并设置到 QuorumPeer 实例中

五.创建内存数据库 ZKDatabase 并设置到 QuorumPeer 实例中

六.初始化集群版服务器实例 QuorumPeer

七.恢复集群版服务器实例 QuorumPeer 本地数据

八.启动网络连接工厂 ServerCnxnFactory 主线程


public class QuorumPeerMain {    protected QuorumPeer quorumPeer;    ...    public void runFromConfig(QuorumPeerConfig config) {        ...        ServerCnxnFactory cnxnFactory = null;        if (config.getClientPortAddress() != null) {            //1.创建网络连接工厂实例ServerCnxnFactory            cnxnFactory = ServerCnxnFactory.createFactory();            //2.初始化网络连接工厂实例ServerCnxnFactory            cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), false);        }        //接下来就是初始化集群版服务器实例QuorumPeer        //3.创建集群版服务器实例QuorumPeer        quorumPeer = getQuorumPeer();        //4.创建zk数据管理器FileTxnSnapLog        quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));        quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());        quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled());        quorumPeer.setElectionType(config.getElectionAlg());        quorumPeer.setMyid(config.getServerId());        quorumPeer.setTickTime(config.getTickTime());        quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());        quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());        quorumPeer.setInitLimit(config.getInitLimit());        quorumPeer.setSyncLimit(config.getSyncLimit());        quorumPeer.setConfigFileName(config.getConfigFilename());        //5.创建并初始化内存数据库ZKDatabase        quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));        quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);        if (config.getLastSeenQuorumVerifier() != null) {            quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);        }        quorumPeer.initConfigInZKDatabase();        quorumPeer.setCnxnFactory(cnxnFactory);        quorumPeer.setSslQuorum(config.isSslQuorum());        quorumPeer.setUsePortUnification(config.shouldUsePortUnification());        quorumPeer.setLearnerType(config.getPeerType());        quorumPeer.setSyncEnabled(config.getSyncEnabled());        quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());        if (config.sslQuorumReloadCertFiles) {            quorumPeer.getX509Util().enableCertFileReloading();        }        // sets quorum sasl authentication configurations        quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);        if (quorumPeer.isQuorumSaslAuthEnabled()) {            quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);            quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);            quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);            quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);            quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);        }        quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);        quorumPeer.initialize();        //6.初始化集群版服务器实例QuorumPeer        quorumPeer.start();        //join方法会将当前线程挂起,等待QuorumPeer线程结束后再执行当前线程        quorumPeer.join();    }        protected QuorumPeer getQuorumPeer() throws SaslException {        return new QuorumPeer();    }}
public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider { ServerCnxnFactory cnxnFactory; ServerCnxnFactory secureCnxnFactory; ... public synchronized void start() { //7.恢复集群版服务器实例QuorumPeer本地数据 loadDataBase(); //8.启动网络连接工厂ServerCnxnFactory主线程 startServerCnxnFactory(); adminServer.start(); //9.开始Leader选举 startLeaderElection(); startJvmPauseMonitor();//开启监控JVM停顿的线程 //10.启动集群版服务器实例QuorumPeer super.start(); } private void startServerCnxnFactory() { if (cnxnFactory != null) { cnxnFactory.start(); } if (secureCnxnFactory != null) { secureCnxnFactory.start(); } } ...}
public abstract class ServerCnxnFactory { ... static public ServerCnxnFactory createFactory() throws IOException { String serverCnxnFactoryName = System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY); if (serverCnxnFactoryName == null) { serverCnxnFactoryName = NIOServerCnxnFactory.class.getName(); } ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName).getDeclaredConstructor().newInstance(); return serverCnxnFactory; } ...}
public class NIOServerCnxnFactory extends ServerCnxnFactory { //最大客户端连接数 protected int maxClientCnxns = 60; //处理连接过期的线程 private ConnectionExpirerThread expirerThread; //处理客户端建立连接的线程 private AcceptThread acceptThread; //处理客户端请求的线程 private final Set<SelectorThread> selectorThreads = new HashSet<SelectorThread>(); //会话过期相关 int sessionlessCnxnTimeout; private ExpiryQueue<NIOServerCnxn> cnxnExpiryQueue; //selector线程数,CPU核数的一半 private int numSelectorThreads; //工作线程数 private int numWorkerThreads; ... public void configure(InetSocketAddress addr, int maxcc, boolean secure) throws IOException { ... maxClientCnxns = maxcc; sessionlessCnxnTimeout = Integer.getInteger(ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000); cnxnExpiryQueue = new ExpiryQueue<NIOServerCnxn>(sessionlessCnxnTimeout); //创建一个自动处理过期会话的ConnectionExpirerThread线程 expirerThread = new ConnectionExpirerThread();

int numCores = Runtime.getRuntime().availableProcessors(); numSelectorThreads = Integer.getInteger(ZOOKEEPER_NIO_NUM_SELECTOR_THREADS, Math.max((int) Math.sqrt((float) numCores/2), 1)); numWorkerThreads = Integer.getInteger(ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores); ... //创建一批SelectorThread线程 for (int i=0; i<numSelectorThreads; ++i) { selectorThreads.add(new SelectorThread(i)); } //打开ServerSocketChannel this.ss = ServerSocketChannel.open(); ss.socket().setReuseAddress(true); //绑定端口,启动NIO服务器 ss.socket().bind(addr); ss.configureBlocking(false); //创建一个AcceptThread线程 acceptThread = new AcceptThread(ss, addr, selectorThreads); } public void start() { stopped = false; if (workerPool == null) { workerPool = new WorkerService("NIOWorker", numWorkerThreads, false); } for (SelectorThread thread : selectorThreads) { if (thread.getState() == Thread.State.NEW) { thread.start(); } } if (acceptThread.getState() == Thread.State.NEW) { acceptThread.start(); } if (expirerThread.getState() == Thread.State.NEW) { expirerThread.start(); } }}
复制代码


一.创建网络连接工厂实例 ServerCnxnFactory


在执行 QuorumPeerMain 的 runFromConfig()方法时,首先会通过 ServerCnxnFactory 的 createFactory()方法来创建服务端网络连接工厂。

 

ServerCnxnFactory 的 createFactory()方法首先会获取配置值,判断是使用 NIO 还是使用 Netty,然后再通过反射去实例化服务端网络连接工厂。

 

可以通过配置 zookeeper.serverCnxnFactory 来指定使用:zk 自己实现的 NIO 还是 Netty 框架,来构建服务端网络连接工厂。

 

二.初始化网络连接工厂实例 ServerCnxnFactory


在执行 QuorumPeerMain 的 runFromConfig()方法时,创建完服务端网络连接工厂实例 ServerCnxnFactory 后,就会调用网络连接工厂 ServerCnxnFactory 的 configure()方法来初始化 ServerCnxnFactory 实例。

 

这里以 NIOServerCnxnFactory 的 configure()方法为例,该方法主要会启动一个 NIO 服务器,以及创建三类线程:

 

一.处理客户端连接的 AcceptThread 线程

二.处理客户端请求的一批 SelectorThread 线程

三.处理过期连接的 ConnectionExpirerThread 线程

 

初始化完 ServerCnxnFactory 实例后,虽然此时 NIO 服务器已对外开放端口,客户端也能访问到 2181 端口,但此时 zk 服务端还不能正常处理客户端请求。

 

三.创建集群版服务器实例 QuorumPeer


在执行 QuorumPeerMain 的 runFromConfig()方法时,创建和初始化完网络连接工厂实例 ServerCnxnFactory 后,接着就会调用 QuorumPeerMain 的 getQuorumPeer()方法创建集群版服务器实例。

 

ZooKeeperServer 是单机版服务端的核心实体类。

QuorumPeer 是集群版服务端的核心实体类。

 

可以将每个 QuorumPeer 类实例看作是集群中的一台服务器。在 zk 集群模式中,一个 QuorumPeer 类实例一般具有 3 种状态,分别是:

状态一:参与 Leader 节点的选举

状态二:作为 Follower 节点同步 Leader 节点的数据

状态三:作为 Leader 节点管理集群的 Follower 节点

 

在执行 QuorumPeerMain 的 runFromConfig()方法时,创建完 QuorumPeer 实例后,接着会将集群版服务运行中需要的核心工具类注册到 QuorumPeer 实例中。这些核心工具类也是单机版服务端运行时需要的,比如:数据持久化类 FileTxnSnapLog、NIO 工厂类 ServerCnxnFactory 等。然后还会将配置文件中的服务器地址列表、Leader 选举算法、会话超时时间等设置到 QuorumPeer 实例中。

 

四.创建数据持久化工具 FileTxnSnapLog 并设置到 QuorumPeer 实例中


可以通过 FileTxnSnapLog 对 zk 服务器的内存数据进行持久化,具体会将内存数据持久化到配置文件的事务日志文件 + 快照数据文件中。

 

在执行 QuorumPeerMain 的 runFromConfig()方法时,创建完 QuorumPeer 实例后,首先会根据 zoo.cfg 配置文件中的 dataDir 数据快照目录和 dataLogDir 事务日志目录,通过"new FileTxnSnapLog()"来创建 FileTxnSnapLog 类实例,然后设置到 QuorumPeer 实例中。

 

五.创建内存数据库 ZKDatabase 并设置到 QuorumPeer 实例中


ZKDatabase 是 zk 的内存数据库,主要负责管理 zk 的所有会话记录以及 DataTree 和事务日志的存储。

 

在执行 QuorumPeerMain 的 runFromConfig 方法时,创建完 QuorumPeer 实例以及创建完数据持久化工具 FileTxnSnapLog 并设置到 QuorumPeer 后,就会将持久化工具 FileTxnSnapLog 作为参数去创建 ZKDatabase 实例,然后设置到 QuorumPeer 实例。

 

六.初始化集群版服务器实例 QuorumPeer


除了需要将一些核心组件注册到服务器实例 QuorumPeer 中去,还需要对服务器实例 QuorumPeer 根据 zoo.cfg 配置文件设置一些参数,比如服务器地址列表、Leader 选举算法、会话超时时间等。其中这些核心组件包括:数据持久化工具 FileTxnSnapLog、服务端网络连接工厂 ServerCnxnFactory、内存数据库 ZKDatabase。

 

完成初始化 QuorumPeer 实例并启动 QuorumPeer 线程后,便会通过 QuorumPeer 的 join()方法将 main 线程挂起,等待 QuorumPeer 线程结束后再执行。

 

七.恢复集群版服务器实例 QuorumPeer 本地数据


在执行 QuorumPeerMain 的 runFromConfig()方法时,初始化完 QuorumPeer 实例后,就会调用 QuorumPeer 的 start()方法来启动集群中的服务器。

 

在 QuorumPeer 的 start()方法中,首先会调用 QuorumPeer 的 loadDataBase()方法来恢复数据。

 

八.启动网络连接工厂 ServerCnxnFactory 主线程


在 QuorumPeer 的 start()方法中,在调用完 QuorumPeer 的 loadDataBase()方法来恢复本地数据之后,便会调用 QuorumPeer 的 startServerCnxnFactory()方法来启动网络连接工厂的主线程。

 

(3)Leader 选举阶段


一.初始化 Leader 选举(初始化当前投票 + 监听选举端口 + 启动选举守护线程)

二.启动 QuorumPeer 线程检测当前服务器状态

三.进行 Leader 选举


public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {    private volatile boolean running = true;    private ServerState state = ServerState.LOOKING;    volatile private Vote currentVote;    Election electionAlg;    ...    public synchronized void start() {        //7.恢复集群版服务器实例QuorumPeer本地数据        loadDataBase();        //8.启动网络连接工厂ServerCnxnFactory主线程        startServerCnxnFactory();        adminServer.start();        //9.初始化Leader选举(初始化当前投票+监听选举端口+启动选举守护线程)        startLeaderElection();        startJvmPauseMonitor();//开启监控JVM停顿的线程        //10.执行集群版服务器实例QuorumPeer.run()方法        super.start();    }        synchronized public void startLeaderElection() {        if (getPeerState() == ServerState.LOOKING) {            //zk会根据自身的服务器ID、最新的事务ID和当前的服务器epoch来生成一个初始化的投票            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());        }        ...        //创建选举算法——FastLeaderElection,每一个QuorumPeer实例都会有一个选举算法实例        this.electionAlg = createElectionAlgorithm(electionType);    }        protected Election createElectionAlgorithm(int electionAlgorithm){        Election le=null;        QuorumCnxManager qcm = createCnxnManager();        QuorumCnxManager.Listener listener = qcm.listener;        //启动对Leader选举端口的监听,等待集群中其他服务器创建连接        listener.start();        //创建选举算法,每一个QuorumPeer实例都会有一个选举算法实例        FastLeaderElection fle = new FastLeaderElection(this, qcm);        //启动选举算法使用的守护线程,其中发送投票的守护线程会不断将当前的投票发出去        fle.start();        le = fle;        return le;    }        public QuorumCnxManager createCnxnManager() {        return new QuorumCnxManager(this,            this.getId(),            this.getView(),            this.authServer,            this.authLearner,            this.tickTime * this.syncLimit,            this.getQuorumListenOnAllIPs(),            this.quorumCnxnThreadsSize,            this.isQuorumSaslAuthEnabled());    }    ...    public synchronized ServerState getPeerState() {        return state;    }        @Override    public void run() {        ...        while (running) {            //检测当前服务器状态 + 进行Leader选举            switch (getPeerState()) {                case LOOKING:                    LOG.info("LOOKING");                    ...                    //设置当前的投票                    setCurrentVote(electionAlg.lookForLeader());                                                             break;                case OBSERVING:                    ...                ...            }        }    }    ...}
//创建用来进行选举的服务器,监听端口3888public class QuorumCnxManager { ... public class Listener extends ZooKeeperThread { volatile ServerSocket ss = null;//BIO的ServerSocket ... @Override public void run() { int numRetries = 0; InetSocketAddress addr; Socket client = null; Exception exitException = null; while ((!shutdown) && (portBindMaxRetry == 0 || numRetries < portBindMaxRetry)) { ss = new ServerSocket(); ss.setReuseAddress(true); if (self.getQuorumListenOnAllIPs()) { int port = self.getElectionAddress().getPort(); addr = new InetSocketAddress(port); } else { self.recreateSocketAddresses(self.getId()); addr = self.getElectionAddress(); } setName(addr.toString()); ss.bind(addr); while (!shutdown) { client = ss.accept(); setSockOpts(client); LOG.info("Received connection request " + client.getRemoteSocketAddress()); if (quorumSaslAuthEnabled) { receiveConnectionAsync(client); } else { receiveConnection(client); } numRetries = 0; } } LOG.info("Leaving listener"); if (!shutdown) { LOG.error("As I'm leaving the listener thread, " + "I won't be able to participate in leader " + "election any longer: " + self.getElectionAddress()); } else if (ss != null) { try { ss.close(); } catch (IOException ie) { LOG.debug("Error closing server socket", ie); } } } ... } ...}
复制代码


在 QuorumPeer 的 start()方法中,调用完 loadDataBase()和 startServerCnxnFactory()方法,恢复好本地数据以及启动完网络连接工厂 ServerCnxnFactory 的线程后,便会调用 QuorumPeer 的 startLeaderElection()方法进行 Leader 选举。

 

一.初始化 Leader 选举(初始化当前投票 + 监听选举端口 + 启动选举守护线程)


Leader 选举是集群版的 zk 服务端和单机版的 zk 服务端的最大不同点。首先 QuorumPeer 的 startLeaderElection()方法会通过"new Vote()",根据服务器 ID、最新的事务 ID 和当前的服务器 epoch,来初始化当前投票。也就是在 Leader 选举的初始化过程中,集群版 zk 的每个服务器都会给自己投票。

 

然后当 QuorumPeer 的 startLeaderElection()方法完成初始化当前投票后,会调用 QuorumPeer 的 createElectionAlgorithm()方法去初始化选举算法,即监听选举端口 3888 + 启动选举守护线程。

 

而在 QuorumPeer 的 createElectionAlgorithm()方法中:首先会创建 Leader 选举所需的网络 IO 实例 QuorumCnxManager。QuorumCnxManager 实例会启动 Listener 线程对 Leader 选举端口进行监听,用来等待集群中其他服务器发送的请求。

 

然后会创建 FastLeaderElection 选举算法实例,zk 默认使用 FastLeaderElection 选举算法,每一个 QuorumPeer 实例都会有一个选举算法实例。

 

接着会启动选举算法 FastLeaderElection 实例使用的守护线程,其中发送投票的守护线程会不断从发送队列中取出消息进行发送,接收投票的守护线程会不断从接收队列中取出消息进行处理。

 

二.启动 QuorumPeer 线程检测当前服务器状态


QuorumPeer 也是一个继承了 ZooKeeperThread 的线程。当 QuorumPeer 的 startLeaderElection()方法完成初始化 Leader 选举后,便会启动 QuorumPeer 线程通过 while 循环不断检查当前服务器状态。

 

QuorumPeer 线程的核心工作就是不断地检测当前服务器的状态并做相应处理。服务器状态会在 LOOKING、LEADING、FOLLOWING/OBSERVING 之间进行切换。在集群版 zk 服务端启动时,QuorumPeer 的初始状态是 LOOKING,所以 QuorumPeer 线程就会判断此时需要更新当前投票进行 Leader 选举。

 

三.进行 Leader 选举


zk 的 Leader 选举过程,其实就是集群中所有机器相互间进行一系列投票,选举产生最合适的机器成为 Leader,其余机器成为 Follower 或 Observer,然后对这些已经确定集群角色的机器通过 QuorumPeer 线程进行初始化的过程。

 

Leader 选举算法:


就是集群中哪个机器处理的数据越新,就越有可能成为 Leader。先通过每个服务器处理过的最大 ZXID 来比较谁的数据最新,如果每个机器处理的 ZXID 一致,那么最大 SID 的服务器就成为 Leader。

 

(4)Leader 和 Follower 启动阶段


一.创建 Leader 服务端实例和 Follower 客户端实例

二.Leader 启动时会创建 LearnerCnxAcceptor 监听 Learner 的连接

三.Learner 启动时会发起请求建立和 Leader 的连接

四.Leader 会为请求连接的每个 Learner 创建一个 LearnerHandler

五.Learner 建立和 Leader 的连接后会向 Leader 发送 LearnerInfo 进行注册

六.Leader 解析 LearnerInfo 计算出 epoch 并发送 LeaderInfo 给 Learner

七.Learner 收到 Leader 发送的 LeaderInfo 后会反馈 ackNewEpoch 消息

八.Leader 收到过半 Learner 的 ackNewEpoch 消息后开始进行数据同步

九.过半 Learner 完成数据同步就启动 Leader 和 Learner 绑定的服务器实例

 

当 zk 完成 Leader 选举后,集群中每个服务器基本都已确定自己的角色。zk 将集群中的机器分为 Leader、Follower、Obervser 三种角色,每种角色在集群中起到的作用都各不相同。

 

Leader 角色主要负责处理客户端发送的数据变更等事务性请求,并管理协调集群中 Follower 角色的服务器。Follower 角色则主要处理客户端的获取数据等非事务性请求。Observer 角色的服务器的功能和 Follower 角色的服务器的功能相似,唯一的不同就是不会参与 Leader 的选举工作。

 

zk 中的这三种角色服务器,在服务启动过程中也有各自的不同,下面分析 Leader 角色和 Follower 角色在启动过程中的工作原理,也就是 Leader 角色和 Follower 角色启动过程中的交互步骤。

 

Leader 服务端和 Follower 客户端的启动交互:



一.创建 Leader 服务端实例和 Follower 客户端实例


由于 QuorumPeer 线程会不断检测当前服务器的状态并做相应处理,所以当 QuorumPeer 线程 + FastLeaderElection 守护线程完成 Leader 选举后,每个 zk 服务器都会根据节点状态创建相应的角色实例来完成数据同步。

 

比如 zk 服务器 QuorumPeer 实例,如果通过 getPeerState()方法发现自己的节点状态为 LEADING,那么就会调用 QuorumPeer 的 makeLeader()方法来创建 Leader 服务端实例。如果通过 getPeerState()方法发现自己的节点状态为 FOLLOWERING,那么就会调用 QuorumPeer 的 makeFollower()方法来创建 Follower 客户端实例。


public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {    private volatile boolean running = true;    public Follower follower;    public Leader leader;    public Observer observer;    ...    public synchronized ServerState getPeerState(){        return state;    }        @Override    public void run() {        ...        while (running) {            //检测当前服务器状态 + 进行Leader选举            switch (getPeerState()) {                case LOOKING:                    LOG.info("LOOKING");                    ...                    //设置当前的投票                    setCurrentVote(electionAlg.lookForLeader());                                                            break;                case FOLLOWING:                    LOG.info("FOLLOWING");                    //1.创建Follower客户端实例                    setFollower(makeFollower(logFactory));                    //启动Follower客户端实例,followLeader()方法里面会有个while循环处理心跳等                    follower.followLeader();                    ...                    break;                case LEADING:                    LOG.info("LEADING");                    //1.创建Leader服务端实例                    setLeader(makeLeader(logFactory));                    //2.启动Leader服务端实例,lead()方法里面会有一个while循环处理心跳等                    leader.lead();                    setLeader(null);                    ...                    break;                ...            }            ...        }    }        //Follower客户端实例绑定了FollowerZooKeeperServer服务器实例    protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {        return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.zkDb));    }        //Leader服务端实例绑定了LeaderZooKeeperServer服务器实例    protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException, X509Exception {        return new Leader(this, new LeaderZooKeeperServer(logFactory, this, this.zkDb));    }    ...}
复制代码


二.Leader 启动时会创建 LearnerCnxAcceptor 监听 Learner 的连接


当 QuorumPeer 线程通过 makeLeader()方法创建好 Leader 服务端实例后,就会通过调用 Leader 的 lead()方法来启动 Leader 服务端实例。

 

首先在 Leader 的构造方法中,会创建 BIO 的 ServerSocket 并监听 2888 端口。然后在 Leader 的 lead()方法中,会创建 Learner 接收器 LearnerCnxAcceptor。而 Leader 的 lead()方法里会有一个 while 循环,不断处理与 Learner 的心跳等。

 

所有非 Leader 角色都可称为 Learner 角色,Follower 会继承 Learner。LearnerCnxAcceptor 接收器则用于接收所有 Learner 客户端的连接请求。


public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {    ...    //Leader服务端实例绑定了LeaderZooKeeperServer服务器实例    protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException, X509Exception {        return new Leader(this, new LeaderZooKeeperServer(logFactory, this, this.zkDb));    }}
public class Leader { private final ServerSocket ss;//BIO的ServerSocket //1.创建Leader网络服务端实例 Leader(QuorumPeer self,LeaderZooKeeperServer zk) throws IOException { this.self = self; this.proposalStats = new BufferStats(); if (self.shouldUsePortUnification() || self.isSslQuorum()) { boolean allowInsecureConnection = self.shouldUsePortUnification(); if (self.getQuorumListenOnAllIPs()) { ss = new UnifiedServerSocket(self.getX509Util(), allowInsecureConnection, self.getQuorumAddress().getPort()); } else { ss = new UnifiedServerSocket(self.getX509Util(), allowInsecureConnection); } } else { if (self.getQuorumListenOnAllIPs()) { ss = new ServerSocket(self.getQuorumAddress().getPort()); } else { ss = new ServerSocket(); } } ss.setReuseAddress(true); if (!self.getQuorumListenOnAllIPs()) { ss.bind(self.getQuorumAddress()); } this.zk = zk; this.learnerSnapshotThrottler = createLearnerSnapshotThrottler(maxConcurrentSnapshots, maxConcurrentSnapshotTimeout); } //2.启动Leader网络服务端实例 void lead() throws IOException, InterruptedException { ... //2.Leader网络服务端启动时会创建Learner接收器LearnerCnxAcceptor //Start thread that waits for connection requests from new followers. cnxAcceptor = new LearnerCnxAcceptor(); ... } ...}
复制代码


三.Learner 启动时会发起请求建立和 Leader 的连接


当 QuorumPeer 线程通过 makeFollower()方法创建好 Follower 客户端实例后,就会调用 Follower 的 followLeader()方法来启动 Follower 客户端实例。在 Follower 的 followLeader()方法中,也就是在 Learner 客户端实例创建完后,会通过 findLeader()方法从 Leader 选举的投票结果中找到 Leader 服务端,然后通过 connectToLeader()方法来建立和该 Leader 服务端之间的连接。connectToLeader()方法尝试建立连接时,最多尝试 5 次,每次睡眠 1 秒。Follower 的 followLeader()方法中,会有一个 while 循环不断处理心跳等消息。


public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {    ...    //Follower客户端实例绑定了FollowerZooKeeperServer服务器实例    protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {        return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.zkDb));    }}
public class Follower extends Learner { final FollowerZooKeeperServer fzk; //1.创建Follower网络客户端实例 Follower(QuorumPeer self, FollowerZooKeeperServer zk) { this.self = self; this.zk = zk; this.fzk = zk; } //2.启动Follower网络客户端,也就是启动Learner网络客户端 void followLeader() throws InterruptedException { ... //3.从Leader选举的投票结果中找到Leader网络服务端 QuorumServer leaderServer = findLeader(); //3.建立和Leader网络服务端之间的连接 connectToLeader(leaderServer.addr, leaderServer.hostname); ... while (this.isRunning()) { //主要处理心跳消息 readPacket(qp); processPacket(qp); } } ...}
public class Learner { ... QuorumPeer self; LearnerZooKeeperServer zk; protected BufferedOutputStream bufferedOutput; protected Socket sock; protected InputArchive leaderIs; protected OutputArchive leaderOs; ... //3.从Leader选举的投票结果中找到Leader服务器 protected QuorumServer findLeader() { QuorumServer leaderServer = null; // Find the leader by id Vote current = self.getCurrentVote(); for (QuorumServer s : self.getView().values()) { if (s.id == current.getId()) { // Ensure we have the leader's correct IP address before attempting to connect. s.recreateSocketAddresses(); leaderServer = s; break; } } if (leaderServer == null) { LOG.warn("Couldn't find the leader with id = " + current.getId()); } return leaderServer; } //3.建立和Leader服务器之间的连接 protected void connectToLeader(InetSocketAddress addr, String hostname) { this.sock = createSocket(); int initLimitTime = self.tickTime * self.initLimit; int remainingInitLimitTime = initLimitTime; long startNanoTime = nanoTime();
for (int tries = 0; tries < 5; tries++) { try { remainingInitLimitTime = initLimitTime - (int)((nanoTime() - startNanoTime) / 1000000); if (remainingInitLimitTime <= 0) { LOG.error("initLimit exceeded on retries."); throw new IOException("initLimit exceeded on retries."); } sockConnect(sock, addr, Math.min(self.tickTime * self.syncLimit, remainingInitLimitTime)); if (self.isSslQuorum()) { ((SSLSocket) sock).startHandshake(); } sock.setTcpNoDelay(nodelay); break; } catch (IOException e) { ... } Thread.sleep(1000); } self.authLearner.authenticate(sock, hostname); //下面定义的leaderIs和leaderOs是建立在Socket基础上的 //所以后续在registerWithLeader方法中可以先后往leaderIs写数据然后等待从leaderOs读数据 leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream())); bufferedOutput = new BufferedOutputStream(sock.getOutputStream()); leaderOs = BinaryOutputArchive.getArchive(bufferedOutput); } protected void sockConnect(Socket sock, InetSocketAddress addr, int timeout) throws IOException { sock.connect(addr, timeout); }}
复制代码


四.Leader 会为请求连接的每个 Learner 创建一个 LearnerHandler


LearnerCnxAcceptor 也是一个线程。Leader 服务端启动时会创建和启动 Learner 接收器 LearnerCnxAcceptor,LearnerCnxAcceptor 线程里会通过 while 循环不断监听 Learner 发起的连接。

 

当 Leader 服务端实例接收到来自 Learner 的连接请求后,LearnerCnxAcceptor 线程就会通过 ServerSocket 监听到 Learner 连接请求。此时,LearnerCnxAcceptor 就会创建一个 LearnerHandler 实例。每个 LearnerHandler 实例都对应了一个 Leader 与 Learner 之间的连接,LearnerHandler 负责 Leader 与 Learner 间几乎所有的消息通信和数据同步。

 

LearnerHandler 也是一个线程,LearnerHandler 会通过 BIO + while 循环来处理和 Learner 的通信、数据同步和心跳。


public class Leader {    private final ServerSocket ss;//BIO的ServerSocket        //1.创建Leader服务端实例    Leader(QuorumPeer self,LeaderZooKeeperServer zk) throws IOException {        this.self = self;        this.proposalStats = new BufferStats();        if (self.shouldUsePortUnification() || self.isSslQuorum()) {            boolean allowInsecureConnection = self.shouldUsePortUnification();            if (self.getQuorumListenOnAllIPs()) {                ss = new UnifiedServerSocket(self.getX509Util(), allowInsecureConnection, self.getQuorumAddress().getPort());            } else {                ss = new UnifiedServerSocket(self.getX509Util(), allowInsecureConnection);            }        } else {            if (self.getQuorumListenOnAllIPs()) {                ss = new ServerSocket(self.getQuorumAddress().getPort());            } else {                ss = new ServerSocket();            }        }        ss.setReuseAddress(true);        if (!self.getQuorumListenOnAllIPs()) {            ss.bind(self.getQuorumAddress());        }        this.zk = zk;        this.learnerSnapshotThrottler = createLearnerSnapshotThrottler(maxConcurrentSnapshots, maxConcurrentSnapshotTimeout);    }    ...    void lead() throws IOException, InterruptedException {        ...        //2.Leader服务端启动时会创建Learner接收器LearnerCnxAcceptor        //Start thread that waits for connection requests from new followers.        cnxAcceptor = new LearnerCnxAcceptor();        cnxAcceptor.start();        ...    }        class LearnerCnxAcceptor extends ZooKeeperCriticalThread {        private volatile boolean stop = false;                public LearnerCnxAcceptor() {            super("LearnerCnxAcceptor-" + ss.getLocalSocketAddress(), zk.getZooKeeperServerListener());        }              @Override        public void run() {            while (!stop) {                Socket s = null;                boolean error = false;                ...                s = ss.accept();                s.setSoTimeout(self.tickTime * self.initLimit);                s.setTcpNoDelay(nodelay);                BufferedInputStream is = new BufferedInputStream(s.getInputStream());                //4.Leader服务端会为每个Learner客户端创建一个LearnerHandler                LearnerHandler fh = new LearnerHandler(s, is, Leader.this);                fh.start();                ...            }        }                public void halt() {            stop = true;        }    }    ...}
public class LearnerHandler extends ZooKeeperThread { final Leader leader; ... @Override public void run() { leader.addLearnerHandler(this); tickOfNextAckDeadline = leader.self.tick.get() + leader.self.initLimit + leader.self.syncLimit;
ia = BinaryInputArchive.getArchive(bufferedInput); bufferedOutput = new BufferedOutputStream(sock.getOutputStream()); oa = BinaryOutputArchive.getArchive(bufferedOutput);
QuorumPacket qp = new QuorumPacket(); ia.readRecord(qp, "packet"); if (qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO) { LOG.error("First packet " + qp.toString() + " is not FOLLOWERINFO or OBSERVERINFO!"); return; } byte learnerInfoData[] = qp.getData(); ... } ...}
复制代码


五.Learner 建立和 Leader 的连接后会向 Leader 发送 LearnerInfo 进行注册


当 Learner 通过 Learner 的 connectToLeader()方法和 Leader 建立起连接后,就会通过 Learner 的 registerWithLeader()方法开始向 Leader 进行注册。也就是将 Learner 客户端自己的基本信息 LearnerInfo 发送给 Leader 服务端,LearnerInfo 中会包括当前服务器的 SID 和处理事务的最新 ZXID。


public class Follower extends Learner {    final FollowerZooKeeperServer fzk;        //创建Follower客户端实例    Follower(QuorumPeer self, FollowerZooKeeperServer zk) {        this.self = self;        this.zk = zk;        this.fzk = zk;    }        //启动Follower客户端实例,也就是启动Learner客户端实例    void followLeader() throws InterruptedException {        ...        //从Leader选举的投票结果中找到Leader服务端        QuorumServer leaderServer = findLeader();                    try {            //3.建立和Leader服务端之间的连接            connectToLeader(leaderServer.addr, leaderServer.hostname);            //5.向Leader发起注册            long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);            ...            syncWithLeader(newEpochZxid);                            QuorumPacket qp = new QuorumPacket();            while (this.isRunning()) {                readPacket(qp);                processPacket(qp);            }        } catch (Exception e) {            LOG.warn("Exception when following the leader", e);            try {                sock.close();            } catch (IOException e1) {                e1.printStackTrace();            }            pendingRevalidations.clear();        }    }    ...}
public class Learner { ... //Once connected to the leader, perform the handshake protocol to establish a following / observing connection. protected long registerWithLeader(int pktType) throws IOException { long lastLoggedZxid = self.getLastLoggedZxid(); QuorumPacket qp = new QuorumPacket(); qp.setType(pktType); qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0)); //Add sid to payload //5.将Learner客户端自己的基本信息LearnerInfo发送给Leader服务端 LearnerInfo li = new LearnerInfo(self.getId(), 0x10000, self.getQuorumVerifier().getVersion()); ByteArrayOutputStream bsid = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid); boa.writeRecord(li, "LearnerInfo"); qp.setData(bsid.toByteArray()); writePacket(qp, true); ... } void writePacket(QuorumPacket pp, boolean flush) throws IOException { synchronized (leaderOs) { if (pp != null) { leaderOs.writeRecord(pp, "packet"); } if (flush) { bufferedOutput.flush(); } } } ...}
复制代码


六.Leader 解析 LearnerInfo 计算出 epoch 并发送 LeaderInfo 给 Learner


由于 Leader 的 LearnerCnxAcceptor 在接收到来自 Learner 的连接请求后,会创建 LearnerHandler 来处理 Leader 与 Learner 的消息通信和数据同步。所以当 Learner 通过 registerWithLeader()方法向 Leader 发起注册请求后,Leader 服务端下对应的 LearnerHandler 线程就能收到 LearnerInfo 信息,于是便会根据 LearnerInfo 信息解析出 Learner 的 SID 和 ZXID。

 

首先调用 ZxidUtils 的 getEpochFromZxid()方法,通过将 Learner 的 ZXID 右移 32 位来解析出 Learner 的 epoch。然后调用 Leader 的 getEpochToPropose()方法比较 Learner 和 Leader 的 epoch,如果 Learner 的 epoch 大,则更新 Leader 的 epoch 为 Learner 的 epoch + 1。

 

接着在 Leader 的 getEpochToPropose()方法中,会将 Learner 的 SID 添加到 HashSet 类型的 connectingFollowers 中。通过 Leader 的 connectingFollowers 的 wait()方法和 notifyAll()方法,便能实现让 LearnerHandler 进行等待和唤醒。

 

直到过半 Learner 已向 Leader 进行了注册,同时更新了 Leader 的 epoch,之后 Leader 就可以确定当前集群的 epoch 了。可见,可以通过 Object 的 wait()方法和 notifyAll()方法来实现过半效果:未过半则进行阻塞,过半则进行通知继续后续处理。

 

当确定好当前集群的 epoch 后,Leader 的每个 LearnerHandler,都会发送一个包含该 epoch 的 LeaderInfo 消息给对应的 Learner,然后再通过 Leader 的 waitForEpochAck()方法等待过半 Learner 的响应。


public class LearnerHandler extends ZooKeeperThread {    final Leader leader;    //ZooKeeper server identifier of this learner    protected long sid = 0;    protected final Socket sock;    private BinaryInputArchive ia;    private BinaryOutputArchive oa;    ...    @Override    public void run() {        leader.addLearnerHandler(this);        tickOfNextAckDeadline = leader.self.tick.get() + leader.self.initLimit + leader.self.syncLimit;        //将ia和oa与BIO的Socket进行绑定        //以便当Leader通过oa发送LeaderInfo消息给Learner时,可以通过ia读取到Learner的ackNewEpoch响应        ia = BinaryInputArchive.getArchive(bufferedInput);        bufferedOutput = new BufferedOutputStream(sock.getOutputStream());        oa = BinaryOutputArchive.getArchive(bufferedOutput);

QuorumPacket qp = new QuorumPacket(); ia.readRecord(qp, "packet"); byte learnerInfoData[] = qp.getData(); ... //根据LearnerInfo信息解析出Learner的SID ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData); if (learnerInfoData.length >= 8) { this.sid = bbsid.getLong(); } ... //根据Learner的ZXID解析出对应Learner的epoch long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid()); long zxid = qp.getZxid(); //将Learner的epoch和Leader的epoch进行比较 //如果Learner的epoch更大,则更新Leader的epoch为Learner的epoch + 1 long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch); long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0); ... //发送一个包含该epoch的LeaderInfo消息给该LearnerHandler对应的Learner QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null); oa.writeRecord(newEpochPacket, "packet"); bufferedOutput.flush(); //发送包含该epoch的LeaderInfo消息后等待Learner响应 //读取Learner返回的ackNewEpoch响应 QuorumPacket ackEpochPacket = new QuorumPacket(); ia.readRecord(ackEpochPacket, "packet"); ... //等待过半Learner响应 leader.waitForEpochAck(this.getSid(), ss); ... } ...}
public class Leader { ... long epoch = -1; protected final Set<Long> connectingFollowers = new HashSet<Long>(); public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException { synchronized(connectingFollowers) { if (!waitingForNewEpoch) { return epoch; } //将Learner的epoch和Leader的epoch进行比较 //如果Learner的epoch更大,则更新Leader的epoch为Learner的epoch + 1 if (lastAcceptedEpoch >= epoch) { epoch = lastAcceptedEpoch + 1; } if (isParticipant(sid)) { connectingFollowers.add(sid); } QuorumVerifier verifier = self.getQuorumVerifier(); if (connectingFollowers.contains(self.getId()) && verifier.containsQuorum(connectingFollowers)) { waitingForNewEpoch = false; self.setAcceptedEpoch(epoch); connectingFollowers.notifyAll(); } else { long start = Time.currentElapsedTime(); long cur = start; long end = start + self.getInitLimit()*self.getTickTime(); while (waitingForNewEpoch && cur < end) { //通过HashSet类型的connectingFollowers的wait和notifyAll方法,让LearnerHandler就会进行等待 connectingFollowers.wait(end - cur); cur = Time.currentElapsedTime(); } if (waitingForNewEpoch) { throw new InterruptedException("Timeout while waiting for epoch from quorum"); } } return epoch; } } ... protected final Set<Long> electingFollowers = new HashSet<Long>(); protected boolean electionFinished = false; public void waitForEpochAck(long id, StateSummary ss) throws IOException, InterruptedException { synchronized(electingFollowers) { if (electionFinished) { return; } ... if (isParticipant(id)) { electingFollowers.add(id); } QuorumVerifier verifier = self.getQuorumVerifier(); if (electingFollowers.contains(self.getId()) && verifier.containsQuorum(electingFollowers)) { electionFinished = true; electingFollowers.notifyAll(); } else { long start = Time.currentElapsedTime(); long cur = start; long end = start + self.getInitLimit()*self.getTickTime(); while (!electionFinished && cur < end) { electingFollowers.wait(end - cur); cur = Time.currentElapsedTime(); } ... } } } ...}
复制代码


七.Learner 收到 Leader 发送的 LeaderInfo 后会反馈 ackNewEpoch 消息


Learner 通过 Learner 的 writePacket()方法向 Leader 发送 LearnerInfo 消息后,会继续通过 Learner 的 readPacket()方法接收 Leader 返回的 LeaderInfo 响应。当 Learner 接收到 Leader 的 LearnerHandler 返回的 LeaderInfo 消息后,就会解析出 epoch 和 ZXID,然后向 Leader 反馈一个 ackNewEpoch 响应。


public class Learner {           ...    protected Socket sock;    protected InputArchive leaderIs;    protected OutputArchive leaderOs;      ...    //3.建立和Leader服务端之间的连接    protected void connectToLeader(InetSocketAddress addr, String hostname) {        this.sock = createSocket();          int initLimitTime = self.tickTime * self.initLimit;        int remainingInitLimitTime = initLimitTime;        long startNanoTime = nanoTime();
for (int tries = 0; tries < 5; tries++) { ... remainingInitLimitTime = initLimitTime - (int)((nanoTime() - startNanoTime) / 1000000); if (remainingInitLimitTime <= 0) { LOG.error("initLimit exceeded on retries."); throw new IOException("initLimit exceeded on retries."); }
sockConnect(sock, addr, Math.min(self.tickTime * self.syncLimit, remainingInitLimitTime)); if (self.isSslQuorum()) { ((SSLSocket) sock).startHandshake(); } sock.setTcpNoDelay(nodelay); break; ... Thread.sleep(1000); } self.authLearner.authenticate(sock, hostname); //下面定义的leaderIs和leaderOs是建立在BIO的Socket基础上的 //所以后续在registerWithLeader方法中可以先往leaderOs写数据然后阻塞等待从leaderIs读数据 leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream())); bufferedOutput = new BufferedOutputStream(sock.getOutputStream()); leaderOs = BinaryOutputArchive.getArchive(bufferedOutput); } protected long registerWithLeader(int pktType) throws IOException { long lastLoggedZxid = self.getLastLoggedZxid(); QuorumPacket qp = new QuorumPacket(); qp.setType(pktType); qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0)); //Add sid to payload //5.将Learner客户端自己的基本信息LearnerInfo发送给Leader服务端 LearnerInfo li = new LearnerInfo(self.getId(), 0x10000, self.getQuorumVerifier().getVersion()); ByteArrayOutputStream bsid = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid); boa.writeRecord(li, "LearnerInfo"); qp.setData(bsid.toByteArray()); writePacket(qp, true); //7.接收Leader发送的包含当前集群的epoch的LeaderInfo信息 readPacket(qp); final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid()); if (qp.getType() == Leader.LEADERINFO) { leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt(); byte epochBytes[] = new byte[4]; final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes); if (newEpoch > self.getAcceptedEpoch()) { wrappedEpochBytes.putInt((int)self.getCurrentEpoch()); self.setAcceptedEpoch(newEpoch); } else if (newEpoch == self.getAcceptedEpoch()) { wrappedEpochBytes.putInt(-1); } else { throw new IOException("Leaders epoch, " + newEpoch + " is less than accepted epoch, " + self.getAcceptedEpoch()); } //7.收到Leader发送的LeaderInfo信息后反馈ackNewEpoch消息给Learner QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null); writePacket(ackNewEpoch, true); return ZxidUtils.makeZxid(newEpoch, 0); } ... } //write a packet to the leader //@param pp:the proposal packet to be sent to the leader void writePacket(QuorumPacket pp, boolean flush) throws IOException { synchronized (leaderOs) { if (pp != null) { leaderOs.writeRecord(pp, "packet"); } if (flush) { bufferedOutput.flush(); } } } //read a packet from the leader //@param pp:the packet to be instantiated void readPacket(QuorumPacket pp) throws IOException { synchronized (leaderIs) { leaderIs.readRecord(pp, "packet"); } long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK; if (pp.getType() == Leader.PING) { traceMask = ZooTrace.SERVER_PING_TRACE_MASK; } } ...}
复制代码


八.Leader 收到过半 Learner 的 ackNewEpoch 消息后开始进行数据同步


Leader 每收到一个 Learner 的连接请求,都会启动一个 LearnerHandler 处理。每个 LearnerHandler 线程都会首先通过 Leader 的 getEpochToPropose()方法,阻塞等待过半 Learner 发送 LearnerInfo 信息发起对 Leader 的注册。

 

当过半 Learner 已经向 Leader 进行注册后,每个 LearnerHandler 线程又继续发送 LeaderInfo 信息给 Learner 确认 epoch,然后通过 Leader 的 waitForEpochAck()方法,阻塞等待过半 Learner 返回响应。

 

当 Leader 接收到过半 Learner 向 Leader 发送的 ackNewEpoch 响应后,每个 LearnerHandler 线程便会开始执行与 Learner 间的数据同步,而 Learner 会通过 Learner 的 syncWithLeader()方法执行与 Leader 的数据同步。


public class LearnerHandler extends ZooKeeperThread {    final Leader leader;    //ZooKeeper server identifier of this learner    protected long sid = 0;    protected final Socket sock;    private BinaryInputArchive ia;    private BinaryOutputArchive oa;    ...    @Override    public void run() {        leader.addLearnerHandler(this);        tickOfNextAckDeadline = leader.self.tick.get() + leader.self.initLimit + leader.self.syncLimit;        //将ia和oa与Socket进行绑定        //以便当Leader通过oa发送LeaderInfo消息给Learner时,可以通过ia读取到Learner的ackNewEpoch响应        ia = BinaryInputArchive.getArchive(bufferedInput);        bufferedOutput = new BufferedOutputStream(sock.getOutputStream());        oa = BinaryOutputArchive.getArchive(bufferedOutput);
QuorumPacket qp = new QuorumPacket(); ia.readRecord(qp, "packet"); byte learnerInfoData[] = qp.getData(); ... //根据LearnerInfo信息解析出Learner的SID ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData); if (learnerInfoData.length >= 8) { this.sid = bbsid.getLong(); } ... //根据Learner的ZXID解析出对应Learner的epoch long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid()); long zxid = qp.getZxid(); //将Learner的epoch和Leader的epoch进行比较 //如果Learner的epoch更大,则更新Leader的epoch为Learner的epoch + 1 long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch); long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0); ... //发送一个包含该epoch的LeaderInfo消息给该LearnerHandler对应的Learner QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null); oa.writeRecord(newEpochPacket, "packet"); bufferedOutput.flush(); QuorumPacket ackEpochPacket = new QuorumPacket(); //发送包含该epoch的LeaderInfo消息后等待Learner响应 //读取Learner返回的ackNewEpoch响应 ia.readRecord(ackEpochPacket, "packet"); ... //等待过半Learner响应 leader.waitForEpochAck(this.getSid(), ss); ... //下面执行与Learner的数据同步 peerLastZxid = ss.getLastZxid(); boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader); if (needSnap) { long zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid(); oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet"); bufferedOutput.flush(); // Dump data to peer leader.zk.getZKDatabase().serializeSnapshot(oa); oa.writeString("BenWasHere", "signature"); bufferedOutput.flush(); } LOG.debug("Sending NEWLEADER message to " + sid); if (getVersion() < 0x10000) { QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, null, null); oa.writeRecord(newLeaderQP, "packet"); } else { QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, leader.self.getLastSeenQuorumVerifier().toString().getBytes(), null); queuedPackets.add(newLeaderQP); } bufferedOutput.flush(); //Start thread that blast packets in the queue to learner startSendingPackets();
//Have to wait for the first ACK, wait until the leader is ready, and only then we can start processing messages. qp = new QuorumPacket(); ia.readRecord(qp, "packet"); //阻塞等待过半Learner完成数据同步,接下来就可以启动QuorumPeer服务器实例了 leader.waitForNewLeaderAck(getSid(), qp.getZxid()); ... while (true) { //这里有关于Leader和Learner之间保持心跳的处理 } } ...}
public class Leader { ... protected final Set<Long> electingFollowers = new HashSet<Long>(); protected boolean electionFinished = false; public void waitForEpochAck(long id, StateSummary ss) throws IOException, InterruptedException { synchronized(electingFollowers) { if (electionFinished) { return; } ... if (isParticipant(id)) { electingFollowers.add(id); } QuorumVerifier verifier = self.getQuorumVerifier(); if (electingFollowers.contains(self.getId()) && verifier.containsQuorum(electingFollowers)) { electionFinished = true; electingFollowers.notifyAll(); } else { long start = Time.currentElapsedTime(); long cur = start; long end = start + self.getInitLimit()*self.getTickTime(); while(!electionFinished && cur < end) { electingFollowers.wait(end - cur); cur = Time.currentElapsedTime(); } ... } } } ...}
复制代码


九.过半 Learner 完成数据同步就启动 Leader 和 Learner 绑定的服务器实例


Follower 客户端绑定了 FollowerZooKeeperServer 服务器实例,Leader 服务端绑定了 LeaderZooKeeperServer 服务器实例。

 

在 Leader 的 lead()方法中,首先创建 Learner 接收器 LearnerCnxAcceptor 监听 Learner 发起的连接请求,然后 Leader 的 lead()方法会阻塞等待过半 Learner 完成向 Leader 的注册,接着 Leader 的 lead()方法会阻塞等待过半 Learner 返回 ackNewEpoch 响应,接着 Leader 的 lead()方法会阻塞等待过半 Learner 完成数据同步,然后执行 Leader 的 startZkServer()方法启动 Leader 绑定的服务器实例,也就是执行 LeaderZooKeeperServer 的 startup()方法启动服务器。

 

而在 Learner 进行数据同步的 Learner 的 syncWithLeader()方法中,完成数据同步后同样会启动 Learner 绑定的服务器实例,也就是执行 LearnerZooKeeperServer 的 startup()方法启动服务器。

 

Leader 和 Learner 绑定的服务器实例的启动步骤,主要就是执行 ZooKeeperServer 的 startup()方法,即:创建并启动会话管理器 + 初始化服务器的请求处理链。


public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {    ...    //Follower客户端绑定了FollowerZooKeeperServer服务器实例    protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {        return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.zkDb));    }        //Leader服务端绑定了LeaderZooKeeperServer服务器实例    protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException, X509Exception {        return new Leader(this, new LeaderZooKeeperServer(logFactory, this, this.zkDb));    }    ...}
public class Leader { final LeaderZooKeeperServer zk; ... void lead() throws IOException, InterruptedException { ... //创建Learner接收器LearnerCnxAcceptor监听Learner发起的连接请求 cnxAcceptor = new LearnerCnxAcceptor(); cnxAcceptor.start(); //阻塞等待过半Learner完成向Leader的注册 long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch()); ... //阻塞等待过半Learner返回ackNewEpoch响应 waitForEpochAck(self.getId(), leaderStateSummary); ... //阻塞等待过半Learner完成数据同步 waitForNewLeaderAck(self.getId(), zk.getZxid()); ... //开始启动Leader绑定的LeaderZooKeeperServer服务器实例 startZkServer(); ... } private synchronized void startZkServer() { ... zk.startup(); ... } ...}
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { ... public synchronized void startup() { startupWithServerState(State.RUNNING); } private void startupWithServerState(State state) { //创建并启动会话管理器 if (sessionTracker == null) { createSessionTracker(); } startSessionTracker(); //初始化服务器的请求处理链 setupRequestProcessors(); registerJMX(); //开启监控JVM停顿的线程 startJvmPauseMonitor(); setState(state); notifyAll(); } ...}
复制代码


文章转载自:东阳马生架构

原文链接:https://www.cnblogs.com/mjunz/p/18815712

体验地址:http://www.jnpfsoft.com/?from=001YH

用户头像

还未添加个人签名 2025-04-01 加入

还未添加个人简介

评论

发布
暂无评论
zk源码—单机和集群通信原理(二)_电子尖叫食人鱼_InfoQ写作社区