ZooKeeper应用的开发主要通过 Java客户端API去连接和操作ZooKeeper集群。
ZooKeeper的 Java客户端API有:
- ZooKeeper官方的Java客户端API。
- 第三方的Java客户端API:比如:ZKClient,Curator(重点)
ZooKeeper官方的 Java客户端API提供了基本的操作。
例如:创建会话、创建节点、读取节点、 更新数据、删除节点和检查节点是否存在等。不过,对于实际开发来说,ZooKeeper官方API有一些不足之处,例如:
- ZooKeeper的Watcher监测是一次性的,每次触发之后都需要重新进行注册。∙
- 会话超时之后没有实现重连机制。
- 异常处理烦琐,ZooKeeper提供了很多异常,对于开发人员来说可能根本不知道应该如何处理这些抛出的异常。
- 仅提供了简单的 byte[]数组类型的接口,没有提供Java POJO级别的序列化数据处理接口。
- 创建节点时如果抛出异常,需要自行检查节点是否存在。
- 无法实现级联删除。
总之,ZooKeeper官方API功能比较简单,在实际开发过程中比较笨重,一般不推荐使用
。但是,第三方客户端框架的底层封装使用的都是原生API。所以有必要了解一下原生API。
Zookeeper 提供了 Java客户端,创建项目并引入其依赖:
org.apache.zookeeper
zookeeper
3.6.3
注意:
保持与服务端版本一致,不然会有很多兼容性的问题。
ZooKeeper原生客户端主要使用 org.apache.zookeeper.ZooKeeper
类来连接和使用 ZooKeeper服务。
简单介绍一下参数比较多的构造器:
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider aHostProvider)
- connectString:使用逗号分隔的列表,每个ZooKeeper节点是一个 host:port 对,集群模式下用逗号隔开。
- host:是机器名或者IP地址,
- port:是ZooKeeper节点对客户端提供服务的端口号。
- sessionTimeout:会话超时时间,该值不能超过服务端所设置的minSessionTimeout 和maxSessionTimeout;
- watcher:会话监听器,服务端事件将会触该监听;一般我们可以实现 Watcher接口。
- sessionId(long):自定义会话ID;
- sessionPasswd(byte[]):会话密码;
- canBeReadOnly(boolean):该连接是否为只读的;
- hostProvider(HostProvider):服务端地址提供者,指示客户端如何选择某个服务来调用,默认采用StaticHostProvider实现;
下面实例我们使用这个构造器:
ZooKeeper (connectString, sessionTimeout, watcher)
1.2 主要方法- create(path, data, acl,createMode):创建一个给定路径的 znode,并在znode 保存 data[]的 数据,createMode指定 znode 的类型。
- delete(path, version):如果给定 path 上的 znode 的版本和给定的 version 匹配, 删除 znode。
- exists(path, watch):判断给定 path 上的 znode 是否存在,并在 znode 设置一个 watch。
- getData(path, watch):返回给定 path 上的 znode 数据,并在 znode 设置一个watch。
- setData(path, data, version):如果给定 path 上的 znode 的版本和给定的version 匹配,设置 znode 数据。
- getChildren(path, watch):返回给定 path 上的 znode 的孩子 znode 名字,并在 znode 设置一个 watch。
- sync(path):把客户端 session 连接节点和 leader 节点进行同步。
- clientConnnect1方法:使用匿名监听类
- clientConnnect2方法:使用自定义监听类
public class ZkConnectUtils {
private static String CONNECT_STR = "192.168.xxx.xxx:2181"; //集群模式下用逗号隔开
//private final static String CLUSTER_CONNECT_STR = "192.168.xxx.xxx:2181,192.168.xxx.xxx:2181,192.168.xxx.xxx:2181";
private final static CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws Exception {
// 因为建立 ZooKeeper连接,本身是异步过程,所以,我们使用功能 countDownLatch让其等待一下。
//ZooKeeper zooKeeper = clientConnnect1();
ZooKeeper zooKeeper = clientConnnect2();
System.out.println("==zooKeeper==" + zooKeeper);
System.out.println(zooKeeper.getState());// CONNECTED
}
/**
* 使用匿名监听类
* @throws Exception
*/
public static ZooKeeper clientConnnect1() throws Exception {
ZooKeeper zooKeeper = new ZooKeeper(CONNECT_STR, 5000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (Event.KeeperState.SyncConnected == event.getState() && event.getType() == Event.EventType.None) {
// 如果收到了服务端的响应事件,连接成功
countDownLatch.countDown();
System.out.println("连接已建立");
}
}
});
System.out.printf("连接中...");
countDownLatch.await();
return zooKeeper;
}
/**
* 使用自定义监听类
* @throws Exception
*/
public static ZooKeeper clientConnnect2() throws Exception {
ZooKeeper zooKeeper = new ZooKeeper(CONNECT_STR, 5000, new MyZkSessionWatcher(countDownLatch));
System.out.printf("连接中...");
countDownLatch.await();
return zooKeeper;
}
}
自定义会话监听类:
public class MyZkSessionWatcher implements Watcher {
private CountDownLatch countDownLatch;
public MyZkSessionWatcher(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
@Override
public void process(WatchedEvent event) {
if(Event.KeeperState.SyncConnected == event.getState()){
if(Event.EventType.None == event.getType() && null == event.getPath()){
// 如果收到了服务端的响应事件,连接成功
countDownLatch.countDown();
System.out.println("===连接已建立===" + event.getState());
System.out.println(" zookeeper session established");
}else if(Event.EventType.NodeCreated == event.getType()){
System.out.println(" success create znode ");
}else if(Event.EventType.NodeDataChanged == event.getType()){
System.out.println(" success change znode data" + event.getPath());
}else if(Event.EventType.NodeDeleted == event.getType()){
System.out.println(" success delete znode ");
}else if(Event.EventType.NodeChildrenChanged == event.getType()){
System.out.println(" success change NodeChildrenznode " + event.getPath());
}
}
}
}
2.2 CRUD实例
public class CRUDService {
public static void main(String[] args) throws Exception {
ZooKeeper zooKeeper = ZkConnectUtils.clientConnnect2();
syncCreateNode(zooKeeper, "/sync_Node", "value11");
asyncCreateNode(zooKeeper, "/async_Node", "value22");
String data = getNodeData(zooKeeper, "/sync_Node");
System.out.println(data);
updateNodeData(zooKeeper, "/sync_Node", "update value11111");
deleteNode(zooKeeper, "/async_Node");
}
/**
* 同步创建节点
* @param zooKeeper
* @param node
* @param data
*/
private static void syncCreateNode(ZooKeeper zooKeeper, String node, String data) {
try {
String result_path = zooKeeper.create(node,
data.getBytes(StandardCharsets.UTF_8), //节点数据
ZooDefs.Ids.OPEN_ACL_UNSAFE, //所有权限
CreateMode.PERSISTENT); //持久节点
System.out.println("syncCreateNode: result_path=" + result_path);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 异步创建节点
* @param zooKeeper
* @param node
* @param data
*/
private static void asyncCreateNode(ZooKeeper zooKeeper, String node, String data) throws Exception {
zooKeeper.create(node, data.getBytes(StandardCharsets.UTF_8),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL, //临时节点
(rc, path, ctx, name) -> {
System.out.println("asyncCreateNode 回调: ");
System.out.println("rc=" + rc);
System.out.println("path=" + path);
System.out.println("ctx=" + ctx);
System.out.println("name=" + name);
},"context");
TimeUnit.SECONDS.sleep(2);
}
/**
* 获取节点数据
* @param zooKeeper
* @param node
* @return
*/
private static String getNodeData(ZooKeeper zooKeeper, String node) throws Exception {
byte[] data = zooKeeper.getData(node, true, new Stat());
return new String(data, StandardCharsets.UTF_8);
}
/**
* 修改节点数据
* @param zooKeeper
* @param node
* @param data
*/
private static void updateNodeData(ZooKeeper zooKeeper, String node, String data) throws Exception {
// 添加监听,Watch就会监听到
zooKeeper.exists(node, true);
/**
* 如果想要都能监听到。必须在修改之后调用exists方法或者其他起作用的方法,比如:get,exists等
* -1 表示任何版本,也可以指定某个版本,多次修改只会监听一次,
*/
zooKeeper.setData(node, data.getBytes(StandardCharsets.UTF_8), -1);
//client.exists(node, true);
zooKeeper.setData(node, data.getBytes(StandardCharsets.UTF_8), -1);
}
/**
* 删除节点
* @param zooKeeper
* @param node
*/
private static void deleteNode(ZooKeeper zooKeeper, String node) throws Exception {
// 添加监听,Watch就会监听到
zooKeeper.exists(node, true);
zooKeeper.delete(node, -1);
}
}
– 求知若饥,虚心若愚。