这篇文章主要说说DelayedWorkQueue。

在ScheduledThreadPoolExecutor使用DelayedWorkQueue来存放要执行的任务,因为这些任务是带有延迟的,而每次执行都是取第一个任务执行,因此在DelayedWorkQueue中任务必然按照延迟时间从短到长来进行排序的。

DelayedWorkQueue使用堆来实现的。

和以前分析BlockingQueue的实现类一样,首先来看offer方法,基本就是一个添加元素到堆的逻辑。

        public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture e = (RunnableScheduledFuture)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
// 因为元素时存储在一个数组中,随着堆变大,当数组存储不够时,需要对数组扩容
if (i >= queue.length)
grow();
size = i + 1;
// 如果原来队列为空
if (i == 0) {
queue[0] = e; // 这个i就是RunnableScheduledFuture用到的heapIndex
setIndex(e, 0);
} else {
// 添加元素到堆中
siftUp(i, e);
}
// 如果队列原先为空,那么可能有线程在等待元素,这时候既然添加了元
// 素,就需要通过Condition通知这些线程
if (queue[0] == e) {
// 因为有元素新添加了,第一个等待的线程可以结束等待了,因此这里
// 删除第一个等待线程
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}

这里顺带看一下siftUp,熟悉堆的实现的朋友应该很容易看懂这是一个把元素添加已有堆中的算法。

        private void siftUp(int k, RunnableScheduledFuture key) {
while (k > 0) {
int parent = (k - 1) >>> 1;
RunnableScheduledFuture e = queue[parent];
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}

那么接着就看看poll:

        public RunnableScheduledFuture poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 因为即使拿到任务,线程还是需要等待,而这个等待过程是由队列帮助完成的
// 因此poll方法只能返回已经到执行时间点的任务
RunnableScheduledFuture first = queue[0];
if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
return null;
else
return finishPoll(first);
} finally {
lock.unlock();
}
}

因为poll方法只能返回已经到了执行时间点的任务,所以对于我们理解队列如何实现延迟执行没有意义,因此重点看看take方法:

        public RunnableScheduledFuture take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 尝试获取第一个元素,如果队列为空就进入等待
RunnableScheduledFuture first = queue[0];
if (first == null)
available.await();
else {
// 获取任务执行的延迟时间
long delay = first.getDelay(TimeUnit.NANOSECONDS);
// 如果任务不用等待,立刻返回该任务给线程
if (delay <= 0)
// 从堆中拿走任务
return finishPoll(first);
// 如果任务需要等待,而且前面有个线程已经等待执行任务(leader线程
// 已经拿到任务了,但是执行时间没有到,延迟时间肯定是最短的),
// 那么执行take的线程肯定继续等待,
else if (leader != null)
available.await();
// 当前线程的延迟时间是最短的情况,那么更新leader线程
// 用Condition等待直到时间到点,被唤醒或者被中断
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
// 重置leader线程以便进行下一次循环
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 队列不为空发出signal很好理解,这里附带了没有leader线程
// 的条件是因为leader线程存在时表示leader线程正在等待执行时间点的
// 到来,如果此时发出signal会触发awaitNanos提前返回
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}

take方法的重点就是leader线程,因为存在延迟时间,即使拿到任务,线程还是需要等待的,leader线程就那个最先执行任务的线程。

因为线程拿到任务之后还是需要等待一段延迟执行的时间,所以对于超时等待的poll方法来说就有点意思了:

        public RunnableScheduledFuture poll(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture first = queue[0];
// 任务队列为空的情况
if (first == null) {
// nanos小于等于0有两种可能:
// 1. 参数值设定
// 2. 等待已经超时
if (nanos <= 0)
return null;
else
// 等待一段时间,返回剩余的等待时间
nanos = available.awaitNanos(nanos);
} else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
if (nanos <= 0)
return null;
// leader线程存在并且nanos大于delay的情况下,
// 依然等待nanos这么长时间,不用担心会超过delay设定
// 的时间点,因为leader线程到时间之后会发出signal
// 唤醒线程,而那个时候显然还没有到delay设定的时间点
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
long timeLeft = available.awaitNanos(delay);
// 剩余的超时时间
nanos -= delay - timeLeft;
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}

通过分析以上代码基本上已经理清楚了DelayedWorkQueue实现延迟执行的原理:

1. 按照执行延迟从短到长的顺序把任务存储到堆;

2. 通过leader线程让拿到任务的线程等到规定的时间点再执行任务;

最新文章

  1. Linux进程间通信(九):数据报套接字 socket()、bind()、sendto()、recvfrom()、close()
  2. C++成员变量内存对齐问题,ndk下非对齐的内存访问导致BUS_ADRALN
  3. 编写SqlHelper使用,在将ExecuteReader方法封装进而读取数据库中的数据时会产生Additional information: 阅读器关闭时尝试调用 Read 无效问题,解决方法与解释
  4. C#实现数字字符串左补齐0的3种方法
  5. 【Android Api 翻译2】Android Testing(1) 浅尝Android测试的奥秘
  6. NYOJ-36 最长公共子序列 AC 分类: NYOJ 2014-01-03 20:54 155人阅读 评论(0) 收藏
  7. android之AlarmManager 全局定时器
  8. springMVC数据封装成POJO
  9. Magento Api 记录
  10. nginx优化 突破十万并发(转)
  11. [BZOJ 1004] [HNOI2008] Cards 【Burnside引理 + DP】
  12. 使用Teleport Pro离线下载网页所有内容
  13. php封装redis负载均衡类
  14. 关于wkwebview
  15. [DP之普通系列]
  16. Kindeditor编辑插件的使用
  17. LightGBM总结
  18. python中的正则表达式--re模块
  19. java 集合(一)ArrayList的继承树
  20. TOJ 3973 Maze Again &amp;&amp; TOJ 3128 简单版贪吃蛇

热门文章

  1. gevent的同步与异步
  2. AOP的实现的几种方式
  3. yii2之数据验证
  4. 我只是想获取access_token而已
  5. ES6 for-of循环和迭代器使用细节
  6. hbase+springboot+redis实现分页
  7. C#与SQl数据的对应关系(tinyint、smallint、int、bigint)
  8. Matplotlib常用示例入门
  9. T9
  10. python添加自定义cookies