您当前的位置: 首页 >  zookeeper

Charge8

暂无认证

  • 3浏览

    0关注

    447博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Zookeeper官方客户端API使用

Charge8 发布时间:2022-05-23 10:32:41 ,浏览量:3

ZooKeeper应用的开发主要通过 Java客户端API去连接和操作ZooKeeper集群。

ZooKeeper的 Java客户端API有:

  • ZooKeeper官方的Java客户端API。
  • 第三方的Java客户端API:比如:ZKClient,Curator(重点)
一、Zookeeper官方客户端API

ZooKeeper官方的 Java客户端API提供了基本的操作。

例如:创建会话、创建节点、读取节点、 更新数据、删除节点和检查节点是否存在等。不过,对于实际开发来说,ZooKeeper官方API有一些不足之处,例如:

  • ZooKeeper的Watcher监测是一次性的,每次触发之后都需要重新进行注册。∙
  • 会话超时之后没有实现重连机制。
  • 异常处理烦琐,ZooKeeper提供了很多异常,对于开发人员来说可能根本不知道应该如何处理这些抛出的异常。
  • 仅提供了简单的 byte[]数组类型的接口,没有提供Java POJO级别的序列化数据处理接口。
  • 创建节点时如果抛出异常,需要自行检查节点是否存在。
  • 无法实现级联删除。

总之,ZooKeeper官方API功能比较简单,在实际开发过程中比较笨重,一般不推荐使用。但是,第三方客户端框架的底层封装使用的都是原生API。所以有必要了解一下原生API。

Zookeeper 提供了 Java客户端,创建项目并引入其依赖:

    
    
      org.apache.zookeeper
      zookeeper
      3.6.3
    

注意:保持与服务端版本一致,不然会有很多兼容性的问题。

1、 ZooKeeper类

ZooKeeper原生客户端主要使用 org.apache.zookeeper.ZooKeeper类来连接和使用 ZooKeeper服务。

1.1 常用构造器

简单介绍一下参数比较多的构造器:

ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider aHostProvider)

  • connectString:使用逗号分隔的列表,每个ZooKeeper节点是一个 host:port 对,集群模式下用逗号隔开。
    • host:是机器名或者IP地址,
    • port:是ZooKeeper节点对客户端提供服务的端口号。
  • sessionTimeout:会话超时时间,该值不能超过服务端所设置的minSessionTimeout 和maxSessionTimeout;
  • watcher:会话监听器,服务端事件将会触该监听;一般我们可以实现 Watcher接口。
  • sessionId(long):自定义会话ID;
  • sessionPasswd(byte[]):会话密码;
  • canBeReadOnly(boolean):该连接是否为只读的;
  • hostProvider(HostProvider):服务端地址提供者,指示客户端如何选择某个服务来调用,默认采用StaticHostProvider实现;

下面实例我们使用这个构造器:

ZooKeeper (connectString, sessionTimeout, watcher)

1.2 主要方法
  • create(path, data, acl,createMode):创建一个给定路径的 znode,并在znode 保存 data[]的 数据,createMode指定 znode 的类型。
  • delete(path, version):如果给定 path 上的 znode 的版本和给定的 version 匹配, 删除 znode。
  • exists(path, watch):判断给定 path 上的 znode 是否存在,并在 znode 设置一个 watch。
  • getData(path, watch):返回给定 path 上的 znode 数据,并在 znode 设置一个watch。
  • setData(path, data, version):如果给定 path 上的 znode 的版本和给定的version 匹配,设置 znode 数据。
  • getChildren(path, watch):返回给定 path 上的 znode 的孩子 znode 名字,并在 znode 设置一个 watch。
  • sync(path):把客户端 session 连接节点和 leader 节点进行同步。
2、原生Java客户端使用 2.1 连接ZK服务
  • clientConnnect1方法:使用匿名监听类
  • clientConnnect2方法:使用自定义监听类
public class ZkConnectUtils {

    private static String CONNECT_STR = "192.168.xxx.xxx:2181"; //集群模式下用逗号隔开
    //private final static String CLUSTER_CONNECT_STR = "192.168.xxx.xxx:2181,192.168.xxx.xxx:2181,192.168.xxx.xxx:2181";
    private final static CountDownLatch countDownLatch = new CountDownLatch(1);

    public static void main(String[] args) throws Exception {
        // 因为建立 ZooKeeper连接,本身是异步过程,所以,我们使用功能 countDownLatch让其等待一下。
        //ZooKeeper zooKeeper = clientConnnect1();
        ZooKeeper zooKeeper = clientConnnect2();
        System.out.println("==zooKeeper==" + zooKeeper);
        System.out.println(zooKeeper.getState());// CONNECTED
    }

    /**
     * 使用匿名监听类
     * @throws Exception
     */
    public static ZooKeeper clientConnnect1() throws Exception {
        ZooKeeper zooKeeper = new ZooKeeper(CONNECT_STR, 5000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (Event.KeeperState.SyncConnected == event.getState() && event.getType() == Event.EventType.None) {
                    // 如果收到了服务端的响应事件,连接成功
                    countDownLatch.countDown();
                    System.out.println("连接已建立");
                }
            }
        });
        System.out.printf("连接中...");
        countDownLatch.await();
        return zooKeeper;

    }

    /**
     * 使用自定义监听类
     * @throws Exception
     */
    public static ZooKeeper clientConnnect2() throws Exception {
        ZooKeeper zooKeeper = new ZooKeeper(CONNECT_STR, 5000, new MyZkSessionWatcher(countDownLatch));
        System.out.printf("连接中...");
        countDownLatch.await();
        return zooKeeper;
    }
}

自定义会话监听类:

public class MyZkSessionWatcher implements Watcher {

    private CountDownLatch countDownLatch;

    public MyZkSessionWatcher(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void process(WatchedEvent event) {
        if(Event.KeeperState.SyncConnected == event.getState()){
            if(Event.EventType.None == event.getType() && null == event.getPath()){
                // 如果收到了服务端的响应事件,连接成功
                countDownLatch.countDown();
                System.out.println("===连接已建立===" + event.getState());
                System.out.println(" zookeeper session established");
            }else if(Event.EventType.NodeCreated == event.getType()){
                System.out.println(" success create znode ");
            }else if(Event.EventType.NodeDataChanged == event.getType()){
                System.out.println(" success change znode data" + event.getPath());
            }else if(Event.EventType.NodeDeleted == event.getType()){
                System.out.println(" success delete znode ");
            }else if(Event.EventType.NodeChildrenChanged == event.getType()){
                System.out.println(" success change NodeChildrenznode " + event.getPath());
            }
        }
    }
}
2.2 CRUD实例
public class CRUDService {

    public static void main(String[] args) throws Exception {
        ZooKeeper zooKeeper = ZkConnectUtils.clientConnnect2();

        syncCreateNode(zooKeeper, "/sync_Node", "value11");
        asyncCreateNode(zooKeeper, "/async_Node", "value22");

        String data = getNodeData(zooKeeper, "/sync_Node");
        System.out.println(data);

        updateNodeData(zooKeeper, "/sync_Node", "update value11111");
        deleteNode(zooKeeper, "/async_Node");
    }

    /**
     * 同步创建节点
     * @param zooKeeper
     * @param node
     * @param data
     */
    private static void syncCreateNode(ZooKeeper zooKeeper, String node, String data) {
        try {
            String result_path = zooKeeper.create(node,
                    data.getBytes(StandardCharsets.UTF_8), //节点数据
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, //所有权限
                    CreateMode.PERSISTENT); //持久节点
            System.out.println("syncCreateNode: result_path=" + result_path);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 异步创建节点
     * @param zooKeeper
     * @param node
     * @param data
     */
    private static void asyncCreateNode(ZooKeeper zooKeeper, String node, String data) throws Exception {
        zooKeeper.create(node, data.getBytes(StandardCharsets.UTF_8),
                ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL, //临时节点
                (rc, path, ctx, name) -> {
                    System.out.println("asyncCreateNode 回调: ");
                    System.out.println("rc=" + rc);
                    System.out.println("path=" + path);
                    System.out.println("ctx=" + ctx);
                    System.out.println("name=" + name);
                },"context");
        TimeUnit.SECONDS.sleep(2);
    }

    /**
     * 获取节点数据
     * @param zooKeeper
     * @param node
     * @return
     */
    private static String getNodeData(ZooKeeper zooKeeper, String node) throws Exception {
        byte[] data = zooKeeper.getData(node, true, new Stat());
        return new String(data, StandardCharsets.UTF_8);
    }

    /**
     * 修改节点数据
     * @param zooKeeper
     * @param node
     * @param data
     */
    private static void updateNodeData(ZooKeeper zooKeeper, String node, String data) throws Exception {
        // 添加监听,Watch就会监听到
        zooKeeper.exists(node, true);
        /**
         * 如果想要都能监听到。必须在修改之后调用exists方法或者其他起作用的方法,比如:get,exists等
         *  -1 表示任何版本,也可以指定某个版本,多次修改只会监听一次,
         */
        zooKeeper.setData(node, data.getBytes(StandardCharsets.UTF_8), -1);
        //client.exists(node, true);
        zooKeeper.setData(node, data.getBytes(StandardCharsets.UTF_8), -1);
    }

    /**
     * 删除节点
     * @param zooKeeper
     * @param node
     */
    private static void deleteNode(ZooKeeper zooKeeper, String node) throws Exception {
        // 添加监听,Watch就会监听到
        zooKeeper.exists(node, true);
        zooKeeper.delete(node, -1);
    }

}

在这里插入图片描述

– 求知若饥,虚心若愚。

关注
打赏
1664721914
查看更多评论
立即登录/注册

微信扫码登录

0.0483s