ZooKeeper客户端实战

目录

Zookeeper原生Java客户端使用

引入zookeeper依赖

ZooKeeper常用构造器

使用原生API连接zookeeper

Zookeeper主要方法

Zookeeper方法特点

Curator开源客户端使用

引入依赖

 Curator使用案例


Zookeeper原生Java客户端使用

引入zookeeper依赖

<!-- zookeeper client -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.8.0</version>
</dependency>

注意:保持与服务端版本一致,不然会有很多兼容性的问题。

ZooKeeper原生客户端主要使用org.apache.zookeeper.ZooKeeper这个类来使用ZooKeeper服务。

ZooKeeper常用构造器

ZooKeeper (connectString, sessionTimeout, watcher)

connectString:
      使用逗号分隔的列表,每个ZooKeeper节点是一个host.port对,host是机器名或者IP地址,port是ZooKeeper节点对客户端提供服务的端口号。客户端会任意选取connectString中的一个节点建立连接。
sessionTimeout :  session timeout时间。
watcher:用于接收到来自ZooKeeper集群的事件。 


使用原生API连接zookeeper

public class ZkClientDemo {

    // 单机跟集群的配置
    private static final  String  CONNECT_STR="localhost:2181";
    private final static  String CLUSTER_CONNECT_STR="ip:2181,ip:2181,ip:2181";

    public static void main(String[] args) throws Exception {

        final CountDownLatch countDownLatch=new CountDownLatch(1);
        ZooKeeper zooKeeper = new ZooKeeper(CONNECT_STR,
                4000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if(Event.KeeperState.SyncConnected==event.getState() 
                        && event.getType()== Event.EventType.None){
                    //如果收到了服务端的响应事件,连接成功
                    countDownLatch.countDown();
                    System.out.println("连接建立");
                }
            }
        });
        System.out.printf("连接中");
        countDownLatch.await();
        //CONNECTED
        System.out.println(zooKeeper.getState());

        //创建持久节点
        zooKeeper.create("/user","zhangsan".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }
}
Zookeeper主要方法

create(path, data, acl,createMode): 创建一个给定路径的 znode,并在 znode 保存 data[]的 数据,createMode指定 znode 的类型。

delete(path, version):如果给定 path 上的 znode 的版本和给定的 version 匹配, 删除 znode。

exists(path, watch):判断给定 path 上的 znode 是否存在,并在 znode 设置一个 watch。

getData(path, watch):返回给定 path 上的 znode 数据,并在 znode 设置一个 watch。

setData(path, data, version):如果给定 path 上的 znode 的版本和给定的 version 匹配,设置 znode 数据。

getChildren(path, watch):返回给定 path 上的 znode 的孩子 znode 名字,并在 znode 设置一个 watch。

sync(path):把客户端 session 连接节点和 leader 节点进行同步。


Zookeeper方法特点

1. 所有获取 znode 数据的 API 都可以设置一个 watch 用来监控 znode 的变化。

2. 所有更新 znode 数据的 API 都有两个版本: 无条件更新版本和条件更新版本。如果 version 为 -1,更新为无条件更新。否则只有给定的 version 和 znode 当前的 version 一样,才会进行更新,这样的更新是条件更新。

3. 所有的方法都有同步和异步两个版本。同步版本的方法发送请求给 ZooKeeper 并等待服务器的响 应。异步版本把请求放入客户端的请求队列,然后马上返回。异步版本通过 callback 来接受来 自服务端的响应。

以下是zookeeper一些常见的使用方法案例:

public class ZkClientDemo {

    private static final  String  CONNECT_STR="127.0.0.1:2181";

    public static void main(String[] args) throws Exception {
        //获取zookeeper对象
        ZooKeeper zooKeeper = ZooKeeperFacotry.create(CONNECT_STR);
        //CONNECTED
        System.out.println(zooKeeper.getState());

        Stat stat = zooKeeper.exists("/user",false);
        if(null ==stat){
            //创建持久节点
            zooKeeper.create("/user","zhangsan".getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }

        //永久监听  addWatch -m mode  /user
        zooKeeper.addWatch("/user",new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println(event);
                //TODO
            }
        },AddWatchMode.PERSISTENT);

        stat = new Stat();
        byte[] data = zooKeeper.getData("/user", false, stat);
        System.out.println(" data: "+new String(data));
        System.out.println(" stat: "+stat);
        // -1: 无条件更新
        //zooKeeper.setData("/user", "third".getBytes(), -1);
        // 带版本条件更新
        int version = stat.getVersion();
        zooKeeper.setData("/user", "zhangsan".getBytes(), version);
    }
}

Curator开源客户端使用

       Curator是Netflix公司开源的一套ZooKeeper客户端框架,和ZkClient一样它解决了非常底层的细节开发工作,包括连接、重连、反复注册Watcher的问题以及NodeExistsException异常等。Curator是Apache基金会的顶级项目之一,Curator具有更加完善的文档,另外还提供了一套易用性和可读性更强的Fluent风格的客户端API框架。Curator还为ZooKeeper客户端框架提供了一些比较普遍的、开箱即用的、分布式开发用的解决方案,例如Recipe、共享锁服务、Master选举机制和分布式计算器等,帮助开发者避免了“重复造轮子”的无效开发工作。 在实际的开发场景中,使用Curator客户端就足以应付日常的ZooKeeper集群操作的需求。

引入依赖

Curator 包含了几个包:

curator-framework是对ZooKeeper的底层API的一些封装。

curator-client提供了一些客户端的操作,例如重试策略等。

curator-recipes封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等。

<!-- zookeeper client -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.8.0</version>
</dependency>

<!--curator-->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.1.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>

 Curator使用案例

一些核心概念跟原生java很类似,以下是使用Curator来完成一些常见操作。

public class CuratorDemo {

    private static final  String  CONNECT_STR="127.0.0.1:2181";
    private final static  String CLUSTER_CONNECT_STR="ip:2181,ip:2181,ip:2181";

    public static void main(String[] args) throws Exception {
        //构建客户端实例
        CuratorFramework curatorFramework= CuratorFrameworkFactory.builder()
                .connectString(CONNECT_STR)
                .retryPolicy(new ExponentialBackoffRetry(1000,3)) // 设置重试策略
                .build();
        //启动客户端
        curatorFramework.start();

        String path = "/user";

        // 检查节点是否存在
        Stat stat = curatorFramework.checkExists().forPath(path);
        if (stat != null) {
            // 删除节点
            curatorFramework.delete()
                    .deletingChildrenIfNeeded()  // 如果存在子节点,则删除所有子节点
                    .forPath(path);  // 删除指定节点
        }
        // 创建节点
        curatorFramework.create()
                .creatingParentsIfNeeded()  // 如果父节点不存在,则创建父节点
                .withMode(CreateMode.PERSISTENT)
                .forPath(path, "Init Data".getBytes());

        // 注册节点监听
        curatorFramework.getData()
                .usingWatcher(new CuratorWatcher() {
                    @Override
                    public void process(WatchedEvent event) throws Exception {
                        byte[] bytes = curatorFramework.getData().forPath(path);
                        System.out.println("Node data changed: " + new String(bytes));
                    }
                })
                .forPath(path);


        // 更新节点数据    set /user  Update Data
        curatorFramework.setData()
                .forPath(path, "Update Data".getBytes());


       stat=new Stat();
        //查询节点数据
        byte[] bytes = curatorFramework.getData().storingStatIn(stat)
                .forPath("/user");
        System.out.println(new String(bytes));


        ExecutorService executorService = Executors.newSingleThreadExecutor();
        //异步处理,可以指定线程池
        curatorFramework.getData().inBackground((item1, item2) -> {
            System.out.println("background:"+item1+","+item2);
            System.out.println(item2.getStat());
        },executorService).forPath(path);


        // 创建节点缓存,用于监听指定节点的变化
        final NodeCache nodeCache = new NodeCache(curatorFramework, path);
        // 启动NodeCache并立即从服务端获取最新数据
        nodeCache.start(true);

        // 注册节点变化监听器
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                byte[] newData = nodeCache.getCurrentData().getData();
                System.out.println("Node data changed: " + new String(newData));
            }
        });

        // 创建PathChildrenCache
        PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, path, true);
        pathChildrenCache.start();

        // 注册子节点变化监听器
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
                    ChildData childData = event.getData();
                    System.out.println("Child added: " + childData.getPath());
                } else if (event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) {
                    ChildData childData = event.getData();
                    System.out.println("Child removed: " + childData.getPath());
                } else if (event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED) {
                    ChildData childData = event.getData();
                    System.out.println("Child updated: " + childData.getPath());
                }
            }
        });

    }
}

相关推荐

  1. ZooKeeper客户实战

    2024-02-02 01:36:01       30 阅读
  2. Kafka客户实战

    2024-02-02 01:36:01       33 阅读
  3. go实现tcp客户

    2024-02-02 01:36:01       21 阅读
  4. python实现UDP客户

    2024-02-02 01:36:01       15 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-02-02 01:36:01       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-02-02 01:36:01       18 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-02-02 01:36:01       17 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-02-02 01:36:01       20 阅读

热门阅读

  1. 433. 最小基因变化

    2024-02-02 01:36:01       37 阅读
  2. MongoDB实战 – 用MongoDB Shell访问MongoDB数据库

    2024-02-02 01:36:01       32 阅读
  3. 与后端配合单个/批量导出excel的方法

    2024-02-02 01:36:01       29 阅读
  4. c入门第一篇——hello c!

    2024-02-02 01:36:01       29 阅读
  5. Apache Commons

    2024-02-02 01:36:01       35 阅读