Java 定时任务可以用Timer + TimerTask来做,或者使用ScheduledExecutorService,使用ScheduledExecutorService有两个好处:

1. 如果任务执行时间过长,TimerTask会出现延迟执行的情况。比如,第一任务在1000ms执行了4000ms,第二个任务定时在2000ms开始执行。这里由于第一个任务要执行4000,所以第二个任务实际在5000ms开始执行。这是由于Timer是单线程,且顺序执行提交的任务

2. 如果执行任务抛出异常,Timer是不会执行会后面的任务的

使用ScheduledExecutorService可以避免上面两种情况,因为ScheduledExecutorService是线程池,有多个线程执行。

下面是一个使用ScheduledExecutorService实现定时任务的Demo

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; /**
* Created by gxf on 2017/6/26.
*/
public class TestSchedule {
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(5);
Task task = new Task();
scheduledExecutorService.scheduleAtFixedRate(task, -10, 1, TimeUnit.SECONDS);
}
} class Task implements Runnable{
public void run(){
System.out.println("do task...");
}
}

这里每隔1秒,控制台会输出do task

ScheduledExecutorService是一个接口

public interface ScheduledExecutorService extends ExecutorService {

其中一个实现是

public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService {

在Demo中

ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(5);

跟进ScheduledThreadPoolExecutor

/**
* Creates a new {@code ScheduledThreadPoolExecutor} with the
* given core pool size.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @throws IllegalArgumentException if {@code corePoolSize < 0}
*/
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

这里Super调用

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}

设定了线程池的各个参数,核心线程数,最大线程数,任务队列等。但这里还有线程创建,有任务提交了,才会创建线程池

继续Demo中

Task task = new Task();

这里只是创建了一个Runable对象

继续Demo

scheduledExecutorService.scheduleAtFixedRate(task, -10, 1, TimeUnit.SECONDS);

这里已经把任务提交给线程池了,进入方法

/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}

前面都是一些条件检查和包装,看最后的delayedExecute(t)

/**
* Main execution method for delayed or periodic tasks. If pool
* is shut down, rejects the task. Otherwise adds task to queue
* and starts a thread, if necessary, to run it. (We cannot
* prestart the thread to run the task because the task (probably)
* shouldn't be run yet.) If the pool is shut down while the task
* is being added, cancel and remove it if required by state and
* run-after-shutdown parameters.
*
* @param task the task
*/
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart();
}
}

ok,这里可以看到任务被添加到了一个队列里面。在看最后,ensurePrestart()

/**
* Same as prestartCoreThread except arranges that at least one
* thread is started even if corePoolSize is 0.
*/
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}

这里可以看出,如果线程池里面的线程数,小于核心线程数,会继续添加线程。进入addWork(null, true)

 private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c); // Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false; for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
} boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

第32行和33行可以看出,任务已经提交给Work类了,第57行,t.start()这里启动线程,执行提交的任务

到这里,提交的任务已经开始执行了。这里,我们在看一下Worker这个包装类

private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;

可以看到有Thread, task,这里,其实就是把task指向要执行的任务,thread作为作为线程执行任务。

在ThreadPoolExecutor中

private final HashSet<Worker> workers = new HashSet<Worker>();

这个保存我们生成的线程,有了这个就不用重新创建和销毁线程了

我们在看一下Worker的run()方法

/** Delegates main run loop to outer runWorker  */
public void run() {
runWorker(this);
}

继续跟进

 final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

第8行getTask()取要执行的任务,第23行执行任务

继续跟进第8行getTask()

 private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out? for (;;) {
int c = ctl.get();
int rs = runStateOf(c); // Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
} int wc = workerCountOf(c); // Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
} try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

这里主要看第27-29,前面我们说过,任务是放到一个队列里面的。其实,是个阻塞队列,在队列为空的时候取,会阻塞线程。这里就用到了这个功能,在29行workQueue.take()如果没有任务,线程就会阻塞。有任务就会取任务进行执行。

简单点:线程池就是一个Set对象 + BlockingQueue对象, Workers + BlockingQueue

最新文章

  1. C#中ToString()格式详解
  2. 用cookie记住用户名
  3. 【BZOJ 2005】【NOI 2010】能量采集 数论+容斥原理
  4. 如何准备IREB考试
  5. hdu 1250 Hat&#39;s Fibonacci(java,简单,大数)
  6. Android 异步加载神器Loader全解析
  7. 项目SVN的IP地址发生变化时修改SVN为新的IP地址
  8. Fortify 4.0 帮助文档下载
  9. 解决DropDownList 有一个无效 SelectedValue,因为它不在项目列表中。这是怎么回事?
  10. 初学java,遇到的陌生词语(1)
  11. ecstore实现图片分离(静态资源分离)配置文件
  12. MySQL教程及经常使用命令1.1
  13. iwinfo 的使用
  14. grunt轻松入门
  15. Day9 进程同步锁 进程队列 进程池 生产消费模型 进程池 paramike模块
  16. 记一次生产数据库&quot;意外&quot;重启的经历
  17. 使用HTML meta no-cache标签来禁用缓存
  18. Java高级类特性(二)
  19. js导出excel:前端当前数据的导出
  20. matchesSelector()方法

热门文章

  1. Ajaxa的原生使用方法
  2. 浅谈C#中的委托、事件与异步
  3. Java过滤器详细文档,简介,实例,应用
  4. c#Udp分包组包方法
  5. 三元运算符,i++(先用后加) ++i (先加后用)区别
  6. HDU - 1223 DP 分类
  7. 1093 字符串A+B (20 分)
  8. 玩转微信2次开发1_交互通信api.php(微擎版)
  9. [转] SpringBoot RESTful 应用中的异常处理小结
  10. ssh,hibernate动态映射多表