ForkJoinPool 源码
ForkJoinPool----FJP
先看task.fork方法,含义是将当前任务,放到当前线程的工作队列中。但是第一次执行这个方法是在主线程中,主线程是不可能被FJP管理的。那么就进入ForkJoinPool.common.externalPush,在common这个default的线程池里执行这个任务,
externalPush的意思,是把外面的任务,放到当前线程池中执行。刚进入externalPush,会检查随机数不是0,workQueues不是空,这些条件第一次肯定是不满足的,那么进入externalSubmit,先初始化随机数,
ctl是一个volatile long类型的控制变量,从高到低,前16位是(当前活跃线程数-最小并发数),往后16位是(总线程数-最小并发数),再往后16位是栈顶(Treiber stack)等待线程的标志,最后16位是栈顶等待线程在线程池数组中的位置。
runState是一个volatile int类型的控制变量,来标志当前线程池运行状态,有锁住,信号,已启动,停止,终止,关闭几种状态。(当然也可以不从fork方法进来,而是pool.invoke,意味着已经有一个FJP了,直接执行任务)
FJP的逻辑很清晰:externalPush是统一入口(没有FJP,用common,有的话直接进),没初始化的进入externalSubmit,externalSubmit逻辑很清晰:初始化workQueues,初始化当前线程对应的workQueue,hash冲突了重新生成随机数rehash。都
完成之后,提交到自己的workQueue中,然后signalWork。
signalWork:方法的含义是现在新添加了一个task,需要去看下有没有满负荷(判断的标准是活跃线程是不是已经达到了并发数)。有空闲线程,那么唤起(去第三个和第四个16位找空闲线程栈顶信息,unpark方法),如果没有空闲线程但是总线程数不足(c & ADD_WORKER是判断第二个16位是正数还是负数,负数说明总线程数不足)
那么addWorker。
唤起的线程或新增的线程,会在run方法里进行无限循环,现在自己的任务队列里面找任务,找不到去别的线程的任务队列里找(steal),steal和自己从自己队列找是不同方向,所以能减少并发。直到所有的task都被取走,当前线程
挂起休眠。
ForkJoinTask.join,其实fork把任务放进任务队列之后,任务就可以被执行了,但是如果想获取任务执行的结果,需要让task.join,
最新文章
- atoi()函数
- 备库Seconds_Behind_Master的计算
- SHA-1 加密算法破解现已只需要 10 天
- 2795: [Poi2012]A Horrible Poem
- Nginx支持多站点配置小结
- 那晚征服的一道js经典的面试题
- HDU4570:Multi-bit Trie(区间DP)
- Could not fetch https://api.github.com/repos/RobinHerbots/jquery
- [LeetCode] 30. Substring with Concatenation of All Words 解题思路 - Java
- ERP和MES破冰之路 [普实MES升级中国“智”造]
- MYSQL:插入记录检查记录是否存在,存在则更新,不存在测插入记录SQL
- jQuery 遍历 – 后代
- ExtJs radiogroup form.loadRecord方法无法赋值正确解决办法
- [翻译]各个类型的IO - 阻塞, 非阻塞,多路复用和异步
- 利用aiohttp制作异步爬虫
- Tomcat启动时卡在 INFO HostConfig.deployDirectory Deploy
- A + B,末k位不相同
- conductor Kitchensink示例
- PyQt5信号与槽
- Alpha冲刺(11/10)