ZooKeeper常用客户端有三种:原生客户端、zkClient、curator

项目中使用前,需要导入相关依赖

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.12</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency> <dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
</dependencies>

原生客户端

创建会话

不使用监听

public class TestCreateSession {
/*服务地址*/
private static final String ZK_SERVER = "127.0.0.1:2181";
@Test
public void createSession2() throws IOException {
ZooKeeper zk = new ZooKeeper(ZK_SERVER, 50000, null);
System.out.println("zk.getState() = " + zk.getState());
}
}

zk.getState() = CONNECTING 

通过之前的学习可以知道,CONNECTING标志客户端正在连接,并不能确保已经连接上zk服务。可能发生还没有连接到zk服务就进行对zk访问的情况

使用监听

public class TestCreateSession {
/*服务地址*/
private static final String ZK_SERVER = "127.0.0.1:2181";
/*倒计时器*/
private CountDownLatch latch = new CountDownLatch(1);
@Test
public void createSession() throws IOException, InterruptedException {
ZooKeeper zk = new ZooKeeper(ZK_SERVER, 50000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected){/*确保zk已连接*/
latch.countDown();
}
}
});
latch.await();
System.out.println("zk.getState() = " + zk.getState());
}
}

zk.getState() = CONNECTED

使用监听机制可以确保在ZooKeeper初始化完成前进行等待,初始化完成再进行后续操作

客户端基本操作

 public class TestJavaApi implements Watcher {
/*zk服务地址*/
private static final String ZK_SERVER = "127.0.0.1:2181";
/*会话连接超时时间*/
private static final int SESSION_TIMEOUT = 50000;
/*指定目录【节点】*/
private static final String ZK_PATH = "/zkDir";
/*客户端连接会话*/
private ZooKeeper zk = null; /*倒计时器*/
private CountDownLatch latch = new CountDownLatch(1);
/**
* 事件被触发时的动作
* @param event 事件
*/
@Override
public void process(WatchedEvent event) {
System.out.println("收到事件通知:" + zk.getState() +"\n");
if (event.getState() == Event.KeeperState.SyncConnected){
latch.countDown();
}
} /**
* 创建zk会话连接
* @param connectString zk服务器地址列表,可以是"地址1,地址2,...."
* @param sessionTimeout Session超时时间
*/
public void createZkSession(String connectString, int sessionTimeout){
try {
zk = new ZooKeeper(connectString,sessionTimeout,this);
latch.await();
System.out.println("zk.getState() = " + zk.getState());
} catch (IOException|InterruptedException e) {
System.out.println("连接创建失败");
e.printStackTrace();
}
} /**
* 关闭zk会话
*/
public void releaseSession(){
try {
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
} /**
* 创建节点【目录、文件】
* @param path 节点
* @param data 节点数据
* @return
*/
public boolean createNode(String path,String data){
try {
String node = zk.create(path/*节点path*/,
data.getBytes()/*节点数据*/,
ZooDefs.Ids.OPEN_ACL_UNSAFE/*权限控制 OPEN_ACL_UNSAFE相当于world:anyone*/,
CreateMode.EPHEMERAL)/*临时节点*/;
System.out.println("节点创建成功,node = " + node);
return true;
} catch (KeeperException|InterruptedException e) {
System.out.println("节点创建失败");
e.printStackTrace();
}
return false;
} /**
* 获取节点数据
* @param path 节点路径
* @return
*/
public String readNode(String path){
try {
byte[] data = zk.getData(path, true, null);
String nodeData = new String(data,"utf-8");
//System.out.println("获取"+path+"节点数据:"+nodeData);
return nodeData;
} catch (KeeperException | InterruptedException | UnsupportedEncodingException e) {
e.printStackTrace();
return null;
}
} /**
* 修改节点数据
* @param path 节点path
* @param newData 节点新数据
* @return
*/
public boolean writeNode(String path,String newData){
try {
Stat stat = zk.setData(path, newData.getBytes(), -1);
System.out.println("节点["+path+"]修改成功");
return true;
} catch (KeeperException|InterruptedException e) {
e.printStackTrace();
}
return false;
} /**
* 删除指定节点
* @param path 节点path
*/
public void deleteNode(String path){
try {
zk.delete(path,-1);
System.out.println("节点["+path+"]删除成功");
} catch (InterruptedException|KeeperException e) {
System.out.println("节点["+path+"]删除失败");
e.printStackTrace();
}
} public static void main(String[] args) {
TestJavaApi api = new TestJavaApi();
api.createZkSession(ZK_SERVER,SESSION_TIMEOUT);
if(api.createNode(ZK_PATH,"初始节点内容")){
System.out.println("第一次读"+ZK_PATH+"节点数据:"+api.readNode(ZK_PATH));
api.writeNode(ZK_PATH,"修改ZK_PATH节点数据");
System.out.println("第二次读"+ZK_PATH+"节点数据:"+api.readNode(ZK_PATH));
api.deleteNode(ZK_PATH);
}
api.releaseSession();
}
}
/**
************输出结果***********
收到事件通知:CONNECTED zk.getState() = CONNECTED
节点创建成功,node = /zkDir
第一次读/zkDir节点数据:初始节点内容
收到事件通知:CONNECTED 节点[/zkDir]修改成功
第二次读/zkDir节点数据:修改ZK_PATH节点数据
收到事件通知:CONNECTED 节点[/zkDir]删除成功
*/

watch机制

 public class ZkWatcher implements Watcher {
private static final String ZK_SERVER = "127.0.0.1:2181";
private static final int SESSION_TIMEOUT = 15000;
private static final String PARENT_PATH ="/testWatcher";
private static final String CHILDREN_PATH = "/testWatcher/children";
private ZooKeeper zk = null;
/*定义原子变量,用于计算进入监听的次数*/
private static AtomicInteger seq = new AtomicInteger();
/*会话进入标志*/
private static final String LOG_PREFIX_OF_MAIN = "【main】"; /*倒计时器*/
private CountDownLatch latch = new CountDownLatch(1);
@Override
public void process(WatchedEvent event) {
System.out.println("**************进入process方法**************");
System.out.println("event = " + event);
/*模拟业务连接初始化工作*/
TimeUtils.threadSleep(200);
if (event == null) { return; }
/*连接状态*/
Event.KeeperState eventState = event.getState();
/*事件类型*/
Event.EventType eventType = event.getType();
/*受影响的路径*/
String eventPath = event.getPath();
/*进入监听标志*/
String logPreFix = "【watcher-"+seq.incrementAndGet()+"】";
System.out.println(logPreFix + "收到watcher通知");
System.out.println(logPreFix + "连接状态:\t"+eventState.toString());
System.out.println(logPreFix + "事件类型:\t"+eventType.toString()); if(Event.KeeperState.SyncConnected == eventState){
if (Event.EventType.None == eventType){/*成功连接上ZK服务器*/
System.out.println(logPreFix + "成功连接上ZK服务器");
latch.countDown();
}else if (Event.EventType.NodeCreated == eventType){/*创建节点*/
System.out.println(logPreFix + "创建节点");
TimeUtils.threadSleep(100);
/*使用监听*/
exist(eventPath,true);
}else if (Event.EventType.NodeChildrenChanged == eventType){
System.out.println(logPreFix + "子节点变更");
TimeUtils.threadSleep(1000);
System.out.println(logPreFix + "子节点列表:" + getChildren(eventPath,true));
}else if (Event.EventType.NodeDataChanged == eventType){
System.out.println(logPreFix + "修改节点数据");
TimeUtils.threadSleep(100);
System.out.println(logPreFix + "修改后节点内容:" + readNode(eventPath, true));
}else if (Event.EventType.NodeDeleted == eventType){
System.out.println(logPreFix + "删除节点");
System.out.println(logPreFix + "节点 " + eventPath + " 被删除");
}
}else if(Event.KeeperState.Disconnected == eventState){
System.out.println(logPreFix + "与zk服务器断开连接");
}else if(Event.KeeperState.AuthFailed == eventState){
System.out.println(logPreFix + "验证失败");
}else if(Event.KeeperState.Expired == eventState){
System.out.println(logPreFix + "会话超时");
}
System.out.println("----------------------------------------");
}
/**
* 创建ZK连接
* @param connectAddr ZK服务器地址列表
* @param sessionTimeout Session超时时间
*/
public void createConnection(String connectAddr, int sessionTimeout) {
this.releaseConnection();
try {
zk = new ZooKeeper(connectAddr, sessionTimeout, this);
System.out.println(LOG_PREFIX_OF_MAIN + "开始连接zk服务器");
latch.await();
} catch (Exception e) {
e.printStackTrace();
}
} /**
* 关闭ZK连接
*/
public void releaseConnection() {
if (this.zk != null) {
try {
this.zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} /**
* 创建节点
* @param path 节点路径
* @param data 数据内容
* @return
*/
public boolean createPath(String path, String data) {
try {/*设置监控(由于zookeeper的监控都是一次性的所以 每次必须设置监控)*/
zk.exists(path, true);
System.out.println(LOG_PREFIX_OF_MAIN + "节点创建成功, Path: " +
this.zk.create( /*路径*/
path,/*数据*/
data.getBytes(),/*所有可见*/
ZooDefs.Ids.OPEN_ACL_UNSAFE,/*永久存储*/
CreateMode.PERSISTENT ) +
", content: " + data);
} catch (Exception e) {
e.printStackTrace();
return false;
}
return true;
} /**
* 删除所有节点
*/
public void deleteAllTestPath() {
if(this.exist(CHILDREN_PATH, false) != null){
this.deleteNode(CHILDREN_PATH);
}
if(this.exist(PARENT_PATH, false) != null){
this.deleteNode(PARENT_PATH);
}
} /**
* 删除指定节点
* @param path
*/
public void deleteNode(String path) {
try {
zk.delete(path,-1);
System.out.println(LOG_PREFIX_OF_MAIN + "删除节点成功,path:" + path);
} catch (InterruptedException|KeeperException e) {
e.printStackTrace();
}
} /**
* 获取节点内容
* @param path
* @param needWatch
* @return
*/
public String readNode(String path, boolean needWatch) {
try {
byte[] data = zk.getData(path, needWatch, null);
return new String(data,"utf-8");
} catch (KeeperException|InterruptedException|UnsupportedEncodingException e) {
e.printStackTrace();
return null;
}
} /**
* 获取指定节点的子节点列表
* @param path
* @param needWatch
* @return
*/
public List<String> getChildren(String path, boolean needWatch) {
try {
return this.zk.getChildren(path, needWatch);
} catch (KeeperException|InterruptedException e) {
e.printStackTrace();
return null;
}
}
/**
* 更新指定节点数据内容
* @param path 节点路径
* @param data 数据内容
* @return
*/
public boolean writeNode(String path, String data) {
try {
System.out.println(LOG_PREFIX_OF_MAIN + "更新数据成功,path:" + path + ", stat: " +
this.zk.setData(path, data.getBytes(), -1));
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
/**
* path节点是否存在
* @param path
* @param needWatch
* @return
*/
public Stat exist(String path, boolean needWatch) {
try {
return zk.exists(path,needWatch);
} catch (KeeperException|InterruptedException e) {
e.printStackTrace();
return null;
}
} public static void main(String[] args) throws Exception {
//建立watcher
ZkWatcher watcher = new ZkWatcher();
//创建连接
watcher.createConnection(ZK_SERVER, SESSION_TIMEOUT);
//System.out.println(zkWatch.zk.toString());
Thread.sleep(1000);
// 清理节点
watcher.deleteAllTestPath();
if (watcher.createPath(PARENT_PATH, System.currentTimeMillis() + "")) {
System.out.println("---------------------- read parent ----------------------------");
/*
读取数据,在操作节点数据之前先调用zookeeper的getData()方法是为了可以watch到对节点的操作。
watch是一次性的,也就是说,如果第二次又重新调用了setData()方法,在此之前需要重新调用一次。
*/
watcher.readNode(PARENT_PATH, true);
watcher.writeNode(PARENT_PATH, System.currentTimeMillis() + "");
System.out.println("---------------------- read children path ----------------------------");
/*
读取子节点,设置对子节点变化的watch,如果不写该方法,则在创建子节点是只会输出NodeCreated,
而不会输出NodeChildrenChanged,也就是说创建子节点时没有watch。
如果是递归的创建子节点,如path="/p/c1/c2"的话,getChildren(PARENT_PATH, ture)只会在
创建c1时watch,输出c1的NodeChildrenChanged,而不会输出创建c2时的NodeChildrenChanged,
如果watch到c2的NodeChildrenChanged,则需要再调用一次getChildren(String path, true)方法,
其中path="/p/c1"
*/
watcher.getChildren(PARENT_PATH, true);
Thread.sleep(1000);
// 创建子节点,同理如果想要watch到NodeChildrenChanged状态,需要调用getChildren(CHILDREN_PATH, true)
watcher.createPath(CHILDREN_PATH, System.currentTimeMillis() + "");
Thread.sleep(1000);
watcher.readNode(CHILDREN_PATH, true);
watcher.writeNode(CHILDREN_PATH, System.currentTimeMillis() + "");
}
Thread.sleep(20000);
// 清理节点
watcher.deleteAllTestPath();
Thread.sleep(1000);
watcher.releaseConnection();
}
} class TimeUtils{
public static void threadSleep(long mills){
try {
Thread.sleep(mills);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} /*
*********输出结果********
【main】开始连接zk服务器
**************进入process方法**************
event = WatchedEvent state:SyncConnected type:None path:null
【watcher-1】收到watcher通知
【watcher-1】连接状态: SyncConnected
【watcher-1】事件类型: None
【watcher-1】成功连接上ZK服务器
----------------------------------------
**************进入process方法**************
event = WatchedEvent state:SyncConnected type:NodeCreated path:/testWatcher
【main】节点创建成功, Path: /testWatcher, content: 1567510219582
---------------------- read parent ----------------------------
【main】更新数据成功,path:/testWatcher, stat: 223,224,1567510219588,1567510219598,1,0,0,0,13,0,223 ---------------------- read children path ----------------------------
【watcher-2】收到watcher通知
【watcher-2】连接状态: SyncConnected
【watcher-2】事件类型: NodeCreated
【watcher-2】创建节点
----------------------------------------
**************进入process方法**************
event = WatchedEvent state:SyncConnected type:NodeDataChanged path:/testWatcher
【watcher-3】收到watcher通知
【watcher-3】连接状态: SyncConnected
【watcher-3】事件类型: NodeDataChanged
【watcher-3】修改节点数据
【watcher-3】修改后节点内容:1567510219598
----------------------------------------
**************进入process方法**************
event = WatchedEvent state:SyncConnected type:NodeCreated path:/testWatcher/children
【main】节点创建成功, Path: /testWatcher/children, content: 1567510220605
【watcher-4】收到watcher通知
【watcher-4】连接状态: SyncConnected
【watcher-4】事件类型: NodeCreated
【watcher-4】创建节点
----------------------------------------
**************进入process方法**************
event = WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/testWatcher
【watcher-5】收到watcher通知
【watcher-5】连接状态: SyncConnected
【watcher-5】事件类型: NodeChildrenChanged
【watcher-5】子节点变更
【main】更新数据成功,path:/testWatcher/children, stat: 225,226,1567510220606,1567510221615,1,0,0,0,13,0,225 【watcher-5】子节点列表:[children]
----------------------------------------
**************进入process方法**************
event = WatchedEvent state:SyncConnected type:NodeDataChanged path:/testWatcher/children
【watcher-6】收到watcher通知
【watcher-6】连接状态: SyncConnected
【watcher-6】事件类型: NodeDataChanged
【watcher-6】修改节点数据
【watcher-6】修改后节点内容:1567510221615
----------------------------------------
**************进入process方法**************
event = WatchedEvent state:SyncConnected type:NodeDeleted path:/testWatcher/children
【main】删除节点成功,path:/testWatcher/children
【main】删除节点成功,path:/testWatcher
【watcher-7】收到watcher通知
【watcher-7】连接状态: SyncConnected
【watcher-7】事件类型: NodeDeleted
【watcher-7】删除节点
【watcher-7】节点 /testWatcher/children 被删除
----------------------------------------
**************进入process方法**************
event = WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/testWatcher
【watcher-8】收到watcher通知
【watcher-8】连接状态: SyncConnected
【watcher-8】事件类型: NodeChildrenChanged
【watcher-8】子节点变更 */

ZooKeeper认证机制

最新文章

  1. TF-IDF算法确定阅读主题词解答英语阅读Title题目
  2. Android事件分发机制完全解析,带你从源码的角度彻底理解
  3. WPF/Silverlight Template使用及总结(转)
  4. tomcat的安装
  5. Combiner
  6. tcp-client-c++
  7. Installation Directory must be on a local hard drive解决办法
  8. PRINTDLG 结构体
  9. Word查找和替换通配符(完全版)
  10. java如何声明一个数组用来存储随机生成的字母并且保证不重复
  11. scss 初学笔记 一 变量声明 默认的样式 嵌套
  12. thinkphp实现文件上传
  13. odoo 前端页面渲染--数据库管理页面
  14. github上用golang写的项目
  15. HTML之元素分类(HTML基础知识)
  16. 处理:&ldquo;ORA-00257: archiver error. Connect internal only, until freed&rdquo;的错误问题
  17. ubuntu16.04——WingIDE安装 操作服务器是一件很好玩的事情
  18. postman 的基础使用
  19. 18 Customers&#39; Role in Good Customer Service 客户在高质量客服中的作用
  20. 3532: [Sdoi2014]Lis 最小字典序最小割

热门文章

  1. unittest单元测试框架总结(转)
  2. java判断回文数
  3. @ControllerAdvice全局数据预处理
  4. u盘被占用,无法弹出解决办法
  5. 【CF1257C】Dominated Subarray【贪心】
  6. 【Nacos】Nacos安装
  7. 6 October
  8. C++ 拷贝构造函数与赋值函数的区别(很严谨和全面)
  9. Uva 1471 Defense Lines(LIS变形)
  10. php Closure类 闭包 匿名函数