
ZooKeeper Leader Election: Source Code Analysis

  • 2023-03-29
    Sichuan


Author: Liang Jichao (梁吉超), JD Logistics


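ZooKeeper is a distributed service framework that addresses data problems common in distributed applications, such as cluster management and state synchronization. To solve them, ZooKeeper relies on Leader election to guarantee strong data consistency and stability. Starting from a cluster configuration, this article walks through the Leader election source code and shows how blocking I/O (BIO), multiple threads, and multi-layer queues are combined into a high-performance design.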

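01 Leader Election Mechanism

Leader election uses a majority (more-than-half) voting algorithm.


Each ZooKeeper server is called a node. Every node has a vote (observers excepted) and sends its ballot to every node that has voting rights. Once one node is backed by more than half of the votes, it becomes the Leader and the other nodes become Followers.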
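
The majority rule can be made concrete with a small sketch. The class `MajoritySketch` and its method are hypothetical names used only for illustration; they are not ZooKeeper code. The sketch assumes "more than half" means strictly greater than the integer half of the number of voting members.

```java
// Hypothetical helper for illustration only; not part of ZooKeeper.
final class MajoritySketch {
    private MajoritySketch() {}

    /** Returns true when `votes` is strictly more than half of `votingMembers`. */
    static boolean hasMajority(int votes, int votingMembers) {
        int half = votingMembers / 2; // integer division, e.g. 3 / 2 == 1
        return votes > half;
    }

    public static void main(String[] args) {
        System.out.println(hasMajority(2, 3)); // 2 > 1
        System.out.println(hasMajority(2, 4)); // 2 > 2 does not hold
    }
}
```

With three voting members two matching ballots are enough, while with four voting members two are not, which is one reason odd-sized ensembles are common.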

02 Cluster Configuration for Leader Election

  1. Copy zoo_sample.cfg to zoo1.cfg, zoo2.cfg, zoo3.cfg, and zoo4.cfg.

  2. Edit each zooN.cfg file as follows:


【plain】
# Contents of zoo1.cfg
dataDir=/export/data/zookeeper-1
clientPort=2181
server.1=127.0.0.1:2001:3001
server.2=127.0.0.1:2002:3002:participant
server.3=127.0.0.1:2003:3003:participant
server.4=127.0.0.1:2004:3004:observer

# Contents of zoo2.cfg
dataDir=/export/data/zookeeper-2
clientPort=2182
server.1=127.0.0.1:2001:3001
server.2=127.0.0.1:2002:3002:participant
server.3=127.0.0.1:2003:3003:participant
server.4=127.0.0.1:2004:3004:observer

# Contents of zoo3.cfg
dataDir=/export/data/zookeeper-3
clientPort=2183
server.1=127.0.0.1:2001:3001
server.2=127.0.0.1:2002:3002:participant
server.3=127.0.0.1:2003:3003:participant
server.4=127.0.0.1:2004:3004:observer

# Contents of zoo4.cfg
dataDir=/export/data/zookeeper-4
clientPort=2184
server.1=127.0.0.1:2001:3001
server.2=127.0.0.1:2002:3002:participant
server.3=127.0.0.1:2003:3003:participant
server.4=127.0.0.1:2004:3004:observer


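  3. Each server line has the form server.<sid>=<ip>:<data sync port>:<election port>:<role>, where <sid> is the server number and matches the content of that server's myid file.


  • participant is the default role (the server takes part in elections) and may be omitted. An observer does not take part in elections.


  4. In each of /export/data/zookeeper-1, /export/data/zookeeper-2, /export/data/zookeeper-3, and /export/data/zookeeper-4, create a myid file containing 1, 2, 3, and 4 respectively. The value is used as the node's sid (Server ID).


  5. Start three ZooKeeper instances:


  • bin/zkServer.sh start conf/zoo1.cfg

  • bin/zkServer.sh start conf/zoo2.cfg

  • bin/zkServer.sh start conf/zoo3.cfg


  6. On startup, each instance reads the configuration file passed as its start argument (zooN.cfg). This is how an instance learns its own server identity (sid) and how many instances in the cluster take part in the election.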
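
To illustrate the server line format described above, here is a minimal parsing sketch. `ServerEntrySketch` is a hypothetical class written for this article and is not ZooKeeper's own parser (that role is played by `QuorumPeer.QuorumServer`). It assumes an IPv4 address or hostname, since it splits the value on `:`.

```java
// Hypothetical parser for illustration; ZooKeeper's real parsing lives in QuorumPeer.QuorumServer.
final class ServerEntrySketch {
    final long sid;
    final String host;
    final int syncPort;
    final int electionPort;
    final boolean observer;

    private ServerEntrySketch(long sid, String host, int syncPort, int electionPort, boolean observer) {
        this.sid = sid;
        this.host = host;
        this.syncPort = syncPort;
        this.electionPort = electionPort;
        this.observer = observer;
    }

    /** Parses a line such as "server.4=127.0.0.1:2004:3004:observer". */
    static ServerEntrySketch parse(String line) {
        int eq = line.indexOf('=');
        if (eq < 0 || !line.startsWith("server.")) {
            throw new IllegalArgumentException("not a server entry: " + line);
        }
        long sid = Long.parseLong(line.substring("server.".length(), eq).trim());
        // Assumption: host contains no ':' (IPv4 or hostname only).
        String[] parts = line.substring(eq + 1).trim().split(":");
        if (parts.length < 3 || parts.length > 4) {
            throw new IllegalArgumentException("expected host:syncPort:electionPort[:role]");
        }
        // The role is optional; anything other than "observer" is treated as a participant here.
        boolean observer = parts.length == 4 && parts[3].equalsIgnoreCase("observer");
        return new ServerEntrySketch(sid, parts[0],
                Integer.parseInt(parts[1]), Integer.parseInt(parts[2]), observer);
    }

    public static void main(String[] args) {
        ServerEntrySketch e = parse("server.4=127.0.0.1:2004:3004:observer");
        System.out.println(e.sid + " " + e.host + " " + e.electionPort + " observer=" + e.observer);
    }
}
```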

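03 Leader Election Flow


Figure 1. Voting flow from the first round to the second round


Preconditions:


Assume the ballot format vote(sid, zxid, epoch):


  • sid is the Server ID, the unique identifier of each server, taken from the myid file;

  • zxid is the data transaction ID;

  • epoch is the election epoch. To keep the walkthrough simple it is fixed at 1 (the first election) and omitted from the ballots below.


Start the sid=1 and sid=2 nodes, in that order.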


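Round 1:


  1. sid=1 node: its initial ballot is for itself; it sends vote(1,0) to the sid=2 node;

  2. sid=2 node: its initial ballot is for itself; it sends vote(2,0) to the sid=1 node;

  3. sid=1 node: receives vote(2,0) from sid=2 and compares it with its own vote(1,0). zxid is compared first: a larger zxid means newer data, so the ballot with the largest zxid wins. If the zxids are equal, the ballot with the larger sid wins. The result here is vote(2,0), so sid=1 changes its ballot to vote(2,0);

  4. sid=2 node: receives vote(1,0) from sid=1 and compares it with its own vote(2,0) in the same way. The result is vote(2,0), so sid=2 keeps its ballot;

  5. Round 1 ends.


Round 2:


  1. sid=1 node: its current ballot is vote(2,0), which it sends to the sid=2 node;

  2. sid=2 node: its current ballot is vote(2,0), which it sends to the sid=1 node;

  3. sid=1 node: receives vote(2,0) from sid=2 and also holds vote(2,0) itself. Under the majority rule, 3 nodes take part in the election and 2 of them already hold the same ballot, so sid=2 is elected Leader and sid=1 becomes a Follower;

  4. sid=2 node: receives vote(2,0) from sid=1 and also holds vote(2,0) itself. Under the majority rule sid=2 is elected Leader, so its own role becomes Leader.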


When the sid=3 node is started afterwards, the cluster has already elected a Leader. sid=1 and sid=2 reply to sid=3 with their Leader ballots, and the majority result is still sid=2 as Leader.
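
The two rounds above can be replayed in a few lines. This is a simplified sketch under the walkthrough's own assumptions (epoch omitted, only sid=1 and sid=2 running, 3 voting members configured). `TwoRoundSketch` and `Ballot` are hypothetical names, not ZooKeeper classes.

```java
// Hypothetical replay of the two-round walkthrough; not ZooKeeper code.
final class TwoRoundSketch {
    private TwoRoundSketch() {}

    /** Simplified ballot: proposed leader sid plus zxid (epoch omitted, as in the walkthrough). */
    static final class Ballot {
        final long sid;
        final long zxid;

        Ballot(long sid, long zxid) {
            this.sid = sid;
            this.zxid = zxid;
        }

        /** Larger zxid wins; on equal zxid the larger sid wins. */
        boolean beats(Ballot other) {
            return zxid > other.zxid || (zxid == other.zxid && sid > other.sid);
        }
    }

    /** Replays both rounds for sid=1 and sid=2; returns the elected sid, or -1 if there is no majority. */
    static long elect(int votingMembers) {
        Ballot held1 = new Ballot(1, 0); // sid=1 starts by voting for itself
        Ballot held2 = new Ballot(2, 0); // sid=2 starts by voting for itself

        // Round 1: the nodes swap ballots and each keeps the stronger one.
        Ballot next1 = held2.beats(held1) ? held2 : held1;
        Ballot next2 = held1.beats(held2) ? held1 : held2;

        // Round 2: the nodes swap again and count the ballots that agree.
        int matching = (next1.sid == next2.sid && next1.zxid == next2.zxid) ? 2 : 1;
        return matching > votingMembers / 2 ? next1.sid : -1;
    }

    public static void main(String[] args) {
        System.out.println("elected sid = " + elect(3));
    }
}
```

Calling `elect(3)` reproduces the outcome of the walkthrough: both nodes end up holding vote(2,0), and two matching ballots out of three voting members are a majority.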

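3.1 Multi-layer queue architecture for Leader election

Under the hood, ZooKeeper's election is split into an election application layer and a message transport layer. The first-layer (application) queues send and receive all ballots in one place. The second-layer (transport) queues are divided into one queue per destination sid, so that sending to one server does not interfere with sending to another. For example, a failed send to one machine does not hold up sends to healthy servers.



Figure 2. Interaction between the upper and lower queue layers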
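
The isolation that per-sid queues provide can be sketched as follows. `LayeredQueueSketch` is a hypothetical, heavily simplified model written for this article: messages are plain strings, the loopback path goes straight into one receive queue, and no threads or sockets are involved.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical model of the routing idea; not ZooKeeper's QuorumCnxManager.
final class LayeredQueueSketch {
    private final long mySid;
    private final int perPeerCapacity;

    // Receive side: ballots addressed to this node end up here.
    final BlockingQueue<String> recvQueue = new LinkedBlockingQueue<>();
    // Transport layer: one bounded send queue per destination sid.
    final ConcurrentHashMap<Long, BlockingQueue<String>> sendQueues = new ConcurrentHashMap<>();

    LayeredQueueSketch(long mySid, int perPeerCapacity) {
        this.mySid = mySid;
        this.perPeerCapacity = perPeerCapacity;
    }

    /** Routes one ballot; returns false if the destination's queue is full. */
    boolean route(long destSid, String ballot) {
        if (destSid == mySid) {
            // Loopback: a ballot for ourselves never touches the network.
            return recvQueue.offer(ballot);
        }
        BlockingQueue<String> queue = sendQueues.computeIfAbsent(
                destSid, sid -> new ArrayBlockingQueue<>(perPeerCapacity));
        return queue.offer(ballot);
    }

    public static void main(String[] args) {
        LayeredQueueSketch node = new LayeredQueueSketch(1L, 1);
        System.out.println(node.route(2L, "vote(1,0)")); // accepted
        System.out.println(node.route(2L, "vote(2,0)")); // queue for sid=2 is full
        System.out.println(node.route(3L, "vote(1,0)")); // sid=3 is unaffected
    }
}
```

The point of the design is visible in `main`: a backed-up queue for sid=2 does not stop the ballot destined for sid=3.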

04 Locating the Entry Class

The zkServer.sh script shows which class starts the server:


org.apache.zookeeper.server.quorum.QuorumPeerMain

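05 Election Flow: Code Walkthrough


Figure 3. Election code flow


  1. Load the configuration file: QuorumPeerConfig.parse(path);


The configuration that matters for Leader election is:


  • Read the myid file under dataDir and use its content as the sid of the current instance, which is the voter's identity. In the code below, the myid variable is the current node's own sid.

  • Set peerType, which says whether the current instance takes part in the election.

  • new QuorumMaj() parses the entries prefixed with server. to load the cluster membership: allMembers (all members), votingMembers (members that vote), and observingMembers (observers). It also sets half = votingMembers.size() / 2.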


【Java】
public QuorumMaj(Properties props) throws ConfigException {
    for (Entry<Object, Object> entry : props.entrySet()) {
        String key = entry.getKey().toString();
        String value = entry.getValue().toString();
        // Read the instance entries prefixed with "server." from the cluster configuration
        if (key.startsWith("server.")) {
            int dot = key.indexOf('.');
            long sid = Long.parseLong(key.substring(dot + 1));
            QuorumServer qs = new QuorumServer(sid, value);
            allMembers.put(Long.valueOf(sid), qs);
            if (qs.type == LearnerType.PARTICIPANT) {
                // A PARTICIPANT instance takes part in the election
                votingMembers.put(Long.valueOf(sid), qs);
            } else {
                // Observer members
                observingMembers.put(Long.valueOf(sid), qs);
            }
        } else if (key.equals("version")) {
            version = Long.parseLong(value, 16);
        }
    }
    // Majority threshold: a quorum needs more than this many votes
    half = votingMembers.size() / 2;
}


  2. QuorumPeerMain.runFromConfig(config) starts the server;

  3. QuorumPeer.startLeaderElection() starts the election service;


  • Set the current ballot: new Vote(sid, zxid, epoch)


【Java】
synchronized public void startLeaderElection() {
    try {
        if (getPeerState() == ServerState.LOOKING) {
            // First round: the current node votes for itself by default
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch (IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }
    //........
}


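  • Create the election connection manager: QuorumCnxManager;

  • Initialize recvQueue<Message(sid, ByteBuffer)>, the queue of received ballots (second-layer transport queue);

  • Initialize queueSendMap<sid, queue>, the per-sid queues of ballots to send (second-layer transport queues);

  • Initialize senderWorkerMap<sid, SendWorker>, the container of send-worker threads; an entry means a connection to the voting node with that sid is established;

  • Initialize the election listener thread class QuorumCnxManager.Listener.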


【Java】
// Invoked from QuorumPeer.createCnxnManager()
public QuorumCnxManager(QuorumPeer self,
                        final long mySid,
                        Map<Long, QuorumPeer.QuorumServer> view,
                        QuorumAuthServer authServer,
                        QuorumAuthLearner authLearner,
                        int socketTimeout,
                        boolean listenOnAllIPs,
                        int quorumCnxnThreadsSize,
                        boolean quorumSaslAuthEnabled) {
    // Queue of received ballots (second-layer transport queue)
    this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
    // Per-sid queues of ballots to send (second-layer transport queues)
    this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
    // Send-worker threads by sid; an entry means a connection to that sid exists
    this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
    this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();

    String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
    if (cnxToValue != null) {
        this.cnxTO = Integer.parseInt(cnxToValue);
    }

    this.self = self;

    this.mySid = mySid;
    this.socketTimeout = socketTimeout;
    this.view = view;
    this.listenOnAllIPs = listenOnAllIPs;

    initializeAuth(mySid, authServer, authLearner, quorumCnxnThreadsSize,
            quorumSaslAuthEnabled);

    // Starts listener thread that waits for connection requests
    // Create the election listener thread that accepts ballot connections
    listener = new Listener();
    listener.setName("QuorumPeerListener");
}

// QuorumPeer.createElectionAlgorithm
protected Election createElectionAlgorithm(int electionAlgorithm) {
    Election le = null;

    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 0:
        le = new LeaderElection(this);
        break;
    case 1:
        le = new AuthFastLeaderElection(this);
        break;
    case 2:
        le = new AuthFastLeaderElection(this, true);
        break;
    case 3:
        qcm = createCnxnManager(); // new QuorumCnxManager(... new Listener())
        QuorumCnxManager.Listener listener = qcm.listener;
        if (listener != null) {
            listener.start(); // start the election listener thread
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            fle.start();
            le = fle;
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
    return le;
}


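  4. Start the election listener thread QuorumCnxManager.Listener;


  • Create a ServerSocket and wait for connections from nodes whose sid is larger than the local sid; each connection is recorded in senderWorkerMap<sid, SendWorker>;

  • Only a node with sid > self.sid may connect in. A connection from a node with a smaller sid is closed, and the local node connects back to that node instead.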


【Java】
// Runs after listener.start() above is called
public void run() {
    int numRetries = 0;
    InetSocketAddress addr;
    Socket client = null;
    while ((!shutdown) && (numRetries < 3)) {
        try {
            ss = new ServerSocket();
            ss.setReuseAddress(true);
            if (self.getQuorumListenOnAllIPs()) {
                int port = self.getElectionAddress().getPort();
                addr = new InetSocketAddress(port);
            } else {
                // Resolve hostname for this server in case the
                // underlying ip address has changed.
                self.recreateSocketAddresses(self.getId());
                addr = self.getElectionAddress();
            }
            LOG.info("My election bind port: " + addr.toString());
            setName(addr.toString());
            ss.bind(addr);
            while (!shutdown) {
                client = ss.accept();
                setSockOpts(client);
                LOG.info("Received connection request "
                        + client.getRemoteSocketAddress());
                // Receive and handle the connection request
                // asynchronously if the quorum sasl authentication is
                // enabled. This is required because sasl server
                // authentication process may take few seconds to finish,
                // this may delay next peer connection requests.
                if (quorumSaslAuthEnabled) {
                    receiveConnectionAsync(client);
                } else {
                    // Accept the connection
                    receiveConnection(client);
                }
                numRetries = 0;
            }
        } catch (IOException e) {
            if (shutdown) {
                break;
            }
            LOG.error("Exception while listening", e);
            numRetries++;
            try {
                ss.close();
                Thread.sleep(1000);
            } catch (IOException ie) {
                LOG.error("Error closing server socket", ie);
            } catch (InterruptedException ie) {
                LOG.error("Interrupted while sleeping. " +
                    "Ignoring exception", ie);
            }
            closeSocket(client);
        }
    }
    LOG.info("Leaving listener");
    if (!shutdown) {
        LOG.error("As I'm leaving the listener thread, "
                + "I won't be able to participate in leader "
                + "election any longer: "
                + self.getElectionAddress());
    } else if (ss != null) {
        // Clean up for shutdown.
        try {
            ss.close();
        } catch (IOException ie) {
            // Don't log an error for shutdown.
            LOG.debug("Error closing server socket", ie);
        }
    }
}

// Call path: receiveConnection() -> handleConnection(...)
private void handleConnection(Socket sock, DataInputStream din)
        throws IOException {
    //... omitted
    if (sid < self.getId()) {
        /*
         * This replica might still believe that the connection to sid is
         * up, so we have to shut down the workers before trying to open a
         * new connection.
         */
        SendWorker sw = senderWorkerMap.get(sid);
        if (sw != null) {
            sw.finish();
        }

        /*
         * Now we start a new connection
         */
        LOG.debug("Create new connection to server: {}", sid);
        closeSocket(sock);

        if (electionAddr != null) {
            connectOne(sid, electionAddr);
        } else {
            connectOne(sid);
        }

    } else { // Otherwise start worker threads to receive data.
        SendWorker sw = new SendWorker(sock, sid);
        RecvWorker rw = new RecvWorker(sock, din, sid, sw);
        sw.setRecv(rw);

        SendWorker vsw = senderWorkerMap.get(sid);

        if (vsw != null) {
            vsw.finish();
        }
        // Record the connection as <sid, SendWorker>
        senderWorkerMap.put(sid, sw);

        queueSendMap.putIfAbsent(sid,
                new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));

        sw.start();
        rw.start();
    }
}


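  5. Create the FastLeaderElection service;


  • Initialize the ballot send queue sendqueue (first-layer queue)

  • Initialize the ballot receive queue recvqueue (first-layer queue)

  • Create the WorkerSender thread

  • Create the WorkerReceiver thread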


【Java】
// FastLeaderElection.starter
private void starter(QuorumPeer self, QuorumCnxManager manager) {
    this.self = self;
    proposedLeader = -1;
    proposedZxid = -1;
    // Send queue sendqueue (first-layer queue)
    sendqueue = new LinkedBlockingQueue<ToSend>();
    // Receive queue recvqueue (first-layer queue)
    recvqueue = new LinkedBlockingQueue<Notification>();
    this.messenger = new Messenger(manager);
}

// new Messenger(manager)
Messenger(QuorumCnxManager manager) {
    // Create the WorkerSender thread
    this.ws = new WorkerSender(manager);

    this.wsThread = new Thread(this.ws,
            "WorkerSender[myid=" + self.getId() + "]");
    this.wsThread.setDaemon(true);

    // Create the WorkerReceiver thread
    this.wr = new WorkerReceiver(manager);

    this.wrThread = new Thread(this.wr,
            "WorkerReceiver[myid=" + self.getId() + "]");
    this.wrThread.setDaemon(true);
}


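  6. Start the WorkerSender and WorkerReceiver threads.


The WorkerSender thread loops, polling elements from the first-layer sendqueue:


  • Each sendqueue element carries ballot information; see the ToSend class for details;

  • If the destination sid equals the local sid, the message is placed directly in recvQueue;

  • Otherwise the element is moved into the second-layer transport queue queueSendMap<sid, queue>.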


【Java】
// FastLeaderElection.Messenger.WorkerSender
class WorkerSender extends ZooKeeperThread {
    //...
    public void run() {
        while (!stop) {
            try {
                ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
                if (m == null) continue;
                // Send the ballot out
                process(m);
            } catch (InterruptedException e) {
                break;
            }
        }
        LOG.info("WorkerSender is down");
    }
}

// QuorumCnxManager#toSend
public void toSend(Long sid, ByteBuffer b) {
    /*
     * If sending message to myself, then simply enqueue it (loopback).
     */
    if (this.mySid == sid) {
        b.position(0);
        addToRecvQueue(new Message(b.duplicate(), sid));
        /*
         * Otherwise send to the corresponding thread to send.
         */
    } else {
        /*
         * Start a new connection if doesn't have one already.
         */
        ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(
                SEND_CAPACITY);
        ArrayBlockingQueue<ByteBuffer> oldq = queueSendMap.putIfAbsent(sid, bq);
        // Move the message into the second-layer transport queue queueSendMap<sid, queue>
        if (oldq != null) {
            addToSendQueue(oldq, b);
        } else {
            addToSendQueue(bq, b);
        }
        connectOne(sid);
    }
}


The WorkerReceiver thread loops, polling elements from the second-layer transport queue recvQueue and moving them into the first-layer recvqueue.


【Java】
// WorkerReceiver
public void run() {
    Message response;
    while (!stop) {
        // Sleeps on receive
        try {
            // Poll the second-layer transport queue recvQueue in a loop
            response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
            if (response == null) continue;
            // The current protocol and two previous generations all send at least 28 bytes
            if (response.buffer.capacity() < 28) {
                LOG.error("Got a short response: " + response.buffer.capacity());
                continue;
            }
            //...
            if (self.getPeerState() == QuorumPeer.ServerState.LOOKING) {
                // Move the element into the first-layer queue recvqueue
                recvqueue.offer(n);
                //...
            }
        } //... (catch block elided in this excerpt)
    }
    //...
}

06 Core Election Logic

  1. Start the QuorumPeer thread


Leader election voting begins with makeLEStrategy().lookForLeader();


sendNotifications() sends the ballot to the other nodes by storing it in sendqueue, which is drained by the WorkerSender thread.


【Java】
// QuorumPeer.run
//...
try {
    reconfigFlagClear();
    if (shuttingDownLE) {
        shuttingDownLE = false;
        startLeaderElection();
    }
    // makeLEStrategy().lookForLeader() sends the ballots
    setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
    LOG.warn("Unexpected exception", e);
    setPeerState(ServerState.LOOKING);
}
//...

// FastLeaderElection.lookForLeader
public Vote lookForLeader() throws InterruptedException {
    //...
    // Send the ballot to the other instances
    sendNotifications();
    //...
}

private void sendNotifications() {
    // Iterate over the voting nodes
    for (long sid : self.getCurrentAndNextConfigVoters()) {
        QuorumVerifier qv = self.getQuorumVerifier();
        ToSend notmsg = new ToSend(ToSend.mType.notification,
                proposedLeader,
                proposedZxid,
                logicalclock.get(),
                QuorumPeer.ServerState.LOOKING,
                sid,
                proposedEpoch, qv.toString().getBytes());
        if (LOG.isDebugEnabled()) {
            LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" +
                    Long.toHexString(proposedZxid) + " (n.zxid), 0x" +
                    Long.toHexString(logicalclock.get()) +
                    " (n.round), " + sid + " (recipient), " + self.getId() +
                    " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
        }
        // Store the ballot in the send queue
        sendqueue.offer(notmsg);
    }
}

class WorkerSender extends ZooKeeperThread {
    //...
    public void run() {
        while (!stop) {
            try {
                // Take a stored ballot from the queue
                ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
                if (m == null) continue;

                process(m);
            } catch (InterruptedException e) {
                break;
            }
        }
        LOG.info("WorkerSender is down");
    }
    //...
}


lookForLeader() then loops over recvqueue to fetch the ballots sent by other nodes:


【Java】
public Vote lookForLeader() throws InterruptedException {
    //...
    /*
     * Loop in which we exchange notifications until we find a leader
     */
    while ((self.getPeerState() == ServerState.LOOKING) &&
            (!stop)) {
        /*
         * Remove next notification from queue, times out after 2 times
         * the termination time
         */
        // Fetch a ballot that was delivered to this node
        Notification n = recvqueue.poll(notTimeout,
                TimeUnit.MILLISECONDS);

        /*
         * Sends more notifications if haven't received enough.
         * Otherwise processes new notification.
         */
        if (n == null) {
            if (manager.haveDelivered()) {
                // Everything queued so far has been delivered, so send the ballot again
                sendNotifications();
            } else {
                // Otherwise manager.connectAll() tries to connect to the other nodes
                manager.connectAll();
            }

            /*
             * Exponential backoff
             */
            int tmpTimeOut = notTimeout * 2;
            notTimeout = (tmpTimeOut < maxNotificationInterval ?
                    tmpTimeOut : maxNotificationInterval);
            LOG.info("Notification time out: " + notTimeout);
        }
        //....
    }
    //...
}
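
The timeout handling in this loop is a capped exponential backoff: every time no notification arrives, the poll timeout doubles until it reaches maxNotificationInterval. A minimal sketch of that arithmetic follows. `BackoffSketch` is a hypothetical class, and the start value of 200 ms and the cap of 60000 ms used in `main` are assumptions chosen for illustration, not values taken from the excerpt above.

```java
// Hypothetical illustration of the capped doubling used for notTimeout.
final class BackoffSketch {
    private BackoffSketch() {}

    /** Doubles the timeout, never exceeding `max`. */
    static int next(int current, int max) {
        int doubled = current * 2;
        return doubled < max ? doubled : max;
    }

    public static void main(String[] args) {
        int timeout = 200;       // assumed starting value, in milliseconds
        final int cap = 60000;   // assumed upper bound, in milliseconds
        for (int i = 0; i < 10; i++) {
            timeout = next(timeout, cap);
            System.out.println("Notification time out: " + timeout);
        }
    }
}
```

Doubling keeps an isolated node from flooding its peers with ballots while still retrying quickly right after startup.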


【Java】
// manager.connectAll() -> connectOne(sid) -> initiateConnection(...) -> startConnection(...)

private boolean startConnection(Socket sock, Long sid)
        throws IOException {
    DataOutputStream dout = null;
    DataInputStream din = null;
    try {
        // Use BufferedOutputStream to reduce the number of IP packets. This is
        // important for x-DC scenarios.
        BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());
        dout = new DataOutputStream(buf);

        // Sending id and challenge
        // represents protocol version (in other words - message type)
        dout.writeLong(PROTOCOL_VERSION);
        dout.writeLong(self.getId());
        String addr = self.getElectionAddress().getHostString() + ":"
                + self.getElectionAddress().getPort();
        byte[] addr_bytes = addr.getBytes();
        dout.writeInt(addr_bytes.length);
        dout.write(addr_bytes);
        dout.flush();

        din = new DataInputStream(
                new BufferedInputStream(sock.getInputStream()));
    } catch (IOException e) {
        LOG.warn("Ignoring exception reading or writing challenge: ", e);
        closeSocket(sock);
        return false;
    }

    // authenticate learner
    QuorumPeer.QuorumServer qps = self.getVotingView().get(sid);
    if (qps != null) {
        // TODO - investigate why reconfig makes qps null.
        authLearner.authenticate(sock, qps.hostname);
    }

    // If lost the challenge, then drop the new connection
    // Guarantees a single channel between any two nodes in the cluster
    if (sid > self.getId()) {
        LOG.info("Have smaller server identifier, so dropping the " +
                "connection: (" + sid + ", " + self.getId() + ")");
        closeSocket(sock);
        // Otherwise proceed with the connection
    } else {
        SendWorker sw = new SendWorker(sock, sid);
        RecvWorker rw = new RecvWorker(sock, din, sid, sw);
        sw.setRecv(rw);

        SendWorker vsw = senderWorkerMap.get(sid);

        if (vsw != null)
            vsw.finish();

        senderWorkerMap.put(sid, sw);
        queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(
                SEND_CAPACITY));

        sw.start();
        rw.start();

        return true;

    }
    return false;
}


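As the code above shows, an outgoing connection is kept only when the local sid is larger than the remote sid. Only then are the Socket, the SendWorker thread, and the RecvWorker thread set up and the SendWorker stored in senderWorkerMap<sid, SendWorker>. This mirrors the sid < self.sid branch of handleConnection on the listener side, so that any two nodes in the cluster share exactly one channel.



Figure 4. How nodes connect to each other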
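
The connection rule can be condensed into two predicates. `ConnectionRuleSketch` is a hypothetical class that only restates the sid comparisons in startConnection and handleConnection; it opens no sockets.

```java
// Hypothetical restatement of the sid comparisons; not ZooKeeper code.
final class ConnectionRuleSketch {
    private ConnectionRuleSketch() {}

    /** Connecting side: keep a connection we opened to remoteSid only if our sid is larger. */
    static boolean keepOutgoing(long selfSid, long remoteSid) {
        return selfSid > remoteSid;
    }

    /** Accepting side: keep a connection opened by remoteSid only if its sid is larger. */
    static boolean keepIncoming(long selfSid, long remoteSid) {
        return remoteSid > selfSid;
    }

    public static void main(String[] args) {
        // Between sid=1 and sid=3 only the channel opened by sid=3 survives.
        System.out.println(keepOutgoing(3, 1)); // sid=3 dials sid=1: kept
        System.out.println(keepOutgoing(1, 3)); // sid=1 dials sid=3: dropped
        System.out.println(keepIncoming(1, 3)); // sid=1 accepts from sid=3: kept
    }
}
```

Because both sides apply the same comparison, the two ends always agree on which of the two possible connections survives.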


【Java】
public Vote lookForLeader() throws InterruptedException {
    //...
    if (n.electionEpoch > logicalclock.get()) {
        // The local election epoch is behind the ballot's epoch: reset the ballot pool recvset,
        // adopt the larger epoch, update the current ballot, and send the ballot again
        logicalclock.set(n.electionEpoch);
        recvset.clear();
        if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
            updateProposal(n.leader, n.zxid, n.peerEpoch);
        } else {
            updateProposal(getInitId(),
                    getInitLastLoggedZxid(),
                    getPeerEpoch());
        }
        sendNotifications();
    } else if (n.electionEpoch < logicalclock.get()) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                    + Long.toHexString(n.electionEpoch)
                    + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
        }
        break;
    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
            proposedLeader, proposedZxid, proposedEpoch)) {
        // Same election epoch: the received ballot beat the current ballot, so replace it
        updateProposal(n.leader, n.zxid, n.peerEpoch);
        sendNotifications();
    }
    //...
}


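In the code above, ballots are polled from recvqueue in a loop and the election proceeds as follows:


  • Compare the election epoch of the received ballot with the current election epoch;

  • If the received epoch is larger, adopt it, clear recvset, update the current ballot, and send the ballot again;

  • If the received epoch is smaller, the notification is skipped;

  • If the epochs are equal, the current ballot and the received ballot are compared against each other (PK).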


【Java】
// Compare the received ballot with the current ballot
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch,
                                      long curId, long curZxid, long curEpoch) {
    LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
            Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
    if (self.getQuorumVerifier().getWeight(newId) == 0) {
        return false;
    }

    /*
     * We return true if one of the following three cases hold:
     * 1- New epoch is higher
     * 2- New epoch is the same as current epoch, but new zxid is higher
     * 3- New epoch is the same as current epoch, new zxid is the same
     *  as current zxid, but server id is higher.
     */
    return ((newEpoch > curEpoch) ||
            ((newEpoch == curEpoch) &&
            ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}


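The totalOrderPredicate method in the code above works as follows:


  • It returns true if the candidate's epoch is greater than the current epoch;

  • It returns true if the epochs are equal and the candidate's zxid is greater than the current zxid;

  • It returns true if the epochs are equal, the zxids are equal, and the candidate's sid is greater than the current sid;

  • When the result is true, the current ballot is replaced with the winning ballot and the new ballot is sent out again.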
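
The three rules amount to a lexicographic comparison on (epoch, zxid, sid). The sketch below restates them in a standalone form. `VoteOrderSketch` is a hypothetical class, and it deliberately leaves out the quorum-weight check that the real totalOrderPredicate performs first.

```java
// Hypothetical standalone restatement of the ordering rules; the weight check is omitted.
final class VoteOrderSketch {
    private VoteOrderSketch() {}

    /** Returns true when the new ballot should replace the current one. */
    static boolean wins(long newSid, long newZxid, long newEpoch,
                        long curSid, long curZxid, long curEpoch) {
        if (newEpoch != curEpoch) {
            return newEpoch > curEpoch; // rule 1: higher epoch wins
        }
        if (newZxid != curZxid) {
            return newZxid > curZxid;   // rule 2: same epoch, higher zxid wins
        }
        return newSid > curSid;         // rule 3: same epoch and zxid, higher sid wins
    }

    public static void main(String[] args) {
        System.out.println(wins(2, 0, 1, 1, 0, 1)); // tie on epoch and zxid, sid 2 > 1
        System.out.println(wins(1, 5, 1, 2, 0, 1)); // zxid 5 > 0 outranks the larger sid
        System.out.println(wins(3, 9, 1, 1, 0, 2)); // lower epoch loses despite zxid and sid
    }
}
```

Note that an identical ballot does not win, so a node never replaces its ballot with an equal one and does not resend needlessly.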


【Java】
public Vote lookForLeader() throws InterruptedException {
    //...
    // Store the ballot cast by each node
    // key: sid of the voter, value: the ballot it cast (proposed Leader sid, zxid, epochs)
    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

    // Majority check starts here
    if (termPredicate(recvset,
            new Vote(proposedLeader, proposedZxid,
                    logicalclock.get(), proposedEpoch))) {
        // Verify if there is any change in the proposed leader
        while ((n = recvqueue.poll(finalizeWait,
                TimeUnit.MILLISECONDS)) != null) {
            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                    proposedLeader, proposedZxid, proposedEpoch)) {
                recvqueue.put(n);
                break;
            }
        }
        /*
         * This predicate is true once we don't read any new
         * relevant message from the reception queue
         */
        if (n == null) {
            // A Leader has been elected: set this node's state accordingly
            self.setPeerState((proposedLeader == self.getId()) ?
                    ServerState.LEADING : learningState());

            Vote endVote = new Vote(proposedLeader,
                    proposedZxid, proposedEpoch);
            leaveInstance(endVote);
            return endVote;
        }
    }
    //...
}

/**
 * Termination predicate. Given a set of votes, determines if have
 * sufficient to declare the end of the election round.
 *
 * @param votes
 *            Set of votes
 * @param vote
 *            Identifier of the vote received last (the ballot that won the comparison)
 */
private boolean termPredicate(HashMap<Long, Vote> votes, Vote vote) {
    SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
    voteSet.addQuorumVerifier(self.getQuorumVerifier());
    if (self.getLastSeenQuorumVerifier() != null
            && self.getLastSeenQuorumVerifier().getVersion() > self
                    .getQuorumVerifier().getVersion()) {
        voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
    }
    /*
     * First make the views consistent. Sometimes peers will have different
     * zxids for a server depending on timing.
     */
    // votes comes from recvset, which holds the ballot cast by each node
    for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
        // A node whose ballot equals the proposed ballot counts as an ack
        if (vote.equals(entry.getValue())) {
            // Record the sid of the node that cast it
            voteSet.addAck(entry.getKey());
        }
    }
    // Check whether the matching ballots form a majority
    return voteSet.hasAllQuorums();
}

// QuorumMaj#containsQuorum
public boolean containsQuorum(Set<Long> ackSet) {
    return (ackSet.size() > half);
}


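In the code above, recvset stores the ballot proposed by each sid.


Round 1: sid1: vote(1,0,1), sid2: vote(2,0,1);


Round 2: sid1: vote(2,0,1), sid2: vote(2,0,1).


In the end vote(2,0,1) is the proposed Leader ballot. Comparing it against the ballot pool recvset finds 2 ballots that match it. Since 3 nodes take part in the election and both sid1 and sid2 chose sid2, the more-than-half requirement is met and sid2 is confirmed as Leader.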
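
The tally over recvset can be sketched as follows. `TallySketch` is a hypothetical simplification: each recvset value is reduced to the proposed Leader's sid, whereas the real code compares whole Vote objects and delegates the quorum decision to the QuorumVerifier.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simplification of termPredicate; ballots are reduced to the proposed leader sid.
final class TallySketch {
    private TallySketch() {}

    /** Counts the voters backing `proposedLeader` and checks for more than half. */
    static boolean elected(Map<Long, Long> recvset, long proposedLeader, int votingMembers) {
        int acks = 0;
        for (long leader : recvset.values()) {
            if (leader == proposedLeader) {
                acks++;
            }
        }
        return acks > votingMembers / 2;
    }

    public static void main(String[] args) {
        Map<Long, Long> recvset = new HashMap<>();
        recvset.put(1L, 2L); // sid1 backs sid2
        recvset.put(2L, 2L); // sid2 backs sid2
        System.out.println(elected(recvset, 2L, 3)); // 2 acks out of 3 voting members
    }
}
```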


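  • setPeerState updates the role of the current node;

  • If proposedLeader (the elected sid) equals the local sid, the node becomes the Leader;

  • Otherwise the node becomes a Follower or an Observer, as returned by learningState();

  • currentVote is updated to the Leader's ballot vote(2,0,1).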

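07 Summary

This walkthrough of the Leader election source code shows that:


  1. Nodes vote for each other over BIO (blocking I/O) network communication, and any two nodes share only one channel, which reduces network resource consumption. This shows how much BIO still matters in the development of distributed middleware.

  2. On top of BIO, multithreading and in-memory message queues are combined into a multi-layer queue architecture in which each layer is served by its own threads, which speeds up the election.

  3. It offers valuable practical experience with BIO in a multithreaded setting.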
