文本将主要讲述 ThreadPoolExecutor 一个特殊的子类 ScheduledThreadPoolExecutor,主要用于执行周期性任务;所以在看本文之前最好先了解一下 ThreadPoolExecutor ,可以参考 ThreadPoolExecutor 详解;另外 ScheduledThreadPoolExecutor 中使用了延迟队列,主要是基于完全二叉堆实现的,可以参考 完全二叉堆

一、ScheduledThreadPoolExecutor 结构概述

1. 继承关系

public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor implements ScheduledExecutorService {}

在源码中可以看到,ScheduledThreadPoolExecutor 的状态管理、入队操作、拒绝操作等都是继承于 ThreadPoolExecutorScheduledThreadPoolExecutor 主要是提供了周期任务和延迟任务相关的操作;

  • schedule(Runnable command, long delay, TimeUnit unit) // 无返回值的延迟任务
  • schedule(Callable callable, long delay, TimeUnit unit) // 有返回值的延迟任务
  • scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) // 固定频率周期任务
  • scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) // 固定延迟周期任务

就 `ScheduledThreadPoolExecutor` 的运行逻辑而言,大致可以表述为:

  • 首先将 Runnable/Callable 封装为 ScheduledFutureTask,延迟时间作为比较属性;
  • 然后加入 DelayedWorkQueue 队列中,每次取出队首延迟最小的任务,超时等待,然后执行;
  • 最后判断是否为周期任务,然后将其重新加入 DelayedWorkQueue 队列中;

其内部结构如图所示:

这里需要注意的:

  • ScheduledThreadPoolExecutor 中的队列不能指定,只能是 DelayedWorkQueue;因为他是 无界队列,所以再添加任务的时候线程最多可以增加到 coreSize,这里不清楚的可以查看 ThreadPoolExecutor 详解 ,就不再重复了;
  • ScheduledThreadPoolExecutor 重写了 ThreadPoolExecutor 的 execute() 方法,其执行的核心方法变成 delayedExecute()

2. ScheduledFutureTask

private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
private final long sequenceNumber; // 任务序号,从 AtomicLong sequencer 获取,当延迟时间相同时,序号小的先出
private long time; // 下次任务执行时间
private final long period; // 0 表示非周期任务,正值表示固定频率周期任务,负值表示固定延迟周期任务
RunnableScheduledFuture<V> outerTask = this; // 重复执行的任务,传入的任务可以使用 decorateTask() 重新包装
int heapIndex; // 队列索引
}

其中最重要的方法必然是 run 方法了:

public void run() {
boolean periodic = isPeriodic(); // 是否为周期任务,period != 0
if (!canRunInCurrentRunState(periodic)) // 当前状态能否继续运行,详细测试后面还会讲到
cancel(false); // 取消任务
else if (!periodic) // 不是周期任务时,直接运行
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) { // 时周期任务
setNextRunTime(); // 设置下次执行时间
reExecutePeriodic(outerTask); // 重新入队
}
}
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = super.cancel(mayInterruptIfRunning); // 设置中断状态
if (cancelled && removeOnCancel && heapIndex >= 0) // 当设置 removeOnCancel 状态时,移除任务
remove(this); // 默认为 false
return cancelled;
}
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) { // 如果当前状态可以执行
super.getQueue().add(task); // 则重新入队
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else ensurePrestart(); // 确保有线程执行任务
}
}

此外还有 DelayedWorkQueue,但是这里不准备讲了,可以查看 完全二叉堆 了解实现的原理;

二、scheduleAtFixedRate 与 scheduleWithFixedDelay

scheduleAtFixedRatescheduleWithFixedDelay 是我们最常用的两个方法,但是他们的区别可能不是很清楚,这里重点讲一下,

1. scheduleAtFixedRate

// 测试
ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
pool.scheduleAtFixedRate(() -> {
sleep(1000); // 睡眠 1s,
log.info("run task");
}, 1, 2, TimeUnit.SECONDS); // 延迟 1s,周期 2s

// 打印

[19:41:28,489 INFO ] [pool-1-thread-1] - run task

[19:41:30,482 INFO ] [pool-1-thread-1] - run task

[19:41:32,483 INFO ] [pool-1-thread-1] - run task

[19:41:34,480 INFO ] [pool-1-thread-1] - run task

可以看到的确时固定周期 2s 执行的,但是如果任务执行时间超过周期呢?

// 测试
ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
pool.scheduleAtFixedRate(() -> {
int i = 2000 + random.nextInt(3) * 1000;
sleep(i);
log.info("run task, sleep :{}", i);
}, 1, 2, TimeUnit.SECONDS); // 延迟 1s,周期 2s

// 打印

[19:42:53,428 INFO ] [pool-1-thread-1] - run task, sleep :2000

[19:42:55,430 INFO ] [pool-1-thread-1] - run task, sleep :2000

[19:42:59,430 INFO ] [pool-1-thread-1] - run task, sleep :4000

[19:43:02,434 INFO ] [pool-1-thread-1] - run task, sleep :3000

[19:43:06,436 INFO ] [pool-1-thread-1] - run task, sleep :4000

可以看到如果任务执行时间超出周期时,下一次任务会立刻运行;就好像周期是一个有弹性的袋子,能装下运行时间的时候,是固定大小,装不下的时候就会被撑大,图像化表示如下:

2. scheduleWithFixedDelay

// 测试
ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
pool.scheduleAtFixedRate(() -> {
int i = 1000 + random.nextInt(5) * 1000;
sleep(i);
log.info("run task, sleep :{}", i);
}, 1, 2, TimeUnit.SECONDS); // 延迟 1s,周期 2s

// 打印

[20:05:40,682 INFO ] [pool-1-thread-1] - run task, sleep :1000

[20:05:45,686 INFO ] [pool-1-thread-1] - run task, sleep :3000

[20:05:49,689 INFO ] [pool-1-thread-1] - run task, sleep :2000

[20:05:55,690 INFO ] [pool-1-thread-1] - run task, sleep :4000

[20:06:01,692 INFO ] [pool-1-thread-1] - run task, sleep :4000

可以看到无论执行时间是多少,其结果都是在执行完毕后,停顿固定的时间,然后执行下一次任务,其图形化表示为:

三、 源码分析

1. 延迟任务

public void execute(Runnable command) {
schedule(command, 0, NANOSECONDS);
} public <T> Future<T> submit(Callable<T> task) {
return schedule(task, 0, NANOSECONDS);
} public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
if (command == null || unit == null) throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(
command,new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit)));
delayedExecute(t);
return t;
} public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
if (callable == null || unit == null) throw new NullPointerException();
RunnableScheduledFuture<V> t = decorateTask(
callable, new ScheduledFutureTask<V>(callable, triggerTime(delay, unit)));
delayedExecute(t);
return t;
}

可以看到所有的周期任务,最终执行的都是 delayedExecute 方法,其中 decorateTask 是一个钩子函数,其之类可以利用他对任务进行重构过滤等操作;

private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown()) reject(task); // 如果线程池已经关闭,则拒绝任务
else {
super.getQueue().add(task); // 任务入队
if (isShutdown() && // 再次检查,线程池是否关闭
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart(); // 确保有线程执行任务
}
}

2. 周期任务

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
long period, TimeUnit unit) {
if (command == null || unit == null) throw new NullPointerException();
if (period <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period)); // 注意这里添加的是正值 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
} public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
long delay, TimeUnit unit) {
if (command == null || unit == null) throw new NullPointerException();
if (delay <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay)); // 注意这里添加的是负值 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}

从上面代码可以看到 scheduleAtFixedRatescheduleWithFixedDelay 只有周期任务的时间不同,其他的都一样,那么下面我们看一下他们的任务时间计算;

public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
} private void setNextRunTime() {
long p = period;
if (p > 0) // 正值表示 scheduleAtFixedRate
time += p; // 不管任务执行时间,直接加上周期时间,也就是一次任务超时,会影响后续任务的执行,
// 超时的时候,getDelay 是负值,所以在延迟队列中必然排在最前面,立刻被取出执行
else
time = triggerTime(-p); // 计算触发时间
} long triggerTime(long delay) { // 这里可以看到,每次的确是在当前时间的基础上,加上延迟时间;
return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}

这里特别要注意 scheduleAtFixedRate 一次任务超时,会持续影响后面的任务周期安排,所以在设定周期的时候要特别注意; 例如:

// 测试
ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
pool.scheduleAtFixedRate(() -> {
int i = random.nextInt(5) * 1000;
sleep(i);
log.info("run task, sleep :{}", i);
}, 1, 2, TimeUnit.SECONDS);

// 打印

[20:29:11,310 INFO ] [pool-1-thread-1] - run task, sleep :1000

[20:29:16,304 INFO ] [pool-1-thread-1] - run task, sleep :4000

[20:29:19,304 INFO ] [pool-1-thread-1] - run task, sleep :3000

[20:29:21,305 INFO ] [pool-1-thread-1] - run task, sleep :2000

[20:29:22,305 INFO ] [pool-1-thread-1] - run task, sleep :1000

[20:29:23,306 INFO ] [pool-1-thread-1] - run task, sleep :1000

[20:29:27,306 INFO ] [pool-1-thread-1] - run task, sleep :4000

[20:29:30,307 INFO ] [pool-1-thread-1] - run task, sleep :3000

如图所示:

3. 取消任务

private volatile boolean continueExistingPeriodicTasksAfterShutdown; //关闭后继续执行周期任务,默认false
private volatile boolean executeExistingDelayedTasksAfterShutdown = true; //关闭后继续执行延迟任务,默认true
private volatile boolean removeOnCancel = false; // 取消任务是,从队列中删除任务,默认 false @Override void onShutdown() {
BlockingQueue<Runnable> q = super.getQueue();
boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy(); // 继续延迟任务
boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy(); // 继续周期任务
if (!keepDelayed && !keepPeriodic) { // 都是 false,直接清除
for (Object e : q.toArray())
if (e instanceof RunnableScheduledFuture<?>)
((RunnableScheduledFuture<?>) e).cancel(false);
q.clear();
}
else {
// Traverse snapshot to avoid iterator exceptions
for (Object e : q.toArray()) {
if (e instanceof RunnableScheduledFuture) {
RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e;
if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
t.isCancelled()) { // also remove if already cancelled
if (q.remove(t))
t.cancel(false);
}
}
}
}
tryTerminate();
}

总结

  • scheduleAtFixedRate,固定频率周期任务,注意一次任务超时,会持续的影响后续的任务周期;
  • scheduleWithFixedDelay,固定延迟周期任务,即每次任务结束后,超时等待固定时间;
  • 此外 ScheduledThreadPoolExecutor 线程最多为核心线程,最大线程数不起作用,因为 DelayedWorkQueue 是无界队列;

最新文章

  1. Weblogic的架构
  2. C++中三种new的用法
  3. 利用Native Client OLEDB 11 高效率地对SQL SERVER 进行查询和插入操作
  4. ubuntu之使用sublime text3搭建Python IDE
  5. 如何自定义Grunt任务
  6. Apache Commons fileUpload实现文件上传之一
  7. css的框架——base.css
  8. JavaEE中遗漏的10个最重要的安全控制
  9. MyFirstStruts2
  10. AB串
  11. Java开发知识之Java控制语句
  12. .net C# 抽奖,中奖
  13. Django之视图Views
  14. golang 并发顺序输出数字
  15. ssh stricthostkeychecking=0
  16. 初学Hadoop之图解MapReduce与WordCount示例分析
  17. 自动换行的两种代码(C#)
  18. Leetcode 12
  19. 苏宁易购Android架构演进史
  20. elasticsearch2.x插件之一:kopf

热门文章

  1. Python_shelve模块操作二进制文件
  2. 最好的营销是&ldquo;调情&rdquo;
  3. 项目在tomcat里运行一段时间总是自动崩掉的问题排查与解决
  4. Redis数据结构简介
  5. vue-quasar-admin 一个包含通用权限控制的后台管理系统
  6. Python3实现ICMP远控后门(下)之“Boss”出场
  7. Nginx与Lua
  8. github 用git bash上传项目 最后提示 Everything up-to-date 但没传上去
  9. Python3 requests与http.cookiejar的使用(cookie的保存与加载)
  10. Kafka元数据缓存(metadata cache)