1. Callable、Future、RunnableFuture、FutureTask的继承关系

在多线程编程中,我们一般通过一个实现了Runnable接口的对象来创建一个线程,这个线程在内部会执行Runnable对象的run方法。如果说我们创建一个线程来完成某项工作,希望在完成以后该线程能够返回一个结果,但run方法的返回值是void类型,直接实现run方法并不可行,这时我们就要通过FutureTask类来间接实现。

FutureTask实现了RunnableFuture接口,而RunnableFuture接口实际上仅仅是Runnable接口和Future接口的合体。Future接口提供取消任务、检测任务是否执行完成、等待任务执行完成获得结果等方法。从图中可以看出,FutureTask类中的run方法已经实现好了(图中的代码仅仅是核心代码),这个run方法实际上就是调用了由构造函数传递进来的call方法,并将返回值存储在FutureTask的私有数据成员outcome中。这样一来我们将FutureTask传递给一个Thread时,表面上我们仍然执行的是run,但在run方法的内部实际上执行的是带有返回值的call方法,这样即使得java多线程的执行框架保持不变,又实现了线程完成后返回结果的功能。同时FutureTask又将结果存储在outcome中,我们可以通过调用FutureTask对象的get方法获取outcome(也就是call方法的返回结果)。

Future接口功能介绍

boolean cancel(boolean mayInterruptIfRunning);

功能:设置线程的中断标志位

参数:mayInterruptIfRunning为ture,如果线程可以取消则设置线程的中断标志位

返回值:若线程已经完成,返回false;否则返回true

注意:要实现取消线程执行的功能,call函数需要在循环条件中检查中断标志位,以跳出循环

boolean isCancelled();

判断线程是否取消

boolean isDone();

线程执行完成,返回true;如果cancel方法返回true,则该方法也返回true

V get() throws InterruptedException, ExecutionException;

获取call方法的返回结果,如果call方法没有执行完成,则会阻塞当前线程,直到call方法执行完毕,才被唤醒

V get(long timeout, TimeUnit unit)

设置时限的get方法。

2. Future及FutureTask的使用

Future以及FutureTask是线程池实现的基础元素,但不是说Future及FutureTask只能在线程池中才能使用,下面的例子就说明了FutureTask独立使用的情况。在这个例子中,我们首先随机产生了2000个整数存于数组中,然后创建了两个线程,一个线程寻找前1000个数的最大值,另个一线程寻找后1000个数的最大值。主线程比较这两个线程的返回结果来确定这2000个数的最大值值。

package javaleanning;

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask; public class FutureDemo {
public static void main(String[] args) throws InterruptedException, ExecutionException{
int[] a = new int[2000];
Random rd = new Random();
for(int i = 0; i < 2000; i++){
a[i] = rd.nextInt(20000);
} class FindMax implements Callable<Integer>{
private int begin,end,int a[];
public FindMax(int a[],int begin, int end){
this.a = a;
                                this.begin = begin;
this.end = end;
}
@Override
public Integer call() throws Exception {
int maxInPart = a[begin];
for(int i = begin; i <= end; i++){
if(a[i] > maxInPart){
maxInPart = a[i];
}
}
return new Integer(maxInPart);
}
} FutureTask<Integer> findMaxInFirstPart =
                              new FutureTask<Integer>(new FindMax(a,0,999));
FutureTask<Integer> findMaxInSecondPart =
                              new FutureTask<Integer>(new FindMax(a,1000,1999));

		new Thread(findMaxInFirstPart).start();
new Thread(findMaxInSecondPart).start(); int maxInFirst = (int) findMaxInFirstPart.get();
int maxInSecond = (int) findMaxInSecondPart.get();
System.out.println("Max is " +
                            (maxInFirst > maxInSecond ? maxInFirst:maxInSecond));
//验证结果是否正确
int max = a[0];
for(int i = 0; i < 2000; i++){
if(a[i] > max){
max = a[i];
}
}
System.out.println(max);
}
}

3. FutureTask的实现原理

构造函数

public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
} public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}

FutureTask有两个构造函数,通常来说我们使用第一个构造函数。这里要强调一下第二个构造函数,它有两个类型参数,分别是Runnable类型和泛型V,然后由这两个构造一个Callable对象。当线程运行结束以后会返回由构造函数传递进来的这个泛型result对象,也就是说返回的值并不是通过运行得到的,而是由构造函数获取的一个指定的对象。

重要数据成员

private volatile int state;
private Object outcome;
private volatile Thread runner;
private volatile WaitNode waiters;

state表明了线程运行call方法的状态,初始状态为0,完成后由run方法将其设置为1。通过get方法获取结果时就必须检查state的值,如果该值为0,表明需要等待该结果,get方法就会将当前线程阻塞。

outcome表示了call方法的返回结果

runner表示运行FutureTask方法的线程,其值会在run方法中进行初始化

waiters指向了因获取结果而等待的线程组成的队列

重要方法

public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}

从代码中可以看出run方法中调用了从构造函数传递来的call方法。

protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}

当call方法执行完毕后,run方法调用又调用了set方法,它主要实现两个功能,一个是将结果赋值给outcome,另一个是通过finishCompletion唤醒由调用此FutureTask对象的get方法而阻塞的线程

private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
} done(); callable = null; // to reduce footprint
}

public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}

在get方法中首先判断了state的值,如果call方法还未完成,就会通过awaitDone来阻塞自己。

private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
} int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}

在cannel方法中,如果允许对线程中断,则设置该线程的中断标志位,并通过finishCompletion方法唤醒因等待结果而阻塞的线程。

参考文章

[1] http://www.cnblogs.com/dolphin0520/p/3949310.html

[2] http://www.open-open.com/lib/view/open1384351141649.html

最新文章

  1. iOS 自定义返回按钮,保留系统滑动返回
  2. BestCoder12 1002.Help him(hdu 5059) 解题报告
  3. Python变量类型
  4. 创建一个spring helloworld
  5. How to bind to data when the DataContext is not inherited【项目】
  6. web前端开发(5)
  7. InetAddress Example program in Java
  8. Unity3D内存释放
  9. 分析统计&lt;第三篇&gt;
  10. git中常见的几个命令
  11. jsLibrary.js
  12. Python语言:Day11练习题
  13. DevExpress VCL Controls 2019发展路线图(No.2)
  14. Python系列:二、数据类型--技术流ken
  15. HDU 2006 求奇数的乘积
  16. reportgen :python生产pptx
  17. node调试工具--nodemon
  18. 【BZOJ3585】mex
  19. DbGridEh中改变行的颜色
  20. topcoder srm 663 div1

热门文章

  1. Eclipse JAVA项目的 目录结构 和 导入
  2. Windows Phone 8.1 新特性 - 常用的启动器
  3. Python学习之路—Day1
  4. Bootstrap 导航栏和登陆框
  5. 如何在MFC中创建非矩形button
  6. hdu 4193 - Non-negative Partial Sums(滚动数列)
  7. Mac环境下svn的使用
  8. windows下与linux下安装redis及redis扩展
  9. StrongSwan 5.1.1 发布,Linux 的 IPsec 项目
  10. 循序渐进做项目系列(1):最简单的C/S程序——让服务器来做加法