Queue API的几种方法的使用

方法名称 作用 描述
add 添加元素到队列 如果队列满了就抛异常java.lang.IllegalStateException
remove 移除并且返回队列头部元素 如果队列为null,就抛异常java.util.NoSuchElementException
element 返回队列头部元素,不会移除元素 如果队列为null,就抛异常java.util.NoSuchElementException
offer 添加元素到队列 如果队列满了就返回false,不会阻塞
poll 移除并且返回队列头部元素 如果队列为null,就返回null,不会阻塞
peek 返回队列头部元素,不会移除元素 如果队列为null,就返回null,不会阻塞
put 添加元素到队列 如果队列满了就阻塞
take 移除并且返回队列头部元素 如果队列为null,就阻塞

ArrayBlockingQueue原理及源码解析

根据名字,可以知道,ArrayBlockingQueue底层是数组实现的,而且是阻塞的队列,下面看下put元素和take元素时的图解:



上面的图基本上就是ArrayBlockingQueue队列使用时的底层实现原理了,下面根据源码来看一下。

ArrayBlockingQueue的成员变量

    /** 存放元素 */
final Object[] items; /** 记录下一次从什么位置开始取元素或者移除元素 */
int takeIndex; /** 记录下一次从什么位置开始放元素 */
int putIndex; /** 队列中元素的数量 */
int count; /** 可重入锁,用来放元素和取元素时加锁 */
final ReentrantLock lock; /** 取元素的等待集合 */
private final Condition notEmpty; /** 存放元素的等待集合 */
private final Condition notFull;

ArrayBlockingQueue的offer和put方法

offer方法源码:

    /**
* 存放一个元素到队列
* 如果队列未满,就放入队列的尾部
* 如果队列已满,就返回false
*
* @throws NullPointerException 如果存放的元素是null,抛异常
*/
public boolean offer(E e) {
//校验放入的元素是否为空,每次放入元素到队列,都会校验
checkNotNull(e);
//加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//如果队列中的元素已经满了,就返回false
if (count == items.length)
return false;
else {
//未满就调用方法,放入元素到队列
enqueue(e);
return true;
}
} finally {
//释放锁
lock.unlock();
}
}

put方法源码:

    /**
* 存放一个元素到队列
* 如果队列未满,就放入队列的尾部
* 如果队列已满,就阻塞等待
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
//校验放入的元素是否为空,每次放入元素到队列,都会校验
checkNotNull(e);
//加锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//如果队列中的元素已经满了,就阻塞,挂起当前线程
//这里使用while而不使用if,是为了防止伪唤醒
while (count == items.length)
notFull.await();
//未满就调用方法,放入元素到队列
enqueue(e);
} finally {
//释放锁
lock.unlock();
}
}

参数校验:

private static void checkNotNull(Object v) {
//如果传入的元素是null,就抛异常
if (v == null)
throw new NullPointerException();
}

共同调用的enqueue方法:

    /**
* 将指定的元素放入队列的尾部
* 只有持有锁才可以调用
*/
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
//获取队列中的元素
final Object[] items = this.items;
//putIndex就是要放入的元素在队列中的的索引
items[putIndex] = x;
//如果放入元素之后,队列满了,就把putIndex置为0
//意思是下一次向队列中放元素,就是放入队列的第一个位置了
//putIndex的作用就是记录下一次元素应该放到哪里
if (++putIndex == items.length)
putIndex = 0;
//元素的个数加一
count++;
//唤醒拿元素没有拿到挂起的线程,告诉它:
//元素已经放入了队列,可以取元素了
notEmpty.signal();
}

ArrayBlockingQueue的poll和take方法

poll方法源码:

    /**
* 从队列中拿元素
* 如果队列中没有元素了,就返回null
* 如果队列中有元素,就返回
*/
public E poll() {
//加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//如果队列中没有元素,就返回Null
//否则就调用方法取元素然后返回
return (count == 0) ? null : dequeue();
} finally {
//释放锁
lock.unlock();
}
}

take方法源码:

    /**
* 从队列中拿元素
* 如果队列中没有元素了,就阻塞
* 如果队列中有元素,就返回
*/
public E take() throws InterruptedException {
//加锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//如果队列中没有元素,就让线程阻塞
//使用while而不使用if,是为了防止伪唤醒
while (count == 0)
//挂起线程
notEmpty.await();
//如果队列中有元素存在,就取出返回
return dequeue();
} finally {
//释放锁
lock.unlock();
}
}

dequeue方法源码:

    /**
* 从队列中取元素
* 只有持有锁才可以调用
*/
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
//获取队列中的元素
final Object[] items = this.items;
@SuppressWarnings("unchecked")
//获取要取出的是哪一个元素
E x = (E) items[takeIndex];
//取出后设置为Null
items[takeIndex] = null;
//要是取出的是队列中的最后一个元素
//就把takeIndex置为0,意思是下一次取元素从队列第一个开始取
if (++takeIndex == items.length)
takeIndex = 0;
//队列中元素的个数减一
count--;
if (itrs != null)
itrs.elementDequeued();
//唤醒因为存放元素时,队列满了,挂起的线程
//告诉它,可以存放元素了
notFull.signal();
//返回取出的元素
return x;
}

ArrayBlockingQueue的peek方法

/**
* 返回队列头部的元素
* 如果队列中没有元素了,就返回null
* 如果队列中有元素,就返回
*/
public E peek() {
//加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//有元素就返回,没有就返回null
return itemAt(takeIndex); // null when queue is empty
} finally {
//释放锁
lock.unlock();
}
}

注意:实例化ArrayBlockingQueue时必须指定队列的容量大小,否则会编译错误

ArrayBlockingQueue<String> queue =
new ArrayBlockingQueue<String>(3);

LinkedBlockingDeque原理及源码解析

根据名字,可以知道LinkedBlockingDeque,底层是使用链表的方式存储元素的,而且是阻塞队列,初始化一个LinkedBlockingDeque可以不用指定队列的容量,即可以指定一个无界的队列。

LinkedBlockingDeque的成员变量

  private static final long serialVersionUID = -387911632671998426L;

    /** 链表节点类 */
static final class Node<E> {
/**
* 链表中的元素
*/
E item; /**
* 上一个节点
*/
Node<E> prev; /**
* 下一个节点
*/
Node<E> next; //构造函数
Node(E x) {
item = x;
}
} /**
* 指向第一个节点的指针
*/
transient Node<E> first; /**
* 指向最后一个节点的指针
*/
transient Node<E> last; /** 队列中元素的个数 */
private transient int count; /** 队列的容量 */
private final int capacity; /** 可重入锁 */
final ReentrantLock lock = new ReentrantLock(); /** 取元素的等待集合 */
private final Condition notEmpty = lock.newCondition(); /** 存放元素的等待集合 */
private final Condition notFull = lock.newCondition();

LinkedBlockingDeque的构造函数

    /**
* 无参构造函数
* 可以创建一个无界的队列,队列容量是int的最大值
* {@link Integer#MAX_VALUE}.
*/
public LinkedBlockingDeque() {
this(Integer.MAX_VALUE);
} /**
* 创建一个指定边界的队列
* 队列的大小由编码时指定
* @param capacity 指定的队列容量值
* @throws IllegalArgumentException if {@code capacity} is less than 1
*/
public LinkedBlockingDeque(int capacity) {
//如果传入的指定队列容量值小于0,就抛异常
if (capacity <= 0) throw new IllegalArgumentException();
//指定的队列容量大小
this.capacity = capacity;
} /**
* 创建一个包含指定集合元素的队列
*
* @param c 要包含这个元素的集合
* @throws NullPointerException 如果指定的集合或者其中的元素是null,抛异常
*/
public LinkedBlockingDeque(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
//加锁
final ReentrantLock lock = this.lock;
lock.lock(); // Never contended, but necessary for visibility
try {
//遍历指定的集合,然后放入队列
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (!linkLast(LinkedBlockingDeque.Node<E>(e)))
throw new IllegalStateException("Deque full");
}
} finally {
//释放锁
lock.unlock();
}
}

LinkedBlockingDeque的offer和put方法

offer方法源码:

    /**
* 添加一个元素到队列
* 队列未满,就直接添加进去
* 队列已满,就返回false
* @throws NullPointerException 添加元素为null。抛异常
*/
public boolean offer(E e) {
return offerLast(e);
}
    /**
* @throws NullPointerException {@inheritDoc}
*/
public boolean offerLast(E e) {
//如果添加的元素是null,就抛异常
if (e == null) throw new NullPointerException();
//初始化一个链表,把元素放入链表
Node<E> node = new Node<E>(e);
//加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
return linkLast(node);
} finally {
//释放锁
lock.unlock();
}
}
    /**
* 把元素添加到链表,如果队列元素满了,就返回false
*/
private boolean linkLast(Node<E> node) {
// assert lock.isHeldByCurrentThread();
//如果添加进去元素,队列的长度大于队列的容量,就返回false
if (count >= capacity)
return false;
//获取链表的最后一个节点
Node<E> l = last;
//把链表的最后一个节点做为上一个节点
node.prev = l;
//把当前要添加的元素放入链表
last = node;
//如果链表的第一个节点是null,就把元素放入链表的第一个位置
if (first == null)
first = node;
else
//否则就把元素放入链表最后一个节点的下一个节点中
l.next = node;
//队列中元素的个数加一
++count;
//唤醒拿元素时阻塞的线程
notEmpty.signal();
//添加成功,返回true
return true;
}

put方法源码:

    /**
* 添加一个元素到队列
* 队列未满,就直接添加进去
* 队列已满,就阻塞
* @throws NullPointerException {@inheritDoc}
* @throws InterruptedException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
putLast(e);
}
   /**
* @throws NullPointerException {@inheritDoc}
* @throws InterruptedException {@inheritDoc}
*/
public void putLast(E e) throws InterruptedException {
//如果添加的元素是null,就返回NullPointerException
if (e == null) throw new NullPointerException();
//初始化一个链表,把元素放入链表
Node<E> node = new Node<E>(e);
//加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//当元素没有添加成功,就挂起
//linkLast方法在上面offer中已经写了
while (!linkLast(node))
notFull.await();
} finally {
//释放锁
lock.unlock();
}
}

LinkedBlockingDeque的poll和take方法

poll方法源码:

    /**
* 从队列中取元素
* 队列有元素存在,取出元素
* 队列为空,返回null
*/
public E poll() {
return pollFirst();
}
public E pollFirst() {
//加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//队列有元素就返回,否则就返回null
return unlinkFirst();
} finally {
//释放锁
lock.unlock();
}
}

take方法源码:

    /**
* 从队列中取元素
* 队列有元素存在,取出元素
* 队列为空,就阻塞
*/
public E take() throws InterruptedException {
return takeFirst();
}
 public E takeFirst() throws InterruptedException {
//加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
//当队列为空时,就阻塞
//while防止伪唤醒
while ( (x = unlinkFirst()) == null)
notEmpty.await();
//队列有元素存在,返回取出的元素
return x;
} finally {
//释放锁
lock.unlock();
}
}

unlinkFirst方法源码:

    /**
* Removes and returns first element, or null if empty.
* 删除并返回第一个元素,如果为空则返回null
*/
private E unlinkFirst() {
// assert lock.isHeldByCurrentThread();
//获取队列中第一个元素
//及链表的第一个节点
Node<E> f = first;
//第一个元素为null,就返回null
if (f == null)
return null;
//获取第一个节点的下一个节点
Node<E> n = f.next;
//获取第一个节点的元素值
E item = f.item;
//把值设置为null
f.item = null;
f.next = f; // help GC
//把队列的第一个元素设置为:
//要移除元素的下一个节点
first = n;
//如果是null,就把最后一个节点设置为null
if (n == null)
last = null;
else
//否则就把上一个节点设置为Null
n.prev = null;
//队列的元素个数减一
--count;
//唤醒存放元素时,挂起的线程
notFull.signal();
//返回取出的元素
return item;
}

LinkedBlockingDeque的peek方法

    /**
* 返回队列头部的元素
* 队列有元素存在,返回元素
* 队列为空,返回null
*/
public E peek() {
return peekFirst();
}
public E peekFirst() {
//加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//如果队列头部元素为空就返回null
//否则就返回头部元素
return (first == null) ? null : first.item;
} finally {
//释放锁
lock.unlock();
}
}

ConcurrentLinkedDeque

根据英文意思,可以知道,ConcurrentLinkedDeque是一个非阻塞的队列,底层是链表实现,具体源码请查看其他文章。

SynchronousQueue的简单使用

SynchronousQueue是一个容量为0的队列,队列内部不存储元素;当put一个元素时,如果没有take方法去拿元素,就会一直阻塞,直到有take方法去拿元素才会结束;同样的,take元素时,如果没有put元素,那么就会一直阻塞,直到有put元素,才会结束,下面看下示例:

 public static void main(String[] args) {
SynchronousQueue<String> queue = new SynchronousQueue<>(); Thread th = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("start work...");
//向队列放入元素
queue.offer("hello");
System.out.println("end work...");
}
});
th.start();
//打印取出的元素
System.out.println(queue.poll());
//打印结果为null
try {
//打印结果为hello
System.out.println(queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}

从上面的例子可以看出,在SynchronousQueue队列中,offer进去的元素可能会丢失。

SynchronousQueue<String> queue = new SynchronousQueue<>();

Thread th = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("start work...");
//向队列放入元素
try {
//会阻塞
queue.put("hello");
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end work...");
}
});
th.start(); try {
//打印结果为hello
System.out.println(queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}

put元素时,如果没有take方法去拿元素就会阻塞;如果这时使用poll方法去拿元素,取出的元素是null,而且不会结束阻塞。

take元素时,如果没有put或者offer元素进队列,也会阻塞。

PriorityQueue的简单使用

PriorityQueue是一个优先级队列,会自动对元素进行排序,也可以自己指定排序规则。

public static void main(String[] args) {
PriorityQueue<String> queue = new PriorityQueue<String>();
//入队列
queue.offer("36");
queue.offer("21");
queue.offer("57");
queue.offer("78");
queue.offer("22");
//出队列
System.out.println(queue.poll());//21
System.out.println(queue.poll());//22
System.out.println(queue.poll());//36
System.out.println(queue.poll());//57
System.out.println(queue.poll());//78
}

PriorityQueue还可以自定义排序规则,通过实现compare方法即可:

public static void main(String[] args) {
PriorityQueue<Student> queue = new PriorityQueue<Student>(10, new Comparator<Student>() {
//自定义比较规则
@Override
public int compare(Student o1, Student o2) {
if (o1.age > o2.age) {
return 1;
} else {
return -1;
}
}
});
//入队列
queue.offer(new Student("小明", 15));
queue.offer(new Student("小红", 12));
queue.offer(new Student("小黑", 16));
//出队列
System.out.println( queue.poll().age);//12
System.out.println(queue.poll().age);//15
System.out.println(queue.poll().age);//16
} static class Student {
public String name;
public int age; public Student(String name, int age) {
this.name = name;
this.age = age;
}
}

结束语

本文只讲了两个队列的源码,可能存在不足或者不够深入的地方,还希望有朋友可以多多指正,其他几个队列的源码,后续有时间的话再做解析,感谢阅读!

最新文章

  1. Masonry tableviewCell布局(转)
  2. 北邮oj 题
  3. Linux压缩指令
  4. JSONP使用笔记
  5. phpstorm用正则删除PHP代码空行小技巧
  6. Puppet Agent/Master HTTPS Communications
  7. linux下shell脚本学习
  8. 未能加载文件或程序集&ldquo;App_global.asax&rdquo;或它的某一个依赖项
  9. svn小技巧——重定向svn diff
  10. Delphi的指针(转)
  11. 面向切面编程(Aop)
  12. 在 ServiceModel 客户端配置部分中,找不到引用协定“XX”的默认终结点元素的解决方法
  13. 【PyQt5-Qt Designer】文本框读写操作
  14. How to generate a new dictionary file of mmseg
  15. DLL中不能调用CoInitialize和CoInitializeEx
  16. 给Java新手的一些建议&mdash;&mdash;Java知识点归纳(Java基础部分)
  17. /.nav-tabs :是普通标签页 .nav-pills:胶囊式标签页 action ;默认的激活项,给&lt;li&gt;加默认显示的是哪个标签页内容 .nav是标签页的一个基类,给ul加 .nav-stacked: 垂直排列BootStrap
  18. Swift-assert使用时机
  19. WPF之数据绑定
  20. java代码关于匿名内部类和接口的方法使用

热门文章

  1. node初体验(二)
  2. kubernetes和docker----2.学习Pod资源
  3. WPF 之绘画(十一)
  4. Java基本概念:封装
  5. 后端程序员之路 38、Scala入门
  6. 《C++ Primer》笔记 第11章 关联容器
  7. Hi3559板载u-boot、kernel及rootfs烧录过程及心得
  8. HDOJ-4027(线段树+区间更新(每个节点更新的值不同))
  9. Vulnhub dc-4靶机通关
  10. Flink实时计算topN热榜