Queue API的几种实现详解
Queue API的几种方法的使用
方法名称 | 作用 | 描述 |
---|---|---|
add | 添加元素到队列 | 如果队列满了就抛异常java.lang.IllegalStateException |
remove | 移除并且返回队列头部元素 | 如果队列为null,就抛异常java.util.NoSuchElementException |
element | 返回队列头部元素,不会移除元素 | 如果队列为null,就抛异常java.util.NoSuchElementException |
offer | 添加元素到队列 | 如果队列满了就返回false,不会阻塞 |
poll | 移除并且返回队列头部元素 | 如果队列为null,就返回null,不会阻塞 |
peek | 返回队列头部元素,不会移除元素 | 如果队列为null,就返回null,不会阻塞 |
put | 添加元素到队列 | 如果队列满了就阻塞 |
take | 移除并且返回队列头部元素 | 如果队列为null,就阻塞 |
ArrayBlockingQueue原理及源码解析
根据名字,可以知道,ArrayBlockingQueue底层是数组实现的,而且是阻塞的队列,下面看下put元素和take元素时的图解:
上面的图基本上就是ArrayBlockingQueue队列使用时的底层实现原理了,下面根据源码来看一下。
ArrayBlockingQueue的成员变量
/** 存放元素 */
final Object[] items;
/** 记录下一次从什么位置开始取元素或者移除元素 */
int takeIndex;
/** 记录下一次从什么位置开始放元素 */
int putIndex;
/** 队列中元素的数量 */
int count;
/** 可重入锁,用来放元素和取元素时加锁 */
final ReentrantLock lock;
/** 取元素的等待集合 */
private final Condition notEmpty;
/** 存放元素的等待集合 */
private final Condition notFull;
ArrayBlockingQueue的offer和put方法
offer方法源码:
/**
* 存放一个元素到队列
* 如果队列未满,就放入队列的尾部
* 如果队列已满,就返回false
*
* @throws NullPointerException 如果存放的元素是null,抛异常
*/
public boolean offer(E e) {
//校验放入的元素是否为空,每次放入元素到队列,都会校验
checkNotNull(e);
//加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//如果队列中的元素已经满了,就返回false
if (count == items.length)
return false;
else {
//未满就调用方法,放入元素到队列
enqueue(e);
return true;
}
} finally {
//释放锁
lock.unlock();
}
}
put方法源码:
/**
* 存放一个元素到队列
* 如果队列未满,就放入队列的尾部
* 如果队列已满,就阻塞等待
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
//校验放入的元素是否为空,每次放入元素到队列,都会校验
checkNotNull(e);
//加锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//如果队列中的元素已经满了,就阻塞,挂起当前线程
//这里使用while而不使用if,是为了防止伪唤醒
while (count == items.length)
notFull.await();
//未满就调用方法,放入元素到队列
enqueue(e);
} finally {
//释放锁
lock.unlock();
}
}
参数校验:
private static void checkNotNull(Object v) {
//如果传入的元素是null,就抛异常
if (v == null)
throw new NullPointerException();
}
共同调用的enqueue方法:
/**
* 将指定的元素放入队列的尾部
* 只有持有锁才可以调用
*/
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
//获取队列中的元素
final Object[] items = this.items;
//putIndex就是要放入的元素在队列中的的索引
items[putIndex] = x;
//如果放入元素之后,队列满了,就把putIndex置为0
//意思是下一次向队列中放元素,就是放入队列的第一个位置了
//putIndex的作用就是记录下一次元素应该放到哪里
if (++putIndex == items.length)
putIndex = 0;
//元素的个数加一
count++;
//唤醒拿元素没有拿到挂起的线程,告诉它:
//元素已经放入了队列,可以取元素了
notEmpty.signal();
}
ArrayBlockingQueue的poll和take方法
poll方法源码:
/**
* 从队列中拿元素
* 如果队列中没有元素了,就返回null
* 如果队列中有元素,就返回
*/
public E poll() {
//加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//如果队列中没有元素,就返回Null
//否则就调用方法取元素然后返回
return (count == 0) ? null : dequeue();
} finally {
//释放锁
lock.unlock();
}
}
take方法源码:
/**
* 从队列中拿元素
* 如果队列中没有元素了,就阻塞
* 如果队列中有元素,就返回
*/
public E take() throws InterruptedException {
//加锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//如果队列中没有元素,就让线程阻塞
//使用while而不使用if,是为了防止伪唤醒
while (count == 0)
//挂起线程
notEmpty.await();
//如果队列中有元素存在,就取出返回
return dequeue();
} finally {
//释放锁
lock.unlock();
}
}
dequeue方法源码:
/**
* 从队列中取元素
* 只有持有锁才可以调用
*/
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
//获取队列中的元素
final Object[] items = this.items;
@SuppressWarnings("unchecked")
//获取要取出的是哪一个元素
E x = (E) items[takeIndex];
//取出后设置为Null
items[takeIndex] = null;
//要是取出的是队列中的最后一个元素
//就把takeIndex置为0,意思是下一次取元素从队列第一个开始取
if (++takeIndex == items.length)
takeIndex = 0;
//队列中元素的个数减一
count--;
if (itrs != null)
itrs.elementDequeued();
//唤醒因为存放元素时,队列满了,挂起的线程
//告诉它,可以存放元素了
notFull.signal();
//返回取出的元素
return x;
}
ArrayBlockingQueue的peek方法
/**
* 返回队列头部的元素
* 如果队列中没有元素了,就返回null
* 如果队列中有元素,就返回
*/
public E peek() {
//加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//有元素就返回,没有就返回null
return itemAt(takeIndex); // null when queue is empty
} finally {
//释放锁
lock.unlock();
}
}
注意:实例化ArrayBlockingQueue时必须指定队列的容量大小,否则会编译错误
ArrayBlockingQueue<String> queue =
new ArrayBlockingQueue<String>(3);
LinkedBlockingDeque原理及源码解析
根据名字,可以知道LinkedBlockingDeque,底层是使用链表的方式存储元素的,而且是阻塞队列,初始化一个LinkedBlockingDeque可以不用指定队列的容量,即可以指定一个无界的队列。
LinkedBlockingDeque的成员变量
private static final long serialVersionUID = -387911632671998426L;
/** 链表节点类 */
static final class Node<E> {
/**
* 链表中的元素
*/
E item;
/**
* 上一个节点
*/
Node<E> prev;
/**
* 下一个节点
*/
Node<E> next;
//构造函数
Node(E x) {
item = x;
}
}
/**
* 指向第一个节点的指针
*/
transient Node<E> first;
/**
* 指向最后一个节点的指针
*/
transient Node<E> last;
/** 队列中元素的个数 */
private transient int count;
/** 队列的容量 */
private final int capacity;
/** 可重入锁 */
final ReentrantLock lock = new ReentrantLock();
/** 取元素的等待集合 */
private final Condition notEmpty = lock.newCondition();
/** 存放元素的等待集合 */
private final Condition notFull = lock.newCondition();
LinkedBlockingDeque的构造函数
/**
* 无参构造函数
* 可以创建一个无界的队列,队列容量是int的最大值
* {@link Integer#MAX_VALUE}.
*/
public LinkedBlockingDeque() {
this(Integer.MAX_VALUE);
}
/**
* 创建一个指定边界的队列
* 队列的大小由编码时指定
* @param capacity 指定的队列容量值
* @throws IllegalArgumentException if {@code capacity} is less than 1
*/
public LinkedBlockingDeque(int capacity) {
//如果传入的指定队列容量值小于0,就抛异常
if (capacity <= 0) throw new IllegalArgumentException();
//指定的队列容量大小
this.capacity = capacity;
}
/**
* 创建一个包含指定集合元素的队列
*
* @param c 要包含这个元素的集合
* @throws NullPointerException 如果指定的集合或者其中的元素是null,抛异常
*/
public LinkedBlockingDeque(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
//加锁
final ReentrantLock lock = this.lock;
lock.lock(); // Never contended, but necessary for visibility
try {
//遍历指定的集合,然后放入队列
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (!linkLast(LinkedBlockingDeque.Node<E>(e)))
throw new IllegalStateException("Deque full");
}
} finally {
//释放锁
lock.unlock();
}
}
LinkedBlockingDeque的offer和put方法
offer方法源码:
/**
* 添加一个元素到队列
* 队列未满,就直接添加进去
* 队列已满,就返回false
* @throws NullPointerException 添加元素为null。抛异常
*/
public boolean offer(E e) {
return offerLast(e);
}
/**
* @throws NullPointerException {@inheritDoc}
*/
public boolean offerLast(E e) {
//如果添加的元素是null,就抛异常
if (e == null) throw new NullPointerException();
//初始化一个链表,把元素放入链表
Node<E> node = new Node<E>(e);
//加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
return linkLast(node);
} finally {
//释放锁
lock.unlock();
}
}
/**
* 把元素添加到链表,如果队列元素满了,就返回false
*/
private boolean linkLast(Node<E> node) {
// assert lock.isHeldByCurrentThread();
//如果添加进去元素,队列的长度大于队列的容量,就返回false
if (count >= capacity)
return false;
//获取链表的最后一个节点
Node<E> l = last;
//把链表的最后一个节点做为上一个节点
node.prev = l;
//把当前要添加的元素放入链表
last = node;
//如果链表的第一个节点是null,就把元素放入链表的第一个位置
if (first == null)
first = node;
else
//否则就把元素放入链表最后一个节点的下一个节点中
l.next = node;
//队列中元素的个数加一
++count;
//唤醒拿元素时阻塞的线程
notEmpty.signal();
//添加成功,返回true
return true;
}
put方法源码:
/**
* 添加一个元素到队列
* 队列未满,就直接添加进去
* 队列已满,就阻塞
* @throws NullPointerException {@inheritDoc}
* @throws InterruptedException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
putLast(e);
}
/**
* @throws NullPointerException {@inheritDoc}
* @throws InterruptedException {@inheritDoc}
*/
public void putLast(E e) throws InterruptedException {
//如果添加的元素是null,就返回NullPointerException
if (e == null) throw new NullPointerException();
//初始化一个链表,把元素放入链表
Node<E> node = new Node<E>(e);
//加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//当元素没有添加成功,就挂起
//linkLast方法在上面offer中已经写了
while (!linkLast(node))
notFull.await();
} finally {
//释放锁
lock.unlock();
}
}
LinkedBlockingDeque的poll和take方法
poll方法源码:
/**
* 从队列中取元素
* 队列有元素存在,取出元素
* 队列为空,返回null
*/
public E poll() {
return pollFirst();
}
public E pollFirst() {
//加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//队列有元素就返回,否则就返回null
return unlinkFirst();
} finally {
//释放锁
lock.unlock();
}
}
take方法源码:
/**
* 从队列中取元素
* 队列有元素存在,取出元素
* 队列为空,就阻塞
*/
public E take() throws InterruptedException {
return takeFirst();
}
public E takeFirst() throws InterruptedException {
//加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
//当队列为空时,就阻塞
//while防止伪唤醒
while ( (x = unlinkFirst()) == null)
notEmpty.await();
//队列有元素存在,返回取出的元素
return x;
} finally {
//释放锁
lock.unlock();
}
}
unlinkFirst方法源码:
/**
* Removes and returns first element, or null if empty.
* 删除并返回第一个元素,如果为空则返回null
*/
private E unlinkFirst() {
// assert lock.isHeldByCurrentThread();
//获取队列中第一个元素
//及链表的第一个节点
Node<E> f = first;
//第一个元素为null,就返回null
if (f == null)
return null;
//获取第一个节点的下一个节点
Node<E> n = f.next;
//获取第一个节点的元素值
E item = f.item;
//把值设置为null
f.item = null;
f.next = f; // help GC
//把队列的第一个元素设置为:
//要移除元素的下一个节点
first = n;
//如果是null,就把最后一个节点设置为null
if (n == null)
last = null;
else
//否则就把上一个节点设置为Null
n.prev = null;
//队列的元素个数减一
--count;
//唤醒存放元素时,挂起的线程
notFull.signal();
//返回取出的元素
return item;
}
LinkedBlockingDeque的peek方法
/**
* 返回队列头部的元素
* 队列有元素存在,返回元素
* 队列为空,返回null
*/
public E peek() {
return peekFirst();
}
public E peekFirst() {
//加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//如果队列头部元素为空就返回null
//否则就返回头部元素
return (first == null) ? null : first.item;
} finally {
//释放锁
lock.unlock();
}
}
ConcurrentLinkedDeque
根据英文意思,可以知道,ConcurrentLinkedDeque是一个非阻塞的队列,底层是链表实现,具体源码请查看其他文章。
SynchronousQueue的简单使用
SynchronousQueue是一个容量为0的队列,队列内部不存储元素;当put一个元素时,如果没有take方法去拿元素,就会一直阻塞,直到有take方法去拿元素才会结束;同样的,take元素时,如果没有put元素,那么就会一直阻塞,直到有put元素,才会结束,下面看下示例:
public static void main(String[] args) {
SynchronousQueue<String> queue = new SynchronousQueue<>();
Thread th = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("start work...");
//向队列放入元素
queue.offer("hello");
System.out.println("end work...");
}
});
th.start();
//打印取出的元素
System.out.println(queue.poll());
//打印结果为null
try {
//打印结果为hello
System.out.println(queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
从上面的例子可以看出,在SynchronousQueue队列中,offer进去的元素可能会丢失。
SynchronousQueue<String> queue = new SynchronousQueue<>();
Thread th = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("start work...");
//向队列放入元素
try {
//会阻塞
queue.put("hello");
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end work...");
}
});
th.start();
try {
//打印结果为hello
System.out.println(queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
put元素时,如果没有take方法去拿元素就会阻塞;如果这时使用poll方法去拿元素,取出的元素是null,而且不会结束阻塞。
take元素时,如果没有put或者offer元素进队列,也会阻塞。
PriorityQueue的简单使用
PriorityQueue是一个优先级队列,会自动对元素进行排序,也可以自己指定排序规则。
public static void main(String[] args) {
PriorityQueue<String> queue = new PriorityQueue<String>();
//入队列
queue.offer("36");
queue.offer("21");
queue.offer("57");
queue.offer("78");
queue.offer("22");
//出队列
System.out.println(queue.poll());//21
System.out.println(queue.poll());//22
System.out.println(queue.poll());//36
System.out.println(queue.poll());//57
System.out.println(queue.poll());//78
}
PriorityQueue还可以自定义排序规则,通过实现compare方法即可:
public static void main(String[] args) {
PriorityQueue<Student> queue = new PriorityQueue<Student>(10, new Comparator<Student>() {
//自定义比较规则
@Override
public int compare(Student o1, Student o2) {
if (o1.age > o2.age) {
return 1;
} else {
return -1;
}
}
});
//入队列
queue.offer(new Student("小明", 15));
queue.offer(new Student("小红", 12));
queue.offer(new Student("小黑", 16));
//出队列
System.out.println( queue.poll().age);//12
System.out.println(queue.poll().age);//15
System.out.println(queue.poll().age);//16
}
static class Student {
public String name;
public int age;
public Student(String name, int age) {
this.name = name;
this.age = age;
}
}
结束语
本文只讲了两个队列的源码,可能存在不足或者不够深入的地方,还希望有朋友可以多多指正,其他几个队列的源码,后续有时间的话再做解析,感谢阅读!
最新文章
- Masonry tableviewCell布局(转)
- 北邮oj 题
- Linux压缩指令
- JSONP使用笔记
- phpstorm用正则删除PHP代码空行小技巧
- Puppet Agent/Master HTTPS Communications
- linux下shell脚本学习
- 未能加载文件或程序集&ldquo;App_global.asax&rdquo;或它的某一个依赖项
- svn小技巧——重定向svn diff
- Delphi的指针(转)
- 面向切面编程(Aop)
- 在 ServiceModel 客户端配置部分中,找不到引用协定“XX”的默认终结点元素的解决方法
- 【PyQt5-Qt Designer】文本框读写操作
- How to generate a new dictionary file of mmseg
- DLL中不能调用CoInitialize和CoInitializeEx
- 给Java新手的一些建议&mdash;&mdash;Java知识点归纳(Java基础部分)
- /.nav-tabs :是普通标签页 .nav-pills:胶囊式标签页 action ;默认的激活项,给<;li>;加默认显示的是哪个标签页内容 .nav是标签页的一个基类,给ul加 .nav-stacked: 垂直排列BootStrap
- Swift-assert使用时机
- WPF之数据绑定
- java代码关于匿名内部类和接口的方法使用