前言
AQS(Abstract Queued Synchronizer)是JUC并发包中的核心基础组件,作者是大名鼎鼎的Doug Lea。通过AQS可以实现大部分的同步需求。
宏观架构
AQS包括一个state和一个FIFO的CLH队列,如下图所示:
CLH队列中的每个节点Node就可以对应与争用该资源的线程,Node的数据结构如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
static final class {
static final Node SHARED = new Node(); /** 独占模式 */ static final Node EXCLUSIVE = null; /** 当前节点已取消 */ static final int CANCELLED = 1; /** 当前线程处于同步状态,如果取消或释放,通知下一个等待节点 */ static final int SIGNAL = -1; /** wait 在某个condition中 */ static final int CONDITION = -2; static final int PROPAGATE = -3; volatile int waitStatus; volatile Node prev; volatile Node next; /** 当前节点已取消 */ volatile Thread thread; Node nextWaiter; final boolean isShared() { return nextWaiter == SHARED; } final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { }
Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; }
Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } } ```
### 获取锁的过程 以使用默认构造函数的reentrantLock为例子:
```java ReentrantLock reentrantLock = new ReentrantLock(); reentrantLock.lock();
lock()代码如下:
1 2 3 4 5 6 7
final void lock() { if (compareAndSetState(0, 1)) //成功获得独占的state资源 setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
如果当前的state值为0,当前线程获得lock,将state的值通过cas的方式设置为1。如果不是0,则添加到队列中。通过acquire方法去申请资源。
1 2 3 4 5 6
public final void acquire(int arg) { //tryAcquire再次尝试获取锁 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
tryAcquire:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //如果当前已经获取到锁 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
再次获取锁尝试失败后,调用addWaiter将线程封装成节点信息,加入到等待队列中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { 大专栏 AQS总结 pred.next = node; return node; } } enq(node); return node; }
addWaiter首先会通过cas的方式快速的去添加到队列的尾部,如果添加不成功,调enq(node)再次入队;enq(node)是一个死循环,不断的通过cas去添加到节点,直到成功。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { //cas的方式 if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
线程节点进入队列后,调用acquireQueued,acquireQueuedxiang
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); //如果发现自己的前面节点是头节点,表明该节点是正拿到锁的线程,此时再次尝试获取锁资源,因为之前的线程有可能已经解锁了。 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } //挂起当前线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
最关键的应该是shouldParkAfterFailedAcquire方法,如果每个线程都在这么自旋的去拿锁,cpu肯定炸了。所以,当当前的前一个节点处于SIGNAL状态的时候,可以挂起当前线程。这个操作就好比在排队的时候和前一个人说:我出去买点吃的,你轮到的时候叫我一下。当前面的节点轮到的时候,会唤醒当前线程,然后又开始自旋,判断自己能否拿到同步状态,如果拿到,就获取到了锁,这就是一个完整的获得同步状态的过程。 至于如何挂起当前线程,使用的是LockSupport的park()挂起当前线程。park可以精确的进行挂起,精确到thread。
释放锁的过程
1
reentrantLock.lock(); //释放锁
释放的过程正好相反,通过release来释放锁。
1 2 3 4 5 6 7 8 9 10 11
public final boolean release(int arg) { //如果成功的释放了资源 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) //唤醒下一个节点对应的线程 unparkSuccessor(h); return true; } return false; }
tryRelease的内容主要是:获取state的值,减去要释放的值,如果state已经是0,把当前的线程设置为null。要注意的是,这里完全没有使用cas,因为当前线程还持有锁,绝对的线程安全。
1 2 3 4 5 6 7 8 9 10 11 12
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
最为关键的 unparkSuccessor(h),这个时候头节点已经处于了获取同步的状态,通过unparkSuccessor(h)来唤醒头节点的后一个节点。从而后一个节点就可以自旋的去获取同步状态。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
private void unparkSuccessor(Node node) {
int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0);
Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
总结
个人认为AQS在很多地方使用cas和自旋的方式,一定程度上提升吞吐率,之前看到过测试ReentrantLock的吞吐比synchronized要高很多,不对synchronized一直在优化,估计现在性能也差不多了,以后做个测试。本文只是总结了AQS的独占式的获取同步状态,还有共享式的获取同步状态,还支持很多的特性,将在后面进行总结。
最新文章
Python 2.7_First_try_爬取阳光电影网_20161206
Mongodb 语法,update,insert,delete,find
jQuery无缝滚动插件
DHCP工作过程详解
find the nth digit
Unity3D之空间转换学习笔记(一):场景物体变换
关于PreferenceActivity的使用和一些问题的解决(自己定义Title和取值)
WebService笔记-Schema约束
转:嵌入式linux启动时运行的inittab文件
在SQL Server 中启用 FileStream
linux内存管理子系统
最小路径覆盖 hdu 1151 hdu 3335
iOS 手机时区获取问题
JS(三)
python中线程和进程(二)
caffe编译错误记录
项目总结22:Java UDP Socket数据的发送和接收
(转载)Sublime Text 3 快捷键大全
JeeWx捷微3.1小程序版本发布,支持微信公众号,微信企业号,支付窗——JAVA版开源微信管家
js,JavaScript,a标签onclick传递参数不对,A标签调用js函数写法总结
热门文章
jupyterhub 安装配置
吴裕雄--天生自然 pythonTensorFlow图形数据处理:解决module &#39;tensorflow&#39; has no attribute &#39;Session&#39;
JavaScript可以做嵌入式开发了
python与mysql部分函数和控制流语法对比
linux下tab作用的描述?
流程控制 if-while-for -语句
mysql命令行操作大全
vue项目中的elementUI的table组件导出成excel表
mediawiki资料
用@font-face应用自定义字体