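Earlier articles in this Zookeeper series analyzed how the client sends concrete requests to create, delete, update, and query nodes.
Those analyses stayed at the business level. Looking back, I still could not answer the following questions:
What does the Zookeeper client's network communication model look like? How is a received response matched precisely to its request?
This article is built around these questions.
For details on sending requests, see my earlier posts, for example: Zookeeper源码解析-客户端创建节点过程分析 (恐龙弟旺仔的博客, CSDN)
I will look at the problem from three angles: wrapping the request, sending the request, and receiving the response.
The walkthrough below follows a single GET request through the whole process.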
1. Wrapping the request
Everything starts from ZooKeeper.java, which wraps all the client operations.
Since we are analyzing a GET request, we start from ZooKeeper.getData().
public class ZooKeeper {
public byte[] getData(final String path, Watcher watcher, Stat stat)
throws KeeperException, InterruptedException
{
final String clientPath = path;
PathUtils.validatePath(clientPath);
// the watch contains the un-chroot path
WatchRegistration wcb = null;
if (watcher != null) {
wcb = new DataWatchRegistration(watcher, clientPath);
}
// 1. Build the request path
final String serverPath = prependChroot(clientPath);
// 2. Build the request header
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.getData);
// 3. Build the request body
GetDataRequest request = new GetDataRequest();
request.setPath(serverPath);
request.setWatch(watcher != null);
GetDataResponse response = new GetDataResponse();
// 4. The request is sent from here
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
if (stat != null) {
DataTree.copyStat(response.getStat(), stat);
}
return response.getData();
}
}
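Wrapping a request = wrapping the request header + wrapping the request body.
The format is as follows: a RequestHeader (xid, type) followed by the request body, which for this call is a GetDataRequest (path, watch).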
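To make "header + body" concrete, here is a minimal sketch of how such a request could be laid out in a buffer. It is not ZooKeeper's jute serialization code: the class name RequestFrameSketch, the encode() helper, and the exact field layout (a 4-byte length prefix, then xid, type, path, and the watch flag) are assumptions made for illustration, and the value 4 merely stands in for the getData opcode.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: lays out "header + body" behind a length prefix.
class RequestFrameSketch {

    // Assumed layout: [length][xid][type][path length][path bytes][watch flag]
    static ByteBuffer encode(int xid, int type, String path, boolean watch) {
        byte[] pathBytes = path.getBytes(StandardCharsets.UTF_8);
        int payloadLen = 4 + 4            // header: xid + type
                + 4 + pathBytes.length    // body: length-prefixed path
                + 1;                      // body: watch flag as one byte
        ByteBuffer bb = ByteBuffer.allocate(4 + payloadLen);
        bb.putInt(payloadLen);            // length prefix, excludes itself
        bb.putInt(xid);                   // request header
        bb.putInt(type);
        bb.putInt(pathBytes.length);      // request body
        bb.put(pathBytes);
        bb.put((byte) (watch ? 1 : 0));
        bb.flip();                        // switch the buffer to read mode
        return bb;
    }

    public static void main(String[] args) {
        ByteBuffer frame = encode(1, 4, "/demo", true);
        System.out.println("frame bytes: " + frame.remaining());
    }
}
```

The length prefix is what lets the receiving side know how many bytes belong to one message before it tries to decode the header and body.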
2. Sending the request
Sending the request is delegated to ClientCnxn.
public class ClientCnxn {
public ReplyHeader submitRequest(RequestHeader h, Record request,
Record response, WatchRegistration watchRegistration)
throws InterruptedException {
ReplyHeader r = new ReplyHeader();
// 1. Wrap both request and response into a Packet object, see 2.1
Packet packet = queuePacket(h, r, request, response, null, null, null,
null, watchRegistration);
// Pay special attention here: this loop keeps checking the state of the packet object
synchronized (packet) {
while (!packet.finished) {
packet.wait();
}
}
return r;
}
}
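Q: This code keeps waiting on packet.finished. We have not yet looked at what finished means on Packet, but a reasonable guess is that it waits for the request to complete: when the matching response arrives, packet.finished is set to true, and the caller is blocked for the whole time...
We will verify this against the code later.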
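If that guess is right, the waiting side follows the standard wait/notify handshake on the packet's monitor. The sketch below shows only that pattern in isolation; the class BlockingPacketSketch, its simplified Packet, and the helper methods finish(), awaitReply() and demo() are hypothetical names, not part of the ZooKeeper client.

```java
// Hypothetical sketch of a caller blocking on a "finished" flag.
class BlockingPacketSketch {

    static final class Packet {
        private boolean finished;   // guarded by the packet's own monitor
        private String reply;
    }

    // Called by the I/O side once a reply is available.
    static void finish(Packet p, String reply) {
        synchronized (p) {
            p.reply = reply;
            p.finished = true;
            p.notifyAll();          // wake up every thread waiting on p
        }
    }

    // Called by the submitting thread; blocks until finish() has run.
    static String awaitReply(Packet p) {
        synchronized (p) {
            while (!p.finished) {   // loop guards against spurious wakeups
                try {
                    p.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting", e);
                }
            }
            return p.reply;
        }
    }

    // A second thread plays the role of the I/O thread.
    static String demo() {
        Packet p = new Packet();
        Thread io = new Thread(() -> finish(p, "data"), "fake-io-thread");
        io.start();
        return awaitReply(p);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Checking the flag inside a while loop rather than an if is what makes the wait safe even when the reply is completed before the caller starts waiting.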
2.1 ClientCnxn.queuePacket() wraps the Packet
public class ClientCnxn {
private final LinkedList<Packet> outgoingQueue = new LinkedList<Packet>();
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
Record response, AsyncCallback cb, String clientPath,
String serverPath, Object ctx, WatchRegistration watchRegistration)
{
Packet packet = null;
synchronized (outgoingQueue) {
// 1. Simply create the packet object
packet = new Packet(h, r, request, response, watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
if (!state.isAlive() || closing) {
conLossPacket(packet);
} else {
if (h.getType() == OpCode.closeSession) {
closing = true;
}
// 2. Finally, put the request packet into outgoingQueue
outgoingQueue.add(packet);
}
}
// 3. Then wake up the Selector used by SendThread
sendThread.getClientCnxnSocket().wakeupCnxn();
return packet;
}
}
The wrapping step ends once the Packet has been put into outgoingQueue.
Now it is SendThread's turn.
2.2 SendThread sends the request packet
SendThread is itself a Thread; its main logic lives in the run() method.
class SendThread extends ZooKeeperThread {
// The socket wrapper that does the actual communication
private final ClientCnxnSocket clientCnxnSocket;
@Override
public void run() {
clientCnxnSocket.introduce(this,sessionId);
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
int to;
long lastPingRwServer = Time.currentElapsedTime();
final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
InetSocketAddress serverAddress = null;
while (state.isAlive()) {
try {
...
// Skipping the non-essential parts, we go straight to where the request packet is sent; this is again delegated to clientCnxnSocket, see 2.3
clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
} catch (Throwable e) {
if (closing) {
if (LOG.isDebugEnabled()) {
...
}
break;
} else {
...
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
}
}
}
cleanup();
clientCnxnSocket.close();
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
Event.KeeperState.Disconnected, null));
}
ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
"SendThread exited loop for session: 0x"
+ Long.toHexString(getSessionId()));
}
}
In its run() method, SendThread keeps taking packets from the outgoingQueue described in 2.1 and sends them out.
This model decouples submitting a request from actually sending it.
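The decoupling can be illustrated with a plain producer/consumer queue. This is a simplified sketch, not the client's real mechanism: the real code uses a LinkedList guarded by synchronized plus a Selector wakeup, while the sketch swaps in a BlockingQueue for brevity. The class SendQueueSketch, the "STOP" marker, and the wire list that stands in for the socket are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: callers enqueue, one sender thread drains the queue.
class SendQueueSketch {

    static List<String> demo() {
        BlockingQueue<String> outgoingQueue = new LinkedBlockingQueue<>();
        // Stands in for the socket; written only by the sender thread
        // and read only after join(), so no extra locking is needed.
        List<String> wire = new ArrayList<>();

        Thread sender = new Thread(() -> {
            try {
                while (true) {
                    String packet = outgoingQueue.take(); // blocks while empty
                    if (packet.equals("STOP")) {
                        break;                            // hypothetical shutdown marker
                    }
                    wire.add(packet);                     // "send" in submission order
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "send-thread");
        sender.start();

        // The caller only enqueues; it never touches the "socket" itself.
        outgoingQueue.add("getData /a");
        outgoingQueue.add("getData /b");
        outgoingQueue.add("STOP");

        try {
            sender.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return wire;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because a single thread drains the queue, packets reach the wire in the order they were submitted, which matters later when responses are matched to requests.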
2.3 clientCnxnSocket.doTransport() sends the Packet
public class ClientCnxnSocketNIO extends ClientCnxnSocket {
void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
ClientCnxn cnxn)
throws IOException, InterruptedException {
// Standard NIO
selector.select(waitTimeOut);
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
updateNow();
for (SelectionKey k : selected) {
SocketChannel sc = ((SocketChannel) k.channel());
// Handle the connect event
if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
if (sc.finishConnect()) {
updateLastSendAndHeard();
sendThread.primeConnection();
}
// 1. Handle read/write events
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
// Delegate to doIO()
doIO(pendingQueue, outgoingQueue, cnxn);
}
}
if (sendThread.getZkState().isConnected()) {
synchronized(outgoingQueue) {
// Enable OP_WRITE on the Selector here if outgoingQueue has data to send
if (findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
enableWrite();
}
}
}
selected.clear();
}
// Where the request is actually sent
void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
throws InterruptedException, IOException {
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sock == null) {
throw new IOException("Socket is null!");
}
// Handle the read event; not the focus here, analyzed later when reading the response
if (sockKey.isReadable()) {
...
}
if (sockKey.isWritable()) {
synchronized(outgoingQueue) {
// 1. Pick the first packet waiting to be sent, first come first served
Packet p = findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress());
if (p != null) {
updateLastSend();
// If we already started writing p, p.bb will already exist
if (p.bb == null) {
if ((p.requestHeader != null) &&
(p.requestHeader.getType() != OpCode.ping) &&
(p.requestHeader.getType() != OpCode.auth)) {
// Note the xid here
p.requestHeader.setXid(cnxn.getXid());
}
p.createBB();
}
// 2. Send packet.bb through the SocketChannel
sock.write(p.bb);
if (!p.bb.hasRemaining()) {
sentCount++;
outgoingQueue.removeFirstOccurrence(p);
if (p.requestHeader != null
&& p.requestHeader.getType() != OpCode.ping
&& p.requestHeader.getType() != OpCode.auth) {
synchronized (pendingQueue) {
// 3. After being sent, the packet is not discarded but put into pendingQueue
pendingQueue.add(p);
}
}
}
}
// 4. Reset the Selector state: keep OP_WRITE enabled if outgoingQueue still has packets
if (outgoingQueue.isEmpty()) {
disableWrite();
} else if (!initialized && p != null && !p.bb.hasRemaining()) {
disableWrite();
} else {
// Just in case
enableWrite();
}
}
}
}
}
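Summary: four interesting details are worth noting here.
1) Setting OP_WRITE on the Selector
Because NIO is used to send and receive data, the OP_READ and OP_WRITE interest ops have to be registered with the Selector.
When is OP_WRITE set? As analyzed above, it is set whenever outgoingQueue contains data to send.
2) Sending the Packet object
What goes to the server through the SocketChannel is not the entire Packet object, only its meaningful parts: the RequestHeader and GetDataRequest objects, serialized.
3) The xid in RequestHeader
This xid is obtained from cnxn and is unique on the client side. What is it for? I will leave that open for now and come back to it later.
4) pendingQueue
After a packet is sent to the server through the SocketChannel, the whole packet is put into pendingQueue. What for? Not clear yet, but it is probably related to receiving the response.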
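As an interim summary, the client-side sending phase runs as follows: ZooKeeper.getData() -> ClientCnxn.submitRequest() -> queuePacket() puts the Packet into outgoingQueue -> SendThread.run() -> ClientCnxnSocketNIO.doTransport() / doIO() writes the Packet to the SocketChannel and moves it to pendingQueue.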
3. Receiving the response
Receiving the response also happens in clientCnxnSocket.doTransport();
it is the OP_READ handling that was skipped above.
public class ClientCnxnSocketNIO extends ClientCnxnSocket {
void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
ClientCnxn cnxn)
throws IOException, InterruptedException {
// Standard NIO
selector.select(waitTimeOut);
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
updateNow();
for (SelectionKey k : selected) {
SocketChannel sc = ((SocketChannel) k.channel());
// Handle the connect event
if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
if (sc.finishConnect()) {
updateLastSendAndHeard();
sendThread.primeConnection();
}
// 1. Handle read/write events
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
// Delegate to doIO()
doIO(pendingQueue, outgoingQueue, cnxn);
}
}
if (sendThread.getZkState().isConnected()) {
synchronized(outgoingQueue) {
// Enable OP_WRITE on the Selector here if outgoingQueue has data to send
if (findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
enableWrite();
}
}
}
selected.clear();
}
// Where the actual I/O happens
void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
throws InterruptedException, IOException {
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sock == null) {
throw new IOException("Socket is null!");
}
// Handle the read event, i.e. the response data
if (sockKey.isReadable()) {
// 1. Standard NIO read
int rc = sock.read(incomingBuffer);
if (rc < 0) {
throw new EndOfStreamException(
"Unable to read additional data from server sessionid 0x"
+ Long.toHexString(sessionId)
+ ", likely server has closed socket");
}
if (!incomingBuffer.hasRemaining()) {
incomingBuffer.flip();
if (incomingBuffer == lenBuffer) {
recvCount++;
readLength();
} else if (!initialized) {
...
} else {
// 2. Hand the data that was read over to SendThread, see 3.1
sendThread.readResponse(incomingBuffer);
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
}
}
}
if (sockKey.isWritable()) {
...
}
}
}
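The read branch above alternates between a 4-byte length buffer and a body buffer sized from that length. The sketch below reproduces only this framing idea on in-memory buffers, with no socket involved. The class FrameReaderSketch and its methods onBytes() and demo() are hypothetical; the field names lenBuffer and incomingBuffer are borrowed from the code above, but the logic is a simplified reading of it, not the original.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of length-prefixed framing with two alternating buffers.
class FrameReaderSketch {
    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    private ByteBuffer incomingBuffer = lenBuffer;   // what we are filling right now
    private final List<byte[]> frames = new ArrayList<>();

    // Feed whatever bytes "arrived"; may hold partial or multiple frames.
    void onBytes(ByteBuffer src) {
        while (src.hasRemaining()) {
            int n = Math.min(src.remaining(), incomingBuffer.remaining());
            ByteBuffer slice = src.duplicate();
            slice.limit(slice.position() + n);
            incomingBuffer.put(slice);               // copy at most what still fits
            src.position(src.position() + n);
            if (!incomingBuffer.hasRemaining()) {
                incomingBuffer.flip();
                if (incomingBuffer == lenBuffer) {
                    int len = incomingBuffer.getInt();
                    if (len < 0) {
                        throw new IllegalStateException("negative frame length " + len);
                    }
                    incomingBuffer = ByteBuffer.allocate(len); // next: the body
                } else {
                    byte[] body = new byte[incomingBuffer.remaining()];
                    incomingBuffer.get(body);
                    frames.add(body);                // one complete message
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;      // next: a new length prefix
                }
            }
        }
    }

    // Two frames delivered in 3-byte chunks to mimic partial reads.
    static List<String> demo() {
        ByteBuffer wire = ByteBuffer.allocate(13);
        wire.putInt(2).put("hi".getBytes(StandardCharsets.US_ASCII));
        wire.putInt(3).put("abc".getBytes(StandardCharsets.US_ASCII));
        wire.flip();
        FrameReaderSketch reader = new FrameReaderSketch();
        while (wire.hasRemaining()) {
            int n = Math.min(3, wire.remaining());
            ByteBuffer chunk = wire.duplicate();
            chunk.limit(chunk.position() + n);
            reader.onBytes(chunk);
            wire.position(wire.position() + n);
        }
        List<String> out = new ArrayList<>();
        for (byte[] f : reader.frames) {
            out.add(new String(f, StandardCharsets.US_ASCII));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Switching the target buffer only when it is completely full is what lets the reader cope with a message arriving in several pieces.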
3.1 SendThread.readResponse() handles the response
class SendThread extends ZooKeeperThread {
private long lastPingSentNs;
private final ClientCnxnSocket clientCnxnSocket;
private Random r = new Random(System.nanoTime());
private boolean isFirstConnect = true;
// Read the response
void readResponse(ByteBuffer incomingBuffer) throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(
incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr = new ReplyHeader();
// 1. Parse the reply header
replyHdr.deserialize(bbia, "header");
// Various special cases and error handling
...
Packet packet;
synchronized (pendingQueue) {
if (pendingQueue.size() == 0) {
throw new IOException("Nothing in the queue, but got "
+ replyHdr.getXid());
}
// 2. Take the first packet from pendingQueue
packet = pendingQueue.remove();
}
try {
// If the xid in the request header differs from the xid in the reply header, replies are out of order, so throw
if (packet.requestHeader.getXid() != replyHdr.getXid()) {
packet.replyHeader.setErr(
KeeperException.Code.CONNECTIONLOSS.intValue());
throw new IOException("Xid out of order. Got Xid "
+ replyHdr.getXid() + " with err " +
+ replyHdr.getErr() +
" expected Xid "
+ packet.requestHeader.getXid()
+ " for a packet with details: "
+ packet );
}
packet.replyHeader.setXid(replyHdr.getXid());
packet.replyHeader.setErr(replyHdr.getErr());
packet.replyHeader.setZxid(replyHdr.getZxid());
if (replyHdr.getZxid() > 0) {
lastZxid = replyHdr.getZxid();
}
// 3. Parse the response body
if (packet.response != null && replyHdr.getErr() == 0) {
packet.response.deserialize(bbia, "response");
}
if (LOG.isDebugEnabled()) {
LOG.debug("Reading reply sessionid:0x"
+ Long.toHexString(sessionId) + ", packet:: " + packet);
}
} finally {
// 4. Finally, set packet.finished = true
finishPacket(packet);
}
}
}
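Summary:
Two details are worth noting here; they answer the questions left open in 2.3.
1) What the xid is for
The xid acts as a unique identifier that ties a request to its response. After processing a request, the Zookeeper server writes the xid from the request header into the reply header. On the client, readResponse() takes the head of pendingQueue and checks that its xid equals the xid in the reply header. The match therefore relies on replies arriving in the order the requests were sent, with the xid serving as the consistency check.
2) The Packet.finished state
When a GetDataRequest is sent through ClientCnxn, the code is as follows: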
public class ClientCnxn {
public ReplyHeader submitRequest(RequestHeader h, Record request,
Record response, WatchRegistration watchRegistration)
throws InterruptedException {
ReplyHeader r = new ReplyHeader();
Packet packet = queuePacket(h, r, request, response, null, null, null,
null, watchRegistration);
synchronized (packet) {
while (!packet.finished) {
packet.wait();
}
}
return r;
}
}
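The caller keeps checking Packet.finished. While it is false, the thread blocks in wait() until the response arrives and finishPacket() sets the flag to true.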
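The matching rule described in 1) can be reduced to a few lines: a FIFO queue of sent packets, and an xid comparison when a reply comes in. The sketch below is a single-threaded illustration under that reading of readResponse(); the class XidMatchSketch, its simplified Packet, and the methods send(), onReply() and demo() are hypothetical, and no real networking or locking is involved.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: match replies to requests by FIFO order plus xid check.
class XidMatchSketch {

    static final class Packet {
        final int xid;
        final String path;
        String data;
        boolean finished;

        Packet(int xid, String path) {
            this.xid = xid;
            this.path = path;
        }
    }

    private int nextXid = 1;                       // per-client counter
    private final Deque<Packet> pendingQueue = new ArrayDeque<>();

    // "Send" a request: stamp it with an xid and remember it as pending.
    Packet send(String path) {
        Packet p = new Packet(nextXid++, path);
        pendingQueue.addLast(p);
        return p;
    }

    // Handle a reply: it must belong to the oldest pending packet.
    void onReply(int replyXid, String data) {
        Packet p = pendingQueue.pollFirst();
        if (p == null) {
            throw new IllegalStateException("Nothing in the queue, but got " + replyXid);
        }
        if (p.xid != replyXid) {
            throw new IllegalStateException(
                    "Xid out of order. Got " + replyXid + " expected " + p.xid);
        }
        p.data = data;
        p.finished = true;                         // the caller may now read the result
    }

    static String demo() {
        XidMatchSketch client = new XidMatchSketch();
        Packet a = client.send("/a");
        Packet b = client.send("/b");
        client.onReply(1, "A");                    // replies arrive in request order
        client.onReply(2, "B");
        return a.data + b.data;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Note that the xid is never used as a lookup key here; a reply whose xid does not match the head of the queue is treated as an error rather than searched for elsewhere.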
[Figure: the whole request and response process]
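Back to the questions from the beginning; here are the answers.
1. The client's request model is blocking for the caller but decoupled in how it sends: the request is put into a queue, and SendThread asynchronously takes Packets from that queue and sends them to the server.
2. A Packet that has been sent to the server is kept in pendingQueue until its response arrives. When the response arrives, the xid in the request header is compared with the xid in the reply header to check that they match.
3. Once the response is received, the response body is put into the Packet and Packet.finished is set to true; the caller stops blocking and can read the response.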