客户端通过实例化Zookeeper对象来创建于服务端的连接,创建连接成功后,服务端会创建一个Session(会话),Session拥有唯一的SessionId,后续客户端就使用这个sessionId来进行标记。
本文主要介绍Session的创建以及服务端SessionTrack如何对会话进行管理。
1.Session创建客户端发起一次连接成功后,服务端会创建一个Session对象,具体如下
public static class SessionImpl implements Session {
SessionImpl(long sessionId, int timeout, long expireTime) {
this.sessionId = sessionId;
this.timeout = timeout;
this.tickTime = expireTime;
isClosing = false;
}
// 唯一的sessionId
final long sessionId;
// 协商出的超时时间
final int timeout;
// 检测时间
long tickTime;
boolean isClosing;
Object owner;
public long getSessionId() { return sessionId; }
public int getTimeout() { return timeout; }
public boolean isClosing() { return isClosing; }
}
Session创建完成后,会将具体的sessionId返回给客户端。后续客户端就通过这个sessionId来标识当前会话。
1.1 sessionId的创建sessionId是一个long类型的唯一ID,具体创建过程如下
public class SessionTrackerImpl extends ZooKeeperCriticalThread implements SessionTracker {
public static long initializeNextSession(long id) {
long nextSid = 0;
nextSid = (Time.currentElapsedTime() >> 8;
nextSid = nextSid | (id SessionImpl的map
HashMap sessionsById = new HashMap();
// time -> SessionImpl集合
HashMap sessionSets = new HashMap();
// sessionId -> time
ConcurrentHashMap sessionsWithTimeout;
long nextSessionId = 0;
// 下一个超时时间点
long nextExpirationTime;
// 超时检查频次
int expirationInterval;
public SessionTrackerImpl(SessionExpirer expirer,
ConcurrentHashMap sessionsWithTimeout, int tickTime,
long sid, ZooKeeperServerListener listener)
{
super("SessionTracker", listener);
this.expirer = expirer;
this.expirationInterval = tickTime;
this.sessionsWithTimeout = sessionsWithTimeout;
// 根据当前时间和expirationInterval来获取下一次超时检查时间点
nextExpirationTime = roundToInterval(Time.currentElapsedTime());
this.nextSessionId = initializeNextSession(sid);
for (Entry e : sessionsWithTimeout.entrySet()) {
addSession(e.getKey(), e.getValue());
}
}
}
SessionTrackImpl的构造是从Zookeeper server在启动时构建的。这里的expirationInterval就是我们在zoo.cfg中配置的tickTime,默认为2000。
其通过三个map来对Session进行管理,我们一起来看下其中细节。
2.2 SessionTrack.addSession() 添加sessionpublic class SessionTrackerImpl extends ZooKeeperCriticalThread implements SessionTracker {
synchronized public void addSession(long id, int sessionTimeout) {
// 将sessionId和其超时时间添加到sessionsWithTimeout中
sessionsWithTimeout.put(id, sessionTimeout);
if (sessionsById.get(id) == null) {
// 构建session体,将sessionId和SessionImpl对应关系保存到sessionsById中
// 注意这里设置的SessionImpl.tickTime=0,后续touchSession()方法会用到这个tickTime
SessionImpl s = new SessionImpl(id, sessionTimeout, 0);
sessionsById.put(id, s);
...
} else {
...
}
// 执行touchSession()方法
touchSession(id, sessionTimeout);
}
synchronized public boolean touchSession(long sessionId, int timeout) {
...
SessionImpl s = sessionsById.get(sessionId);
if (s == null || s.isClosing()) {
return false;
}
// 根据session的timeout,获取其到期时间,这个到期时间的计算比较有趣,是tickTime的整数倍
long expireTime = roundToInterval(Time.currentElapsedTime() + timeout);
// 如果当前session的过期检测时间点tickTime在expireTime之后,则直接返回
// 首次添加,tickTime=0,故当前条件不成立
if (s.tickTime >= expireTime) {
// Nothing needs to be done
return true;
}
// 既然执行到这里,说明tickTime< expireTime
SessionSet set = sessionSets.get(s.tickTime);
if (set != null) {
set.sessions.remove(s);
}
// 首次添加的SessionImpl,设置其下一次过期检查时间(tickTime)为expireTime
s.tickTime = expireTime;
set = sessionSets.get(s.tickTime);
if (set == null) {
set = new SessionSet();
// 并将expireTime和SessionImpl集合的关联关系保存到sessionSets中
sessionSets.put(expireTime, set);
}
set.sessions.add(s);
return true;
}
}
总结:当SessionImpl在服务端被创建后,直接根据当前时间和其timeout计算出当前Session的下次过期时间(tickTime)。后续相同过期时间的Session都保存到sessionSets中
2.3 Session tickTime的变更每一个Session都设置有过期时间,上述2.2中,我们分析了当Session被首次添加到SessionTrack时,会将计算好的expireTime设置到SessionImpl中(具体就是其tickTime)属性。
那么当服务端接收到当前请求(无论是正常的业务请求还是心跳请求),都会重新设置其tickTime。我们来看下代码
ZookeeperServer.processPacket()就是处理客户端请求的代码(后续我们会详细分析这个过程),最终在ZookeeperServer.submitRequest()方法中
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
public void submitRequest(Request si) {
...
try {
// 这个方法重新设置Session tickTime
touch(si.cnxn);
...
}
}
void touch(ServerCnxn cnxn) throws MissingSessionException {
if (cnxn == null) {
return;
}
long id = cnxn.getSessionId();
int to = cnxn.getSessionTimeout();
// 最终调用SessionTrack.touchSession()方法
if (!sessionTracker.touchSession(id, to)) {
throw new MissingSessionException(
"No session with sessionid 0x" + Long.toHexString(id)
+ " exists, probably expired and removed");
}
}
}
这里又回到了2.2,touchSession()方法会重新计算出一个新的超时时间expireTime,重新赋值到SessionImpl.tickTime属性,并添加到sessionSets中。
所以,当服务端每次接收到客户端的请求后,都会将当前Session.tickTime超时时间重新计算。
2.4 SessionTrack对Session的管理根据SessionTrackImpl的继承关系可以得知,其本身是一个Thread,那么其重要逻辑都在run()方法中
public class SessionTrackerImpl extends ZooKeeperCriticalThread implements SessionTracker {
synchronized public void run() {
try {
while (running) {
currentTime = Time.currentElapsedTime();
// nextExpirationTime在SessionTrackImpl被构造时候就已经初始化了,主要根据server端设置的tickTime计算出来的,是tickTime的整数倍
// nextExpirationTime > currentTime说明还没有到过期检测时间点,直接跳过
if (nextExpirationTime > currentTime) {
this.wait(nextExpirationTime - currentTime);
continue;
}
// 执行到当前,说明nextExpirationTime对于的Session已经过期了,所以直接执行expire方法
SessionSet set;
set = sessionSets.remove(nextExpirationTime);
if (set != null) {
for (SessionImpl s : set.sessions) {
// 修改Session状态,并向客户端发送expire请求
setSessionClosing(s.sessionId);
expirer.expire(s);
}
}
// 重新设置下次过期检测时间点nextExpirationTime
nextExpirationTime += expirationInterval;
}
} catch (InterruptedException e) {
handleException(this.getName(), e);
}
LOG.info("SessionTrackerImpl exited loop!");
}
}
SessionTrackImpl的检测代码比较简单,粗暴的将nextExpirationTime时间点对应的SessionImpl set集合中的所有会话全部设置为过期即可。
一直以来,我们没有对这个过期时间检测时间点的设置有过分析,为什么它一定是expirationInterval(也就是zoo.cfg中tickTime参数)的整数倍呢?我们来看下roundToInterval()方法
public class SessionTrackerImpl extends ZooKeeperCriticalThread implements SessionTracker {
private long roundToInterval(long time) {
return (time / expirationInterval + 1) * expirationInterval;
}
}
time代表传入的具体时间点,time / expirationInterval后,直接就是expirationInterval的整数倍了。
总结:通过对SessionTrack的分析,我们了解了Zookeeper服务端的会话管理器的基本操作。
那么有一个问题,为什么服务端要进行会话管理呢?不管理的话有没有问题呢?这个问题就留给读者啦。