一、背景

虽然目前处理器核心数已经发展到很大数目,但是按任务并发处理并不能完全充分的利用处理器资源,因为一般的应用程序没有那么多的并发处理任务。基于这种现状,考虑把一个任务拆分成多个单元,每个单元分别得到执行,最后合并每个单元的结果。

Fork/Join框架是JAVA7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。它非常类似于HADOOP提供的MapReduce框架,只是MapReduce的任务可以针对集群内的所有计算节点,可以充分利用集群的能力完成计算任务。ForkJoin更加类似于单机版的MapReduce。

二、工作窃取算法

指的是某个线程从其他队列里窃取任务来执行。使用的场景是一个大任务拆分成多个小任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列中,并且每个队列都有单独的线程来执行队列里的任务,线程和队列一一对应。但是会出现这样一种情况:A线程处理完了自己队列的任务,B线程的队列里还有很多任务要处理。A是一个很热情的线程,想过去帮忙,但是如果两个线程访问同一个队列,会产生竞争,所以A想了一个办法,从双端队列的尾部拿任务执行。而B线程永远是从双端队列的头部拿任务执行(任务是一个个独立的小任务),这样感觉A线程像是小偷在窃取B线程的东西一样。

工作窃取算法的优点:

         利用了线程进行并行计算,减少了线程间的竞争。

工作窃取算法的缺点:

1、如果双端队列中只有一个任务时,线程间会存在竞争。

2、窃取算法消耗了更多的系统资源,如会创建多个线程和多个双端队列。

三、框架设计

Fork/Join中两个重要的类:

1、ForkJoinTask:使用该框架,需要创建一个ForkJoin任务,它提供在任务中执行fork和join操作的机制。一般情况下,我们并不需要直接继承ForkJoinTask类,只需要继承它的子类,它的子类有两个:

a、RecursiveAction:用于没有返回结果的任务。

b、RecursiveTask:用于有返回结果的任务。

2、ForkJoinPool:任务ForkJoinTask需要通过ForkJoinPool来执行。

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask; public class CountTask extends RecursiveTask<Integer> {
private static final long serialVersionUID = 1L;
// 阈值
private static final int THRESHOLD = 2;
private int start;
private int end; public CountTask(int start, int end) {
this.start = start;
this.end = end;
} @Override
protected Integer compute() {
int sum = 0;
// 判断任务是否足够小
boolean canCompute = (end - start) <= THRESHOLD;
if (canCompute) {
// 如果小于阈值,就进行运算
for (int i = start; i <= end; i++) {
sum += i;
}
System.out.println(Thread.currentThread().getName()+" A sum:"+sum);
} else {
// 如果大于阈值,就再进行任务拆分
int middle = (start + end) / 2;
System.out.println(Thread.currentThread().getName()+" start:"+start+",middle:"+middle+",end:"+end);
CountTask leftTask = new CountTask(start, middle);
CountTask rightTask = new CountTask(middle + 1, end);
// 执行子任务
leftTask.fork();
rightTask.fork();
// 等待子任务执行完,并得到执行结果
int leftResult = leftTask.join();
int rightResult = rightTask.join();
// 合并子任务
sum = leftResult + rightResult;
System.out.println(Thread.currentThread().getName()+" B sum:"+sum);
}
return sum;
} public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();// 这边也可以指定一个最大线程数
CountTask task = new CountTask(1, 10);
// 执行一个任务
Future<Integer> result = forkJoinPool.submit(task);
try {
System.out.println(result.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} } }

这个程序是将1+2+3+4+5+6拆分成1+2;3+4;5+6三个部分进行子程序进行计算后合并。

四、源码解读

1、leftTask.fork();

1 public final ForkJoinTask<V> fork() {
2 Thread t;
3 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
4 ((ForkJoinWorkerThread)t).workQueue.push(this);
5 else
6 ForkJoinPool.common.externalPush(this);
7 return this;
8 }

fork方法内部会先判断当前线程是否是ForkJoinWorkerThread的实例,如果满足条件,则将task任务push到当前线程所维护的双端队列中。

 1  final void push(ForkJoinTask<?> task) {
2 ForkJoinTask<?>[] a; ForkJoinPool p;
3 int b = base, s = top, n;
4 if ((a = array) != null) { // ignore if queue removed
5 int m = a.length - 1; // fenced write for task visibility
6 U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
7 U.putOrderedInt(this, QTOP, s + 1);
8 if ((n = s - b) <= 1) {
9 if ((p = pool) != null)
10 p.signalWork(p.workQueues, this);
11 }
12 else if (n >= m)
13 growArray();
14 }
15 }

在push方法中,会调用ForkJoinPool的signalWork方法唤醒或创建一个工作线程来异步执行该task任务。

2、

 public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}

通过doJoin方法返回的任务状态来判断,如果不是NORMAL,则抛异常:

 private void reportException(int s) {
if (s == CANCELLED)
throw new CancellationException();
if (s == EXCEPTIONAL)
rethrow(getThrowableException());
}

来看下doJoin方法:

private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}

先查看任务状态,如果已经完成,则直接返回任务状态;如果没有完成,则从任务队列中取出任务并执行。

最新文章

  1. IOS Core Animation Advanced Techniques的学习笔记(五)
  2. java 环境配置 及java 历史
  3. webservice 服务端例子+客户端例子+CXF整合spring服务端测试+生成wsdl文件 +cxf客户端代码自动生成
  4. CentOS的SSH,Putty配置说明
  5. 在Windows7上安装coreseek3.2同时在PHP下简单实现步骤
  6. 设置sudo不输入密码 sudoers 编辑出错后的补救方法
  7. Discuz!图片查看插件(支持鼠标缩放、实际大小、旋转、下载)
  8. ANDROID_MARS学习笔记_S04_002_用AsyncTask实现异步操作
  9. Urxvt - awesome
  10. jQuery form插件使用详解
  11. 38.Odoo产品分析 (四) – 工具板块(7) – 车队管理(2)
  12. pyQT4和pyQT5的主要模块介绍
  13. blfs(systemd版本)学习笔记-wget的安装与配置
  14. Selenium Webdriver 动态设置 Proxy
  15. 老菜鸟学习:Javascript 将html转成pdf
  16. 线段树 || BZOJ 1112: [POI2008]砖块Klo
  17. 背水一战 Windows 10 (51) - 控件(集合类): ItemsControl - 项模板选择器, 数据分组
  18. not available in automatic reference counting mode
  19. UVa 1590 IP网络(简单位运算)
  20. Python常见初级错误

热门文章

  1. linux shell实现随机数多种方法(date,random,uuid)
  2. Ubuntu 16.04安装OpenVPN客户端GUI
  3. 我告诉你MSDN
  4. delphi AlphaControls
  5. MFC获取纸张大小
  6. 二十四种设计模式:策略模式(Strategy Pattern)
  7. iOS APP 上传
  8. 如何在Jconsole 监控 Jboss Tomcat
  9. MediaWiki怎样重置用户password
  10. Navicat Premium如何打开SQL文件.MDF和.LDF文件