Zookeeper学习(二)
一、Znode节点属性
dataVersion 数据版本, 每次当 Znode 中的数据发生变化的时候, dataVersion
都会自增一下
cversion 节点版本, 每次当 Znode 的节点发生变化的时候, cversion 都会自增
aclVersion ACL(Access Control List) 的版本号, 当 Znode 的权限信息发生
变化的时候aclVersion会自增
zxid 事务ID
ctime 创建时间
mtime 最近一次更新的时间
ephemeralOwner 如果 Znode 为临时节点, ephemeralOwner 表示与该节点关联
的 SessionId
我们通过get 节点的目录,可以得到节点的属性
二、watch机制
对zookeeper里的某个节点设置个监听,可以知道该节点是否进行了“增加”,“删除”,“修改”
Watcher 的特点
一次性触发 一个 Watcher 只会被触发一次, 如果需要继续监听, 则需要再次添加
Watcher
事件封装: Watcher 得到的事件是被封装过的, 包括三个内容 keeperState,
eventType, path
设置监听机制给/hello节点
另外再打开Hadoop101,修改/hello里的数据,看能否监听到
证明:可以监听到hello节点的数据改变了
三、zookeeper的JAVAAPI操作
这里操作Zookeeper的JavaAPI使用的是一套zookeeper客户端框架 Curator ,解决了很多
Zookeeper客户端非常底层的细节开发工作 。
Curator包含了几个包:
curator-framework:对zookeeper的底层api的一些封装
curator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式
计数器等
创建maven工程,导入jar包
创建一个测试类,开始进行zookeeper的javaapi编程
节点操作
①创建永久节点
@Test
/*
创建永久节点
*/
public void createZnode() throws Exception {
//1.定制一个重试策略
/*
parame1:重试间隔时间
parame2:重试的最大次数
*/
RetryPolicy retryPolicy=new ExponentialBackoffRetry(1000,1); //2.获取一个客户端对象
/*
parame1:要连接的zookeeper服务器列表
parame2:会话的超时时间
parame3:连接的超时时间
param4:重试策略
*/
String connectionStr="192.168.31.101:2181,192.168.31.102:2181,192.168.31.103:2181";
CuratorFramework client=CuratorFrameworkFactory.newClient(connectionStr,8000,8000,retryPolicy);
//3.开启客户端
client.start();
//4.创建节点
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/hello2","world".getBytes());
//5.关闭客户端
client.close();
}
运行后,进入linux查看,创建成功
②创建临时节点
和创建永久节点的区别是:临时节点是:EPHEMERAL,而且要让他休眠几秒否则Linux看不到,因为是临时的,会话结束就会消失。
@Test
/*
创建临时节点
*/
public void createTmpZnode() throws Exception {
//1.定制一个重试策略
/*
parame1:重试间隔时间
parame2:重试的最大次数
*/
RetryPolicy retryPolicy=new ExponentialBackoffRetry(1000,1); //2.获取一个客户端对象
/*
parame1:要连接的zookeeper服务器列表
parame2:会话的超时时间
parame3:连接的超时时间
param4:重试策略
*/
String connectionStr="192.168.31.101:2181,192.168.31.102:2181,192.168.31.103:2181";
CuratorFramework client=CuratorFrameworkFactory.newClient(connectionStr,8000,8000,retryPolicy);
//3.开启客户端
client.start();
//4.创建节点
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/hello4","world".getBytes());
Thread.sleep(5000);
//5.关闭客户端
client.close();
}
这样就看到了,不过等会话结束就没了
③修改节点数据
使用的是:client.setData().forPath(节点路径,要更新的数据(记得转换成byte))
/*
修改节点数据
*/
@Test
public void setZnodeData() throws Exception {
RetryPolicy retryPolicy=new ExponentialBackoffRetry(1000,1); String connectionStr="192.168.31.101:2181,192.168.31.102:2181,192.168.31.103:2181";
CuratorFramework client=CuratorFrameworkFactory.newClient(connectionStr,8000,8000,retryPolicy);
client.start();
client.setData().forPath("/hello","zookeeper".getBytes());
client.close();
}
④获取节点数据
用byte数组来获取client里的数据
然后转换成String输出
/*
获取节点数据
*/
@Test
public void getZnodeData() throws Exception {
RetryPolicy retryPolicy=new ExponentialBackoffRetry(1000,1); String connectionStr="192.168.31.101:2181,192.168.31.102:2181,192.168.31.103:2181";
CuratorFramework client=CuratorFrameworkFactory.newClient(connectionStr,8000,8000,retryPolicy);
client.start();
byte[] bytes= client.getData().forPath("/hello");
System.out.println(new String(bytes));
client.close();
}
在打印处输出:
⑤设置节点的watch机制
通过监听hello3节点,对其进行创建,修改,删除
/*
节点的watch机制
*/
@Test
public void watchZnode() throws Exception {
RetryPolicy retryPolicy=new ExponentialBackoffRetry(1000,1);
String connectionStr="192.168.31.101:2181,192.168.31.102:2181,192.168.31.103:2181";
CuratorFramework client=CuratorFrameworkFactory.newClient(connectionStr,8000,8000,retryPolicy);
client.start();
TreeCache treeCache=new TreeCache(client,"/hello3");
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
ChildData data= treeCacheEvent.getData();
if(data!=null){
switch (treeCacheEvent.getType()){
case NODE_ADDED:
System.out.println("监控到有新增节点!");
break;
case NODE_REMOVED:
System.out.println("监控到有节点被移除!");
break;
case NODE_UPDATED:
System.out.println("监控到有节点被更新!");
break;
default:
break;
}
}
}
});
treeCache.start(); Thread.sleep(100000); }
打印台输出:
最新文章
- 理解和使用SQL Server中的并行
- Oracle数据库sys为什么只能以sysdba登录
- IOS开发-UI基础-视图
- Bitmap.Config 详解
- Java Script基础(十一) 表单验证
- AngularJS - Watch 监听对象
- 组件局域网中的无集线器、Windows XP、Windows 7、Windows 8的对等网
- (转) ThinkPHP模板自定义标签使用方法
- 如何在tomcat启动的时候运行一个Java类
- sso单点登录解决方案收集
- 更改Calendar背景图(使用系统映像选择器)
- ASP.NET MVC+EF框架+EasyUI实现权限管理系列(2)-数据库访问层的设计Demo
- 【LeetCode】3. Longest Substring Without Repeating Characters
- PAT (Advanced Level) 1100. Mars Numbers (20)
- 用Spark学习矩阵分解推荐算法
- java并发包下的并发工具类
- js 定时更改div背景图片
- propsData 选项 全局扩展的数据传递
- Mac下需要安装的一些软件及常用的配置文件
- 你可能从未听过的 Linux 发行版