您当前的位置: 首页 >  网络

恐龙弟旺仔

暂无认证

  • 0浏览

    0关注

    282博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Zookeeper客户端网络通讯模型分析

恐龙弟旺仔 发布时间:2022-07-03 17:30:56 ,浏览量:0

前言:

之前的Zookeeper系列文章中有分析过客户端如何发送具体的增删改查节点请求。

这些文章的分析都是偏业务层面的。如今回想起来,还是不知道该如何回答接下来的问题:

Zookeeper客户端的网络通讯模型是怎样的?接收到的响应是如何精确匹配到对应请求的?

本文主要就围绕这个问题来展开下。

有关于发送请求的一些具体内容,可以参考笔者之前的博客,比如: Zookeeper源码解析-客户端创建节点过程分析_恐龙弟旺仔的博客-CSDN博客 

笔者主要从三个方面来分析下这个问题:包装请求、发送请求、接收响应

下面以一次GET请求为例,来展示整个过程。

1.包装请求

一切的开始还是要从Zookeeper.java说起,里面封装了所有的操作

既然分析GET请求,那么就从Zookeeper.getData()开始

public class ZooKeeper {
	public byte[] getData(final String path, Watcher watcher, Stat stat)
        throws KeeperException, InterruptedException
     {
        final String clientPath = path;
        PathUtils.validatePath(clientPath);

        // the watch contains the un-chroot path
        WatchRegistration wcb = null;
        if (watcher != null) {
            wcb = new DataWatchRegistration(watcher, clientPath);
        }

        // 1.拼装请求路径
        final String serverPath = prependChroot(clientPath);

        // 2.拼装请求头
        RequestHeader h = new RequestHeader();
        h.setType(ZooDefs.OpCode.getData);
        // 3.拼装请求体
        GetDataRequest request = new GetDataRequest();
        request.setPath(serverPath);
        request.setWatch(watcher != null);
        GetDataResponse response = new GetDataResponse();
        
        // 4.在这里将请求发送出去
        ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
        if (r.getErr() != 0) {
            throw KeeperException.create(KeeperException.Code.get(r.getErr()),
                    clientPath);
        }
        if (stat != null) {
            DataTree.copyStat(response.getStat(), stat);
        }
        return response.getData();
    }
}

请求的包装=请求头包装+请求体包装

格式如下:

2.发送请求

发送请求的工作交由ClientCnxn来完成

 

public class ClientCnxn {
	public ReplyHeader submitRequest(RequestHeader h, Record request,
            Record response, WatchRegistration watchRegistration)
            throws InterruptedException {
        ReplyHeader r = new ReplyHeader();
        // 1.这里将request和response都封装到Packet对象里,具体见2.1
        Packet packet = queuePacket(h, r, request, response, null, null, null,
                    null, watchRegistration);
        
        // 需要特别注意下这里,这里一直再检查packet对象的状态
        synchronized (packet) {
            while (!packet.finished) {
                packet.wait();
            }
        }
        return r;
    }
}

Q:这里一直在不停的等待packet.finished状态,虽然还没看Packet这个finished代表什么,但是我们可以大胆猜测下,是不是在等待request执行完成呢,当接收到对应的response时,就把packet.finished设置为true,整个过程是阻塞的... 

这个后续我们通过代码来分析。

2.1 ClientCnxn.queuePacket() 包装Packet
public class ClientCnxn {
    private final LinkedList outgoingQueue = new LinkedList();
    
	Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
            Record response, AsyncCallback cb, String clientPath,
            String serverPath, Object ctx, WatchRegistration watchRegistration)
    {
        Packet packet = null;

        synchronized (outgoingQueue) {
            // 1.简单的创建packet对象
            packet = new Packet(h, r, request, response, watchRegistration);
            packet.cb = cb;
            packet.ctx = ctx;
            packet.clientPath = clientPath;
            packet.serverPath = serverPath;
            if (!state.isAlive() || closing) {
                conLossPacket(packet);
            } else {
                if (h.getType() == OpCode.closeSession) {
                    closing = true;
                }
                // 2.最终将请求包packet对象放到outgoingQueue中
                outgoingQueue.add(packet);
            }
        }
        // 3.然后把SendThread中的Selector唤醒
        sendThread.getClientCnxnSocket().wakeupCnxn();
        return packet;
    }
}

包装packet这一步,最终将Packet放入到outgoingQueue就结束了。

下面到了SendThread的表演时刻了。

2.2 SendThread 发送请求packet

SendThread本身是一个Thread,主要内容在其run()方法中

class SendThread extends ZooKeeperThread {
    // 通讯器
    private final ClientCnxnSocket clientCnxnSocket;
    
	@Override
    public void run() {
        clientCnxnSocket.introduce(this,sessionId);
        clientCnxnSocket.updateNow();
        clientCnxnSocket.updateLastSendAndHeard();
        int to;
        long lastPingRwServer = Time.currentElapsedTime();
        final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
        InetSocketAddress serverAddress = null;
        while (state.isAlive()) {
            try {
                ...
				// 其他非重点内容我们直接忽略掉,直接看到这里发送请求包的地方,还是交由clientCnxnSocket来完成,具体见2.3
                clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
            } catch (Throwable e) {
                if (closing) {
                    if (LOG.isDebugEnabled()) {
                        ...
                    }
                    break;
                } else {
                    ...
                    clientCnxnSocket.updateNow();
                    clientCnxnSocket.updateLastSendAndHeard();
                }
            }
        }
        cleanup();
        clientCnxnSocket.close();
        if (state.isAlive()) {
            eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
                                                    Event.KeeperState.Disconnected, null));
        }
        ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
                                 "SendThread exited loop for session: 0x"
                                 + Long.toHexString(getSessionId()));
    }
}

SendThread通过run()方法不停的从上面2.1中的outgoingQueue集合中获取packet,并发送出去。

这样一种模型算是一种发送和执行的解耦模型。

2.3 clientCnxnSocket.doTransport() 发送Packet
public class ClientCnxnSocketNIO extends ClientCnxnSocket {
	void doTransport(int waitTimeOut, List pendingQueue, LinkedList outgoingQueue,
                     ClientCnxn cnxn)
            throws IOException, InterruptedException {
        // 标准的NIO
        selector.select(waitTimeOut);
        Set selected;
        synchronized (this) {
            selected = selector.selectedKeys();
        }
        updateNow();
        for (SelectionKey k : selected) {
            SocketChannel sc = ((SocketChannel) k.channel());
            // 处理连接事件
            if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
                if (sc.finishConnect()) {
                    updateLastSendAndHeard();
                    sendThread.primeConnection();
                }
             // 1.处理读写事件   
            } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                // 交由doIO()方法处理
                doIO(pendingQueue, outgoingQueue, cnxn);
            }
        }
        if (sendThread.getZkState().isConnected()) {
            synchronized(outgoingQueue) {
                // 这里用来设置Selector的OP_WRITE事件,如果outgoingQueue中有数据,则设置该状态
                if (findSendablePacket(outgoingQueue,
                        cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
                    enableWrite();
                }
            }
        }
        selected.clear();
    }
    
    // 真正发送请求的地方
    void doIO(List pendingQueue, LinkedList outgoingQueue, ClientCnxn cnxn)
      throws InterruptedException, IOException {
        SocketChannel sock = (SocketChannel) sockKey.channel();
        if (sock == null) {
            throw new IOException("Socket is null!");
        }
        // 处理读事件,非重点,后续读取响应时再分析
        if (sockKey.isReadable()) {
            ...
        }
        if (sockKey.isWritable()) {
            synchronized(outgoingQueue) {
                // 1.按照先来后到原则,获取第一个待发送的packet
                Packet p = findSendablePacket(outgoingQueue,
                        cnxn.sendThread.clientTunneledAuthenticationInProgress());

                if (p != null) {
                    updateLastSend();
                    // If we already started writing p, p.bb will already exist
                    if (p.bb == null) {
                        if ((p.requestHeader != null) &&
                                (p.requestHeader.getType() != OpCode.ping) &&
                                (p.requestHeader.getType() != OpCode.auth)) {
                            // 这里需要注意下xid
                            p.requestHeader.setXid(cnxn.getXid());
                        }
                        p.createBB();
                    }
                    // 2.将packet.bb通过SocketChannel发送出去
                    sock.write(p.bb);
                    if (!p.bb.hasRemaining()) {
                        sentCount++;
                        outgoingQueue.removeFirstOccurrence(p);
                        if (p.requestHeader != null
                                && p.requestHeader.getType() != OpCode.ping
                                && p.requestHeader.getType() != OpCode.auth) {
                            synchronized (pendingQueue) {
                                // 3.packet发送出去之后并不是将包直接丢掉了,而是放入到pendingQueue
                                pendingQueue.add(p);
                            }
                        }
                    }
                }
                // 4.重新设置Selector的状态,如果outgoingQueue还有packet,则继续设置OP_WRITE状态
                if (outgoingQueue.isEmpty()) {
                    disableWrite();
                } else if (!initialized && p != null && !p.bb.hasRemaining()) {
                    disableWrite();
                } else {
                    // Just in case
                    enableWrite();
                }
            }
        }
    }
}

总结:这里有四个比较有意思的小细节需要注意下

1)Selector OP_WRITE状态设置

既然使用了NIO来发送接收数据,那么Selector的OP_READ OP_WRITE状态必须要被设置。

那么OP_WRITE状态什么时候被设置呢,就是上面分析的,如果outgoingQueue中有数据,则设置Selector OP_WRITE状态。

2)Packet对象的发送

通过SocketChannel将数据发送到服务端,不是将整个Packet对象全部发送出去,而是将其中有效的RequestHeader和GetDataRequest对象序列化好发送出去

3)RequestHeader中的xid

这个xid是从cnxn中获取的那个xid,对客户端而言是唯一的,那么这个xid有什么作用呢?先卖个关子,后续分析

4)pendingQueue

通过SocketChannel将packet发送到服务端之后,然后将整个pendingQueue 集合中,这个有什么用呢?暂时还不清楚,但是感觉应该跟后续的接收响应有关

下面用一张图做一个客户端发送请求阶段性总结,

3.接收响应

接收响应的操作同样也是在clientCnxnSocket.doTransport() 方法中

就是上述忽略的OP_READ事件处理

 

public class ClientCnxnSocketNIO extends ClientCnxnSocket {
	void doTransport(int waitTimeOut, List pendingQueue, LinkedList outgoingQueue,
                     ClientCnxn cnxn)
            throws IOException, InterruptedException {
        // 标准的NIO
        selector.select(waitTimeOut);
        Set selected;
        synchronized (this) {
            selected = selector.selectedKeys();
        }
        updateNow();
        for (SelectionKey k : selected) {
            SocketChannel sc = ((SocketChannel) k.channel());
            // 处理连接事件
            if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
                if (sc.finishConnect()) {
                    updateLastSendAndHeard();
                    sendThread.primeConnection();
                }
             // 1.处理读写事件   
            } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                // 交由doIO()方法处理
                doIO(pendingQueue, outgoingQueue, cnxn);
            }
        }
        if (sendThread.getZkState().isConnected()) {
            synchronized(outgoingQueue) {
                // 这里用来设置Selector的OP_WRITE事件,如果outgoingQueue中有数据,则设置该状态
                if (findSendablePacket(outgoingQueue,
                        cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
                    enableWrite();
                }
            }
        }
        selected.clear();
    }
    
    // 真正发送请求的地方
    void doIO(List pendingQueue, LinkedList outgoingQueue, ClientCnxn cnxn)
      throws InterruptedException, IOException {
        SocketChannel sock = (SocketChannel) sockKey.channel();
        if (sock == null) {
            throw new IOException("Socket is null!");
        }
        // 处理读事件,也就是响应信息
        if (sockKey.isReadable()) {
            // 1.标准的NIO读法
            int rc = sock.read(incomingBuffer);
            if (rc < 0) {
                throw new EndOfStreamException(
                        "Unable to read additional data from server sessionid 0x"
                                + Long.toHexString(sessionId)
                                + ", likely server has closed socket");
            }
            if (!incomingBuffer.hasRemaining()) {
                incomingBuffer.flip();
                if (incomingBuffer == lenBuffer) {
                    recvCount++;
                    readLength();
                } else if (!initialized) {
                    ...
                } else {
                    // 2.读取到的数据交由SendThread处理,具体见3.1
                    sendThread.readResponse(incomingBuffer);
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;
                    updateLastHeard();
                }
            }
        }
        if (sockKey.isWritable()) {
            ...
        }
    }
}
3.1 SendThread.readResponse() 处理响应
class SendThread extends ZooKeeperThread {
    private long lastPingSentNs;
    private final ClientCnxnSocket clientCnxnSocket;
    private Random r = new Random(System.nanoTime());        
    private boolean isFirstConnect = true;

    // 读取响应结果
    void readResponse(ByteBuffer incomingBuffer) throws IOException {
        ByteBufferInputStream bbis = new ByteBufferInputStream(
            incomingBuffer);
        BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
        ReplyHeader replyHdr = new ReplyHeader();

        // 1.解析出响应头
        replyHdr.deserialize(bbia, "header");
        // 各种异常处理
        ...

        Packet packet;
        synchronized (pendingQueue) {
            if (pendingQueue.size() == 0) {
                throw new IOException("Nothing in the queue, but got "
                                      + replyHdr.getXid());
            }
            // 2.从pendingQueue中获取首个packet
            packet = pendingQueue.remove();
        }

        try {
            // 如果请求头的xid和响应头的xid不一致,说明乱序了,直接抛异常
            if (packet.requestHeader.getXid() != replyHdr.getXid()) {
                packet.replyHeader.setErr(
                    KeeperException.Code.CONNECTIONLOSS.intValue());
                throw new IOException("Xid out of order. Got Xid "
                                      + replyHdr.getXid() + " with err " +
                                      + replyHdr.getErr() +
                                      " expected Xid "
                                      + packet.requestHeader.getXid()
                                      + " for a packet with details: "
                                      + packet );
            }

            packet.replyHeader.setXid(replyHdr.getXid());
            packet.replyHeader.setErr(replyHdr.getErr());
            packet.replyHeader.setZxid(replyHdr.getZxid());
            if (replyHdr.getZxid() > 0) {
                lastZxid = replyHdr.getZxid();
            }
            // 3.解析响应体
            if (packet.response != null && replyHdr.getErr() == 0) {
                packet.response.deserialize(bbia, "response");
            }

            if (LOG.isDebugEnabled()) {
                LOG.debug("Reading reply sessionid:0x"
                          + Long.toHexString(sessionId) + ", packet:: " + packet);
            }
        } finally {
            // 4.最后设置packet.finished=true
            finishPacket(packet);
        }
    }
}

总结:

两个需要注意的小细节,与2.3中的问题相互应

1)xid作用

xid本身算作一个唯一标识符,标识请求和响应的唯一性,Zookeeper服务端处理完请求后,还会将请求头中的xid写入到响应头中。这样请求便与响应对应起来了。

2)Packet.finished状态

通过ClientCnxn发送GetDataRequest请求时,代码如下

public class ClientCnxn {
	public ReplyHeader submitRequest(RequestHeader h, Record request,
            Record response, WatchRegistration watchRegistration)
            throws InterruptedException {
        ReplyHeader r = new ReplyHeader();

        Packet packet = queuePacket(h, r, request, response, null, null, null,
                    null, watchRegistration);
        
        synchronized (packet) {
            while (!packet.finished) {
                packet.wait();
            }
        }
        return r;
    }
}

一直在不停的检查Packet.finished状态,如果是false,则阻塞等待,一直到接收到响应为止。

同样通过一张图来展示下整个过程

总结:

回到开头的问题,我们来回答下

1.客户端的发送请求模型本质上是一种阻塞的、解耦的发送模型。将请求发送到集合中,通过SendThread异步获取集合中的Packet请求发送到服务端。

2.发送到服务端的Packet请求在接收到响应之前,先放入pendingQueue集合,接收到响应时,通过请求头和响应头的xid来进行一致性比较

3.获取到响应后,将响应体放入Packet中,设置Packet.finished为true,客户端不再阻塞,可以获取到响应。

 

关注
打赏
1655041699
查看更多评论
立即登录/注册

微信扫码登录

0.0430s