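ZooKeeper Source Code Analysis: Starting the Leader and Follower Nodes

恐龙弟旺仔, published 2021-11-02 19:11:10

Preface:

The previous article covered the leader election process. Once the election finishes, each node in the ensemble sets its own state to LEADING or FOLLOWING according to the result.

Each node then starts its services according to that state. This article walks through the startup of the leader and follower nodes.

1. Leader node startup

The entry point for starting the leader is still QuorumPeer.run():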

public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {
 
    @Override
    public void run() {
     
        while (running) {
            switch (getPeerState()) {
                case LOOKING:
                    ...

                // Follower node startup
                case FOLLOWING:
                    try {
                        LOG.info("FOLLOWING");
                        setFollower(makeFollower(logFactory));
                        follower.followLeader();
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e);
                    } finally {
                        follower.shutdown();
                        setFollower(null);
                        setPeerState(ServerState.LOOKING);
                    }
                    break; 
                // Leader node startup
                case LEADING:
                    LOG.info("LEADING");
                    try {
                        // Key step: create the leader and call lead()
                        setLeader(makeLeader(logFactory));
                        leader.lead();
                        setLeader(null);
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e);
                    } finally {
                        if (leader != null) {
                            leader.shutdown("Forcing shutdown");
                            setLeader(null);
                        }
                        setPeerState(ServerState.LOOKING);
                    }
                    break;
            }
        }
    }
}
1.1 QuorumPeer.makeLeader(): creating the Leader
protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException {
    // Creates a LeaderZooKeeperServer, which we examine in 1.2
    return new Leader(this, new LeaderZooKeeperServer(logFactory,this,new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
}
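1.2 LeaderZooKeeperServer

First, its inheritance hierarchy: as the declaration below shows, LeaderZooKeeperServer extends QuorumZooKeeperServer, which in turn extends ZooKeeperServer.

It is essentially the ZooKeeperServer we analyzed earlier, with a few methods of its own.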

public class LeaderZooKeeperServer extends QuorumZooKeeperServer {
    CommitProcessor commitProcessor;
 
    // The most important method
    @Override
    protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(
                finalProcessor, getLeader().toBeApplied);
        commitProcessor = new CommitProcessor(toBeAppliedProcessor,
                Long.toString(getServerId()), false,
                getZooKeeperServerListener());
        commitProcessor.start();
        ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
                commitProcessor);
        proposalProcessor.initialize();
        firstProcessor = new PrepRequestProcessor(this, proposalProcessor);
        ((PrepRequestProcessor)firstProcessor).start();
    }

}

The most important method of LeaderZooKeeperServer is setupRequestProcessors(), which wires up its own RequestProcessor chain. This chain differs from the one in the ZooKeeperServer we analyzed earlier.

ZooKeeperServer chain: PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor

LeaderZooKeeperServer chain: PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor -> ToBeAppliedRequestProcessor -> FinalRequestProcessor

We will look at this chain in detail when we analyze how the leader handles requests.
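To make the ordering of the leader chain concrete, here is a minimal sketch of the same back-to-front wiring. It is not ZooKeeper code: Processor, NamedStage and ChainSketch are hypothetical names, and every stage runs synchronously, whereas several of the real processors are started as threads.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a request processor; not the ZooKeeper interface. */
interface Processor {
    void process(String request);
}

/** A stage that records its own name, then forwards the request to the next stage. */
final class NamedStage implements Processor {
    private final String name;
    private final Processor next;      // null marks the end of the chain
    private final List<String> trace;  // collects the order in which stages ran

    NamedStage(String name, Processor next, List<String> trace) {
        this.name = name;
        this.next = next;
        this.trace = trace;
    }

    @Override
    public void process(String request) {
        trace.add(name);
        if (next != null) {
            next.process(request);
        }
    }
}

final class ChainSketch {
    /** Builds the chain from the last stage to the first, as setupRequestProcessors() does. */
    static Processor buildLeaderChain(List<String> trace) {
        Processor fin = new NamedStage("Final", null, trace);
        Processor toBeApplied = new NamedStage("ToBeApplied", fin, trace);
        Processor commit = new NamedStage("Commit", toBeApplied, trace);
        Processor proposal = new NamedStage("Proposal", commit, trace);
        return new NamedStage("Prep", proposal, trace);
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        buildLeaderChain(trace).process("create /demo");
        // Prints: Prep -> Proposal -> Commit -> ToBeApplied -> Final
        System.out.println(String.join(" -> ", trace));
    }
}
```

Because each stage receives its successor in the constructor, the last stage has to be created first. That is why the real method starts with FinalRequestProcessor and ends with PrepRequestProcessor.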

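2. Starting the leader and syncing followers

Once the election is over, does the leader node simply start up and that is it?

Not quite. Right after the election the elected node is only a prospective leader. It must complete a series of exchanges with the followers before it truly becomes the leader and starts serving clients.

What exchanges exactly? Let's go straight to the source.

2.1 Leader.lead(): the entry method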
public class Leader {
	void lead() throws IOException, InterruptedException {
        ...
        try {
            self.tick.set(0);
            // Load the snapshot data
            zk.loadData();
            
            leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());

            // LearnerCnxAcceptor accepts follower connections, see 2.2
            cnxAcceptor = new LearnerCnxAcceptor();
            cnxAcceptor.start();
            
            readyToStart = true;
            // Compute the new epoch for this leader and derive the new zxid from it
            long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
            zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
            
            synchronized(this){
                lastProposed = zk.getZxid();
            }
            
            newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),
                    null, null);

            // TODO: the meaning of this ack is not clear yet; it is covered later
            waitForEpochAck(self.getId(), leaderStateSummary);
            self.setCurrentEpoch(epoch);
            try {
                waitForNewLeaderAck(self.getId(), zk.getZxid());
            } catch (InterruptedException e) {
                ...
            }
            
            // Reaching this point means a quorum of followers has acked, so the ensemble is operational and the leader can start serving client requests
            startZkServer();
            
            ...
            boolean tickSkip = true;
    
            while (true) {
                Thread.sleep(self.tickTime / 2);
                if (!tickSkip) {
                    self.tick.incrementAndGet();
                }
                HashSet<Long> syncedSet = new HashSet<Long>();

                // lock on the followers when we use it.
                syncedSet.add(self.getId());

                for (LearnerHandler f : getLearners()) {
                    // Synced set is used to check we have a supporting quorum, so only
                    // PARTICIPANT, not OBSERVER, learners should be used
                    if (f.synced() && f.getLearnerType() == LearnerType.PARTICIPANT) {
                        syncedSet.add(f.getSid());
                    }
                    // The leader proactively pings the follower
                    f.ping();
                }
				...
              tickSkip = !tickSkip;
            }
        } finally {
            zk.unregisterJMX(this);
        }
    }
}
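The call ZxidUtils.makeZxid(epoch, 0) in lead() relies on how a zxid is laid out. As far as I know, the epoch sits in the high 32 bits of the long and a per-epoch counter in the low 32 bits, so a new epoch restarts the counter at 0. The sketch below reimplements that layout under this assumption. ZxidSketch, epochOf and counterOf are illustrative names, not the ZooKeeper API.

```java
/** Illustrative re-implementation of the zxid layout; not the real ZxidUtils class. */
final class ZxidSketch {
    /** Packs the epoch into the high 32 bits and the counter into the low 32 bits. */
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    /** Extracts the epoch (high 32 bits). */
    static long epochOf(long zxid) {
        return zxid >> 32;
    }

    /** Extracts the counter (low 32 bits). */
    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    public static void main(String[] args) {
        long first = makeZxid(5, 0);
        System.out.println(Long.toHexString(first)); // prints 500000000
        long later = makeZxid(5, 7);
        System.out.println(epochOf(later) + " / " + counterOf(later)); // prints 5 / 7
    }
}
```

With this layout, any zxid from a newer epoch compares greater than every zxid from an older epoch, regardless of the counter values.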
2.2 LearnerCnxAcceptor (the thread that accepts follower connections)
class LearnerCnxAcceptor extends ZooKeeperThread{
    private volatile boolean stop = false;

    @Override
    public void run() {
        try {
            while (!stop) {
                try{
                    Socket s = ss.accept();
                    // Set the socket read timeout to tickTime * initLimit
                    s.setSoTimeout(self.tickTime * self.initLimit);
                    s.setTcpNoDelay(nodelay);

                    BufferedInputStream is = new BufferedInputStream(
                        s.getInputStream());
                    // A new LearnerHandler thread is created here, see 2.3
                    LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
                    fh.start();
                } catch (SocketException e) {
                    ...
                } catch (SaslException e){
                    LOG.error("Exception while connecting to quorum learner", e);
                }
            }
        } catch (Exception e) {
            LOG.warn("Exception while accepting follower", e);
        }
    }
...
}
2.3 LearnerHandler: handles the actual leader-follower traffic
/**
 * There will be an instance of this class created by the Leader for each
 * learner. All communication with a learner is handled by this
 * class.
 */
public class LearnerHandler extends ZooKeeperThread {
    
}

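The class comment on LearnerHandler makes this clear: the Leader creates one LearnerHandler thread for each learner that connects, and all communication between the two goes through that thread. We will leave the details of that handling for later and return to the main flow.

At this point we can pause the walkthrough of the leader startup.

From what we have seen so far:

* The leader listens for follower connections and creates a LearnerHandler thread per connection to handle all requests and responses;

* The leader and followers go through a series of exchanges (the leader waits for acks; we do not yet know what they acknowledge), and only after those exchanges complete does the leader actually start its server and accept client requests.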
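The "wait for acks" step in the second point can be pictured as a quorum barrier: the leader counts itself, each handler thread records an ack for its follower, and the leader proceeds once more than half of the voting members have acked. The sketch below shows that idea with plain wait/notifyAll. QuorumAckBarrier and its methods are hypothetical names, and unlike the real waitForEpochAck and waitForNewLeaderAck it reports a timeout through its return value.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical quorum barrier; a sketch of the idea, not the Leader implementation. */
final class QuorumAckBarrier {
    private final int ensembleSize;
    private final Set<Long> acked = new HashSet<>();

    QuorumAckBarrier(int ensembleSize) {
        this.ensembleSize = ensembleSize;
    }

    /** True once strictly more than half of the members have acked. */
    synchronized boolean hasQuorum() {
        return acked.size() > ensembleSize / 2;
    }

    /** Records an ack from the given server id and wakes waiters if a quorum now exists. */
    synchronized boolean ack(long sid) {
        acked.add(sid);
        boolean quorum = hasQuorum();
        if (quorum) {
            notifyAll();
        }
        return quorum;
    }

    /** Blocks until a quorum exists or the timeout elapses; returns whether a quorum exists. */
    synchronized boolean await(long timeoutMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (!hasQuorum()) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return false;
            }
            wait(remaining);
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        QuorumAckBarrier barrier = new QuorumAckBarrier(3);
        barrier.ack(1L);                                   // the leader counts itself
        Thread handler = new Thread(() -> barrier.ack(2L)); // one follower acks
        handler.start();
        boolean ready = barrier.await(2000);
        handler.join();
        System.out.println("quorum reached: " + ready);
    }
}
```

In a three-node ensemble two acks are enough, so the leader does not need to hear from every follower before it starts serving.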

So how do the leader and followers interact, and what do they exchange? To answer that we need to look at how a follower starts.

3. Follower node startup

3.1 Creating the Follower object
public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {
	protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
        return new Follower(this, new FollowerZooKeeperServer(logFactory, 
                this,new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
    }
}
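3.2 Creating the FollowerZooKeeperServer

Its inheritance hierarchy is similar to that of LeaderZooKeeperServer, with one extra layer, LearnerZooKeeperServer, in between. We will not show it again and only show its most important method.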

public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
    protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        commitProcessor = new CommitProcessor(finalProcessor,
                Long.toString(getServerId()), true,
                getZooKeeperServerListener());
        commitProcessor.start();
        firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
        ((FollowerRequestProcessor) firstProcessor).start();
        syncProcessor = new SyncRequestProcessor(this,
                new SendAckRequestProcessor((Learner)getFollower()));
        syncProcessor.start();
    }
}

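As you can see, when FollowerZooKeeperServer receives a client request, the chain is FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor. The SyncRequestProcessor -> SendAckRequestProcessor pair built in the same method is a separate chain and is not on this path.

We will analyze it in detail when we get to client requests.

4. Starting the Follower

In section 1 above, QuorumPeer starts the follower by calling Follower.followLeader(), which connects to the leader and synchronizes with it: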

public class Follower extends Learner{
	void followLeader() throws InterruptedException {
        ...
        try {
            // Find the elected leader
            QuorumServer leaderServer = findLeader();            
            try {
                // Connect to the leader
                connectToLeader(leaderServer.addr, leaderServer.hostname);
                // 1. Register this node with the leader
                long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);

                long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
                if (newEpoch < self.getAcceptedEpoch()) {
                    LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)
                            + " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
                    throw new IOException("Error: Epoch of leader is lower");
                }
                // 2. Sync data from the leader
                syncWithLeader(newEpochZxid);                
                QuorumPacket qp = new QuorumPacket();
                while (this.isRunning()) {
                    // Process packets from the leader
                    readPacket(qp);
                    processPacket(qp);
                }
            } catch (Exception e) {
                LOG.warn("Exception when following the leader", e);
                try {
                    sock.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
    
                // clear pending revalidations
                pendingRevalidations.clear();
            }
        } finally {
            zk.unregisterJMX((Learner)this);
        }
    }
}

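Summary: from this first look at the Follower, we know that:

* On startup, the follower actively connects to the leader

* Once connected, the follower synchronizes its data from the leader

How exactly? Let's read the leader and follower code together.

5. Leader/Follower interaction during startup

We now walk through the leader and follower code side by side.

5.1 The follower opens a long-lived connection to the leader

5.1.1 The follower creates the connection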

public class Learner {    
	protected void connectToLeader(InetSocketAddress addr, String hostname)
            throws IOException, ConnectException, InterruptedException {
        // With the leader's address in hand, create the socket connection
        sock = new Socket();        
        sock.setSoTimeout(self.tickTime * self.initLimit);
        for (int tries = 0; tries < 5; tries++) {
            try {
                // The connect timeout is tickTime * syncLimit
                sock.connect(addr, self.tickTime * self.syncLimit);
                sock.setTcpNoDelay(nodelay);
                break;
            } catch (IOException e) {
                ...
            }
            Thread.sleep(1000);
        }
		...
    } 
}

5.1.2 The leader accepts the connection

class LearnerCnxAcceptor extends ZooKeeperThread{
    private volatile boolean stop = false;
	...
    @Override
    public void run() {
        try {
            while (!stop) {
                try{
                    Socket s = ss.accept();
                    s.setSoTimeout(self.tickTime * self.initLimit);
                    s.setTcpNoDelay(nodelay);

                    BufferedInputStream is = new BufferedInputStream(
                        s.getInputStream());
                    // For each accepted follower connection, create a LearnerHandler to handle the traffic between the two
                    LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
                    fh.start();
                } catch (SocketException e) {
                    ...
                }
            }
        } catch (Exception e) {
            LOG.warn("Exception while accepting follower", e);
        }
    }
	...
}
5.2 The follower registers its node information with the leader

5.2.1 The follower sends its registration

public class Learner {      
	protected long registerWithLeader(int pktType) throws IOException{
    	long lastLoggedZxid = self.getLastLoggedZxid();
        QuorumPacket qp = new QuorumPacket();                
        qp.setType(pktType);
        qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));
        
        // Send this node's information (server id and protocol version)
        LearnerInfo li = new LearnerInfo(self.getId(), 0x10000);
        ByteArrayOutputStream bsid = new ByteArrayOutputStream();
        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
        boa.writeRecord(li, "LearnerInfo");
        qp.setData(bsid.toByteArray());
        // Send the packet to the leader
        writePacket(qp, true);
        // After receiving the registration, the leader replies with its own basic information
        readPacket(qp);        
        final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
		if (qp.getType() == Leader.LEADERINFO) {
        	leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
        	byte epochBytes[] = new byte[4];
        	final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
            // Compare the leader's epoch with our accepted epoch
        	if (newEpoch > self.getAcceptedEpoch()) {
        		wrappedEpochBytes.putInt((int)self.getCurrentEpoch());
        		self.setAcceptedEpoch(newEpoch);
        	} else if (newEpoch == self.getAcceptedEpoch()) {
                wrappedEpochBytes.putInt(-1);
        	} else {
        		throw new IOException("Leaders epoch, " + newEpoch + " is less than accepted epoch, " + self.getAcceptedEpoch());
        	}
            // The follower sends its ack, which completes one full registration exchange
        	QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
        	writePacket(ackNewEpoch, true);
            return ZxidUtils.makeZxid(newEpoch, 0);
        } else {
        	...
        }
    } 
}

5.2.2 The leader receives the registration

public class LearnerHandler extends ZooKeeperThread {
	public void run() {
        try {

            // Read the follower's registration packet
            ia = BinaryInputArchive.getArchive(bufferedInput);
            bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
            oa = BinaryOutputArchive.getArchive(bufferedOutput);

            QuorumPacket qp = new QuorumPacket();
            ia.readRecord(qp, "packet");
            ...
            byte learnerInfoData[] = qp.getData();
            // Extract the learner's basic information
            if (learnerInfoData != null) {
            	if (learnerInfoData.length == 8) {
            		ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
            		this.sid = bbsid.getLong();
            	} else {
            		LearnerInfo li = new LearnerInfo();
            		ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(learnerInfoData), li);
            		this.sid = li.getServerid();
            		this.version = li.getProtocolVersion();
            	}
            } else {
            	this.sid = leader.followerCounter.getAndDecrement();
            }

           ...
            
            long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
            
            long peerLastZxid;
            StateSummary ss = null;
            long zxid = qp.getZxid();
            long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
            // Normally the protocol version sent by the follower is 0x10000
            if (this.getVersion() < 0x10000) {
                ...
            } else {
                byte ver[] = new byte[4];
                ByteBuffer.wrap(ver).putInt(0x10000);
                // The leader sends the new epoch to the follower
                QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null);
                oa.writeRecord(newEpochPacket, "packet");
                bufferedOutput.flush();
                // The leader reads the follower's ack
                QuorumPacket ackEpochPacket = new QuorumPacket();
                ia.readRecord(ackEpochPacket, "packet");
                if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
                    LOG.error(ackEpochPacket.toString()
                            + " is not ACKEPOCH");
                    return;
				}
                ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
                ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
                // Wait until a quorum of followers has acked the new epoch
                leader.waitForEpochAck(this.getSid(), ss);
            }
            
            ...
        }
    }
}

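Summary: when I first read the leader-side LearnerHandler code on its own, I found it confusing: so much going on, with no obvious purpose. Read side by side with the follower code, it becomes clear that each request is paired with a response. Once a quorum of followers has completed registration, the subsequent work can proceed.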
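The epoch side of this exchange can be condensed into two small functions. The follower rule mirrors the branches of registerWithLeader() shown above. The leader rule (propose one more than the highest accepted epoch seen) is my understanding of getEpochToPropose(), whose body the excerpts do not show, so treat it as an assumption. EpochHandshakeSketch, proposeEpoch and followerReply are hypothetical names.

```java
/** Sketch of the epoch negotiation; illustrative names, not ZooKeeper classes. */
final class EpochHandshakeSketch {
    /**
     * Leader side (assumed rule): the proposed epoch must be greater than
     * every accepted epoch reported by the connecting members.
     */
    static long proposeEpoch(long... acceptedEpochs) {
        long epoch = -1;
        for (long accepted : acceptedEpochs) {
            if (accepted >= epoch) {
                epoch = accepted + 1;
            }
        }
        return epoch;
    }

    /**
     * Follower side, following the branches in registerWithLeader():
     * returns the epoch value carried in the ACKEPOCH payload.
     */
    static long followerReply(long newEpoch, long acceptedEpoch, long currentEpoch) {
        if (newEpoch > acceptedEpoch) {
            return currentEpoch; // accept the new epoch and report the current one
        }
        if (newEpoch == acceptedEpoch) {
            return -1;           // already accepted: reply with the -1 marker
        }
        throw new IllegalStateException("Leader epoch " + newEpoch
                + " is less than accepted epoch " + acceptedEpoch);
    }

    public static void main(String[] args) {
        long newEpoch = proposeEpoch(3L, 5L, 4L);
        System.out.println("proposed epoch: " + newEpoch);
        System.out.println("ACKEPOCH payload: " + followerReply(newEpoch, 5L, 5L));
    }
}
```

The third branch is the safety check: a follower refuses to follow a leader whose epoch is older than one it has already accepted.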

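5.3 The follower syncs data from the leader

Data synchronization sends the learner the transactions that have been committed on the leader but not yet on the learner, so that the learner's data ends up identical to the leader's.

Synchronization falls into four categories:

* Direct differential sync (DIFF)

* Rollback followed by differential sync (TRUNC+DIFF)

* Rollback only (TRUNC)

* Full sync (SNAP)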
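As a rough mental model for these categories, the choice can be sketched as a comparison between the learner's last zxid and the window of committed proposals the leader keeps in memory. The thresholds below are an assumption based on my reading of the variable names (minCommittedLog, maxCommittedLog, peerLastZxid), not a copy of the LearnerHandler logic. The TRUNC+DIFF case, where the learner's zxid falls inside the window but does not match any leader proposal, is deliberately not modeled. SyncModeSketch and choose are hypothetical names.

```java
/** Simplified sync-mode decision; an assumption-laden sketch, not LearnerHandler code. */
final class SyncModeSketch {
    enum Mode { DIFF, TRUNC, SNAP }

    /**
     * @param peerLastZxid    last zxid the learner reports
     * @param leaderLastZxid  last zxid the leader has processed
     * @param minCommittedLog oldest zxid in the leader's in-memory committed log
     * @param maxCommittedLog newest zxid in the leader's in-memory committed log
     * @param logNotEmpty     whether the in-memory committed log holds any proposals
     */
    static Mode choose(long peerLastZxid, long leaderLastZxid,
                       long minCommittedLog, long maxCommittedLog, boolean logNotEmpty) {
        if (peerLastZxid == leaderLastZxid) {
            return Mode.DIFF;   // already in sync: an empty diff
        }
        if (logNotEmpty) {
            if (peerLastZxid >= minCommittedLog && peerLastZxid <= maxCommittedLog) {
                return Mode.DIFF;   // the missing proposals are still in the window
            }
            if (peerLastZxid > maxCommittedLog) {
                return Mode.TRUNC;  // the learner is ahead and must roll back
            }
        }
        return Mode.SNAP;       // too far behind: send a full snapshot
    }

    public static void main(String[] args) {
        System.out.println(choose(7, 10, 5, 10, true));  // inside the window
        System.out.println(choose(12, 10, 5, 10, true)); // ahead of the leader
        System.out.println(choose(2, 10, 5, 10, true));  // behind the window
    }
}
```

The design point is that the leader avoids sending a full snapshot whenever the learner's position still falls within the proposals it has cached.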

5.3.1 The leader sends the sync data

public class LearnerHandler extends ZooKeeperThread {
	public void run() {
 
        ...
        ReentrantReadWriteLock lock = leader.zk.getZKDatabase().getLogLock();
            ReadLock rl = lock.readLock();
            try {
                rl.lock();  
                // Which of the four sync modes applies is decided by comparing peerLastZxid with maxCommittedLog and minCommittedLog
                final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog();
                final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog();
                LOG.info("Synchronizing with Follower sid: " + sid
                        +" maxCommittedLog=0x"+Long.toHexString(maxCommittedLog)
                        +" minCommittedLog=0x"+Long.toHexString(minCommittedLog)
                        +" peerLastZxid=0x"+Long.toHexString(peerLastZxid));

                LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();

                // The follower is already in sync
                if (peerLastZxid == leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()) {
                    LOG.info("leader and follower are in sync, zxid=0x{}",
                            Long.toHexString(peerLastZxid));
                    packetToSend = Leader.DIFF;
                    zxidToSend = peerLastZxid;
                } else if (proposals.size() != 0) {
                    LOG.debug("proposal size is {}", proposals.size());
                    // Plain differential (DIFF) sync
                    if ((maxCommittedLog >= peerLastZxid)
                            && (minCommittedLog             