您当前的位置: 首页 >  zookeeper

恐龙弟旺仔

暂无认证

  • 0浏览

    0关注

    282博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Zookeeper源码解析-客户端创建节点过程分析

恐龙弟旺仔 发布时间:2021-10-08 19:07:57 ,浏览量:0

前言:

前一篇文章中介绍了Zookeeper.java客户端创建的基本过程。有很多不太明确的知识点,主要是因为没有与实际场景结合起来。所以本文中,通过实际请求示例的分析来了解下其具体操作过程。

1.create()创建节点信息

通过一个示例,来展示下客户端如何发送创建节点信息

public class ZkClient {

    private String connectString = "127.0.0.1:2181";
    private int sessionTimeout = 3000;
    ZooKeeper zkCli = null;

    // 初始化客户端
    @Before
    public void init() throws IOException {
        zkCli = new ZooKeeper(connectString, sessionTimeout, null);
    }

    // 创建子节点
    @Test
    public void createZnode() throws KeeperException, InterruptedException {
        String path = zkCli.create("/hello", "world".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println(path);
    }
}

示例很简单,连接服务端地址为本地启动的server,创建节点直接调用zookeeper.create()方法即可,参数即路径、value信息以及节点mode即可。

2.Zookeeper.create() 方法分析    
public class ZooKeeper {
	public String create(final String path, byte data[], List acl,
            CreateMode createMode)
        throws KeeperException, InterruptedException
    {
        final String clientPath = path;
        PathUtils.validatePath(clientPath, createMode.isSequential());

        // 如果有chrootpath的话,需要拼接上
        final String serverPath = prependChroot(clientPath);

        // 请求头 具体内容见2.1
        RequestHeader h = new RequestHeader();
        h.setType(ZooDefs.OpCode.create);
        // 请求体 具体内容见2.2
        CreateRequest request = new CreateRequest();
        CreateResponse response = new CreateResponse();
        // 将基本信息封装到请求体中
        request.setData(data);
        request.setFlags(createMode.toFlag());
        request.setPath(serverPath);
        if (acl != null && acl.size() == 0) {
            throw new KeeperException.InvalidACLException();
        }
        request.setAcl(acl);
        // 统一交由ClientCnxn发送,具体分析见3
        ReplyHeader r = cnxn.submitRequest(h, request, response, null);
        if (r.getErr() != 0) {
            throw KeeperException.create(KeeperException.Code.get(r.getErr()),
                    clientPath);
        }
        if (cnxn.chrootPath == null) {
            return response.getPath();
        } else {
            return response.getPath().substring(cnxn.chrootPath.length());
        }
    }
}
2.1 RequestHeader请求头
public class RequestHeader implements Record {
  // 唯一的id号 
  private int xid;
  // 代表当前请求类型,创建、获取节点内容等不同类型,具体在ZooDefs.OpCode中  
  private int type;
    
  public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
    a_.startRecord(this,tag);
    a_.writeInt(xid,"xid");
    a_.writeInt(type,"type");
    a_.endRecord(this,tag);
  }
  public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
    a_.startRecord(tag);
    xid=a_.readInt("xid");
    type=a_.readInt("type");
    a_.endRecord(tag);
  }
}
2.2 CreateRequest 创建节点请求体
public class CreateRequest implements Record {
  // 路径信息
  private String path;
  // 节点值  
  private byte[] data;
  // 权限控制信息,非重点,直接忽略  
  private java.util.List acl;
  // 节点类型,具体见CreateMode  
  private int flags;
 
  public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
    a_.startRecord(this,tag);
    a_.writeString(path,"path");
    a_.writeBuffer(data,"data");
    {
      a_.startVector(acl,"acl");
      if (acl!= null) {          int len1 = acl.size();
          for(int vidx1 = 0; vidx1 0) {
                    lastZxid = replyHdr.getZxid();
                }
                // 将响应结果反序列化到packet.response中
                // 根据不同的响应类进行对应的反序列化,比如GetDataResponse、CreateResponse等
                if (packet.response != null && replyHdr.getErr() == 0) {
                    packet.response.deserialize(bbia, "response");
                }

                if (LOG.isDebugEnabled()) {
                    LOG.debug("Reading reply sessionid:0x"
                            + Long.toHexString(sessionId) + ", packet:: " + packet);
                }
            } finally {
                finishPacket(packet);
            }
        }
}

5.2.1 ReplyHeader 响应头

public class ReplyHeader implements Record {
  // 客户端生成的xid,用来将请求和响应对上  
  private int xid;
  // zookeeper服务端当前最新的事务ID  
  private long zxid;
  // 当出现异常时,返回对应的异常码,具体的异常码都在Code中  
  private int err;
 
  public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
    a_.startRecord(this,tag);
    a_.writeInt(xid,"xid");
    a_.writeLong(zxid,"zxid");
    a_.writeInt(err,"err");
    a_.endRecord(this,tag);
  }
  public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
    a_.startRecord(tag);
    xid=a_.readInt("xid");
    zxid=a_.readLong("zxid");
    err=a_.readInt("err");
    a_.endRecord(tag);
}
}
5.3 读取响应结果

最终我们回到开始的地方,zookeeper.create()方法

public String create(final String path, byte data[], List acl,
            CreateMode createMode)
        throws KeeperException, InterruptedException
    {
        final String clientPath = path;
        PathUtils.validatePath(clientPath, createMode.isSequential());

        final String serverPath = prependChroot(clientPath);

        RequestHeader h = new RequestHeader();
        h.setType(ZooDefs.OpCode.create);
        CreateRequest request = new CreateRequest();
        CreateResponse response = new CreateResponse();
        request.setData(data);
        request.setFlags(createMode.toFlag());
        request.setPath(serverPath);
        if (acl != null && acl.size() == 0) {
            throw new KeeperException.InvalidACLException();
        }
        request.setAcl(acl);
        // 读取到响应头ReplyHeader,若有error,则直接抛错
        ReplyHeader r = cnxn.submitRequest(h, request, response, null);
        if (r.getErr() != 0) {
            throw KeeperException.create(KeeperException.Code.get(r.getErr()),
                    clientPath);
        }
        // CreateResponse只有一个参数,就是path,创建成功后,返回对应的全路径信息
        if (cnxn.chrootPath == null) {
            return response.getPath();
        } else {
            return response.getPath().substring(cnxn.chrootPath.length());
        }
    }

总结:响应的代码不算复杂,主要就是从Selector中接收读事件,将响应结果交由SendThread来处理,处理完成的响应结果封装到packet.response中。

响应体如下所示:

 

关注
打赏
1655041699
查看更多评论
立即登录/注册

微信扫码登录

0.1575s