Fork/Join框架

Fork/Join 以递归方式将可以并行的任务拆分成更小的任务,然后将每个子任务的结果合并起来生成整体结果。

这个过程其实就是分治算法的并行版本,图解如下:

如何使用

我们要使用 ForkJoin 框架,必须先创建一个 ForkJoinTask。它提供在任务中执行 fork() 和 join() 操作的机制,通常情况下我们不需要直接继承 ForkJoinTask 类,而只需要继承它的子类,Fork/Join 框架提供了以下两个子类:

  • RecursiveAction:用于没有返回结果的任务。
  • RecursiveTask :用于有返回结果的任务。

而ForkJoinTask 需要通过 ForkJoinPool 来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。

下面使用forkJoin来计算一个Integer List之和:


import com.google.common.collect.Lists;
import com.sun.istack.internal.NotNull; import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.stream.Collectors;
import java.util.stream.IntStream; public class SumTask extends RecursiveTask<Long> { private static final int SPLIT_NUM = 10000;
@NotNull
private List<Integer> numberList; public SumTask(List<Integer> numberList) {
this.numberList = numberList;
} @Override
protected Long compute() {
// 不需要进行任务拆分
if (numberList.size() <= SPLIT_NUM) {
return numberList.stream().mapToLong(Integer::intValue).sum();
}
// 进行任务拆分
List<List<Integer>> splitNumberList = Lists.partition(numberList, numberList.size() / 2);
List<SumTask> sumTasks = splitNumberList.stream().map(SumTask::new).collect(Collectors.toList());
// 执行子任务,继续拆分
invokeAll(sumTasks);
// 合并结果
return sumTasks.stream().mapToLong(SumTask::join).sum();
} public static void main(String[] args) {
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
List<Integer> numberList = IntStream.rangeClosed(1, 100000).mapToObj(Integer::new).collect(Collectors.toList());
SumTask sumTask = new SumTask(numberList);
Long sum = forkJoinPool.invoke(sumTask);
System.out.println("1 ~ 100000 sum result is: " + sum);
}
}

上述代码的执行过程为,先将List 分为两个子List, 并发执行两个子List 的计算。然后再将子List 拆分为更小的List,依此往复,直至List无法再拆分时,计算其Sum,最后合并结果。

ForkJoinTask 与一般的任务的主要区别在于它需要实现 compute 方法,在这个方法里,首先需要判断任务是否足够小,如果足够小就直接执行任务。如果不足够小,就必须分割成两个子任务,每个子任务在调用 fork 方法时,又会进入 compute 方法,形成递归调用,直到任务子任务不可再分。使用 join 方法会等待子任务执行完并得到其结果。

异常捕获

我们没办法在主线程中捕获ForkJoinTask 执行过程中抛出的异常。所以ForkJoinTask 提供了方法来检测Task 执行情况, 并提供了获取异常的方法。

        // 检查Task 执行情况
sumTask.isCancelled();
sumTask.isCompletedNormally();
sumTask.isCompletedAbnormally(); // 获取异常信息
sumTask.getException();

实现原理

参考 https://segmentfault.com/a/1190000016781127

最新文章

  1. VS2013新建MVC5项目,使用nuget更新项目引用后发生Newtonsoft.Json引用冲突的解决办法
  2. EF深入系列--细节
  3. OC9_代理正向传值
  4. 【转载】干货再次来袭!Linux小白最佳实践:《超容易的Linux系统管理入门书》(连载八)用命令实现批量添加用户
  5. HDU4632:Palindrome subsequence(区间DP)
  6. C#-设置窗体在显示器居中显示
  7. scss组件定制的一些学习
  8. 让WordPress的作者在后台只能看到自己的文章
  9. HDU 4630、BOJ 某题
  10. 如何判断webview是不是滑到底部
  11. Java实现一个简单的循环队列
  12. spring 应用
  13. bash array
  14. servlet 的servletContext
  15. scrapy 下载图片 from cuiqingcai
  16. python 小练习 8
  17. javascript 创建对象方式
  18. Windows Server 2003出现Directory Listing Denied This Virtual Directory does not allow contents to be listed.的解决方案
  19. 对va_list; va_start ; va_end ;vsprintf理解(转)
  20. css设置超出部分文档隐藏(在table标签中不好使解决方案在下)

热门文章

  1. Kubernetes的资源管理
  2. ABP Framework V4.4 RC 新增功能介绍
  3. 16 自动发布PHP项目
  4. 21、oracle打开和关闭归档日志的方法
  5. Samba常见漏洞利用
  6. Nginx:Nginx动静分离
  7. 在Xshell中文件内容显示乱码
  8. mybatis框架学习第一天
  9. ARTS起始篇
  10. VMware中的虚机如何挂载U盘