zookeeper工具类:

获取连接实例;创建节点;获取子节点;设置节点数据;获取节点数据;访问控制等。

package org.windwant.zookeeper;

import org.apache.zookeeper.*;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider; import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import java.util.List; /**
* zookeeper util
*/
public class ZookeeperUtil {
private static final int SESSION_TIMEOUT = 30000; /**
* 使用连接串创建连接
* @param domain
* @param w
* @return
*/
public static ZooKeeper getInstance(String domain, Watcher w){
try {
return new ZooKeeper(domain,SESSION_TIMEOUT, w);
} catch (IOException e) {
e.printStackTrace();
}
return null;
} public static String createNode(ZooKeeper zk, String path, byte[] data){
try {
return zk.create(path, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
} public static List<String> getChildrenNode(ZooKeeper zk, String path){
try {
return zk.getChildren(path, false);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
} public static Stat setNodeData(ZooKeeper zk, String path, byte[] data, int version){
try {
return zk.setData(path, data, version);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
} public static byte[] getNodeData(ZooKeeper zk, String path){
try {
return zk.getData(path, false, null);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
} public static void deleteNode(ZooKeeper zk, String path, int version){
try {
zk.delete(path, version);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
} public static void closeZk(ZooKeeper zk){
try {
if(zk != null) {
zk.close();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
} public static void addAuth(ZooKeeper zk, String userName, String passwd){
try {
zk.addAuthInfo(String.valueOf(Ids.AUTH_IDS), DigestAuthenticationProvider.generateDigest(userName + ":" + passwd).getBytes());
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
}
} public static void main(String[] args) {
// try {
// ZooKeeper zk = new ZooKeeper("localhost", 2181, null);
// addAuth(zk, "roger", "123456");
// } catch (IOException e) {
// e.printStackTrace();
// }
try {
System.out.println(DigestAuthenticationProvider.generateDigest("roger:123456"));
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
}
}
}

继承父类:

SyncPrimitive

负责zookeeper连接及根节点的初始化。

实现zookeeper的Watcher
package org.windwant.zookeeper;

import org.apache.zookeeper.*;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List; public class SyncPrimitive implements Watcher { protected ZooKeeper zk = null;
protected Integer mutex; SyncPrimitive(Integer mutex) {
this.mutex = mutex;
} /**
* 初始化zookeeper
* @param domain
*/
protected void initZK(String domain){
System.out.println(Thread.currentThread().getName() + ": init zookeeper...");
try {
zk = new ZooKeeper(domain, 30000, this);
System.out.println(Thread.currentThread().getName() + ": zookeeper connected " + zk);
} catch (IOException e) {
e.printStackTrace();
}
} /**
* 初始化应用根节点 并发处理
* @param root
*/
protected void initZKRootNode(String root){
//并发控制
synchronized (mutex) {
try {
if (zk != null) {
if (zk.exists(root, false) != null) {
List<String> child = zk.getChildren(root, false);
if (child != null && !child.isEmpty()) {
//zookeeper multi操作;或者 Transaction(multi封装) commit操作;
List<Op> ops = new ArrayList<>();
child.forEach(c -> {
ops.add(Op.delete(root + "/" + c, -1));
});
List<OpResult> opRsts = zk.multi(ops);
System.out.println(Thread.currentThread().getName() + ": deleted child node success!");
}
zk.setData(root, String.valueOf(0).getBytes(), -1);
System.out.println(Thread.currentThread().getName() + ": app root node " + root + " init success! ");
} else {
zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
System.out.println(Thread.currentThread().getName() + ": app root node " + root + " create success! ");
}
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} public void process(WatchedEvent event) {
if(event.getState().equals(Event.KeeperState.SyncConnected)) {
}
}
}

zookeeper 屏障 Barrier:

SyncPrimitiveBarrier
enter():加入屏障队列
leave():离开屏障队列
package org.windwant.zookeeper;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat; import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom; public class SyncPrimitiveBarrier extends SyncPrimitive { private String root;
//屏障阈值
private int size;
private String name; /**
* Barrier constructor
*
* @param domain
* @param root
* @param size
*/
public SyncPrimitiveBarrier(String domain, String root, Integer size) {
super(size);
this.root = root;
this.size = size; initZK(domain);
initZKRootNode(root); // My node name
try {
name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());
} catch (UnknownHostException e) {
System.out.println(e.toString());
} } /**
* Join barrier
*
* @return
* @throws KeeperException
* @throws InterruptedException
*/ boolean enter() throws KeeperException, InterruptedException{
synchronized (mutex) {
List<String> list = zk.getChildren(root, true);
//当前节点数小于阈值,则创建节点,进入barrier
if (list.size() < size) {
System.out.println("node: " + list.size());
this.name = zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
if (list.size() + 1 == size) {
System.out.println("set data node size" + list.size());
zk.setData(root, String.valueOf(list.size() + 1).getBytes(), -1);
}
System.out.println(Thread.currentThread().getName() + ": " + name + " enter barrier!");
return true;
} else {
//否则不进入
System.out.println(Thread.currentThread().getName() + ": " + name + " barrier full, inaccessible!");
return false;
}
}
} /**
* Wait until all reach barrier
*
* @return
* @throws KeeperException
* @throws InterruptedException
*/ boolean leave() throws KeeperException, InterruptedException, UnsupportedEncodingException {
while (true) {
int data = Integer.parseInt(new String(zk.getData(root, false, new Stat()), "UTF-8"));
if (data == size) {
System.out.println("leave size: " + data);
//离开
zk.delete(name, -1);
System.out.println(Thread.currentThread().getName() + ": " + name + " left barrier!");
return true;
} else {
System.out.println(Thread.currentThread().getName() + ": " + name + " waitting for others!");
Thread.sleep(1000);//每秒检查一次
}
}
} public static void main(String[] args) {
for (int i = 0; i < 3; i++) {
new Thread(() -> {
SyncPrimitiveBarrier syncPrimitiveBarrier = new SyncPrimitiveBarrier("localhost:2181", "/barrier_test", 3);
boolean flag = false;
try {
//模拟需要到达barrier的时间
Thread.sleep(ThreadLocalRandom.current().nextInt(1,5)*1000);
flag = syncPrimitiveBarrier.enter(); //尝试离开barrier
syncPrimitiveBarrier.leave();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}).start(); }
}
}

zookeeper 消息队列:

SyncPrimitiveQueue
produce(int i):生成消息放入队列
consume():消费队列消息
package org.windwant.zookeeper;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat; import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom; /**
* Producer-Consumer queue
*/
public class SyncPrimitiveQueue extends SyncPrimitive { private String root;
private int queueSize; /**
* Constructor of producer-consumer queue
*
* @param domain
* @param name
*/ SyncPrimitiveQueue(String domain, String name, Integer queueSize) {
super(queueSize);
this.root = name;
this.queueSize = queueSize;
initZK(domain);
initZKRootNode(root);
} /**
* Add element to the queue.
*
* @param i
* @return
*/ public boolean produce(int i) throws KeeperException, InterruptedException{
synchronized (mutex) {
List<String> children = zk.getChildren(root, false);
if(children != null && children.size()>=mutex){
System.out.println(Thread.currentThread().getName() + ": producer queue full, waiting for consuming");
mutex.wait();
}
ByteBuffer b = ByteBuffer.allocate(4);
byte[] value; b.putInt(i);
value = b.array();
String node = zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT_SEQUENTIAL);
System.out.println(Thread.currentThread().getName() + ": produce value: " + node);
mutex.notifyAll();//通知消费
return true;
}
} /**
* Remove first element from the queue.
*
* @return
* @throws KeeperException
* @throws InterruptedException
*/
int consume() throws KeeperException, InterruptedException{
int retvalue = -1;
Stat stat = null; // Get the first element available
while (true) {
synchronized (mutex) {
List<String> list = zk.getChildren(root, true);
if (list.size() == 0) {
System.out.println(Thread.currentThread().getName() + ": resource not awailable, waitting for produce!");
mutex.wait();
} else {
list.sort((String s1, String s2) -> s1.compareTo(s2)); //消费序号最小的节点
String dest = list.get(0);
System.out.println(Thread.currentThread().getName() + ": cosume value: " + root + "/" + dest);
byte[] b = zk.getData(root + "/" + dest,
false, stat);
zk.delete(root + "/" + dest, 0); //消费后删除
ByteBuffer buffer = ByteBuffer.wrap(b);
retvalue = buffer.getInt();
mutex.notifyAll();
return retvalue;
}
}
}
} public static void main(String[] args) {
SyncPrimitiveQueue syncPrimitiveQueue = new SyncPrimitiveQueue("localhost:2181", "/queue_test", 10);
//生产 每隔三秒 模拟慢生产
new Thread(() -> {
for (int i = 0; i < Integer.MAX_VALUE; i++) {
try {
syncPrimitiveQueue.produce(i);
Thread.sleep(ThreadLocalRandom.current().nextInt(0, 3)*1000);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start(); try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
} //消费 每隔一秒 模拟快消费
new Thread(() -> {
for (int i = 0; i < Integer.MAX_VALUE ; i++) {
try {
syncPrimitiveQueue.consume();
Thread.sleep(ThreadLocalRandom.current().nextInt(0, 3)*1000);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}

zookeeper分布式锁:

SynZookeeperLock
getInstance(String domain):获取zookeeper实例
tryLock(String domain, String path, byte[] data, CountDownLatch c):尝试获取分布式锁。
package org.windwant.zookeeper;

import org.apache.zookeeper.*;
import org.apache.zookeeper.ZooDefs.Ids; import java.io.IOException;
import java.util.concurrent.*; /**
* zookeeper 分布式锁
*/
public class SynZookeeperLock {
private static final int SESSION_TIMEOUT = 30000; public static ZooKeeper getInstance(String domain){
try {
CountDownLatch c = new CountDownLatch(1);
ZooKeeper zk = new ZooKeeper(domain, SESSION_TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
c.countDown(); // 唤醒当前正在执行的线程
}
}
});
//阻塞直到连接完成
c.await();
return zk;
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
/**
* 获取分布式锁
* 使用临时节点,避免进程获取锁后,down掉未释放锁问题
* @param domain
* @param path
* @param data
* @param c
*/
public static void tryLock(String domain, String path, byte[] data, CountDownLatch c){
//每次获取锁使用新的会话连接
ZooKeeper zk = getInstance(domain);
zk.create(path, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, (rc, path1, ctx, name) -> {
//节点创建成功,获取锁
if (rc == 0) {
System.out.println(Thread.currentThread().getName() + ":result " + rc + " lock " + path + ", created!");
try {
//模拟获取锁后3秒释放
Thread.sleep(3000);
System.out.println(Thread.currentThread().getName() + ":task complete,try release lock!");
zk.delete(path, -1, (rc1, path2, ctx1) -> {
if(rc1 == 0){
System.out.println(Thread.currentThread().getName() + ":lock released!");
}
}, null); } catch (InterruptedException e) {
e.printStackTrace();
}finally {
//释放等待
c.countDown();
}
} else if(rc == -110) {//节点已存在,则说明锁已被其它进程获取,则创建watch,并阻塞等待
System.out.println(Thread.currentThread().getName() + ":result " + rc + " lock " + path + " already created, waiting!");
try {
zk.exists(path, event -> {
//watch 到锁删除事件,则触发重新获取锁
if (event.getType().equals(Watcher.Event.EventType.NodeDeleted)) {
System.out.println(Thread.currentThread().getName() + ":get node deleted event! try lock!");
//释放连接,避免服务器因为连接数限制
try {
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
SynZookeeperLock.tryLock(domain, path, data, c);
c.countDown();
}
});
} catch (KeeperException e) {
//包括ConnectionLossException(网络,服务器故障) 需要确认客户端重连执行情况 之前的请求是否需要重新执行
e.printStackTrace();
c.countDown();
} catch (InterruptedException e) {
//线程中断,打断请求
e.printStackTrace();
c.countDown();
}
}else {
//-4 -112
System.out.println(Thread.currentThread().getName() + ": connection lose or session invalid");
c.countDown();
// tryLock(domain, path, data, c);
}
}, new Object());
try {
//阻塞等待结果
c.await();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
c.countDown();
}
} public static void main(String[] args) {
String lockPath = "/testlock";
byte[] lock = "lock".getBytes();
String domain = "127.0.0.1:2181";
//测试获取锁线程 注意服务器最大连接数限制
for (int i = 0; i < 20; i++) {
Thread tmp = new Thread( () -> tryLock(domain, lockPath, lock, new CountDownLatch(1)));
tmp.start();
}
}
}

项目地址:https://github.com/windwant/windwant-demo/tree/master/zookeeper-service

最新文章

  1. Beta分布和Dirichlet分布
  2. java笔记--使用SwingWoker类完成耗时操作
  3. 《Programming WPF》翻译 第8章 6.我们进行到哪里了?
  4. 2014 BDTC 參会有感
  5. centos7 下nfs的配置
  6. (数字IC)低功耗设计入门(五)——RTL级低功耗设计(续)
  7. Nginx 作用
  8. Source Qualifter组件中sqlquery过长导致截取
  9. python基础学习(二)注释和算术运算符
  10. /etc/shadow中密码段的生成方式
  11. DBUtils的增删改查
  12. python 全栈开发,Day91(Vue实例的生命周期,组件间通信之中央事件总线bus,Vue Router,vue-cli 工具)
  13. [LeetCode] 106. Construct Binary Tree from Postorder and Inorder Traversal_Medium tag: Tree Traversal
  14. MySQL 开启和查看bin-log日志
  15. 多线程通信(wait和notify)
  16. 使用JWPL (Java Wikipedia Library)操作维基百科数据
  17. 使用清华源和阿里源替代Ubuntu源
  18. spring 5.0.1.RELEASE官方任然不支持velocity(平台升级)
  19. http get请求参数拼接
  20. 辞树的QAQ水题(字符串统计,思维)

热门文章

  1. spread表格树实现
  2. Oracle 表分组 group by和模糊查询like
  3. SOA (面向服务的体系结构)
  4. eclipse新建maven项目(2)
  5. Javascript 语言精粹 代码片段合集
  6. [js开源组件开发]模拟下拉选项框select
  7. 在不知下面有几个元素的时候,要去除最后一个元素的下边框jquery代码
  8. javascript 函数初探 (六)--- 闭包初探#1
  9. CXF:通过WebService上传文件,包括大文件的处理
  10. linux集群运维工具:clustershell和pssh