前提概要

在开发过程中在使用多线程进行并行处理一些事情的时候,大部分场景在处理多线程并行执行任务的时候,可以通过List添加Future来获取执行结果,有时候我们是不需要获取任务的执行结果的,方便后面引出ExecutorCompletionService。

CompletionService的介绍

  • CompletionService 接口是一个独立的接口,并没有扩展ExecutorService 。 其默认实现类是ExecutorCompletionService。

  • 接口CompletionService 的功能是:以异步的方式一边执行未完成的任务,一边记录、处理已完成任务的结果。从而可以将任务的执行与处理任务的执行结果分离开来。

CompletionService的实现原理

  • CompletionService就是监视着 Executor线程池执行的任务,用BlockingQueue将完成的任务的结果存储下来。

  • 要不断遍历与每个任务关联的Future,然后不断去轮询,判断任务是否已经完成,功能比较繁琐。

public interface CompletionService<V> {
Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
Future<V> take() throws InterruptedException;
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}

方法摘要

提交一个 Callable 任务;一旦完成,便可以由take()、poll()方法获取

Future submit(Callable task):

提交一个 Runnable 任务,并指定计算结果;

Future submit(Runnable task, V result):

获取并移除表示下一个已完成任务的 Future,如果目前不存在这样的任务,则等待。

Future take() throws InterruptedException

获取并移除表示下一个已完成任务的 Future,如果不存在这样的任务,则返回 null。

Future poll()

获取并移除表示下一个已完成任务的 Future,如果目前不存在这样的任务,则将等待指定的时间(如果有必要)。

Future poll(long timeout, TimeUnit unit) throws InterruptedException

例子,程序提交了多个任务,但只要有一个任务完成并返回一个非空的结果,并可以忽略掉其余的任务。

 void eample(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException {
CompletionService<Result> completionService = new ExecutorCompletionService<Result>(e);
int n = solvers.size();
List<Future<Result>> futures = new ArrayList<Future<Result>>(n);
Result result = null;
try {
//提交多个任务
for (Callable<Result> s : solvers)
futures.add(completionService.submit(s));
//
for (int i = 0; i < n; ++i) {
try {
//等待获取一个已经完成的任务
Result r = completionService.take().get();
//判断返回结果是否为空
if (r != null) {
result = r;
break;
}
} catch (ExecutionException ignore) {}
}
}
finally {
//取消所有任务
for (Future<Result> f : futures)
f.cancel(true);
}
if (result != null)
use(result);
}

ExecutorCompletionService的介绍

  • ExecutorCompletionService内部有一个先进先出的阻塞队列,用于保存已经执行完成的Future,通过调用它的take方法或poll方法可以获取到一个已经执行完成的Future,进而通过调用Future接口实现类的get方法获取最终的结果。

  • ExecutorCompletionService实现了CompletionService,内部通过Executor以及BlockingQueue来实现接口提出的规范,ExecutorCompletionService,提交任务后,可以按任务返回结果的先后顺序来获取各任务执行后的结果,该类实现了接口CompletionService

构造方法

  • 指定一个Executor来执行任务,存储完成的任务的完成队列是LinkedBlockingQueue ;
  • Executor由调用者传递进来,而Blocking可以使用默认的LinkedBlockingQueue,也可以由调用者传递。
ExecutorCompletionService(Executor executor):

指定了任务执行器Executor和已完成的任务队列completionQueue

ExecutorCompletionService(Executor executor, BlockingQueue<Future> completionQueue)
实现构造器
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
  • 该接口定义了一系列方法:提交实现了Callable或Runnable接口的任务,并获取这些任务的结果。

  • 包装后提交任务的submit()方法,该类还会将提交的任务封装成QueueingFuture,这样就可以实现FutureTask.done()方法,以便于在任务执行完毕后,将结果放入阻塞队列中。

public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
executor.execute(new QueueingFuture(f));
return f;
}
QueueingFuture为内部类:

在提交任务时,将任务封装成QueueingFuture:

private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}

其中,done()方法就是在任务执行完毕后,将任务放入队列中。

  • 在调用take()、poll()方法时,会从阻塞队列中获取Future对象,以取得任务执行的结果。

  • 它继承自 FutureTask,并且重写了 done 方法,其方法把任务放到我们包装线程池创建的堵塞队列里面;就是当任务执行完成后,就会被放到队列里面去了。

  • 调用其take() 方法,就是阻塞等待,等到的一定是能够获取的结果的future,然后再调用get()方法获取执行结果;

最后,如果工作中并行处理任务不需要获取结果的,我们正常使用线程池提交就可以,任务技术只要适合工作的业务场景就是好的。

最新文章

  1. xutils3
  2. VBA编程常用语句
  3. HTML5学习总结-08 应用缓存(Application Cache)
  4. 异步-学习笔记3 Task
  5. 做leetcode的几点体会分享(转)
  6. Bootstrap新手学习笔记——css
  7. 高斯判别分析 Gaussian Discriminant Analysis
  8. Xcode - 方法注释插件
  9. win7 安装 redis +php扩展
  10. SQL Server索引进阶第十一篇:索引碎片分析与解决
  11. block、块级作用域
  12. LeetCode 104. Maximum Depth of Binary Tree (二叉树的最大深度)
  13. ecsmart的开发经历
  14. 136.137.260. Single Number &amp;&amp; 位运算
  15. 杭电ACM1285----确定比赛名次『拓扑排序』
  16. windows上安装Maven与Gradle
  17. Eclipse中项目不会自动编译问题的坑和注意点
  18. CentOS 7.4 初次手记:第三章 CentOS基础了解
  19. npm run dev 报错:missing script:dev
  20. 关于调用&amp;&amp;传址

热门文章

  1. 论文笔记:(TOG2019)DGCNN : Dynamic Graph CNN for Learning on Point Clouds
  2. 解决 Flask 项目无法用 .env 文件中解析的参数设置环境变量的错误
  3. Nmap的多进程应用与研究
  4. 基于Java和Bytemd用120行代码实现一个桌面版Markdown编辑器
  5. 关于shell脚本——echo、for语句、while语句、until语句
  6. Golang语言系列-09-接口
  7. Python语言系列-02-基础数据类型
  8. 有关SQL注入的一些小知识点
  9. 常见web中间件漏洞(四)Tomcat漏洞
  10. Git(12)-- Git 分支 - 分支简介