DelayQueue

DelayQueue 是基于 PriorityQueue 实现的线程安全的无界优先级阻塞队列,
队列的头部元素必须在超时后才能移除,元素必须实现 Delayed 接口。

创建实例

    // 控制访问的互斥锁
private final transient ReentrantLock lock = new ReentrantLock();
// 持有元素的优先级队列
private final PriorityQueue<E> q = new PriorityQueue<>(); /**
* 在延迟队列头部超时阻塞等待的线程,当有 leader 时,其他线程将无限期等待下去。
* leader 线程读取到元素之后,必须唤醒其他等待的线程。
*/
private Thread leader; /**
* 当队列为空或有 leader 在阻塞等待时,当前线程将在该条件上阻塞等待。
*/
private final Condition available = lock.newCondition(); /**
* 创建一个空的延迟队列
*/
public DelayQueue() {}

读取元素

    /**
* 移除并获取延时队列的头部元素,如果没有元素超时,则阻塞等待
*/
@Override
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 读取队列头部元素
E first = q.peek();
// 1)如果队列为空
if (first == null) {
// 当前线程在 available 条件上阻塞等待
available.await();
} else {
// 获取元素的超时时间
final long delay = first.getDelay(NANOSECONDS);
// 如果元素已经超时
if (delay <= 0L) {
// 则移除并返回头部元素
return q.poll();
}
first = null; // don't retain ref while waiting
// 1)如果已经有线程在阻塞等待
if (leader != null) {
// 当前线程在 available 条件上阻塞等待
available.await();
} else {
// 2)当前线程是第一个阻塞等待的线程
final Thread thisThread = Thread.currentThread();
// 写入 leader
leader = thisThread;
try {
// 阻塞等待指定的超时时间
available.awaitNanos(delay);
} finally {
// 线程被激活后尝试移除 leader
if (leader == thisThread) {
leader = null;
}
}
}
}
}
} finally {
// 如果没有 leader 并且队列中有元素存在
if (leader == null && q.peek() != null) {
// 唤醒其他阻塞等待的线程来读取元素
available.signal();
}
lock.unlock();
}
}

添加元素

    /**
* 将目标元素 e 插入到延时队列中,由于是无界的,该操作不会被阻塞
*/
@Override
public void put(E e) {
offer(e);
} @Override
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 插入元素
q.offer(e);
// 如果当前元素是优先级最高的元素
if (q.peek() == e) {
// 清除 leader
leader = null;
// 唤醒在可用条件上等待的线程
available.signal();
}
return true;
} finally {
lock.unlock();
}
}

最新文章

  1. NoSQL
  2. json接口
  3. ios 随记录
  4. struts2拦截器
  5. HDU 5795 A Simple Nim (博弈) ---2016杭电多校联合第六场
  6. SAP FI/CO凭证不一致的解决办法
  7. as3资源加载-Loader和URLLoader
  8. linux 文件系统的管理 (硬盘) 工作原理
  9. UVa 10375 (唯一分解定理) Choose and divide
  10. Linux 学习笔记 文件权限
  11. NHibernate——基本映射(5)
  12. 使用Iterator遍历Sheet(POI)验证及解释结果有序性
  13. vue.js项目安装
  14. C#利用Attribute实现简易AOP介绍
  15. CRM系统(第四部分)
  16. 下载工具axel 和 mwget
  17. asp.net core2.1 bundleconfig.json合并压缩资源文件
  18. 前端js实现 blob转base64位 和 base64位转blob
  19. 火狐浏览器之伪造IP地址
  20. [SpringMVC] - 简单说明什么是SpringMVC

热门文章

  1. yaf框架安装
  2. HNUSTOJ-1253 Babelfish(字典树)
  3. 4G 内存怎么读取一个 5G 的数据?
  4. git error: failed to push some refs to &#39;git@github.com:xxx/xxx.git&#39;
  5. git 中 HEAD detached from 802e836
  6. 解决stanfordnlp一直运行不报错也没有结果
  7. 你不知道的props和state
  8. docker配置Nginx
  9. 微信授权获取code/openid
  10. Divergent series