zookeeper系列(三)zookeeper的使用--开源客户端
作者:leesf 掌控之中,才会成功;掌控之外,注定失败, 原创博客地址:http://www.cnblogs.com/leesf456/ 奇文共欣赏,大家共同学习进步。
一、前言
上一篇博客已经介绍了如何使用Zookeeper提供的原生态Java API进行操作,本篇博文主要讲解如何通过开源客户端来进行操作。
二、ZkClient
ZkClient是在Zookeeper原声API接口之上进行了包装,是一个更易用的Zookeeper客户端,其内部还实现了诸如Session超时重连、Watcher反复注册等功能。
2.1 添加依赖,使用maven管理直接添加配置文件即可
在pom.xml文件中添加如下内容即可。
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.2</version>
</dependency>
2.2 创建会话
使用ZkClient可以轻松的创建会话,连接到服务端
package com.hust.grid.leesf.zkClient; import java.io.IOException;
import org.I0Itec.zkclient.ZkClient; /**
* 使用ZkClient创建会话,连接到服务端
*/
public class Create_Session_Sample { //会话超时时间
private static final int SESSION_TIMEOUT = 5000;
//ZkClient的实例对象
private static ZkClient zkClient = null;
/**
* 连接服务器,创建一个会话
* @param host 127.0.0.1:2181
*/
public void connect(String host){
zkClient = new ZkClient(host, SESSION_TIMEOUT);
System.out.println("ZooKeeper session established");
} public static void main(String[] args) throws IOException, InterruptedException {
Create_Session_Sample createSessionSample = new Create_Session_Sample();
createSessionSample.connect("127.0.0.1:2181");
}
}
运行结果:结果表明已经成功创建会话。
ZooKeeper session established.
2.3 创建节点
ZkClient提供了递归创建节点的接口,即其帮助开发者完成父节点的创建,再创建子节点。
package com.hust.grid.leesf.zkClient; import java.io.IOException;
import org.I0Itec.zkclient.ZkClient;
/**
* ZkClient提供了递归创建节点的接口,即其帮助开发者完成父节点的创建,再创建子节点
*/
public class Create_Node_Sample {
//会话超时时间
private static final int SESSION_TIMEOUT = 5000;
//ZkClient的实例对象
private static ZkClient zkClient = null;
/**
* 连接服务器,创建一个会话
* @param host 127.0.0.1:2181
*/
public void connect(String host){
zkClient = new ZkClient(host, SESSION_TIMEOUT);
System.out.println("ZooKeeper session established");
} public static void main(String[] args) throws IOException, InterruptedException {
Create_Session_Sample createSessionSample = new Create_Session_Sample();
createSessionSample.connect("127.0.0.1:2181");
String path = "/zk-book/c1";
//创建父子节点
zkClient.createPersistent(path, true);
System.out.println("success create znode.");
}
}
运行结果:
success create znode.
结果表明已经成功创建了节点,值得注意的是,在原生态接口中是无法创建成功的(父节点不存在),但是通过ZkClient可以递归的先创建父节点,再创建子节点。
2.4 删除节点
ZkClient提供了递归删除节点的接口,即其帮助开发者先删除所有子节点(存在),再删除父节点
package com.hust.grid.leesf.zkClient; import java.io.IOException;
import org.I0Itec.zkclient.ZkClient;
/**
* ZkClient提供了递归删除节点的接口,即其帮助开发者先删除所有子节点(存在),再删除父节点
*/
public class Del_Data_Sample {
//会话超时时间
private static final int SESSION_TIMEOUT = 5000;
//ZkClient的实例对象
private static ZkClient zkClient = null;
/**
* 连接服务器,创建一个会话
* @param host 127.0.0.1:2181
*/
public void connect(String host){
zkClient = new ZkClient(host, SESSION_TIMEOUT);
System.out.println("ZooKeeper session established");
} public static void main(String[] args) throws IOException, InterruptedException {
Del_Data_Sample delDataSample = new Del_Data_Sample();
delDataSample.connect("127.0.0.1:2181");
String path = "/zk-book";
zkClient.createPersistent(path, "");
zkClient.createPersistent(path + "/c1", "");
System.out.println("success create znode.");
zkClient.deleteRecursive(path);
System.out.println("success delete znode.");
} }
运行结果:结果表明ZkClient可直接删除带子节点的父节点,因为其底层先删除其所有子节点,然后再删除父节点。
ZooKeeper session established
success create znode.
success delete znode.
2.5 获取子节点
package com.hust.grid.leesf.zkClient; import java.util.List;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient; public class Get_Children_Sample {
//会话超时时间
private static final int SESSION_TIMEOUT = 5000;
//ZkClient的实例对象
private static ZkClient zkClient = null;
/**
* 连接服务器,创建一个会话
* @param host 127.0.0.1:2181
*/
public void connect(String host){
zkClient = new ZkClient(host, SESSION_TIMEOUT);
System.out.println("ZooKeeper session established");
} public static void main(String[] args) throws InterruptedException {
Get_Children_Sample getChildrenSample = new Get_Children_Sample();
getChildrenSample.connect("127.0.0.1:2181");
String path = "/zk-book";
zkClient.subscribeChildChanges(path, new IZkChildListener() {
/**
* 子节点的路径被变更时回调此方法
*/
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
System.out.println(parentPath + " 's child changed, currentChilds:" + currentChilds);
}
});
zkClient.createPersistent(path);
Thread.sleep(1000);
zkClient.createPersistent(path + "/c1");
Thread.sleep(1000);
zkClient.delete(path + "/c1");
Thread.sleep(1000);
zkClient.delete(path);
Thread.sleep(Integer.MAX_VALUE);
}
}
运行结果:
/zk-book 's child changed, currentChilds:[]
/zk-book 's child changed, currentChilds:[c1]
/zk-book 's child changed, currentChilds:[]
/zk-book 's child changed, currentChilds:null
结果表明:
客户端可以对一个不存在的节点进行子节点变更的监听;
一旦客户端对一个节点注册了子节点列表变更监听之后,那么当该节点的子节点列表发生变更时,服务端都会通知客户端,并将最新的子节点列表发送给客户端,该节点本身的创建或删除也会通知到客户端;(给节点注册了儿子节点的监听事件,当子节点或本节点变动时都会通知客户端,冰倩返回新的节点列表)
2.6获取节点的数据,当订阅节点数据变动时出发事件
package com.hust.grid.leesf.zkClient; import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient; public class Get_Data_Sample {
//会话超时时间
private static final int SESSION_TIMEOUT = 5000;
//ZkClient的实例对象
private static ZkClient zkClient = null;
/**
* 连接服务器,创建一个会话
* @param host 127.0.0.1:2181
*/
public void connect(String host){
zkClient = new ZkClient(host, SESSION_TIMEOUT);
System.out.println("ZooKeeper session established");
} public static void main(String[] args) throws InterruptedException {
Get_Data_Sample getDataSample = new Get_Data_Sample();
getDataSample.connect("127.0.0.1:2181");
String path = "/zk-book";
//创建一个临时节点
zkClient.createEphemeral(path, "123");
//节点数据变动时订阅监控
zkClient.subscribeDataChanges(path, new IZkDataListener() {
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println("Node " + dataPath + " deleted.");
}
public void handleDataChange(String dataPath, Object data) throws Exception {
System.out.println("Node " + dataPath + " changed, new data: " + data);
}
});
//获取path节点的数据
System.out.println(zkClient.readData(path));
//修改path节点的数据
zkClient.writeData(path, "456");
Thread.sleep(1000);
//删除path节点的数据
zkClient.delete(path);
Thread.sleep(Integer.MAX_VALUE);
}
}
2.7 检测节点是否存在,直接使用客户端检测节点是否存在,结果返回false不存在,true存在;
public class Exist_Node_Sample {
public static void main(String[] args) throws Exception {
String path = "/zk-book";
ZkClient zkClient = new ZkClient("127.0.0.1:2181", 2000);
System.out.println("Node " + path + " exists " + zkClient.exists(path));
}
三、Curator客户端
Curator解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连,反复注册Watcher和NodeExistsException异常等,现已成为Apache的顶级项目。
3.1 添加依赖
使用maven管理时,在pom.xml文件中添加如下内容即可
<!-- https://mvnrepository.com/artifact/org.apache.curator/apache-curator -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.4.2</version>
</dependency>
3.2 创建会话
Curator除了使用一般方法创建会话外,还可以使用fluent风格进行创建;
package com.hust.grid.leesf.curator; import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
/**
* 使用Curator客户端,创建会话
* @author songzl
*
*/
public class Create_Session_Sample { public static void main(String[] args) throws Exception {
//重试策略:重试时间每间隔1000毫秒,最大重试次数3
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
//第一种方式新建客户端:服务端ip和端口,session超时时间,连接的超时时间,重试策略实例
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 5000, 3000, retryPolicy);
//大多数的方法在客户端启动之后才能工作
client.start();
System.out.println("Zookeeper session1 established. ");
//第二种方式新建客户端:这里设置的“base”被作为根路径使用
CuratorFramework client1 = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
.sessionTimeoutMs(5000).retryPolicy(retryPolicy).namespace("base").build();
client1.start();
System.out.println("Zookeeper session2 established. ");
}
}
注意:值得注意的是session2会话含有隔离命名空间,即客户端对Zookeeper上数据节点的任何操作都是相对/base目录进行的,这有利于实现不同的Zookeeper的业务之间的隔离。
3.3 创建节点
通过使用Fluent风格的接口,开发人员可以进行自由组合来完成各种类型节点的创建。
package com.hust.grid.leesf.curator; import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode; public class Create_Node_Sample {
//服务端ip和端口号
private String host = "127.0.0.1:2181";
//session超时时间
private static final int sessionTimeOut = 5000;
//连接的超时时间
private static final int connectTimeOut = 3000;
//初始化Curator客户端
private static CuratorFramework client = null;
//重试策略:重试时间每间隔1000毫秒,最大重试次数3
private RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); public void getCuratorFrameworkByNewClient(){
//第一种方式新建客户端:服务端ip和端口,session超时时间,连接的超时时间,重试策略实例
client = CuratorFrameworkFactory.newClient(host, sessionTimeOut, connectTimeOut, retryPolicy);
//大多数的方法在客户端启动之后才能工作
client.start();
} public void getCuratorFrameworkByBuilder(){
//第二种方式新建客户端:这里设置的“base”被作为此连接的根路径使用
// client = CuratorFrameworkFactory.builder().connectString(host)
// .sessionTimeoutMs(sessionTimeOut).retryPolicy(retryPolicy).namespace("base").build();
//不设置namespace
client = CuratorFrameworkFactory.builder().connectString(host)
.sessionTimeoutMs(sessionTimeOut).retryPolicy(retryPolicy).build();
client.start();
} public static void main(String[] args) throws Exception {
String path = "/zk-book/c1";
Create_Node_Sample createNodeSample = new Create_Node_Sample();
createNodeSample.getCuratorFrameworkByBuilder();
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes());
System.out.println("success create znode: " + path);
} }
3.4节点的增、删、改、查
package com.hust.grid.leesf.curator; import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat; public class Create_Node_Sample {
//服务端ip和端口号
private String host = "127.0.0.1:2181";
//session超时时间
private static final int sessionTimeOut = 5000;
//连接的超时时间
private static final int connectTimeOut = 3000;
//初始化Curator客户端
private static CuratorFramework client = null;
//重试策略:重试时间每间隔1000毫秒,最大重试次数3
private RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); public void getCuratorFrameworkByNewClient(){
//第一种方式新建客户端:服务端ip和端口,session超时时间,连接的超时时间,重试策略实例
client = CuratorFrameworkFactory.newClient(host, sessionTimeOut, connectTimeOut, retryPolicy);
//大多数的方法在客户端启动之后才能工作
client.start();
} public void getCuratorFrameworkByBuilder(){
//第二种方式新建客户端:这里设置的“base”被作为此连接的根路径使用
// client = CuratorFrameworkFactory.builder().connectString(host)
// .sessionTimeoutMs(sessionTimeOut).retryPolicy(retryPolicy).namespace("base").build();
//不设置namespace
client = CuratorFrameworkFactory.builder().connectString(host)
.sessionTimeoutMs(sessionTimeOut).retryPolicy(retryPolicy).build();
client.start();
}
/**
* 创建节点
* @param path
*/
public void createNode(String path){
try {
Stat stat = client.checkExists().forPath(path);
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes());
String context = new String(client.getData().storingStatIn(stat).forPath(path));
System.out.println("success create znode: " + path + "节点内容为:"+context);
Thread.sleep(Integer.MAX_VALUE);//创建成功后需要阻塞一下线程保持回话
} catch (Exception e) {
System.out.println("fail create znode: " + path);
}
}
/**
* 更新节点数据
* @param path
*/
public void updateNode(String path){
try {
Stat stat = client.checkExists().forPath(path);
System.out.println("初始的版本号:"+stat.getVersion());
Stat stat1 = client.setData().withVersion(stat.getVersion()).forPath(path,"songzl".getBytes());
System.out.println("更新后的版本号:"+stat1.getVersion());
String context = new String(client.getData().storingStatIn(stat).forPath(path));
System.out.println("success set node data 路径为:" +path+ "更新的内容: " + context);
} catch (Exception e) {
System.out.println("Fail set node data " + e.getMessage());
}
}
/**
* 删除节点数据
* @param path
*/
public void deleteNode(String path){
try {
Stat stat = client.checkExists().forPath(path);
client.delete().deletingChildrenIfNeeded().withVersion(stat.getVersion()).forPath(path);
System.out.println("success delete znode " + path);
} catch (Exception e) {
System.out.println("fail delete znode " + path);
}
}
/**
* 获取节点的数据
* @param path
*/
public void getNode(String path){
try {
Stat stat = client.checkExists().forPath(path);
System.out.println(stat);
String context = new String(client.getData().storingStatIn(stat).forPath(path));
System.out.println("success get node data:"+context);
} catch (Exception e) {
System.out.println("fail get node data");
}
} public static void main(String[] args) throws Exception {
String path = "/zk-book/c1";
Create_Node_Sample createNodeSample = new Create_Node_Sample();
createNodeSample.getCuratorFrameworkByBuilder();
createNodeSample.createNode(path);
} }
总结:
方法名 | 描述 |
---|---|
create() | 开始创建操作, 可以调用额外的方法(比如方式mode 或者后台执行background) 并在最后调用forPath()指定要操作的ZNode |
delete() | 开始删除操作. 可以调用额外的方法(版本或者后台处理version or background)并在最后调用forPath()指定要操作的ZNode |
checkExists() | 开始检查ZNode是否存在的操作. 可以调用额外的方法(监控或者后台处理)并在最后调用forPath()指定要操作的ZNode |
getData() | 开始获得ZNode节点数据的操作. 可以调用额外的方法(监控、后台处理或者获取状态watch, background or get stat) 并在最后调用forPath()指定要操作的ZNode |
setData() | 开始设置ZNode节点数据的操作. 可以调用额外的方法(版本或者后台处理) 并在最后调用forPath()指定要操作的ZNode |
getChildren() | 开始获得ZNode的子节点列表。 以调用额外的方法(监控、后台处理或者获取状态watch, background or get stat) 并在最后调用forPath()指定要操作的ZNode |
inTransaction() | 开始是原子ZooKeeper事务. 可以复合create, setData, check, and/or delete 等操作然后调用commit()作为一个原子操作提交 |
3.5 异步接口
如同Zookeeper原生API提供了异步接口,Curator也提供了异步接口。在Zookeeper中,所有的异步通知事件处理都是由EventThread这个线程来处理的,EventThread线程用于串行处理所有的事件通知,其可以保证对事件处理的顺序性,但是一旦碰上复杂的处理单元,会消耗过长的处理时间,从而影响其他事件的处理,Curator允许用户传入Executor实例,这样可以将比较复杂的事件处理放到一个专门的线程池中去。
package com.hust.grid.leesf.curator; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode; public class Create_Node_Background_Sample { //服务端ip和端口号
private String host = "127.0.0.1:2181";
//session超时时间
private static final int sessionTimeOut = 5000;
//连接的超时时间
private static final int connectTimeOut = 3000;
//初始化Curator客户端
private static CuratorFramework client = null;
//重试策略:重试时间每间隔1000毫秒,最大重试次数3
private RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
//对执行中的线程进行管理,等待线程完成某些操作后,再对此线程做处理(起到过河拆桥、卸磨杀驴的作用)
static CountDownLatch semaphore = new CountDownLatch(2);
//创建一个线程池,此线程池共享队列中的任务,直到队列中所有任务处理完(只要队列中有任务,就不停不休的执行,除非手动杀死线程)
static ExecutorService tp = Executors.newFixedThreadPool(2); //第一种方式新建客户端:服务端ip和端口,session超时时间,连接的超时时间,重试策略实例
public void getCuratorFrameworkByNewClient(){
client = CuratorFrameworkFactory.newClient(host, sessionTimeOut, connectTimeOut, retryPolicy);
//大多数的方法在客户端启动之后才能工作
client.start();
} //第二种方式新建客户端:这里设置的“base”被作为此连接的根路径使用
public void getCuratorFrameworkByBuilder(){
// client = CuratorFrameworkFactory.builder().connectString(host)
// .sessionTimeoutMs(sessionTimeOut).retryPolicy(retryPolicy).namespace("base").build();
//不设置namespace
client = CuratorFrameworkFactory.builder().connectString(host)
.sessionTimeoutMs(sessionTimeOut).retryPolicy(retryPolicy).build();
client.start();
} public static void main(String[] args) throws Exception {
Create_Node_Background_Sample createNodeBackgroundSample = new Create_Node_Background_Sample();
createNodeBackgroundSample.getCuratorFrameworkByBuilder();
System.out.println("Main thread: " + Thread.currentThread().getName());
String path = "/zk-book";
//方法一:此方法使用一个线程池当做后台执行器,可以将比较复杂的事件处理放到一个专门的线程池中去
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("使用专门的线程池执行:event[code: " + event.getResultCode() + ", type: " + event.getType() + "]" + ", Thread of processResult: " + Thread.currentThread().getName());
semaphore.countDown();
}
}, tp).forPath(path, "init".getBytes());
//方法二:普通的异步回调函数创建一个临时节点,EventThread线程用于串行处理所有的事件通知,其可以保证对事件处理的顺序性,但是一旦碰上复杂的处理单元,会消耗过长的处理时间
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("不适用线程池执行:event[code: " + event.getResultCode() + ", type: " + event.getType() + "]" + ", Thread of processResult: " + Thread.currentThread().getName());
semaphore.countDown();
}
}).forPath(path, "init".getBytes()); semaphore.await();
tp.shutdown();
}
}
3.6 Curator除了提供很便利的API,还提供了一些典型的应用场景,开发人员可以使用参考更好的理解如何使用Zookeeper客户端,所有的都在recipes包中,只需要在pom.xml中添加如下依赖即可。
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.4.2</version>
</dependency>
3.6.1 curator-recipes对节点的监听:一下是我代码测试的结论,若有误,请大家指正。
NodeCache将节点数据保存到本地缓存中,给这些数据注册一个监听器,当被新增、更新时触发监听器,(我测试时发现删除时监听器不触发);
NodeCache节点缓存不是线程安全的,在不同步的情况下不能保持同步;当多线程更新数据时必须使用版本号,避免错误覆盖其他进程数据;
NodeCache注册一个监听器,若session不超时、监听器没有被移除,此监听器一直有效并且可以重复使用(多次更新节点数据均被成功触发);
NodeCache也可以同时注册多个监听器,session不超时、监听器没有被移除,所有的监听器也都是可以正常被触发;
package com.hust.grid.leesf.curator; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
/**
* NodeCache将节点数据保存到本地缓存中,给这些数据注册一个监听器,当被增删改时触发监听器;
* 需要注意:这个缓存不是线程安全的,不能保证同步;当更新数据时必须使用版本号,避免数据错误覆盖;
* NodeCache的监听器可以重复添加多个,并且都会触发;也可以移除,若不移除则可一直使用;
*/
public class NodeCache_Sample { //服务端ip和端口号
private String host = "127.0.0.1:2181";
//session超时时间
private static final int sessionTimeOut = 5000;
//连接的超时时间
private static final int connectTimeOut = 3000;
//初始化Curator客户端
private static CuratorFramework client = null;
//重试策略:重试时间每间隔1000毫秒,最大重试次数3
private RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
//对执行中的线程进行管理,等待线程完成某些操作后,再对此线程做处理(起到过河拆桥、卸磨杀驴的作用)
static CountDownLatch semaphore = new CountDownLatch(1);
//创建一个线程池,此线程池共享队列中的任务,直到队列中所有任务处理完(只要队列中有任务,就不停不休的执行,除非手动杀死线程)
static ExecutorService tp = Executors.newFixedThreadPool(1); //第一种方式新建客户端:服务端ip和端口,session超时时间,连接的超时时间,重试策略实例
public void getCuratorFrameworkByNewClient(){
client = CuratorFrameworkFactory.newClient(host, sessionTimeOut, connectTimeOut, retryPolicy);
//大多数的方法在客户端启动之后才能工作
client.start();
} //第二种方式新建客户端:这里设置的“base”被作为此连接的根路径使用
public void getCuratorFrameworkByBuilder(){
// client = CuratorFrameworkFactory.builder().connectString(host)
// .sessionTimeoutMs(sessionTimeOut).retryPolicy(retryPolicy).namespace("base").build();
//不设置namespace
client = CuratorFrameworkFactory.builder().connectString(host)
.sessionTimeoutMs(sessionTimeOut).retryPolicy(retryPolicy).build();
client.start();
} /**
* path路径的节点数据放到NodeCache的本地缓存中,并且给nodeCache添加一个监听;
* 不使用单独的线程池处理
* @param path
* @throws Exception
*/
public void nodeCacheAddListener(String path) throws Exception{
final NodeCache cache = new NodeCache(client, path, false);
cache.start(true);
cache.getListenable().addListener(new NodeCacheListener() {
public void nodeChanged() throws Exception {
System.out.println("Node data update, new data: " + new String(cache.getCurrentData().getData())+",线程名字:"+Thread.currentThread().getName());
}
});
}
//封装nodeCacheAddListener方法
public void warpNodeCacheAddListener(String path) throws Exception{
getCuratorFrameworkByBuilder();//实例化client
//path节点的缓存,创建一个监听器,不使用线程池,监听器可以注册多个
nodeCacheAddListener(path);
//创建时也调用监听器(之前还没有path节点,此时才创建,但是缓存节点的监听器被触发了)
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes());
//更行path节点数据时,缓存节点的监听器被触发了
client.setData().forPath(path, "songzl".getBytes());
//删除path节点时,不触发监听(因为节点给删除了,监听也就被移除了,还调用个毛线)
client.delete().deletingChildrenIfNeeded().forPath(path);
}
/**
* path路径的节点数据放到NodeCache的本地缓存中,并且给nodeCache添加一个监听;
* 使用单独的线程池处理
* @param path
* @throws Exception
*/
public void nodeCacheAddListenerExecutor(String path) throws Exception{
final NodeCache cache = new NodeCache(client, path, false);
cache.start(true);
cache.getListenable().addListener(new NodeCacheListener() {
public void nodeChanged() throws Exception {
System.out.println("Node data update, new data: " + new String(cache.getCurrentData().getData())+",线程名字:"+Thread.currentThread().getName());
semaphore.countDown();
}
},tp);
}
//封装nodeCacheAddListenerExecutor方法
public void warpNodeCacheAddListenerExecutor(String path) throws Exception{
getCuratorFrameworkByBuilder();//实例化client
//path节点的缓存,创建一个监听器,使用线程池,监听器可以注册多个
nodeCacheAddListenerExecutor(path);
//创建时也调用监听器(之前还没有path节点,此时才创建,但是缓存节点的监听器被触发了)
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes());
//更行path节点数据时,缓存节点的监听器被触发了
client.setData().forPath(path, "songzl".getBytes());
//删除path节点时,不触发监听(因为节点给删除了,监听也就被移除了,还调用个毛线)
client.delete().deletingChildrenIfNeeded().forPath(path);
semaphore.await();
tp.shutdown();
} public static void main(String[] args) throws Exception {
String path = "/zk-book/nodecache";
//实例化client
NodeCache_Sample nodeCacheSample = new NodeCache_Sample();
nodeCacheSample.warpNodeCacheAddListener(path);
Thread.sleep(Integer.MAX_VALUE);
}
}
3.6.2 curator-recipes对 子节点监听,3.6.1的节点性质子节点均具备,还有下面的几种特性;
给子节点添加的监听器,参数event提供了增删改的类型判断,因此字节点的增删改均会触发监听器;
给子节点注册监听器使用的path是父节点的绝对路径,当父节点路径下有子节点增改删时,触发子节点的监听器,操作父节点时不会触发;
在对子节点修改过于频繁时,若不阻塞线程,会丢失监听器调用次数(我用代码测试时发现总是少调用,原来是对子节点操作过于频繁,导致监听器调用次数丢失);
/**
* 给子节点添加监听器
* @param path
* @throws Exception
*/
public void childNodeCacheAddListener(String path) throws Exception{
PathChildrenCache cache = new PathChildrenCache(client, path, true);
cache.start(StartMode.POST_INITIALIZED_EVENT);
cache.getListenable().addListener(new PathChildrenCacheListener() {
public void childEvent(CuratorFramework client,
PathChildrenCacheEvent event) throws Exception {
switch (event.getType()) {
case CHILD_ADDED:
System.out.println("CHILD_ADDED," + event.getData().getPath());
break;
case CHILD_UPDATED:
System.out.println("CHILD_UPDATED," + event.getData().getPath());
break;
case CHILD_REMOVED:
System.out.println("CHILD_REMOVED," + event.getData().getPath());
break;
default:
break;
}
}
});
}
public static void main(String[] args) throws Exception {
String path = "/zk-demo";
//实例化client
NodeCache_Sample nodeCacheSample = new NodeCache_Sample();
nodeCacheSample.getCuratorFrameworkByBuilder();
nodeCacheSample.childNodeCacheAddListener(path);
client.create().withMode(CreateMode.PERSISTENT).forPath(path);
Thread.sleep(1000);
client.create().withMode(CreateMode.PERSISTENT).forPath(path + "/c1");
Thread.sleep(1000);//必须阻塞下线程,避免快速操作来不及触发监听器就被下一个覆盖
client.setData().forPath(path + "/c1", "songzl".getBytes());
System.out.println("第一次修改子节点内容:"+ new String(client.getData().forPath(path + "/c1")));
Thread.sleep(1000);//必须阻塞下线程,避免快速操作来不及触发监听器就被下一个覆盖
client.setData().forPath(path + "/c1", "wangxn".getBytes());
System.out.println("第二次修改子节点内容"+ new String(client.getData().forPath(path + "/c1")));
Thread.sleep(1000);//必须阻塞下线程,避免快速操作来不及触发监听器就被下一个覆盖
client.delete().forPath(path + "/c1");
client.delete().forPath(path);
}
打印结果:由下图可以明确得出结论
3.7 Master选举,这个是zookeeper的核心之一;
借助Zookeeper,开发者可以很方便地实现Master选举功能,其大体思路如下:选择一个根节点,如/master_select,多台机器同时向该节点创建一个子节点/master_select/lock,利用Zookeeper特性,最终只有一台机器能够成功创建,成功的那台机器就是Master。
package com.hust.grid.leesf.curator; import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.retry.ExponentialBackoffRetry; public class Recipes_MasterSelect {
//服务端ip和端口号
private static String host = "127.0.0.1:2181";
//重试策略:重试时间每间隔1000毫秒,最大重试次数3
private static RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
//Curator客户端
static CuratorFramework client = CuratorFrameworkFactory.builder().connectString(host).retryPolicy(retryPolicy).build();
//主服务路径
static String master_path = "/curator_recipes_master_path"; public static void main(String[] args) throws Exception {
client.start();
LeaderSelector selector = new LeaderSelector(client, master_path, new LeaderSelectorListenerAdapter() {
public void takeLeadership(CuratorFramework client) throws Exception {
System.out.println("成为Master角色");
Thread.sleep(3000);//模拟Master的业务
System.out.println("完成Master操作,释放Master权利");
}
});
selector.autoRequeue();
selector.start();
Thread.sleep(Integer.MAX_VALUE);
} }
以上结果会反复循环,并且当一个应用程序完成Master逻辑后,另外一个应用程序的相应方法才会被调用,即当一个应用实例成为Master后,其他应用实例会进入等待,直到当前Master挂了或者推出后才会开始选举Master。
3.8 分布式锁也是zookeeper的核心之一(实现数据一致性的原理)
为了保证数据的一致性,经常在程序的某个运行点需要进行同步控制。以流水号生成场景为例,普通的后台应用通常采用时间戳方式来生成流水号,但是在用户量非常大的情况下,可能会出现并发问题。
package com.hust.grid.leesf.curator; import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
/**
* zookeeper实现分布式锁
*/
public class Recipes_Lock {
static String lock_path = "/curator_recipes_lock_path";
static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
.retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); public static void main(String[] args) throws Exception {
client.start();
/**
* InterProcessMutex是跨JVM的互斥锁,该锁是由zookeeper控制;
* 重点是它实现了分布式锁,当不同服务器的进程对同一个节点操作时是安全的受锁控制的;
* 并且此分布式锁是绝对公平的,用户都是按照请求的顺序获取互斥锁,依次执行;
*/
final InterProcessMutex lock = new InterProcessMutex(client, lock_path);
final CountDownLatch down = new CountDownLatch(1);
for (int i = 0; i < 30; i++) {
new Thread(new BuildOrderNo(lock,down)).start();
}
Thread.sleep(2000);//模拟生成订单前的其他业务,当操作完后开始生成订单
down.countDown();
}
}
/**
* 生成订单号类
* @author songzl
*
*/
class BuildOrderNo implements Runnable{
private InterProcessMutex lock;
private CountDownLatch down;
public BuildOrderNo(InterProcessMutex lock,CountDownLatch down){
this.lock = lock;
this.down = down;
}
public void run() {
try {
down.await();
lock.acquire();//获取互斥锁,检测当前线程是否可以执行
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
String orderNo = sdf.format(new Date());
System.out.println("生成的订单号是 : " + orderNo+"子线程名字:"+Thread.currentThread().getName());
//如果调用线程是获得它的线程,那么执行一个互斥锁。如果线程已经多次调用获取,当这个方法返回时,互斥锁仍然会被保留。
lock.release();//释放互斥锁,具备检查功能
} catch (Exception e) {
e.printStackTrace();
}
}
}
3.9分布式计数器
分布式计数器的典型应用是统计系统的在线人数,借助Zookeeper也可以很方便实现分布式计数器功能:指定一个Zookeeper数据节点作为计数器,多个应用实例在分布式锁的控制下,通过更新节点的内容来实现计数功能。
package com.hust.grid.leesf.curator; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryNTimes;
/**
* 分布式计数器
*/
public class Recipes_DistAtomicInt {
static String distatomicint_path = "/curator_recipes_distatomicint_path";
static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
.retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); public static void main(String[] args) throws Exception {
client.start();
/**
* 创建一个增量的计数器;它首先尝试使用乐观锁定,如果失败就选择一个可选的互斥,对于乐观和互斥,可以使用重试策略重试增量。
*/
DistributedAtomicInteger atomicInteger = new DistributedAtomicInteger(client, distatomicint_path,
new RetryNTimes(3, 1000));
AtomicValue<Integer> rc = atomicInteger.add(1);
System.out.println(rc.preValue());
System.out.println("Result: " + rc.succeeded());
System.out.println(rc.postValue());
}
}
最新文章
- HDU2842 矩阵乘法
- JSON介绍
- setTimeout的应用
- 如何开启win7端口的图文教程
- 关于CSS中对IE条件注释的问题
- 树形DP 2013多校8(Terrorist’s destroy HDU4679)
- Scrum Meeting---Three(2015-10-27)
- html5之meta标签viewport应用
- weekend110(Hadoop)的 第二天笔记
- HDU 1312 Red and Black (DFS)
- 理解Twisted与非阻塞编程
- C++_运算符重载
- Dapper入门教程(一)——Dapper介绍
- C#编写街道管理系统
- python_hashlib模块
- PHP实用代码片段(二)
- php 中 opendir() readdir() scandir()
- Hibernate注解开发、注解创建索引
- git教程(全)
- 自定义android ProgressDialog
热门文章
- JavaScript例子2-使一个特定的表格隔行变色
- 解决centos-yum无法正常使用问题
- vue打包后css背景图片地址找不到
- asp.net Core 2.0 MVC为Controller或Action添加定制特性实现登录验证
- Troubleshooting: Cannot Run on an Android Device
- 【Day2】3.面向对象编程
- MyEclipse基本配置及优化【MyEclipse_10.7】
- 用Python+Aria2写一个自动选择最优下载方式的E站爬虫
- 2019ICPC南京网络赛A题 The beautiful values of the palace(三维偏序)
- DP tricks and experiences