【Java多线程】CompletionService
2024-10-16 07:29:09
什么是CompletionService?
当我们使用ExecutorService启动多个Callable时,每个Callable返回一个Future,而当我们执行Future的get方法获取结果时,可能拿到的Future并不是第一个执行完成的Callable的Future,就会进行阻塞,从而不能获取到第一个完成的Callable结果,那么这样就造成了很严重的性能损耗问题。
而CompletionService正是为了解决这个问题,它是Java8的新增接口,它的实现类是ExecutorCompletionService
。CompletionService会根据线程池中Task的执行结果按执行完成的先后顺序排序,任务先完成的可优先获取到。
ExecutorCompletionService中的方法
构造方法
构建ExecutorCompletionService对象
executor:关联的线程池
completionQueue:自定义的结果存储队列
ExecutorCompletionService(Executor executor)
ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue)
submit方法
提交一个Callable或者Runnable类型的任务,并返回Future
Future<V> submit(Callable<V> task)
Future<V> submit(Runnable task, V result)
take方法
阻塞方法,从结果队列中获取并移除一个已经执行完成的任务的结果,如果没有就会阻塞,直到有任务完成返回结果。
Future<V> take() throws InterruptedException
poll方法
从结果队列中获取并移除一个已经执行完成的任务的结果,如果没有就会返回null,该方法不会阻塞。
timeout:最多等待多长时间
unit:时间单位
Future<V> poll()
Future<V> poll(long timeout, TimeUnit unit)
案例
问题复现
不使用CompletionService时出现的问题
package com.brycen.part3.threadpool;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
public class CompletionServiceExample {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(2);
List<Callable<Integer>> callables = Arrays.asList(
()->{
mySleep(20);
System.out.println("=============20 end==============");
return 20;
},
()->{
mySleep(10);
System.out.println("=============10 end==============");
return 10;
}
);
List<Future<Integer>> futures = new ArrayList<>();
//提交任务,并将future添加到list集合中
futures.add(executorService.submit(callables.get(0)));
futures.add(executorService.submit(callables.get(1)));
//遍历Future,因为不知道哪个任务先完成,所以这边模拟第一个拿到的就是执行时间最长的任务,那么执行时间较短的任务就必须等待执行时间长的任务执行完
for (Future future:futures) {
System.out.println("结果: "+future.get());
}
System.out.println("============main end=============");
}
private static void mySleep(int seconds){
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
运行结果:
- 即使休眠10秒的任务先执行完成也不会输出结果,因为在拿结果的时候可能先拿的休眠20秒的任务的结果,而休眠20秒的任务还没有执行完,此时就会阻塞住,从而影响了性能。
=============10 end==============
=============20 end==============
结果: 20
结果: 10
============main end=============
利用CompletionService解决问题
package com.brycen.part3.threadpool;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
public class CompletionServiceExample {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(2);
List<Callable<Integer>> callables = Arrays.asList(
()->{
mySleep(20);
System.out.println("=============20 end==============");
return 20;
},
()->{
mySleep(10);
System.out.println("=============10 end==============");
return 10;
}
);
//构建ExecutorCompletionService,与线程池关联
CompletionService completionService = new ExecutorCompletionService(executorService);
//提交Callable任务
completionService.submit(callables.get(0));
completionService.submit(callables.get(1));
//获取future结果,不会阻塞
Future<Integer> pollFuture = completionService.poll();
//这里因为没有执行完成的Callable,所以返回null
System.out.println(pollFuture);
//获取future结果,最多等待3秒,不会阻塞
Future<Integer> pollTimeOutFuture = completionService.poll(3,TimeUnit.SECONDS);
//这里因为没有执行完成的Callable,所以返回null
System.out.println(pollTimeOutFuture);
//通过take获取Future结果,此方法会阻塞
for(int i=0;i<callables.size();i++){
System.out.println(completionService.take().get());
}
System.out.println("============main end=============");
}
private static void mySleep(int seconds){
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
运行结果:
null
null
=============10 end==============
10
=============20 end==============
20
============main end=============
文档:https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CompletionService.html
最新文章
- SQL批量更新 关系表更新
- 去繁从简--简化Message和Signal设置
- Git撤销操作
- TFS简介
- Hive的Transform功能
- GreenDao官方文档翻译(上)
- Codeforces Gym 100637B B. Lunch 找规律
- (转)使用Migrations更新数据库结构(Code First )
- ANDROID_MARS学习笔记_S04_009_用java.lang.ref.SoftReference作缓存,android.os.Handler和new Thread异步加载略图片
- Qt入门(2)——使用Qt编写的Hello world
- chapter8_4 错误处理
- 记一个CRenderTarget中的BUG及解决办法
- BZOJ 2839: 集合计数 [容斥原理 组合]
- mac下redis安装、设置、启动停止
- Android 6.0出现的init: cannot execve(‘XXX’):Permission denied问题:禁止SELINUX的权限设置
- hive数据类型及其数据转换
- C++ DWORD 转byte char 数组、指针
- Linux 内存占用大排查
- [CSS] Frequently used method or solutions for issues
- WebForm跨页面传值取值、C#服务端跳转页面、 Button的OnClientClick属性和超链接点击弹出警示框
热门文章
- [python]django的mode设置表结构和serializers序列化数据
- 深入理解Spring IOC容器及扩展
- [源码解析] PyTorch 分布式(8) -------- DistributedDataParallel之论文篇
- 【华为昇腾】 序言:从昇腾AI软硬件平台聊起
- [cf1515I]Phoenix and Diamonds
- MS17-010漏洞利用
- idea配置MyBatis
- radio两行每行只能选择一个的解决方案!
- k8s statefulset controller源码分析
- MySQL的B+树索引和hash索引的区别