The previous article covered the Leader election process. Once the election completes, each node in the cluster sets its state to LEADING or FOLLOWING according to the result, and then starts serving based on that state. This article walks through the startup of the LEADER and FOLLOWER nodes.
1. Leader node startup
The entry point for starting the leader node is, once again, the QuorumPeer.run() method:
public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {
@Override
public void run() {
while (running) {
switch (getPeerState()) {
case LOOKING:
...
// follower startup
case FOLLOWING:
try {
LOG.info("FOLLOWING");
setFollower(makeFollower(logFactory));
follower.followLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
follower.shutdown();
setFollower(null);
setPeerState(ServerState.LOOKING);
}
break;
// leader startup
case LEADING:
LOG.info("LEADING");
try {
// the key part: create the Leader and run its lead() method
setLeader(makeLeader(logFactory));
leader.lead();
setLeader(null);
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
if (leader != null) {
leader.shutdown("Forcing shutdown");
setLeader(null);
}
setPeerState(ServerState.LOOKING);
}
break;
}
}
}
1.1 QuorumPeer.makeLeader(): creating the Leader
protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException {
// create a LeaderZooKeeperServer; see 1.2 below
return new Leader(this, new LeaderZooKeeperServer(logFactory,this,new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
}
1.2 LeaderZooKeeperServer
Its inheritance chain is LeaderZooKeeperServer -> QuorumZooKeeperServer -> ZooKeeperServer.
In essence it is still the ZooKeeperServer we analyzed before, just with its own leader-specific methods:
public class LeaderZooKeeperServer extends QuorumZooKeeperServer {
CommitProcessor commitProcessor;
// the most important method: wires up the leader's processor chain
@Override
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(
finalProcessor, getLeader().toBeApplied);
commitProcessor = new CommitProcessor(toBeAppliedProcessor,
Long.toString(getServerId()), false,
getZooKeeperServerListener());
commitProcessor.start();
ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
commitProcessor);
proposalProcessor.initialize();
firstProcessor = new PrepRequestProcessor(this, proposalProcessor);
((PrepRequestProcessor)firstProcessor).start();
}
}
The most important method in LeaderZooKeeperServer is setupRequestProcessors(), which installs a custom RequestProcessor chain different from the one in the standalone ZooKeeperServer we analyzed earlier.
The standalone ZooKeeperServer chain is: PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor
The LeaderZooKeeperServer chain is: PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor -> ToBeAppliedRequestProcessor -> FinalRequestProcessor
We will examine this chain in detail when we analyze how the LEADER processes requests; for now, the sketch below shows the general chaining pattern.
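Each processor in these chains holds a reference to the next one and hands the request downstream once its own work is done, a classic chain of responsibility. Below is a minimal, self-contained sketch of that pattern; the Processor/StageProcessor/ChainDemo names are illustrative inventions, not the real ZooKeeper classes:

interface Processor {
    void process(String request);
}

class StageProcessor implements Processor {
    private final String name;
    private final Processor next; // downstream processor, or null at the end of the chain

    StageProcessor(String name, Processor next) {
        this.name = name;
        this.next = next;
    }

    @Override
    public void process(String request) {
        System.out.println(name + " handled " + request); // this stage's own work
        if (next != null) {
            next.process(request); // hand off downstream
        }
    }
}

public class ChainDemo {
    public static void main(String[] args) {
        // Built back to front, just like setupRequestProcessors() above.
        Processor last = new StageProcessor("Final", null);
        Processor first = new StageProcessor("Prep", new StageProcessor("Proposal", last));
        first.process("create /foo");
    }
}

Note how the chain is assembled back to front: the last processor is constructed first and passed into the one before it, which is exactly what setupRequestProcessors() does with finalProcessor, commitProcessor, and proposalProcessor.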
2. Starting the leader and syncing the followers
Once the election ends, does the leader node simply start up and call it a day?
Not quite. After the election, the current node is only a quasi-leader: it still has to complete a series of interactions with the followers before it truly becomes the leader and begins serving.
What exactly are those interactions? Let's go straight to the source.
2.1 Leader.lead(), the entry method
public class Leader {
void lead() throws IOException, InterruptedException {
...
try {
self.tick.set(0);
// load the snapshot data
zk.loadData();
leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());
// LearnerCnxAcceptor accepts follower connections; see 2.2
cnxAcceptor = new LearnerCnxAcceptor();
cnxAcceptor.start();
readyToStart = true;
// recompute the leader's epoch and derive a fresh zxid from it
long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
synchronized(this){
lastProposed = zk.getZxid();
}
newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),
null, null);
// TODO: we don't yet know what this ack means; more on it later
waitForEpochAck(self.getId(), leaderStateSummary);
self.setCurrentEpoch(epoch);
try {
waitForNewLeaderAck(self.getId(), zk.getZxid());
} catch (InterruptedException e) {
...
}
// reaching this point means a quorum of followers have acked: the cluster is operational and this leader can start serving client requests
startZkServer();
...
boolean tickSkip = true;
while (true) {
Thread.sleep(self.tickTime / 2);
if (!tickSkip) {
self.tick.incrementAndGet();
}
HashSet<Long> syncedSet = new HashSet<Long>();
// lock on the followers when we use it.
syncedSet.add(self.getId());
for (LearnerHandler f : getLearners()) {
// Synced set is used to check we have a supporting quorum, so only
// PARTICIPANT, not OBSERVER, learners should be used
if (f.synced() && f.getLearnerType() == LearnerType.PARTICIPANT) {
syncedSet.add(f.getSid());
}
// the leader proactively pings each follower
f.ping();
}
...
tickSkip = !tickSkip;
}
} finally {
zk.unregisterJMX(this);
}
}
}
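One detail worth pausing on is zk.setZxid(ZxidUtils.makeZxid(epoch, 0)): a zxid is a 64-bit value whose high 32 bits carry the epoch and whose low 32 bits carry a per-epoch counter, which is why a new epoch restarts the counter at 0. A small sketch of that packing (the real logic lives in ZxidUtils; ZxidSketch here is just an illustrative name):

public class ZxidSketch {
    // high 32 bits = epoch, low 32 bits = per-epoch counter
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }

    static long getEpochFromZxid(long zxid) {
        return zxid >> 32L;
    }

    public static void main(String[] args) {
        long zxid = makeZxid(5, 0);                  // the first zxid of epoch 5
        System.out.println(Long.toHexString(zxid));  // prints 500000000
        System.out.println(getEpochFromZxid(zxid));  // prints 5
    }
}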
2.2 LearnerCnxAcceptor (the thread that accepts follower connections)
class LearnerCnxAcceptor extends ZooKeeperThread{
private volatile boolean stop = false;
@Override
public void run() {
try {
while (!stop) {
try{
Socket s = ss.accept();
// socket read timeout is tickTime * initLimit
s.setSoTimeout(self.tickTime * self.initLimit);
s.setTcpNoDelay(nodelay);
BufferedInputStream is = new BufferedInputStream(
s.getInputStream());
// a new LearnerHandler thread is spawned here; see 2.3
LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
fh.start();
} catch (SocketException e) {
...
} catch (SaslException e){
LOG.error("Exception while connecting to quorum learner", e);
}
}
} catch (Exception e) {
LOG.warn("Exception while accepting follower", e);
}
}
...
}
2.3 LearnerHandler: handles the actual leader-follower traffic
/**
* There will be an instance of this class created by the Leader for each
* learner. All communication with a learner is handled by this
* class.
*/
public class LearnerHandler extends ZooKeeperThread {
}
The Javadoc on LearnerHandler makes it clear: the Leader creates one LearnerHandler thread for every Follower that connects, and all communication between the two is handled by that thread. Exactly how it does so we'll set aside for now; back to the main line.
This is a good place to pause on the Leader side.
From what we have seen so far:
* The Leader listens for Follower connections and creates one LearnerHandler thread per connection to handle all request/response traffic;
* The leader and followers go through a sequence of steps (waiting for acks whose exact meaning we don't know yet), and only after all of them complete does the leader server actually start and accept client requests.
So how do leader and follower interact, and what do they exchange? To answer that, we turn to the Follower's startup.
3. Follower node startup
3.1 Creating the Follower object
public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {
protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
return new Follower(this, new FollowerZooKeeperServer(logFactory,
this,new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
}
}
3.2 Creating the FollowerZooKeeperServer
Its inheritance structure is similar to LeaderZooKeeperServer's, except for one extra level in the middle: it extends LearnerZooKeeperServer. We won't repeat the diagram; here is its most important method:
public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
commitProcessor = new CommitProcessor(finalProcessor,
Long.toString(getServerId()), true,
getZooKeeperServerListener());
commitProcessor.start();
firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
((FollowerRequestProcessor) firstProcessor).start();
syncProcessor = new SyncRequestProcessor(this,
new SendAckRequestProcessor((Learner)getFollower()));
syncProcessor.start();
}
}
As you can see, when FollowerZooKeeperServer receives a client request the chain is FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor, with a separate SyncRequestProcessor -> SendAckRequestProcessor pair that persists the leader's proposals and acks them.
We'll analyze this in detail when we get to client request handling.
4. Starting the Follower
As described in section 1, QuorumPeer calls Follower.followLeader() to start the follower and sync state from the Leader:
public class Follower extends Learner{
void followLeader() throws InterruptedException {
...
try {
// look up the elected Leader's connection info
QuorumServer leaderServer = findLeader();
try {
// connect to the Leader
connectToLeader(leaderServer.addr, leaderServer.hostname);
// 1. register this node with the leader
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
if (newEpoch < self.getAcceptedEpoch()) {
LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)
+ " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
throw new IOException("Error: Epoch of leader is lower");
}
// 2. sync data from the leader
syncWithLeader(newEpochZxid);
QuorumPacket qp = new QuorumPacket();
while (this.isRunning()) {
// process packets from the leader
readPacket(qp);
processPacket(qp);
}
} catch (Exception e) {
LOG.warn("Exception when following the leader", e);
try {
sock.close();
} catch (IOException e1) {
e1.printStackTrace();
}
// clear pending revalidations
pendingRevalidations.clear();
}
} finally {
zk.unregisterJMX((Learner)this);
}
}
}
To summarize this first pass over the Follower:
* On startup, the Follower actively connects to the Leader;
* Once connected, the Follower syncs the Leader's data.
How is that done? Below we read the Leader and Follower code side by side.
5. Leader/Follower interaction during startup
Let's trace the Leader and Follower code together.
5.1 The follower opens a long-lived connection to the leader
5.1.1 The follower connects
public class Learner {
protected void connectToLeader(InetSocketAddress addr, String hostname)
throws IOException, ConnectException, InterruptedException {
// with the Leader's address in hand, open a plain socket connection
sock = new Socket();
sock.setSoTimeout(self.tickTime * self.initLimit);
for (int tries = 0; tries < 5; tries++) {
try {
// connect timeout is tickTime * syncLimit
sock.connect(addr, self.tickTime * self.syncLimit);
sock.setTcpNoDelay(nodelay);
break;
} catch (IOException e) {
...
}
Thread.sleep(1000);
}
...
}
}
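To make these timeouts concrete, here is a hypothetical worked example using the values from the zoo_sample.cfg that ships with ZooKeeper (tickTime=2000, initLimit=10, syncLimit=5); TimeoutExample is an illustrative name:

public class TimeoutExample {
    public static void main(String[] args) {
        int tickTime = 2000;  // ms per tick
        int initLimit = 10;   // ticks allowed for initial sync
        int syncLimit = 5;    // ticks allowed between exchanges

        System.out.println("socket read timeout = " + tickTime * initLimit + " ms"); // 20000 ms
        System.out.println("connect timeout     = " + tickTime * syncLimit + " ms"); // 10000 ms
    }
}

So with the sample config, a follower gives the leader 10 seconds to accept the connection and 20 seconds to answer any single read.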
5.1.2 The Leader accepts the connection
class LearnerCnxAcceptor extends ZooKeeperThread{
private volatile boolean stop = false;
...
@Override
public void run() {
try {
while (!stop) {
try{
Socket s = ss.accept();
s.setSoTimeout(self.tickTime * self.initLimit);
s.setTcpNoDelay(nodelay);
BufferedInputStream is = new BufferedInputStream(
s.getInputStream());
// for every accepted follower connection, create a LearnerHandler to service it
LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
fh.start();
} catch (SocketException e) {
...
}
}
} catch (Exception e) {
LOG.warn("Exception while accepting follower", e);
}
}
...
}
5.2 The follower registers itself with the leader
5.2.1 The follower sends its registration
public class Learner {
protected long registerWithLeader(int pktType) throws IOException{
long lastLoggedZxid = self.getLastLoggedZxid();
QuorumPacket qp = new QuorumPacket();
qp.setType(pktType);
qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));
// send this node's information
LearnerInfo li = new LearnerInfo(self.getId(), 0x10000);
ByteArrayOutputStream bsid = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
boa.writeRecord(li, "LearnerInfo");
qp.setData(bsid.toByteArray());
// send the request to the leader (flushing immediately); the response is read just below
writePacket(qp, true);
// after receiving the registration, the leader sends back its own basic information
readPacket(qp);
final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
if (qp.getType() == Leader.LEADERINFO) {
leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
byte epochBytes[] = new byte[4];
final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
// extract the leader's epoch info
if (newEpoch > self.getAcceptedEpoch()) {
wrappedEpochBytes.putInt((int)self.getCurrentEpoch());
self.setAcceptedEpoch(newEpoch);
} else if (newEpoch == self.getAcceptedEpoch()) {
wrappedEpochBytes.putInt(-1);
} else {
throw new IOException("Leaders epoch, " + newEpoch + " is less than accepted epoch, " + self.getAcceptedEpoch());
}
// the follower sends its ack, completing one full registration round trip
QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
writePacket(ackNewEpoch, true);
return ZxidUtils.makeZxid(newEpoch, 0);
} else {
...
}
}
}
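Notice that both sides shuttle the epoch around as a raw 4-byte payload: the follower packs it into epochBytes above, and the leader unpacks it from the ACKEPOCH data in 5.2.2 below. A tiny self-contained demo of that ByteBuffer round trip (EpochBytesDemo is an illustrative name):

import java.nio.ByteBuffer;

public class EpochBytesDemo {
    public static void main(String[] args) {
        byte[] epochBytes = new byte[4];
        ByteBuffer.wrap(epochBytes).putInt(7);            // sender: pack the epoch into 4 bytes
        int epoch = ByteBuffer.wrap(epochBytes).getInt(); // receiver: unpack it
        System.out.println(epoch);                        // prints 7
    }
}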
5.2.2 The Leader receives the registration
public class LearnerHandler extends ZooKeeperThread {
public void run() {
try {
// read the follower's registration packet
ia = BinaryInputArchive.getArchive(bufferedInput);
bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
oa = BinaryOutputArchive.getArchive(bufferedOutput);
QuorumPacket qp = new QuorumPacket();
ia.readRecord(qp, "packet");
...
byte learnerInfoData[] = qp.getData();
// extract the learner's basic info
if (learnerInfoData != null) {
if (learnerInfoData.length == 8) {
ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
this.sid = bbsid.getLong();
} else {
LearnerInfo li = new LearnerInfo();
ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(learnerInfoData), li);
this.sid = li.getServerid();
this.version = li.getProtocolVersion();
}
} else {
this.sid = leader.followerCounter.getAndDecrement();
}
...
long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
long peerLastZxid;
StateSummary ss = null;
long zxid = qp.getZxid();
long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
// normally the protocol version sent by the follower is 0x10000
if (this.getVersion() < 0x10000) {
...
} else {
byte ver[] = new byte[4];
ByteBuffer.wrap(ver).putInt(0x10000);
// the leader sends the new epoch info to the follower
QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null);
oa.writeRecord(newEpochPacket, "packet");
bufferedOutput.flush();
// the leader reads the follower's ACKEPOCH response
QuorumPacket ackEpochPacket = new QuorumPacket();
ia.readRecord(ackEpochPacket, "packet");
if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
LOG.error(ackEpochPacket.toString()
+ " is not ACKEPOCH");
return;
}
ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
// wait until a quorum has acked the new epoch
leader.waitForEpochAck(this.getSid(), ss);
}
...
}
}
}
A note from experience: when I first read this learner-side code on its own, I was lost too, with so many moving parts and no obvious purpose. Read side by side with the follower code, though, it becomes clear: one request, one response. Putting 5.2.1 and 5.2.2 together, registration is a three-packet handshake:
* follower -> leader: FOLLOWERINFO (sid and protocol version, with the accepted epoch packed into the zxid)
* leader -> follower: LEADERINFO (the proposed new epoch packed into the zxid)
* follower -> leader: ACKEPOCH (lastLoggedZxid, plus the follower's previous currentEpoch)
Once every follower has completed this registration, the next phase of work can begin.
5.3 The follower syncs data from the leader
Data sync means shipping the transaction requests that were committed on the leader but not yet on the learner over to the learner, so that the learner's data ends up identical to the leader's.
Sync comes in four flavors:
* Direct differentiated sync (DIFF)
* Rollback followed by differentiated sync (TRUNC+DIFF)
* Rollback-only sync (TRUNC)
* Full sync (SNAP)
5.3.1 The leader sends the sync information
public class LearnerHandler extends ZooKeeperThread {
public void run() {
...
ReentrantReadWriteLock lock = leader.zk.getZKDatabase().getLogLock();
ReadLock rl = lock.readLock();
try {
rl.lock();
// which of the four strategies applies is decided by comparing peerLastZxid against maxCommittedLog and minCommittedLog
final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog();
final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog();
LOG.info("Synchronizing with Follower sid: " + sid
+" maxCommittedLog=0x"+Long.toHexString(maxCommittedLog)
+" minCommittedLog=0x"+Long.toHexString(minCommittedLog)
+" peerLastZxid=0x"+Long.toHexString(peerLastZxid));
LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();
// the follower is already fully in sync
if (peerLastZxid == leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()) {
LOG.info("leader and follower are in sync, zxid=0x{}",
Long.toHexString(peerLastZxid));
packetToSend = Leader.DIFF;
zxidToSend = peerLastZxid;
} else if (proposals.size() != 0) {
LOG.debug("proposal size is {}", proposals.size());
// peerLastZxid falls inside the committed-log window: a direct differentiated (DIFF) sync
if ((maxCommittedLog >= peerLastZxid)
&& (minCommittedLog <= peerLastZxid)) {
...
}
...
}
...
} finally {
rl.unlock();
}
...
}
}
The excerpt above is abridged: the remaining comparisons pick one of the four strategies according to where peerLastZxid falls relative to the window [minCommittedLog, maxCommittedLog], as sketched below.
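Distilling the code above and the four categories from 5.3 into one place, the decision looks roughly like the sketch below. This is my own condensation, not the literal source (the real method interleaves these checks with building the packet queue), and SyncStrategySketch plus the boolean parameters are illustrative inventions:

public class SyncStrategySketch {
    enum Strategy { DIFF, TRUNC, TRUNC_THEN_DIFF, SNAP }

    static Strategy choose(long peerLastZxid, long lastProcessedZxid,
                           long minCommittedLog, long maxCommittedLog,
                           boolean committedLogEmpty, boolean peerHasUnseenZxid) {
        if (peerLastZxid == lastProcessedZxid) {
            return Strategy.DIFF;          // already in sync: an empty diff
        }
        if (committedLogEmpty) {
            return Strategy.SNAP;          // no in-memory log to diff against: full snapshot
        }
        if (peerLastZxid > maxCommittedLog) {
            return Strategy.TRUNC;         // follower is ahead of the leader: roll it back
        }
        if (peerLastZxid >= minCommittedLog) {
            // inside the window: replay the proposals after peerLastZxid; if the
            // follower logged a zxid the leader never saw, truncate first, then diff
            return peerHasUnseenZxid ? Strategy.TRUNC_THEN_DIFF : Strategy.DIFF;
        }
        return Strategy.SNAP;              // too far behind the window: full snapshot
    }
}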