上一篇博客《分布式协调工具Zookeeper(介绍&安装&配置详解)》讲到了分布式协调工具Zookeeper,主要讲解Zookeeper的概念、应用场景以及安装配置。基于上一篇博客安装的环境,本文讲解如何使用Java操作Zookeeper?
代码已提交至Gtihub,有兴趣的同学可以下载看看(git版本号:e9d27b6df05095bb50c3666a1e8965102c85bb01
):https://github.com/ylw-github/Zookeeper-Demo.git
本文目录结构: l____1. 基本概念 l________1.1 创建节点(znode)方法 l________1.2 Watcher l____2. Java操作Zookeeper l____总结
1. 基本概念 1.1 创建节点(znode)方法Zookeeper提供了两套创建节点的方法,同步
和异步
创建节点方式。
其中同步的方式,有几个节点需要注意:
- 节点路径(名称) InodeName: (不允许递归创建节点,也就是说在父节点不存在 的情况下,不允许创建子节点)
- 节点内容: 要求类型是字节数组(也就是说,不支持序列化方式,如果需要实现序 列化,可使用java相关序列化框架,如Hessian、Kryo框架)
- 节点权限: 使用Ids.OPEN_ACL_UNSAFE开放权限即可。(这个参数一般在权展 没有太高要求的场景下,没必要关注)
- 节点类型: 创建节点的类型: CreateMode,提供四种首点象型,如下:
在ZooKeeper中,接口类Watcher用于表示一个标准的事件处理器,其定义了事件通知相关的逻辑,包含KeeperState和EventType两个枚举类,分别代表了通知状态和事件类型,同时定义了事件的回调方法:process(WatchedEvent event)
。
什么是watcher接口?
同一个事件类型在不同的通知状态中代表的含义有所不同,下表例举了常见的通知状态和事件类型:
KeeperStateEventType触发条件说明None(-1)客户端与服务端成功建立连接SyncConnected(0)NodeCreated(1)Watcher监听的对应数据节点被创建NodeDeleted(2)Watcher监听的对应数据节点被删除此时客户端和服务器处于连接状态NodeDataChanged(3)Watcher监听的对应数据节点的数据内容发生变更NodeChildChanged(4)Wather监听的对应数据节点的子节点列表发生变更Disconnected(0)None(-1)客户端与ZooKeeper服务器断开连接此时客户端和服务器处于断开连接状态Expired(-112)Node(-1)会话超时此时客户端会话失效,通常同时也会受到SessionExpiredException异常AuthFailed(4)None(-1)通常有两种情况,1:使用错误的schema进行权限检查 2:SASL权限检查失败通常同时也会收到AuthFailedException异常回调方法process():
- process方法是Watcher接口中的一个回调方法,当ZooKeeper向客户端发送一个Watcher事件通知时,客户端就会对相应的process方法进行回调,从而实现对事件的处理,process方法的定义如下:
abstract public void process(WatchedEvent event);
这个回调方法的定义非常简单,我们重点看下方法的参数定义WatchedEvent:
- WatchedEvent包含了每一个事件的三个基本属性:通知状态(keeperState),事件类型(EventType)和节点路径(path)。ZooKeeper使用WatchedEvent对象来封装服务端事件并传递给Watcher,从而方便回调方法process对服务端事件进行处理。
- 提到WatchedEvent,两者表示的是同一个事物,都是对一个服务端事件的封装。不同的是,WatchedEvent是一个逻辑事件,用于服务端和客户端程序执行过程中所需的逻辑对象,而WatcherEvent因为实现了序列化接口,因此可以用于网络传输。
- 服务端在生成WatchedEvent事件之后,会调用getWrapper方法将自己包装成一个可序列化的WatcherEvent事件,以便通过网络传输到客户端。客户端在接收到服务端的这个事件对象后,首先会将WatcherEvent还原成一个WatchedEvent事件,并传递给process方法处理,回调方法process根据入参就能够解析出完整的服务端事件了。
- 需要注意的一点是,无论是WatchedEvent还是WatcherEvent,其对ZooKeeper服务端事件的封装都是机及其简单的。
1.创建项目
2.添加maven依赖
org.apache.zookeeper
zookeeper
3.4.6
3.Zookeeper客户端连接
package com.ylw.zookeeper.Test;
import org.apache.zookeeper.*;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class Test {
//连接地址
private static final String ADDRES = "192.168.162.131:2181";
//session 会话
private static final int SESSION_OUTTIME = 2000;
//信号量,阻塞程序执行,用户等待zookeeper连接成功,发送成功信号,
private static final CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
ZooKeeper zk = new ZooKeeper(ADDRES, SESSION_OUTTIME, new Watcher() {
public void process(WatchedEvent event) {
// 获取事件状态
Event.KeeperState keeperState = event.getState();
// 获取事件类型
Event.EventType eventType = event.getType();
if (Event.KeeperState.SyncConnected == keeperState) {
if (Event.EventType.None == eventType) {
countDownLatch.countDown();
System.out.println("zk 启动连接...");
}
}
}
});
// 进行阻塞
countDownLatch.await();
String result = zk.create("/ylw_Lasting", "Lasting".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("result - >" + result);
zk.close();
}
}
注意创建节点的两种方式:
1.创建持久节点,并且允许任何服务器可以操作
String result = zk.create("/ylw_Lasting", "Lasting".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
2.创建临时节点
String result = zk.create("/ylw_temp", "temp".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
4.Zookeeper客户端连接
package com.ylw.zookeeper.Test;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.util.concurrent.CountDownLatch;
public class ZkClientWatcher implements Watcher {
// 集群连接地址
private static final String CONNECT_ADDRES = "192.168.162.131:2181,192.168.162.131:2182,192.168.162.131:2183";
// 会话超时时间
private static final int SESSIONTIME = 2000;
// 信号量,让zk在连接之前等待,连接成功后才能往下走.
private static final CountDownLatch countDownLatch = new CountDownLatch(1);
private static String LOG_MAIN = "【main】 ";
private ZooKeeper zk;
public void createConnection(String connectAddres, int sessionTimeOut) {
try {
zk = new ZooKeeper(connectAddres, sessionTimeOut, this);
System.out.println(LOG_MAIN + "zk 开始启动连接服务器....");
countDownLatch.await();
} catch (Exception e) {
e.printStackTrace();
}
}
public boolean createPath(String path, String data) {
try {
this.exists(path, true);
this.zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println(LOG_MAIN + "节点创建成功, Path:" + path + ",data:" + data);
} catch (Exception e) {
e.printStackTrace();
return false;
}
return true;
}
/**
* 判断指定节点是否存在
*
* @param path 节点路径
*/
public Stat exists(String path, boolean needWatch) {
try {
return this.zk.exists(path, needWatch);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
public boolean updateNode(String path, String data) throws KeeperException, InterruptedException {
exists(path, true);
this.zk.setData(path, data.getBytes(), -1);
return false;
}
public void process(WatchedEvent watchedEvent) {
// 获取事件状态
Event.KeeperState keeperState = watchedEvent.getState();
// 获取事件类型
Event.EventType eventType = watchedEvent.getType();
// zk 路径
String path = watchedEvent.getPath();
System.out.println("进入到 process() keeperState:" + keeperState + ", eventType:" + eventType + ", path:" + path);
// 判断是否建立连接
if (Event.KeeperState.SyncConnected == keeperState) {
if (Event.EventType.None == eventType) {
// 如果建立建立成功,让后程序往下走
System.out.println(LOG_MAIN + "zk 建立连接成功!");
countDownLatch.countDown();
} else if (Event.EventType.NodeCreated == eventType) {
System.out.println(LOG_MAIN + "事件通知,新增node节点" + path);
} else if (Event.EventType.NodeDataChanged == eventType) {
System.out.println(LOG_MAIN + "事件通知,当前node节点" + path + "被修改....");
} else if (Event.EventType.NodeDeleted == eventType) {
System.out.println(LOG_MAIN + "事件通知,当前node节点" + path + "被删除....");
}
}
System.out.println("--------------------------------------------------------");
}
public static void main(String[] args) throws KeeperException, InterruptedException {
ZkClientWatcher zkClientWatcher = new ZkClientWatcher();
zkClientWatcher.createConnection(CONNECT_ADDRES, SESSIONTIME);
boolean createResult = zkClientWatcher.createPath("/del.do", "http://www.xxx001.delete");
//zkClientWatcher.updateNode("/del.do", "http://www.xxx002.delete");
}
}
运行结果: 更新节点:
//boolean createResult = zkClientWatcher.createPath("/del.do", "http://www.xxx001.delete");
zkClientWatcher.updateNode("/del.do", "http://www.xxx002.delete");
运行后: