The previous article analyzed the main startup sequence of a ZooKeeper cluster, and one important part of that sequence is the election of the Leader node.
Since all transactional requests in ZooKeeper are handled by the Leader, leader election matters a great deal. This article looks at when a Leader election happens and how it runs.
1. Roles of nodes in a ZooKeeper cluster

In a ZooKeeper cluster, a node plays one of the following three roles:

| Role | Main responsibilities |
| --- | --- |
| Leader | 1. Processes transactional requests |
| Follower | 1. Processes non-transactional requests and forwards transactional requests to the Leader 2. Votes on transaction Proposals 3. Votes in leader elections |
| Observer | 1. Processes non-transactional requests and forwards transactional requests to the Leader |

The difference between an Observer and a Follower is that an Observer neither votes in leader elections nor takes part in the majority-ack strategy for writes, so it can improve the cluster's read throughput without hurting write performance.
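As a side note, the Observer role comes from configuration rather than election. A minimal zoo.cfg sketch, with placeholder hostnames and a hypothetical fourth server acting as the observer:

# zoo.cfg on the observer node: mark this peer as an observer
peerType=observer
# server list (in every node's zoo.cfg): suffix the observer's entry with ":observer"
server.1=zoo1:2888:3888
server.2=zoo2:2888:3888
server.3=zoo3:2888:3888
server.4=zoo4:2888:3888:observer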
2. When is a Leader elected?

When does a leader election take place? There are two main occasions:
* Server startup (when the cluster has just started there is no leader yet, so one must be elected)
* Leader failure while the cluster is running (the leader's service fails, the other nodes can no longer communicate with it, and a new leader must be elected)
Recall the node states we analyzed before: LOOKING, FOLLOWING, LEADING, OBSERVING.
While a leader is being elected, every node is in the LOOKING state; once the election finishes, each node moves to LEADING, FOLLOWING, or OBSERVING according to its role.
3. The leader election process

During cluster startup, the leader election can be broken into the following steps:
1) Every node casts a vote for itself as leader and sends that vote to the other nodes [a vote is a (myid, ZXID) pair];
2) Each node receives the other nodes' votes, checks their validity, and processes them.
Processing a vote is simple: PK the received vote against your own:
* Compare the ZXIDs first; the vote with the larger ZXID wins (is preferred as Leader);
* If the ZXIDs are equal, the vote with the larger myid wins.
After the PK, the winning vote is broadcast to the other nodes again.
3) Tally the votes
After every round of voting, each server tallies the votes and checks whether more than half of the machines have accepted the same vote; if so, the machine named in that vote is taken to be the Leader, as sketched below.
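The "more than half" check can be sketched as follows. This is a simplified stand-in for the termPredicate method we will meet in the source later, assuming a plain majority (the real code delegates to a QuorumVerifier, which also supports weighted quorums); the Vote class here is a cut-down illustration, not ZooKeeper's:

import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for FastLeaderElection.termPredicate (assumption: plain majority).
public class QuorumCheck {

    // A vote is identified by (proposed leader id, zxid, epoch); equality is all we need here.
    static final class Vote {
        final long id, zxid, epoch;
        Vote(long id, long zxid, long epoch) { this.id = id; this.zxid = zxid; this.epoch = epoch; }
        @Override public boolean equals(Object o) {
            if (!(o instanceof Vote)) return false;
            Vote v = (Vote) o;
            return id == v.id && zxid == v.zxid && epoch == v.epoch;
        }
        @Override public int hashCode() { return Long.hashCode(id ^ zxid ^ epoch); }
    }

    // true if strictly more than half of the ensemble cast exactly the same vote as `proposal`
    static boolean hasQuorum(Map<Long, Vote> received, Vote proposal, int ensembleSize) {
        int support = 0;
        for (Vote v : received.values()) {
            if (proposal.equals(v)) support++;
        }
        return support > ensembleSize / 2;
    }

    public static void main(String[] args) {
        Map<Long, Vote> received = new HashMap<Long, Vote>();
        received.put(1L, new Vote(3, 0, 0)); // vote recorded from the server with sid 1
        received.put(2L, new Vote(3, 0, 0)); // sid 2 agrees
        received.put(3L, new Vote(3, 0, 0)); // sid 3 votes for itself
        System.out.println(hasQuorum(received, new Vote(3, 0, 0), 3)); // true -> myid 3 wins
    }
}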
4) Update server states
After step 3) a leader has been chosen, so each server sets its own role accordingly, becoming either Leader or Follower.
Example:
In the previous article we set up a pseudo-cluster ZooKeeper deployment with the following myid settings:

| Server | myid |
| --- | --- |
| zookeeper_3 | 1 |
| zookeeper_2 | 2 |
| zookeeper_1 | 3 |

When the cluster nodes start up, the election proceeds as described above.
1) In the first round, each node votes for itself:

| Server | Vote (myid, ZXID) |
| --- | --- |
| zookeeper_3 | (1, 0) |
| zookeeper_2 | (2, 0) |
| zookeeper_1 | (3, 0) |

2) Receive and process the votes
Every machine receives the other machines' votes, PKs them one by one, and re-broadcasts the winner:

| Server | Vote PK (myid, ZXID) | Re-broadcast vote |
| --- | --- | --- |
| zookeeper_3 | (1,0) PK (2,0), (1,0) PK (3,0) | (3,0) |
| zookeeper_2 | (2,0) PK (3,0), (2,0) PK (1,0) | (3,0) |
| zookeeper_1 | (3,0) PK (2,0), (3,0) PK (1,0) | (3,0) |

Based on the first round, a second round of voting begins, and eventually (3,0) is accepted by at least two machines (a majority of the three), so zookeeper_1 is elected leader.
3) Update server states

| Server | State |
| --- | --- |
| zookeeper_3 | Follower |
| zookeeper_2 | Follower |
| zookeeper_1 | Leader |

That covers the election during initial startup. What does re-election look like while the cluster is running?
A: Essentially the same process. After the leader goes down, the whole cluster stops serving clients and enters a new round of leader election: each node first switches its state to LOOKING, then casts votes following the steps above, the votes are PK'd the same way, and once a new leader is determined everyone takes up their role and the cluster resumes serving clients.
The above describes the whole leader election in words; now let's walk through the same process in the source code.
4. The leader election entry point

As analyzed in the previous article, the entry point of the leader election is the QuorumPeer.startLeaderElection() method:
public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {
// ip:port this node uses for leader election
private InetSocketAddress myQuorumAddr;
public InetSocketAddress getQuorumAddress(){
return myQuorumAddr;
}
// leader election algorithm type
private int electionType;
Election electionAlg;
synchronized public void startLeaderElection() {
try {
// create this node's initial vote
currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
} catch(IOException e) {
RuntimeException re = new RuntimeException(e.getMessage());
re.setStackTrace(e.getStackTrace());
throw re;
}
for (QuorumServer p : getView().values()) {
if (p.id == myid) {
// find this node's election ip:port
myQuorumAddr = p.addr;
break;
}
}
if (myQuorumAddr == null) {
throw new RuntimeException("My id " + myid + " not in the peer list");
}
// in QuorumPeerConfig, electionAlg defaults to 3
if (electionType == 0) {
try {
udpSocket = new DatagramSocket(myQuorumAddr.getPort());
responder = new ResponderThread();
responder.start();
} catch (SocketException e) {
throw new RuntimeException(e);
}
}
// obtain the leader election algorithm here
this.electionAlg = createElectionAlgorithm(electionType);
}
protected Election createElectionAlgorithm(int electionAlgorithm){
Election le=null;
//TODO: use a factory rather than a switch
switch (electionAlgorithm) {
case 0:
le = new LeaderElection(this);
break;
case 1:
le = new AuthFastLeaderElection(this);
break;
case 2:
le = new AuthFastLeaderElection(this, true);
break;
case 3:
// manager for communicating with the other ZooKeeper nodes, see section 5
qcm = createCnxnManager();
QuorumCnxManager.Listener listener = qcm.listener;
if(listener != null){
listener.start();
// FastLeaderElection is used by default; analyzed further in section 6
le = new FastLeaderElection(this, qcm);
} else {
LOG.error("Null listener when initializing cnx manager");
}
break;
default:
assert false;
}
return le;
}
}
The source shows that ZooKeeper 3.4.13 uses FastLeaderElection as the default leader election implementation.
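For reference, the algorithm is chosen by the electionAlg key in zoo.cfg, whose values map to the switch above (0, 1 and 2 are legacy implementations, deprecated in the 3.4 line). A sketch with placeholder paths and hostnames:

tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
initLimit=10
syncLimit=5
# 3 = FastLeaderElection, the default
electionAlg=3
server.1=zoo1:2888:3888
server.2=zoo2:2888:3888
server.3=zoo3:2888:3888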
Communication with the other nodes is delegated to QuorumCnxManager, so let's take a quick look at it first.
5. The communication mechanism of QuorumCnxManager
5.1 Basic fields of QuorumCnxManager

We start from how QuorumPeer invokes its constructor:
public QuorumCnxManager createCnxnManager() {
return new QuorumCnxManager(this.getId(),
this.getView(),
this.authServer,
this.authLearner,
this.tickTime * this.syncLimit,
this.getQuorumListenOnAllIPs(),
this.quorumCnxnThreadsSize,
this.isQuorumSaslAuthEnabled());
}
The call lands in the constructor below, which takes an explicit senderWorkerMap (in the 3.4.13 source, the 8-argument call above reaches it through a delegating overload that supplies a fresh map) and ultimately sets the following fields:
public class QuorumCnxManager {
// one SendWorker per peer connection: a dedicated thread that sends messages to that node
final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
// queue of received messages
public final ArrayBlockingQueue<Message> recvQueue;
// per-sid queue of messages waiting to be sent
final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;
// the last message sent to each sid
final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;
// connection timeout
private int cnxTO = 5000;
// this node's configured myid
final long mySid;
// read/write timeout
final int socketTimeout;
// per-sid peer information (ip, port, etc.)
final Map<Long, QuorumPeer.QuorumServer> view;
// listener thread that creates the ServerSocket and accepts connections from other nodes
public final Listener listener;
public QuorumCnxManager(final long mySid,
Map<Long, QuorumPeer.QuorumServer> view,
QuorumAuthServer authServer,
QuorumAuthLearner authLearner,
int socketTimeout,
boolean listenOnAllIPs,
int quorumCnxnThreadsSize,
boolean quorumSaslAuthEnabled,
ConcurrentHashMap<Long, SendWorker> senderWorkerMap) {
this.senderWorkerMap = senderWorkerMap;
this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();
String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
if(cnxToValue != null){
this.cnxTO = Integer.parseInt(cnxToValue);
}
this.mySid = mySid;
this.socketTimeout = socketTimeout;
this.view = view;
this.listenOnAllIPs = listenOnAllIPs;
initializeAuth(mySid, authServer, authLearner, quorumCnxnThreadsSize,
quorumSaslAuthEnabled);
// Starts listener thread that waits for connection requests
listener = new Listener();
}
}
5.2 QuorumCnxManager.Listener
public class Listener extends ZooKeeperThread {
volatile ServerSocket ss = null;
@Override
public void run() {
int numRetries = 0;
InetSocketAddress addr;
while((!shutdown) && (numRetries < 3)){
try {
// create the listening socket on the election port
ss = new ServerSocket();
ss.setReuseAddress(true);
if (listenOnAllIPs) {
int port = view.get(QuorumCnxManager.this.mySid)
.electionAddr.getPort();
addr = new InetSocketAddress(port);
} else {
addr = view.get(QuorumCnxManager.this.mySid)
.electionAddr;
}
LOG.info("My election bind port: " + addr.toString());
setName(view.get(QuorumCnxManager.this.mySid)
.electionAddr.toString());
ss.bind(addr);
while (!shutdown) {
Socket client = ss.accept();
setSockOpts(client);
LOG.info("Received connection request "
+ client.getRemoteSocketAddress());
if (quorumSaslAuthEnabled) {
receiveConnectionAsync(client);
} else {
// handle the incoming connection
receiveConnection(client);
}
numRetries = 0;
}
} catch (IOException e) {
...
}
}
...
}
// process a connection initiated by another node
public void receiveConnection(final Socket sock) {
DataInputStream din = null;
try {
din = new DataInputStream(
new BufferedInputStream(sock.getInputStream()));
// hand off to handleConnection
handleConnection(sock, din);
} catch (IOException e) {
LOG.error("Exception handling connection, addr: {}, closing server connection",
sock.getRemoteSocketAddress());
closeSocket(sock);
}
}
private void handleConnection(Socket sock, DataInputStream din)
throws IOException {
Long sid = null;
try {
// Read server id
sid = din.readLong();
if (sid < 0) { // this is not a server id but a protocol version (see ZOOKEEPER-1633)
sid = din.readLong();
// next comes the #bytes in the remainder of the message
// note that 0 bytes is fine (old servers)
int num_remaining_bytes = din.readInt();
if (num_remaining_bytes < 0 || num_remaining_bytes > maxBuffer) {
LOG.error("Unreasonable buffer length: {}", num_remaining_bytes);
closeSocket(sock);
return;
}
byte[] b = new byte[num_remaining_bytes];
// remove the remainder of the message from din
int num_read = din.read(b);
if (num_read != num_remaining_bytes) {
LOG.error("Read only " + num_read + " bytes out of " + num_remaining_bytes + " sent by server " + sid);
}
}
...
} catch (IOException e) {
closeSocket(sock);
LOG.warn("Exception reading or writing challenge: " + e.toString());
return;
}
// by convention, connections are initiated from the higher sid to the lower sid; if a lower-sid node connects to this (higher-sid) node, close that connection and connect back ourselves
if (sid < this.mySid) {
SendWorker sw = senderWorkerMap.get(sid);
if (sw != null) {
sw.finish();
}
LOG.debug("Create new connection to server: " + sid);
closeSocket(sock);
connectOne(sid);
} else {
// create the SendWorker and RecvWorker for this connection, used later for sending and receiving messages
SendWorker sw = new SendWorker(sock, sid);
RecvWorker rw = new RecvWorker(sock, din, sid, sw);
sw.setRecv(rw);
SendWorker vsw = senderWorkerMap.get(sid);
if(vsw != null)
vsw.finish();
senderWorkerMap.put(sid, sw);
queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
sw.start();
rw.start();
return;
}
}
}
QuorumCnxManager.Listener creates a server socket on the election port and listens for connections from the other nodes.
One thing to note: a connection may only be created from a higher-sid node to a lower-sid node; connections in the other direction are closed.
Once a connection is established, a SendWorker and a RecvWorker thread are created for it to send and receive messages; readers can study those two classes on their own, but the sketch below shows the framing they use on the wire.
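On the wire, each election message is just a length-prefixed byte blob. The following self-contained sketch (not ZooKeeper source; the 512 KB bound is assumed to match QuorumCnxManager.PACKETMAXSIZE) illustrates what SendWorker writes and RecvWorker reads back:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Sketch of the length-prefixed framing: SendWorker writes [int length][payload];
// RecvWorker reads it back and rejects absurd lengths (see the check in 6.2.1 below).
public class FramingDemo {
    static final int PACKETMAXSIZE = 1024 * 512; // assumed to match QuorumCnxManager's bound

    static void writeFrame(DataOutputStream out, byte[] payload) throws IOException {
        out.writeInt(payload.length); // length prefix
        out.write(payload);
        out.flush();
    }

    static byte[] readFrame(DataInputStream in) throws IOException {
        int length = in.readInt();
        if (length <= 0 || length > PACKETMAXSIZE) {
            throw new IOException("Received packet with invalid length: " + length);
        }
        byte[] msg = new byte[length];
        in.readFully(msg, 0, length); // read exactly `length` bytes
        return msg;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        writeFrame(new DataOutputStream(wire), "vote".getBytes("UTF-8"));
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire.toByteArray()));
        System.out.println(new String(readFrame(in), "UTF-8")); // prints: vote
    }
}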
6. The FastLeaderElection algorithm

When the cluster nodes first start, they are all in the LOOKING state and a leader election begins. The election follows the process described above; let's now trace it through the code.
6.1 Sending votes: each node elects itself as Leader

QuorumPeer.run() dispatches on the current node state; here is what happens in the LOOKING state.
public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {
public void run() {
while (running) {
switch (getPeerState()) {
case LOOKING:
LOG.info("LOOKING");
// read-only mode is not the focus of this article; skip it
if (Boolean.getBoolean("readonlymode.enabled")) {
...
} else {
try {
setBCVote(null);
// makeLEStrategy().lookForLeader() performs the leader election; this is the key call, see 6.1.1
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
}
}
break;
...
}
}
}
}
6.1.1 FastLeaderElection.lookForLeader(): the election loop
public class FastLeaderElection implements Election {
public Vote lookForLeader() throws InterruptedException {
...
try {
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
int notTimeout = finalizeWait;
synchronized(this){
// increment logicalclock (the election epoch) once
logicalclock.incrementAndGet();
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
// send our vote to the other nodes, see 6.1.2
sendNotifications();
while ((self.getPeerState() == ServerState.LOOKING) &&
(!stop)){
// take a vote from the recvqueue of received notifications
Notification n = recvqueue.poll(notTimeout,
TimeUnit.MILLISECONDS);
if(n == null){
if(manager.haveDelivered()){
sendNotifications();
} else {
// getting here means connections to the other nodes have not been established yet, so connect first, see 6.1.3
manager.connectAll();
}
int tmpTimeOut = notTimeout*2;
notTimeout = (tmpTimeOut < maxNotificationInterval?
tmpTimeOut : maxNotificationInterval);
LOG.info("Notification time out: " + notTimeout);
}
...
}
}
}
}
6.1.2 FastLeaderElection.sendNotifications(): sending our vote to the other nodes
public class FastLeaderElection implements Election {
private void sendNotifications() {
// enqueue a ToSend vote message for every voting member of the cluster
for (QuorumServer server : self.getVotingView().values()) {
long sid = server.id;
ToSend notmsg = new ToSend(ToSend.mType.notification,
proposedLeader,
proposedZxid,
logicalclock.get(),
QuorumPeer.ServerState.LOOKING,
sid,
proposedEpoch);
if(LOG.isDebugEnabled()){
...
}
sendqueue.offer(notmsg);
}
}
}
Note that the vote is not sent synchronously here: sendNotifications() only appends it to sendqueue, and a WorkerSender thread drains that queue later.
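For completeness, here is an abridged look at that thread, FastLeaderElection.Messenger.WorkerSender (quoted from the 3.4.x source from memory and trimmed, so treat the details as approximate): it polls sendqueue, serializes the ToSend into a ByteBuffer, and hands it to QuorumCnxManager.toSend().

class WorkerSender extends ZooKeeperThread {
    volatile boolean stop;
    QuorumCnxManager manager;
    public void run() {
        while (!stop) {
            try {
                // block for up to 3s waiting for the next vote to send
                ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
                if (m == null) continue;
                process(m);
            } catch (InterruptedException e) {
                break;
            }
        }
    }
    void process(ToSend m) {
        // serialize (state, leader, zxid, electionEpoch, peerEpoch) and hand off to the manager
        ByteBuffer requestBuffer = buildMsg(m.state.ordinal(), m.leader, m.zxid,
                m.electionEpoch, m.peerEpoch);
        manager.toSend(m.sid, requestBuffer);
    }
}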
6.1.3 QuorumCnxManager.connectAll(): establishing connections to the other nodes
public class QuorumCnxManager {
public void connectAll(){
long sid;
for(Enumeration<Long> en = queueSendMap.keys();
en.hasMoreElements();){
sid = en.nextElement();
connectOne(sid);
}
}
synchronized public void connectOne(long sid){
// only if no connection has been established yet;
// established connections have an entry in senderWorkerMap
if (!connectedToPeer(sid)){
InetSocketAddress electionAddr;
if (view.containsKey(sid)) {
electionAddr = view.get(sid).electionAddr;
} else {
LOG.warn("Invalid server id: " + sid);
return;
}
try {
LOG.debug("Opening channel to server " + sid);
Socket sock = new Socket();
setSockOpts(sock);
// open the connection with a plain blocking socket
sock.connect(view.get(sid).electionAddr, cnxTO);
LOG.debug("Connected to server " + sid);
if (quorumSaslAuthEnabled) {
initiateConnectionAsync(sock, sid);
} else {
// initialize the connection
initiateConnection(sock, sid);
}
} ...
} else {
LOG.debug("There is a connection already for server " + sid);
}
}
// reached via initiateConnection(): performs the handshake for a new connection
private boolean startConnection(Socket sock, Long sid)
throws IOException {
DataOutputStream dout = null;
DataInputStream din = null;
try {
// send our own sid first
dout = new DataOutputStream(sock.getOutputStream());
dout.writeLong(this.mySid);
dout.flush();
din = new DataInputStream(
new BufferedInputStream(sock.getInputStream()));
} catch (IOException e) {
LOG.warn("Ignoring exception reading or writing challenge: ", e);
closeSocket(sock);
return false;
}
// authenticate learner
authLearner.authenticate(sock, view.get(sid).hostname);
// same rule as before: compare the two sids; only the higher-sid server may keep a connection it initiated toward a lower-sid server
if (sid > this.mySid) {
LOG.info("Have smaller server identifier, so dropping the " +
"connection: (" + sid + ", " + this.mySid + ")");
closeSocket(sock);
// Otherwise proceed with the connection
} else {
// once connected, create the SendWorker and RecvWorker threads for this connection
SendWorker sw = new SendWorker(sock, sid);
RecvWorker rw = new RecvWorker(sock, din, sid, sw);
sw.setRecv(rw);
SendWorker vsw = senderWorkerMap.get(sid);
if(vsw != null)
vsw.finish();
senderWorkerMap.put(sid, sw);
queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
sw.start();
rw.start();
return true;
}
return false;
}
}
Q: One last question about sending the first round of votes: SendWorker.run() sends messages taken from queueSendMap, so when does data get added to that map?
A: This is left as an exercise for the reader (hint: start from the FastLeaderElection.Messenger class). If you want to check your answer, the chain is roughly sendNotifications() → sendqueue → WorkerSender (shown in 6.1.2) → QuorumCnxManager.toSend(), abridged below.
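Abridged from QuorumCnxManager.toSend() in the 3.4.x source (quoted from memory, so details are approximate):

public void toSend(Long sid, ByteBuffer b) {
    // a message to ourselves never touches the network: loop it straight into recvQueue
    if (this.mySid == sid) {
        b.position(0);
        addToRecvQueue(new Message(b.duplicate(), sid));
    } else {
        // this is where queueSendMap is populated: enqueue the message for that sid...
        ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY);
        ArrayBlockingQueue<ByteBuffer> bqExisting = queueSendMap.putIfAbsent(sid, bq);
        if (bqExisting != null) {
            addToSendQueue(bqExisting, b);
        } else {
            addToSendQueue(bq, b);
        }
        // ...and make sure a connection to sid exists (subject to the sid ordering rule)
        connectOne(sid);
    }
}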
Summary: when the nodes send their first votes, each one proposes itself as Leader and sends that vote to every other node in the cluster.
Sending requires connections: every node tries to connect to all the others, but only connections initiated by the higher sid toward the lower sid are kept.
Sending and receiving are handled by two dedicated threads per connection (the SendWorker and RecvWorker threads in QuorumCnxManager).
6.2 Receiving and processing votes
6.2.1 Receiving the other nodes' votes

Receiving votes is the job of RecvWorker; let's look straight at its run() method.
class RecvWorker extends ZooKeeperThread {
public void run() {
threadCnt.incrementAndGet();
try {
while (running && !shutdown && sock != null) {
int length = din.readInt();
if (length <= 0 || length > PACKETMAXSIZE) {
throw new IOException(
"Received packet with invalid packet: "
+ length);
}
// read the payload
byte[] msgArray = new byte[length];
din.readFully(msgArray, 0, length);
ByteBuffer message = ByteBuffer.wrap(msgArray);
// wrap it into a Message and add it to the QuorumCnxManager.recvQueue queue
addToRecvQueue(new Message(message.duplicate(), sid));
}
} ...
}
}
Who consumes the messages in recvQueue? That is handled by FastLeaderElection.Messenger.
6.2.2 Messenger processes the received votes
protected class Messenger {
class WorkerReceiver extends ZooKeeperThread {
public void run() {
Message response;
while (!stop) {
// Sleeps on receive
try{
// take a vote received from another node
response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
if(response == null) continue;
// if the sender's sid is not in the current voting view, just send our own vote back to it
if(!validVoter(response.sid)){
Vote current = self.getCurrentVote();
ToSend notmsg = new ToSend(ToSend.mType.notification,
current.getId(),
current.getZxid(),
logicalclock.get(),
self.getPeerState(),
response.sid,
current.getPeerEpoch());
sendqueue.offer(notmsg);
} else {
...
Notification n = new Notification();
// determine the state of the voting peer
QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
switch (response.buffer.getInt()) {
case 0:
ackstate = QuorumPeer.ServerState.LOOKING;
break;
case 1:
ackstate = QuorumPeer.ServerState.FOLLOWING;
break;
case 2:
ackstate = QuorumPeer.ServerState.LEADING;
break;
case 3:
ackstate = QuorumPeer.ServerState.OBSERVING;
break;
default:
continue;
}
// read the proposed candidate's basic information and wrap it into a Notification
n.leader = response.buffer.getLong();
n.zxid = response.buffer.getLong();
n.electionEpoch = response.buffer.getLong();
n.state = ackstate;
n.sid = response.sid;
if(!backCompatibility){
n.peerEpoch = response.buffer.getLong();
} else {
if(LOG.isInfoEnabled()){
LOG.info("Backward compatibility mode, server id=" + n.sid);
}
n.peerEpoch = ZxidUtils.getEpochFromZxid(n.zxid);
}
n.version = (response.buffer.remaining() >= 4) ?
response.buffer.getInt() : 0x0;
...
// if this node is in the LOOKING state (which it is during initial startup)
if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
// wrap the received vote into a Notification and add it to recvqueue; it is consumed later
recvqueue.offer(n);
// if the received vote's election epoch is behind this node's, send our own vote straight back
if((ackstate == QuorumPeer.ServerState.LOOKING)
&& (n.electionEpoch < logicalclock.get())){
Vote v = getVote();
ToSend notmsg = new ToSend(ToSend.mType.notification,
v.getId(),
v.getZxid(),
logicalclock.get(),
self.getPeerState(),
response.sid,
v.getPeerEpoch());
sendqueue.offer(notmsg);
}
} else {
...
}
}
} catch (InterruptedException e) {
System.out.println("Interrupted Exception while waiting for new message" +
e.toString());
}
}
LOG.info("WorkerReceiver is down");
}
}
}
The key operation above is wrapping the vote received from another node into a Notification and adding it to recvqueue (the recvqueue.offer(n) call). How is that queue used afterwards?
6.2.3 FastLeaderElection.lookForLeader()

Let's return to this method, which contains the logic that processes each Notification.
public class FastLeaderElection implements Election {
public Vote lookForLeader() throws InterruptedException {
while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){
// fetch the next Notification
Notification n = recvqueue.poll(notTimeout,
TimeUnit.MILLISECONDS);
if (n == null) {
... // the no-notification branch was shown in 6.1.1
} else if(validVoter(n.sid) && validVoter(n.leader)) {
switch (n.state) {
case LOOKING:
// if the received vote's election epoch is ahead of our logical clock, adopt that epoch and clear the votes collected so far
if (n.electionEpoch > logicalclock.get()) {
logicalclock.set(n.electionEpoch);
recvset.clear();
// PK our vote against the received one and broadcast the winner
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(),
getInitLastLoggedZxid(),
getPeerEpoch());
}
sendNotifications();
// if the received vote's election epoch is behind our logical clock, ignore it; that vote is stale
} else if (n.electionEpoch < logicalclock.get()) {
if(LOG.isDebugEnabled()){
LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
+ Long.toHexString(n.electionEpoch)
+ ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
}
break;
// same epoch: compare zxid, epoch and sid by the usual rules, and broadcast the winner
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}
if(LOG.isDebugEnabled()){
LOG.debug("Adding vote: from=" + n.sid +
", proposed leader=" + n.leader +
", proposed zxid=0x" + Long.toHexString(n.zxid) +
", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
}
// archive the received vote into recvset
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
// check whether more than half of the servers cast the same vote; if so, a leader has been chosen
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock.get(), proposedEpoch))) {
// drain any remaining notifications; if a better candidate shows up, put it back and keep electing
while((n = recvqueue.poll(finalizeWait,
TimeUnit.MILLISECONDS)) != null){
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)){
recvqueue.put(n);
break;
}
}
// no better vote arrived: wrap the previously chosen leader into a Vote and return it
if (n == null) {
// finally: if the chosen leader id equals our own id, set the state to LEADING, otherwise FOLLOWING/OBSERVING
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(proposedLeader,
proposedZxid,
logicalclock.get(),
proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
}
}
}
}
The code is long but crucial: the heart of the leader election is all in the snippet above.
Votes are processed exactly as described earlier, comparing epoch, ZXID and sid; the comparison itself is:
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
if(self.getQuorumVerifier().getWeight(newId) == 0){
return false;
}
return ((newEpoch > curEpoch) ||
((newEpoch == curEpoch) &&
((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}
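Plugging in the startup example from earlier: all three servers start with epoch 0 and zxid 0, so the myid comparison decides. A standalone re-implementation of the same ordering, for illustration only (it omits the QuorumVerifier weight check of the real totalOrderPredicate):

// Standalone re-implementation of the vote ordering (illustrative, no weight check).
public class VotePk {
    static boolean wins(long newId, long newZxid, long newEpoch,
                        long curId, long curZxid, long curEpoch) {
        return (newEpoch > curEpoch)
                || ((newEpoch == curEpoch)
                    && ((newZxid > curZxid)
                        || ((newZxid == curZxid) && (newId > curId))));
    }

    public static void main(String[] args) {
        // zookeeper_2's first round: its own vote is (myid=2, zxid=0, epoch=0)
        System.out.println(wins(1, 0, 0, 2, 0, 0)); // (1,0) loses -> keep voting (2,0)
        System.out.println(wins(3, 0, 0, 2, 0, 0)); // (3,0) wins  -> re-broadcast (3,0)
    }
}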
Summary:
Leader election is one of the harder parts of ZooKeeper; studying the code involved plenty of twists and turns for me (pain level: three stars).
Writing it up deepens the impression, but coming back later it can still be confusing, so keep rereading the code and debug it when you have the time.
If that still feels like too much, just remember these two rules:
1. The machine whose data is newest (the one with the largest ZXID) is the most likely to become Leader;
2. If everyone's ZXID is equally large, the machine with the largest sid becomes the leader.
Reference: 【分布式】Zookeeper的Leader选举 - leesf - 博客园