烟台做网站需要多少钱,专业建站外包,wordpress会员邀请系统,线上商城开发文章目录 ZooKeeper 实战(四) Curator Watch事件监听0.前言1.Watch 事件监听概念2.NodeCache2.1.全参构造器参数2.2.代码DEMO2.3.日志输出 3.PathChildrenCache3.1.全参构造器参数3.2.子节点监听时间类型3.2.代码DEMO 4.TreeCache4.1.构造器参数4.2.代码DEMO4.3.日志输出 ZooKe… 文章目录 ZooKeeper 实战(四) Curator Watch事件监听0.前言1.Watch 事件监听概念2.NodeCache2.1.全参构造器参数2.2.代码DEMO2.3.日志输出 3.PathChildrenCache3.1.全参构造器参数3.2.子节点监听时间类型3.2.代码DEMO 4.TreeCache4.1.构造器参数4.2.代码DEMO4.3.日志输出 ZooKeeper 实战(四) Curator Watch事件监听
0.前言
上一篇博客只介绍了有关Curator中对ZNode的CRUD操作从本篇起开始逐步介绍更加高级的API操作。
1.Watch 事件监听概念
ZooKeeper 中引入了Watcher机制来实现了发布/订阅功能能能够让多个订阅者同时监听某一个对象当一个对象自身状态变化时会通知所有订阅者。虽然ZooKeeper原生支持通过注册Watcher来进行事件监听但是其使用并不是特别方便需要开发人员反复注册Watcher比较繁琐。
而 Curator 引入了Cache 来实现对 ZooKeeper 服务端事件的监听。
Curator 中提供了三种 CacheWatcher来监听不同节点变化类型
NodeCache监听指定的节点。PathChildrenCache监听指定节点的子节点。TreeCache监听指定节点及其子孙节点。
2.NodeCache
监听指定的节点增删改都会监听。
2.1.全参构造器参数
/*** param: client 注册监听的客户端* param: path 节点路径* param: dataIsCompressed 是否开启数据压缩。传递的数据会进行压缩,传递速度快,但取数据时需要把压缩的数据进行转换,默认为false*/
public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed);
2.2.代码DEMO Overridepublic void run(ApplicationArguments args) throws Exception {log.info(。。。。。。。。。。。。。容器初始化完毕。。。。。。。。。。。。。。);String path /ahao/watcher;TimeUnit.SECONDS.sleep(3);// 创建NodeCache对象NodeCache nodeCache new NodeCache(client,path);// 添加监听器nodeCache.getListenable().addListener(new NodeCacheListener() {Overridepublic void nodeChanged() throws Exception {ChildData currentData nodeCache.getCurrentData();if (currentData ! null){String s new String(currentData.getData(),StandardCharsets.UTF_8);log.info(监听{}节点发生变化数据内容{},path,s);}else {log.info(监听{}节点被删除了,path);}}});// 开启监听nodeCache.start();TimeUnit.SECONDS.sleep(2);// 创建节点client.create().creatingParentsIfNeeded().forPath(path,第一次新增.getBytes(StandardCharsets.UTF_8));TimeUnit.SECONDS.sleep(2);// 更新节点client.setData().forPath(path,数据修改了.getBytes(StandardCharsets.UTF_8));TimeUnit.SECONDS.sleep(2);// 删除节点client.delete().deletingChildrenIfNeeded().forPath(path);}2.3.日志输出 3.PathChildrenCache
监听指定节点的子节点。当一个子节点增删改时 PathChildrenCache会包含最新的子节点的数据和状态。
3.1.全参构造器参数
/*** param: client 注册监听的客户端* param: path 节点路径* param: cacheData 是否缓存节点内容包含节点状态* param: dataIsCompressed 是否开启数据压缩。传递的数据会进行压缩,传递速度快,但取数据时需要把压缩的数据进行转换,默认为false* param: executorService 用于PathChildrenCache的后台线程的线程池。该线程池应该是单线程的否则缓存可能会看到不一致的结果*/
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService)
3.2.子节点监听时间类型
public enum Type
{// 子节点添加CHILD_ADDED,// 子节点的数据变更CHILD_UPDATED,// 子节点被删除CHILD_REMOVED,// 以下三个事件类型表示当连接断开时PathChildrenCache将继续保持其断开连接之前的状态并且在连接恢复后PathChildrenCache将为断开连接期间发生的所有添加、删除和更新发出正常的子事件。// 当连接状态处于ConnectionState.SUSPENDED。CONNECTION_SUSPENDED,// 当连接状态处于ConnectionState.RECONNECTEDCONNECTION_RECONNECTED,// 当连接状态处于ConnectionState.LOSTCONNECTION_LOST,// 当通过PathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT)启动监听时该事件表示PathChildrenCache初始化完成This event signals that the initial cache has been populated.INITIALIZED
}3.2.代码DEMO Overridepublic void run(ApplicationArguments args) throws Exception {log.info(。。。。。。。。。。。。。容器初始化完毕。。。。。。。。。。。。。。);String path /ahao/watcher;TimeUnit.SECONDS.sleep(3);// 创建PathChildrenCache对象// 此处的cacheData参数一定要设置为true不然Curator不会缓存数据当本地// 那么后续pathChildrenCache.getCurrentData()得到的数据都为nullPathChildrenCache pathChildrenCache new PathChildrenCache(client,path,true);// 添加监听器pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {Overridepublic void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {if (event.getType() PathChildrenCacheEvent.Type.INITIALIZED){log.info(PathChildrenCache初始化完事件类型{}, event.getType());}else {ChildData currentData event.getData();log.info(事件类型{}监听到的子节点发生变化{},event.getType(),currentData.getPath());}}});// 开启监听pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);// 创建子节点TimeUnit.SECONDS.sleep(2);client.create().creatingParentsIfNeeded().forPath(path/c1);client.create().creatingParentsIfNeeded().forPath(path/c2);client.create().creatingParentsIfNeeded().forPath(path/c3/age);// 修改子节点TimeUnit.SECONDS.sleep(2);client.setData().forPath(path/c1,c1更新了.getBytes(StandardCharsets.UTF_8));client.setData().forPath(path/c2,c2更新了.getBytes(StandardCharsets.UTF_8));// 删除子节点TimeUnit.SECONDS.sleep(2);client.delete().deletingChildrenIfNeeded().forPath(path/c3);}3.3.日志输出
可以看出PathChildrenCache只会监听直属子节点的变化其非直属子节点的后代节点如/c3/age没有发布通知。 4.TreeCache
监听指定节点及其子孙节点。
4.1.构造器参数
/*** param: client 注册监听的客户端* param: path 节点路径*/
public TreeCache(CuratorFramework client, String path)/*** param: client 注册监听的客户端* param: path 节点路径* param: cacheData 是否缓存节点内容包含节点状态* param: dataIsCompressed 是否开启数据压缩。传递的数据会进行压缩,传递速度快,但取数据时需要把压缩的数据进行转换,默认为false* param: maxDepth 最大深度。最深的那个后代节点到path所需要经过的节点数* param: executorService 用于PathChildrenCache的后台线程的线程池。该线程池应该是单线程的否则缓存可能会看到不一致的结果* param: createParentNodes 是否需要创建父节点。如果父节点不存在泽创建父节点容器节点* param: TreeCacheSelector TreeCache选择器。根据指定的策略和条件选择适合的缓存树来创建和维护TreeCache*/
TreeCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, int maxDepth, final ExecutorService executorService, boolean createParentNodes, TreeCacheSelector selector)
4.2.代码DEMO Overridepublic void run(ApplicationArguments args) throws Exception {log.info(。。。。。。。。。。。。。容器初始化完毕。。。。。。。。。。。。。。);String path /ahao/watcher/tree;TimeUnit.SECONDS.sleep(3);// 创建TreeCache对象也可通过TreeCache.newBuilder()创建TreeCache treeCache new TreeCache(client,path);treeCache.getListenable().addListener(new TreeCacheListener() {Overridepublic void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {if (event.getType() TreeCacheEvent.Type.INITIALIZED){log.info(TreeCache初始化完事件类型{}, event.getType());}else {ChildData currentData event.getData();log.info(事件类型{}监听到的子节点发生变化{},event.getType(),currentData.getPath());}}});// 开启监听treeCache.start();// 创建节点TimeUnit.SECONDS.sleep(2);client.create().creatingParentsIfNeeded().forPath(path);client.create().creatingParentsIfNeeded().forPath(path /t1);client.create().creatingParentsIfNeeded().forPath(path /t2/ccc);// 修改子节点TimeUnit.SECONDS.sleep(2);client.setData().forPath(path,根节点更新了.getBytes(StandardCharsets.UTF_8));client.setData().forPath(path /t2/ccc,/t2/ccc更新了.getBytes(StandardCharsets.UTF_8));// 删除子节点TimeUnit.SECONDS.sleep(2);client.delete().deletingChildrenIfNeeded().forPath(path /t2);}4.3.日志输出
可以看出TreeCache会监听当前节点和后代节点的变化。