前言:
闲来无事,想起把zookeeper的相关知识再补充一下。作为中间件部门的成员,与zookeeper打交道的机会还是很多的。目前市面上的很多产品,尤其是分布式相关的,基本都会用上zookeeper。
本文不是一篇介绍zookeeper是什么和怎么使用的文章,而是介绍zookeeper的使用框架Curator的。本来笔者还想着把Curator的使用再好好熟悉下,因为一段时间不用就觉得生疏了,突然发现之前竟然对Curator的使用封装过,好吧,那就发出来吧,以供大家参考、指正。
写这篇博客的另一个动机就是为了下一篇博客,下一篇会写一下真正的大牛他们是怎么封装使用Curator,也算是抛砖引玉了。
准备工作:
老规矩,首先介绍下笔者的使用环境和相关依赖。
1.zookeeper服务,单机版,本地使用,版本为3.4.11
2.maven依赖如下:
org.apache.zookeeper
zookeeper
3.4.11
log4j
log4j
org.apache.curator
curator-framework
4.0.0
zookeeper
org.apache.zookeeper
org.apache.curator
curator-recipes
4.0.0
1.Curator封装使用
笔者就写了一个单类,里面封装CRUD的操作方法,具体如下:
package com.example.demo.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import java.util.concurrent.Executor;
/**
* Curator的简单封装
* @author lucky
*/
public class CuratorManager {
/** 默认值 */
private static final String DEFAULT_CONNECT_STRING = "localhost:2181";
private static final int DEFAULT_SESSION_TIME_OUT = 5000;
private static final int DEFAULT_CONNECTION_TIME_OUT = 3000;
private static final RetryPolicy DEFAULT_RETRY_POLICY
= new ExponentialBackoffRetry(1000, 3);
/** zookeeper服务器地址 */
private String connectAddr;
/** session超时时间 */
private int sessionTimeOut;
/** 连接超时时间 */
private int connectionTimeOut;
/** 重试策略 */
private RetryPolicy policy = null;
private CuratorFramework client = null;
/** 构造方法 */
public CuratorManager() {
this(DEFAULT_CONNECT_STRING,DEFAULT_SESSION_TIME_OUT,DEFAULT_CONNECTION_TIME_OUT,DEFAULT_RETRY_POLICY);
}
public CuratorManager(String connectAddr) {
this(connectAddr,DEFAULT_SESSION_TIME_OUT,DEFAULT_CONNECTION_TIME_OUT,DEFAULT_RETRY_POLICY);
}
public CuratorManager(String connectAddr, Integer sessionTimeOut, Integer connectionTimeOut, RetryPolicy policy) {
this.connectAddr = connectAddr == null ? DEFAULT_CONNECT_STRING : connectAddr;
this.sessionTimeOut = sessionTimeOut == null ? DEFAULT_SESSION_TIME_OUT : sessionTimeOut;
this.connectionTimeOut = connectionTimeOut == null ? DEFAULT_CONNECTION_TIME_OUT : connectionTimeOut;
this.policy = policy == null ? DEFAULT_RETRY_POLICY : policy;
this.client = CuratorFrameworkFactory.newClient(connectAddr,sessionTimeOut,connectionTimeOut,policy);
client.start();
}
/** create */
/**
* 只创建路径,data默认为空
* @param path 路径
* @throws Exception
*/
public void create(String path) throws Exception{
this.createWithData(path,"");
}
/**
* 创建路径和对应data值
* @param path 路径
* @param data 值
* @throws Exception
*/
public void createWithData(String path, String data) throws Exception{
this.createWithMode(path,data,CreateMode.PERSISTENT);
}
/**
* 创建路径和对应data值,并指定创建模式
* @param path 路径
* @param data 值
* @param mode 创建模式
* @throws Exception
*/
public void createWithMode(String path, String data, CreateMode mode) throws Exception{
client.create().creatingParentsIfNeeded().withMode(mode).forPath(path, data.getBytes());
}
/**
* 创建路径和对应data值,指定创建模式,并创建callback
* @param path 路径
* @param data 值
* @param mode 创建模式
* @param callback 响应事件
* @param es 使用线程池来执行响应事件
* @throws Exception
*/
public void createWithBackGround(String path, String data, CreateMode mode, BackgroundCallback callback, Executor es) throws Exception {
ACLBackgroundPathAndBytesable pathAndBytesable = client.create().creatingParentsIfNeeded().withMode(mode);
if (null != callback) {
if (null != es) {
pathAndBytesable.inBackground(callback, es);
} else {
pathAndBytesable.inBackground(callback);
}
}
pathAndBytesable.forPath(path, data.getBytes());
}
/** delete */
/**
* 删除该路径,包括子节点
* @param path 路径
* @throws Exception
*/
public void delete(String path) throws Exception {
this.deleteWithVersion(path,-1);
}
/**
* 删除该路径指定version,包括子节点
* @param path 路径
* @param version 版本
* @throws Exception
*/
public void deleteWithVersion(String path, int version) throws Exception {
this.deleteWithAllChildren(path, version, false);
}
/**
* 可自定义是否删除该path下子节点
* @param path 路径
* @param version 版本
* @param deleteChildren 是否删除子节点
* @throws Exception
*/
public void deleteWithAllChildren(String path, int version, boolean deleteChildren) throws Exception {
if (deleteChildren) {
client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(version).forPath(path);
} else {
client.delete().guaranteed().withVersion(version).forPath(path);
}
}
/** get */
/**
* 获取路径对应值
* @param path 路径
* @return 值
* @throws Exception
*/
public String get(String path) throws Exception {
byte[] bytes = client.getData().forPath(path);
return new String(bytes);
}
/**
* 获取路径stat信息
* @param path 路径
* @return stat信息
* @throws Exception
*/
public Stat getWithStat(String path) throws Exception {
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath(path);
return stat;
}
/** update */
/**
* 修改path对应data值
* @param path 路径
* @param data 值
* @throws Exception
*/
public void update(String path, String data) throws Exception {
this.updateWithVersion(path ,data, -1);
}
/**
* 修改path对应version的data值
* @param path 路径
* @param data 值
* @param version 版本
* @throws Exception
*/
public void updateWithVersion(String path, String data, int version) throws Exception {
client.setData().withVersion(version).forPath(path, data.getBytes());
}
/**
* 添加对应path的监听,当该path的data值变动时,则触发callback
* @param path 路径
* @param callback 回调方法
* @param es 使用线程池来执行响应事件
* @throws Exception
*/
public void dataListener(String path, NodeCacheCallback callback, Executor es) throws Exception {
NodeCache nodeCache = new NodeCache(client, path, false);
nodeCache.start(true);
NodeCacheListener listener = new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
callback.callback(nodeCache);
}
};
if (null != es) {
nodeCache.getListenable().addListener(listener, es);
} else {
nodeCache.getListenable().addListener(listener);
}
}
/**
* 添加对应path的监听,当该path下的子节点发生变动时,则触发callback
* 注意:二级子节点时间无法监听
* @param path 路径
* @param callback 回调方法
* @param es 使用线程池来执行响应事件
* @throws Exception
*/
public void childrenListener(String path, NodeChildrenCacheCallback callback, Executor es) throws Exception {
PathChildrenCache pathChildrenCache = new PathChildrenCache(client, path, true);
pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
PathChildrenCacheListener pathChildrenCacheListener = new PathChildrenCacheListener(){
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
callback.callback(pathChildrenCache, client, event);
}
};
if (null != es) {
pathChildrenCache.getListenable().addListener(pathChildrenCacheListener, es);
} else {
pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
}
}
/**
* 自定义data值变化监听接口
*/
interface NodeCacheCallback{
void callback(NodeCache cache);
}
/**
* 自定义子节点变动监听接口
*/
interface NodeChildrenCacheCallback{
void callback(PathChildrenCache cache, CuratorFramework client, PathChildrenCacheEvent event);
}
public static void main(String[] args) {
CuratorManager curatorManager = new CuratorManager();
String path = "/xw/test/create";
String data = "just for test";
try {
// create
curatorManager.createWithData(path ,data);
// update
curatorManager.update(path,"test just");
// get state
Stat withStat = curatorManager.getWithStat(path);
System.out.println(withStat);
// get data
String s = curatorManager.get(path);
System.out.println(s);
// delete
curatorManager.delete(path);
curatorManager.deleteWithAllChildren("/xw",-1,true);
// data change listener
curatorManager.dataListener(path, new NodeCacheCallback() {
@Override
public void callback(NodeCache cache) {
System.out.println("node data update, current data: " + cache.getCurrentData());
}
}, null);
// children change listener
curatorManager.childrenListener(path, new NodeChildrenCacheCallback() {
@Override
public void callback(PathChildrenCache cache, CuratorFramework client, PathChildrenCacheEvent event) {
switch (event.getType()) {
case CHILD_ADDED:
System.out.println("child_add, data:" + event.getData());
case CHILD_UPDATED:
System.out.println("child_update, data:" + event.getData());
case CHILD_REMOVED:
System.out.println("child_removed, data:" + event.getData());
default:
break;
}
}
}, null);
} catch (Exception e) {
e.printStackTrace();
}
}
}
总结:
简单的封装使用Curator,看了越来越多的源码之后,就会发现,写代码的时候不自觉的会想往这方面去靠。
会去想着写更简洁的、封装性更好的代码。
同时设计模式相关的也会不自觉的冒出来。
共勉之!