在Leader/Follower节点完成启动后,Leader就可以对外提供服务了。
关于客户端请求主要分为两种:事务请求和非事务请求。非事务请求相对比较简单些,我们先从这个看起,下一篇文章再来分析下如何处理事务请求。
建议读者可以先看下之前分析过的单机版Zookeeper server处理客户端请求的文章,因为集群版的server处理请求时有很多处理过程是相同的。
1.处理请求链路分析Leader在接收到客户端请求后,最终还是将请求交由RequestProcessor来处理,所以我们要先了解下Leader节点的处理链路。
public class LeaderZooKeeperServer extends QuorumZooKeeperServer {
CommitProcessor commitProcessor;
// 最重要的一个方法
@Override
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(
finalProcessor, getLeader().toBeApplied);
commitProcessor = new CommitProcessor(toBeAppliedProcessor,
Long.toString(getServerId()), false,
getZooKeeperServerListener());
commitProcessor.start();
ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
commitProcessor);
proposalProcessor.initialize();
firstProcessor = new PrepRequestProcessor(this, proposalProcessor);
((PrepRequestProcessor)firstProcessor).start();
}
}
则可以知道,LeaderZooKeeperServer处理链为:PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor -> ToBeAppliedRequestProcessor -> FinalRequestProcessor
这些RequestProcessor中有些比较熟悉,比如PrepRequestProcessor 、FinalRequestProcessor,这是ZookeeperServer中已经有的processor,所以后续分析时会简单介绍下即可,主要精力放在新的processor上
2.PrepRequestProcessor.pRequest()public class PrepRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
protected void pRequest(Request request) throws RequestProcessorException {
request.hdr = null;
request.txn = null;
try {
switch (request.type) {
//All the rest don't need to create a Txn - just verify session
case OpCode.sync:
case OpCode.exists:
case OpCode.getData:
case OpCode.getACL:
case OpCode.getChildren:
case OpCode.getChildren2:
case OpCode.ping:
case OpCode.setWatches:
// 针对非事务请求,只校验session信息
zks.sessionTracker.checkSession(request.sessionId,
request.getOwner());
break;
default:
LOG.warn("unknown type " + request.type);
break;
}
}
}
}
针对非事务请求,只校验session信息,直接交由下一个processor处理
3.ProposalRequestProcessor.ProposalRequestProcessor()public class ProposalRequestProcessor implements RequestProcessor {
public void processRequest(Request request) throws RequestProcessorException {
// 如果请求来自leaner
if(request instanceof LearnerSyncRequest){
zks.getLeader().processSync((LearnerSyncRequest)request);
} else {
// 事务和非事务请求都会将该请求流转到下一个processor(CommitProcessor ),
nextProcessor.processRequest(request);
// 而针对事务请求的话(事务请求头不为空),则还需要进行事务投票等动作,下一篇文章重点分析
if (request.hdr != null) {
try {
zks.getLeader().propose(request);
} catch (XidRolloverException e) {
throw new RequestProcessorException(e.getMessage(), e);
}
syncProcessor.processRequest(request);
}
}
}
}
ProposalRequestProcessor是leader的事务投票处理器,事务请求和非事务请求的处理在这里进行分流。
针对本文分析的非事务请求,则直接将请求流转给CommitProcessor。
4.CommitProcessor处理请求public class CommitProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
LinkedList queuedRequests = new LinkedList();
synchronized public void processRequest(Request request) {
// request.addRQRec(">commit");
if (LOG.isDebugEnabled()) {
LOG.debug("Processing request:: " + request);
}
// 在这里将请求都添加到queuedRequests集合中
// 本process是一个线程,所以,最终还是调用run()方法来执行
if (!finished) {
queuedRequests.add(request);
notifyAll();
}
}
public void run() {
try {
Request nextPending = null;
while (!finished) {
// 非事务请求在下面被添加到toProcess后,下一次循环就可以直接遍历toProcess中的请求,直接交由下一个processor处理
int len = toProcess.size();
for (int i = 0; i < len; i++) {
nextProcessor.processRequest(toProcess.get(i));
}
toProcess.clear();
// 这里是事务请求的处理,直接忽略掉,下一篇重点分析
synchronized (this) {
...
}
synchronized (this) {
// 获取请求,根据请求类型不同做不同处理
while (nextPending == null && queuedRequests.size() > 0) {
Request request = queuedRequests.remove();
switch (request.type) {
case OpCode.create:
case OpCode.delete:
case OpCode.setData:
case OpCode.multi:
case OpCode.setACL:
case OpCode.createSession:
case OpCode.closeSession:
nextPending = request;
break;
case OpCode.sync:
if (matchSyncs) {
nextPending = request;
} else {
toProcess.add(request);
}
break;
// 以上都是事务请求类型处理,所以针对非事务请求,直接将请求添加到toProcess中
default:
toProcess.add(request);
}
}
}
}
} catch (InterruptedException e) {
LOG.warn("Interrupted exception while waiting", e);
} catch (Throwable e) {
LOG.error("Unexpected exception causing CommitProcessor to exit", e);
}
LOG.info("CommitProcessor exited loop!");
}
}
所以,针对非事务请求,CommitProcessor的最终做法仍旧是交由下一个processor(ToBeAppliedRequestProcessor )处理
5.ToBeAppliedRequestProcessorstatic class ToBeAppliedRequestProcessor implements RequestProcessor {
private RequestProcessor next;
private ConcurrentLinkedQueue toBeApplied;
public void processRequest(Request request) throws RequestProcessorException {
next.processRequest(request);
Proposal p = toBeApplied.peek();
if (p != null && p.request != null
&& p.request.zxid == request.zxid) {
toBeApplied.remove();
}
}
}
很直接的看出来,针对非事务请求,依旧是交由下一个processor来处理
6.FinalRequestProcessorpublic class FinalRequestProcessor implements RequestProcessor {
public void processRequest(Request request) {
// 我们以getData这个非事务请求为例,
switch (request.type) {
case OpCode.getData: {
lastOp = "GETD";
GetDataRequest getDataRequest = new GetDataRequest();
ByteBufferInputStream.byteBuffer2Record(request.request,
getDataRequest);
// 处理getData请求的方式就是直接从当前内存数据库ZKDatabase中获取到对应path的DataNode信息
DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
if (n == null) {
throw new KeeperException.NoNodeException();
}
PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),
ZooDefs.Perms.READ,
request.authInfo);
Stat stat = new Stat();
// 获取到节点值信息和stat信息后,直接包装到GetDataResponse返回
byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
getDataRequest.getWatch() ? cnxn : null);
rsp = new GetDataResponse(b, stat);
break;
}
}
}
}
总结:针对非事务请求的处理(如getData请求),无论是集群模式(Leader或follower),还是单机模式的节点,对于其处理都比较简单。最终都是在FinalRequestProcessor中通过path来获取对应的value值和stat信息,包装到GetDataResponse返回给客户端。
本文小小水文一篇,抛砖引玉,重点在下一篇,关于事务请求的处理。