您当前的位置: 首页 >  zookeeper

恐龙弟旺仔

暂无认证

  • 0浏览

    0关注

    282博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

zookeeper客户端断开自动重连相关代码分析

恐龙弟旺仔 发布时间:2022-06-18 09:27:51 ,浏览量:0

前言:

Zookeeper在我们的实际使用中,都是以集群模式来对外提供服务。

在客户端的使用中,写入集群所有节点的ip:port信息,客户端启动后便会随机选择一个Zookeeper server节点创建长连接。

如果当前连接的Zookeeper server节点网络异常,当前长连接便会断掉重连。

本文主要就来看下Zookeeper客户端是如何检测连接断掉,并自动做重连动作的。

有关于Zookeeper客户端状态的相关信息可以参考笔者另一篇博客: zookeeper客户端会话状态分析_恐龙弟旺仔的博客-CSDN博客 

1.Zookeeper客户端由CONNECTING转CONNECTED

从笔者之前的博客中可以看出,客户端在创建Zookeeper对象时,状态为CONNECTING,后续通过SendThread.onConnected()方法调用后

class SendThread extends ZooKeeperThread {
	void onConnected(int _negotiatedSessionTimeout, long _sessionId,
                byte[] _sessionPasswd, boolean isRO) throws IOException {
            negotiatedSessionTimeout = _negotiatedSessionTimeout;
            ...
            if (!readOnly && isRO) {
                LOG.error("Read/write client got connected to read-only server");
            }
            readTimeout = negotiatedSessionTimeout * 2 / 3;
            connectTimeout = negotiatedSessionTimeout / hostProvider.size();
            hostProvider.onConnected();
            sessionId = _sessionId;
            sessionPasswd = _sessionPasswd;
        	// 在这里将状态设置为CONNECTED
            state = (isRO) ?
                    States.CONNECTEDREADONLY : States.CONNECTED;
            ...
        }
}

在这里完成了连接,认证了sessionId,协商了超时时间。

2. Socket异常之后的操作

客户端发送ping信息以及请求包信息都是通过SendThread.run方法。

当服务端异常时,通过Socket发送请求包就会抛出Socket相关Exception,我们来看下SendThread对此类Exception的处理

class SendThread extends ZooKeeperThread {
 
    @Override
    public void run() {
		...
        // 1.state.isAlive() 的判断条件是 this != CLOSED && this != AUTH_FAILED;
        // 当客户端状态非CLOSED和AUTH_FAILED时,属于正常状态    
        while (state.isAlive()) {
            try {
                // 2.clientCnxnSocket.isConnected() 的判断条件是 sockKey != null; 
                // 这个sockKey就是SocketChannel注册Selector 连接事件所返回的SelectorKey
                
                // 如果SelectorKey为null,说明这个sockKey被清理掉了,那么就会触发重连Zookeeper server机制
                // 那么这个sockKey是什么时候被清理的呢?
                if (!clientCnxnSocket.isConnected()) {
                    if(!isFirstConnect){
                        try {
                            Thread.sleep(r.nextInt(1000));
                        } catch (InterruptedException e) {
                            LOG.warn("Unexpected exception", e);
                        }
                    }

                    if (closing || !state.isAlive()) {
                        break;
                    }
                    if (rwServerAddress != null) {
                        serverAddress = rwServerAddress;
                        rwServerAddress = null;
                    } else {
                        serverAddress = hostProvider.next(1000);
                    }
                    // 2.1 在这里重连
                    startConnect(serverAddress);
                    clientCnxnSocket.updateLastSendAndHeard();
                }

                if (state.isConnected()) {
                    ...
                } else {
                    to = connectTimeout - clientCnxnSocket.getIdleRecv();
                }

                if (to             
关注
打赏
1655041699
查看更多评论
0.0371s