首页 技术 正文
技术 2022年11月19日
0 收藏 494 点赞 4,919 浏览 5503 个字

前言

  Curator是Netflix开源的一套ZooKeeper客户端框架:

    1.封装ZooKeeper client与ZooKeeper server之间的连接处理;

    2.提供了一套Fluent风格的操作API;

    3.提供ZooKeeper各种应用场景(recipe, 比如共享锁服务, 集群领导选举机制)的抽象封装。

  Curator几个组成部分:

    Client:是ZooKeeper客户端的一个替代品, 提供了一些底层处理和相关的工具方法

    Framework: 用来简化ZooKeeper高级功能的使用, 并增加了一些新的功能, 比如管理到ZooKeeper集群的连接, 重试处理

    Recipes:实现了通用ZooKeeper的recipe, 该组件建立在Framework的基础之上

    Utilities:各种ZooKeeper的工具类

    Errors: 异常处理, 连接, 恢复等

    Extensions: recipe扩展

  Curator内部实现的几种重试策略:

    ExponentialBackoffRetry:重试指定的次数, 且每一次重试之间停顿的时间逐渐增加

    RetryNTimes:指定最大重试次数的重试策略

    RetryOneTime:仅重试一次

    RetryUntilElapsed:一直重试直到达到规定的时间

正文

  1.项目使用maven工程,在pom.xml中添加依赖

  <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.5.0</version>
</dependency>

  2.下面代码从增删改查、事务、事件订阅/监听器来实现的。

package om.xbq.demo;import java.util.Collection;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.data.Stat;public class CuratorDemo { // 此demo使用的集群,所以有多个ip和端口
private static String CONNECT_SERVER = "192.168.242.129:2181,192.168.242.129:2182,192.168.242.129:2183";
private static int SESSION_TIMEOUT = 3000;
private static int CONNECTION_TIMEOUT = 3000; public static void main(String[] args) {
// 连接 ZooKeeper
CuratorFramework framework = CuratorFrameworkFactory.
newClient(CONNECT_SERVER, SESSION_TIMEOUT, CONNECTION_TIMEOUT, new ExponentialBackoffRetry(1000,10));
// 启动
framework.start(); Stat stat = ifExists(framework); if(stat != null){
// update(framework);
// delete(framework);
// query(framework);// 监听事件,只监听一次,不推荐
// listener1(framework);
}else {
// add(framework);
}// 事务
// transaction(framework);// 持久监听,推荐使用
listener2(framework);
} /**
* 判断节点是否存在
* @param cf
* @return
*/
public static Stat ifExists(CuratorFramework cf){
Stat stat = null;
try {
stat = cf.checkExists().forPath("/node_curator/test");;
System.out.println(stat);
} catch (Exception e) {
e.printStackTrace();
}
return stat;
} /**
* @Title: add
* @Description: TODO(增加节点 , 可以增加 多级节点)
* @param @param cf 设定文件
* @return void 返回类型
* @throws
*/
public static void add(CuratorFramework cf){
try {
String rs = cf.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT).forPath("/node_curator/test","xbq".getBytes());
System.out.println(rs);
} catch (Exception e) {
e.printStackTrace();
} finally {
cf.close();
}
} /**
* @Title: update
* @Description: TODO(修改指定节点的值)
* @param @param cf 设定文件
* @return void 返回类型
* @throws
*/
public static void update(CuratorFramework cf){
try {
Stat stat = cf.setData().forPath("/node_curator/test", "javaCoder".getBytes());
System.out.println(stat);
} catch (Exception e) {
e.printStackTrace();
} finally {
cf.close();
}
} /**
* @Title: delete
* @Description: TODO(删除节点或者删除包括子节点在内的父节点)
* @param @param cf 设定文件
* @return void 返回类型
* @throws
*/
public static void delete(CuratorFramework cf){
try {
// 递归删除的话,则输入父节点
cf.delete().deletingChildrenIfNeeded().forPath("/node_curator");
} catch (Exception e) {
e.printStackTrace();
} finally {
cf.close();
}
} /**
* @Title: query
* @Description: TODO(查询节点的值)
* @param @param cf 设定文件
* @return void 返回类型
* @throws
*/
public static void query(CuratorFramework cf){
try {
byte[] value = cf.getData().forPath("/node_curator/test");
System.out.println(new String(value));
} catch (Exception e) {
e.printStackTrace();
} finally {
cf.close();
}
} /**
* @Title: transaction
* @Description: TODO(一组crud操作同生同灭)
* @param @param cf 设定文件
* @return void 返回类型
* @throws
*/
public static void transaction(CuratorFramework cf){
try {
// 事务处理, 事务会自动回滚
Collection<CuratorTransactionResult> results = cf.inTransaction()
.create().withMode(CreateMode.PERSISTENT).forPath("/node_xbq1").and()
.create().withMode(CreateMode.PERSISTENT).forPath("/node_xbq2").and().commit();
// 遍历
for(CuratorTransactionResult result:results){
System.out.println(result.getResultStat() + "->" + result.getForPath() + "->" + result.getType());
}
} catch (Exception e) {
e.printStackTrace();
} finally {
cf.close();
}
} /**
* @Title: listener1
* @Description: TODO(监听 事件 -- 通过 usingWatcher 方法)
* 注意:通过CuratorWatcher 去监听指定节点的事件, 只监听一次
* @param @param cf 设定文件
* @return void 返回类型
* @throws
*/
public static void listener1(CuratorFramework cf){
try {
cf.getData().usingWatcher(new CuratorWatcher() {
@Override
public void process(WatchedEvent event) throws Exception {
System.out.println("触发事件:" + event.getType());
}
}).forPath("/javaCoder"); System.in.read(); // 挂起,在控制台上输入 才停止
} catch (Exception e) {
e.printStackTrace();
} finally {
cf.close();
}
} /**
* @Title: listener2
* @Description: TODO(监听 子节点的事件,不监听 自己 -- 通过 PathChildrenCacheListener 方法,推荐使用)
* @param @param cf 设定文件
* @return void 返回类型
* @throws
*/
public static void listener2(CuratorFramework cf) {
// 节点node_xbq不存在 会新增
PathChildrenCache cache = new PathChildrenCache(cf, "/node_xbq", true);
try {
cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
cache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
System.out.println("触发事件:" + event.getType());
}
});
System.in.read();
} catch (Exception e) {
e.printStackTrace();
} finally {
cf.close();
}
}
}

源码下载

  https://gitee.com/xbq168/CuratorDemo

  

相关推荐
python开发_常用的python模块及安装方法
adodb:我们领导推荐的数据库连接组件bsddb3:BerkeleyDB的连接组件Cheetah-1.0:我比较喜欢这个版本的cheeta…
日期:2022-11-24 点赞:878 阅读:8,999
Educational Codeforces Round 11 C. Hard Process 二分
C. Hard Process题目连接:http://www.codeforces.com/contest/660/problem/CDes…
日期:2022-11-24 点赞:807 阅读:5,511
下载Ubuntn 17.04 内核源代码
zengkefu@server1:/usr/src$ uname -aLinux server1 4.10.0-19-generic #21…
日期:2022-11-24 点赞:569 阅读:6,357
可用Active Desktop Calendar V7.86 注册码序列号
可用Active Desktop Calendar V7.86 注册码序列号Name: www.greendown.cn Code: &nb…
日期:2022-11-24 点赞:733 阅读:6,140
Android调用系统相机、自定义相机、处理大图片
Android调用系统相机和自定义相机实例本博文主要是介绍了android上使用相机进行拍照并显示的两种方式,并且由于涉及到要把拍到的照片显…
日期:2022-11-24 点赞:512 阅读:7,770
Struts的使用
一、Struts2的获取  Struts的官方网站为:http://struts.apache.org/  下载完Struts2的jar包,…
日期:2022-11-24 点赞:671 阅读:4,848