一、概述

FutureTask包装器是一种非常便利的机制,同时实现了Future和Runnable接口。

类图如下:

FutureTask是一种可以取消的异步的计算任务。它的计算是通过Callable实现的,它等价于可以携带结果的Runnable,并且有三个状态:等待、运行和完成。完成包括所有计算以任意的方式结束,包括正常结束、取消和异常。

Future有个get方法而获取结果只有在计算完成时获取,否则会一直阻塞直到任务转入完成状态,然后会返回结果或者抛出异常。

FutureTask有下面几个重要的方法:

1.get()

阻塞一直等待执行完成拿到结果

2.get(int timeout, TimeUnit timeUnit)

阻塞一直等待执行完成拿到结果,如果在超时时间内,没有拿到抛出异常

3.isCancelled()

是否被取消

4.isDone()

是否已经完成

5.cancel(boolean mayInterruptIfRunning)

试图取消正在执行的任务

二、FutureTask的状态转换过程

* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED

三、FutureTask的执行过程

创建一个futureTask对象task
提交task到调度器executor等待调度或者在另外一个线程中执行task 等待调度中... 如果此时currentThread调取执行结果task.get(),会有几种情况
if task 还没有被executor调度或正在执行中
阻塞当前线程,并加入到一个阻塞链表中waitNode
else if task被其它Thread取消,并取消成功 或task处于中断状态
throw exception
else if task执行完毕,返回执行结果,或执行存在异常,返回异常信息 如果此时有另外一个线程调用task.get() 执行过程同上

四、应用场景

1. Future用于异步获取执行结果或者取消任务。

2. 在高并发场景下确保任务只执行一次。

五、源码分析

1.核心状态

  /**
* The run state of this task, initially NEW. The run state
* transitions to a terminal state only in methods set,
* setException, and cancel. During completion, state may take on
* transient values of COMPLETING (while outcome is being set) or
* INTERRUPTING (only while interrupting the runner to satisfy a
* cancel(true)). Transitions from these intermediate to final
* states use cheaper ordered/lazy writes because values are unique
* and cannot be further modified.
*
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;

2.构造函数

 public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
} public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}

3.获取执行结果

 public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
} public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}

4.执行方法

 public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}

5.设置状态

 protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
} protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}

六、代码示例

1. FutureTask执行多任务计算的使用场景

利用FutureTask和ExecutorService,可以用多线程的方式提交计算任务,主线程继续执行其他任务,当主线程需要子线程的计算结果时,在异步获取子线程的执行结果。

 public class FutureTaskForMultiCompute {
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 创建任务集合
List<FutureTask<Integer>> taskList = new ArrayList<FutureTask<Integer>>();
// 创建线程池
ExecutorService exec = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
FutureTask<Integer> ft = new FutureTask<>(new ComputeTask(i, ""+i));
taskList.add(ft);
// 提交给线程池执行任务,也可以通过exec.invokeAll(taskList)一次性提交所有任务;
exec.submit(ft);
} System.out.println("所有计算任务提交完毕, 主线程接着干其他事情!"); // 开始统计各计算线程计算结果
Integer totalResult = 0;
for (FutureTask<Integer> ft : taskList) {
totalResult += ft.get();
} // 关闭线程池
exec.shutdown();
System.out.println("多任务计算后的总结果是:" + totalResult);
} }
class ComputeTask implements Callable<Integer> { private Integer result = 0;
private String taskName = ""; public String getTaskName(){
return this.taskName;
} public ComputeTask(Integer iniResult, String taskName){
result = iniResult;
this.taskName = taskName;
System.out.println("生成子线程计算任务: "+taskName);
} @Override
public Integer call() throws Exception {
for (int i = 0; i < 100; i++) {
result =+ i;
}
// 休眠5秒钟,观察主线程行为,预期的结果是主线程会继续执行,到要取得FutureTask的结果是等待直至完成。
Thread.sleep(5000);
System.out.println("子线程计算任务: "+taskName+" 执行完成!");
return result;
} }

2. FutureTask在高并发环境下确保任务只执行一次

在很多高并发的环境下,往往我们只需要某些任务只执行一次。这种使用情景FutureTask的特性恰能胜任。举一个例子,假设有一个带key的连接池,当key存在时,即直接返回key对应的对象;当key不存在时,则创建连接。对于这样的应用场景,通常采用的方法为使用一个Map对象来存储key和连接池对应的对应关系,典型的代码如下面所示:

 public class FutureTaskTest {
private Map<String, Connection> connectionPool = new HashMap<String, Connection>();
private ReentrantLock lock = new ReentrantLock(); public Connection getConnection(String key){
try {
lock.lock();
if(connectionPool.containsKey(key)){
return connectionPool.get(key);
}else{
//创建 Connection
Connection conn = createConnection();
connectionPool.put(key, conn);
return conn;
}
} finally{
lock.unlock();
} } //创建Connection
private Connection createConnection(){
return null;
} }

在上面的例子中,我们通过加锁确保高并发环境下的线程安全,也确保了connection只创建一次,然而确牺牲了性能。改用ConcurrentHash的情况下,几乎可以避免加锁的操作,性能大大提高,但是在高并发的情况下有可能出现Connection被创建多次的现象。这时最需要解决的问题就是当key不存在时,创建Connection的动作能放在connectionPool之后执行,这正是FutureTask发挥作用的时机,基于ConcurrentHashMap和FutureTask的改造代码如下:

 public class FutureTaskTest {
private ConcurrentHashMap<String,FutureTask<Connection>>connectionPool = new ConcurrentHashMap<String, FutureTask<Connection>>(); public Connection getConnection(String key) throws Exception{
FutureTask<Connection> connectionTask = connectionPool.get(key);
if(connectionTask!=null){
return connectionTask.get();
}
else{
Callable<Connection> callable = new Callable<Connection>(){
@Override
public Connection call() throws Exception {
// TODO Auto-generated method stub
return createConnection();
}
};
FutureTask<Connection> newTask = new FutureTask<Connection>(callable);
connectionTask = connectionPool.putIfAbsent(key, newTask);
if(connectionTask==null){
connectionTask = newTask;
connectionTask.run();
}
return connectionTask.get();
}
} //创建Connection
private Connection createConnection(){
return null;
} }

经过这样的改造,可以避免由于并发带来的多次创建连接及锁的出现。

最新文章

  1. Yii2开启enableprettyurl(url美化)无效
  2. Average Precision of VOC
  3. Stack Overflow: The Architecture - 2016 Edition
  4. 【AppCan 开发者】北京开发者交流会之行
  5. IOS 音效
  6. 回到过去美好的时光&mdash;&mdash;源代码版本管理Always Use source code Control
  7. 归并排序,递归法,C语言实现。
  8. 组策略彻底解决windows 2003 终端数
  9. 让我们写的程序生成单个的exe文件(C#winform程序举例)
  10. 将Linux下编译的warning警告信息输出到文件中[整理笔记]
  11. C#使用Windows API 隐藏/显示 任务栏 (FindWindowEx, ShowWindow)
  12. 使用 IIS Manager 对 Windows Azure 网站进行远程管理
  13. Python系列之heapq内置模块
  14. asp.net mvc CodeFirst模式数据库迁移步骤
  15. 消息服务框架(MSF)应用实例之分布式事务三阶段提交协议的实现
  16. [六] 函数式接口的复合方法示例 predicate 谓词逻辑运算 Function接口 组合运算 比较器 逆序 比较链
  17. 周末时间学习Linux
  18. 基于python的unittest测试框架集成到jenkins(Mac)
  19. Django 学习笔记(三) --- HTML 模版加载 css、js、img 静态文件
  20. SpringBoot中加密com.github.ulisesbocchio

热门文章

  1. android unity3d开发学习第一步
  2. as3垃圾回收机制
  3. 关于使用Android新版Camera即Camera2的使用介绍 暨解决Camera.PreviewCallback和MediaRecorder无法同时进行
  4. Helm安装和项目使用
  5. LINUX之内网渗透提权
  6. Net编程 详解DataTable用法【转】
  7. Dev之ribbon设置
  8. dubbo forbid 注意的几种方式
  9. TestNG测试报告美化
  10. XP如何找到网上邻居