前文中,分析了Zookeeper server端处理节点创建请求的过程。在日常的使用中,客户端除了创建节点,还会对节点信息进行修改、删除、查询等操作。
本文就来分析下这三个常用的请求类型。
建议先看下上一篇博客,基本的处理路线都差不多,所以我们直接进入到关键方法即可
1.处理节点修改请求
我们还是从NIOServerCnxn.readPayload()方法出发,最终调用到RequestProcessor的处理,调用顺序与之前一样:PrepRequestProcessor --> SyncRequestProcessor --> FinalRequestProcessor
1.1 PrepRequestProcessor.pRequest()
public class PrepRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
/**
 * Prep-stage handling of a request (excerpt: only the setData branch is
 * shown; the remaining branches and the catch clauses are elided as "...").
 *
 * Resets the request's header and txn, builds the setData transaction via
 * pRequest2Txn, then copies zks.getZxid() into the request and forwards it
 * to the next processor.
 *
 * @param request the incoming client request
 * @throws RequestProcessorException declared by the signature; the throwing
 *         code is not visible in this excerpt
 */
protected void pRequest(Request request) throws RequestProcessorException {
// LOG.info("Prep>>> cxid = " + request.cxid + " type = " +
// request.type + " id = 0x" + Long.toHexString(request.sessionId));
request.hdr = null;
request.txn = null;
try {
switch (request.type) {
case OpCode.setData:
// Node-update request: create an empty SetDataRequest that pRequest2Txn
// fills from the request payload (deserialize = true)
SetDataRequest setDataRequest = new SetDataRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true);
break;
}
...
}
request.zxid = zks.getZxid();
nextProcessor.processRequest(request);
}
/**
 * Validates a request and turns it into a transaction (excerpt: only the
 * setData branch is shown; the declarations of the locals path, nodeRecord
 * and version are not part of this excerpt).
 *
 * @param type        request op code, also stored in the transaction header
 * @param zxid        zxid written into the transaction header
 * @param request     request being prepared; its hdr and txn fields are set here
 * @param record      request record; cast to SetDataRequest for setData
 * @param deserialize when true, the record is populated from request.request
 * @throws KeeperException e.g. BadVersionException when the expected version
 *         does not match the node's current version
 */
protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize)
throws KeeperException, IOException, RequestProcessorException
{
request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,
Time.currentWallTime(), type);
switch (type) {
case OpCode.setData:
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
// Deserialize the request payload into the SetDataRequest object
SetDataRequest setDataRequest = (SetDataRequest)record;
if(deserialize)
ByteBufferInputStream.byteBuffer2Record(request.request, setDataRequest);
// Validate the path and check the WRITE permission against the node's ACL
path = setDataRequest.getPath();
validatePath(path, request.sessionId);
nodeRecord = getRecordForPath(path);
checkACL(zks, nodeRecord.acl, ZooDefs.Perms.WRITE,
request.authInfo);
version = setDataRequest.getVersion();
int currentVersion = nodeRecord.stat.getVersion();
// An expected version of -1 skips the version comparison
if (version != -1 && version != currentVersion) {
throw new KeeperException.BadVersionException(path);
}
// The version is incremented by 1 whether or not the node's value actually changes
version = currentVersion + 1;
request.txn = new SetDataTxn(path, setDataRequest.getData(), version);
// Work on a copy of the record tagged with this transaction's zxid and
// register it as a pending change
nodeRecord = nodeRecord.duplicate(request.hdr.getZxid());
nodeRecord.stat.setVersion(version);
addChangeRecord(nodeRecord);
break;
}
...
}
}
1.2 SyncRequestProcessor
具体可以参考上一篇博客,请求的处理方式都是一样的。
SyncRequestProcessor做的事情主要就是将事务请求追加到ZKDatabase的事务日志中;当累计的请求数达到一定阈值时,会触发事务日志的刷盘以及快照存储。
1.3 FinalRequestProcessor
根据之前分析的过程可知,最终交由ZKDatabase.processTxn()方法来处理
/**
 * Excerpt of ZKDatabase showing only processTxn.
 */
public class ZKDatabase {
/**
 * Applies a transaction by delegating to dataTree.processTxn.
 *
 * @param hdr transaction header
 * @param txn transaction body
 * @return the result produced by dataTree.processTxn
 */
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
return dataTree.processTxn(hdr, txn);
}
}
public class DataTree {
/**
 * Applies a transaction to the tree (excerpt: only the setData branch is
 * shown; the other branches, the catch clauses and the return statement are
 * elided).
 *
 * Copies client id, cxid, zxid and type from the header into the result,
 * resets err to 0 and multiResult to null, then dispatches on the header type.
 *
 * @param header transaction header
 * @param txn    transaction body; cast to SetDataTxn for setData
 */
public ProcessTxnResult processTxn(TxnHeader header, Record txn){
ProcessTxnResult rc = new ProcessTxnResult();
try {
rc.clientId = header.getClientId();
rc.cxid = header.getCxid();
rc.zxid = header.getZxid();
rc.type = header.getType();
rc.err = 0;
rc.multiResult = null;
switch (header.getType()) {
case OpCode.setData:
SetDataTxn setDataTxn = (SetDataTxn) txn;
rc.path = setDataTxn.getPath();
// Update the node's data and stat; the resulting Stat is stored in the result
rc.stat = setData(setDataTxn.getPath(), setDataTxn
.getData(), setDataTxn.getVersion(), header
.getZxid(), header.getTime());
break;
...
}
}
}
/**
 * Replaces the data of the node at the given path and updates its stat
 * (excerpt: the code between the synchronized block and the return is elided).
 *
 * @param path    path of the node to update
 * @param data    new data for the node
 * @param version version written to the node's stat
 * @param zxid    written to the stat as mzxid
 * @param time    written to the stat as mtime
 * @return a new Stat filled by n.copyStat after the update
 * @throws KeeperException.NoNodeException if no node is stored under path
 */
public Stat setData(String path, byte data[], int version, long zxid,
long time) throws KeeperException.NoNodeException {
Stat s = new Stat();
DataNode n = nodes.get(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
// Previous data is captured here; presumably consumed by the elided code below
byte lastdata[] = null;
synchronized (n) {
// Overwrite the node's data and refresh its stat (mtime, mzxid, version)
lastdata = n.data;
n.data = data;
n.stat.setMtime(time);
n.stat.setMzxid(zxid);
n.stat.setVersion(version);
n.copyStat(s);
}
...
return s;
}
}
总结:修改节点信息直接修改了DataTree中对应DataNode的value信息、stat信息(Mtime、Mzxid、version值)
2.处理节点删除请求
server端的处理同1,我们在这里主要说下不同点
2.1 PrepRequestProcessor.pRequest()
public class PrepRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
/**
 * Prep-stage handling of a request (excerpt: only the delete branch is
 * shown; the remaining branches and the catch clauses are elided as "...").
 *
 * Resets the request's header and txn, builds the delete transaction via
 * pRequest2Txn, then copies zks.getZxid() into the request and forwards it
 * to the next processor.
 *
 * @param request the incoming client request
 */
protected void pRequest(Request request) throws RequestProcessorException {
// LOG.info("Prep>>> cxid = " + request.cxid + " type = " +
// request.type + " id = 0x" + Long.toHexString(request.sessionId));
request.hdr = null;
request.txn = null;
try {
switch (request.type) {
case OpCode.delete:
// Node-delete request: create an empty DeleteRequest that pRequest2Txn
// fills from the request payload (deserialize = true)
DeleteRequest deleteRequest = new DeleteRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true);
break;
}
...
}
request.zxid = zks.getZxid();
nextProcessor.processRequest(request);
}
/**
 * Validates a request and turns it into a transaction (excerpt: only the
 * delete branch is shown; the declarations of the locals path, lastSlash,
 * parentPath and parentRecord are not part of this excerpt).
 *
 * @param type        request op code, also stored in the transaction header
 * @param zxid        zxid written into the transaction header
 * @param request     request being prepared; its hdr and txn fields are set here
 * @param record      request record; cast to DeleteRequest for delete
 * @param deserialize when true, the record is populated from request.request
 * @throws KeeperException BadArgumentsException for an invalid path,
 *         BadVersionException on a version mismatch, NotEmptyException when
 *         the node still has children
 */
protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize)
throws KeeperException, IOException, RequestProcessorException
{
request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,
Time.currentWallTime(), type);
switch (type) {
case OpCode.delete:
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
// Deserialize the request payload into the DeleteRequest object
DeleteRequest deleteRequest = (DeleteRequest)record;
if(deserialize)
ByteBufferInputStream.byteBuffer2Record(request.request, deleteRequest);
// Path validity: must contain '/', must not contain a NUL character,
// and must not be a special path
path = deleteRequest.getPath();
lastSlash = path.lastIndexOf('/');
if (lastSlash == -1 || path.indexOf('\0') != -1
|| zks.getZKDatabase().isSpecialPath(path)) {
throw new KeeperException.BadArgumentsException(path);
}
parentPath = path.substring(0, lastSlash);
parentRecord = getRecordForPath(parentPath);
ChangeRecord nodeRecord = getRecordForPath(path);
// Check the DELETE permission against the parent's ACL, then the expected
// version (-1 skips the comparison)
checkACL(zks, parentRecord.acl, ZooDefs.Perms.DELETE,
request.authInfo);
int version = deleteRequest.getVersion();
if (version != -1 && nodeRecord.stat.getVersion() != version) {
throw new KeeperException.BadVersionException(path);
}
// A node that still has children cannot be deleted
if (nodeRecord.childCount > 0) {
throw new KeeperException.NotEmptyException(path);
}
// The delete transaction only needs the path
request.txn = new DeleteTxn(path);
// Register a copy of the parent record with one child fewer
parentRecord = parentRecord.duplicate(request.hdr.getZxid());
parentRecord.childCount--;
addChangeRecord(parentRecord);
// NOTE(review): the null/-1/null arguments presumably mark the node itself
// as deleted in the pending changes; confirm against ChangeRecord
addChangeRecord(new ChangeRecord(request.hdr.getZxid(), path,
null, -1, null));
break;
...
}
}
针对删除节点请求,PrepRequestProcessor主要检查请求的合法性,包括path、version、ACL等有效性
2.2 ZKDatabase.processTxn()
public class ZKDatabase {
/**
 * Applies a transaction by delegating to dataTree.processTxn.
 *
 * @param hdr transaction header
 * @param txn transaction body
 * @return the result produced by dataTree.processTxn
 */
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
return dataTree.processTxn(hdr, txn);
}
}
public class DataTree {
/**
 * Applies a transaction to the tree (excerpt: only the delete branch is
 * shown; the other branches, the catch clauses and the return statement are
 * elided).
 *
 * Copies client id, cxid, zxid and type from the header into the result,
 * resets err to 0 and multiResult to null, then dispatches on the header type.
 *
 * @param header transaction header
 * @param txn    transaction body; cast to DeleteTxn for delete
 */
public ProcessTxnResult processTxn(TxnHeader header, Record txn){
ProcessTxnResult rc = new ProcessTxnResult();
try {
rc.clientId = header.getClientId();
rc.cxid = header.getCxid();
rc.zxid = header.getZxid();
rc.type = header.getType();
rc.err = 0;
rc.multiResult = null;
switch (header.getType()) {
case OpCode.delete:
DeleteTxn deleteTxn = (DeleteTxn) txn;
rc.path = deleteTxn.getPath();
// Delegate directly to deleteNode() with the path and the transaction's zxid
deleteNode(deleteTxn.getPath(), header.getZxid());
break;
...
}
}
}
/**
 * Removes the node at the given path from the in-memory tree, detaches it
 * from its parent and fires the related watches (excerpt: some code before
 * the watch triggering is elided).
 *
 * @param path path of the node to delete
 * @param zxid zxid of the deleting transaction; stored as the parent's pzxid
 * @throws KeeperException.NoNodeException if either the node or its parent
 *         is not present in nodes
 */
public void deleteNode(String path, long zxid)
throws KeeperException.NoNodeException {
// Split the path into the parent path and the child's own name
int lastSlash = path.lastIndexOf('/');
String parentName = path.substring(0, lastSlash);
String childName = path.substring(lastSlash + 1);
DataNode node = nodes.get(path);
if (node == null) {
throw new KeeperException.NoNodeException();
}
// Remove the node from the in-memory map right away
nodes.remove(path);
synchronized (node) {
aclCache.removeUsage(node.acl);
}
DataNode parent = nodes.get(parentName);
if (parent == null) {
throw new KeeperException.NoNodeException();
}
// Drop the child entry from the parent and record this zxid as the parent's pzxid
synchronized (parent) {
parent.removeChild(childName);
parent.stat.setPzxid(zxid);
// A non-zero ephemeral owner means the path is also tracked in ephemerals
long eowner = node.stat.getEphemeralOwner();
if (eowner != 0) {
// The local "nodes" shadows the outer "nodes" used above within this scope.
// NOTE(review): raw HashSet here (and raw Set below) - the generic type
// arguments look lost in this excerpt; confirm against upstream.
HashSet nodes = ephemerals.get(eowner);
if (nodes != null) {
synchronized (nodes) {
nodes.remove(path);
}
}
}
node.parent = null;
}
...
// Fire NodeDeleted on the data watches and child watches of the path, then
// NodeChildrenChanged on the parent; an empty parent name maps to "/"
Set processed = dataWatches.triggerWatch(path,
EventType.NodeDeleted);
childWatches.triggerWatch(path, EventType.NodeDeleted, processed);
childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,
EventType.NodeChildrenChanged);
}
}
总结:删除节点也是同样的流程,直接删除内存中的该节点,修改父节点中该子节点的相关信息即可。
3.处理节点查询请求
server端的处理同1,我们在这里主要说下不同点
3.1 PrepRequestProcessor.pRequest()
public class PrepRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
/**
 * Prep-stage handling of a request (excerpt: only the branch for read-only
 * style op codes is shown; other branches and the catch clauses are elided).
 *
 * For the listed op codes no transaction is built: only the session is
 * checked, so request.hdr and request.txn stay null. The request is then
 * given zks.getZxid() and forwarded to the next processor.
 *
 * @param request the incoming client request
 */
protected void pRequest(Request request) throws RequestProcessorException {
// LOG.info("Prep>>> cxid = " + request.cxid + " type = " +
// request.type + " id = 0x" + Long.toHexString(request.sessionId));
request.hdr = null;
request.txn = null;
try {
switch (request.type) {
//All the rest don't need to create a Txn - just verify session
// These request types need no transaction, so unlike the cases above
// no transaction request is created for them
case OpCode.sync:
case OpCode.exists:
case OpCode.getData:
case OpCode.getACL:
case OpCode.getChildren:
case OpCode.getChildren2:
case OpCode.ping:
case OpCode.setWatches:
zks.sessionTracker.checkSession(request.sessionId,
request.getOwner());
break;
}
...
}
request.zxid = zks.getZxid();
nextProcessor.processRequest(request);
}
}
3.2 FinalRequestProcessor处理
/**
 * Excerpt of FinalRequestProcessor showing only the getData branch of
 * processRequest; the code before the switch and the code that sends the
 * response are elided.
 */
public class FinalRequestProcessor implements RequestProcessor {
/**
 * Handles a request at the end of the processor chain. For getData it reads
 * the node from the in-memory database, checks the READ permission and
 * builds a GetDataResponse from the node's data and stat.
 *
 * @param request the request to answer
 */
public void processRequest(Request request) {
// Not a transactional request, so request.hdr is null and no transaction is applied
...
switch (request.type) {
case OpCode.getData: {
lastOp = "GETD";
// Deserialize the request payload into a GetDataRequest
GetDataRequest getDataRequest = new GetDataRequest();
ByteBufferInputStream.byteBuffer2Record(request.request,
getDataRequest);
// Look up the DataNode directly in the in-memory database
DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
if (n == null) {
throw new KeeperException.NoNodeException();
}
// Check the READ permission against the node's ACL
PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),
ZooDefs.Perms.READ,
request.authInfo);
Stat stat = new Stat();
// Fetch the node's data; cnxn is passed only when the request asked for a
// watch (presumably it is registered as the watcher - confirm in getData)
byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
getDataRequest.getWatch() ? cnxn : null);
// Wrap the node data and stat in a GetDataResponse for the reply
rsp = new GetDataResponse(b, stat);
break;
}
}
}
}
总结:与1、2相比,节点查询请求最大的不同就是没有事务处理。最后通过FinalRequestProcessor处理获取节点值信息和stat信息,返回到客户端