public class ThreadPoolExecutor1 extends AbstractExecutorService1 {
// 11100000000000000000000000000000 = -536870912, 高3位表示线程池状态, 后29位表示线程个数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, ));
private static final int COUNT_BITS = Integer.SIZE - ;// 29
// 2^29-1 = 00011111 11111111 11111111 11111111 = 536870911
private static final int CAPACITY = ( << COUNT_BITS) - ;
// -1 = 11111111 11111111 11111111 11111111
// 11100000 00000000 00000000 00000000 = -536870912,运行状态
private static final int RUNNING = - << COUNT_BITS;
// 00000000000000000000000000000000,关闭状态
private static final int SHUTDOWN = << COUNT_BITS;
// 2^29 = 00100000 00000000 00000000 00000000 = 536870912,停止状态
private static final int STOP = << COUNT_BITS;
// 2^30 = 01000000 00000000 00000000 00000000 = 1073741824,整理状态
private static final int TIDYING = << COUNT_BITS;
// 2^29+2^30,3*2^29,2^29+2^29+2^29 = 01100000 00000000 00000000 00000000 = 1610612736,终止状态
private static final int TERMINATED = << COUNT_BITS; // C的高3位,高3位表示线程池的运行状态。
//111RUNNING:运行中,接受新任务处理队列中的任务。
//000SHUTDOWN:不接收新任务处理队列中的任务;
//001STOP:不接收新任务也不处理队列中的任务还中断正在运行的任务;
//010TIDYING:所有的任务都已经终止;011TERMINATED:terminated()方法已经执行完成
private static int runStateOf(int c) {
return c & ~CAPACITY;//11100000 00000000 00000000 00000000
} //C的低29位,跟00011111 11111111 11111111 11111111比较,低29位表示线程池中线程数,最大2^29-1。
private static int workerCountOf(int c) {//对2^29取余,ctl加了多少次1,最大加2^29-1次。表示多少个worker已经在运行了。
return c & CAPACITY;//00011111 11111111 11111111 11111111
} private static int ctlOf(int rs, int wc) {//rs是状态值,wc是worker线程数量,进行或操作,就是修改状态,但是不改变数量。
return rs | wc;
} private static boolean runStateLessThan(int c, int s) {//ctl是不是小于某个状态值,
return c < s;
} /*ctl从11100000000000000000000000000000=-536870912开始,慢慢加1,一直越来越大,最后=111111111111111111111111=-1
worker线程数量最大2^29-1=536870911个,就不能再增加了,所以ctl的范围是(-536870912,-1)一直小于0*/ /*SHUTDOWN=0,
STOP=001=2^29=536870912,
TIDYING=010=2^30=1073741824
RUNNING=111=-536870912
TERMINATED=011=3*2^29=1610612736
*/
private static boolean runStateAtLeast(int c, int s) {//ctl是不是大于某个状态值
return c >= s;//RUNNING:111 SHUTDOWN:000 STOP:001 TIDYING:010 TERMINATED:011
} private static boolean isRunning(int c) {//RUNNING正常状态,ctl(-536870912,-1)一直小于0,小于0就是正常
return c < SHUTDOWN;
} //ctl+1
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + );
} //ctl-1
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - );
} //ctl-1直到成功。
private void decrementWorkerCount() {
do {
} while (!compareAndDecrementWorkerCount(ctl.get()));
} private final HashSet1<Worker> workers = new HashSet1<Worker>();
private final BlockingQueue<Runnable> workQueue;//队列。先加到workers里面去,workers满了就加到workQueue里去。 private final ReentrantLock mainLock = new ReentrantLock();//锁住workers
private final Condition termination = mainLock.newCondition(); //跟踪获得的最大池大小。仅在主锁下访问。
private int largestPoolSize; //已完成任务总数
private long completedTaskCount; private volatile ThreadFactory threadFactory;
private volatile RejectedExecutionHandler1 handler; private volatile long keepAliveTime;//闲置线程存活时间,闲置线程在阻塞等待队列,
private volatile boolean allowCoreThreadTimeOut;//等待队列时候是否启用keepAliveTime超时 //核心池大小
private volatile int corePoolSize; //最大池大小。 受CAPACITY限制。
private volatile int maximumPoolSize; private static final RejectedExecutionHandler1 defaultHandler = new AbortPolicy();
public static class AbortPolicy implements RejectedExecutionHandler1 {
public AbortPolicy() {
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor1 e) {//抛出异常,不是什么都不做
throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
}
} private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread"); //执行终结器时要使用的上下文,
private final AccessControlContext acc; private final class Worker extends AbstractQueuedSynchronizer1 implements Runnable {//是一个AQS和runnable任务,Worker里面的方法只要里面的一个线程访问,
//这个类永远不会被序列化,但是我们提供了一个serialversionID来禁止javac警告。
private static final long serialVersionUID = 6138294804551838833L;
//this worker 正在运行在的Thread。线程池里面创建的线程,不是外部线程。
final Thread thread;
//要运行的初始任务。
Runnable firstTask;
//这个worker完成的任务
volatile long completedTasks; //一个worker就是一个任务。同时是一个AQS队列,同时是一个runnable
Worker(Runnable firstTask) {
//初始设置为-1来抑制中断方法的执行。unlock才变为0。只有在runWorker()方法里面先unlock置为0其他方法才能获取锁。
//interruptIdleWorkers方法来tryLock获取锁来中断时候,是中断不了的。runWorker方法运行时才表明关联线程已启动,这时去中断关联线程才有意义,
//lock()方法也执行不了,只能先执行unlock()方法,才能去获取锁。

setState(-);
this.firstTask = firstTask;//executorService1.execute(new Runnable() = firstTask)
this.thread = getThreadFactory().newThread(this);//this = ThreadPoolExecutor1$Worker,返回new Thread(ThreadPoolExecutor1$Worker)
} public void run() {
try {
runWorker(this);//this = ThreadPoolExecutor1$Worker。这个方法在自己的thread里面运行,不会有多线程问题
} catch (InterruptedException e) {
e.printStackTrace();
}
} protected boolean isHeldExclusively() {
return getState() != ;//true:有人获取锁,false:没人获取锁
} //worker类继承自AQS并实现了自己的加锁解锁方法,不可重入的锁。
protected boolean tryAcquire(int unused) {
if (compareAndSetState(, )) {//尝试获取锁
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;//排AQS队
} protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState();//state初始等于-1,
return true;
} public void lock() throws InterruptedException {//会去排队
acquire();
} public boolean tryLock() {//不会去排队
return tryAcquire();
} public void unlock() {
release();//释放锁,并且唤醒head中的第一个
} public boolean isLocked() {
return isHeldExclusively();
} //中断worker关联线程
void interruptIfStarted() {
Thread t;
//state=-1不能中断,说明还没有运行起来
if (getState() >= && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
} /*
1.advanceRunState(SHUTDOWN);//更新状态为SHUTDOWN=00000000000
第一个条件c>=0000000,说明ctl不是在正常的增加(正常增加都是小于0),c=SHUTDOWN000xxxx|STOP001xxx|TIDYING010xxx|TERMINATED011xxx返回true。就什么都不做。
c=RUNNING111xxxxx就返回false,第二个条件吧ctl变成000xxxxx
也就是说:SHUTDOWN000xxxxx|STOP001xxxxx|TIDYING010xxxxx|TERMINATED011xxxxx不能变为shutdown状态,running111xxxxx状态可以变为shutdown状态。 2.advanceRunState(STOP);//更新状态为STOP00100000000
第一个条件c>=00100000000,说明ctl已经不正常了,c=TIDYING010xxxxx|TERMINATED011xxxxx|STOP001xxxxx,返回true。就什么都不做。
c=RUNNING111xxxxx|SHUTDWON000xxxxx返回false。第二个条件吧ctl变成001xxxxx
也就是说:TIDYING010|TERMINATED011|STOP001不能变为STOP状态,running状态|SHUTDOWN可以变为STOP状态。
*/
//状态是不可逆的,如果跑到前面的状态了,就不动,否则修改。
private void advanceRunState(int targetState) {//更新状态
for (;;) {
int c = ctl.get();
//c>=targetState就不修改状态。c<targetState就去修改状态。
if (runStateAtLeast(c, targetState) ||
//ctlOf(状态值,数量值)进行或操作,就是修改状态,但是不改变数量
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
} //尝试终止线程池
final void tryTerminate() {
for (;;) {//死循环
int c = ctl.get(); //以下两种情况终止线程池,其他情况直接返回:
//1.状态为stop
//2.状态为shutdown且任务队列为空
if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))//SHUTDWON但是队列不空,不停止线程池
return; //false&&false&&(false&&):c>=0&&c<010&&c!=0:c=STOP状态
//false&&false&&(true&&false):c>=0&&c<010&&c=0&&队列空:c=SHUTDOWN状态队列空 //若线程不为空则中断一个闲置线程后直接返回
if (workerCountOf(c) != ) { // worker数量不等于0
interruptIdleWorkers(ONLY_ONE);//中断线程,是否只中断一次
return;
} //worker数量等于0,并且 处于stop状态或者SHUTDOWN状态队列空
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, ))) {//设置状态为TIDYING=010
try {//线程池终止后做的事情
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, ));//设置状态为TERMINATED=011
termination.signalAll();//唤醒条件队列所有线程
}
return;
}
} finally {
mainLock.unlock();
}
//状态设置失败则再重试
}
}
/*
调用该方法来尝试终止线程池,在进入for循环后第一个if判断过滤了不符合条件的终止操作,只有状态为stop,
或者状态为shutdown且任务队列为空这两种情况才能继续执行。
第二个if语句判断工作者数量是否为0,不为0的话也直接返回。经过这两重判断之后才符合终止线程池的条件,
于是先通过CAS操作将线程池状态设置为tidying状态,在tidying状态会调用用户自己实现的terminated()方法来做一些处理。
到了这一步,不管terminated()方法是否成功执行最后都会将线程池状态设置为terminated,也就标志着线程池真正意义上的终止了。
最后会唤醒所有等待线程池终止的线程,让它们继续执行
*/ private void checkShutdownAccess() {
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkPermission(shutdownPerm);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
security.checkAccess(w.thread);//检查每一个线程的权限。线程池的线程全在workers里面的worker里面的thread上面。
} finally {
mainLock.unlock();
}
}
} //中断所有线程,即使是活动的。忽略SecurityExceptions(在这种情况下,某些线程可能保持不间断)。
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();//中断workers的线程,加锁防止workers变化。
} finally {
mainLock.unlock();
}
} private void interruptIdleWorkers(boolean onlyOne) {//中断worker的线程
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {//没有中断并且,获取锁成功,tryLock不会阻塞排队,只会把state=0变成1,
//前提是没有线程获取w里面的锁。就可以中断w里面的线程。 其他线程要想操作w就要先获取锁。
//worker每处理一个任务,会加锁一次解锁一次。
try {
t.interrupt();//Worker的线程中断,加锁防止workers变化。
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
} //中断空闲worker。中断可以获取锁的worker里面的线程,就是worker w还没有调用lock方法的worker。
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
} private static final boolean ONLY_ONE = true; final void reject(Runnable command) {
handler.rejectedExecution(command, this);
} void onShutdown() {
} //RUNNING返回true,SHUTDOWN并且shutdownOK=true返回true,SHUTDOWN并且shutdownOK=false返回false,
final boolean isRunningOrShutdown(boolean shutdownOK) {
int rs = runStateOf(ctl.get());
return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
} private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList);
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[])) {
if (q.remove(r))//从workqueeu中移除
taskList.add(r);//添加到taskList
}
}
return taskList;
} /*addWorker本事只是为线程池添加一个Worker,其本身所做的事情其实很简单,但难就难在要确保安全有效得添加一个Worker。
为此addWorker()方法做了很多额外的工作。比如判断线程池的运行状态,当前Worker数量是否已经饱和等等。可以发现在这个方法,
或者说整个ThreadPoolExecutor中,很多时候都是使用双重检查的方式来对线程池状态进行检查。其实这都是为了效率,
最简单不过直接使用Synchronized或ReentranLock进行同步,但这样效率会低很多,所以在这里,
只有在万不得已的情况下,才会使用悲观的ReentranLock。*/ //添加任务,executorService1.execute(new Runnable() = firstTask),任意个线程并发,
private boolean addWorker(Runnable firstTask, boolean core) {//core是不是核心线程 //两层循环,外层循环判断线程池状态,状态不符合就return,内层循环判断线程数,线程数超过限定值return。 retry: for (;;) {
int c = ctl.get();
int rs = runStateOf(c);//前3位不变,后面29位=0。
//c处于RUNNING状态,c=111xxxxxxxxx,rs=111 0000000000000000
//c处于SHUTDOWN状态,c=000xxxxxxxxx,rs=000 0000000000000000
//c处于STOP状态,c=001xxxxxxxxx,rs=001 0000000000000000
//c处于TIDYING状态,c=010xxxxxxxxx,rs=010 0000000000000000
//c处于TERMINATED状态,c=011xxxxxxxxx,rs=011 0000000000000000 //状态判断。rs后面全是0,ctl后面不是0。 //rs正常运行小于0,SHUTDOWN=0,其他STOP,TIDYING,TERMINATED都是大于0 //true&&!(false):rs>=0(不是RUNNING状态)&& rs != SHUTDOWN=000:rs>0处于STOP,TIDYING,TERMINATED,就不新开worker线程了
//true&&!(true&&false):rs>=0&&rs=0&&firstTask!=null:SHUTDOWN状态&&有第一个任务,
//true&&!(true&&true&&false):rs>=0&&rs=0&&firstTask=null&&workQueue=null:SHUTDOWN状态&&没有第一个任务&&队列空,任务没有队列也空了就不用新开worker线程了,
if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
return false; //只有以下两种情况会继续添加线程
//1.false:rs<0(处于RUNNING状态),
//2.true&&!(true&&true&&true):rs>=0&&rs=0&&firstTask=null&&workQueue!=null:shutdown状态,首任务空,队列还有任务。其他地方调用addWorker(null,true|false),队列还有任务要开worker线程。
for (;;) {
int wc = workerCountOf(c);//已经运行的worker线程,worker线程数目大于CAPACITY=2^29-1就不嗯能够再加worker线程数目了。 //以下三种情况不继续添加线程:
//1.线程数大于线程池总容量
//2.当前线程为核心线程,且核心线程数达到corePoolSize
//3.当前线程非核心线程,且总线程达到maximumPoolSize
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;//不开worker线程,加到队列里面去,
if (compareAndIncrementWorkerCount(c))//ctl加一成功
break retry;//加1成功就退出
c = ctl.get(); //ctl加一失败,
if (runStateOf(c) != rs)//状态没变重新加1。
continue retry;//状态变了重新获取状态。
}
} boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//executorService1.execute(new Runnable() = firstTask),Worker里面有一个firstTask和Thread,Thread里面有这个Worker w。
w = new Worker(firstTask);//firstTask可以为null
//Worker里面封装了这个任务,并且实例化了一个线程,总共3个Worker,3个任务(3个第一个任务),3个线程。线程run时候就开一个线程去执行Worker里的第一个任务,线程Thread run的时候,是Thread
//里面的Runnable去run,所以Worker要设置成这个Thread的Runnable,然后让Worker去run(转调外部类的run方法,把worker传进去)。Worker里面仅仅保存的是这个worker的第一个任务,第一个任务执行完会死循环执行queue队列的任务。
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();//单线程进来,其余阻塞
try {
// 保持锁止时重新检查。如果线程发生故障或在获取锁之前关闭,请退出。
int rs = runStateOf(ctl.get());
//true|:running状态,增加worker
//false|true:shutdown状态并且task=null,增加worker
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) //如果线程已经开启则抛出异常
throw new IllegalThreadStateException();
workers.add(w);//Worker加到workers里面去,多线程并发要加锁
int s = workers.size();
if (s > largestPoolSize)//记录线程达到的最大值
largestPoolSize = s;
workerAdded = true;//添加成功
}
//false|(fasle|):STOP|TIDYING|TERMINATER:不增加worker
//false|(true|false):SHUTDOWN,task不为null:不增加worker
} finally {
mainLock.unlock();
}
if (workerAdded) {//添加成功
t.start();//线程123执行start(),就会去执行Worker的run方法。
workerStarted = true;//启动成功
}
}
} finally {
if (!workerStarted)//线程启动成功
addWorkerFailed(w);//新建worker失败
}
return workerStarted;//是否启动成功
} //新建worker失败
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;//加锁
mainLock.lock();
try {
if (w != null)
workers.remove(w);//workers中移除
decrementWorkerCount();//减少ctl
tryTerminate();
} finally {
mainLock.unlock();
}
} //工作线程如果从getTask方法中获得null,则会退出while循环并随后执行processWorkerExit方法。移除自己。completedAbruptly=fasle没有异常,
//该方法会在这个工作线程终止之前执行一些操作:统计该工作者完成的任务数,然后将其从workers集合中删除,每删除一个工作者之后都会去调用tryTerminate方法尝试终止线程池,但并不一定会真的终止线程池。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) //若非正常完成则将线程数减为0
decrementWorkerCount();//ctl减1 final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;//统计完成的任务总数
workers.remove(w);//移除work
} finally {
mainLock.unlock();
} tryTerminate();//尝试终止线程池
/*
从tryTerminate方法返回后再次去检查一遍线程池的状态,如果线程池状态为running或者shutdown,
并且线程数小于最小值,则恢复一个工作者。这个最小值是怎样计算出来的呢?
我们来看看。如果allowCoreThreadTimeOut为true则最小值为0,否则最小值为corePoolSize。
但还有一个例外情况,就是虽然允许核心线程超时了,但是如果任务队列不为空的话,那么必须保证有一个线程存在,因此这时最小值设为1
后面就是判断如果工作线程数大于最小值就不新增线程了,否则就新增一个非核心线程。
从这个方法可以看到,每个线程退出时都会去判断要不要再恢复一个线程,因此线程池中的线程总数也是动态增减的。
*/
int c = ctl.get();
if (runStateLessThan(c, STOP)) {//SHUTDOWN|RUNNING,则将线程数恢复到最小值
if (!completedAbruptly) {//线程正常完成任务被移除
int min = allowCoreThreadTimeOut ? : corePoolSize;//允许核心线程超时最小值为0, 否则最小值为核心线程数
if (min == && !workQueue.isEmpty())//如果任务队列还有任务, 则保证至少有一个线程
min = ;
if (workerCountOf(c) >= min)//若线程数大于最小值则不新增了
return;
}
addWorker(null, false);//新增工作线程
}
} private Runnable getTask() {//不是worker的方法,corePoolSize个多线程并发访问,
boolean timedOut = false; //上一次获取任务是否超时 for (;;) {
int c = ctl.get();
int rs = runStateOf(c);//rs是一个值,ctl是多个值。 //if判断,从这里我们可以看到,如果线程池状态为shutdown,会继续消费任务队列里面的任务;如果线程池状态为stop,则停止消费任务队列里剩余的任务。 //true&&(true|): rs=STOP|TIDYIN|TERMINATED
//true&&(false|true): rs=SHUTDOWN并且队列空。SHUTDOWN了还会去执行队列。
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();//ctl-1,销毁当前线程
return null;//这个线程 退出这个for循环和外部的while循环,这个worker线程退出终止,执行完。
} //false:rs=RUNNING:
//true&&(false|false):rs=SHUTDOWN并且队列不为空:每个线程都不会终止,继续处理队列的任务。
int wc = workerCountOf(c); // 是否开始超时等待:1.允许核心线程超时,2.线程数大于corePoolSize
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //是否退出当前线程,
if ( ( wc > maximumPoolSize || (timed && timedOut) )
&& ( wc > || workQueue.isEmpty() )) {
if (compareAndDecrementWorkerCount(c))
return null;//终止当前线程
continue;
} //不退出当前线程:
try {//会阻塞等到队列有任务,超时等待返回空,继续死循环。
//注意:闲置线程会一直在这阻塞
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
if (r != null)
return r;
timedOut = true; //是因为超时退出阻塞的,
} catch (InterruptedException retry) {
timedOut = false;//不是因为超时退出阻塞的,
}
}
} final void runWorker(Worker w) throws InterruptedException {//不是worker的方法,是外部类的方法,会有corePoolSize个多线程访问问题,
Thread wt = Thread.currentThread();
Thread t = w.thread;//相等的 Runnable task = w.firstTask;//第一个任务,不是队列的任务
w.firstTask = null;
//设置work的state=0和去掉owenerThread属性
//Worker也是一个ReentantLock,但是3个线程,每个线程一个woker,woker w不是线程共享的锁,不会多线程获取这把锁,unlock()不会有多线程访问,
w.unlock(); //把state由-1变成0。interruptIdleWorkers方法就可以中断这个线程了。
boolean completedAbruptly = true;//有异常
try {
//先执行初始是一个任务task,执行完之后从workQueue中取任务去执行。
while (task != null || (task = getTask()) != null) {//不断的从任务队列中获取任务,直到getTask方法返回null,然后工作线程退出while循环最后执行processWorkerExit方法来移除自己。
//就有多线程调用w.lock(),每个线程一个woker,woker w不是线程共享的锁,此处代码不会多线程获取这把锁,
//设置work的state=1和owenerThread属性
//如果shutdown方法里面的interrupt方法,调用了w.tryLock(),那么当前线程就会加入到w的队列,并且当前线程阻塞等待唤醒,唤醒之后继续这里执行。
//每次都使用锁以保证当前worker在运行task过程中不会被中断。

w.lock(); //其他线程可以从workers中获取一个worker,其他线程没有获取到锁,就不能对这个线程中断。本线程的下面执行就不能被中断。 //ctl大于等于STOP:STOP|TIDYING|TERMINATED
if ( ( runStateAtLeast(ctl.get(), STOP)
|| (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))
)
&& !wt.isInterrupted()
)
wt.interrupt();//线程中断
try {
beforeExecute(wt, task);//正常执行,就继续运行。
Throwable thrown = null;
try {
task.run();//外部添加的任务run
} catch (RuntimeException x) {
thrown = x;//收集异常,给afterExecute
throw x;
} catch (Error x) {
thrown = x;//收集异常,给afterExecute
throw x;
} catch (Throwable x) {
thrown = x;//收集异常,给afterExecute
throw new Error(x);
} finally {
afterExecute(task, thrown);//异不异常都会处理,thrown来区分。afterexecute这也可能引发一个异常,
}
} finally {
task = null;//将执行完的任务置空
w.completedTasks++;//将完成的任务数加一
w.unlock();
}
}
completedAbruptly = false;//正常完成,没有异常
} finally {
processWorkerExit(w, completedAbruptly);//异不异常都会处理,completedAbruptly来区分。移除自己。
}
} public ThreadPoolExecutor1(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors1.defaultThreadFactory(),
defaultHandler);
} public ThreadPoolExecutor1(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);
} public ThreadPoolExecutor1(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, RejectedExecutionHandler1 handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);
} public ThreadPoolExecutor1(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler1 handler) {
if (corePoolSize < || maximumPoolSize <= || maximumPoolSize < corePoolSize || keepAliveTime < )
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
} //这个任务有可能新建线程执行,有可能在已经存在的线程里面去执行。线程池已经shutdown了或者池子满了就丢弃任务,
public void execute(Runnable command) {
//executorService1.execute(new Runnable() = firstTask),会有多线程访问。任意多个线程调用同一个线程池executorService.execute方法。
if (command == null)
throw new NullPointerException();
int c = ctl.get();
int m;
/*于corepoolsize就新建线程,大于sorepoolsize就加入队列,队列满了但是小于maxsize就新建线程,大于maxsize就丢弃任务。*/
//最大开corePoolSize=3个线程,来一个新建一个Worker添加到workers里面去然后Worker.start(),最多添加3个。workers变成3个之后不会较小,一直不变。
//后面进来的添加到workQueue里面去,前面3个线程只有执行完了就会去阻塞等到队列有任务。前面3个线程一直不会退出。 if ( (m = workerCountOf(c)) < corePoolSize) {//任意多个线程可以并发访问。当前 运行的线程 数量是否少于corePoolSize。
if (addWorker(command, true))//添加到workers里面去,然后start。创建一个新的工作线程来执行任务。
return;
c = ctl.get();
} //上面代码任意个线程并发,下面代码超过corePoolSize的线程并发。若队列已满则返回false。
if (isRunning(c) && workQueue.offer(command)) {//c<0(运行状态)并且任务加到队列成功,3个核心的线程刚才在阻塞等待workQueue现在队列有元素了,会唤醒去处理 //在成功将任务放入到任务队列后,还会再次检查线程池是否是Running状态,如果不是则将刚刚添加的任务从队列中移除,然后再执行拒绝策略。
int recheck = ctl.get();//这里进行再次检查状态, //!true:还是处于正常运行状态:不移除,
//!fasle:不处于运行态,queue中移除command并且tryTerminate()成功移除后再执行拒绝策略 if (!isRunning(recheck) && remove(command))
reject(command);//丢弃任务,如果创建一个新的工作线程将使当前运行的线程数量超过maximumPoolSize,则交给RejectedExecutionHandler来处理任务。 //处于运行状态,task添加到队列中,没有worker线程,
//若线程数为0则新建一个worker。稍后这个空闲的worker就会自动去队列里面取任务来执行
//如果从队列中移除任务失败,则再检查一下线程数是否为0(有可能刚好全部线程都被终止了),是的话就新建一个非核心线程去处理。
else if (workerCountOf(recheck) == )
addWorker(null, false);//开一个线程但是不执行初始任务,等着执行队列任务 //不是运行状态,或者队列添加失败,拒绝策略拒绝task
//如果任务队列已经满了,此时offer方法会返回false,添加到队列失败了,接下来会再次调用addWorker方法新增一个非核心线程来处理该任务。如果这个线程创建失败,则最后会执行拒绝策略
} else if (!addWorker(command, false))
reject(command);
} //平缓关闭线程池
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;//获取锁,新建不了worker
mainLock.lock();
try {
checkShutdownAccess();//检查是否有关闭的权限
advanceRunState(SHUTDOWN);//更新状态为SHUTDOWN,这时线程池会拒绝接收外部传过来的任务,
interruptIdleWorkers();//中断闲置的线程,还没有执行lock的worker w的线程,剩余的线程会继续消费完任务队列里的任务之后才会终止。
onShutdown(); //对外提供的钩子
} finally {
mainLock.unlock();
}
tryTerminate();//尝试终止线程池
} //立刻关闭线程池
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();//检查是否有关闭的权限
advanceRunState(STOP);//更新状态为stop,这是线程池也不再接收外界的任务
interruptWorkers();//中断所有工作线程
tasks = drainQueue();//排干任务队列
} finally {
mainLock.unlock();
}
tryTerminate();//尝试终止线程池
return tasks;//最后返回未被处理的任务集合。
}
//调用shutdown()和shutdownNow()方法后还未真正终止线程池,这两个方法最后都会调用tryTerminate()方法来终止线程池。 public boolean isShutdown() {
return !isRunning(ctl.get());
} //ctl>=0并且小于0110000000000,ctl=SHUTDOWN|STOP|TIDYING
public boolean isTerminating() {
int c = ctl.get();
return !isRunning(c) && runStateLessThan(c, TERMINATED);
} //ctl>=TERMINATED
public boolean isTerminated() {
return runStateAtLeast(ctl.get(), TERMINATED);
} public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))//状态>=011,表示已经处于TERMINATED
return true;
if (nanos <= )
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
} protected void finalize() {
SecurityManager sm = System.getSecurityManager();
if (sm == null || acc == null) {
shutdown();
} else {
PrivilegedAction<Void> pa = () -> {
shutdown();
return null;
};
AccessController.doPrivileged(pa, acc);
}
} public void setThreadFactory(ThreadFactory threadFactory) {
if (threadFactory == null)
throw new NullPointerException();
this.threadFactory = threadFactory;
} public ThreadFactory getThreadFactory() {
return threadFactory;
} public void setRejectedExecutionHandler(RejectedExecutionHandler1 handler) {
if (handler == null)
throw new NullPointerException();
this.handler = handler;
} public RejectedExecutionHandler1 getRejectedExecutionHandler() {
return handler;
} public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < )
throw new IllegalArgumentException();
int delta = corePoolSize - this.corePoolSize;
this.corePoolSize = corePoolSize;
if (workerCountOf(ctl.get()) > corePoolSize)
interruptIdleWorkers();//中断所有可以获取锁的worker
else if (delta > ) {
//我们不知道“需要”多少新线程。作为一种启发式方法,预启动足够多的新工作人员(最多新的核心大小)
//来处理队列中当前的任务数,但如果队列在执行此操作时变为空,则停止。
int k = Math.min(delta, workQueue.size());
while (k-- > && addWorker(null, true)) {//开k个worker线程
if (workQueue.isEmpty())//队列空就停止
break;
}
}
} public int getCorePoolSize() {
return corePoolSize;
} //worker线程数量小于corePoolSize。就开一个空初始任务的Worker和线程。
public boolean prestartCoreThread() {
return workerCountOf(ctl.get()) < corePoolSize && addWorker(null, true);
} //与pretartcorethread相同,只是安排至少启动一个线程,即使corepoolsize为0。
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == )
addWorker(null, false);
} //启动3个worker线程去执行,有任务进来,就不初始化线程,直接都给队列,然后这些线程从队列取任务。
//饿初始化线程池的线程,不是懒初始化,等到任务来了才初始化线程。
public int prestartAllCoreThreads() {
int n = ;
while (addWorker(null, true))
++n;
return n;
} public boolean allowsCoreThreadTimeOut() {
return allowCoreThreadTimeOut;
} public void allowCoreThreadTimeOut(boolean value) {
if (value && keepAliveTime <= )
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
if (value != allowCoreThreadTimeOut) {
allowCoreThreadTimeOut = value;
if (value)//true,启用超时
interruptIdleWorkers();
}
} public void setMaximumPoolSize(int maximumPoolSize) {
if (maximumPoolSize <= || maximumPoolSize < corePoolSize)
throw new IllegalArgumentException();
this.maximumPoolSize = maximumPoolSize;
//已经开的worker数量大于maximumPoolSize就中断所有可以获取锁的worker的线程
if (workerCountOf(ctl.get()) > maximumPoolSize)
interruptIdleWorkers();
} public int getMaximumPoolSize() {
return maximumPoolSize;
} public void setKeepAliveTime(long time, TimeUnit unit) {
if (time < )
throw new IllegalArgumentException();
if (time == && allowsCoreThreadTimeOut())
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
long keepAliveTime = unit.toNanos(time);
long delta = keepAliveTime - this.keepAliveTime;
this.keepAliveTime = keepAliveTime;
if (delta < )
interruptIdleWorkers();
} public long getKeepAliveTime(TimeUnit unit) {
return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
} public BlockingQueue<Runnable> getQueue() {
return workQueue;
} public boolean remove(Runnable task) {
boolean removed = workQueue.remove(task);//workQueue队列中移除task
tryTerminate(); // In case SHUTDOWN and now empty
return removed;
} /**
尝试从工作队列中删除所有已取消的@link future任务。
此方法可作为存储回收操作使用,对功能没有其他影响。
被取消的任务永远不会执行,但可能会累积到工作队列中,直到工作线程可以主动删除它们。
调用此方法将尝试立即删除它们。但是,此方法可能无法在存在其他线程干扰的情况下删除任务。
*/
public void purge() {
final BlockingQueue<Runnable> q = workQueue;
try {
Iterator<Runnable> it = q.iterator();
while (it.hasNext()) {
Runnable r = it.next();
if (r instanceof Future<?> && ((Future<?>) r).isCancelled())
it.remove();
}
} catch (ConcurrentModificationException fallThrough) {
// Take slow path if we encounter interference during traversal.
// Make copy for traversal and call remove for cancelled entries.
// The slow path is more likely to be O(N*N).
for (Object r : q.toArray())
if (r instanceof Future<?> && ((Future<?>) r).isCancelled())
q.remove(r);
} tryTerminate(); // In case SHUTDOWN and now empty
} public int getPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Remove rare and surprising possibility of
// isTerminated() && getPoolSize() > 0
return runStateAtLeast(ctl.get(), TIDYING) ? : workers.size();
} finally {
mainLock.unlock();
}
} public int getActiveCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int n = ;
for (Worker w : workers)
if (w.isLocked())
++n;
return n;
} finally {
mainLock.unlock();
}
} public int getLargestPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
return largestPoolSize;
} finally {
mainLock.unlock();
}
} public long getTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
long n = completedTaskCount;
for (Worker w : workers) {
n += w.completedTasks;
if (w.isLocked())
++n;
}
return n + workQueue.size();
} finally {
mainLock.unlock();
}
} //返回已完成执行的任务的大致总数。 近似值,在连续调用中不会减少。
public long getCompletedTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
long n = completedTaskCount;
for (Worker w : workers)
n += w.completedTasks;
return n;
} finally {
mainLock.unlock();
}
} public String toString() {
long ncompleted;
int nworkers, nactive;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
ncompleted = completedTaskCount;
nactive = ;
nworkers = workers.size();
for (Worker w : workers) {
ncompleted += w.completedTasks;
if (w.isLocked())
++nactive;
}
} finally {
mainLock.unlock();
}
int c = ctl.get();
String rs = (runStateLessThan(c, SHUTDOWN) ? "Running"
: (runStateAtLeast(c, TERMINATED) ? "Terminated" : "Shutting down"));
return super.toString() + "[" + rs + ", pool size = " + nworkers + ", active threads = " + nactive
+ ", queued tasks = " + workQueue.size() + ", completed tasks = " + ncompleted + "]";
} protected void beforeExecute(Thread t, Runnable r) {} protected void afterExecute(Runnable r, Throwable t) {} protected void terminated() {}// Executor中断调用 public static class CallerRunsPolicy implements RejectedExecutionHandler1 {
public CallerRunsPolicy() {
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor1 e) {
if (!e.isShutdown()) {//池子关闭了,就丢弃这个任务,什么都不执行就是丢弃。
r.run();//池子没有关闭,在调用者线程执行这个任务,不再使用线程池的线程来执行任务。
}
}
} public static class DiscardPolicy implements RejectedExecutionHandler1 {
public DiscardPolicy() {
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor1 e) {
}//什么都不执行就是丢弃任务
} public static class DiscardOldestPolicy implements RejectedExecutionHandler1 {
public DiscardOldestPolicy() {
} public void rejectedExecution(Runnable r, ThreadPoolExecutor1 e) {
if (!e.isShutdown()) {//池子没有关闭
e.getQueue().poll();//移除队列中的第一个元素,丢弃阻塞队列中靠最前的任务
e.execute(r);//把这个任务丢进去
}//池子关闭了什么都不做丢弃
}
}
}
public class ThreadPoolExample1 {

    public static void main(String[] args) {

        ThreadPoolExecutor1 executorService1 = new ThreadPoolExecutor1(, , 6L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()); // executorService1.prestartAllCoreThreads(); // ThreadPoolExecutor1 executorService1 = (ThreadPoolExecutor1) Executors1.newCachedThreadPool();
// ExecutorService executorService2 = Executors1.newSingleThreadExecutor(); new Thread(() -> {//外部线程,不是线程池线程,丢任务给池子。
int i = ;
while (true) {
Threadd w = new Threadd(executorService1,"子线程"+(++i));//外部线程,不是线程池线程,丢任务给池子。
w.start();
}
},"发射线程").start(); executorService1.shutdown();
}
static int j =;
static class Threadd extends Thread {//外部线程,不是线程池线程
ThreadPoolExecutor1 executorService1;
Threadd(ThreadPoolExecutor1 executorService1,String name) {
super(name);
this.executorService1 = executorService1;
}
@Override
public void run() {//多个线程(不是main线程)来调用executorService.execute()来丢任务给池子。
try {
executorService1.execute(new Runnable() {//execute的参数是一个任务
String name = "任务"+(j++);
@Override
public void run() {
System.out.println(name+"完成");
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

最新文章

  1. JavaScript高级程序设计学习笔记--函数表达式
  2. 关于CGContextSetBlendMode: invalid context 0x0的错误
  3. PeopleSoft登录流程
  4. 1074: [SCOI2007]折纸origami - BZOJ
  5. mysql 数据库备份,恢复。。。。
  6. web应用的发布
  7. 常用Java片段
  8. WPF和Winform的一些界面控件
  9. 句柄C++
  10. SharePoint 修改项目的new图标显示天数
  11. OpenFeign封装为springboot starter
  12. mvc 提交Html内容的处理
  13. [翻译] AnimatedPath 动画路径(持续更新)
  14. 打开Activity时,不自动显示(弹出)虚拟键盘
  15. 重签名android测试包
  16. 基于OpenCV的微信跳一跳外挂
  17. leetcode-551-Student Attendance Record I(判断是否出现连续几个相同字符)
  18. mac下配置环境变量-mongo
  19. Odoo,OpenERP widget标签
  20. 洛谷P2762 太空飞行计划问题(最小割)

热门文章

  1. OCaml (ML家族语言)很好很强大
  2. 2019 安易迅java面试笔试题 (含面试题解析)
  3. Mac 下安装 jdk
  4. Java开源工具
  5. python高级编程——网络编程(一)
  6. springboot启动报错,Error starting ApplicationContext. To display the conditions report re-run your application with &#39;debug&#39; enabled.
  7. idea跳转到指定行列快捷键
  8. css层叠规则(层叠样式表)
  9. uni-app常用 HTML5+APP 设置
  10. Spring项目配置多数据源