LinkedBlockingQueue

LinkedBlockingQueue 是基于链表实现的,可以选择有界或无界的阻塞队列。
队列的元素按照 FIFO 的顺序访问,新增元素添加到队列尾部,移除元素从队列头部开始。
队列头部通过 takeLock 进行并发控制,队列尾部通过 putLock 进行并发控制,
该队列最多可以有两个线程同时操作,其吞吐量要高于 ArrayBlockQueue。

创建实例

    /**
* 单向链表节点
*/
static class Node<E> {
/**
* 存储的元素
*/
E item; /**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node<E> next; Node(E x) { item = x; }
} /** 阻塞队列的容量,如果不指定,则为 Integer.MAX_VALUE */
private final int capacity; /** 当前的元素总数 */
private final AtomicInteger count = new AtomicInteger(); /**
* 链表头结点
* Invariant: head.item == null
*/
transient Node<E> head; /**
* 链表尾节点
* Invariant: last.next == null
*/
private transient Node<E> last; /** 执行读取操作时必须持有的锁 */
private final ReentrantLock takeLock = new ReentrantLock(); /** 队列为空时执行的 take 操作,当前线程将在此条件阻塞等待 */
private final Condition notEmpty = takeLock.newCondition(); /** 执行写入操作时必须持有的锁 */
private final ReentrantLock putLock = new ReentrantLock(); /** 队列已满时执行的 put 操作,当前线程将在此条件阻塞等待 */
private final Condition notFull = putLock.newCondition(); /**
* 创建一个容量为 Integer.MAX_VALUE 的 LinkedBlockingQueue 实例
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
} /**
* 创建一个容量为 capacity 的 LinkedBlockingQueue 实例
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) {
throw new IllegalArgumentException();
}
this.capacity = capacity;
last = head = new Node<>(null);
}

写入元素

  • 在队列尾部插入元素,如果队列已满,则阻塞等待有可用空间之后,再尝试插入
    /**
* 在队列尾部插入元素,如果队列已满,则阻塞等待有可用空间之后,再尝试插入。
*/
@Override
public void put(E e) throws InterruptedException {
if (e == null) {
throw new NullPointerException();
}
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
// 创建新节点
final Node<E> node = new Node<>(e);
// 读取写锁
final ReentrantLock putLock = this.putLock;
// 读取元素计数值
final AtomicInteger count = this.count;
// 可中断地锁定
putLock.lockInterruptibly();
try {
// 如果当前队列已满
while (count.get() == capacity) {
// 则在非满条件上阻塞等待,线程被唤醒后进行重试
notFull.await();
}
// 加入队列尾部
enqueue(node);
// 递增元素总数
c = count.getAndIncrement();
// 如果添加元素之后还有可用空间
if (c + 1 < capacity) {
// 唤醒在非满条件上阻塞等待的线程
notFull.signal();
}
} finally {
// 释放写锁
putLock.unlock();
}
// 如果是添加的第一个元素
if (c == 0) {
// 唤醒在非空条件上阻塞等待的线程来读取元素
signalNotEmpty();
}
} /**
* 将节点插入队列尾部
*/
private void enqueue(Node<E> node) {
last = last.next = node;
} /**
* Signals a waiting take. Called only from put/offer
* 唤醒一个等待读取元素的线程
*/
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
  • 如果队列有可用空间,则将元素添加到队列尾部,并返回 true;如果队列已满,则立即返回 false,元素被丢弃。
    /**
* 如果队列有可用空间,则将元素添加到队列尾部,并返回 true;
* 如果队列已满,则立即返回 false,元素被丢弃。
*/
@Override
public boolean offer(E e) {
if (e == null) {
throw new NullPointerException();
}
final AtomicInteger count = this.count;
// 队列已满
if (count.get() == capacity) {
// 返回 false
return false;
}
int c = -1;
// 新建节点
final Node<E> node = new Node<>(e);
// 读取写锁
final ReentrantLock putLock = this.putLock;
// 获取锁
putLock.lock();
try {
// 进行二次判断
if (count.get() < capacity) {
// 将节点加入到队列尾部
enqueue(node);
// 递增元素总个数
c = count.getAndIncrement();
if (c + 1 < capacity) {
notFull.signal();
}
}
} finally {
putLock.unlock();
}
if (c == 0) {
signalNotEmpty();
}
return c >= 0;
}
  • 在指定的超时时间内,尝试将元素插入到队列尾部,插入成功返回 true
    /**
* 在指定的超时时间内,尝试将元素插入到队列尾部,插入成功返回 true
*/
@Override
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) {
throw new NullPointerException();
}
// 转换为纳秒
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// 队列已满
while (count.get() == capacity) {
// 已经超时则直接返回 false
if (nanos <= 0L) {
return false;
}
// 最多阻塞等待指定的超时时间后再次尝试添加元素
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<>(e));
c = count.getAndIncrement();
if (c + 1 < capacity) {
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0) {
signalNotEmpty();
}
return true;
}

读取元素

  • 如果队列为空,则阻塞等待有可用元素,否则移除并获取队列头部元素
    /**
* 如果队列为空,则阻塞等待有可用元素,否则移除并获取队列头部元素
* created by ZXD at 6 Dec 2018 T 20:20:15
* @return
* @throws InterruptedException
*/
@Override
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 队列为空
while (count.get() == 0) {
// 在非空条件上阻塞等待,线程被唤醒后再次尝试读取
notEmpty.await();
}
// 移除队列头部元素
x = dequeue();
// 递减总数
c = count.getAndDecrement();
// 如果还有元素可用
if (c > 1) {
// 唤醒在非空条件上阻塞等待的线程
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
// 如果有可用空间
if (c == capacity) {
// 唤醒在非满条件上阻塞等待的线程来插入元素
signalNotFull();
}
return x;
} /**
* 移除并返回头部节点元素
*/
private E dequeue() {
// 读取头节点
final Node<E> h = head;
// 读取后继节点
final Node<E> first = h.next;
// 清除旧头结点
h.next = h; // help GC
// 写入新头节点
head = first;
// 读取元素值
final E x = first.item;
// 清除头结点的元素值,它只作为一个标记节点
first.item = null;
// 返回元素值
return x;
}
  • 如果队列为空,则直接返回 null,否则尝试移除并返回头部元素
    /**
* 如果队列为空,则直接返回 null,否则尝试移除并返回头部元素
* created by ZXD at 6 Dec 2018 T 20:23:51
* @return
*/
@Override
public E poll() {
final AtomicInteger count = this.count;
// 队列为空时直接返回 null
if (count.get() == 0) {
return null;
}
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 有元素可用
if (count.get() > 0) {
// 移除并返回头部元素
x = dequeue();
c = count.getAndDecrement();
if (c > 1) {
notEmpty.signal();
}
}
} finally {
takeLock.unlock();
}
if (c == capacity) {
signalNotFull();
}
return x;
}
  • 尝试在指定的超时时间内移除并返回头部元素,如果已经超时,则返回 null
    /**
* 尝试在指定的超时时间内移除并返回头部元素,如果已经超时,则返回 null
* created by ZXD at 6 Dec 2018 T 20:27:27
* @param timeout
* @param unit
* @return
* @throws InterruptedException
*/
@Override
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 队列为空
while (count.get() == 0) {
// 已经超时则直接返回 null
if (nanos <= 0L) {
return null;
}
// 在非空条件上阻塞等待指定的纳秒数,被唤醒后再次进行读取
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1) {
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity) {
signalNotFull();
}
return x;
}

最新文章

  1. 设置tomcat远程debug
  2. docker入门-学习笔记
  3. SQL Server 2012 联机丛书安装
  4. Pyside开篇杂记
  5. [SDK2.2]Windows Azure Virtual Network (2) 创建简单的Virtual Network
  6. java继承的对象中构造函数的调用顺序
  7. storm安装
  8. 学习使用Vim(二)——User Manuals, Getting Started
  9. XenCenter注册码一年申请
  10. LDA主题模型学习笔记3.5:变分參数推导
  11. PHP静态延迟绑定简单示例
  12. Jquery 文字上下滚动效果示例代码
  13. Linux常用命令(第二版) --文件搜索命令
  14. Wiki leaks files backup
  15. 【PHP篇】字符串基础
  16. SQL 語法
  17. (BFS 二叉树) leetcode 515. Find Largest Value in Each Tree Row
  18. vimrc同步文档
  19. 剧透 &amp; 报名 | 蚂蚁金服ATEC城市峰会&#183;上海即将开幕
  20. hg和git命令对照表

热门文章

  1. 8. golang 基本类型转换
  2. 数据库与前端与Django目录
  3. activemq高可用
  4. JAVA中自定义properties文件介绍
  5. 02python程序设计基础——字符串
  6. diff 比较两个文件的不同
  7. hdu 4638 Group(离线+树状数组)
  8. react:如何创建一个新项目
  9. Ubuntu Text editor文本编辑器相关设置
  10. SpringBoot之集成数据库