前两篇文章主要介绍了集群模式下Zookeeper服务端的启动流程,以及Leader选举的过程。在leader选举完成后,集群中的各节点分别有了对应的角色:Leader、Follower、Observer。那么按照对应的模式,会分别启动不同的服务,也就是前文提到的几个服务类,如下所示:
本文就主要先介绍下其基础类ZookeeperServer的知识点,后续再分别介绍其子类。
1.ZookeeperServer的属性及构造方法/**
* This class implements a simple standalone ZooKeeperServer. It sets up the
* following chain of RequestProcessors to process requests:
* PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor
*/
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
// 心跳时间
public static final int DEFAULT_TICK_TIME = 3000;
protected int tickTime = DEFAULT_TICK_TIME;
// session超时时间设置
protected int minSessionTimeout = -1;
protected int maxSessionTimeout = -1;
// session管理器,之前有专门分析过
protected SessionTracker sessionTracker;
//事务日志、快照日志处理器
private FileTxnSnapLog txnLogFactory = null;
// 内存数据库
private ZKDatabase zkDb;
private final AtomicLong hzxid = new AtomicLong(0);
public final static Exception ok = new Exception("No prob");
// 构建Processor处理器
protected RequestProcessor firstProcessor;
// server状态
protected volatile State state = State.INITIAL;
protected enum State {
INITIAL, RUNNING, SHUTDOWN, ERROR;
}
// 服务端口连接处理器
private ServerCnxnFactory serverCnxnFactory;
// 服务端状态信息
private final ServerStats serverStats;
// shutdown钩子函数处理
private ZooKeeperServerShutdownHandler zkShutdownHandler;
public ZooKeeperServer() {
serverStats = new ServerStats(this);
listener = new ZooKeeperServerListenerImpl(this);
}
// 默认构造以下参数信息
public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime,
int minSessionTimeout, int maxSessionTimeout,
DataTreeBuilder treeBuilder, ZKDatabase zkDb) {
serverStats = new ServerStats(this);
this.txnLogFactory = txnLogFactory;
this.txnLogFactory.setServerStats(this.serverStats);
this.zkDb = zkDb;
this.tickTime = tickTime;
this.minSessionTimeout = minSessionTimeout;
this.maxSessionTimeout = maxSessionTimeout;
listener = new ZooKeeperServerListenerImpl(this);
LOG.info("Created server with tickTime " + tickTime
+ " minSessionTimeout " + getMinSessionTimeout()
+ " maxSessionTimeout " + getMaxSessionTimeout()
+ " datadir " + txnLogFactory.getDataDir()
+ " snapdir " + txnLogFactory.getSnapDir());
}
public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime,
DataTreeBuilder treeBuilder) throws IOException {
this(txnLogFactory, tickTime, -1, -1, treeBuilder,
new ZKDatabase(txnLogFactory));
}
}
注释中,我们能发现很多信息,作为一个ZookeeperServer在处理请求时的流程链为:PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor。
2.ZookeeperServer主要方法分析 2.1 loadData() 启动加载节点信息public void loadData() throws IOException, InterruptedException {
if(zkDb.isInitialized()){
setZxid(zkDb.getDataTreeLastProcessedZxid());
}
else {
// 之前有分析过加载数据的这块逻辑,主要在zkDb.loadDataBase()中实现
setZxid(zkDb.loadDataBase());
}
// 有关于过期的会话,则会直接删除
LinkedList deadSessions = new LinkedList();
for (Long session : zkDb.getSessions()) {
if (zkDb.getSessionWithTimeOuts().get(session) == null) {
deadSessions.add(session);
}
}
zkDb.setDataTreeInit(true);
for (long session : deadSessions) {
// XXX: Is lastProcessedZxid really the best thing to use?
killSession(session, zkDb.getDataTreeLastProcessedZxid());
}
}
这块逻辑之前有分析过,通过ZKDatabase.loadDataBase()方法来加载快照日志信息和事务日志信息,最终返回最新的那条事务操作的zxid信息。
2.2 firstProcessor的构造有关于RequestProcessor的构造实际上是使用了一个装饰器的模式,从firstProcess开始执行,会不断的调用nextProcessor,一直执行到最后一个为止。
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor syncProcessor = new SyncRequestProcessor(this,finalProcessor);
((SyncRequestProcessor)syncProcessor).start();
firstProcessor = new PrepRequestProcessor(this, syncProcessor);
((PrepRequestProcessor)firstProcessor).start();
}
PrepRequestProcessor和SyncRequestProcessor都有一个nextProcessor的属性,整个调用链如上所示:PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor。
2.3 处理客户端连接请求在还未对客户端创建连接时,则首先会处理客户端的连接请求。在之前server处理会话创建请求的文章中我们有详细分析过,这里大致过一下即可
public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
ConnectRequest connReq = new ConnectRequest();
connReq.deserialize(bia, "connect");
if (LOG.isDebugEnabled()) {
LOG.debug("Session establishment request from client "
+ cnxn.getRemoteSocketAddress()
+ " client's lastZxid is 0x"
+ Long.toHexString(connReq.getLastZxidSeen()));
}
...
// 如果请求的lastZXID 大于 server端的最新的ZXID,说明客户端请求异常
if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
String msg = "Refusing session request for client "
+ cnxn.getRemoteSocketAddress()
+ " as it has seen zxid 0x"
+ Long.toHexString(connReq.getLastZxidSeen())
+ " our last zxid is 0x"
+ Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid())
+ " client must try another server";
LOG.info(msg);
throw new CloseRequestException(msg);
}
// 与服务端协商session超时时间,需要介于minSessionTimeout 和 maxSessionTimeout之间
int sessionTimeout = connReq.getTimeOut();
byte passwd[] = connReq.getPasswd();
int minSessionTimeout = getMinSessionTimeout();
if (sessionTimeout < minSessionTimeout) {
sessionTimeout = minSessionTimeout;
}
int maxSessionTimeout = getMaxSessionTimeout();
if (sessionTimeout > maxSessionTimeout) {
sessionTimeout = maxSessionTimeout;
}
cnxn.setSessionTimeout(sessionTimeout);
// We don't want to receive any packets until we are sure that the
// session is setup
cnxn.disableRecv();
long sessionId = connReq.getSessionId();
// 如果客户端是首次连接,那么sessionId未分配过,则默认为0,如果不是0,说明之前已经分配过
// 但由于某种原因,又断开重连了,所以服务端针对这种连接会重新打开对应的session
if (sessionId != 0) {
long clientSessionId = connReq.getSessionId();
LOG.info("Client attempting to renew session 0x"
+ Long.toHexString(clientSessionId)
+ " at " + cnxn.getRemoteSocketAddress());
serverCnxnFactory.closeSession(sessionId);
cnxn.setSessionId(sessionId);
// 重新打开对应的session
reopenSession(cnxn, sessionId, passwd, sessionTimeout);
} else {
LOG.info("Client attempting to establish new session at "
+ cnxn.getRemoteSocketAddress());
// 首次连接,需要创建Session
createSession(cnxn, passwd, sessionTimeout);
}
}
2.4 处理客户端其他请求
有关于处理客户端请求,之前的博客中也有过说明,这里再简单介绍下
private void submitRequest(ServerCnxn cnxn, long sessionId, int type,
int xid, ByteBuffer bb, List authInfo) {
// 拼装Request对象
Request si = new Request(cnxn, sessionId, xid, type, bb, authInfo);
submitRequest(si);
}
public void submitRequest(Request si) {
if (firstProcessor == null) {
// 若还未初始化,则针对已经到达的请求,先休息1秒,再判断processor是否已创建完成,未完成直接报错
synchronized (this) {
try {
while (state == State.INITIAL) {
wait(1000);
}
} catch (InterruptedException e) {
LOG.warn("Unexpected interruption", e);
}
if (firstProcessor == null || state != State.RUNNING) {
throw new RuntimeException("Not started");
}
}
}
try {
// session处理
touch(si.cnxn);
boolean validpacket = Request.isValid(si.type);
if (validpacket) {
// 直接交由PrepRequestProcessor进行处理
firstProcessor.processRequest(si);
if (si.cnxn != null) {
incInProcess();
}
} else {
LOG.warn("Received packet at server of unknown type " + si.type);
new UnimplementedRequestProcessor().processRequest(si);
}
} catch (MissingSessionException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Dropping request: " + e.getMessage());
}
} catch (RequestProcessorException e) {
LOG.error("Unable to process request:" + e.getMessage(), e);
}
}
请求的处理主要是交由RequestProcessor来处理。
总结:本文没有什么比较特殊的点,其中的知识点基本在之前的博客中都有涉猎,主要是对ZookeeperServer做一个总结,方便后续对其子类进行展开介绍。