ReentrantReadWriteLock实现了可重入的读锁和写锁,其中读锁是共享锁,写锁是互斥锁。与ReentrantLock类似,ReentrantReadWriteLock也提供了公平锁和非公平锁两种实现,以满足不同的场景。因此,实际在使用时,会涉及到读锁、写锁、公平锁、非公平锁四个不同的概念,这也使得ReentrantReadWriteLock更加复杂一些。

1.核心字段与构造器

    private final ReentrantReadWriteLock.ReadLock readerLock;
private final ReentrantReadWriteLock.WriteLock writerLock;
final Sync sync; public ReentrantReadWriteLock() {
this(false);
} public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}

ReentrantReadWriteLock通过ReadLockWriteLock两个内部类分别实现读锁和写锁,并且默认的构造器使用非公平锁。Sync这个内部类同样继承了AbstractQueuedSynchronizer(AQS),排队等候的逻辑都交由AQS实现。接下来在讲解加锁和解锁的逻辑之前,需要明确的一点是,AQS会使用state字段记录锁状态,ReentrantReadWriteLock对该字段的使用更加巧妙,使用state的高16位记录读锁持有的总次数,使用低16位记录写锁持有的总次数,这样使用一个字段即可记录两个状态。因此在下文的源码讲解中,会发现读锁重入时,重入次数需要加2^16-1,写锁重入时只需要加1,同时这也意味着读锁和写锁的重入次数的最大值是2^16-1。接下来分别看一下读锁和写锁的加锁逻辑。

2.读锁的加锁逻辑

    public void lock() {
sync.acquireShared(1);
} //位于AQS
public final void acquireShared(int arg) {
//成功获取读锁返回1,否则返回-1
if (tryAcquireShared(arg) < 0)
//如果获取读锁失败,就执行下面的逻辑加入等待队列
doAcquireShared(arg);
} //位于Sync
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread();
int c = getState();
//如果写锁已经被占有,并且占有者不是当前线程,则返回-1,即写锁被其他线程占有时不能获取读锁
//从这里可以看出,线程在持有写锁的情况下,可以继续获取读锁
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
//获取读锁的重入次数
int r = sharedCount(c);
//如果读锁不需要阻塞,并且读锁的获取数量没有达到最大值(2^16-1),则尝试将读锁的持有数量加1(注意是在高16位加1,SHARED_UNIT=2^16-1),
//如果加1操作能够成功,则表示当前线程成功获取读锁
//注意,公平读锁和非公平读锁的readerShouldBlock()方法逻辑是不一样的
//非公平读锁在等待队列第一个线程请求写锁时会返回true(即在这种情况下非公平锁需要阻塞),其他情况都返回false
//公平读锁会查看当前等待队列中是否有其他线程在等待
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
//r == 0表示当前线程是第一个获取读锁的线程,将firstReader指向自己,并初始化firstReaderHoldCount字段
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
//如果firstReader已经指向了自己,就将firstReaderHoldCount加1,表示当前线程作为第一个获取读锁的线程,共获取读锁的次数
firstReaderHoldCount++;
} else {
//cachedHoldCounter记录的是最后一个获取读锁的线程
//使用cachedHoldCounter可以节省在ThreadLocal中的查找操作
HoldCounter rh = cachedHoldCounter;
//如果cachedHoldCounter还没初始化,或者最后一个获取读锁的线程不是当前线程,就从ThreadLocal中查看当前线程对应的HoldCounter
//注意,如果无法在ThreadLocal中查到当前线程的记录,那么就会新建一个HoldCounter加入ThreadLocalMap中,对应的count字段初始化为0
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
//执行到这里,说明rh != null,并且当前线程是最后一个获取读锁的线程,此时更新ThreadLocalMap中的value值
else if (rh.count == 0)
readHolds.set(rh);
//将读锁的持有次数加1
rh.count++;
}
//获取读锁成功,则返回1
return 1;
}
//执行到这里有三种情况:1)readerShouldBlock()返回true,即读锁需要阻塞;2)读锁重入次数已经达到最大值;3)更新state字段失败
//无论哪种情况,都说明当前线程没有成功获取读锁
return fullTryAcquireShared(current);
} //读写锁使用state字段的低16位表示写锁,高16位表示读锁
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } //获取读锁被持有的次数,数量由state的高16位表示
static int sharedCount(int c) { return c >>> SHARED_SHIFT; } //如果写锁被其他线程占有,或者当前获取读锁的线程需要阻塞,就返回-1,如果获取读锁成功则返回1,其他情况会继续自旋
//上面的注释中讲过,执行到fullTryAcquireShared有三种情况,该方法在自旋中分别处理这三种情况
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
//自旋
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
//如果写锁已经被其他线程占有,则返回-1
if (getExclusiveOwnerThread() != current)
return -1;
} else if (readerShouldBlock()) {
//处理读锁需要阻塞的情况
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
//获取最后一个获取锁的线程
rh = cachedHoldCounter;
//如果当前线程不是最后一个获取锁的线程,则从ThreadLocalMap中取出HoldCount对象
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
//当前线程没有获取读锁,就将ThreadLocalMap中存的HoldCount清理掉
//因为根据上面的代码逻辑,走到这里的时候,说明当前获取读锁的线程应该阻塞,
//即无法获取读锁,这种情况下需要清理ThreadLocalMap中的记录
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
//读锁获取数量达到最大,抛出异常,这段代码说明读锁的最大重入次数是2^16-1
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//如果成功更新state的值,则表示读锁获取成功,否则会继续自旋
if (compareAndSetState(c, c + SHARED_UNIT)) {
//如果当前线程是第一个持有读锁的线程,就设置firstReader字段
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
//从ThreadLocal中查找当前线程对应的HoldCounter对象
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
//更新当前线程持有锁的数量
rh.count++;
//将cachedHoldCounter指向当前线程的HoldCounter对象
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}

tryAcquireShared()方法用到了ThreadLocal来记录当前线程获取的读锁数量,有兴趣的话可以参考ThreadLocal源码探究 (JDK 1.8)了解ThreadLocal的实现细节。公平锁和非公平锁以不同的方式实现了readerShouldBlock()方法,接下来分别讲解公平锁和非公平锁的实现。

  • 非公平锁的readerShouldBlock()实现
    //由名字可以看出,这个方法主要是判断读锁是否应该阻塞
final boolean readerShouldBlock() {
/* As a heuristic to avoid indefinite writer starvation,
* block if the thread that momentarily appears to be head
* of queue, if one exists, is a waiting writer. This is
* only a probabilistic effect since a new reader will not
* block if there is a waiting writer behind other enabled
* readers that have not yet drained from the queue.
*/
return apparentlyFirstQueuedIsExclusive();
} //如果等待队列的第一个线程请求互斥锁(即写锁),则返回true,表示应该阻塞当前的读锁
//由readerShouldBlock()方法的注释了解到,非公平的读锁在发现等待队列头节点请求互斥锁时,
//需要进行阻塞而不是抢锁,是为了避免读请求太多的情况下造成写锁线程饥饿
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
//head != null表示等待队列已初始化,
//h.next != null && !s.isShared()表示队列的头结点请求的是互斥锁
//s.thread != null表示头结点对应的线程还没有开始执行
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}

对于非公平锁的读锁来说,在发现等待队列中的第一个线程请求写锁时,会主动取消抢锁,是为了避免请求写锁的线程饥饿,这是与ReentrantLock中的非公平锁一个很大的不同。与非公平锁不同的是,公平锁会先查看队列中是否有其他线程在等待。

  • 公平锁的readerShouldBlock()实现
    final boolean readerShouldBlock() {
return hasQueuedPredecessors();
} public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
//只要队列中有其他节点在等候,公平锁就要求当前线程排队等待
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

至此tryAcquireShared()方法的逻辑已经介绍完了,在该方法返回-1时,表示当前无法获取读锁,就会接着执行doAcquireShared()方法,来看看该方法的源码:

    private void doAcquireShared(int arg) {
//将当前线程构造成节点,放到等待队列的末尾
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
//中断标志
boolean interrupted = false;
for (;;) {
//获取当前线程的前一个节点
final Node p = node.predecessor();
//只有在当前节点是队列中的第一个有效节点时,才会执行下面的语句
if (p == head) {
//尝试获取读锁,获取成功则r>0,失败则r<0
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
//如果线程设置了中断标记,则将当前线程中断
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//当前线程不是队列第一个有效节点,或者获取读锁失败,就阻塞等待
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//线程中断之后,设置interrupted=true,之后代码逻辑会自旋一次,会在for循环的第一个条件语句中用到该字段
interrupted = true;
}
} finally {
//如果线程被中断,或者出现异常时,failed=true,需要通过cancelAcquire()方法放弃获取锁
if (failed)
cancelAcquire(node);
}
}

doAcquireShared()方法的逻辑是,查看当前线程对应的节点的前置节点是不是头结点(即当前节点是头结点的下一个节点),如果是就尝试获取读锁,获取成功的话就将自己设置成头结点,然后根据中断标志进行处理;如果前置节点不是头结点,则进入shouldParkAfterFailedAcquire(),如果其前置节点是SIGNAL状态,就会在parkAndCheckInterrupt()方法中将当前线程挂起。doAcquireShared()与同在AQS中的另一个方法doAcquireSharedInterruptibly()非常相似,在CountDownLatch源码探究 (JDK 1.8)doAcquireSharedInterruptibly()有详细的解释,包括shouldParkAfterFailedAcquire()parkAndCheckInterrupt()这两个方法,有兴趣的话可以参考,本文不再讨论这些方法的细节。

3.写锁的加锁逻辑

介绍完读锁的加锁逻辑之后,接下来看看写锁加锁的实现原理:

    public void lock() {
sync.acquire(1);
} //加锁的代码与ReentrantLock一样,复用了AQS中的处理框架
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
} //ReentrantReadWriteLock的内部类Sync重写了tryAcquire()方法
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
//①如果w = 0,但是c!=0,说明读锁已结被占有,直接返回false表示获取写锁失败,说明即使线程自己持有读锁,也不能再次获取写锁
//②如果w != 0,说明写锁已经被占有,需要判断是不是当前线程占有写锁
if (w == 0 || current != getExclusiveOwnerThread())
return false;
//判断写锁的持有次数有没有超限,从这里可以知道写锁的最大重入次数是2^16-1
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//更新持有的写锁数量,这里可以看出写锁是可重入的
setState(c + acquires);
return true;
}
//走到这里,说明c=0,即当前读锁和写锁都没有被占有,公平锁会先检查队列中有没有其他线程在等待锁,非公平锁不会阻塞
//如果writerShouldBlock()反复返回false,才会考虑设置state字段,设置成功表示成功获取写锁,否则返回false表示获取写锁失败
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
//记录当前线程持有写锁
setExclusiveOwnerThread(current);
return true;
}

公平写锁和非公平写锁同样都重写了writerShouldBlock()方法,非公平写锁的实现非常简单,直接返回false,表示非公平的写锁不需要阻塞;公平写锁会检查等待队列中是否有其他线程在等待获取锁,两种实现方式的源码如下:

    //公平写锁的实现
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
} //非公平写锁的实现
final boolean writerShouldBlock() {
return false; // writers can always barge
}

从上面的分析可以看出,线程在持有写锁的情况下,允许同时获取读锁,但是在线程持有读锁的情况下,不允许同时获取写锁,这个细节需要注意。

4.释放锁

  • 读锁释放锁
    public void unlock() {
sync.releaseShared(1);
} public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
} //当state=0时表示读锁已完全释放,才会返回true,其他情况返回false
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
//判断当前线程是否是第一个获取读锁的线程
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
//firstReaderHoldCount=1,说明读锁只被当前线程占有1次,释放之后更新firstReader的值
if (firstReaderHoldCount == 1)
firstReader = null;
//如果当前线程多次持有读锁,则将计数减1
else
firstReaderHoldCount--;
} else {
//执行到这里,说明当前线程不是第一个获取读锁的线程
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
//如果当前线程获取的读锁次数<=1,在释放锁的时候,需要清除ThreadLocal中的记录
readHolds.remove();
//没有持有读锁的线程释放读锁会报错
if (count <= 0)
throw unmatchedUnlockException();
}
//将当前线程读锁的重入次数减1
--rh.count;
}
for (;;) {
int c = getState();
//更新读锁的值
int nextc = c - SHARED_UNIT;
//如果这一步CAS操作失败,就会自旋尝试,直到成功为止
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
//nextc=0,说明读锁已释放,返回true,否则返回false
//由于读锁是共享锁,可以有多个线程同时获取读锁,只有最后一个持有读锁的线程完全释放读锁,才会返回true
return nextc == 0;
}
} private void doReleaseShared() { for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
//头结点是SIGNAL状态时,将其状态更新成0,该操作会一直自旋重试,直到修改成功,成功之后会唤醒后面的等待线程
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
//如果头结点不是SIGNAL状态,就自旋将其更新为PROPAGATE状态
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
//h==head说明,从for循环开始到现在头结点没用发生变化
//注意:当线程释放锁的时候,会修改头结点
if (h == head) // loop if head changed
break;
}
} private void unparkSuccessor(Node node) { int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0); Node s = node.next;
//waitStatus>0只有CANCELLED状态,代表节点放弃获取锁
if (s == null || s.waitStatus > 0) {
s = null;
//从队列尾部开始向前查找,目的是寻找node节点后第一个非CANCELLED状态的节点,并将s指向该节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//唤醒s节点对应的线程
if (s != null)
LockSupport.unpark(s.thread);
}

由于读锁是共享锁,并且是可重入锁,因此在最后一个持有读锁的线程最后一次释放读锁时,读锁才能真正被释放,此时才会通过doReleaseShared()方法唤醒队列中的等待线程。

  • 写锁释放锁
    public void unlock() {
sync.release(1);
} public final boolean release(int arg) {
//如果线程是释放写锁成功,则唤醒后面的等待线程
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
} protected final boolean tryRelease(int releases) {
//线程没有持有写锁不允许释放写锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
//释放写锁成功,则设置exclusiveOwnerThread=null,表示写锁目前没有被任何线程占有
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
} //判断当前线程是否持有写锁
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}

5.其他介绍

  • HoldCounter

    HoldCounter是用来记录线程持有读锁的数量,源码中使用cachedHoldCounter来记录最后一个获取读锁的是哪个线程,由于代码很简单,因此前文并未对其进行介绍,这里做一下简单讲解:
    /**
* A counter for per-thread read hold counts.
* Maintained as a ThreadLocal; cached in cachedHoldCounter
*/
static final class HoldCounter {
//记录线程持有读锁的次数
int count = 0;
// Use id, not reference, to avoid garbage retention
//记录线程id
final long tid = getThreadId(Thread.currentThread());
}

HoldCounter配合使用的是ThreadLocalHoldCounter类,使用readHolds字段维持对该类的引用,下面是ThreadLocalHoldCounter的源码:

    /**
* ThreadLocal subclass. Easiest to explicitly define for sake
* of deserialization mechanics.
*/
//继承了ThreadLocal
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}

最新文章

  1. 带你实现开发者头条APP(五)--RecyclerView下拉刷新上拉加载
  2. jQuery的extend方法
  3. linux文件基本属性
  4. ubuntu16.04 orbslam ./build.sh 出错eigen
  5. Windows Azure HandBook (4) 分析Windows Azure如何处理Session
  6. ASP.NET jquery.uploadify上传控件中文乱码解决办法(转)
  7. Java for LeetCode 037 Sudoku Solver
  8. HDU 4666 Hyperspace(曼哈顿距离)
  9. hot code replace
  10. java从零到变身爬虫大神(一)
  11. JS定时器实例解析
  12. C# LINQ详解(一)
  13. 关于Action中ValidateXXX方法校验一次失败后\导致以后一直返回input视图的情况
  14. 使用Flex构建Web和移动参考应用程序
  15. DFS中的奇偶剪枝学习笔记
  16. properties类是Hashtable的子类
  17. Dell BOSS 卡是什么
  18. [小知识] 关闭我的电脑里面的百度网盘以及修改win+e快捷键打开我的电脑
  19. iOS UILabel两端对齐的实现(可包括中英文/数字)
  20. 如何取option自定义属性?

热门文章

  1. 三分钟学会使用Derby数据库
  2. Typescript - 变量类型
  3. docker 使用:镜像和容器
  4. JXJJOI2018_T1_market
  5. string类中getline函数的应用
  6. Java树结构
  7. 7-3 jmu-python-回文数判断(5位数字) (10 分)
  8. 【技巧】歪脑筋优化flexbox瀑布流布局方案
  9. 解决oninput事件在中文输入法下会取得拼音的值的问题
  10. VUE实现Studio管理后台(九):开关(Switch)控件,输入框input系列