一、概述

Java是天生就支持并发的语言,支持并发意味着多线程,线程的频繁创建在高并发及大数据量是非常消耗资源的,因为java提供了线程池。在jdk1.5以前的版本中,线程池的使用是及其简陋的,但是在JDK1.5后,有了很大的改善。JDK1.5之后加入了java.util.concurrent包,java.util.concurrent包的加入给予开发人员开发并发程序以及解决并发问题很大的帮助。这篇文章主要介绍下并发包下的Executor接口,Executor接口虽然作为一个非常旧的接口(JDK1.5 2004年发布),但是很多程序员对于其中的一些原理还是不熟悉,因此写这篇文章来介绍下Executor接口,同时巩固下自己的知识。如果文章中有出现错误,欢迎大家指出。

二、Executors工厂类

对于数据库连接,我们经常听到数据库连接池这个概念。因为建立数据库连接时非常耗时的一个操作,其中涉及到网络IO的一些操作。因此就想出把连接通过一个连接池来管理。需要连接的话,就从连接池里取一个。当使用完了,就“关闭”连接,这不是正在意义上的关闭,只是把连接放回到我们的池里,供其他人在使用。所以对于线程,也有了线程池这个概念,其中的原理和数据库连接池是差不多的,因此java jdk中也提供了线程池的功能。

线程池的作用:线程池就是限制系统中使用线程的数量以及更好的使用线程

根据系统的运行情况,可以自动或手动设置线程数量,达到运行的最佳效果:配置少了,将影响系统的执行效率,配置多了,又会浪费系统的资源。用线程池配置数量,其他线程排队等候。当一个任务执行完毕后,就从队列中取一个新任务运行,如果没有新任务,那么这个线程将等待。如果来了一个新任务,但是没有空闲线程的话,那么把任务加入到等待队列中。

为什么要用线程池:

  1. 减少线程创建和销毁的次数,使线程可以多次复用
  2. 可以根据系统情况,调整线程的数量。防止创建过多的线程,消耗过多的内存(每个线程1M左右)

Java里面线程池的顶级接口是Executor,但是严格意义上讲Executor并不是一个线程池,而只是一个执行线程的工具。真正的线程池接口是ExecutorService。Executors类,提供了一系列工厂方法用于创先线程池,返回的线程池都实现了ExecutorService接口。

比较重要的几个类:

ExecutorService

真正的线程池接口。

ScheduledExecutorService

能和Timer/TimerTask类似,解决那些需要任务重复执行的问题。

ThreadPoolExecutor

ExecutorService的默认实现。

ScheduledThreadPoolExecutor

继承ThreadPoolExecutor的ScheduledExecutorService接口实现,周期性任务调度的类实现。

要配置一个线程池是比较复杂的,尤其是对于线程池的原理不是很清楚的情况下,很有可能配置的线程池不是较优的,因此在Executors类里面提供了一些静态工厂,生成一些常用的线程池。

1. newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor()

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)

创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。

2. newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads)

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)

创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,在提交新任务,任务将会进入等待队列中等待。如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

3. newCachedThreadPool

public static ExecutorService newCachedThreadPool()

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)

创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,

那么就会回收部分空闲(60秒处于等待任务到来)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池的最大值是Integer的最大值(2^31-1)。

4. newScheduledThreadPool

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)

创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。

实例:

注:使用了java8的lambda表达式以及stream

1.newSingleThreadExecutor

public class SingleThreadExecutorTest {

    public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
IntStream.range(0, 5).forEach(i -> executor.execute(() -> {
String threadName = Thread.currentThread().getName();
System.out.println("finished: " + threadName);
})); try {
//close pool
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (!executor.isTerminated()) {
executor.shutdownNow();
}
}
}
}

输出结果:

finished: pool-1-thread-1
finished: pool-1-thread-1
finished: pool-1-thread-1
finished: pool-1-thread-1
finished: pool-1-thread-1

线程名都一样,说明是同一个线程

2.newFixedThreadPool

public class FixedThreadExecutorTest {

    public static void main(String[] args) {

        ExecutorService executor = Executors.newFixedThreadPool(3);

        IntStream.range(0, 6).forEach(i -> executor.execute(() -> {
try {
TimeUnit.SECONDS.sleep(1);
String threadName = Thread.currentThread().getName();
System.out.println("finished: " + threadName);
} catch (InterruptedException e) {
e.printStackTrace();
}
}));
//close pool 同上...
  
}
}

输出结果:

finished: pool-1-thread-2
finished: pool-1-thread-3
finished: pool-1-thread-1
finished: pool-1-thread-1
finished: pool-1-thread-2
finished: pool-1-thread-3

只创建了三个线程

3.newCachedThreadPool

public class CachedThreadExecutorTest {

    public static void main(String[] args) {

        ExecutorService executor = Executors.newCachedThreadPool();
IntStream.range(0, 6).forEach(i -> executor.execute(() -> {
try {
TimeUnit.SECONDS.sleep(1);
String threadName = Thread.currentThread().getName();
System.out.println("finished: " + threadName);
} catch (InterruptedException e) {
e.printStackTrace();
}
}));
//close pool 同上...
}
}

输出结果:

finished: pool-1-thread-1
finished: pool-1-thread-2
finished: pool-1-thread-3
finished: pool-1-thread-6
finished: pool-1-thread-5
finished: pool-1-thread-4

创建了6个线程

4.newScheduledThreadPool

public class ScheduledThreadExecutorTest {

    public static void main(String[] args) {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); executor.scheduleAtFixedRate(() -> System.out.println(System.currentTimeMillis()), 1000, 2000, TimeUnit.MILLISECONDS); //close pool 同上
}
}

输出结果:

1457967177735
1457967179736
1457967181735

三:ThreadPoolExecutor详解

java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类。下面我们来看一下ThreadPoolExecutor类的具体实现源码。

  在ThreadPoolExecutor类中提供了四个构造方法:

public class ThreadPoolExecutor extends AbstractExecutorService {
.....
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
...
}

从上面的代码可以得知,ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,事实上,通过观察每个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工作。

ThreadPoolExecutor的完整构造方法的签名是:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 
  • corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有 任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者 prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建 corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线 程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
  • maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;
  • keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize 时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize。即当线程池中的线程数大于corePoolSize 时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了 allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize 时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
  • unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:
TimeUnit.DAYS;               //天
TimeUnit.HOURS; //小时
TimeUnit.MINUTES; //分钟
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //纳秒
  • workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:

    ArrayBlockingQueue; //有界队列
    LinkedBlockingQueue; //无界队列
    SynchronousQueue; //特殊的一个队列,只有存在等待取出的线程时才能加入队列,可以说容量为0,是无界队列

    ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。

  • threadFactory:线程工厂,主要用来创建线程;
  • handler:表示当拒绝处理任务时的策略,有以下四种取值:
    ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
    ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
    ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

    ThreadPoolExecutorExecutors类的底层实现。

    在JDK帮助文档中,有如此一段话:

    “强烈建议程序员使用较为方便的 Executors 工厂方法 Executors.newCachedThreadPool()(无界线程池,可以进行自动线程回收)、Executors.newFixedThreadPool(int)(固定大小线程池)Executors.newSingleThreadExecutor()(单个后台线程)

    它们均为大多数使用场景预定义了设置。”

    下面介绍一下几个类的源码:

    ExecutorService   newFixedThreadPool (int nThreads):固定大小线程池。

    可以看到,corePoolSize和maximumPoolSize的大小是一样的(实际上,后面会介绍,如果使用无界queue的话 maximumPoolSize参数是没有意义的),keepAliveTime和unit的设值表明什么?-就是该实现不想keep alive!最后的BlockingQueue选择了LinkedBlockingQueue,该queue有一个特点,他是无界的。

public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

  ExecutorService   newSingleThreadExecutor():单线程

 public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

ExecutorService newCachedThreadPool():无界线程池,可以进行自动线程回收

这个实现就有意思了。首先是无界的线程池,所以我们可以发现maximumPoolSize为big big。其次BlockingQueue的选择上使用SynchronousQueue。可能对于该BlockingQueue有些陌生,简单说:该 QUEUE中,每个插入操作必须等待另一个线程的对应移除操作。

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

从上面给出的ThreadPoolExecutor类的代码可以知道,ThreadPoolExecutor继承了AbstractExecutorService,AbstractExecutorService是一个抽象类,它实现了ExecutorService接口。

public abstract class AbstractExecutorService implements ExecutorService 

 而ExecutorService又是继承了Executor接口

public interface ExecutorService extends Executor 

我们看一下Executor接口的实现:

public interface Executor {
void execute(Runnable command);
}

到这里,大家应该明白了ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor几个之间的关系了。

  Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的;

  然后ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;

  抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;

  然后ThreadPoolExecutor继承了类AbstractExecutorService。

  在ThreadPoolExecutor类中有几个非常重要的方法:

public void execute(Runnable command)

public <T> Future<T> submit(Callable<T> task)

public void shutdown()

public List<Runnable> shutdownNow()  //返回未执行的任务

execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。

  submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中 并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的 实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果(Future相关内容将在下一篇讲述)。

  shutdown()和shutdownNow()是用来关闭线程池的。

最新文章

  1. python:Django
  2. Cygwin的安装,卸载,以及安装gdb
  3. Delphi中GUID相等检查中经典指针应用
  4. LeetCode OJ学习
  5. 未在本地计算机上注册“Microsoft.Jet.OLEDB.4.0”提供程序(Oledb)
  6. JS邮箱验证-正则验证
  7. Eclipse中如何关联Javadoc
  8. maven pom.xml 中各个标签元素的作用
  9. {style}/index_article.htm {style}表示什么意思啊
  10. Day3 Python基础之while、for循环(二)
  11. Ios项目添加Pods
  12. jQuery的DOM操作之设置和获取HTML、文本和值 html()text()val()
  13. p2751 Job Processing
  14. mybatis 之引入多个model
  15. springmvc框架开发中解决产生的乱码情况
  16. Spark的基本说明
  17. staticmethod classmethod
  18. commit
  19. ZigBee设备入网流程之关联方式
  20. 【51NOD】独木舟

热门文章

  1. Java编程思想读书笔记之内部类
  2. SQL对字符串数组的处理详解
  3. Android 多语言
  4. Android自定义控件之轮播图控件
  5. .NET下Excel报表的打印
  6. C# 零散知识 扩展方法 类型约束
  7. Android 之 自动匹配字符AutoCompleteTextView
  8. MYSQL的慢查询两个方法
  9. python idle 清屏问题的解决
  10. 附带详细注释的log4net的app.config文件配置例子