1.单机版的 zk 服务端的启动过程
单机版 zk 服务端的启动,主要分为两个阶段:预启动阶段和初始化阶段,其启动流程图如下:
接下来介绍 zk 服务端的预启动阶段(启动管理)与初始化阶段的具体流程,也就是单机版的 zk 服务端是如何从初始化到对外提供服务的。
(1)预启动阶段
在 zk 服务端进行初始化之前,首先要对配置文件等信息进行解析和载入,而 zk 服务端的预启动阶段的主要工作流程如下:
public class QuorumPeerMain { protected QuorumPeer quorumPeer; ... //1.启动程序入口 public static void main(String[] args) { QuorumPeerMain main = new QuorumPeerMain(); try { //启动程序 main.initializeAndRun(args); } catch (IllegalArgumentException e) { ... } LOG.info("Exiting normally"); System.exit(0); } protected void initializeAndRun(String[] args) { QuorumPeerConfig config = new QuorumPeerConfig(); if (args.length == 1) { //2.解析配置文件 config.parse(args[0]); } //3.创建和启动历史文件清理器 DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config.getDataDir(), config.getDataLogDir(), config.getSnapRetainCount(), config.getPurgeInterval()); purgeMgr.start(); //4.根据配置判断是集群模式还是单机模式 if (args.length == 1 && config.isDistributed()) { //集群模式 runFromConfig(config); } else { //单机模式 ZooKeeperServerMain.main(args); } } ...}
public class ZooKeeperServerMain { private ServerCnxnFactory cnxnFactory; ... public static void main(String[] args) { ZooKeeperServerMain main = new ZooKeeperServerMain(); try { //启动程序 main.initializeAndRun(args); } catch (IllegalArgumentException e) { ... } LOG.info("Exiting normally"); System.exit(0); } protected void initializeAndRun(String[] args) { ... ServerConfig config = new ServerConfig(); //2.解析配置文件 if (args.length == 1) { config.parse(args[0]); } else { config.parse(args); } runFromConfig(config); } //以下是初始化阶段的内容 public void runFromConfig(ServerConfig config) { ... txnLog = new FileTxnSnapLog(config.dataLogDir, config.dataDir); final ZooKeeperServer zkServer = new ZooKeeperServer(txnLog, config.tickTime, config.minSessionTimeout, config.maxSessionTimeout, null); ... } ...}
public class DatadirCleanupManager { //配置zoo.cfg文件中的autopurge.snapRetainCount和autopurge.purgeInterval实现数据快照文件的定时清理; private final File snapDir;//数据快照地址 private final File dataLogDir;//事务日志地址 //配置zoo.cfg文件中的autopurge.snapRetainCount,可指定需要保留的文件数目,默认是保留3个; private final int snapRetainCount;//需要保留的文件数目 //配置zoo.cfg文件中的autopurge.purgeInterval可指定清理频率,以小时为单位,默认0表示不开启自动清理功能; private final int purgeInterval;//清理频率 private Timer timer; ...}
复制代码
一.启动程序
QuorumPeerMain 类是 zk 服务的启动入口,可理解为 Java 中的 main 函数。通常我们执行 zkServer.sh 脚本启动 zk 服务时,就会运行这个类。QuorumPeerMain 的 main()方法会调用它的 initializeAndRun()方法来启动程序。
二.解析 zoo.cfg 配置文件
在 QuorumPeerMain 的 main()方法中,会执行它的 initializeAndRun()方法。
在 QuorumPeerMain 的 initializeAndRun()方法中,便会解析 zoo.cfg 配置文件。
在 ZooKeeperServerMain 的 initializeAndRun()方法中,也会解析 zoo.cfg 配置文件。
zoo.cfg 配置文件配置了 zk 运行时的基本参数,包括 tickTime、dataDir 等。
三.创建和启动历史文件清理器
文件清理器在日常的使用中非常重要。面对大流量的网络访问,zk 会产生海量的数据。如果磁盘数据过多或者磁盘空间不足,可能会导致 zk 服务端不能正常运行,所以 zk 采用 DatadirCleanupManager 类去清理历史文件。
其中 DatadirCleanupManager 类有 5 个属性,如上代码所示。DatadirCleanupManager 会对事务日志和数据快照文件进行定时清理,这种自动清理历史数据文件的机制可以尽量避免 zk 磁盘空间的浪费。
四.判断集群模式还是单机模式
根据从 zoo.cfg 文件解析出来的集群服务器地址列表来判断是否是单机模式。如果是单机模式,则会调用 ZooKeeperServerMain 的 main()方法来进行启动。如果是集群模式,则会调用 QuorumPeerMain 的 runFromConfig()方法来进行启动。
(2)初始化阶段
初始化阶段会根据预启动解析出的配置信息,初始化服务器实例。该阶段的主要工作流程如下:
一.创建数据持久化工具实例 FileTxnSnapLog
二.创建服务端统计工具实例 ServerStats
三.根据两个工具实例创建单机版服务器实例
四.创建网络连接工厂实例
五.初始化网络连接工厂实例
六.启动网络连接工厂实例的线程
七.恢复单机版服务器实例的本地数据
八.创建并启动服务器实例的会话管理器
九.初始化单机版服务器实例的请求处理链
十.注册单机版服务器实例到网络连接工厂实例
单机版服务器实例:ZooKeeperServer
网络连接工厂实例:ServerCnxnFactory
会话管理器:SessionTracker
public class ZooKeeperServerMain { private ServerCnxnFactory cnxnFactory; ... //以下是初始化阶段的内容 public void runFromConfig(ServerConfig config) { ... //1.创建zk数据管理器——持久化工具类FileTxnSnapLog的实例 txnLog = new FileTxnSnapLog(config.dataLogDir, config.dataDir); //2.创建zk服务运行统计器——统计工具类ServerStats的实例 //3.创建服务器实例ZooKeeperServer final ZooKeeperServer zkServer = new ZooKeeperServer(txnLog, config.tickTime, config.minSessionTimeout, config.maxSessionTimeout, null); txnLog.setServerStats(zkServer.serverStats()); ... //4.创建服务端网络连接工厂实例ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory(); cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), false); cnxnFactory.startup(zkServer); ... } ...}
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { private final ServerStats serverStats; private FileTxnSnapLog txnLogFactory = null; private ZKDatabase zkDb; protected int tickTime = DEFAULT_TICK_TIME; private final ZooKeeperServerListener listener; ... public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, ZKDatabase zkDb) { //2.创建服务端统计工具ServerStats实例 serverStats = new ServerStats(this); this.txnLogFactory = txnLogFactory; this.txnLogFactory.setServerStats(this.serverStats); this.zkDb = zkDb; this.tickTime = tickTime; setMinSessionTimeout(minSessionTimeout); setMaxSessionTimeout(maxSessionTimeout); listener = new ZooKeeperServerListenerImpl(this); } ...}
复制代码
一.创建数据持久化工具实例 FileTxnSnapLog
可以通过 FileTxnSnapLog 对 zk 服务器的内存数据进行持久化,具体会将内存数据持久化到配置文件里的事务日志文件 + 快照数据文件中。
所以在执行 ZooKeeperServerMain 的 runFromConfig()方法启动 zk 服务端时,首先会根据 zoo.cfg 配置文件中的 dataDir 数据快照目录和 dataLogDir 事务日志目录,通过"new FileTxnSnapLog()"来创建持久化工具类 FileTxnSnapLog 的实例。
二.创建服务端统计工具实例 ServerStats
ServerStats 用于统计 zk 服务端运行时的状态信息,主要统计的数据包括:服务端向客户端发送的响应包次数、接收客户端发送的请求包次数、服务端处理请求的延迟情况、处理客户端的请求次数。
在执行 ZooKeeperServerMain.runFromConfig()方法时,执行到 ZooKeeperServer 的构造方法就会首先创建 ServerStats 实例。
三.根据两个工具实例创建单机版服务器实例
ZooKeeperServer 是单机版服务端的核心实体类。在执行 ZooKeeperServerMain.runFromConfig()方法时,创建完 zk 数据管理器——持久化工具类 FileTxnSnapLog 的实例后,就会通过"new ZooKeeperServer()"来创建单机版服务器实例 ZooKeeperServer。
此时会传入从 zoo.cfg 配置文件中解析出的 tickTime 和会话超时时间来创建服务器实例。创建完服务器实例 ZooKeeperServer 后,接下来才会对该 ZooKeeperServer 服务器实例进行更多的初始化工作,包括网络连接器、内存数据库和请求处理器等组件的初始化。
四.创建网络连接工厂实例
zk 中客户端和服务端的网络通信,本质是通过 Java 的 IO 数据流进行通信的。zk 一开始就是使用自己实现的 NIO 进行网络通信的,但之后引入了 Netty 框架来满足不同使用情况下的需求。
在执行 ZooKeeperServerMain 的 runFromConfig()方法时,创建完服务器实例 ZooKeeperServer 后,就会通过 ServerCnxnFactory 的 createFactory()方法来创建服务端网络连接工厂实例 ServerCnxnFactory。
ServerCnxnFactory 的 createFactory()方法首先会获取配置值,判断是使用 NIO 还是使用 Netty,然后再通过反射去实例化服务端网络连接工厂。
可以通过配置 zookeeper.serverCnxnFactory 来指定使用:zk 自己实现的 NIO 还是 Netty 框架,来构建服务端网络连接工厂 ServerCnxnFactory。
public abstract class ServerCnxnFactory { ... static public ServerCnxnFactory createFactory() throws IOException { String serverCnxnFactoryName = System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY); if (serverCnxnFactoryName == null) { serverCnxnFactoryName = NIOServerCnxnFactory.class.getName(); } ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName).getDeclaredConstructor().newInstance(); return serverCnxnFactory; } ...}
复制代码
五.初始化网络连接工厂实例
在执行 ZooKeeperServerMain 的 runFromConfig()方法时,创建完服务端网络连接工厂 ServerCnxnFactory 实例后,就会调用网络连接工厂 ServerCnxnFactory 的 configure()方法来初始化网络连接工厂 ServerCnxnFactory 实例。
这里以 NIOServerCnxnFactory 的 configure()方法为例,该方法主要会启动一个 NIO 服务器,以及创建三类线程:
一.处理客户端连接的 AcceptThread 线程
二.处理客户端请求的一批 SelectorThread 线程
三.处理过期连接的 ConnectionExpirerThread 线程
初始化完 ServerCnxnFactory 实例后,虽然此时 NIO 服务器已对外开放端口,客户端也能访问到 2181 端口,但此时 zk 服务端还不能正常处理客户端请求。
public class NIOServerCnxnFactory extends ServerCnxnFactory { //最大客户端连接数 protected int maxClientCnxns = 60; //处理过期连接的线程 private ConnectionExpirerThread expirerThread; //处理客户端建立连接的线程 private AcceptThread acceptThread; //处理客户端请求的线程 private final Set<SelectorThread> selectorThreads = new HashSet<SelectorThread>(); //会话过期相关 int sessionlessCnxnTimeout; //连接过期队列 private ExpiryQueue<NIOServerCnxn> cnxnExpiryQueue; //selector线程数,CPU核数的一半 private int numSelectorThreads; //工作线程数 private int numWorkerThreads; ... public void configure(InetSocketAddress addr, int maxcc, boolean secure) throws IOException { ... maxClientCnxns = maxcc; sessionlessCnxnTimeout = Integer.getInteger(ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000); //连接过期队列 cnxnExpiryQueue = new ExpiryQueue<NIOServerCnxn>(sessionlessCnxnTimeout); //创建一个自动处理过期连接的ConnectionExpirerThread线程 expirerThread = new ConnectionExpirerThread();
int numCores = Runtime.getRuntime().availableProcessors(); numSelectorThreads = Integer.getInteger(ZOOKEEPER_NIO_NUM_SELECTOR_THREADS, Math.max((int) Math.sqrt((float) numCores/2), 1)); numWorkerThreads = Integer.getInteger(ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores); ... //创建一批SelectorThread线程 for (int i=0; i<numSelectorThreads; ++i) { selectorThreads.add(new SelectorThread(i)); } //打开ServerSocketChannel this.ss = ServerSocketChannel.open(); ss.socket().setReuseAddress(true); //绑定端口,启动NIO服务器 ss.socket().bind(addr); ss.configureBlocking(false); //创建一个AcceptThread线程 acceptThread = new AcceptThread(ss, addr, selectorThreads); } ...}
复制代码
六.启动网络连接工厂实例的线程
在执行 ZooKeeperServerMain 的 runFromConfig()方法时,调用完网络连接工厂 ServerCnxnFactory 的 configure()方法初始化网络连接工厂 ServerCnxnFactory 实例后,便会调用 ServerCnxnFactory 的 startup()方法去启动 ServerCnxnFactory 的线程。
注意:对过期连接进行处理是由一个 ConnectionExpirerThread 线程负责的。
public class NIOServerCnxnFactory extends ServerCnxnFactory { private ExpiryQueue<NIOServerCnxn> cnxnExpiryQueue;//连接的过期队列 ... public void startup(ZooKeeperServer zks, boolean startServer) { //6.启动各种线程 start(); setZooKeeperServer(zks); if (startServer) { //7.恢复本地数据 zks.startdata(); //8.创建并启动会话管理器SessionTracker //9.初始化zk的请求处理链 //10.注册zk服务器实例 zks.startup(); } } public void start() { stopped = false; if (workerPool == null) { workerPool = new WorkerService("NIOWorker", numWorkerThreads, false); } for (SelectorThread thread : selectorThreads) { if (thread.getState() == Thread.State.NEW) { thread.start(); } } if (acceptThread.getState() == Thread.State.NEW) { acceptThread.start(); } if (expirerThread.getState() == Thread.State.NEW) { expirerThread.start(); } } ... //用来处理过期连接,会启动一个超时检查线程来检查连接是否过期 private class ConnectionExpirerThread extends ZooKeeperThread { ConnectionExpirerThread() { super("ConnnectionExpirer"); } @Override public void run() { while (!stopped) { //使用了分桶管理策略 long waitTime = cnxnExpiryQueue.getWaitTime(); if (waitTime > 0) { Thread.sleep(waitTime); continue; } for (NIOServerCnxn conn : cnxnExpiryQueue.poll()) { conn.close(); } } } } //用来处理要建立连接的客户端OP_ACCEPT请求 private class AcceptThread extends AbstractSelectThread { private final ServerSocketChannel acceptSocket; private final SelectionKey acceptKey; private final Collection<SelectorThread> selectorThreads; private Iterator<SelectorThread> selectorIterator; ... public AcceptThread(ServerSocketChannel ss, InetSocketAddress addr, Set<SelectorThread> selectorThreads) throws IOException { super("NIOServerCxnFactory.AcceptThread:" + addr); this.acceptSocket = ss; this.acceptKey = acceptSocket.register(selector, SelectionKey.OP_ACCEPT); this.selectorThreads = Collections.unmodifiableList(new ArrayList<SelectorThread>(selectorThreads)); selectorIterator = this.selectorThreads.iterator(); } @Override public void run() { ... while (!stopped && !acceptSocket.socket().isClosed()) { select(); } ... } private void select() { selector.select(); Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator(); while (!stopped && selectedKeys.hasNext()) { SelectionKey key = selectedKeys.next(); selectedKeys.remove(); if (!key.isValid()) { continue; } if (key.isAcceptable()) { if (!doAccept()) { pauseAccept(10); } } else { LOG.warn("Unexpected ops in accept select " + key.readyOps()); } } } private void pauseAccept(long millisecs) { acceptKey.interestOps(0); selector.select(millisecs); acceptKey.interestOps(SelectionKey.OP_ACCEPT); } private boolean doAccept() { boolean accepted = false; SocketChannel sc = null; sc = acceptSocket.accept(); accepted = true; InetAddress ia = sc.socket().getInetAddress(); int cnxncount = getClientCnxnCount(ia); ... sc.configureBlocking(false); // Round-robin assign this connection to a selector thread if (!selectorIterator.hasNext()) { selectorIterator = selectorThreads.iterator(); } SelectorThread selectorThread = selectorIterator.next(); ... acceptErrorLogger.flush(); return accepted; } } //用来处理AcceptThread线程建立好的客户端连接请求 class SelectorThread extends AbstractSelectThread { private final int id; private final Queue<SocketChannel> acceptedQueue; private final Queue<SelectionKey> updateQueue; public SelectorThread(int id) throws IOException { super("NIOServerCxnFactory.SelectorThread-" + id); this.id = id; acceptedQueue = new LinkedBlockingQueue<SocketChannel>(); updateQueue = new LinkedBlockingQueue<SelectionKey>(); } public boolean addAcceptedConnection(SocketChannel accepted) { if (stopped || !acceptedQueue.offer(accepted)) { return false; } wakeupSelector(); return true; } ... @Override public void run() { while (!stopped) { select(); processAcceptedConnections(); processInterestOpsUpdateRequests(); } ... } private void select() { selector.select(); Set<SelectionKey> selected = selector.selectedKeys(); ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected); Collections.shuffle(selectedList); Iterator<SelectionKey> selectedKeys = selectedList.iterator(); while(!stopped && selectedKeys.hasNext()) { SelectionKey key = selectedKeys.next(); selected.remove(key); ... } } private void processAcceptedConnections() { SocketChannel accepted; while (!stopped && (accepted = acceptedQueue.poll()) != null) { SelectionKey key = null; key = accepted.register(selector, SelectionKey.OP_READ); NIOServerCnxn cnxn = createConnection(accepted, key, this); key.attach(cnxn); addCnxn(cnxn); } } ... } ... protected NIOServerCnxn createConnection(SocketChannel sock, SelectionKey sk, SelectorThread selectorThread) { return new NIOServerCnxn(zkServer, sock, sk, this, selectorThread); } private void addCnxn(NIOServerCnxn cnxn) throws IOException { ... //激活连接 touchCnxn(cnxn); } public void touchCnxn(NIOServerCnxn cnxn) { //这个cnxnExpiryQueue与管理过期连接有关 cnxnExpiryQueue.update(cnxn, cnxn.getSessionTimeout()); }}
复制代码
七.恢复单机版服务器实例的本地数据
启动 zk 服务端需要从本地快照数据文件 + 事务日志文件中进行数据恢复。在执行 ZooKeeperServerMain 的 runFromConfig()方法时,调用完 ServerCnxnFactory 的 startup()方法启动 ServerCnxnFactory 的线程后,就会调用单机版服务器实例 ZooKeeperServer 的 startdata()方法来恢复本地数据。
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { private ZKDatabase zkDb; private FileTxnSnapLog txnLogFactory = null; ... //7.恢复本地数据 public void startdata() { if (zkDb == null) { zkDb = new ZKDatabase(this.txnLogFactory); } if (!zkDb.isInitialized()) { loadData(); } } public void loadData() throws IOException, InterruptedException { if (zkDb.isInitialized()) { setZxid(zkDb.getDataTreeLastProcessedZxid()); } else { setZxid(zkDb.loadDataBase()); } // Clean up dead sessions LinkedList<Long> deadSessions = new LinkedList<Long>(); for (Long session : zkDb.getSessions()) { if (zkDb.getSessionWithTimeOuts().get(session) == null) { deadSessions.add(session); } } for (long session : deadSessions) { killSession(session, zkDb.getDataTreeLastProcessedZxid()); } // Make a clean snapshot takeSnapshot(); } public void takeSnapshot(){ try { txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts()); } catch (IOException e) { LOG.error("Severe unrecoverable error, exiting", e); System.exit(10); } } ...}
复制代码
八.创建并启动服务器实例的会话管理器
会话管理器 SessionTracker 主要负责 zk 服务端的会话管理。在执行 ZooKeeperServerMain 的 runFromConfig()方法时,调用完单机版服务器实例 ZooKeeperServer 的 startdata()方法完成本地数据恢复后,就会调用 ZooKeeperServer 的 startup()方法来开始创建并启动会话管理器,也就是在 startup()方法中会调用 createSessionTracker()和 startSessionTracker()方法。SessionTracker 其实也是一个继承了 ZooKeeperThread 的线程。
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { protected SessionTracker sessionTracker; private FileTxnSnapLog txnLogFactory = null; private ZKDatabase zkDb; ... public synchronized void startup() { startupWithServerState(State.RUNNING); } private void startupWithServerState(State state) { //8.创建并启动会话管理器SessionTracker if (sessionTracker == null) { createSessionTracker(); } startSessionTracker(); //9.初始化服务器实例ZooKeeperServer的请求处理链 setupRequestProcessors(); //注册JMX服务 registerJMX(); //开启监控JVM停顿的线程 startJvmPauseMonitor(); setState(state); notifyAll(); } protected void createSessionTracker() { sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(), tickTime, createSessionTrackerServerId, getZooKeeperServerListener()); } protected void startSessionTracker() { ((SessionTrackerImpl)sessionTracker).start(); } ...}
public class SessionTrackerImpl extends ZooKeeperCriticalThread implements SessionTracker { private final ExpiryQueue<SessionImpl> sessionExpiryQueue; private final SessionExpirer expirer; ... @Override public void run() { while (running) { //使用了分桶管理策略 long waitTime = sessionExpiryQueue.getWaitTime(); if (waitTime > 0) { Thread.sleep(waitTime); continue; } for (SessionImpl s : sessionExpiryQueue.poll()) { setSessionClosing(s.sessionId); expirer.expire(s); } } } ...}
复制代码
九.初始化单机版服务器实例的请求处理链
在执行 ZooKeeperServerMain 的 runFromConfig()方法时,在 ZooKeeperServer 的 startup()方法中调用方法创建并启动好会话管理器后,就会继续在 ZooKeeperServer 的 startup()方法中调用方法初始化请求处理链,也就是在 startup()方法中会调用 setupRequestProcessors()方法。
zk 处理请求的方式是典型的责任链模式,zk 服务端会使用多个请求处理器来依次处理一个客户端请求。所以在服务端启动时,会将这些请求处理器串联起来形成一个请求处理链。
单机版服务器的请求处理链包括 3 个请求处理器:
第一个请求处理器是:PrepRequestProcessor
第二个请求处理器是:SyncRequestProcessor
第三个请求处理器是:FinalRequestProcessor
zk 服务端会严格按照顺序分别调用这 3 个请求处理器处理客户端的请求,其中 PrepRequestProcessor 和 SyncRequestProcessor 其实也是一个线程。服务端收到的客户端请求会不断被添加到请求处理器的请求队列中,然后请求处理器线程启动后就会不断从请求队列中提取请求出来进行处理。
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { protected RequestProcessor firstProcessor; ... public synchronized void startup() { startupWithServerState(State.RUNNING); } private void startupWithServerState(State state) { //8.创建并启动会话管理器SessionTracker if (sessionTracker == null) { createSessionTracker(); } startSessionTracker(); //9.初始化服务器实例ZooKeeperServer的请求处理链 setupRequestProcessors(); //注册JMX服务 registerJMX(); //开启监控JVM停顿的线程 startJvmPauseMonitor(); setState(state); notifyAll(); } protected void setupRequestProcessors() { RequestProcessor finalProcessor = new FinalRequestProcessor(this); RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor); ((SyncRequestProcessor)syncProcessor).start(); firstProcessor = new PrepRequestProcessor(this, syncProcessor); ((PrepRequestProcessor)firstProcessor).start(); } ...}
public interface RequestProcessor { void processRequest(Request request) throws RequestProcessorException; void shutdown();}
public class PrepRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor { LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>(); private final RequestProcessor nextProcessor; ZooKeeperServer zks; ... public PrepRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) { super("ProcessThread(sid:" + zks.getServerId() + " cport:" + zks.getClientPort() + "):", zks.getZooKeeperServerListener()); this.nextProcessor = nextProcessor; this.zks = zks; } public void processRequest(Request request) { submittedRequests.add(request); } @Override public void run() { ... while (true) { Request request = submittedRequests.take(); pRequest(request); } ... } protected void pRequest(Request request) throws RequestProcessorException { ... //事务请求处理 pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true); ... //交给下一个处理器处理 nextProcessor.processRequest(request); } ...}
复制代码
十.注册单机版服务器实例到网络连接工厂实例
就是调用 ServerCnxnFactory 的 startup()方法中的 setZooKeeperServer()方法,将初始化好的单机版服务器实例 ZooKeeperServer 注册到网络连接工厂实例 ServerCnxnFactory。同时,也会将网络连接工厂实例 ServerCnxnFactory 注册到单机版服务器实例 ZooKeeperServer。此时,zk 服务端就可以对外提供正常的服务了。
public class NIOServerCnxnFactory extends ServerCnxnFactory { ... public void startup(ZooKeeperServer zks, boolean startServer) { //6.启动各种线程 start(); //10.注册zk服务器实例 setZooKeeperServer(zks); if (startServer) { //7.恢复本地数据 zks.startdata(); //8.创建并启动会话管理器SessionTracker //9.初始化zk的请求处理链 zks.startup(); } } ...}
public abstract class ServerCnxnFactory { ... protected ZooKeeperServer zkServer; final public void setZooKeeperServer(ZooKeeperServer zks) { this.zkServer = zks; if (zks != null) { if (secure) { zks.setSecureServerCnxnFactory(this); } else { zks.setServerCnxnFactory(this); } } } ...}
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { ... protected ServerCnxnFactory serverCnxnFactory; protected ServerCnxnFactory secureServerCnxnFactory; public void setServerCnxnFactory(ServerCnxnFactory factory) { serverCnxnFactory = factory; } public void setSecureServerCnxnFactory(ServerCnxnFactory factory) { secureServerCnxnFactory = factory; } ...}
复制代码
文章转载自:东阳马生架构
原文链接:https://www.cnblogs.com/mjunz/p/18815712
体验地址:http://www.jnpfsoft.com/?from=001YH
评论