前文讲述了server端处理会话创建请求的全过程,相对于客户端发送请求处理而言,服务端主要是监听READ事件,根据不同的请求类型进行对应的处理(处理主要交由requestProcessor来完成)。
创建完会话之后,就要进行具体操作请求了。本文就重点来分析下服务端如何处理创建节点请求,也就是如何解析CreateRequest,如下图
如何返回响应CreateResponse(只有一个属性,path)
关于监听READ事件的相关代码在上一篇文章中,已经有过分析了,所以本文中不再赘述。
需要注意的是,这里分析是单机版本的Zookeeper服务器,后续集群模式下会专门来分析的。
1.Zookeeper server监听create请求具体见上一篇 server处理会话创建的博客,本文不再赘述。
最终我们进入NIOServerCnxn.readPayload()方法,用于处理不同类型的请求
public class NIOServerCnxn extends ServerCnxn {
private void readPayload() throws IOException, InterruptedException {
if (incomingBuffer.remaining() != 0) { // have we read length bytes?
int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
if (rc < 0) {
throw new EndOfStreamException(
"Unable to read additional data from client sessionid 0x"
+ Long.toHexString(sessionId)
+ ", likely client has closed socket");
}
}
// remaining()==0,说明已经读取到len个字节,数据已经全部读取到
if (incomingBuffer.remaining() == 0) { // have we read length bytes?
packetReceived();
incomingBuffer.flip();
// 如果initialized初始化状态为false,说明是第一次请求,那么这个请求就是创建Session的请求
if (!initialized) {
// 上文已经分析过的处理创建会话请求
readConnectRequest();
} else {
// 这里是本文分析的重点,处理其他类型的请求都在这
readRequest();
}
lenBuffer.clear();
incomingBuffer = lenBuffer;
}
}
private void readRequest() throws IOException {
// 交由ZookeeperServer来处理
zkServer.processPacket(this, incomingBuffer);
}
}
2.ZooKeeperServer.processPacket()
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
// We have the request, now process and setup for next
InputStream bais = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
// 解析请求头,请求头中的type代表了不同的请求类型
RequestHeader h = new RequestHeader();
h.deserialize(bia, "header");
incomingBuffer = incomingBuffer.slice();
// 权限控制相关,非本文重点,忽略
if (h.getType() == OpCode.auth) {
...
} else {
// sasl相关,非重点
if (h.getType() == OpCode.sasl) {
...
}
else {
// 最终我们的其他类型请求都在这里进行处理
Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
h.getType(), incomingBuffer, cnxn.getAuthInfo());
si.setOwner(ServerCnxn.me);
// 包装后的Request,交由submitRequest()处理
submitRequest(si);
}
}
cnxn.incrOutstandingRequests(h);
}
//
public void submitRequest(Request si) {
...
try {
// session过期时间处理,每一次新请求的到来都会延迟session的过期
touch(si.cnxn);
boolean validpacket = Request.isValid(si.type);
if (validpacket) {
// 还是交由requestProcessor处理
firstProcessor.processRequest(si);
if (si.cnxn != null) {
incInProcess();
}
} else {
LOG.warn("Received packet at server of unknown type " + si.type);
new UnimplementedRequestProcessor().processRequest(si);
}
} catch (MissingSessionException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Dropping request: " + e.getMessage());
}
} catch (RequestProcessorException e) {
LOG.error("Unable to process request:" + e.getMessage(), e);
}
}
}
ZookeeperServer接收到客户端的请求之后,通过请求头RequestHeader的type来判断不同的请求类型,然后交由不同的方法来处理。
3.RequestProcess处理请求根据之前的分析,RequestProcessor的处理顺序为 PrepRequestProcessor --> SyncRequestProcessor --> FinalRequestProcessor
所以我们依旧按照这个属性逐个分析下
3.1 PrepRequestProcessor.pRequest()public class PrepRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
protected void pRequest(Request request) throws RequestProcessorException {
// LOG.info("Prep>>> cxid = " + request.cxid + " type = " +
// request.type + " id = 0x" + Long.toHexString(request.sessionId));
request.hdr = null;
request.txn = null;
try {
switch (request.type) {
case OpCode.create:
CreateRequest createRequest = new CreateRequest();
// 创建节点请求处理
pRequest2Txn(request.type, zks.getNextZxid(), request, createRequest, true);
break;
}
...
}
request.zxid = zks.getZxid();
nextProcessor.processRequest(request);
}
// 具体处理在这里
protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize)
throws KeeperException, IOException, RequestProcessorException
{
request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,
Time.currentWallTime(), type);
switch (type) {
case OpCode.create:
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
CreateRequest createRequest = (CreateRequest)record;
if(deserialize)
// 将客户端的请求体反序列化到CreateRequest对象中
ByteBufferInputStream.byteBuffer2Record(request.request, createRequest);
// path检查
String path = createRequest.getPath();
int lastSlash = path.lastIndexOf('/');
if (lastSlash == -1 || path.indexOf('\0') != -1 || failCreate) {
LOG.info("Invalid path " + path + " with session 0x" +
Long.toHexString(request.sessionId));
throw new KeeperException.BadArgumentsException(path);
}
// ACL权限检查
List listACL = removeDuplicates(createRequest.getAcl());
if (!fixupACL(request.authInfo, listACL)) {
throw new KeeperException.InvalidACLException(path);
}
String parentPath = path.substring(0, lastSlash);
ChangeRecord parentRecord = getRecordForPath(parentPath);
checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE,
request.authInfo);
int parentCVersion = parentRecord.stat.getCversion();
// 根据创建节点类型,重置path信息
CreateMode createMode =
CreateMode.fromFlag(createRequest.getFlags());
if (createMode.isSequential()) {
path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
}
validatePath(path, request.sessionId);
try {
if (getRecordForPath(path) != null) {
throw new KeeperException.NodeExistsException(path);
}
} catch (KeeperException.NoNodeException e) {
// ignore this one
}
// 检查父节点是否临时节点
boolean ephemeralParent = parentRecord.stat.getEphemeralOwner() != 0;
if (ephemeralParent) {
throw new KeeperException.NoChildrenForEphemeralsException(path);
}
int newCversion = parentRecord.stat.getCversion()+1;
// 补充request的txn对象信息,后续requestProcessor会用到
request.txn = new CreateTxn(path, createRequest.getData(),
listACL,
createMode.isEphemeral(), newCversion);
StatPersisted s = new StatPersisted();
if (createMode.isEphemeral()) {
s.setEphemeralOwner(request.sessionId);
}
// 修改父节点的stat信息
parentRecord = parentRecord.duplicate(request.hdr.getZxid());
parentRecord.childCount++;
parentRecord.stat.setCversion(newCversion);
addChangeRecord(parentRecord);
addChangeRecord(new ChangeRecord(request.hdr.getZxid(), path, s,
0, listACL));
break;
}
...
}
在PrepRequestProcessor的处理中,主要是对节点创建信息的一系列校验,path是否合法,父节点是否临时节点等等,后续处理交由SyncRequestProcessor 执行
3.2 SyncRequestProcessor之前的博客中有关于事务日志分析和快照日志分析,有详细的介绍过SyncRequestProcessor 的相关方法,本质上是交由run()方法执行的,
public class SyncRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
public void run() {
try {
...
while (true) {
// 获取到本次请求,也就是创建节点请求
Request si = null;
if (toFlush.isEmpty()) {
si = queuedRequests.take();
} else {
si = queuedRequests.poll();
if (si == null) {
flush(toFlush);
continue;
}
}
if (si == requestOfDeath) {
break;
}
if (si != null) {
// 直接添加到ZKDatabase中
if (zks.getZKDatabase().append(si)) {
...
}
}
}
}
}
}
实际SyncRequestProcessor做的事情主要就是将节点信息添加到ZKDatabase中,后续的事务信息存储和快照存储,当请求达到一定阈值就会被触发。
3.3 FinalRequestProcessor.processRequest()public class FinalRequestProcessor implements RequestProcessor {
public void processRequest(Request request) {
...
ProcessTxnResult rc = null;
synchronized (zks.outstandingChanges) {
while (!zks.outstandingChanges.isEmpty()
&& zks.outstandingChanges.get(0).zxid 根据请求类型交由不同的方法处理 --> 检查创建节点请求各种合法性 --> 交由RequestProcessor处理 ,最终将节点信息保存到ZKDatabase中,并添加相关的事务日志信息(和快照日志信息)。
还是通过一个时序图来展示下整个过程
