java.util.concurrent.ExecutorService接口提供了许多线程管理的方法

Method 说明
shutdown 拒绝接收新的任务,待已提交的任务执行后关闭,且宿主线程不阻塞,若需要阻塞可借助awaitTermination实现
shutdownNow 停止所有正在执行的任务,挂起未执行的任务并关闭,且宿主线程不阻塞,若需要阻塞可借助awaitTermination实现
awaitTermination 当发生shutdown时,阻塞宿主线程直到约定的时间已过或者所有任务完成
submit 提交任务Callable/Runnable,可利用Future的get()方法使宿主线程阻塞直到任务结束后返回结果

有了以上方法,便可以基于此接口实现线程池的各种功能(例如java.util.concurrent.ThreadPoolExecutor/java.util.concurrent.ScheduledThreadPoolExecutor),以java.util.concurrent.ThreadPoolExecutor为例,其参数的详解

Name Type 说明
corePoolSize int 线程池中最小的线程数
maximumPoolSize int 线程池中最大的线程数
keepAliveTime long 线程空闲时间,若线程数大于corePoolSize,空闲时间超过该值的线程将被终止回收
unit TimeUnit keepAliveTime的时间单位
workQueue BlockingQueue<Runnable> 已提交但未执行的任务队列
threadFactory ThreadFactory 创建新线程的工厂
handler RejectedExecutionHandler 当线程池或队列达到上限拒绝新任务抛出异常时的处理类

同时,java.util.concurrent.Executors类提供的常用方法有

Method 说明 基类
newFixedThreadPool 线程池中含固定数量的线程 基于java.util.concurrent.ThreadPoolExecutor类
newSingleThreadExecutor 线程池中仅含一个工作线程
newCachedThreadPool 按需创建线程,若线程池中无可用线程,则创建新的线程并加入,直到线程数达到上限值(Integer.MAX_VALUE)
newWorkStealingPool 按照可用CPU数创建线程池 基于java.util.concurrent.ForkJoinPool类

java.util.concurrent.ForkJoinPool类是Fork/Join框架的实现类,Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架,该类在有递归实现的场景有更优异的表现。

测试代码如下

import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import org.junit.Assert;
import org.junit.Test; /**
* @Description: 测试ExecutorService
*/
public class ThreadExecutorServiceTest {
private static final String THIS_IS_SHUTDOWN_WITH_AWAIT_TERMINATION = "This is shutdownWithAwaitTermination";
private static final int RESULT = 111; private static boolean submitRunnable() throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<?> future = executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println("This is submitRunnable");
}
});
return future.get() == null;
} private static Integer submitRunnableWithResult() throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<Integer> future = executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println("This is submitRunnableWithResult");
}
}, RESULT);
return future.get();
} private static Integer submitBlockCallable() throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(1);
Future<Integer> future = executorService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println("This is submitBlockCallable");
return RESULT;
}
});
return future.get();// 阻塞
} private static boolean submitNonBlockCallable() throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(1);
Future<Integer> future = executorService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println("This is submitNonBlockCallable");
return RESULT;
}
});
while (!future.isDone()) {// 非阻塞
System.out.println(new Date());
}
return future.isDone();
} private static String shutdown() throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(1);
final StringBuilder sb = new StringBuilder();
executorService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
Thread.sleep(10000);
sb.append("This is shutdown");
return RESULT;
}
});
executorService.shutdown();
return sb.toString();
} private static String shutdownWithAwaitTermination() throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(1);
final StringBuilder sb = new StringBuilder();
executorService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
Thread.sleep(10000);
sb.append(THIS_IS_SHUTDOWN_WITH_AWAIT_TERMINATION);
return RESULT;
}
});
executorService.shutdown();
executorService.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS);
return sb.toString();
} @Test
public void test() throws InterruptedException, ExecutionException {
Assert.assertTrue(submitRunnable());
Assert.assertEquals(RESULT, submitRunnableWithResult().intValue());
Assert.assertEquals(RESULT, submitBlockCallable().intValue());
Assert.assertTrue(submitNonBlockCallable());
Assert.assertTrue(shutdown().isEmpty());
Assert.assertEquals(THIS_IS_SHUTDOWN_WITH_AWAIT_TERMINATION, shutdownWithAwaitTermination());
} }

---恢复内容结束---

java.util.concurrent.ExecutorService接口提供了许多线程管理的方法

Method 说明
shutdown 拒绝接收新的任务,待已提交的任务执行后关闭,且宿主线程不阻塞,若需要阻塞可借助awaitTermination实现
shutdownNow 停止所有正在执行的任务,挂起未执行的任务并关闭,且宿主线程不阻塞,若需要阻塞可借助awaitTermination实现
awaitTermination 当发生shutdown时,阻塞宿主线程直到约定的时间已过或者所有任务完成
submit 提交任务Callable/Runnable,可利用Future的get()方法使宿主线程阻塞直到任务结束后返回结果

有了以上方法,便可以基于此接口实现线程池的各种功能(例如java.util.concurrent.ThreadPoolExecutor/java.util.concurrent.ScheduledThreadPoolExecutor),以java.util.concurrent.ThreadPoolExecutor为例,其参数的详解

Name Type 说明
corePoolSize int 线程池中最小的线程数
maximumPoolSize int 线程池中最大的线程数
keepAliveTime long 线程空闲时间,若线程数大于corePoolSize,空闲时间超过该值的线程将被终止回收
unit TimeUnit keepAliveTime的时间单位
workQueue BlockingQueue<Runnable> 已提交但未执行的任务队列
threadFactory ThreadFactory 创建新线程的工厂
handler RejectedExecutionHandler 当线程池或队列达到上限拒绝新任务抛出异常时的处理类

同时,java.util.concurrent.Executors类提供了基于java.util.concurrent.ThreadPoolExecutor类的工具方法,常用方法有

Method 说明
newFixedThreadPool 线程池中含固定数量的线程
newSingleThreadExecutor 线程池中仅含一个工作线程
newCachedThreadPool 按需创建线程,若线程池中无可用线程,则创建新的线程并加入,直到线程数达到上限值(Integer.MAX_VALUE)

测试代码如下

import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import org.junit.Assert;
import org.junit.Test;
import org.lxp.multiple.thread.task.SumTask; /**
* @Description: 测试ExecutorService
* @author Super.Li
* @date Jul 6, 2017
*/
public class ThreadExecutorServiceTest {
private static final String THIS_IS_SHUTDOWN_WITH_AWAIT_TERMINATION = "This is shutdownWithAwaitTermination";
private static final int RESULT = 111; private static boolean submitRunnable() throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<?> future = executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println("This is submitRunnable");
}
});
return future.get() == null;
} private static Integer submitRunnableWithResult() throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<Integer> future = executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println("This is submitRunnableWithResult");
}
}, RESULT);
return future.get();
} private static Integer submitBlockCallable() throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(1);
Future<Integer> future = executorService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println("This is submitBlockCallable");
return RESULT;
}
});
return future.get();// 阻塞
} private static boolean submitNonBlockCallable() throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(1);
Future<Integer> future = executorService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println("This is submitNonBlockCallable");
return RESULT;
}
});
while (!future.isDone()) {// 非阻塞
System.out.println(new Date());
}
return future.isDone();
} private static String shutdown() throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(1);
final StringBuilder sb = new StringBuilder();
executorService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
Thread.sleep(10000);
sb.append("This is shutdown");
return RESULT;
}
});
executorService.shutdown();
return sb.toString();
} private static String shutdownWithAwaitTermination() throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(1);
final StringBuilder sb = new StringBuilder();
executorService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
Thread.sleep(10000);
sb.append(THIS_IS_SHUTDOWN_WITH_AWAIT_TERMINATION);
return RESULT;
}
});
executorService.shutdown();
executorService.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS);
return sb.toString();
} private static int testForkJoinPool(List<Integer> list) throws InterruptedException, ExecutionException {
ForkJoinPool forkJoinPool = new ForkJoinPool(8);
Future<Integer> future = forkJoinPool.submit(new SumTask(list));
return future.get();
} @Test
public void test() throws InterruptedException, ExecutionException {
Assert.assertTrue(submitRunnable());
Assert.assertEquals(RESULT, submitRunnableWithResult().intValue());
Assert.assertEquals(RESULT, submitBlockCallable().intValue());
Assert.assertTrue(submitNonBlockCallable());
Assert.assertTrue(shutdown().isEmpty());
Assert.assertEquals(THIS_IS_SHUTDOWN_WITH_AWAIT_TERMINATION, shutdownWithAwaitTermination());
Assert.assertEquals(10, testForkJoinPool(Arrays.asList(new Integer[] { 1, 2, 3, 4 })));
Assert.assertEquals(49, testForkJoinPool(Arrays.asList(new Integer[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 })));
Assert.assertEquals(60, testForkJoinPool(Arrays.asList(new Integer[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11 })));
} }

SumTask类如下:

import java.util.List;
import java.util.concurrent.RecursiveTask; public class SumTask extends RecursiveTask<Integer> {
private static final long serialVersionUID = 1L;
private List<Integer> list; public SumTask(List<Integer> list) {
this.list = list;
} /**
* Ensure it is necessary to divide the job to parts and finish them separately
*
* @return
*/
@Override
protected Integer compute() {
int rtn, size = list.size();
if (size < 10) {
rtn = sum(list);
} else {
SumTask subTask1 = new SumTask(list.subList(0, size / 2));
SumTask subTask2 = new SumTask(list.subList(size / 2 + 1, size));
subTask1.fork();
subTask2.fork();
rtn = subTask1.join() + subTask2.join();
}
return rtn;
} private int sum(List<Integer> list) {
return list.stream().mapToInt(number -> number.intValue()).sum();
}
}

最新文章

  1. ABP理论学习之多租户
  2. Tables without a clustered index are not supported in this version of SQL Server. Please create a clustered index and try again.
  3. 用JAVA写一个日历计划
  4. JAVA魔法堂:读取.properties配置文件
  5. 7、JavaScript总结——实现选项卡切换的效果
  6. FSharp.Data 程序集之 Http
  7. Maven Spring JUnit 在Maven Clean Install时报
  8. redis的管理工具
  9. jsonp与cors跨域的一些理解(转)
  10. windows线程池四种情形(win核心读书笔记)
  11. web 直播&amp;即时聊天------阿里云、融云
  12. ubuntu下boost编译安装
  13. ionic 禁用 手势 滑动返回
  14. 当今商业中使用的三种十分重要的IT应用系统
  15. 从Linux内核角度看中间人攻击(ARP欺骗)并利用Python scapy实现
  16. KVM -&gt; 虚拟机磁盘管理_03
  17. Contest with Drinks Easy
  18. Traceroute(路由追踪)的原理及实现
  19. 鬼知道是啥系列之——STL(lower_bound(),upper_bound() )
  20. 2019CVPR《Mask Scoring R-CNN》

热门文章

  1. 两行代码搞定UI主流框架
  2. Hotel(poj 3667)
  3. [NOIP1998] 普及组
  4. POJ3233:Matrix Power Series
  5. tiles
  6. Extjs.panel.Panel赋值的问题
  7. Cg入门6:函数2
  8. html 元素定位position-relative, absolute, fixed, static
  9. 项目中遇到的HQL查询问题
  10. UI 经常用法总结之--- UILabel UITextField (不断更新中)