java.util.concurrent
public class ForkJoinPool extends AbstractExecutorService
public abstract class ForkJoinTask<V> implements Future<V>, Serializable
public class ForkJoinWorkerThread extends Thread

Fork/Join

Fork/Join技术是分治算法(Divide-and-Conquer)的并行实现,它是一项可以获得良好的并行性能的简单且高效的设计技术。

ForkJoinPool的优势在于,可以充分利用多cpu,多核cpu的优势,把一个任务拆分成多个“小任务”,
把多个“小任务”放到多个处理器核心上并行执行;当多个“小任务”执行完成之后,再将这些执行结果合并起来即可。 JDK用来执行Fork/Join任务的工作线程池默认大小等于CPU核心数。在一个4核CPU上,最多可以同时执行4个子任务。 目的是为了帮助我们更好地利用多处理器带来的好处,使用所有可用的运算能力来提升应用的性能。 1. Fork/Join框架主要由ForkJoinPool、ForkJoinWorkerThread和ForkJoinTask来实现 2. ForkJoinPool中只可以运行ForkJoinTask类型的任务
(在实际使用中,也可以接收Runnable/Callable任务,但在真正运行时,也会把这些任务封装成ForkJoinTask类型的任务) 3. ForkJoinTask表示一个任务,
ForkJoinTask的子类中有RecursiveAction和RecursiveTask
RecursiveAction无返回结果,RecursiveTask有返回结果
工作中,一般重写RecursiveAction或RecursiveTask的compute(),完成计算或者可以进行任务拆分。 4. 调用ForkJoinTask的fork()的方法,可以让其他空闲的线程执行这个ForkJoinTask
调用ForkJoinTask的join()的方法,将多个小任务的结果进行汇总。 5. ForkJoinWorkerThread是运行ForkJoinTask任务的工作线程

work-stealing(工作窃取)算法

ForkJoinPool中,线程池中每个工作线程(ForkJoinWorkerThread)都对应一个任务队列(WorkQueue),

工作线程优先处理来自自身队列的任务(LIFO),然后以FIFO的顺序随机窃取其他队列中的任务。

ForkJoinPool

ForkJoinPool并行的实现了分治算法(Divide-and-Conquer):把任务递归的拆分为各个子任务,这样可以更好的利用系统资源

ForkJoinPool中的任务分为两种:

  1. 一种是本地提交的任务(Submission task,如 execute、submit 提交的任务)
  2. 另外一种是 fork 出的子任务(Worker task)

提交任务

外部任务(external/submissions task)提交三种方式:

    execute()是直接向池提交一个任务来异步执行,无返回结果;

    invoke()会一直阻塞到任务执行完成返回计算结果

    submit()也是异步执行,但是会返回提交的任务,在适当的时候可通过task.get()获取执行结果,实现同步到主线程

子任务(Worker task)提交:
由任务的fork()方法完成。
任务被分割(fork)之后调用了ForkJoinPool.WorkQueue.push()方法直接把任务放到队列中等待被执行。 public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
int s1, s2;
t2.fork();
if (((s1 = t1.doInvoke()) & ABNORMAL) != 0)
t1.reportException(s1);
if (((s2 = t2.doJoin()) & ABNORMAL) != 0)
t2.reportException(s2);
}

示例

对于fork/join模式,假如pool里面线程数量是固定的,那么调用子任务的fork方法相当于A先分工给B和C,
然后A当监工不干活,B和C去完成A交代的任务。
所以上面的模式相当于浪费了一个线程。 如果使用invokeAll相当于A分工给B和C后,A和B和C都去完成工作。这样缩短了执行的时间。 /**
* 测试 ForkJoinPool 线程池的使用
*/
public class task { /**
* 测试使用 ForkJoinPool 无返回值的任务执行
*/
public static void noResultTask() throws Exception {
ForkJoinPool pool = new ForkJoinPool();
pool.submit(new PrintTask(1, 200));
pool.shutdown();
}
} /**
* 无返回值的打印任务
*/
class PrintTask extends RecursiveAction { private static final long serialVersionUID = 1L;
private static final int THRESHOLD = 50;
private int start;
private int end; public PrintTask(int start, int end) {
this.start = start;
this.end = end;
} @Override
protected void compute() {
//当 结束值 比 起始值 大于 50 时,按数值区间平均拆分为两个任务;否则直接打印该区间的值
if (end - start < THRESHOLD) {
for (int i = start; i <= end; i++) {
System.out.println(Thread.currentThread().getName() + ", i = " + i);
}
} else {
int middle = (start + end) / 2;
//分成两个子任务
PrintTask firstTask = new PrintTask(start, middle);
PrintTask secondTask = new PrintTask(middle + 1, end);
//任务拆分
//firstTask.fork();
//secondTask.fork();
invokeAll(firstTask,secondTask);
}
}
} public class task { /**
* 测试使用 ForkJoinPool 有返回值的任务执行,对结果进行合并。计算 1 到 200 的累加和
*/
public static void hasResultTask() throws Exception {
//线程池
ForkJoinPool pool = new ForkJoinPool();
//提交任务
ForkJoinTask<Integer> task = pool.submit(new CalculateTask(1, 200));
//得到结果
int result = task.get();
//关闭线程
pool.shutdown();
} } /**
* 有返回值的计算任务
*/
class CalculateTask extends RecursiveTask<Integer> { private static final long serialVersionUID = 1L;
private static final int THRESHOLD = 50;
private int start;
private int end; public CalculateTask(int start, int end) {
super();
this.start = start;
this.end = end;
} @Override
protected Integer compute() {
//当 结束值 比 起始值 大于 50 时,按数值区间平均拆分为两个任务,进行两个任务的累加值汇总
//否则直接计算累加值
if (end - start <= THRESHOLD) {
int result = 0;
for (int i = start; i <= end; i++) {
result += i;
}
return result;
} else {
int middle = (start + end) / 2;
CalculateTask firstTask = new CalculateTask(start, middle);
CalculateTask secondTask = new CalculateTask(middle + 1, end);
//任务拆分
invokeAll(firstTask,secondTask);
//任务合并
return firstTask.join() + secondTask.join();
}
} }

最新文章

  1. Python获取中国证券报最新资讯
  2. C#实现jQuery的方法连缀
  3. PhoneGap与Jquery Mobile组合开发android应用的配置
  4. 解决Tomcat catalina.out 不断成长导致档案过大的问题
  5. DDoS-Deflate安装及配置
  6. Win7安装IDL8.0以及破解
  7. Biba模型简介
  8. Java使用java命令运行程序出现:找不到主类错误
  9. [Cocos2d-x]节点之间的相互通讯
  10. Java Web整合开发(附录2) -- NetBeans的(默认)快捷键
  11. flashbuilder mx组件 MenuBar
  12. Java基础之RTTI 运行时类型识别
  13. 201521123101 《Java程序设计》第5周学习总结
  14. Nginx干货(一)隐藏Nginx标识与版本号
  15. [邀月博客] SQL Server 2008中SQL增强之二:Top新用途
  16. 哪些领域适合开发微信小程序
  17. jackson中@JsonProperty、@JsonIgnore等常用注解总结
  18. django by example 第四章 扩展 User 模型( model)
  19. 【杂谈】线程中断——Interrupt
  20. dma传输数据长度,在启动前必须确保是一个大于0的数字

热门文章

  1. 批量新增数据(BuckCopy)
  2. PHP ftp_mkdir() 函数
  3. Shiro学习(10)Session管理
  4. J2EE学习篇之--JDBC详解
  5. CF 600E. Lomsat gelral(dsu on tree)
  6. 前端使用vue-i18n做中英文翻译
  7. 可持化永久树 的 STL ( rope )
  8. 十折交叉验证10-fold cross validation, 数据集划分 训练集 验证集 测试集
  9. Python练习题中做错题目
  10. fiddler抓包工具详解