现代的计算机已经向多CPU方向发展,即使是普通的PC,甚至现在的智能手机、多核处理器已被广泛应用。在未来,处理器的核心数将会发展的越来越多。

虽然硬件上的多核CPU已经十分成熟,但是很多应用程序并未这种多核CPU做好准备,因此并不能很好地利用多核CPU的性能优势。

为了充分利用多CPU、多核CPU的性能优势,级软基软件系统应该可以充分“挖掘”每个CPU的计算能力,决不能让某个CPU处于“空闲”状态。为此,可以考虑把一个任务拆分成多个“小任务”,把多个"小任务"放到多个处理器核心上并行执行。当多个“小任务”执行完成之后,再将这些执行结果合并起来即可。

如下面的示意图所示:

第一步分割任务。首先我们需要有一个fork类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停的分割,直到分割出的子任务足够小。

第二步执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。

Java提供了ForkJoinPool来支持将一个任务拆分成多个“小任务”并行计算,再把多个“小任务”的结果合成总的计算结果。

ForkJoinPool是ExecutorService的实现类,因此是一种特殊的线程池。ForkJoinPool提供了如下两个常用的构造器。

  • public ForkJoinPool(int parallelism):创建一个包含parallelism个并行线程的ForkJoinPool
  • public ForkJoinPool() :以Runtime.getRuntime().availableProcessors()的返回值作为parallelism来创建ForkJoinPool

创建ForkJoinPool实例后,可以钓鱼ForkJoinPool的submit(ForkJoinTask<T> task)或者invoke(ForkJoinTask<T> task)来执行指定任务。其中ForkJoinTask代表一个可以并行、合并的任务。ForkJoinTask是一个抽象类,它有两个抽象子类:RecursiveAction和RecursiveTask。

  • RecursiveTask代表有返回值的任务
  • RecursiveAction代表没有返回值的任务。

一、RecursiveAction

下面以一个没有返回值的大任务为例,介绍一下RecursiveAction的用法。

大任务是:打印0-200的数值。

小任务是:每次只能打印50个数值。

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit; //RecursiveAction为ForkJoinTask的抽象子类,没有返回值的任务
class PrintTask extends RecursiveAction {
// 每个"小任务"最多只打印50个数
private static final int MAX = 50; private int start;
private int end; PrintTask(int start, int end) {
this.start = start;
this.end = end;
} @Override
protected void compute() {
// 当end-start的值小于MAX时候,开始打印
if ((end - start) < MAX) {
for (int i = start; i < end; i++) {
System.out.println(Thread.currentThread().getName() + "的i值:"
+ i);
}
} else {
// 将大任务分解成两个小任务
int middle = (start + end) / 2;
PrintTask left = new PrintTask(start, middle);
PrintTask right = new PrintTask(middle, end);
// 并行执行两个小任务
left.fork();
right.fork();
}
}
} public class ForkJoinPoolTest {
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
// 创建包含Runtime.getRuntime().availableProcessors()返回值作为个数的并行线程的ForkJoinPool
ForkJoinPool forkJoinPool = new ForkJoinPool();
// 提交可分解的PrintTask任务
forkJoinPool.submit(new PrintTask(0, 200));
forkJoinPool.awaitTermination(2, TimeUnit.SECONDS);//阻塞当前线程直到 ForkJoinPool 中所有的任务都执行结束
// 关闭线程池
forkJoinPool.shutdown();
} }

运行结果如下:

ForkJoinPool-1-worker-2的i值:75
ForkJoinPool-1-worker-2的i值:76
ForkJoinPool-1-worker-2的i值:77
ForkJoinPool-1-worker-2的i值:78
ForkJoinPool-1-worker-2的i值:79
ForkJoinPool-1-worker-2的i值:80
ForkJoinPool-1-worker-2的i值:81
ForkJoinPool-1-worker-2的i值:82
ForkJoinPool-1-worker-2的i值:83
ForkJoinPool-1-worker-2的i值:84
ForkJoinPool-1-worker-2的i值:85
ForkJoinPool-1-worker-2的i值:86
ForkJoinPool-1-worker-2的i值:87
ForkJoinPool-1-worker-2的i值:88
ForkJoinPool-1-worker-2的i值:89
ForkJoinPool-1-worker-2的i值:90
ForkJoinPool-1-worker-2的i值:91
ForkJoinPool-1-worker-2的i值:92
ForkJoinPool-1-worker-2的i值:93
ForkJoinPool-1-worker-2的i值:94
ForkJoinPool-1-worker-2的i值:95
ForkJoinPool-1-worker-2的i值:96
ForkJoinPool-1-worker-2的i值:97
ForkJoinPool-1-worker-2的i值:98
ForkJoinPool-1-worker-2的i值:99
ForkJoinPool-1-worker-2的i值:50
ForkJoinPool-1-worker-2的i值:51
ForkJoinPool-1-worker-2的i值:52
ForkJoinPool-1-worker-2的i值:53
ForkJoinPool-1-worker-2的i值:54
ForkJoinPool-1-worker-2的i值:55
ForkJoinPool-1-worker-2的i值:56
ForkJoinPool-1-worker-2的i值:57
ForkJoinPool-1-worker-2的i值:58
ForkJoinPool-1-worker-2的i值:59
ForkJoinPool-1-worker-2的i值:60
ForkJoinPool-1-worker-2的i值:61
ForkJoinPool-1-worker-2的i值:62
ForkJoinPool-1-worker-2的i值:63
ForkJoinPool-1-worker-2的i值:64
ForkJoinPool-1-worker-2的i值:65
ForkJoinPool-1-worker-2的i值:66
ForkJoinPool-1-worker-2的i值:67
ForkJoinPool-1-worker-2的i值:68
ForkJoinPool-1-worker-2的i值:69
ForkJoinPool-1-worker-1的i值:175
ForkJoinPool-1-worker-1的i值:176
ForkJoinPool-1-worker-1的i值:177
ForkJoinPool-1-worker-1的i值:178
ForkJoinPool-1-worker-1的i值:179
ForkJoinPool-1-worker-1的i值:180
ForkJoinPool-1-worker-1的i值:181
ForkJoinPool-1-worker-1的i值:182
ForkJoinPool-1-worker-1的i值:183
ForkJoinPool-1-worker-1的i值:184
ForkJoinPool-1-worker-1的i值:185
ForkJoinPool-1-worker-1的i值:186
ForkJoinPool-1-worker-1的i值:187
ForkJoinPool-1-worker-1的i值:188
ForkJoinPool-1-worker-1的i值:189
ForkJoinPool-1-worker-1的i值:190
ForkJoinPool-1-worker-1的i值:191
ForkJoinPool-1-worker-1的i值:192
ForkJoinPool-1-worker-1的i值:193
ForkJoinPool-1-worker-1的i值:194
ForkJoinPool-1-worker-1的i值:195
ForkJoinPool-1-worker-1的i值:196
ForkJoinPool-1-worker-1的i值:197
ForkJoinPool-1-worker-1的i值:198
ForkJoinPool-1-worker-1的i值:199
ForkJoinPool-1-worker-1的i值:150
ForkJoinPool-1-worker-1的i值:151
ForkJoinPool-1-worker-1的i值:152
ForkJoinPool-1-worker-1的i值:153
ForkJoinPool-1-worker-1的i值:154
ForkJoinPool-1-worker-1的i值:155
ForkJoinPool-1-worker-1的i值:156
ForkJoinPool-1-worker-1的i值:157
ForkJoinPool-1-worker-1的i值:158
ForkJoinPool-1-worker-1的i值:159
ForkJoinPool-1-worker-1的i值:160
ForkJoinPool-1-worker-1的i值:161
ForkJoinPool-1-worker-1的i值:162
ForkJoinPool-1-worker-1的i值:163
ForkJoinPool-1-worker-1的i值:164
ForkJoinPool-1-worker-1的i值:165
ForkJoinPool-1-worker-1的i值:166
ForkJoinPool-1-worker-1的i值:167
ForkJoinPool-1-worker-1的i值:168
ForkJoinPool-1-worker-1的i值:169
ForkJoinPool-1-worker-1的i值:170
ForkJoinPool-1-worker-1的i值:171
ForkJoinPool-1-worker-1的i值:172
ForkJoinPool-1-worker-1的i值:173
ForkJoinPool-1-worker-1的i值:174
ForkJoinPool-1-worker-1的i值:125
ForkJoinPool-1-worker-1的i值:126
ForkJoinPool-1-worker-1的i值:127
ForkJoinPool-1-worker-1的i值:128
ForkJoinPool-1-worker-1的i值:129
ForkJoinPool-1-worker-1的i值:130
ForkJoinPool-1-worker-1的i值:131
ForkJoinPool-1-worker-1的i值:132
ForkJoinPool-1-worker-1的i值:133
ForkJoinPool-1-worker-1的i值:134
ForkJoinPool-1-worker-1的i值:135
ForkJoinPool-1-worker-1的i值:136
ForkJoinPool-1-worker-1的i值:137
ForkJoinPool-1-worker-1的i值:138
ForkJoinPool-1-worker-1的i值:139
ForkJoinPool-1-worker-1的i值:140
ForkJoinPool-1-worker-1的i值:141
ForkJoinPool-1-worker-1的i值:142
ForkJoinPool-1-worker-1的i值:143
ForkJoinPool-1-worker-1的i值:144
ForkJoinPool-1-worker-1的i值:145
ForkJoinPool-1-worker-1的i值:146
ForkJoinPool-1-worker-1的i值:147
ForkJoinPool-1-worker-1的i值:148
ForkJoinPool-1-worker-1的i值:149
ForkJoinPool-1-worker-1的i值:100
ForkJoinPool-1-worker-1的i值:101
ForkJoinPool-1-worker-1的i值:102
ForkJoinPool-1-worker-1的i值:103
ForkJoinPool-1-worker-1的i值:104
ForkJoinPool-1-worker-1的i值:105
ForkJoinPool-1-worker-1的i值:106
ForkJoinPool-1-worker-1的i值:107
ForkJoinPool-1-worker-1的i值:108
ForkJoinPool-1-worker-1的i值:109
ForkJoinPool-1-worker-1的i值:110
ForkJoinPool-1-worker-1的i值:111
ForkJoinPool-1-worker-1的i值:112
ForkJoinPool-1-worker-1的i值:113
ForkJoinPool-1-worker-1的i值:114
ForkJoinPool-1-worker-1的i值:115
ForkJoinPool-1-worker-1的i值:116
ForkJoinPool-1-worker-1的i值:117
ForkJoinPool-1-worker-1的i值:118
ForkJoinPool-1-worker-1的i值:119
ForkJoinPool-1-worker-1的i值:120
ForkJoinPool-1-worker-1的i值:121
ForkJoinPool-1-worker-1的i值:122
ForkJoinPool-1-worker-1的i值:123
ForkJoinPool-1-worker-1的i值:124
ForkJoinPool-1-worker-1的i值:25
ForkJoinPool-1-worker-1的i值:26
ForkJoinPool-1-worker-1的i值:27
ForkJoinPool-1-worker-1的i值:28
ForkJoinPool-1-worker-1的i值:29
ForkJoinPool-1-worker-1的i值:30
ForkJoinPool-1-worker-1的i值:31
ForkJoinPool-1-worker-1的i值:32
ForkJoinPool-1-worker-1的i值:33
ForkJoinPool-1-worker-1的i值:34
ForkJoinPool-1-worker-1的i值:35
ForkJoinPool-1-worker-1的i值:36
ForkJoinPool-1-worker-1的i值:37
ForkJoinPool-1-worker-1的i值:38
ForkJoinPool-1-worker-1的i值:39
ForkJoinPool-1-worker-1的i值:40
ForkJoinPool-1-worker-1的i值:41
ForkJoinPool-1-worker-1的i值:42
ForkJoinPool-1-worker-1的i值:43
ForkJoinPool-1-worker-1的i值:44
ForkJoinPool-1-worker-1的i值:45
ForkJoinPool-1-worker-1的i值:46
ForkJoinPool-1-worker-1的i值:47
ForkJoinPool-1-worker-1的i值:48
ForkJoinPool-1-worker-1的i值:49
ForkJoinPool-1-worker-1的i值:0
ForkJoinPool-1-worker-1的i值:1
ForkJoinPool-1-worker-1的i值:2
ForkJoinPool-1-worker-1的i值:3
ForkJoinPool-1-worker-1的i值:4
ForkJoinPool-1-worker-1的i值:5
ForkJoinPool-1-worker-1的i值:6
ForkJoinPool-1-worker-1的i值:7
ForkJoinPool-1-worker-1的i值:8
ForkJoinPool-1-worker-1的i值:9
ForkJoinPool-1-worker-1的i值:10
ForkJoinPool-1-worker-1的i值:11
ForkJoinPool-1-worker-1的i值:12
ForkJoinPool-1-worker-1的i值:13
ForkJoinPool-1-worker-1的i值:14
ForkJoinPool-1-worker-1的i值:15
ForkJoinPool-1-worker-1的i值:16
ForkJoinPool-1-worker-1的i值:17
ForkJoinPool-1-worker-1的i值:18
ForkJoinPool-1-worker-1的i值:19
ForkJoinPool-1-worker-1的i值:20
ForkJoinPool-1-worker-1的i值:21
ForkJoinPool-1-worker-1的i值:22
ForkJoinPool-1-worker-1的i值:23
ForkJoinPool-1-worker-1的i值:24
ForkJoinPool-1-worker-2的i值:70
ForkJoinPool-1-worker-2的i值:71
ForkJoinPool-1-worker-2的i值:72
ForkJoinPool-1-worker-2的i值:73
ForkJoinPool-1-worker-2的i值:74

从上面结果来看,ForkJoinPool启动了两个线程来执行这个打印任务,这是因为笔者的计算机的CPU是双核的。不仅如此,读者可以看到程序虽然打印了0-199这两百个数字,但是并不是连续打印的,这是因为程序将这个打印任务进行了分解,分解后的任务会并行执行,所以不会按顺序从0打印 到199。

二、RecursiveTask

下面以一个有返回值的大任务为例,介绍一下RecursiveTask的用法。

大任务是:计算随机的100个数字的和。

小任务是:每次只能20个数值的和。

import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask; //RecursiveTask为ForkJoinTask的抽象子类,有返回值的任务
class SumTask extends RecursiveTask<Integer> {
// 每个"小任务"最多只打印50个数
private static final int MAX = 20;
private int arr[];
private int start;
private int end; SumTask(int arr[], int start, int end) {
this.arr = arr;
this.start = start;
this.end = end;
} @Override
protected Integer compute() {
int sum = 0;
// 当end-start的值小于MAX时候,开始打印
if ((end - start) < MAX) {
for (int i = start; i < end; i++) {
sum += arr[i];
}
return sum;
} else {
System.err.println("=====任务分解======");
// 将大任务分解成两个小任务
int middle = (start + end) / 2;
SumTask left = new SumTask(arr, start, middle);
SumTask right = new SumTask(arr, middle, end);
// 并行执行两个小任务
left.fork();
right.fork();
// 把两个小任务累加的结果合并起来
return left.join() + right.join();
}
} } public class ForkJoinPoolTest2 {
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
int arr[] = new int[100];
Random random = new Random();
int total = 0;
// 初始化100个数字元素
for (int i = 0; i < arr.length; i++) {
int temp = random.nextInt(100);
// 对数组元素赋值,并将数组元素的值添加到total总和中
total += (arr[i] = temp);
}
System.out.println("初始化时的总和=" + total);
// 创建包含Runtime.getRuntime().availableProcessors()返回值作为个数的并行线程的ForkJoinPool
ForkJoinPool forkJoinPool = new ForkJoinPool();
// 提交可分解的PrintTask任务
Future<Integer> future = forkJoinPool.submit(new SumTask(arr, 0,
arr.length));
System.out.println("计算出来的总和=" + future.get());
// 关闭线程池
forkJoinPool.shutdown();
} }

计算结果如下:

初始化时的总和=4283
=====任务分解======
=====任务分解======
=====任务分解======
=====任务分解======
=====任务分解======
=====任务分解======
=====任务分解======
计算出来的总和=4283

从上面结果来看,ForkJoinPool将任务分解了7次,程序通过SumTask计算出来的结果,和初始化数组时统计出来的总和是相等的,这表明计算结果一切正常。

读者还参考以下文章加深对ForkJoinPool的理解:

http://www.infoq.com/cn/articles/fork-join-introduction/

http://www.ibm.com/developerworks/cn/java/j-lo-forkjoin/

==================================================================================================

  作者:欧阳鹏  欢迎转载,与人分享是进步的源泉!

  转载请保留原文地址:http://blog.csdn.net/ouyang_peng

==================================================================================================

最新文章

  1. 原生ajax实现登录(一部分代码)
  2. sql 比模糊查询速度快的查询方法
  3. django 同步数据库
  4. C# 多线程的等待所有线程结束 用 ManualResetEvent 控制
  5. nginx配置PATH_INFO模式
  6. CSS Hack技术(一)
  7. Swift数据类型及数据类型转换
  8. 微信公众号支付流程(Node实现)
  9. [转] __thread关键字
  10. typedef与define的区别
  11. lucene+盘古分词
  12. Android tools:context=&quot;.MainActivity&quot;的作用
  13. maven 阿里云仓库配置
  14. js获取单选框的值
  15. vue2总结
  16. 【死磕 Spring】—— IoC 之深入理解 Spring IoC
  17. Ubuntu集群 配置ntp服务
  18. 【神经网络】BP反向传播神经网络
  19. mysql批量新增或者更新
  20. JavaScript学习笔记(七)—— 再说函数

热门文章

  1. 深入理解 JavaScript Function
  2. Android BroadcastReceiver 注册和反注册
  3. 区间最小值(2) (线段树 更新区间)2015年 JXNU_ACS 算法组暑假第一次周赛
  4. 转:使用Fabric自动化你的任务
  5. Netty4.0 用户指南
  6. Android Studio中利用JavaDoc生成项目API文档
  7. 聚合数据Android SDK 12306火车票查询订票演示示例
  8. Iowait的成因、对系统影响及对策
  9. Allegro Desgin Compare的用法与网表比较
  10. 浅谈 Fork/Join