Curator是 Netflix公司开源的一套ZooKeeper客户端框架,和 ZkClient一样它解决了非常底层的细节开发工作,包括连接、重连、反复注册Watcher的问题以及 NodeExistsException异常等。
Curator还为 ZooKeeper客户端框架提供了一些比较普遍的、开箱即用的、分布式开发用的解决方案,例如Recipe、共享锁服务、Master选举机制和分布式计算器等,帮助开发者避免了“重复造轮子”的无效开发工作。
Curator 客户端使用更加方便,功能更加强大,目前应用更加广泛。
官方网站:https://curator.apache.org/
添加依赖:
- curator-framework:是对ZooKeeper的底层API的一些封装。
- curator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等。
org.apache.zookeeper
zookeeper
3.6.3
org.apache.curator
curator-framework
5.1.0
org.apache.zookeeper
zookeeper
org.apache.curator
curator-recipes
5.1.0
org.apache.zookeeper
zookeeper
1、创建一个客户端实例
在使用 curator-framework包操作 ZooKeeper前,首先要创建一个客户端实例(CuratorFramework类型的对象
)。
有两种方法:
- 使用工厂类CuratorFrameworkFactory的静态 newClient()方法。
- 使用工厂类CuratorFrameworkFactory的静态 builder构造者方法。
public class CuratorClientConnectUtils {
private static String CONNECT_STR = "192.168.xxx.xxx:2181"; //集群模式下用逗号隔开
private static class InnerClass{
private static CuratorFramework client = clientConnect2();
}
public static CuratorFramework getInstance(){
return InnerClass.client;
}
public static void main(String[] args){
//CuratorFramework client = clientConnect1();
CuratorFramework client = clientConnect2();
//启动客户端,必须要有
client.start();
System.out.println("==CuratorFramework==" + client);
}
/**
* 使用工厂类CuratorFrameworkFactory的静态newClient()方法。
* @throws Exception
*/
public static CuratorFramework clientConnect1() {
// 重试策略, 失败重连3次
RetryPolicy retryPolicy = new ExponentialBackoffRetry(100, 3);
//创建客户端实例
CuratorFramework client = CuratorFrameworkFactory.newClient(CONNECT_STR, retryPolicy);
return client;
}
/**
* 使用工厂类CuratorFrameworkFactory的静态builder构造者方法。
* @throws Exception
*/
public static CuratorFramework clientConnect2() {
// 重试策略, 失败重连3次
RetryPolicy retryPolicy = new ExponentialBackoffRetry(100, 3);
//创建客户端实例
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(CONNECT_STR)
.sessionTimeoutMs(3000) // 会话超时时间
.connectionTimeoutMs(50000) // 连接超时时间
.retryPolicy(retryPolicy)
.namespace("Curator_Workspace") // 指定隔离名称,表示所有节点的操作都会在该工作空间下进行。不指定时,使用自定义的节点path
.build();
return client;
}
}
部分参数说明:
- connectionString: 服务器地址列表,集群用逗号分隔, 如 host1:port1,host2:port2,host3:port3 。
- retryPolicy: 重试策略,当客户端异常退出或者与服务端失去连接的时候,可以通过设置客户端重新连接 ZooKeeper 服务端。 Curator 提供了 一次重试、多次重试等不同种类的实现方式。在 Curator 内部,可以通过判断服务器返回的keeperException 的状态代码来判断是否进行重试处理,如果返回的是 OK 表示一切操作都没有问题,而 SYSTEMERROR 表示系统或服务端错误。
- 超时时间: Curator 客户端创建过程中,有两个超时时间的设置。
- 一个是sessionTimeoutMs 会话超时时间,用来设置该条会话在 ZooKeeper 服务端的失效时间。
- 一个是 connectionTimeoutMs 客户端创建会话的超时时间,用来限制客户端发起一个会话连接到接收 ZooKeeper 服务端应答的时间。 sessionTimeoutMs 作用在服务端,而 connectionTimeoutMs 作用在客户端。
1)创建节点
在 Curator 中,客户端可以使用 create 方法
创建数据节点,并通过 withMode 方法
指定节点类型(持久化节点,临时节点,顺序节点,临时顺序节点,持久化顺序节点等),默认是持久化节点,之后调用 forPath 方法
来指定节点的路径和数据信息。 也支持级联创建节点需要指定 creatingParentsIfNeeded方法
。
2)获取数据
客户端使用 getData()方法
获取节点数据。
3)更新节点 客户端使用 setData() 方法更新 ZooKeeper 服务上的数据节点,在 setData 方法
的后边,通过 forPath 函数来指定更新的数据节点路径以及要更新的数据。
4)删除节点
客户端使用 delete方法
删除节点。
guaranteed方法:
主要作用是保障删除成功。 其底层工作方式是:只要该客户端的会话有效,就会在后台持续发起删除请求,直到该数据节点在 ZooKeeper 服务端被删除。deletingChildrenIfNeeded方法:
指定了该函数后,系统在删除该数据节点的时候会以递归的方式直接删除其子孙节点。
5)异步接口
Curator 引入了 BackgroundCallback 接口
,用来处理服务器端返回来的信息,这个处理过程是在异步线程中调用,默认在 EventThread 中调用,也可以自定义线程池。
客户端可以通过 inBackground方法
添加 BackgroundCallback 接口来异步处理操作。异步处理回调我们可以在 processResult方法
操作。实例中异步创建节点使用了默认的线程。
CRUD实例如下:
public class CRUDService {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorClientConnectUtils.getInstance();
// 启动客户端
client.start();
String path1 = "/curator_znode/syncCreateNode.node1/node1"; //允许级联创建
String path2 = "/curator_znode/asyncCreateNode.node1";
String path3 = "/curator_znode2";
//createOneNode(client, path3, "value path");
syncCreateNode(client, path1, "VALUE11");
//asyncCreateNode(client, path2, "VALUE22");
//asyncCreateNode2(client, path2, "VALUE22");
//String nodeData = getNodeData(client, path1);
//System.out.println("getNodeData, path1=:" + nodeData);
updateNodeData(client, path1, "value33333");
//deleteNode(client, path3);
String path = "/curator_znode";
//deleteChildrenIfNeeded(client, path);
}
/**
* 创建单节点
*
* @param client
* @param path - 如果是级联节点,父节点不存在时,会报错:KeeperErrorCode = NoNode for /curator_znode/asyncCreateNode.node1
* @param data
* @throws Exception
*/
public static void createOneNode(CuratorFramework client, String path, String data) throws Exception {
String path1 = client.create()
.withMode(CreateMode.PERSISTENT)
.forPath(path, data.getBytes(StandardCharsets.UTF_8));
System.out.println("createOneNode:path1=" + path1);
}
/**
* 同步级联创建节点(原生API不支持/后台客户端也不支持,但是Curator支持)
* 父节点不存在,则会创建
* 最后的子节点如果存在,则会报错:KeeperErrorCode = NodeExists for /curator_znode/syncCreateNode.node1/node1
* @param client
* @param path
* @param data
*/
public static void syncCreateNode(CuratorFramework client, String path, String data) {
try {
String path1 = client.create()
.creatingParentsIfNeeded() //级联创建,父节点不存在,则会创建
.withMode(CreateMode.PERSISTENT) //持久节点
.forPath(path, data.getBytes(StandardCharsets.UTF_8));
System.out.println("syncCreateNode:path1=" + path1);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 异步级联创建节点
* @param client
* @param path
* @param data
*/
public static void asyncCreateNode(CuratorFramework client, String path, String data) throws Exception {
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.inBackground(new BackgroundCallback() {
// 创建成功的回调
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("asyncCreateNode=========" + event.getName() + ":" + event.getPath());
}
})
.forPath(path, data.getBytes(StandardCharsets.UTF_8));
TimeUnit.MILLISECONDS.sleep(100);
}
/**
* 异步级联创建节点
* @param client
* @param path
* @param data
*/
public static void asyncCreateNode2(CuratorFramework client, String path, String data) throws Exception {
ExecutorService executorService = Executors.newSingleThreadExecutor();
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.inBackground(new BackgroundCallback() {
// 创建成功的回调
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("asyncCreateNode====使用自定义线程池=====" + event.getName() + ":" + event.getPath());
}
}, executorService)
.forPath(path, data.getBytes(StandardCharsets.UTF_8));
TimeUnit.MILLISECONDS.sleep(100);
}
/**
* 获取节点数据
* @param client
* @param path
*/
public static String getNodeData(CuratorFramework client, String path) throws Exception {
byte[] data = client.getData().storingStatIn(new Stat()).forPath(path);
return new String(data, StandardCharsets.UTF_8);
}
/**
* 修改节点数据
* @param client
* @param path
* @param data
*/
public static void updateNodeData(CuratorFramework client, String path, String data) throws Exception {
Stat stat = client.setData()
//.withVersion(-1) //可以根据需要使用
.forPath(path, data.getBytes(StandardCharsets.UTF_8));
System.out.println("updateNodeData path, stat:" + stat);
}
/**
* 删除该节点,该节点必须为空节点
* 如果该节点有子节点会报错:KeeperErrorCode = Directory not empty for /curator_znode
* @param client
* @param path
*/
public static void deleteNode(CuratorFramework client, String path) throws Exception {
client.delete()
//.withVersion(1) // 版本号不匹配时,无法删除,会报错:KeeperErrorCode = BadVersion for /curator_znode
.forPath(path); //删除该节点
}
/**
* 级联删除该节点以及子孙节点
* @param client
* @param path
*/
public static void deleteChildrenIfNeeded(CuratorFramework client, String path) throws Exception {
client.delete()
.guaranteed() //保证删除:如果删除失败,那么在后端还是继续会删除,直到成功
.deletingChildrenIfNeeded()
//.withVersion(1)
.forPath(path); //级联删除
}
/**
* 判断node节点是否存在
* @param client
* @param nodePath
* @return true - 表示节点存在
* @throws Exception
*/
public static boolean checkNodeExists(CuratorFramework client,String nodePath) throws Exception {
Stat stat = client.checkExists().forPath(nodePath);
System.out.println(null==stat ? "节点不存在" : "节点存在");
return null != stat;
}
}
2.1 报错:KeeperErrorCode = ConnectionLoss
14:49:35.317 [Curator-Framework-0] ERROR org.apache.curator.framework.imps.CuratorFrameworkImpl - Background operation retry gave up
org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss
...
14:49:35.319 [Curator-Framework-0] ERROR org.apache.curator.framework.imps.CuratorFrameworkImpl - Background retry gave up
org.apache.curator.CuratorConnectionLossException: KeeperErrorCode = ConnectionLoss
at org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:981)
...
14:49:36.313 [Curator-ConnectionStateManager-0] WARN org.apache.curator.framework.state.ConnectionStateManager - Session timeout has elapsed while SUSPENDED. Injecting a session expiration. Elapsed ms: 1002. Adjusted session timeout ms: 1000
报错信息如上,查看了一下: 具体是什么时间超时判断,暂时不清楚。我尝试把会话时间调到3000之后。问题就解决了。
cache是一种缓存机制,可以借助 cache实现监听。
简单来说,cache在客户端缓存了 znode的各种状态,当感知到 zk集群的 znode状态变化,会触发 event事件,注册的监听器会处理这些事件。
curator支持的 cache种类有4种:
- Path Cache Path Cache用来观察ZNode的子节点并缓存状态,但是不会对二级子节点进行监听。如果ZNode的子节点被创建,更新或者删除,那么Path Cache会更新缓存,并且触发事件给注册的监听器。
- Node Cache Node Cache用来观察ZNode自身,如果ZNode节点本身被创建,更新或者删除,那么Node Cache会更新缓存,并触发事件给注册的监听器。
- Tree Cache Tree Cache是上面两种的合体,Tree Cache观察的是自身+所有子节点的所有数据,并缓存所有节点数据。 TreeCache 使用一个内部类TreeNode来维护这个一个树结构。并将这个树结构与ZK节点 进行了映射。所以TreeCache 可以监听当前节点下所有节点的事件。
Curator Cache
Curator Cache,是在 zk3.6新版本添加的特性,之前版本上面3种监听可以单独使用,在 zk3.6+版本之后,上面3种监听被标记为过时,并使用CuratorCache取代了上面3种监听
。
Curator Cache类型是通过 CuratorCache类来实现的,监听器对应的接口为 CuratorCacheListener。
Curator Cache使用实例如下:
public class CuratorCacheTest {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorClientConnectUtils.getInstance();
// 启动客户端
client.start();
String path1 = "/curatorCache1";
String path2 = "/curatorCache2";
curatorCache1(client, path1);
curatorCache2(client, path2);
if(CRUDService.checkNodeExists(client, path1)){
CRUDService.deleteChildrenIfNeeded(client, path1);
//CRUDService.deleteNode(client, path1);
}
if(CRUDService.checkNodeExists(client, path2)){
CRUDService.deleteChildrenIfNeeded(client, path2);
//CRUDService.deleteNode(client, path2);
}
CRUDService.syncCreateNode(client, path1, "VALUE22");
CRUDService.asyncCreateNode(client, path2, "VALUE22");
CRUDService.updateNodeData(client, path2, "value444");
String path22 = path2 + "/node1";
CRUDService.syncCreateNode(client, path22, "VALUE22");
//让线程休眠(为了方便测试)
TimeUnit.MINUTES.sleep(3);
}
public static void curatorCache1(CuratorFramework zkClient, String path) {
CuratorCache curatorCache = CuratorCache.build(zkClient, path);
curatorCache.listenable().addListener(new CuratorCacheListener() {
@Override
public void event(Type type, ChildData oldData, ChildData newdata) {
switch (type) {
//各种判断
case NODE_CREATED:
System.out.println("==该节点被创建,type=" + type);
break;
default:
System.out.println("==该节点操作,type=" + type);
break;
}
}
});
//开启缓存。 这将导致缓存根节点的完整刷新,并为所有找到的节点生成事件,等等。
curatorCache.start();
}
public static void curatorCache2(CuratorFramework zkClient, String path) throws InterruptedException {
CuratorCache curatorCache = CuratorCache.builder(zkClient,path).build();
//构建监听器:
//1.node cache:CuratorCacheListener.builder().forNodeCache(new NodeCacheListener(){} );
//2.path cache:CuratorCacheListener.builder().forPathChildrenCache();
//3.tree cache:CuratorCacheListener.builder().forTreeCache.forTreeCache();
CuratorCacheListener listener = CuratorCacheListener.builder()
.forNodeCache(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("节点改变了...");
}
})
.build();
//添加监听
curatorCache.listenable().addListener(listener);
//开启缓存
curatorCache.start();
}
}
在zk服务端,使用命令操作了节点,监听被触发。
– 求知若饥,虚心若愚。