前言:
前一篇文章中介绍了Zookeeper.java客户端创建的基本过程。有很多不太明确的知识点,主要是因为没有与实际场景结合起来。所以本文中,通过实际请求示例的分析来了解下其具体操作过程。
1.create()创建节点信息通过一个示例,来展示下客户端如何发送创建节点信息
public class ZkClient {
private String connectString = "127.0.0.1:2181";
private int sessionTimeout = 3000;
ZooKeeper zkCli = null;
// 初始化客户端
@Before
public void init() throws IOException {
zkCli = new ZooKeeper(connectString, sessionTimeout, null);
}
// 创建子节点
@Test
public void createZnode() throws KeeperException, InterruptedException {
String path = zkCli.create("/hello", "world".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println(path);
}
}
示例很简单,连接服务端地址为本地启动的server,创建节点直接调用zookeeper.create()方法即可,参数即路径、value信息以及节点mode即可。
2.Zookeeper.create() 方法分析public class ZooKeeper {
public String create(final String path, byte data[], List acl,
CreateMode createMode)
throws KeeperException, InterruptedException
{
final String clientPath = path;
PathUtils.validatePath(clientPath, createMode.isSequential());
// 如果有chrootpath的话,需要拼接上
final String serverPath = prependChroot(clientPath);
// 请求头 具体内容见2.1
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.create);
// 请求体 具体内容见2.2
CreateRequest request = new CreateRequest();
CreateResponse response = new CreateResponse();
// 将基本信息封装到请求体中
request.setData(data);
request.setFlags(createMode.toFlag());
request.setPath(serverPath);
if (acl != null && acl.size() == 0) {
throw new KeeperException.InvalidACLException();
}
request.setAcl(acl);
// 统一交由ClientCnxn发送,具体分析见3
ReplyHeader r = cnxn.submitRequest(h, request, response, null);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
if (cnxn.chrootPath == null) {
return response.getPath();
} else {
return response.getPath().substring(cnxn.chrootPath.length());
}
}
}
2.1 RequestHeader请求头
public class RequestHeader implements Record {
// 唯一的id号
private int xid;
// 代表当前请求类型,创建、获取节点内容等不同类型,具体在ZooDefs.OpCode中
private int type;
public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
a_.startRecord(this,tag);
a_.writeInt(xid,"xid");
a_.writeInt(type,"type");
a_.endRecord(this,tag);
}
public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
a_.startRecord(tag);
xid=a_.readInt("xid");
type=a_.readInt("type");
a_.endRecord(tag);
}
}
2.2 CreateRequest 创建节点请求体
public class CreateRequest implements Record {
// 路径信息
private String path;
// 节点值
private byte[] data;
// 权限控制信息,非重点,直接忽略
private java.util.List acl;
// 节点类型,具体见CreateMode
private int flags;
public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
a_.startRecord(this,tag);
a_.writeString(path,"path");
a_.writeBuffer(data,"data");
{
a_.startVector(acl,"acl");
if (acl!= null) { int len1 = acl.size();
for(int vidx1 = 0; vidx1 0) {
lastZxid = replyHdr.getZxid();
}
// 将响应结果反序列化到packet.response中
// 根据不同的响应类进行对应的反序列化,比如GetDataResponse、CreateResponse等
if (packet.response != null && replyHdr.getErr() == 0) {
packet.response.deserialize(bbia, "response");
}
if (LOG.isDebugEnabled()) {
LOG.debug("Reading reply sessionid:0x"
+ Long.toHexString(sessionId) + ", packet:: " + packet);
}
} finally {
finishPacket(packet);
}
}
}
5.2.1 ReplyHeader 响应头
public class ReplyHeader implements Record {
// 客户端生成的xid,用来将请求和响应对上
private int xid;
// zookeeper服务端当前最新的事务ID
private long zxid;
// 当出现异常时,返回对应的异常码,具体的异常码都在Code中
private int err;
public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
a_.startRecord(this,tag);
a_.writeInt(xid,"xid");
a_.writeLong(zxid,"zxid");
a_.writeInt(err,"err");
a_.endRecord(this,tag);
}
public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
a_.startRecord(tag);
xid=a_.readInt("xid");
zxid=a_.readLong("zxid");
err=a_.readInt("err");
a_.endRecord(tag);
}
}
5.3 读取响应结果
最终我们回到开始的地方,zookeeper.create()方法
public String create(final String path, byte data[], List acl,
CreateMode createMode)
throws KeeperException, InterruptedException
{
final String clientPath = path;
PathUtils.validatePath(clientPath, createMode.isSequential());
final String serverPath = prependChroot(clientPath);
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.create);
CreateRequest request = new CreateRequest();
CreateResponse response = new CreateResponse();
request.setData(data);
request.setFlags(createMode.toFlag());
request.setPath(serverPath);
if (acl != null && acl.size() == 0) {
throw new KeeperException.InvalidACLException();
}
request.setAcl(acl);
// 读取到响应头ReplyHeader,若有error,则直接抛错
ReplyHeader r = cnxn.submitRequest(h, request, response, null);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
// CreateResponse只有一个参数,就是path,创建成功后,返回对应的全路径信息
if (cnxn.chrootPath == null) {
return response.getPath();
} else {
return response.getPath().substring(cnxn.chrootPath.length());
}
}
总结:响应的代码不算复杂,主要就是从Selector中接收读事件,将响应结果交由SendThread来处理,处理完成的响应结果封装到packet.response中。
响应体如下所示: