在介绍完client端的基本操作之后,后续我们就要对zookeeper server端的相关操作进行解析了。
实际重点还是在server端,数据存储同步恢复、事务请求执行、master选举等都是在server端进行的。
server有单机版和集群版,大致流程都差不多,为了便于理解,我们先从单机版进行分析。
1.zkServer.cmd的分析windows环境下,启动server端,就是通过bin/zkServer.cmd start命令执行的(Linux环境下,是bin/zkServer.sh start命令)。我们来看下这个脚本主要执行了哪些启动类
setlocal
call "%~dp0zkEnv.cmd"
# 启动类为QuorumPeerMain
set ZOOMAIN=org.apache.zookeeper.server.quorum.QuorumPeerMain
echo on
call %JAVA% "-Dzookeeper.log.dir=%ZOO_LOG_DIR%" "-Dzookeeper.root.logger=%ZOO_LOG4J_PROP%" -cp "%CLASSPATH%" %ZOOMAIN% "%ZOOCFG%" %*
endlocal
通过启动脚本,可以看到QuorumPeerMain为其启动类,而启动参数ZOOCFG是从zkEnv.cmd中获取的,如下所示,即conf/zoo.cfg文件
set ZOOCFGDIR=%~dp0%..\conf
set ZOO_LOG_DIR=%~dp0%..
set ZOO_LOG4J_PROP=INFO,CONSOLE
REM for sanity sake assume Java 1.6
REM see: http://java.sun.com/javase/6/docs/technotes/tools/windows/java.html
REM add the zoocfg dir to classpath
set CLASSPATH=%ZOOCFGDIR%
REM make it work in the release
SET CLASSPATH=%~dp0..\*;%~dp0..\lib\*;%CLASSPATH%
REM make it work for developers
SET CLASSPATH=%~dp0..\build\classes;%~dp0..\build\lib\*;%CLASSPATH%
set ZOOCFG=%ZOOCFGDIR%\zoo.cfg
总结下来,zkServer启动时主要是启动QuorumPeerMain类,参数为conf/zoo.cfg
2.源码debug启动测试通过脚本我们都知道该如何启动了,那么我们能不能直接通过源码来启动呢,方便我们进行debug。本质上是可以的。
2.1 修改zoo.cfg在conf/目录下,只有zoo_sample.cfg文件,我们需要复制一下该文件,并重命名为zoo.cfg。完成之后,如下所示
2.2 添加log4j.properties文件
将conf/log4j.properties文件复制到src/java/main目录下。完成之后,如下所示
2.3 设置启动参数
需要在QuorumPeerMain类的启动参数中添加conf/zoo.cfg。完成之后,如下所示:
到这里,就可以对QuorumPeerMain进行debug启动。
3.QuorumPeerMain启动流程 3.1 QuorumPeerMain.main()此时args[] 为conf/zoo.cfg
public class QuorumPeerMain {
public static void main(String[] args) {
QuorumPeerMain main = new QuorumPeerMain();
try {
// 直接调用initializeAndRun
main.initializeAndRun(args);
} catch (IllegalArgumentException e) {
...
System.exit(0);
}
protected void initializeAndRun(String[] args)
throws ConfigException, IOException
{
QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
// 直接将zoo.cfg配置文件交由QuorumPeerConfig解析,具体见3.2
config.parse(args[0]);
}
// 提供一个定时任务,来清理data数据和log数据,这个后续我们专门来分析
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
.getDataDir(), config.getDataLogDir(), config
.getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start();
if (args.length == 1 && config.servers.size() > 0) {
runFromConfig(config);
} else {
LOG.warn("Either no config or no quorum defined in config, running "
+ " in standalone mode");
// 单机模式的启动,是我们本次分析的重点,具体见3.3,上面的是集群模式,后续我们专门分析
ZooKeeperServerMain.main(args);
}
}
}
3.2 QuorumPeerConfig.parse() 解析zoo.cfg配置文件
public class QuorumPeerConfig {
public void parse(String path) throws ConfigException {
File configFile = new File(path);
try {
...
Properties cfg = new Properties();
FileInputStream in = new FileInputStream(configFile);
try {
// 加载配置文件到Properties中
cfg.load(in);
} finally {
in.close();
}
// 这个方法就是将Properties中对应属性回写到QuorumPeerConfig中,方法过长,笔者不再展示
parseProperties(cfg);
} catch (IOException e) {
throw new ConfigException("Error processing " + path, e);
} catch (IllegalArgumentException e) {
throw new ConfigException("Error processing " + path, e);
}
}
}
3.3 单机模式zookeeper启动 ZooKeeperServerMain.main()
public class ZooKeeperServerMain {
public static void main(String[] args) {
ZooKeeperServerMain main = new ZooKeeperServerMain();
try {
// 一样的套路
main.initializeAndRun(args);
} catch (IllegalArgumentException e) {
...
}
LOG.info("Exiting normally");
System.exit(0);
}
protected void initializeAndRun(String[] args)
throws ConfigException, IOException
{
try {
// JDK提供的对应用程序提供管理监控功能的框架,非本文重点,先忽略
ManagedUtil.registerLog4jMBeans();
} catch (JMException e) {
LOG.warn("Unable to register log4j JMX control", e);
}
ServerConfig config = new ServerConfig();
// 解析zoo.cfg,同样是将文件解析到QuorumPeerConfig中,参考上面3.2
if (args.length == 1) {
config.parse(args[0]);
} else {
config.parse(args);
}
// 启动server
runFromConfig(config);
}
public void runFromConfig(ServerConfig config) throws IOException {
LOG.info("Starting server");
FileTxnSnapLog txnLog = null;
try {
final ZooKeeperServer zkServer = new ZooKeeperServer();
// 通过ZooKeeperServerShutdownHandler进行shutdownLatch锁释放
final CountDownLatch shutdownLatch = new CountDownLatch(1);
zkServer.registerServerShutdownHandler(
new ZooKeeperServerShutdownHandler(shutdownLatch));
// 日志文件和数据文件的处理类,后续单独文章来讲解
txnLog = new FileTxnSnapLog(new File(config.dataLogDir), new File(
config.dataDir));
txnLog.setServerStats(zkServer.serverStats());
zkServer.setTxnLogFactory(txnLog);
// 设置基本参数
zkServer.setTickTime(config.tickTime);
zkServer.setMinSessionTimeout(config.minSessionTimeout);
zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
// 在这里创建端口服务类,用于监听端口请求,具体方法见4
cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns());
cnxnFactory.startup(zkServer);
shutdownLatch.await();
shutdown();
cnxnFactory.join();
if (zkServer.canShutdown()) {
zkServer.shutdown(true);
}
} catch (InterruptedException e) {
// warn, but generally this is ok
LOG.warn("Server interrupted", e);
} finally {
if (txnLog != null) {
txnLog.close();
}
}
}
}
4 zookeeper端口服务暴露监听
zookeeper中,默认对外开放2181端口,那么作为server端,肯定要对这个端口进行监听,接收client请求并处理,返回响应。
具体服务的创建都是通过ServerCnxnFactory来完成的。
4.1 ServerCnxnFactory.createFactory() 创建具体工厂类public abstract class ServerCnxnFactory {
public static final String ZOOKEEPER_SERVER_CNXN_FACTORY = "zookeeper.serverCnxnFactory";
static public ServerCnxnFactory createFactory() throws IOException {
String serverCnxnFactoryName =
System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
// 默认创建的是NIOServerCnxnFactory,其使用JDK原生的NIO来完成服务暴露监听
if (serverCnxnFactoryName == null) {
serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
}
try {
ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)
.getDeclaredConstructor().newInstance();
LOG.info("Using {} as server connection factory", serverCnxnFactoryName);
return serverCnxnFactory;
} catch (Exception e) {
IOException ioe = new IOException("Couldn't instantiate "
+ serverCnxnFactoryName);
ioe.initCause(e);
throw ioe;
}
}
}
默认我们的服务是由NIOServerCnxnFactory来创建的。通过其源码可以看出,它是对JDK NIO的原生使用。
在性能方面,肯定是不如Netty的,所以在实际使用中,我们可以在系统中设置如下参数
-Dzookeeper.serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
主动将原生NIO服务切换为Netty服务以提高性能。
4.2 NIOServerCnxnFactory.configure() 加载配置、注册监听事件public class NIOServerCnxnFactory extends ServerCnxnFactory implements Runnable {
public void configure(InetSocketAddress addr, int maxcc) throws IOException {
configureSaslLogin();
thread = new ZooKeeperThread(this, "NIOServerCxn.Factory:" + addr);
thread.setDaemon(true);
maxClientCnxns = maxcc;
this.ss = ServerSocketChannel.open();
ss.socket().setReuseAddress(true);
LOG.info("binding to port " + addr);
ss.socket().bind(addr);
ss.configureBlocking(false);
ss.register(selector, SelectionKey.OP_ACCEPT);
}
}
代码很简单,就是标准的NioServer启动端口,创建连接监听。
4.3 NIOServerCnxnFactory.startup() 启动zookeeper服务,加载相关数据public class NIOServerCnxnFactory extends ServerCnxnFactory implements Runnable {
public void startup(ZooKeeperServer zks) throws IOException,
InterruptedException {
// 这里本质上是启动NIOServerCnxnFactory.run()方法用于监听读写事件
start();
setZooKeeperServer(zks);
// 加载数据到内存中,这里我们先知道有这个操作,后续分析事务日志时再一起说明下
zks.startdata();
// 最终启动,调用ZookeeperServer.startup(),具体见4.4
zks.startup();
}
}
startup启动zookeeper服务时,需要启动监听客户端读写事件服务;加载数据到内存中
4.4 ZookeeperServer.startup() 启动服务public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
public synchronized void startup() {
if (sessionTracker == null) {
createSessionTracker();
}
// sessionTracker是会话管理器,负责会话的创建、管理清理等操作,这个后续我们专门一篇文章来说明
startSessionTracker();
// 业务处理器,后续介绍服务端处理请求时会详细说明
setupRequestProcessors();
registerJMX();
setState(State.RUNNING);
notifyAll();
}
}
总结:
一张图来了解下单机版Zookeeper服务启动全貌
到这,我们简单的分析了Zookeeper服务器单机版的启动过程。总体来说有以下内容:
1.启动端口服务监听,监听客户端的连接、读写等事件
2.启动会话管理器SessionTracker,用于管理、清理过期会话
3.加载数据到内存中
4.提供日志文件和数据文件的处理类
主要就是这些,本文对其骨架进行分析,后续上面这些点都会通过专门的文章来详细分析。