生产-消费者队列,用于多节点的分布式数据结构,生产和消费数据。生产者创建一个数据对象,并放到队列中;消费者从队列中取出一个数据对象并进行处理。在ZooKeeper中,队列可以使用一个容器节点下创建多个子节点来实现;创建子节点时,CreateMode使用 PERSISTENT_SEQUENTIAL,ZooKeeper会自动在节点名称后面添加唯一序列号。EPHEMERAL_SEQUENTIAL也有同样的特点,区别在于会话结束后是否会自动删除。


1      对前续代码的重构


以下是 process(WatchedEvent)的代码

final public void process(WatchedEvent event) {

if (Event.EventType.None.equals(event.getType())) {

// 连接状态发生变化

if (Event.KeeperState.SyncConnected.equals(event.getState())) {

// 连接建立成功



} else if (Event.EventType.NodeCreated.equals(event.getType())) {


} else if (Event.EventType.NodeDeleted.equals(event.getType())) {


} else if (Event.EventType.NodeDataChanged.equals(event.getType())) {


} else if (Event.EventType.NodeChildrenChanged.equals(event.getType())) {





ZooKeeperBarrier(String address, String tableSerial, int tableCapacity, String customerName)

throws IOException {


this.tableSerial = createRootNode(tableSerial);

this.tableCapacity = tableCapacity;

this.customerName = customerName;


protected void processNodeChildrenChanged(WatchedEvent event) {

log.info("{} 接收到了通知 : {}", customerName, event.getType());

// 子节点有变化

synchronized (mutex) {




2      队列的生产者


String elementName = queueName + "/element";

ArrayList<ACL> ids = ZooDefs.Ids.OPEN_ACL_UNSAFE;

CreateMode createMode = CreateMode.PERSISTENT_SEQUENTIAL;

getZooKeeper().create(elementName, value, ids, createMode);

注意,重点是PERSISTENT_SEQUENTIAL,PERSISTENT是表示永久存储直到有命令删除,SEQUENTIAL表示自动在后面添加自增的唯一序列号。这样,尽管elementName都一样,但实际生成的zNode名字在 “element”后面会添加格式为%010d的10个数字,如0000000001。如一个完整的zNode名可能为/queue/element0000000021。

3      测试日志




49:47.866 [INFO] ZooKeeperQueueTest.testQueue(29) 开始ZooKeeper队列测试,本次将测试 10 个数据

49:48.076 [DEBUG] ZooKeeperQueue.log(201)

+ Profiler [tech.codestory.zookeeper.queue.ZooKeeperQueue 连接到ZooKeeper]

|-- elapsed time                   [开始链接]   119.863 milliseconds.

|-- elapsed time           [等待连接成功的Event]    40.039 milliseconds.

|-- Total        [tech.codestory.zookeeper.queue.ZooKeeperQueue 连接到ZooKeeper]   159.911 milliseconds.

49:48.082 [DEBUG] ZooKeeperQueue.log(201)

+ Profiler [tech.codestory.zookeeper.queue.ZooKeeperQueue 连接到ZooKeeper]

|-- elapsed time                   [开始链接]   103.795 milliseconds.

|-- elapsed time           [等待连接成功的Event]    65.899 milliseconds.

|-- Total        [tech.codestory.zookeeper.queue.ZooKeeperQueue 连接到ZooKeeper]   170.263 milliseconds.

49:48.102 [INFO] ZooKeeperQueueTest.run(51) 生产对象 : 1 , 然后等待 1700 毫秒

49:48.134 [INFO] ZooKeeperQueueTest.run(80) 消费对象: 1 , 然后等待 4000 毫秒

49:49.814 [INFO] ZooKeeperQueueTest.run(51) 生产对象 : 2 , 然后等待 900 毫秒

49:50.717 [INFO] ZooKeeperQueueTest.run(51) 生产对象 : 3 , 然后等待 1300 毫秒

49:52.020 [INFO] ZooKeeperQueueTest.run(51) 生产对象 : 4 , 然后等待 3700 毫秒

49:52.139 [INFO] ZooKeeperQueueTest.run(80) 消费对象: 2 , 然后等待 2800 毫秒

49:54.947 [INFO] ZooKeeperQueueTest.run(80) 消费对象: 3 , 然后等待 4500 毫秒

49:55.724 [INFO] ZooKeeperQueueTest.run(51) 生产对象 : 5 , 然后等待 3500 毫秒

49:59.228 [INFO] ZooKeeperQueueTest.run(51) 生产对象 : 6 , 然后等待 4200 毫秒

49:59.454 [INFO] ZooKeeperQueueTest.run(80) 消费对象: 4 , 然后等待 2400 毫秒

50:01.870 [INFO] ZooKeeperQueueTest.run(80) 消费对象: 5 , 然后等待 4900 毫秒

50:03.435 [INFO] ZooKeeperQueueTest.run(51) 生产对象 : 7 , 然后等待 4500 毫秒

50:06.776 [INFO] ZooKeeperQueueTest.run(80) 消费对象: 6 , 然后等待 3600 毫秒

50:07.938 [INFO] ZooKeeperQueueTest.run(51) 生产对象 : 8 , 然后等待 1900 毫秒

50:09.846 [INFO] ZooKeeperQueueTest.run(51) 生产对象 : 9 , 然后等待 3200 毫秒

50:10.388 [INFO] ZooKeeperQueueTest.run(80) 消费对象: 7 , 然后等待 2900 毫秒

50:13.051 [INFO] ZooKeeperQueueTest.run(51) 生产对象 : 10 , 然后等待 4900 毫秒

50:13.294 [INFO] ZooKeeperQueueTest.run(80) 消费对象: 8 , 然后等待 300 毫秒

50:13.600 [INFO] ZooKeeperQueueTest.run(80) 消费对象: 9 , 然后等待 4800 毫秒

50:18.407 [INFO] ZooKeeperQueueTest.run(80) 消费对象: 10 , 然后等待 2400 毫秒

4      队列的消费者


/** 队列的同步信号 */

private static Integer queueMutex = Integer.valueOf(1);


protected void processNodeChildrenChanged(WatchedEvent event) {

synchronized (queueMutex) {





* 从队列中删除第一个对象


* @return

* @throws KeeperException

* @throws InterruptedException


int consume() throws KeeperException, InterruptedException {

while (true) {

synchronized (queueMutex) {

List<String> list = getZooKeeper().getChildren(queueName, true);

if (list.size() == 0) {


} else {

// 获取第一个子节点的名称

String firstNodeName = getFirstElementName(list);

// 删除节点,并返回节点的值

return deleteNodeAndReturnValue(firstNodeName);





5      完整源码

5.1   ZooKeeperBase.java

package tech.codestory.zookeeper;

import java.io.IOException;

import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.*;

import org.apache.zookeeper.data.Stat;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.slf4j.profiler.Profiler;


* 为 ZooKeeper测试代码创建一个基类,封装建立连接的过程


* @author code story

* @date 2019/8/16


public class ZooKeeperBase implements Watcher {

/** 日志,不使用 @Slf4j ,是要使用子类的log */

Logger log = null;

/** 等待连接建立成功的信号 */

private CountDownLatch connectedSemaphore = new CountDownLatch(1);

/** ZooKeeper 客户端 */

private ZooKeeper zooKeeper = null;

/** 避免重复根节点 */

static Integer rootNodeInitial = Integer.valueOf(1);

/** 构造函数 */

public ZooKeeperBase(String address) throws IOException {

log = LoggerFactory.getLogger(getClass());

Profiler profiler = new Profiler(this.getClass().getName() + " 连接到ZooKeeper");


zooKeeper = new ZooKeeper(address, 3000, this);

try {



} catch (InterruptedException e) {

log.error("InterruptedException", e);







* 创建测试需要的根节点


* @param rootNodeName

* @return


public String createRootNode(String rootNodeName) {

synchronized (rootNodeInitial) {

// 创建 tableSerial 的zNode

try {

Stat existsStat = getZooKeeper().exists(rootNodeName, false);

if (existsStat == null) {

rootNodeName = getZooKeeper().create(rootNodeName, new byte[0],



} catch (KeeperException e) {

log.error("KeeperException", e);

} catch (InterruptedException e) {

log.error("InterruptedException", e);



return rootNodeName;


/** 读取ZooKeeper对象,供子类调用 */

protected ZooKeeper getZooKeeper() {

return zooKeeper;



final public void process(WatchedEvent event) {

if (Event.EventType.None.equals(event.getType())) {

// 连接状态发生变化

if (Event.KeeperState.SyncConnected.equals(event.getState())) {

// 连接建立成功



} else if (Event.EventType.NodeCreated.equals(event.getType())) {


} else if (Event.EventType.NodeDeleted.equals(event.getType())) {


} else if (Event.EventType.NodeDataChanged.equals(event.getType())) {


} else if (Event.EventType.NodeChildrenChanged.equals(event.getType())) {





* 处理事件: NodeCreated


* @param event


protected void processNodeCreated(WatchedEvent event) {}


* 处理事件: NodeDeleted


* @param event


protected void processNodeDeleted(WatchedEvent event) {}


* 处理事件: NodeDataChanged


* @param event


protected void processNodeDataChanged(WatchedEvent event) {}


* 处理事件: NodeChildrenChanged


* @param event


protected void processNodeChildrenChanged(WatchedEvent event) {}


5.2   ZooKeeperQueue.java

package tech.codestory.zookeeper.queue;

import java.io.IOException;

import java.nio.ByteBuffer;

import java.util.ArrayList;

import java.util.List;

import org.apache.zookeeper.CreateMode;

import org.apache.zookeeper.KeeperException;

import org.apache.zookeeper.WatchedEvent;

import org.apache.zookeeper.ZooDefs;

import org.apache.zookeeper.data.ACL;

import org.apache.zookeeper.data.Stat;

import lombok.extern.slf4j.Slf4j;

import tech.codestory.zookeeper.ZooKeeperBase;


* ZooKeeper实现Queue


* @author code story

* @date 2019/8/16



public class ZooKeeperQueue extends ZooKeeperBase {

/** 队列名称 */

private String queueName;

/** 队列的同步信号 */

private static Integer queueMutex = Integer.valueOf(1);


* 构造函数


* @param address

* @param queueName

* @throws IOException


public ZooKeeperQueue(String address, String queueName) throws IOException {


this.queueName = createRootNode(queueName);



protected void processNodeChildrenChanged(WatchedEvent event) {

synchronized (queueMutex) {





* 将对象添加到队列中


* @param i

* @return


boolean produce(int i) throws KeeperException, InterruptedException {

ByteBuffer b = ByteBuffer.allocate(4);

byte[] value;

// Add child with value i


value = b.array();

String elementName = queueName + "/element";

ArrayList<ACL> ids = ZooDefs.Ids.OPEN_ACL_UNSAFE;

CreateMode createMode = CreateMode.PERSISTENT_SEQUENTIAL;

getZooKeeper().create(elementName, value, ids, createMode);

return true;



* 从队列中删除第一个对象


* @return

* @throws KeeperException

* @throws InterruptedException


int consume() throws KeeperException, InterruptedException {

while (true) {

synchronized (queueMutex) {

List<String> list = getZooKeeper().getChildren(queueName, true);

if (list.size() == 0) {


} else {

// 获取第一个子节点的名称

String firstNodeName = getFirstElementName(list);

// 删除节点,并返回节点的值

return deleteNodeAndReturnValue(firstNodeName);






* 获取第一个子节点的名称


* @param list

* @return


private String getFirstElementName(List<String> list) {

Integer min = Integer.MAX_VALUE;

String minNode = null;

for (String s : list) {

Integer tempValue = Integer.valueOf(s.substring(7));

if (tempValue < min) {

min = tempValue;

minNode = s;



return minNode;



* 删除节点,并返回节点的值


* @param minNode

* @return

* @throws KeeperException

* @throws InterruptedException


private int deleteNodeAndReturnValue(String minNode)

throws KeeperException, InterruptedException {

String fullNodeName = queueName + "/" + minNode;

Stat stat = new Stat();

byte[] b = getZooKeeper().getData(fullNodeName, false, stat);

getZooKeeper().delete(fullNodeName, stat.getVersion());

ByteBuffer buffer = ByteBuffer.wrap(b);

return buffer.getInt();



5.3   ZooKeeperQueueTest.java

package tech.codestory.zookeeper.queue;

import java.io.IOException;

import java.security.SecureRandom;

import java.util.Random;

import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.KeeperException;

import org.testng.annotations.Test;

import lombok.extern.slf4j.Slf4j;


* ZooKeeperQueue测试


* @author code story

* @date 2019/8/16



public class ZooKeeperQueueTest {

final String address = "";

final String queueName = "/queue";

final Random random = new SecureRandom();

// 随机生成10-20之间的个数

final int count = 10 + random.nextInt(10);

/** 等待生产者和消费者线程都结束 */

private CountDownLatch connectedSemaphore = new CountDownLatch(2);


public void testQueue() {

log.info("开始ZooKeeper队列测试,本次将测试 {} 个数据", count);

new QueueProducer().start();

new QueueConsumer().start();

try {


} catch (InterruptedException e) {

log.error("InterruptedException", e);




* 队列的生产者


class QueueProducer extends Thread {


public void run() {

try {

ZooKeeperQueue queue = new ZooKeeperQueue(address, queueName);

for (int i = 0; i < count; i++) {

int elementValue = i + 1;

long waitTime = random.nextInt(50) * 100;

log.info("生产对象 : {} , 然后等待 {} 毫秒", elementValue, waitTime);




} catch (IOException e) {

log.error("IOException", e);

} catch (InterruptedException e) {

log.error("InterruptedException", e);

} catch (KeeperException e) {

log.error("KeeperException", e);






* 队列的消费者


class QueueConsumer extends Thread {


public void run() {

try {

ZooKeeperQueue queue = new ZooKeeperQueue(address, queueName);

for (int i = 0; i < count; i++) {

try {

int elementValue = queue.consume();

long waitTime = random.nextInt(50) * 100;

log.info("消费对象: {} , 然后等待 {} 毫秒", elementValue, waitTime);


} catch (KeeperException e) {


log.error("KeeperException", e);

} catch (InterruptedException e) {

log.error("InterruptedException", e);




} catch (IOException e) {

log.error("IOException", e);






