LinkedBlockingQueue 链表队列,其元素构成为:

static class Node<E> {
E item; Node<E> next; Node(E x) { item = x; }
}

该队列有两种锁及判断队列不为空和队列未满的条件对象

    /** 用于消费队列的锁,如操作:take,poll 等等 */
private final ReentrantLock takeLock = new ReentrantLock(); /** 用于消费时判断不为空的条件对象 */
private final Condition notEmpty = takeLock.newCondition(); /** 添加元素的锁,如操作:put,offer等等 */
private final ReentrantLock putLock = new ReentrantLock(); /** 用于添加元素时判断队列未满的条件对象 */
private final Condition notFull = putLock.newCondition();

队列操作

添加元素

有三种方式:

1.offer

当队列已满时,添加失败,返回false,源码如下:

    public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
//判断队列是否已满,若队列已满,则返回false
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
//获取放入元素的锁
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
//队列未满,这里是double检查,是获取锁后又检查了一次队列是否已满
if (count.get() < capacity) {
//添加元素到队尾
enqueue(node);
//获取添加元素前的队列大小,并将队列大小+1
c = count.getAndIncrement();
//判断队列未满
if (c + 1 < capacity)
//则通知下一个线程继续添加元素
notFull.signal();
}
} finally {
putLock.unlock();
}
//队列中有元素,则通知消费线程可以消费
if (c == 0)
signalNotEmpty();
return c >= 0;
}

2.put

添加元素到队尾,若队列已满,则添加操作进入到阻塞状态,直到队列中有元素有出队。

    public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
//这里是double检查,是获取锁后又检查了一次队列是否已满,如果队列已满,则线程进入阻塞,直到队列中有元素出队
while (count.get() == capacity) {
notFull.await();
}
//添加元素到队尾
enqueue(node);
c = count.getAndIncrement();
//判断队列未满
if (c + 1 < capacity)
//通知其它线程添加元素
notFull.signal();
} finally {
putLock.unlock();
}
//判断队列已有元素,则通知阻塞的消费线程进行消费
if (c == 0)
signalNotEmpty();
}

put指定超时的操作

指定put的等待超时时间,等待超时后,则返回操作false

    //指定超时时间
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException { if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
//当超时时间过了后,则不再继续待,返回操作false
if (nanos <= 0)
return false;
//自旋锁,计算超时时间
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}

3.add

实际上执行的是offer操作,判断offer操作是否成功,若失败,则抛出队列已满的异常信息,其实现在父类AbstractQueue中

  public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}

出队操作

1.take

与put相反,若队列为空,则阻塞等待,直到队列有元素入队

 public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
//判断队列为空,则进入阻塞
while (count.get() == 0) {
notEmpty.await();
}
//直到队列有元素,返回队头元素,下一个元素设置为队头。
x = dequeue();
c = count.getAndDecrement();
//判断队列中有元素,通知下一个线程进行消费
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
//判断队列未满,通知入队的线程进行入队操作
if (c == capacity)
signalNotFull();
return x;
}

take指定超时时间

当指定了take操作的超时时间后,take等待超时时,若队列还未有元素,则返回null

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
//当队列为空时,进入到阻塞
while (count.get() == 0) {
//超时时间已过,则返回null,不再阻塞等待
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}

2.poll

若队列为空,则直接返回null,否则返回队头元素

    public E poll() {
final AtomicInteger count = this.count;
//队列为空,则返回null
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
//获取消费锁
takeLock.lock();
try {
//double检查,再次判断队列不为空,则返回队头元素
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
//队列还有元素,则通知其它消费线程进行消费操作
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
//判断出队后,队列未满,则通知入队线程进行入队操作
if (c == capacity)
signalNotFull();
return x;
}

3.remove

poll操作,若返回null,则返回空队列异常,其操作在父类AbstractQueue中

    public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}

4.peek

返回队列第一个元素,元素并不出队。

    public E peek() {
//队列为空,则返回null
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//只是返回队头元素的引用,元素不出队
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}

其它操作

1.remove(object)

移除指定元素

    public boolean remove(Object o) {
if (o == null) return false;
//入队锁和消费锁,均上锁
fullyLock();
try {
//从队头开始,遍历队列
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
//找到指定的元素
if (o.equals(p.item)) {
//移除该元素,将该元素的前一个和后一个关连起来
unlink(p, trail);
//若队列中有指定元素,返回true
return true;
}
}
//未找到指定元素,返回false
return false;
} finally {
fullyUnlock();
}
}

2.contains(object)

判断队列是否存在指定元素

    public boolean contains(Object o) {
if (o == null) return false;
fullyLock();
try {
for (Node<E> p = head.next; p != null; p = p.next)
if (o.equals(p.item))
return true;
return false;
} finally {
fullyUnlock();
}
}

3.toArray() ,toArray(T[] a)

将队列转换成数组

    public Object[] toArray() {
fullyLock();
try {
int size = count.get();
Object[] a = new Object[size];
int k = 0;
for (Node<E> p = head.next; p != null; p = p.next)
a[k++] = p.item;
return a;
} finally {
fullyUnlock();
}
}
    public <T> T[] toArray(T[] a) {
fullyLock();
try {
int size = count.get();
if (a.length < size)
a = (T[])java.lang.reflect.Array.newInstance
(a.getClass().getComponentType(), size); int k = 0;
for (Node<E> p = head.next; p != null; p = p.next)
a[k++] = (T)p.item;
if (a.length > k)
a[k] = null;
return a;
} finally {
fullyUnlock();
}
}

4.clear()

清空队列

    public void clear() {
fullyLock();
try {
for (Node<E> p, h = head; (p = h.next) != null; h = p) {
h.next = h;
p.item = null;
}
head = last;
// assert head.item == null && head.next == null;
if (count.getAndSet(0) == capacity)
notFull.signal();
} finally {
fullyUnlock();
}
}

入队实现

    private void enqueue(Node<E> node) {
//将原队尾元素指向新元素node,新元素的node的next为null
last = last.next = node;
}

出队实现

    private E dequeue() {
//取队头元素
Node<E> h = head;
//取出队头元素的下一个元素
Node<E> first = h.next;
//将原队头元素的下一个元素设置指向为原队头,即本身node,即该node只有自已引用自己
h.next = h; // help GC
//设置新队头元素
head = first;
//取出节点数据
E x = first.item;
//将节点的数据设置为null,则该node除了自己引用自己外,无其它引用,可以被垃圾回收
first.item = null;
return x;
}

最新文章

  1. docker学习(6) docker中搭建java服务及nginx反向代理
  2. 先贴上代码:Random快排,快排的非递归实现
  3. 3110 PHP常见问题
  4. Centos挂载windows共享文件夹
  5. SQL查询(笔记2——实体查询)
  6. bjfu1099 度度熊大战僵尸
  7. ubuntu系统使用快捷键打开终端方式总结
  8. shell中的type命令
  9. maven项目建立pom.xml报无法解析org.apache.maven.plugins:maven-resources-plugin:2.4.3
  10. Leetcode 073 Set Matrix Zeroes
  11. React源码学习——ReactClass
  12. Airflow Comman Line 测试
  13. python vs C++ 类
  14. [Hive安装问题]
  15. python使用PDB进行调试
  16. 小程序 切换到tabBar页面不刷新问题
  17. .NetCore实践篇:分布式监控系统zipkin踩坑之路(二)
  18. 05_ssm基础(四)之Spring与持久层的整合
  19. 开源|如何使用CNN将视频从2D到3D进行自动转换(附源代码)
  20. leetcode-849-到最近的人的最大距离

热门文章

  1. C# 编程--数组
  2. Java 编程技巧之数据结构
  3. shell 中cut
  4. pycharm windows 远程修改服务器代码
  5. rabbitMQ初始化配置
  6. 「NOI2016」网格 解题报告
  7. Kali Linux下运行nfc工具测试!
  8. composer 手动安装及简单使用 windows
  9. TTL 与 CMOS
  10. Java + selenium 启动谷歌浏览器