@SuppressWarnings({"unchecked","restriction"})
public class FutureTask1<V> implements RunnableFuture<V> { /*
任务可能出现的状态转换
NEW新建 -> COMPLETING即将完成 -> NORMAL正常结束
NEW新建 -> COMPLETING即将完成 -> EXCEPTIONAL异常
NEW新建 -> CANCELLED被取消
NEW新建 -> INTERRUPTING即将被中断 -> INTERRUPTED已经被中断 第一种:初始化状态:NEW新建
第二种:中间状态:COMPLETING即将完成,INTERRUPTING即将被中断
第三种:终端状态:NORMAL正常结束,EXCEPTIONAL异常,CANCELLED被取消,INTERRUPTED已经被中断
*/
private volatile int state;//任务当前状态,任务的状态可能被其他线程修改,所以要CAS。
private static final int NEW = ;//新建
private static final int COMPLETING = ;//即将完成。get(),finishCompletion(),awaitDone()使用这个状态。
private static final int NORMAL = ;//正常结束,返回结果值。report()使用这个变量。
private static final int EXCEPTIONAL = ;//异常,不返回结果值,抛出异常。没有使用这个状态。
private static final int CANCELLED = ;//被取消,不返回结果值,抛出异常。report(),isCanceld()方法使用这个变量。
private static final int INTERRUPTING = ;//即将被中断,不返回结果值,抛出异常。run()方法最后会使用这个状态。
private static final int INTERRUPTED = ;//已经被中断,不返回结果值,抛出异常。没有使用这个状态。 //FutureTask里面只能是callable,即使传进来Runnable也会转换为callable,
//也就是FutureTask真正执行的任务。Thread里面只能丢Runnable,要想FutureTask丢到线程池里面去,也必须是Runnable。
//finishCompletion()置为null
private Callable<V> callable;
private Object outcome;//get()返回的结果
private volatile Thread runner;//执行callable任务的线程,它是CAS操作。run()的最后finally置为null。
private volatile WaitNode waiters;//等待结果的线程集合 //根据状态s决定是否返回结果
private V report(int s) throws ExecutionException {
Object x = outcome;//我们在set()方法的时候把结果写进outcome的
if (s == NORMAL)//只有是正常才返回结果。其余
return (V) x;
if (s >= CANCELLED)//CANCELLED4被取消,INTERRUPTING5即将被中断,INTERRUPTED6已经被中断,
throw new CancellationException();
throw new ExecutionException((Throwable) x);
} //把Callable封装为FutureTask1
public FutureTask1(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW;
} //把Runnable封装为FutureTask1,result是预期值。
public FutureTask1(Runnable runnable, V result) {
this.callable = Executors1.callable(runnable, result);//runnbale封装为callable
this.state = NEW;
} //判断是否取消了,包括cancel(false);和cancel(true);
public boolean isCancelled() {
return state >= CANCELLED;//CANCELLED被取消,INTERRUPTING即将被中断,INTERRUPTED已经被中断
} //任务是否结束,不一定是成功任务,取消了,出异常了,被中断了,也是结束,包括cancel(false);和cancel(true);run(),setException(),set(),
public boolean isDone() {
return state != NEW;
} //取消任务。如果是false NEW0->CANCELLED4 true: NEW ->INTERRUPTING5->中断线程->INTERRUPTED6
public boolean cancel(boolean mayInterruptIfRunning) {
//fasle&&:处于不是新建状态:返回fasle,状态不变。
//true&&false:处于新建状态,但是状态设置失败:返回fasle,状态不变。
if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))//即将被中断5,被取消4
return false;
//true&&true:是新建状态,并且 状态设置成功(将其置为INTERRUPTING5或者CANCELLED4):继续状态变为INTERRUPTED6
try {
if (mayInterruptIfRunning) {//true才继续变为INTERRUPTED
try {
Thread t = runner;
if (t != null)
t.interrupt();//使用了线程中断的方法来达到取消任务的目的,设置中断标记=true
} finally { // final 已经被中断6
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);//
}
}
} finally {
finishCompletion();
}
return true;
} //获取执行结果,阻塞,获得Callable的返回值
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)//COMPLETING1即将完成,NEW新建,如果任务还没执行完成,就等待任务先执行完成
s = awaitDone(false, 0L);
//s > COMPLETING:正常结束,异常,被取消,即将被中断,已经被中断
return report(s);
} //获取任务执行的结果,并且有超时机制
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
//带有超时时间的get(),如果超过指定时间,就会抛出一个TimeoutException
if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
} protected void done() {
} protected void set(V v) {//先设置即将完成,再设置正常结束
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//即将完成1
outcome = v;//把任务执行的结果写入到outcome当中,
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); //final 正常结束2
finishCompletion();//唤醒等待结果的线程
}
} //修改当前任务的状态
protected void setException(Throwable t) {//先设置即将完成,再设置异常
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//即将完成1
outcome = t;//把任务执行的结果写入到outcome当中,
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final 异常3
finishCompletion();//唤醒等待结果的线程
}
} //任务起动就调这个方法。
public void run() {
//true||:不是新建,已经执行了,就不执行
//fasle||true:是新建,runner设置当前线程失败了,就不执行
if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))//不能多线程进来
return;
//false||fasle:是新建,设置runner成功就进去
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();//Callable的call方法,如果是Runnable封装的Callable,call方法返回的是RunnableAdapter的期望值。
ran = true;
} catch (Throwable ex) {//call()抛出了异常
result = null;
ran = false;
setException(ex);//状态不是NEW就不设置异常结果。先设置即将完成,再设置异常。设置1 3
}
if (ran)
set(result);//设置结果,状态不是NEW就不设置结果。先设置即将完成,再设置正常结束。设置1 2。
}
} finally {
//在任务执行的过程中,可能会调用cancel(),这里主要是不想让中断操作逃逸到run()方法之外
runner = null;//可以多线程进来了
int s = state;
if (s >= INTERRUPTING)//即将被中断5和已经被中断6。调用了cancel(true)现在s=5|6。调用cancel(false)=4不会进来。
handlePossibleCancellationInterrupt(s);
}
} protected boolean runAndReset() {
if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
//在任务执行的过程中,可能会调用cancel(),这里主要是不想让中断操作逃逸到run()方法之外
runner = null;
s = state;
if (s >= INTERRUPTING)//INTERRUPTING5即将被中断,INTERRUPTED6已经被中断。调用了cancel(true)。
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
} private void handlePossibleCancellationInterrupt(int s) {
if (s == INTERRUPTING)//5,如果cancel(true)并且现在是5
while (state == INTERRUPTING)
Thread.yield(); // 避免过多的CPU时间放在这个for循环的自旋上,让步等着变为INTERRUPTED6,等着runner线程被设置中断标记在退出,就是在收集cancel(true)的中断标记再出去。 // assert state == INTERRUPTED;
// 清除cancel(true)的中断,中断是任务task和调用者通信的机制,
// Thread.interrupted(); //重置中断状态为未中断=false; isInterrupted不会重置中断状态;
} static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;//下一个 WaitNode() {
thread = Thread.currentThread();
}
} //成功|取消 完成任务后唤醒等待结果的线程。cancel()设置s=4|6后调用这个方法,run()正常完成设置s=2,run()异常完成设置s=3后调用这个方法。
//awaitDone()唤醒的线程走if(s > COMPLETING1)返回状态值去get()获取结果。

private void finishCompletion() {
assert state > COMPLETING;//正常结束,异常,被取消,即将被中断,已经被中断
for (WaitNode q; (q = waiters) != null;) {//从头开始唤醒 //设置为null之后,其他get线程看到状态>1,直接获取结果,不加入waiters队列。
//如果其他线程成功加入到了另一个新的waiters队列,此时唤醒失败,就要另一个线程他自己去退出。
//这里准备开始清空等待队列了,就要阻止其他线程进到队列来(通过状态值),如果清空后还是进到队列去了,就要线程自己处理。

if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;//thread属性置为null
LockSupport.unpark(t);//唤醒的是获取结果的awaitDone()阻塞的线程
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; //gc q,q.next不会回收,因为还有next指向在。
System.gc();
System.gc();
System.gc();
System.gc();
System.gc();
q = next;
}
break;
}
}
//一个钩子方法,本类中,它是一个空实现,在子类中可以重写它。
done();
//最后把callable置为null.
callable = null; // to reduce footprint
} //等待任务完成,多线程进来,唤醒之后,返回状态去get获取结果。2个出口。
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;//超时时间,如果使用了超时的get()才起作用,否则这个值不起作用
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {//等待结果的线程被中断了
removeWaiter(q);//移除无效节点
throw new InterruptedException();//就抛出一个中断异常
} /*分为几部操作:每一步都在一个for循环里面,只有前面一步执行了才能在下一个循环里面执行下一步*/ int s = state; //如果当前状态大于COMPLETING,说明任务已经正常完成了或者取消完成了。就去拿结果。拿到的结果可能是null就抛出异常。
//NORMAL正常结束,EXCEPTIONAL异常,CANCELLED被取消,INTERRUPTING即将被中断,INTERRUPTED已经被中断
if (s > COMPLETING) {
if (q != null)
q.thread = null;//回收q.thread
return s;//返回任务的状态,等待结束。有可能是cancel()设置s=4|6后唤醒,有可能是run()设置s=2后唤醒,run()设置s=3后唤醒。 // 正在完成,先等一会,下次循环去初始化节点--压栈--阻塞
} else if (s == COMPLETING)
Thread.yield();//避免过多的CPU时间放在这个for循环的自旋上。再次判断状态。 else if (q == null)//初始化q节点,才能去压栈,才能去阻塞。(后面2步在下一个循环中处理,会跳过这个循环)
q = new WaitNode();//再次判断状态。 else if (!queued)//将q节点压入栈,它是一个cas操作。第一次是false,压栈成功变为true,再去阻塞。也就是说只有压栈了才能去阻塞。后来的在前面。
//有可能唤醒不了:finishCompletion()已经执行完了,才加入到队列就不能唤醒,就加入到一个新的waiters里面去了,就要再次循环,通过状态退出。
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
//再次判断状态。
else if (timed) {//如果有超时限制的话,判断是否超时,如果没有超时就重试,如果超时了,就把q节点从栈中遇除
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {//超时了
removeWaiter(q);//移除无效节点,多线程进来
return state;//返回状态,去获取结果。
}
LockSupport.parkNanos(this, nanos);//LockSupport是一个并发工具,这里表示等待nanos秒后唤醒
} else
LockSupport.park(this);//开始阻塞线程,直到任务完成了才会再次唤醒了在finishCompletion()中唤醒
}
} /*
将某个节点置为无效节点,并清除栈中所有的无效节点
(通过前面的分析,应该可以推断出,无效的节点,其实就是指节点内部的thread == null)
那么产生无效节点的情况就有三种了
(1):线程被中断了
(2):s > COMPLETING,当前的任务状态> COMPLETING
(3):超时
*/
//如果在阻塞其间,任务被中断了,或者超时了,又或者任务已经完成了,都需要进行资源的回收。
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null;//为了GC回收node节点中的thread成员变量
retry:
for (;;) {//从头到尾删除thread=null的。不仅仅删除node。 /* 多线程不会有影响,其他线程只是做了重复的事情,或者可以帮着其他线程做 */ for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
if (q.thread != null)
pred = q;//q不为null,pred只要被赋值了就不为null。
else if (pred != null) {
pred.next = s;
if (pred.thread == null)
continue retry;//如果前驱节点也是一个无效节点,则重新遍历, //q.thread异常了,pred=null,说明pred没有被赋值过,第一个q就异常了。
} else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s))//修改头结点是多线程在修改。修改失败就重来。
continue retry;//调到外层循环再来。
}
break;
}
}
} private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;//state
private static final long runnerOffset;//runner
private static final long waitersOffset;//waiters
static {
try {
UNSAFE = java.security.AccessController
.doPrivileged(new java.security.PrivilegedExceptionAction<sun.misc.Unsafe>() {
public sun.misc.Unsafe run() throws Exception {
Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class;
for (java.lang.reflect.Field f : k.getDeclaredFields()) {
f.setAccessible(true);
Object x = f.get(null);
if (k.isInstance(x))
return k.cast(x);
}
throw new NoSuchFieldError("the Unsafe");
}
});
Class<?> k = FutureTask1.class;
stateOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("state"));
runnerOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("runner"));
waitersOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiters"));
} catch (Exception e) {
throw new Error(e);
}
} }

最新文章

  1. yii2使用小知识(连续补充)
  2. ES6学习--搭建环境
  3. win7系统中如何使文件显示出扩展名
  4. cf#382div2
  5. HighCharts之2D柱状图
  6. 解决在sublime text3在ubuntu下无法输入中文的问题
  7. CentOS 6.5_X64下安装MongoDB数据库
  8. Objective-C中的Block
  9. Eclipse下Maven插件配置
  10. css之marquee,让你的文字跳起来
  11. 在cmd中输入ls命令出现“ls不是内部或外部命令解决
  12. hostent h_addr_list
  13. vue 生命周期
  14. Listview嵌套Listview
  15. MAC OS 如何安装命令行工具:Command Line Tools
  16. wxpython发布还自己图标的程序
  17. C语言实现二叉树的建立、遍历以及表达式的计算
  18. PHP处理大数据导出Excel方法
  19. 【51nod】1312 最大异或和
  20. Opera Unite如何架设自己的网站

热门文章

  1. RabbitMQ系列(一)rabbitmq简介
  2. Winform中设置ZedGraph鼠标焦点位置画出十字线并在鼠标移出时十字线消失
  3. ef linq多表查询(三表)
  4. Python进阶(二)
  5. jieba分词原理-DAG(NO HMM)
  6. webpack加载css文件及其配置
  7. Vue.js 源码分析(八) 基础篇 依赖注入 provide/inject组合详解
  8. 设置VMware中Kali 共享文件夹
  9. @RequestMapping和@GetMapping和PostMapping
  10. mysql的floor()报错注入方法详细分析