A Simple Watch Client

为了向你介绍ZooKeeper Java API,我们开发了一个非常简单的监视器客户端。ZooKeeper客户端监视一个ZooKeeper节点的改变并且通过开始和停止一个程序来作出响应。

必备条件

客户端有四个必备条件:

  • 它作为参数:
    1. ZooKeeper服务端的地址
    2. znode的名字 - 被监视的节点
    3. 写输出内容的文件名字
    4. 带有参数的可执行文件
  • 它抓取这个znode的数据并且开始这个可执行文件
  • 如果znode改变,客户端重新抓取内容并且重启这个可执行文件
  • 如果znode消失,客户端杀死这个可执行文件

程序设计

按照惯例,ZooKeeper应用被分为两部分,一部分维护连接,另一部分监视数据。在这个应用中,Executor的类维护ZooKeeper连接,DataMonitor的类监视ZooKeeper树中的数据。同时,Executor包含主线程和执行逻辑,它负责很少的用户交互,和你作为参数传进去的可执行程序的交互,还有那一个例子关闭和重启,根据znode的状态。

Executor 类

Executor对象是这个例子程序的主要容器。它包含ZooKeeper对象,DataMonitor,就像上面程序设计描述的那样。

   // from the Executor class...

    public static void main(String[] args) {
if (args.length < 4) {
System.err
.println("USAGE: Executor hostPort znode filename program [args ...]");
System.exit(2);
}
String hostPort = args[0];
String znode = args[1];
String filename = args[2];
String exec[] = new String[args.length - 3];
System.arraycopy(args, 3, exec, 0, exec.length);
try {
new Executor(hostPort, znode, filename, exec).run();
} catch (Exception e) {
e.printStackTrace();
}
} public Executor(String hostPort, String znode, String filename,
String exec[]) throws KeeperException, IOException {
this.filename = filename;
this.exec = exec;
zk = new ZooKeeper(hostPort, 3000, this);
dm = new DataMonitor(zk, znode, null, this);
} public void run() {
try {
synchronized (this) {
while (!dm.dead) {
wait();
}
}
} catch (InterruptedException e) {
}
}

Executor的回调工作是开始和停止可执行文件,这个可执行文件的名字是你从命令行的传过来的。它做这个是为了响应被ZooKeeper对象触发的事件。就像你在上面的代码看到的那样,Executor传入一个引入给他自己作为ZooKeeper构造函数的Watcher 参数。

它也传入一个引用给它自己作为DataMonitor的构造器的DataMonitorListener参数。每一个的Executor的定义,它实现了这些接口:

public class Executor implements Watcher, Runnable, DataMonitor.DataMonitorListener {
...

Watcher接口被ZooKeeper Java API定义。ZooKeeper使用它和它的容器通信。它只支持一个方法,process(),并且ZooKeeper使用它和主线程感兴趣的事件通信,例如ZooKeeper通信或者ZooKeeper会话的状态。这个例子中的Executor只是简单的向下推送这些事件 给DataMonitor来决定怎么处理它。它这样做只是为了说明这一点,依照惯例,Executor或像Executor的对象"拥有"ZooKeeper连接,但是它可以自由地把事件委托给其它对象。它也使用这个作为监听事件触发的通道。

public void process(WatchedEvent event) {
dm.process(event);
}

另一方面DataMonitorListener接口,不是ZooKeeper API的一部分,只是为了这个应用例子而设计的。DataMonitor对象使用它和它的容器通信,也就是Executor对象。DataMonitorListener接口就像这样:

public interface DataMonitorListener {
/**
* The existence status of the node has changed.
*/
void exists(byte data[]); /**
* The ZooKeeper session is no longer valid.
*
* @param rc
* the ZooKeeper reason code
*/
void closing(int rc);
}

这个接口被定义在DataMonitor类并且在Executor类中实现。当Executor.exists()被调用,Executor决定是否启动或关闭每一个请求。当znode不存在的时候再次调用需要杀掉可执行程序。

当Executor.closing()被调用,Executor决定是否关掉它自己来响应ZooKeeper连接永久消失。

正如你可能已经猜到的。DataMonitor是调用这些方法的对象,来响应ZooKeeper的状态改变。

下面是Executor的DataMonitorListener.exists()和DataMonitorListener.closing的实现:

public void exists( byte[] data ) {
if (data == null) {
if (child != null) {
System.out.println("Killing process");
child.destroy();
try {
child.waitFor();
} catch (InterruptedException e) {
}
}
child = null;
} else {
if (child != null) {
System.out.println("Stopping child");
child.destroy();
try {
child.waitFor();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
FileOutputStream fos = new FileOutputStream(filename);
fos.write(data);
fos.close();
} catch (IOException e) {
e.printStackTrace();
}
try {
System.out.println("Starting child");
child = Runtime.getRuntime().exec(exec);
new StreamWriter(child.getInputStream(), System.out);
new StreamWriter(child.getErrorStream(), System.err);
} catch (IOException e) {
e.printStackTrace();
}
}
} public void closing(int rc) {
synchronized (this) {
notifyAll();
}
}

DataMonitor 类

The DataMonitor class has the meat of the ZooKeeper logic(这句咋翻译)。它几乎上是异步和事件驱动。 DataMonitor kicks things off in the constructor with:

public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher,
DataMonitorListener listener) {
this.zk = zk;
this.znode = znode;
this.chainedWatcher = chainedWatcher;
this.listener = listener; // Get things started by checking if the node exists. We are going
// to be completely event driven
zk.exists(znode, true, this, null);
}

调用ZooKeeper.exists()来检查znode是否存在,设置一个监视器,并且传递它自己的引用给它自己作为完成回调的对象,在这个意义上,it kicks things off,因为真实的处理在监视器被触发的时候发生。

注意

不要把完成回调和监视回调弄混淆了。ZooKeeper.exists()完成回调,它发生在DataMonitor对象的StatCallback.processResult()方法实现中,当异步的监视器set操作(被ZooKeeper.exists())在服务端完成的时候被调用。

另一方面,监视器的触发,发送一个事件给Executor对象,因为Executor作为ZooKeeper对象的监视器被注册。

此外,你可能注意到DataMonitor也能句注册它自已作为这个特定监视器事件的监听者。这是ZooKeeper3.0.0的新特性(支持多个监听者)。然而在这例子中,DataMonitor没有把它自己注册为监视器。

在ZooKeeper.exists()操作在服务端完成,ZooKeeper API在客户端调用这个完成回调函数:

public void processResult(int rc, String path, Object ctx, Stat stat) {
boolean exists;
switch (rc) {
case Code.Ok:
exists = true;
break;
case Code.NoNode:
exists = false;
break;
case Code.SessionExpired:
case Code.NoAuth:
dead = true;
listener.closing(rc);
return;
default:
// Retry errors
zk.exists(znode, true, this, null);
return;
} byte b[] = null;
if (exists) {
try {
b = zk.getData(znode, false, null);
} catch (KeeperException e) {
// We don't need to worry about recovering now. The watch
// callbacks will kick off any exception handling
e.printStackTrace();
} catch (InterruptedException e) {
return;
}
}
if ((b == null && b != prevData)
|| (b != null && !Arrays.equals(prevData, b))) {
listener.exists(b);
prevData = b;
}
}

这个代码首先检查Znode 是否存在,致命错误,和可恢复的错误。如果文件(或znode)存在,它从znode获取数据,并且如果状态已经改变它调用Executor的exists()回调函数。注意,它不需要为getData调用做任何Exception处理因为它有任何可能导致错误的监视器:如果在ZooKeeper.getData()方法之前这个节点被删除,通过ZooKeeper.exists()设置的监听事件会触发一个回调;如果有通信错误,当连接回来的时候会触发一个连接监听事件。

最后,注意DataMonitor是怎么处理监听事件的:

 public void process(WatchedEvent event) {
String path = event.getPath();
if (event.getType() == Event.EventType.None) {
// We are are being told that the state of the
// connection has changed
switch (event.getState()) {
case SyncConnected:
// In this particular example we don't need to do anything
// here - watches are automatically re-registered with
// server and any watches triggered while the client was
// disconnected will be delivered (in order of course)
break;
case Expired:
// It's all over
dead = true;
listener.closing(KeeperException.Code.SessionExpired);
break;
}
} else {
if (path != null && path.equals(znode)) {
// Something has changed on the node, let's find out
zk.exists(znode, true, this, null);
}
}
if (chainedWatcher != null) {
chainedWatcher.process(event);
}
}

如果客户端ZooKeeper库在会话到期(到期事件)之前可以和ZooKeeper重新建立通信通道(同步连接事件)所有会话的监视器将自动地在服务端被建立(自动重置监视器是ZooKeeper 3.0.0的新特性)。关于这点的更多信息可以参考ZooKeeper Watches。

再深入这个方法一些,当DataMonitor等到一个znode的事件,它调用ZooKeeper.exists()来查找什么被改变了。

完整的源码列表

Executor.java

/**
* A simple example program to use DataMonitor to start and
* stop executables based on a znode. The program watches the
* specified znode and saves the data that corresponds to the
* znode in the filesystem. It also starts the specified program
* with the specified arguments when the znode exists and kills
* the program if the znode goes away.
*/
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream; import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper; public class Executor
implements Watcher, Runnable, DataMonitor.DataMonitorListener
{
String znode; DataMonitor dm; ZooKeeper zk; String filename; String exec[]; Process child; public Executor(String hostPort, String znode, String filename,
String exec[]) throws KeeperException, IOException {
this.filename = filename;
this.exec = exec;
zk = new ZooKeeper(hostPort, 3000, this);
dm = new DataMonitor(zk, znode, null, this);
} /**
* @param args
*/
public static void main(String[] args) {
if (args.length < 4) {
System.err
.println("USAGE: Executor hostPort znode filename program [args ...]");
System.exit(2);
}
String hostPort = args[0];
String znode = args[1];
String filename = args[2];
String exec[] = new String[args.length - 3];
System.arraycopy(args, 3, exec, 0, exec.length);
try {
new Executor(hostPort, znode, filename, exec).run();
} catch (Exception e) {
e.printStackTrace();
}
} /***************************************************************************
* We do process any events ourselves, we just need to forward them on.
*
* @see org.apache.zookeeper.Watcher#process(org.apache.zookeeper.proto.WatcherEvent)
*/
public void process(WatchedEvent event) {
dm.process(event);
} public void run() {
try {
synchronized (this) {
while (!dm.dead) {
wait();
}
}
} catch (InterruptedException e) {
}
} public void closing(int rc) {
synchronized (this) {
notifyAll();
}
} static class StreamWriter extends Thread {
OutputStream os; InputStream is; StreamWriter(InputStream is, OutputStream os) {
this.is = is;
this.os = os;
start();
} public void run() {
byte b[] = new byte[80];
int rc;
try {
while ((rc = is.read(b)) > 0) {
os.write(b, 0, rc);
}
} catch (IOException e) {
} }
} public void exists(byte[] data) {
if (data == null) {
if (child != null) {
System.out.println("Killing process");
child.destroy();
try {
child.waitFor();
} catch (InterruptedException e) {
}
}
child = null;
} else {
if (child != null) {
System.out.println("Stopping child");
child.destroy();
try {
child.waitFor();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
FileOutputStream fos = new FileOutputStream(filename);
fos.write(data);
fos.close();
} catch (IOException e) {
e.printStackTrace();
}
try {
System.out.println("Starting child");
child = Runtime.getRuntime().exec(exec);
new StreamWriter(child.getInputStream(), System.out);
new StreamWriter(child.getErrorStream(), System.err);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

DataMonitor.java

/**
* A simple class that monitors the data and existence of a ZooKeeper
* node. It uses asynchronous ZooKeeper APIs.
*/
import java.util.Arrays; import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.data.Stat; public class DataMonitor implements Watcher, StatCallback { ZooKeeper zk; String znode; Watcher chainedWatcher; boolean dead; DataMonitorListener listener; byte prevData[]; public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher,
DataMonitorListener listener) {
this.zk = zk;
this.znode = znode;
this.chainedWatcher = chainedWatcher;
this.listener = listener;
// Get things started by checking if the node exists. We are going
// to be completely event driven
zk.exists(znode, true, this, null);
} /**
* Other classes use the DataMonitor by implementing this method
*/
public interface DataMonitorListener {
/**
* The existence status of the node has changed.
*/
void exists(byte data[]); /**
* The ZooKeeper session is no longer valid.
*
* @param rc
* the ZooKeeper reason code
*/
void closing(int rc);
} public void process(WatchedEvent event) {
String path = event.getPath();
if (event.getType() == Event.EventType.None) {
// We are are being told that the state of the
// connection has changed
switch (event.getState()) {
case SyncConnected:
// In this particular example we don't need to do anything
// here - watches are automatically re-registered with
// server and any watches triggered while the client was
// disconnected will be delivered (in order of course)
break;
case Expired:
// It's all over
dead = true;
listener.closing(KeeperException.Code.SessionExpired);
break;
}
} else {
if (path != null && path.equals(znode)) {
// Something has changed on the node, let's find out
zk.exists(znode, true, this, null);
}
}
if (chainedWatcher != null) {
chainedWatcher.process(event);
}
} public void processResult(int rc, String path, Object ctx, Stat stat) {
boolean exists;
switch (rc) {
case Code.Ok:
exists = true;
break;
case Code.NoNode:
exists = false;
break;
case Code.SessionExpired:
case Code.NoAuth:
dead = true;
listener.closing(rc);
return;
default:
// Retry errors
zk.exists(znode, true, this, null);
return;
} byte b[] = null;
if (exists) {
try {
b = zk.getData(znode, false, null);
} catch (KeeperException e) {
// We don't need to worry about recovering now. The watch
// callbacks will kick off any exception handling
e.printStackTrace();
} catch (InterruptedException e) {
return;
}
}
if ((b == null && b != prevData)
|| (b != null && !Arrays.equals(prevData, b))) {
listener.exists(b);
prevData = b;
}
}
}

最新文章

  1. Android屏幕适配
  2. aop
  3. Ruby-随机数
  4. output_buffering开启
  5. [Java] 转:多线程 (并发)总结
  6. HDU 5514 Frogs 容斥定理
  7. MORE ABORT AWR?
  8. PHP中的ORM
  9. 仿网易新闻app下拉标签选择菜单
  10. SQL奇技淫巧
  11. 设计一个有getMin功能的栈(2)
  12. Python之旅.第三章.函数3.30
  13. 摘录&lt;奇特的一生&gt;1~4——[苏]格拉宁
  14. MHA高可用
  15. LNMP下动静分离部署phpmyadmin软件包
  16. bootstrap中的模态框(modal,弹出层)
  17. Python+Selenium笔记(十一):配置selenium Grid
  18. 如何更新 Visual Studio 2017 的离线安装包
  19. python函数之format
  20. 解决方案:System.InvalidOperationException: 此实现不是 Windows 平台 FIPS 验证的加密算法的一部分。

热门文章

  1. opencv-学习笔记(2)
  2. Java 集合框架之 Map
  3. 你代码写得这么丑,一定是因为你长得不好看----panboo第一篇博客
  4. Sparsity Invariant CNNs
  5. Cstring, TCHAR*, char*的转换
  6. 【redis数据库学习】用JAVA连接redis数据库各种报错
  7. Jmeter系列-自动生成html报告
  8. C# 知识回顾 - 表达式树 Expression Trees
  9. [剑指Offer] 53.表示数值的字符串
  10. jsp当做第二个servlet request的生命周期 请求 响应 不管中间经历多少个servlet 只要最后一个serlvt执行后 则生命周期结束 request的域消失