大家先从ThreadPoolExecutor的整体流程入手:

针对ThreadPoolExecutor代码。我们来看下execute方法:

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//poolSize大于等于corePoolSize时不添加线程,反之新初始化线程
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
//线程运行状态外为运行,同一时候能够加入到队列中
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
//poolSize大于等于corePoolSize时。新初始化线程
else if (!addIfUnderMaximumPoolSize(command))
//无法加入初始化运行线程,怎么运行reject操作(调用RejectedExecutionHandler)
reject(command); // is shutdown or saturated
}
}

我们再看下真正的线程运行者(Worker):

	private final class Worker implements Runnable {
/**
* Runs a single task between before/after methods.
*/
private void runTask(Runnable task) {
final ReentrantLock runLock = this.runLock;
runLock.lock();
try {
/*
* If pool is stopping ensure thread is interrupted;
* if not, ensure thread is not interrupted. This requires
* a double-check of state in case the interrupt was
* cleared concurrently with a shutdownNow -- if so,
* the interrupt is re-enabled.
*/
//当线程池的运行状态为关闭等。则运行当前线程的interrupt()操作
if ((runState >= STOP ||
(Thread.interrupted() && runState >= STOP)) &&
hasRun)
thread.interrupt();
/*
* Track execution state to ensure that afterExecute
* is called only if task completed or threw
* exception. Otherwise, the caught runtime exception
* will have been thrown by afterExecute itself, in
* which case we don't want to call it again.
*/
boolean ran = false;
beforeExecute(thread, task);
try {
//任务运行
task.run();
ran = true;
afterExecute(task, null);
++completedTasks;
} catch (RuntimeException ex) {
if (!ran)
afterExecute(task, ex);
throw ex;
}
} finally {
runLock.unlock();
}
} /**
* Main run loop
*/
public void run() {
try {
hasRun = true;
Runnable task = firstTask;
firstTask = null;
//推断是否存在须要运行的任务
while (task != null || (task = getTask()) != null) {
runTask(task);
task = null;
}
} finally {
//假设没有,则将工作线程移除,当poolSize为0是则尝试关闭线程池
workerDone(this);
}
}
} /* Utilities for worker thread control */ /**
* Gets the next task for a worker thread to run. The general
* approach is similar to execute() in that worker threads trying
* to get a task to run do so on the basis of prevailing state
* accessed outside of locks. This may cause them to choose the
* "wrong" action, such as trying to exit because no tasks
* appear to be available, or entering a take when the pool is in
* the process of being shut down. These potential problems are
* countered by (1) rechecking pool state (in workerCanExit)
* before giving up, and (2) interrupting other workers upon
* shutdown, so they can recheck state. All other user-based state
* changes (to allowCoreThreadTimeOut etc) are OK even when
* performed asynchronously wrt getTask.
*
* @return the task
*/
Runnable getTask() {
for (;;) {
try {
int state = runState;
if (state > SHUTDOWN)
return null;
Runnable r;
if (state == SHUTDOWN) // Help drain queue
r = workQueue.poll();
//当线程池大于corePoolSize,同一时候,存在运行超时时间,则等待对应时间,拿出队列中的线程
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
//堵塞等待队列中能够取到新线程
r = workQueue.take();
if (r != null)
return r;
//推断线程池运行状态。假设大于corePoolSize,或者线程队列为空,也或者线程池为终止的工作线程能够销毁
if (workerCanExit()) {
if (runState >= SHUTDOWN) // Wake up others
interruptIdleWorkers();
return null;
}
// Else retry
} catch (InterruptedException ie) {
// On interruption, re-check runState
}
}
} /**
* Performs bookkeeping for an exiting worker thread.
* @param w the worker
*/
//记录运行任务数量,将工作线程移除。当poolSize为0是则尝试关闭线程池
void workerDone(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
if (--poolSize == 0)
tryTerminate();
} finally {
mainLock.unlock();
}
}

通过上述代码,总结下四个keyword的使用方法

  • corePoolSize 核心线程数量

线程保有量,线程池总永久保存运行线程的数量

  • maximumPoolSize 最大线程数量

最大线程量,线程最多不能超过此属性设置的数量,当大于线程保有量后,会新启动线程来满足线程运行。

  • 线程存活时间

获取队列中任务的超时时间。当阈值时间内无法获取线程,则会销毁处理线程,前提是线程数量在corePoolSize 以上

  • 运行队列

运行队列是针对任务的缓存,任务在提交至线程池时。都会压入到运行队列中。所以这里大家最好设置下队列的上限。防止溢出

ThreadPoolExecuter的几种实现

  public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
  • CachedThreadPool 运行线程不固定,

优点:能够把新增任务所有缓存在一起,

     坏处:仅仅能用在短时间完毕的任务(占用时间较长的操作能够导致线程数无限增大,系统资源耗尽)

public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
  • 单线程线程池
       优点:针对单cpu,单线程避免系统资源的抢夺
       坏处:多cpu多线程时。不能全然利用cpu资源
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
  • 固定长度线程池
        优点:线程数量固定,不会存在线程反复初始化
        坏处:没有对队列大小进行限制,线程初始化后,再也不能回收线程资源

版权声明:本文博客原创文章。博客,未经同意,不得转载。

最新文章

  1. SQL Server 在缺少文件组的情况下如何还原数据库
  2. 变量声明和定义及extern 转载
  3. 如何实现Outlook 2010 下载邮件后自动删除服务器上的邮件
  4. STM32的bulk双缓冲传输速度的讨论,硬件的坑永远填不完
  5. (*p)++和*(p++)和*p++的区别
  6. TCP digest
  7. [Hadoop源码解读](二)MapReduce篇之Mapper类
  8. 跨域(cross-domain)访问 cookie (读取和设置)
  9. 安卓TCP通信
  10. Java课程设计—学生成绩管理系统(54号童欢)
  11. 【Android 应用开发】 Android 各种版本简介 ( Support 支持库版本 | Android Studio 版本 | Gradle 版本 | jcenter 库版本 )
  12. 【题解】Luogu P4381 [IOI2008]Island
  13. idea 本地tomcat启动控制台乱码问题
  14. Codeforces 922F Divisibility 构造
  15. Nowcoder186C 失衡天平 背包
  16. Centos6.5+Redmine
  17. implementation compile的区别
  18. ThinkPHP5调用PHPExcel类实现导入导出
  19. 关于如何在本地IIS搭建网站
  20. iOS 解决汉字联想输入,导致字数限制失效的问题

热门文章

  1. [内核编程] visual studio 2010配置驱动开发环境
  2. html 页面 黑白
  3. android安卓开发基础小笔记,添加按钮事件,打开新窗体,窗体传值,回传
  4. Android中的动画具体解释系列【3】——自己定义动画研究
  5. ios开发事件处理之:三 :寻找最合适的view
  6. JNI:no implementation found in native...
  7. jquery中的this与$(this)的区别总结(this:html元素)($(this):JQuery对象)
  8. 开发文档生成工具----强大的Doxygen工具使用手册
  9. 【转】dbx用法讲解
  10. JSON 表达式