上海微信网站开发,益阳建设企业网站,高端网站开发价格,上海的网站开发公司电话Java学习面试指南#xff1a;https://javaxiaobear.cn ZooKeeper应用的开发主要通过Java客户端API去连接和操作ZooKeeper集群。可供选择的Java客户端API有#xff1a; ZooKeeper官方的Java客户端API。 第三方的Java客户端API#xff0c;比如Curator。
ZooKeeper官方的客户… Java学习面试指南https://javaxiaobear.cn ZooKeeper应用的开发主要通过Java客户端API去连接和操作ZooKeeper集群。可供选择的Java客户端API有 ZooKeeper官方的Java客户端API。 第三方的Java客户端API比如Curator。
ZooKeeper官方的客户端API提供了基本的操作。例如创建会话、创建节点、读取节点、更新数据、删除节点和检查节点是否存在等。不过对于实际开发来说ZooKeeper官方API有一些不足之处具体如下 ZooKeeper的Watcher监测是一次性的每次触发之后都需要重新进行注册。 会话超时之后没有实现重连机制。 异常处理烦琐ZooKeeper提供了很多异常对于开发人员来说可能根本不知道应该如何处理这些抛出的异常。 仅提供了简单的byte[]数组类型的接口没有提供Java POJO级别的序列化数据处理接口。 创建节点时如果抛出异常需要自行检查节点是否存在。 无法实现级联删除。
总之ZooKeeper官方API功能比较简单在实际开发过程中比较笨重一般不推荐使用。
1、Zookeeper 原生Java客户端使用
1、搭建服务
1、创建一个maven项目引入zookeeper client依赖
!-- zookeeper client --
dependencygroupIdorg.apache.zookeeper/groupIdartifactIdzookeeper/artifactIdversion3.8.3/version
/dependency注意保持与服务端版本一致不然会有很多兼容性的问题
2、学习API
ZooKeeper原生客户端主要使用org.apache.zookeeper.ZooKeeper这个类来使用ZooKeeper服务。
ZooKeeper常用构造器
ZooKeeper (connectString, sessionTimeout, watcher)connectString:使用逗号分隔的列表每个ZooKeeper节点是一个host.port对host 是机器名或者IP地址port是ZooKeeper节点对客户端提供服务的端口号。客户端会任意选取connectString 中的一个节点建立连接。 sessionTimeout : session timeout时间。 watcher:用于接收到来自ZooKeeper集群的事件。
使用 zookeeper 原生 API,连接zookeeper集群
3、代码连接客户端
public class ZkClientDemo {private static final String CONNECT_STR localhost:2181;/*** 连接地址*/private final static String CLUSTER_CONNECT_STRip:2181;public static void main(String[] args) throws Exception {final CountDownLatch countDownLatchnew CountDownLatch(1);ZooKeeper zooKeeper new ZooKeeper(CLUSTER_CONNECT_STR, 4000, watchedEvent - {if(Watcher.Event.KeeperState.SyncConnected watchedEvent.getState()){//如果收到了服务端的响应事件连接成功countDownLatch.countDown();System.out.println(连接建立);}});System.out.println(连接中);countDownLatch.await();//CONNECTEDSystem.out.println(zooKeeper.getState());Stat stat zooKeeper.exists(/zk,false);if(null stat){//创建持久节点zooKeeper.create(/zk,javaxiaobear.getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}//持久监听zooKeeper.addWatch(/zk,new Watcher() {Overridepublic void process(WatchedEvent event) {System.out.println(event);}},AddWatchMode.PERSISTENT);Thread.sleep(Integer.MAX_VALUE);}
}2、Zookeeper主要方法 create(path, data, acl,createMode): 创建一个给定路径的 znode并在 znode 保存 data[]的 数据createMode指定 znode 的类型。 delete(path, version):如果给定 path 上的 znode 的版本和给定的 version 匹配 删除 znode。 exists(path, watch):判断给定 path 上的 znode 是否存在并在 znode 设置一个 watch。 getData(path, watch):返回给定 path 上的 znode 数据并在 znode 设置一个 watch。 setData(path, data, version):如果给定 path 上的 znode 的版本和给定的 version 匹配设置 znode 数据。 getChildren(path, watch):返回给定 path 上的 znode 的孩子 znode 名字并在 znode 设置一个 watch。 sync(path):把客户端 session 连接节点和 leader 节点进行同步。
方法特点 所有获取 znode 数据的 API 都可以设置一个 watch 用来监控 znode 的变化。 所有更新 znode 数据的 API 都有两个版本: 无条件更新版本和条件更新版本。如果 version 为 -1更新为无条件更新。否则只有给定的 version 和 znode 当前的 version 一样才会进行更新这样的更新是条件更新。 所有的方法都有同步和异步两个版本。同步版本的方法发送请求给 ZooKeeper 并等待服务器的响 应。异步版本把请求放入客户端的请求队列然后马上返回。异步版本通过 callback 来接受来 自服务端的响应。
1、连接Linux服务器
ZooKeeper 类通过其构造函数提供连接功能。构造函数的签名如下
ZooKeeper(String connectionString, int sessionTimeout, Watcher watcher)connectionStringZooKeeper集群主机可以为集群也可为单个。sessionTimeout会话超时以毫秒为单位。watcher一个实现“Watcher”接口的对象。ZooKeeper集群通过Watcher对象返回连接状态。让我们创建ZooKeeperConnection类并添加一个方法connect。该连接方法创建一个ZooKeeper对象所连接到的ZooKeeper集群然后返回该对象。在这里CountDownLatch用于停止等待主进程直到客户端与ZooKeeper集群连接为止。 ZooKeeper集群通过Watcher回调返回连接状态。客户端与ZooKeeper集群连接后将调用Watcher回调并且Watcher回调调用CountDownLatch的countDown方法释放锁并在主进程中等待。
代码实现如下
public class ZkClientDemo {private static final String CONNECT_STR localhost:2181;/*** 连接地址*/private final static String CLUSTER_CONNECT_STRip:2181;public static void main(String[] args) throws Exception {final CountDownLatch countDownLatchnew CountDownLatch(1);ZooKeeper zooKeeper new ZooKeeper(CLUSTER_CONNECT_STR, 4000, watchedEvent - {System.out.println(watchedEvent);if(Watcher.Event.KeeperState.SyncConnected watchedEvent.getState()){//如果收到了服务端的响应事件连接成功countDownLatch.countDown();System.out.println(连接建立);}});System.out.println(连接中);countDownLatch.await();//CONNECTEDSystem.out.println(zooKeeper.getState());}
}**注意**如果连接请检查一下是否开启了防火墙服务器2181端口是否放开了
2、创建节点
create基础的方法用法如下
create(String path, byte[] data, ListACL acl, CreateMode createMode) pathZnode路径。例如/javaxiaobear/javaxiaobear/gooddata数据存储在指定的Znode点路径acl要创建的节点的访问控制列表。ZooKeeper API提供了一个静态接口ZooDefs.Ids以获取一些基本的ACL列表。例如ZooDefs.Ids.OPEN_ACL_UNSAFE返回打开的znode的acl列表。createMode节点的类型临时的序列的或两者兼而有之。这是一个枚举。
代码实现如下
/*** 创建节点* param parentPath 父节点路径* param nodeName 节点名称* param data 数据*/
public void createNode(String parentPath, String nodeName, String data) throws Exception{zooKeeper.create(parentPath / nodeName ,data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}3、判断节点是否存在
ZooKeeper类提供了exists方法来检查znode的存在。如果指定的znode存在它将返回znode的元数据。exists方法的用法如下
exists(String path, boolean watcher)参数
path节点路径watcher布尔值指定是否要观看特定Z序节点或不看
代码实现如下
/*** 判断某个节点是否存在* param path 节点路径* throws Exception 异常*/
public Stat existsNode(String path) throws Exception{return zooKeeper.exists(path, true);
}4、获取节点数据
ZooKeeper类提供了getData方法来获取附加到指定znode中的数据及其状态。getData方法的用法如下
getData(String path, Watcher watcher, Stat stat)参数
pathZnode路径。watcher类型为Watcher的回调函数。当指定的znode的数据发生更改时ZooKeeper集群将通过Watcher回调进行通知。这是一次性通知。stat返回Znode点的元数据。
代码实现如下
/*** 获取节点数据* param path 路径* return* throws Exception*/
public String getData(String path) throws Exception{byte[] data zooKeeper.getData(path, false, null);return new String(data);
}5、修改节点
ZooKeeper类提供setData方法来修改附加到指定znode中的数据。setData方法的用法如下
setData(String path, byte[] data, int version)参数
pathZnode路径data数据存储在指定的Znode点的路径。versionznode的当前版本。每当更改数据时ZooKeeper都会更新znode的版本号。
代码实现如下
/*** 设置节点数据* param path 路径* param data 数据* throws Exception*/
public void setData(String path, String data) throws Exception{zooKeeper.setData(path, data.getBytes(), zooKeeper.exists(path, true).getVersion());
}6、获取子节点数据
ZooKeeper类提供了getChildren方法来获取特定znode的所有子节点。getChildren方法的用法如下
getChildren(String path, Watcher watcher)参数
pathZnode路径。watcher类型为Watcher的回调函数。当指定的znode的数据发生更改时ZooKeeper集群将通过Watcher回调进行通知。这是一次性通知。
代码实现如下
/*** 获取路径下的子节点* param path 路径* throws Exception*/public void getChildren(String path) throws Exception{//首先判断节点是否存在Stat exists zooKeeper.exists(path, true);if (null ! exists){ListString children zooKeeper.getChildren(path, false);for (String child : children) {System.out.println(child);}}}7、删除节点
ZooKeeper类提供了delete方法来删除指定的znode。delete方法的用法如下
delete(String path, int version)参数
pathZnode路径。versionznode的当前版本。
代码实现如下
/*** 删除节点* param path* throws Exception*/
public void deleteNode(String path) throws Exception{zooKeeper.delete(path, -1);
}8、监听节点
/*** 监听节点* param path 路径* throws Exception*/
public void addWatch(String path) throws Exception{zooKeeper.addWatch(path, new Watcher() {Overridepublic void process(WatchedEvent watchedEvent) {System.out.println(watchedEvent.getState());}}, AddWatchMode.PERSISTENT);
}9、全部代码实现
public class ZkBaseOperations {private static final String ZOOKEEPER_ADDRESS ip:2181;private static final int SESSION_TIMEOUT 3000;private ZooKeeper zooKeeper;public ZkBaseOperations() {try {CountDownLatch connectionLatch new CountDownLatch(1);zooKeeper new ZooKeeper(ZOOKEEPER_ADDRESS, SESSION_TIMEOUT, event - {if (event.getState() Watcher.Event.KeeperState.SyncConnected) {connectionLatch.countDown();System.out.println(连接成功);}});} catch (Exception e) {e.printStackTrace();}}/*** 关闭连接*/public void close() {if (zooKeeper ! null) {try {zooKeeper.close();} catch (InterruptedException e) {e.printStackTrace();}}}/*** 创建节点* param parentPath 父节点路径* param nodeName 节点名称* param data 数据*/public void createNode(String parentPath, String nodeName, String data) throws Exception{zooKeeper.create(parentPath / nodeName ,data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}/*** 判断某个节点是否存在* param path 节点路径* throws Exception 异常*/public Stat existsNode(String path) throws Exception{return zooKeeper.exists(path, true);}/*** 获取节点数据* param path 路径* return* throws Exception*/public String getData(String path) throws Exception{byte[] data zooKeeper.getData(path, false, null);return new String(data);}/*** 设置节点数据* param path 路径* param data 数据* throws Exception*/public void setData(String path, String data) throws Exception{zooKeeper.setData(path, data.getBytes(), zooKeeper.exists(path, true).getVersion());}/*** 获取路径下的子节点* param path 路径* throws Exception*/public void getChildren(String path) throws Exception{//首先判断节点是否存在Stat exists zooKeeper.exists(path, true);if (null ! exists){ListString children zooKeeper.getChildren(path, false);for (String child : children) {System.out.println(child);}}}/*** 删除节点* param path* throws Exception*/public void deleteNode(String path) throws Exception{zooKeeper.delete(path, -1);}public static void main(String[] args) {ZkBaseOperations zookeeper new ZkBaseOperations();try {//创建节点zookeeper.createNode(/javaxiaobear, 666,javaxiaobear is a good website);//判断阶段是否存在Stat stat zookeeper.existsNode(/javaxiaobear/666);if(stat ! null) {System.out.println(javaxiaobear Node exists and the node version is stat.getVersion());} else {System.out.println(Node does not exists);}//获取节点数据String data zookeeper.getData(/javaxiaobear/666);System.out.println(获取节点的数据为 data);//设置节点数据zookeeper.setData(/javaxiaobear, 666);//获取子节点zookeeper.getChildren(/javaxiaobear);//删除节点zookeeper.deleteNode(/javaxiaobear);zookeeper.close();} catch (Exception e) {throw new RuntimeException(e);}}
}2、zkClient开源客户端
ZkClient是Github上⼀个开源的zookeeper客户端在Zookeeper原⽣API接⼝之上进⾏了包装是⼀个更易⽤的Zookeeper客户端同时zkClient在内部还实现了诸如Session超时重连、Watcher反复注册等功能。
1、如何使用
**添加依赖**在pom.xml⽂件中添加如下内容
dependencygroupIdcom.101tec/groupIdartifactIdzkclient/artifactIdversion0.11/version
/dependency2、创建会话
使⽤ZkClient可以轻松的创建会话连接到服务端。
public class CreateSession {/*** 连接地址*/private final static String CLUSTER_CONNECT_STRip:2181;public static void main(String[] args) {ZkClient zkClient new ZkClient(CLUSTER_CONNECT_STR);System.out.println(zkClient 连接成功);}
}运⾏结果zkClient 连接成功。结果表明已经成功创建会话。
3、创建节点
ZkClient提供了递归创建节点的接⼝即其帮助开发者先完成⽗节点的创建再创建⼦节点
public class Create_Node_Sample {public static void main(String[] args) {ZkClient zkClient new ZkClient(ip:2181);//createParents的值设置为true可以递归创建节点zkClient.createPersistent(/javaxiaobear,true);System.out.println(创建节点成功);}
}运⾏结果success create znode. 结果表明已经成功创建了节点值得注意的是在原⽣态接⼝中是⽆法创建成功的⽗节点不存在但是通过ZkClient通过设置createParents参数为true可以递归的先创建⽗节点再创建⼦节点
4、删除节点
ZkClient提供了递归删除节点的接⼝即其帮助开发者先删除所有⼦节点存在再删除⽗节点。
public class DeleteNode {public static void main(String[] args) {ZkClient zkClient new ZkClient(ip:2181);zkClient.deleteRecursive(/javaxiaobear);System.out.println(删除节点成功);}
}运⾏结果: 删除节点成功
结果表明ZkClient可直接删除带⼦节点的⽗节点因为其底层先删除其所有⼦节点然后再删除⽗节点
5、获取⼦节点
public class Get_Children_Sample {public static void main(String[] args) throws Exception {ZkClient zkClient new ZkClient(127.0.0.1:2181, 5000);ListString children zkClient.getChildren(/javaxiaobear);System.out.println(children);//注册监听事件zkClient.subscribeChildChanges(path, new IZkChildListener() {public void handleChildChange(String parentPath, ListStringcurrentChilds) throws Exception {System.out.println(parentPath s child changed,currentChilds: currentChilds);}});zkClient.createPersistent(/javaxiaobear);Thread.sleep(1000);zkClient.createPersistent(/javaxiaobear/c1);Thread.sleep(1000);zkClient.delete(/javaxiaobear/c1);Thread.sleep(1000);zkClient.delete(path);Thread.sleep(Integer.MAX_VALUE);}
}运⾏结果
/zk-book s child changed, currentChilds:[]
/zk-book s child changed, currentChilds:[c1]
/zk-book s child changed, currentChilds:[]
/zk-book s child changed, currentChilds:null结果表明客户端可以对⼀个不存在的节点进⾏⼦节点变更的监听。⼀旦客户端对⼀个节点注册了⼦节点列表变更监听之后那么当该节点的⼦节点列表发⽣变更时服务端都会通知客户端并将最新的⼦节点列表发送给客户端该节点本身的创建或删除也会通知到客户端。
6、获取数据节点是否存在、更新、删除
public class Get_Data_Sample {public static void main(String[] args) throws InterruptedException {String path /javaxiaobear-zk;ZkClient zkClient new ZkClient(127.0.0.1:2181);//判断节点是否存在boolean exists zkClient.exists(path);if (!exists){zkClient.createEphemeral(path, 123);}//注册监听zkClient.subscribeDataChanges(path, new IZkDataListener() {public void handleDataChange(String path, Object data) throwsException {System.out.println(path该节点内容被更新更新后的内容data);}public void handleDataDeleted(String s) throws Exception {System.out.println(s 该节点被删除);}});//获取节点内容Object o zkClient.readData(path);System.out.println(o);//更新zkClient.writeData(path,4567);Thread.sleep(1000);//删除zkClient.delete(path);Thread.sleep(1000);}
}
运⾏结果
123
/javaxiaobear-zk该节点内容被更新更新后的内容4567
/javaxiaobear-zk 该节点被删除3、Curator开源客户端使用
Curator是Netflix公司开源的一套ZooKeeper客户端框架和ZkClient一样它解决了非常底层的细节开发工作包括连接、重连、反复注册Watcher的问题以及NodeExistsException异常等。
Curator是Apache基金会的顶级项目之一Curator具有更加完善的文档另外还提供了一套易用性和可读性更强的Fluent风格的客户端API框架。
Curator还为ZooKeeper客户端框架提供了一些比较普遍的、开箱即用的、分布式开发用的解决方案例如Recipe、共享锁服务、Master选举机制和分布式计算器等帮助开发者避免了“重复造轮子”的无效开发工作。
Guava is to Java that Curator to ZooKeeper
在实际的开发场景中使用Curator客户端就足以应付日常的ZooKeeper集群操作的需求。
官网https://curator.apache.org/
1、如何使用
引入依赖
Curator 包含了几个包 curator-framework是对ZooKeeper的底层API的一些封装。 curator-client提供了一些客户端的操作例如重试策略等。 curator-recipes封装了一些高级特性如Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等。
!-- zookeeper client --
dependencygroupIdorg.apache.zookeeper/groupIdartifactIdzookeeper/artifactIdversion3.8.0/version
/dependency!--curator--
dependencygroupIdorg.apache.curator/groupIdartifactIdcurator-recipes/artifactIdversion5.1.0/versionexclusionsexclusiongroupIdorg.apache.zookeeper/groupIdartifactIdzookeeper/artifactId/exclusion/exclusions
/dependency2、常用API
1、创建一个客户端实例
在使用curator-framework包操作ZooKeeper前首先要创建一个客户端实例。这是一个CuratorFramework类型的对象有两种方法 使用工厂类CuratorFrameworkFactory的静态newClient()方法。 public static CuratorFramework newClient(String connectString,RetryPolicy retryPolicy)
public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)connectString指的是需要连接的zookeeperAddress ipretryPolicy指的是连接zk时使用哪种重试策略sessionTimeoutMs指的是会话超时时间connectionTimeoutMs指的是连接超时时间 代码实现如下 public class CuratorDemo {private final static String CLUSTER_CONNECT_STRip:2181;public static void main(String[] args) throws Exception {// 重试策略RetryPolicy retryPolicy new ExponentialBackoffRetry(1000, 3);
// //创建客户端实例CuratorFramework client CuratorFrameworkFactory.newClient(CLUSTER_CONNECT_STR, retryPolicy);//启动客户端client.start();System.out.println(zookeeper初始化连接成功 client);//关闭连接client.close();}
}使用工厂类CuratorFrameworkFactory的静态builder构造者方法。 public class CuratorDemo {private final static String CLUSTER_CONNECT_STRip:2181;public static void main(String[] args) throws Exception {//重试策略RetryPolicy retryPolicy new ExponentialBackoffRetry(1000, 3);CuratorFramework client CuratorFrameworkFactory.builder().connectString(CLUSTER_CONNECT_STR).sessionTimeoutMs(5000) // 会话超时时间.connectionTimeoutMs(5000) // 连接超时时间.retryPolicy(retryPolicy).namespace(base) // 包含隔离名称.build();//启动客户端client.start();System.out.println(zookeeper初始化连接成功 client);//关闭连接client.close();}
}connectionString服务器地址列表在指定服务器地址列表的时候可以是一个地址也可以是多个地址。如果是多个地址那么每个服务器地址列表用逗号分隔, 如 host1:port1,host2:port2,host3port3 。 retryPolicy重试策略当客户端异常退出或者与服务端失去连接的时候可以通过设置客户端重新连接 ZooKeeper 服务端。而 Curator 提供了 一次重试、多次重试等不同种类的实现方式。在 Curator 内部可以通过判断服务器返回的 keeperException 的状态代码来判断是否进行重试处理如果返回的是 OK 表示一切操作都没有问题而 SYSTEMERROR 表示系统或服务端错误。 策略名称描述ExponentialBackoffRetry重试一组次数重试之间的睡眠时间增加RetryNTimes重试最大次数RetryOneTime只重试一次RetryUntilElapsed在给定的时间结束之前重试 超时时间Curator 客户端创建过程中有两个超时时间的设置。一个是 sessionTimeoutMs 会话超时时间用来设置该条会话在 ZooKeeper 服务端的失效时间。另一个是 connectionTimeoutMs 客户端创建会话的超时时间用来限制客户端发起一个会话连接到接收 ZooKeeper 服务端应答的时间。sessionTimeoutMs 作用在服务端而 connectionTimeoutMs 作用在客户端。
2、创建节点
主要方法有以下
public T forPath(String path) //创建节点并赋值内容public T forPath(String path, byte[] data)//判断节点是否存在节点存在了创建时仍然会报错public ExistsBuilder checkExists()代码实现如下
/*** 创建节点* param nodePath 路径* param data 数据*/
public void createNode(String nodePath, String data) throws Exception{if (StringUtils.isEmpty(nodePath)) {System.out.println(节点【 nodePath 】不能为空);}//1、对节点是否存在进行判断否则会报错【NodeExistsException: KeeperErrorCode NodeExists for /root】Stat exists client.checkExists().forPath(nodePath);if (null ! exists) {System.out.println(节点【 nodePath 】已存在不能新增);}//2、创建节点 永久节点client.create().forPath(nodePath);// 2.1 手动指定节点的类型client.create().withMode(CreateMode.PERSISTENT).forPath(nodePath);//2.2 如果父节点不存在可创建当前的父节点String node client.create().creatingParentsIfNeeded().forPath(nodePath);System.out.println(node);//创建节点并为当前节点赋值内容if (!StringUtils.isBlank(data)) {//2.3、创建永久节点并为当前节点赋值内容client.create().forPath(nodePath, data.getBytes());//2.4、创建永久有序节点client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(nodePath, data.getBytes());//2.5、创建临时节点client.create().withMode(CreateMode.EPHEMERAL).forPath(nodePath, data.getBytes());}//2.6、创建临时有序节点client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(nodePath, data.getBytes());
}在 Curator 中可以使用 create 函数创建数据节点并通过 withMode 函数指定节点类型持久化节点临时节点顺序节点临时顺序节点持久化顺序节点等默认是持久化节点之后调用 forPath 函数来指定节点的路径和数据信息。
3、获取节点数据
主要的方法如下
//获取某个节点数据
client.getData().forPath(nodePath)
//读取zookeeper的数据并放到Stat中
client.getData().storingStatIn(stat1).forPath(nodePath)参数说明
path指定要读取的节点
stat指定数据节点的节点状态信息
4、设置修改节点
我们通过客户端实例的 setData() 方法更新 ZooKeeper 服务上的数据节点在setData 方法的后边通过 forPath 函数来指定更新的数据节点路径以及要更新的数据。
/*** 设置修改节点数据* param nodePath 节点路径* param data 数据*/
public void setData(String nodePath, String data) throws Exception{//更新节点Stat stat client.setData().forPath(nodePath, data.getBytes());// 指定版本号更新节点更新的时候如果指定数据版本的话那么需要和zookeeper中当前数据的版本要一致-1表示匹配任何版本Stat stat1 client.setData().withVersion(-1).forPath(nodePath, data.getBytes());//异步设置某个节点数据Stat stat2 client.setData().inBackground().forPath(nodePath, data.getBytes());System.out.println(stat1.toString());
}5、获取某个节点的子节点
/*** 获取某个节点的子节点* param nodePath* throws Exception*/
public void getChildren(String nodePath) throws Exception{ListString list client.getChildren().forPath(nodePath);for (String s : list) {System.out.println(s);}
}6、删除节点
/*** 删除节点* param nodePath 节点路径* throws Exception*/
public void delete(String nodePath) throws Exception{client.delete().forPath(nodePath);//删除节点即使出现网络故障zookeeper也可以保证删除该节点client.delete().guaranteed().forPath(nodePath);//级联删除节点如果当前节点有子节点子节点也可以一同删除client.delete().deletingChildrenIfNeeded().forPath(nodePath);
}guaranteed该函数的功能如字面意思一样主要起到一个保障删除成功的作用其底层工作方式是只要该客户端的会话有效就会在后台持续发起删除请求直到该数据节点在 ZooKeeper 服务端被删除。deletingChildrenIfNeeded指定了该函数后系统在删除该数据节点的时候会以递归的方式直接删除其子节点以及子节点的子节点。
3、功能接口实现
1、异步接口
Curator 引入了BackgroundCallback 接口用来处理服务器端返回来的信息这个处理过程是在异步线程中调用默认在 EventThread 中调用也可以自定义线程池。
public interface BackgroundCallback
{/*** Called when the async background operation completes** param client the client* param event operation result details* throws Exception errors*/public void processResult(CuratorFramework client, CuratorEvent event) throws Exception;
}如上接口主要参数为 client 客户端 和 服务端事件 event inBackground 异步处理默认在EventThread中执行
制定线程池异步处理
/*** 指定线程池 获取节点数据* param nodePath* throws Exception*/
public void testExecutorService(String nodePath) throws Exception{ExecutorService executorService Executors.newSingleThreadExecutor();byte[] data client.getData().inBackground((o1, o2) - {}, executorService).forPath(nodePath);System.out.println(new String(data));
}2、Curator 监听器
API方法如下
/*** Receives notifications about errors and background events*/
public interface CuratorListener
{/*** Called when a background task has completed or a watch has triggered** param client client* param event the event* throws Exception any errors*/public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception;
}针对 background 通知和错误通知。使用此监听器之后调用inBackground 方法会异步获得监听
Curator Caches
Curator 引入了 Cache 来实现对 Zookeeper 服务端事件监听Cache 事件监听可以理解为一个本地缓存视图与远程 Zookeeper 视图的对比过程。Cache 提供了反复注册的功能。Cache 分为两类注册类型节点监听和子节点监听。
node cache:
NodeCache 对某一个节点进行监听
public NodeCache(CuratorFramework client, String path);可以通过注册监听器来实现对当前节点数据变化的处理
public void addListener(NodeCacheListener listener)代码实现
public class CuratorNodeCacheTest {private static final String ZOOKEEPER_ADDRESS ip:2181;private static final int SESSION_TIMEOUT 3000;public CuratorFramework client;public CuratorNodeCacheTest() {// 重试策略RetryPolicy retryPolicy new ExponentialBackoffRetry(1000, 3);client CuratorFrameworkFactory.newClient(ZOOKEEPER_ADDRESS, retryPolicy);client.start();}/**** param nodePath 路径*/public void nodeCache(String nodePath) throws Exception {NodeCache nodeCache new NodeCache(client, nodePath);//监听当前节点路径nodeCache.getListenable().addListener(new NodeCacheListener() {Overridepublic void nodeChanged() throws Exception {//查询节点数据byte[] data client.getData().forPath(nodePath);System.out.println(new String(data));}});nodeCache.start();}
}path cache:
PathChildrenCache 会对子节点进行监听但是不会对二级子节点进行监听
public PathChildrenCache(CuratorFramework client,String path,boolean cacheData)可以通过注册监听器来实现对当前节点的子节点数据变化的处理
public void addListener(PathChildrenCacheListener listener)/*** * param nodePath 节点路径* throws Exception*/
public void pathCache(String nodePath) throws Exception{PathChildrenCache pathChildrenCache new PathChildrenCache(client, nodePath, true);pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {Overridepublic void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {System.out.println(event);}});// 如果设置为true则在首次启动时就会缓存节点内容到Cache中pathChildrenCache.start(true);
}tree cache:
TreeCache 使用一个内部类TreeNode来维护这个一个树结构。并将这个树结构与ZK节点进行了映射。所以TreeCache 可以监听当前节点下所有节点的事件。
public TreeCache(CuratorFramework client, String path, boolean cacheData)可以通过注册监听器来实现对当前节点的子节点及递归子节点数据变化的处理
public void addListener(TreeCacheListener listener)public void treeCache(String nodePath) throws Exception{TreeCache treeCache new TreeCache(client, nodePath);treeCache.getListenable().addListener(new TreeCacheListener() {Overridepublic void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {System.out.println(event);}});treeCache.start();
}