您当前的位置: 首页 > 

恐龙弟旺仔

暂无认证

  • 0浏览

    0关注

    282博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Curator的简单封装使用

恐龙弟旺仔 发布时间:2020-02-22 12:00:17 ,浏览量:0

前言:

    闲来无事,想起把zookeeper的相关知识再补充一下。作为中间件部门的成员,与zookeeper打交道的机会还是很多的。目前市面上的很多产品,尤其是分布式相关的,基本都会用上zookeeper。

    本文不是一篇介绍zookeeper是什么和怎么使用的文章,而是介绍zookeeper的使用框架Curator的。本来笔者还想着把Curator的使用再好好熟悉下,因为一段时间不用就觉得生疏了,突然发现之前竟然对Curator的使用封装过,好吧,那就发出来吧,以供大家参考、指正。

 

    写这篇博客的另一个动机就是为了下一篇博客,下一篇会写一下真正的大牛他们是怎么封装使用Curator,也算是抛砖引玉了。

 

准备工作:

    老规矩,首先介绍下笔者的使用环境和相关依赖。

    1.zookeeper服务,单机版,本地使用,版本为3.4.11

    2.maven依赖如下:

		
		
		    org.apache.zookeeper
		    zookeeper
		    3.4.11
			
				
					log4j
					log4j
				
			
		

		
		
		    org.apache.curator
		    curator-framework
		    4.0.0
			
				
					zookeeper
					org.apache.zookeeper
				
			
		
		
		
			org.apache.curator
			curator-recipes
			4.0.0
		

1.Curator封装使用

    笔者就写了一个单类,里面封装CRUD的操作方法,具体如下:

package com.example.demo.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

import java.util.concurrent.Executor;

/**
 * Curator的简单封装
 * @author lucky
 */
public class CuratorManager {

	/** 默认值 */
	private static final String DEFAULT_CONNECT_STRING = "localhost:2181";
	private static final int DEFAULT_SESSION_TIME_OUT = 5000;
	private static final int DEFAULT_CONNECTION_TIME_OUT = 3000;
	private static final RetryPolicy DEFAULT_RETRY_POLICY
			= new ExponentialBackoffRetry(1000, 3);

	/** zookeeper服务器地址 */
	private String connectAddr;

	/** session超时时间 */
	private int sessionTimeOut;

	/** 连接超时时间 */
	private int connectionTimeOut;

	/** 重试策略 */
	private RetryPolicy policy = null;

	private CuratorFramework client = null;

	/** 构造方法 */
	public CuratorManager() {
		this(DEFAULT_CONNECT_STRING,DEFAULT_SESSION_TIME_OUT,DEFAULT_CONNECTION_TIME_OUT,DEFAULT_RETRY_POLICY);
	}

	public CuratorManager(String connectAddr) {
		this(connectAddr,DEFAULT_SESSION_TIME_OUT,DEFAULT_CONNECTION_TIME_OUT,DEFAULT_RETRY_POLICY);
	}

	public CuratorManager(String connectAddr, Integer sessionTimeOut, Integer connectionTimeOut, RetryPolicy policy) {
		this.connectAddr = connectAddr == null ? DEFAULT_CONNECT_STRING : connectAddr;
		this.sessionTimeOut = sessionTimeOut == null ? DEFAULT_SESSION_TIME_OUT : sessionTimeOut;
		this.connectionTimeOut = connectionTimeOut == null ? DEFAULT_CONNECTION_TIME_OUT : connectionTimeOut;
		this.policy = policy == null ? DEFAULT_RETRY_POLICY : policy;

		this.client = CuratorFrameworkFactory.newClient(connectAddr,sessionTimeOut,connectionTimeOut,policy);
		client.start();
	}

	/** create */

	/**
	 * 只创建路径,data默认为空
	 * @param path 路径
	 * @throws Exception
	 */
	public void create(String path) throws Exception{
		this.createWithData(path,"");
	}

	/**
	 * 创建路径和对应data值
	 * @param path 路径
	 * @param data 值
	 * @throws Exception
	 */
	public void createWithData(String path, String data) throws Exception{
		this.createWithMode(path,data,CreateMode.PERSISTENT);
	}

	/**
	 * 创建路径和对应data值,并指定创建模式
	 * @param path 路径
	 * @param data 值
	 * @param mode 创建模式
	 * @throws Exception
	 */
	public void createWithMode(String path, String data, CreateMode mode) throws Exception{
		client.create().creatingParentsIfNeeded().withMode(mode).forPath(path, data.getBytes());
	}

	/**
	 * 创建路径和对应data值,指定创建模式,并创建callback
	 * @param path 路径
	 * @param data 值
	 * @param mode 创建模式
	 * @param callback 响应事件
	 * @param es 使用线程池来执行响应事件
	 * @throws Exception
	 */
	public void createWithBackGround(String path, String data, CreateMode mode, BackgroundCallback callback, Executor es) throws Exception {

		ACLBackgroundPathAndBytesable pathAndBytesable = client.create().creatingParentsIfNeeded().withMode(mode);
		if (null != callback) {
			if (null != es) {
				pathAndBytesable.inBackground(callback, es);
			} else {
				pathAndBytesable.inBackground(callback);
			}
		}

		pathAndBytesable.forPath(path, data.getBytes());
	}

	/** delete */

	/**
	 * 删除该路径,包括子节点
	 * @param path 路径
	 * @throws Exception
	 */
	public void delete(String path) throws Exception {
		this.deleteWithVersion(path,-1);
	}

	/**
	 * 删除该路径指定version,包括子节点
	 * @param path 路径
	 * @param version 版本
	 * @throws Exception
	 */
	public void deleteWithVersion(String path, int version) throws Exception {
		this.deleteWithAllChildren(path, version, false);
	}

	/**
	 * 可自定义是否删除该path下子节点
	 * @param path 路径
	 * @param version 版本
	 * @param deleteChildren 是否删除子节点
	 * @throws Exception
	 */
	public void deleteWithAllChildren(String path, int version, boolean deleteChildren) throws Exception {
		if (deleteChildren) {
			client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(version).forPath(path);
		} else {
			client.delete().guaranteed().withVersion(version).forPath(path);
		}
	}

	/** get */

	/**
	 * 获取路径对应值
	 * @param path 路径
	 * @return 值
	 * @throws Exception
	 */
	public String get(String path) throws Exception {
		byte[] bytes = client.getData().forPath(path);
		return new String(bytes);
	}

	/**
	 * 获取路径stat信息
	 * @param path 路径
	 * @return stat信息
	 * @throws Exception
	 */
	public Stat getWithStat(String path) throws Exception {
		Stat stat = new Stat();
		client.getData().storingStatIn(stat).forPath(path);

		return stat;
	}


	/** update */

	/**
	 * 修改path对应data值
	 * @param path 路径
	 * @param data 值
	 * @throws Exception
	 */
	public void update(String path, String data) throws Exception {
		this.updateWithVersion(path ,data, -1);
	}

	/**
	 * 修改path对应version的data值
	 * @param path 路径
	 * @param data 值
	 * @param version 版本
	 * @throws Exception
	 */
	public void updateWithVersion(String path, String data, int version) throws Exception {
		client.setData().withVersion(version).forPath(path, data.getBytes());
	}

	/**
	 * 添加对应path的监听,当该path的data值变动时,则触发callback
	 * @param path 路径
	 * @param callback 回调方法
	 * @param es 使用线程池来执行响应事件
	 * @throws Exception
	 */
	public void dataListener(String path, NodeCacheCallback callback, Executor es) throws Exception {
		NodeCache nodeCache = new NodeCache(client, path, false);
		nodeCache.start(true);

		NodeCacheListener listener = new NodeCacheListener() {
			@Override
			public void nodeChanged() throws Exception {
				callback.callback(nodeCache);
			}
		};

		if (null != es) {
			nodeCache.getListenable().addListener(listener, es);
		} else {
			nodeCache.getListenable().addListener(listener);
		}
	}

	/**
	 * 添加对应path的监听,当该path下的子节点发生变动时,则触发callback
	 * 注意:二级子节点时间无法监听
	 * @param path 路径
	 * @param callback 回调方法
	 * @param es 使用线程池来执行响应事件
	 * @throws Exception
	 */
	public void childrenListener(String path, NodeChildrenCacheCallback callback, Executor es) throws Exception {

		PathChildrenCache pathChildrenCache = new PathChildrenCache(client, path, true);
		pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);

		PathChildrenCacheListener pathChildrenCacheListener = new PathChildrenCacheListener(){
			@Override
			public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
				callback.callback(pathChildrenCache, client, event);
			}
		};

		if (null != es) {
			pathChildrenCache.getListenable().addListener(pathChildrenCacheListener, es);
		} else {
			pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
		}
	}

	/**
	 * 自定义data值变化监听接口
	 */
	interface NodeCacheCallback{
		void callback(NodeCache cache);
	}

	/**
	 * 自定义子节点变动监听接口
	 */
	interface NodeChildrenCacheCallback{
		void callback(PathChildrenCache cache, CuratorFramework client, PathChildrenCacheEvent event);
	}

	public static void main(String[] args) {

		CuratorManager curatorManager = new CuratorManager();

		String path = "/xw/test/create";
		String data = "just for test";

		try {

			// create
			curatorManager.createWithData(path ,data);
            // update
			curatorManager.update(path,"test just");
            // get state
			Stat withStat = curatorManager.getWithStat(path);
			System.out.println(withStat);
			
            // get data
			String s = curatorManager.get(path);
			System.out.println(s);

            // delete
			curatorManager.delete(path);
			curatorManager.deleteWithAllChildren("/xw",-1,true);

            // data change listener
			curatorManager.dataListener(path, new NodeCacheCallback() {
				@Override
				public void callback(NodeCache cache) {
					System.out.println("node data update, current data: " + cache.getCurrentData());
				}
			}, null);

			// children change listener 
			curatorManager.childrenListener(path, new NodeChildrenCacheCallback() {
				@Override
				public void callback(PathChildrenCache cache, CuratorFramework client, PathChildrenCacheEvent event) {
					switch (event.getType()) {
						case CHILD_ADDED:
							System.out.println("child_add, data:" + event.getData());

						case CHILD_UPDATED:
							System.out.println("child_update, data:" + event.getData());

						case CHILD_REMOVED:
							System.out.println("child_removed, data:" + event.getData());

						default:
							break;
					}
				}
			}, null);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}
  总结:

    简单的封装使用Curator,看了越来越多的源码之后,就会发现,写代码的时候不自觉的会想往这方面去靠。

    会去想着写更简洁的、封装性更好的代码。

    同时设计模式相关的也会不自觉的冒出来。

    共勉之!

 

关注
打赏
1655041699
查看更多评论
立即登录/注册

微信扫码登录

0.0423s