欢迎关注公众号:有文化的技术人
定期分享原创优质文章
前言
ZooKeeper 服务器会在本地处理只读请求(exists、getData 和 getChildren)。假如一个服务器接收到客户端的 getData 请求,服务器读取该状态信息,并将这些信息返回给客户端。因为服务器会在本地处理请求,所以 ZooKeeper 在处理以只读请求为主要负载时,性能会很高。我们还可以增加更多的服务器到 ZooKeeper 集群中,这样就可以处理更多的读请求,大幅提高整体处理能力。
那些会改变 ZooKeeper 状态的客户端请求(create、delete 和 setData)将会被转发给群首,集群在同一时刻只会存在一个群首,其他服务器追随群首被称为追随者(follower)。群首作为中心点处理所有对 ZooKeeper 系统变更的请求,它就像一个定序器,建立了所有对 ZooKeeper 状态的更新的顺序。Leader 接收到客户端的请求后,会将请求构建成一个提议(Proposal),同时会为该提议绑定一个 zxid(zxid 可以表示执行顺序),然后将该提议广播到集群上的所有服务器,Leader 等待 Follwer 反馈,当有过半数(>=N/2+1) 的 Follower 反馈信息后,Leader 将再次向集群内 Follower 广播 Commit 信息,Commit 为将之前的 Proposal 提交。
zxid:事务请求唯一标记,由 leader 服务器负责分配对事务请求进行定序,是 8 字节的 long 类型,由两部分组成:前 4 字节代表 epoch,后 4 字节代表 counter,即 zxid=epoch+counter。
epoch 可以认为是 Leader 编号,每一次重新选举出一个新 Leader 时,都会为该 Leader 分配一个 epoch,该值也是一个递增的,可以防止旧 Leader 活过来后继续广播之前旧提议造成状态不一致问题,只有当前 Leader 的提议才会被 Follower 处理。Leader 没有进行选举期间,epoch 是一致不会变化的。
counter:ZooKeeper 状态的每一次改变, counter 就会递增加 1.
zxid=epoch+counter,其中 epoch 不会改变,counter 每次递增 1,,这样 zxid 就具有递增性质, 如果 zxid1 小于 zxid2, 那么 zxid1 肯定先于 zxid2 发生。
这就是 ZAB 协议在处理数据一致性大致的原理流程,由于请求间可能存在着依赖关系,ZAB 协议保证 Leader 广播的变更序列被顺序的处理:一个状态被处理那么它所依赖的状态也已经提前被处理;ZAB 协议支持的崩溃恢复可以保证在 Leader 进程崩溃的时候可以重新选出 Leader 并且保证数据的完整性。
一、选举初始化
Leader 选举初始化入口:QuorumPeer.startLeaderElection(),代码如下:
public synchronized void startLeaderElection() {
try {
if (getPeerState() == ServerState.LOOKING) {
currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
}
} catch (IOException e) {
RuntimeException re = new RuntimeException(e.getMessage());
re.setStackTrace(e.getStackTrace());
throw re;
}
this.electionAlg = createElectionAlgorithm(electionType);
}
复制代码
从上面可看出,当前节点在启动的时候,初始状态都是 LOOKING,都会先投自己一票。
然后我们在进入 createElectionAlgorithm:
protected Election createElectionAlgorithm(int electionAlgorithm) {
Election le = null;
//TODO: use a factory rather than a switch
switch (electionAlgorithm) {
case 1:
throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");
case 2:
throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");
case 3:
QuorumCnxManager qcm = createCnxnManager();
QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
if (oldQcm != null) {
LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
oldQcm.halt();
}
QuorumCnxManager.Listener listener = qcm.listener;
if (listener != null) {
listener.start();
FastLeaderElection fle = new FastLeaderElection(this, qcm);
fle.start();
le = fle;
} else {
LOG.error("Null listener when initializing cnx manager");
}
break;
default:
assert false;
}
return le;
}
复制代码
大致工作如下: 1、创建一个 QuorumCnxManager 实例; 2、启动 QuorumCnxManager.Listener 线程; 3、构建一种选举算法 FastLeaderElection,早期 Zookeeper 实现了四种选举算法,但是后面废弃了三种,最新版本只保留 FastLeaderElection 这一种选举算法;
Leader 选举期间集群中各节点之间互相进行投票,就会涉及到网络 IO 通信,QuorumCnxManager 就是用来管理维护选举期间网络 IO 通信的工具类。
Leader 选举涉及到两个核心类:QuorumCnxManager 和 FastLeaderElection,下面分别详细介绍
二、网络 IO
QuorumCnxManager 维护选举期间的网络 IO 的大致流程:
Listener
1、QuorumCnxManager 有一个内部类 Listener,其继承了 Thread,初始化阶段就会启动该线程,Listener 的 run 方法实现也非常简单:初始化一个 ServerSocket,然后在一个 while 循环中调用 accept 接收客户端(注意:这里的客户端指的是集群中其它服务器)连接;
public void run() {
while((!shutdown) && (numRetries < 3)){
try {
ss = new ServerSocket();
ss.setReuseAddress(true);
addr=根据配置信息获取地址
setName(addr.toString());
//监听选举端口
ss.bind(addr);
while (!shutdown) {
try {
//接收客户端连接
client = ss.accept();
//设置连接参数
setSockOpts(client);
//开始处理
receiveConnection(client);
} catch (SocketTimeoutException e) {
}
}
} catch (IOException e) {
}
}
}
}
复制代码
2、当有客户端连接进来后,会将该客户端 Socket 封装成 RecvWorker 和 SendWorker,它们都是线程,分别负责和该 Socket 所代表的客户端进行读写;RecvWorker 和 SendWorker 是成对出现的,每对负责维护和集群中的一台服务器进行网络 IO 通信;
public void receiveConnection(final Socket sock) {
DataInputStream din = null;
try {
din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
handleConnection(sock, din);
} catch (IOException e) {
}
}
private void handleConnection(Socket sock, DataInputStream din) throws IOException {
...
//这里的思路是如果请求连接的节点的ServerId小于当前节点,则关闭连接,并由当前节点发起连接
//隐含的意思就是ZK集群中节点的连接都是由ServerId大的连ServerId小的
if (sid < self.getId()) {
//如果连接已经建立则关闭
SendWorker sw = senderWorkerMap.get(sid);
if (sw != null) {
sw.finish();
}
closeSocket(sock);
//当前节点去连接对方节点
if (electionAddr != null) {
connectOne(sid, electionAddr);
} else {
connectOne(sid);
}
} else {
//如果接受该连接,则创建对应的读写worker
SendWorker sw = new SendWorker(sock, sid);
RecvWorker rw = new RecvWorker(sock, din, sid, sw);
sw.setRecv(rw);
//如果已经创建则关闭旧的
SendWorker vsw = senderWorkerMap.get(sid);
if (vsw != null) {
vsw.finish();
}
senderWorkerMap.put(sid, sw);
queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
//启动读写事件处理
sw.start();
rw.start();
}
}
复制代码
对于两个 worker 来说,它们本身的逻辑很简单,SendWorker 就是不断的把 queueSendMap 中存放的对应 serverId 的数据发出去。RecvWorker 就是把收到的数据加入 recvQueue 队列.
现在假设这个场景:集群中存在 A、B 两个节点:
当 A 节点连接 B 节点时,在 B 节点上会维护一对 RecvWorker 和 SendWorker 用于 B 节点和 A 节点进行通信;
同理,如果 B 节点连接 A 节点,在 A 节点上会维护一对 RecvWorker 和 SendWorker 用于 A 节点和 B 节点进行通信;
A 和 B 之间创建了两条通道,实际上 A 和 B 间通信只需要一条通道即可,为避免浪费资源,Zookeeper 采用如下原则:myid 小的一方作为服务端,否则连接无效会被关闭;
比如 A 的 myid 是 1,B 的 myid 是 2,如果 A 去连接 B,B 收到连接请求后 B 发现对端 myid 小于自己,判定该连接无效,会关闭该连接; 如果是 B 连接 A,A 收到连接请求后发现对端 myid 大于自己,则认为该连接有效,并会为该连接创建一对 RecvWorker 和 SendWorker 线程并启动
FastLeaderElection
1、FastLeaderElection 负责 Leader 选举核心规则算法实现,注意 FastLeaderElection 类中也包含了两个内部类 WorkerSender 和 WorkerReceiver,类似于 QuorumCnxManager 中的 SendWorker 和 RecvWorker,也是用于发送和接收线程;
public void start() {
this.messenger.start();
}
void start(){
//对应WorkerSender类
this.wsThread.start();
//对应WorkerReceiver类
this.wrThread.start();
}
复制代码
这里可以看到 FastLeaderElection 内部也是开启了两个线程负责读写,这里需要跟前面 Listener 的逻辑结合考虑。Listener 开启的线程一个负责读取数据放入队列,一个负责把队列中的数据发出去,但读取的数据给谁用呢?发送的数据是哪来的呢?FastLeaderElection 里的两线程就是跟它们交互的。
2、WorkSender 线程代码如下:
public void run() {
while (!stop) {
try {
ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
if(m == null) continue;
//处理发送消息
process(m);
} catch (InterruptedException e) {
break;
}
}
}
void process(ToSend m) {
//序列化消息
ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
m.leader,
m.zxid,
m.electionEpoch,
m.peerEpoch,
m.configData);
//发送数据
manager.toSend(m.sid, requestBuffer);
}
public void toSend(Long sid, ByteBuffer b) {
//如果数据时发送给自己的那么绕过IO直接加入到recv队列
if (this.mySid == sid) {
b.position(0);
addToRecvQueue(new Message(b.duplicate(), sid));
} else {
//否则把数据加入到指定ServerId的待发送队列
ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY);
ArrayBlockingQueue<ByteBuffer> oldq = queueSendMap.putIfAbsent(sid, bq);
if (oldq != null) {
addToSendQueue(oldq, b);
} else {
addToSendQueue(bq, b);
}
//连接指定ServerId,该方法内部如果连接已经建立则会返回,否则创建连接
connectOne(sid);
}
}
复制代码
从上面可看出,FastLeaderElection 中进行选举广播投票信息时,将投票信息写入到对端服务器大致流程如下:
将数据封装成 ToSend 格式放入到 sendqueue;
WorkerSender 线程会一直轮询提取 sendqueue 中的数据,当提取到 ToSend 数据后,会获取到集群中所有参与 Leader 选举节点(除 Observer 节点外的节点)的 sid,如果 sid 即为本机节点,则转成 Notification 直接放入到 recvqueue 中,因为本机不再需要走网络 IO;否则放入到 queueSendMap 中,key 是要发送给哪个服务器节点的 sid,ByteBuffer 即为 ToSend 的内容,queueSendMap 维护的着当前节点要发送的网络数据信息,由于发送到同一个 sid 服务器可能存在多条数据,所以 queueSendMap 的 value 是一个 queue 类型;
QuorumCnxManager 中的 SendWorkder 线程不停轮询 queueSendMap 中是否存在自己要发送的数据,每个 SendWorkder 线程都会绑定一个 sid 用于标记该 SendWorkder 线程和哪个对端服务器进行通信,因此,queueSendMap.get(sid)即可获取该线程要发送数据的 queue,然后通过 queue.poll()即可提取该线程要发送的数据内容;
然后通过调用 SendWorkder 内部维护的 socket 输出流即可将数据写入到对端服务器
3、WorkerReceiver 线程代码如下:
public void run() {
Message response;
while (!stop) {
try {
//这里本质上是从recvQueue里取出数据
response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
//没有数据则继续等待
if(response == null) continue;
...
int rstate = response.buffer.getInt();
long rleader = response.buffer.getLong();
long rzxid = response.buffer.getLong();
long relectionEpoch = response.buffer.getLong();
long rpeerepoch;
QuorumVerifier rqv = null;
//如果不是一个有投票权的节点,例如Observer节点
if(!validVoter(response.sid)) {
//直接把自己的投票信息返回
Vote current = self.getCurrentVote();
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(ToSend.mType.notification,
current.getId(),
current.getZxid(),
logicalclock.get(),
self.getPeerState(),
response.sid,
current.getPeerEpoch(),
qv.toString().getBytes());
sendqueue.offer(notmsg);
} else {
//获取发消息的节点的状态
QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
switch (rstate) {
case 0:
ackstate = QuorumPeer.ServerState.LOOKING;
break;
case 1:
ackstate = QuorumPeer.ServerState.FOLLOWING;
break;
case 2:
ackstate = QuorumPeer.ServerState.LEADING;
break;
case 3:
ackstate = QuorumPeer.ServerState.OBSERVING;
break;
default:
continue;
}
//赋值Notification
n.leader = rleader;
n.zxid = rzxid;
n.electionEpoch = relectionEpoch;
n.state = ackstate;
n.sid = response.sid;
n.peerEpoch = rpeerepoch;
n.version = version;
n.qv = rqv;
//如果当前节点正在寻找Leader
if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
//把收到的消息加入队列
recvqueue.offer(n);
//如果对方节点也是LOOKING状态,且周期小于自己,则把自己投票信息发回去
if((ackstate == QuorumPeer.ServerState.LOOKING) && (n.electionEpoch < logicalclock.get())){
Vote v = getVote();
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(ToSend.mType.notification,
v.getId(),
v.getZxid(),
logicalclock.get(),
self.getPeerState(),
response.sid,
v.getPeerEpoch(),
qv.toString().getBytes());
sendqueue.offer(notmsg);
}
} else {
//如果当前节点不是LOOKING状态,那么它已经知道谁是Leader了
Vote current = self.getCurrentVote();
//如果对方是LOOKING状态,那么就把自己认为的Leader信息返给对方
if(ackstate == QuorumPeer.ServerState.LOOKING){
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(
ToSend.mType.notification,
current.getId(),
current.getZxid(),
current.getElectionEpoch(),
self.getPeerState(),
response.sid,
current.getPeerEpoch(),
qv.toString().getBytes());
sendqueue.offer(notmsg);
}
}
}
} catch (InterruptedException e) {
}
}
}
复制代码
从上面代码可看出,FastLeaderElection 中进行选举广播投票信息时,从对端服务器读取投票信息的大致流程如下:
QuorumCnxManager 中的 RecvWorker 线程会一直从 Socket 的输入流中读取数据,当读取到对端发送过来的数据时,转成 Message 格式并放入到 recvQueue 中;
FastLeaderElection.WorkerReceiver 线程会轮询方式从 recvQueue 提取数据并转成 Notification 格式放入到 recvqueue 中;
FastLeaderElection 从 recvqueu 提取所有的投票信息进行比较 最终选出一个 Leader
三、leader 选举
之前的文章已介绍过 Zookeeper 集群启动的大致流程,QuorumPeer 线程中会有一个 Loop 循环,获取 serverState 状态后进入不同分支,当分支退出后继续下次循环,FastLeaderElection 选举策略调用就是发生在检测到 serverState 状态为 LOOKING 时进入到 LOOKING 分支中调用的。分支代码如下:
case LOOKING:
LOG.info("LOOKING");
ServerMetrics.getMetrics().LOOKING_COUNT.add(1);
if (Boolean.getBoolean("readonlymode.enabled")) {
LOG.info("Attempting to start ReadOnlyZooKeeperServer");
// Create read-only server but don't start it immediately
final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);
// Instead of starting roZk immediately, wait some grace
// period before we decide we're partitioned.
//
// Thread is used here because otherwise it would require
// changes in each of election strategy classes which is
// unnecessary code coupling.
Thread roZkMgr = new Thread() {
public void run() {
try {
// lower-bound grace period to 2 secs
sleep(Math.max(2000, tickTime));
if (ServerState.LOOKING.equals(getPeerState())) {
roZk.startup();
}
} catch (InterruptedException e) {
LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
} catch (Exception e) {
LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
}
}
};
try {
roZkMgr.start();
reconfigFlagClear();
if (shuttingDownLE) {
shuttingDownLE = false;
startLeaderElection();
}
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
} finally {
// If the thread is in the the grace period, interrupt
// to come out of waiting.
roZkMgr.interrupt();
roZk.shutdown();
}
} else {
try {
reconfigFlagClear();
if (shuttingDownLE) {
shuttingDownLE = false;
startLeaderElection();
}
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
}
}
break;
复制代码
从上面代码可以看出,Leader 选举策略入口方法为:FastLeaderElection.lookForLeader()方法。当 QuorumPeer.serverState 变成 LOOKING 时,该方法会被调用,表示执行新一轮 Leader 选举。下面来看下 lookForLeader 方法的大致实现逻辑:
1、更新自己期望投票信息,即自己期望选哪个服务器作为 Leader(用 sid 代替期望服务器节点)以及该服务器 zxid、epoch 等信息,第一次投票默认都是投自己当选 Leader,然后调用 sendNotifications 方法广播该投票到集群中所有可以参与投票服务器,代码如下:
synchronized (this) {
logicalclock.incrementAndGet();
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
LOG.info(
"New election. My id = {}, proposed zxid=0x{}",
self.getId(),
Long.toHexString(proposedZxid));
sendNotifications();
复制代码
logicalclock 维护 electionEpoch,即选举轮次,在进行投票结果赛选的时候需要保证大家在一个投票轮次 updateProposal()方法有三个参数:a.期望投票给哪个服务器(sid)、b.该服务器的 zxid、c.该服务器的 epoch,在后面会看到这三个参数是选举 Leader 时的核心指标:
getInitId()用于获取当前 myid
getInitLastLoggedZxid()提取 lastProcessedZxid 值,lastProcessedZxid 是最后一次 commit 的事务请求的 zxid
getPeerEpoch():获取 epoch 值,每个 leader 任期内都要有一个 epoch 代表该 Leader 轮次,同时把该 epoch 同步到集群送的所有其它节点,并会被保存到本地硬盘 dataLogDir 目录下 currentEpoch 文件中,这里的 getPeerEpoch()就是获取最近一次 Leader 的 epoch,如果是第一次部署启动则默认从 0 开始
发送给集群中所有可参与投票节点,注意也包括自身节点:
将 proposedLeader、proposedZxid、electionEpoch、peerEpoch、sid(要发送给哪个节点的 sid)等信息封装为一个 ToSend 对象,并放入到 LinkedBlockingQueue sendqueue 队列中,注意遍历集群中所有参与投票节点的 sid,为每个 sid 封装成一个 ToSend
WorkerSender 线程将会从 sendqueue 队列中获取要发送消息根据 sid 发送给集群中指定的节点
2、然后就开始等待其它服务器发送给自己的投票信息
3、将接收到投票的 state 进行判断确定执行哪个分支逻辑:
如果是 FOLLOWING 或 LEADING,则说明对端已选举出 Leader,这时只需要验证下这个 Leader 是否有效即可,有效则代表选举结束,否则继续接收投票信息
OBSERVING:忽略该投票信息,因为 Observer 不能参与投票
LOOKING:则表示对端也还处于 Leader 选举状态
4、LOOKING 状态
case LOOKING:
if (getInitLastLoggedZxid() == -1) {
LOG.debug("Ignoring notification as our zxid is -1");
break;
}
if (n.zxid == -1) {
LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
break;
}
// If notification > current, replace and send messages out
if (n.electionEpoch > logicalclock.get()) {
logicalclock.set(n.electionEpoch);
recvset.clear();
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
sendNotifications();
} else if (n.electionEpoch < logicalclock.get()) {
LOG.debug(
"Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x{}, logicalclock=0x{}",
Long.toHexString(n.electionEpoch),
Long.toHexString(logicalclock.get()));
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}
LOG.debug(
"Adding vote: from={}, proposed leader={}, proposed zxid=0x{}, proposed election epoch=0x{}",
n.sid,
n.leader,
Long.toHexString(n.zxid),
Long.toHexString(n.electionEpoch));
// don't care about the version if it's in LOOKING state
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));
if (voteSet.hasAllQuorums()) {
// Verify if there is any change in the proposed leader
while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
recvqueue.put(n);
break;
}
}
/*
* This predicate is true once we don't read any new
* relevant message from the reception queue
*/
if (n == null) {
setPeerState(proposedLeader, voteSet);
Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
复制代码
首先对之前提到的选举轮次 electionEpoch 进行判断,这里分为三种情况:
只有对方发过来的投票的 electionEpoch 和当前节点相等表示是同一轮投票,即投票有效,然后调用 totalOrderPredicate()对投票进行 PK,返回 true 代表对端胜出,则表示第一次投票是错误的(第一次都是投给自己),更新自己投票期望对端为 Leader,然后调用 sendNotifications()将自己最新的投票广播出去。返回 false 则代表自己胜出,第一次投票没有问题,就不用管
如果对端发过来的 electionEpoch 大于自己,则表明重置自己的 electionEpoch,然后清空之前获取到的所有投票 recvset,因为之前获取的投票轮次落后于当前则代表之前的投票已经无效了,然后调用 totalOrderPredicate()将当前期望的投票和对端投票进行 PK,用胜出者更新当前期望投票,然后调用 sendNotifications()将自己期望头破广播出去。注意:这里不管哪一方胜出,都需要广播出去,而不是步骤 a 中己方胜出不需要广播,这是因为由于 electionEpoch 落后导致之前发出的所有投票都是无效的,所以这里需要重新发送
如果对端发过来的 electionEpoch 小于自己,则表示对方投票无效,直接忽略不进行处理
四、选举 PK 机制
下面来看下totalOrderPredicate
方法
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
LOG.debug(
"id: {}, proposed id: {}, zxid: 0x{}, proposed zxid: 0x{}",
newId,
curId,
Long.toHexString(newZxid),
Long.toHexString(curZxid));
if (self.getQuorumVerifier().getWeight(newId) == 0) {
return false;
}
/*
* We return true if one of the following three cases hold:
* 1- New epoch is higher
* 2- New epoch is the same as current epoch, but new zxid is higher
* 3- New epoch is the same as current epoch, new zxid is the same
* as current zxid, but server id is higher.
*/
return ((newEpoch > curEpoch)
|| ((newEpoch == curEpoch)
&& ((newZxid > curZxid)
|| ((newZxid == curZxid)
&& (newId > curId)))));
}
复制代码
这个 PK 逻辑原理(胜出一方代表更有希望成为 Leader)如下: 1、首先比较 epoch,哪个 epoch 大哪个胜出,前面介绍过 epoch 代表了 Leader 的轮次,是一个递增的,epoch 越大就意味着数据越新,Leader 数据越新则可以减少后续数据同步的效率,当然应该优先选为 Leader; 2、然后才是比较 zxid,由于 zxid=epoch+counter,第一步已经把 epoch 比较过了,其实这步骤只是相当于比较 counter 大小,counter 越大则代表数据越新,优先选为 Leader。注:其实第 1 和第 2 可以合并到一起,直接比较 zxid 即可,因为 zxid=epoch+counter,第 1 比较显的有些多余
3、如果前两个指标都没法比较出来,只能通过 sid 来确定,zxid 相等说明两个服务器的数据是一致的,所以选哪个当 Leader 其实没有区别,这里就随机选择一个 sid 大的当 Leader
评论