先把我主要学习参考的文章放上来先,这篇文章讲的挺好的,分析比较到位,最好是先看完这篇文章,在接下去看我写的。不然你会一脸懵逼,不过等你看完这篇文章,可能我的文章对你也用途不大了

深入分析AbstractQueuedSynchronizer独占锁的实现原理:ReentranLock

小弟我也是刚开始研究这个Lock锁相关,如果哪里写的有问题,希望各位大哥大姐帮忙指出,谢谢。

正文:

AbstractQueuedSynchronizer(简称AQS),抽象队列同步器,AQS是整个Concurrent包中最核心的地方,其它的并发工具也都是使用AQS来实现的,我们所熟悉的ConcurrentHashMap也是在这玩意的基础上实现的同步。

在深入学习之前,我们先来看看Concurrent包里面有什么东东:

这篇文章只会讲AbstractQueuedSynchronizer和ReentrantLock,而且也只讲加锁和放锁。

大概说一下加锁的原理:

  AbstractQueuedSynchronizer 有一个state字段,当这个字段为0时,就代表当前锁对象可以被线程获取到锁;相反,如果state这个字段不为0,那当前锁对象就不能被线程获取到锁。state这个字段就相当于sychronized关键字所用的monitor。

  AbstractQueuedSynchronizer有一个链表队列,这个队列每个节点都是一个等待获取锁的线程节点。

  当线程尝试获取锁时,先直接尝试获取锁(也就是判断state是不是为0),如果获取不到锁,那就向队尾插入一个新节点,并且当前线程不断循环去获取锁(一定次数获取不到就会挂起线程,直到被别的线程唤醒)。

  AbstractQueuedSynchronizer的state字段是用volatile进行修饰的(保证了对所有线程可见),同时对state的修改是使用compareAndSet进行乐观锁修改的,正是因为这两点,才使得不用sychronized也能在多线程下保证线程安全。

释放锁:

  释放锁其实更简单,就是讲 state减少至0,正常来讲,每次加锁都是对state进行加1操作,对应的每次释放锁都是对state进行减1操作。

  一个线程可以对一个锁进行重复次加锁,state可以一直加加加,但是当这个线程不在需要这个锁时,就必须将这个锁的state减少至0,也就是说你用了多少次lock(),就必须用多少次unLock(),否则这把锁就出错了,报废了,

再也没有线程可以获取到锁了,所有线程都卡死在这了。这一点和sychronized不同,sychronized会自动加锁放锁,但是lock必须自己手动进行。

  另外,当一个线程将锁都释放完后,会唤醒其他等待这把锁,但是被挂起了的线程,让他们重新去获取锁。

AbstractQueuedSynchronizer里面比较关键的代码:

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer{

    protected AbstractQueuedSynchronizer() { }

    /**
* 这玩意会形成一个双向队列连表,用于保存等待锁的线程。
* 队列首节点就是获取到锁的线程节点,新增线程请求锁,会加入到链表尾
* */
static final class Node {
/** 共享模式节点 */
static final Node SHARED = new Node();
/** 独占模式节点 */
static final Node EXCLUSIVE = null;
// 线程节点的状态
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
// 只有当前节点的前一个节点为SIGNAL时,才能当前节点才能被挂起。
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3; /**
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
     // 线程的等待状态
volatile int waitStatus; volatile Node prev; volatile Node next; /**
* 当前节点的线程,也就是请求锁的线程
*/
volatile Thread thread; /**
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/
Node nextWaiter; /**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
} /**
* 获取前一个节点
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
} Node() { // Used to establish initial head or SHARED marker
} Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
} Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
} /**
* 链表队列首节点
*/
private transient volatile Node head; /**
* 链表队列尾节点
*/
private transient volatile Node tail; /**
* 同步状态值,注意,这个值就是锁的关键,也就是线程打生打死都在抢的东西
* 我们所谓的获取锁,就是state这个值为0的时候给他加1
* 释放锁就是把state的值减1
* 当这个值重新变成0时,锁就被彻底释放,其他线程就可以获取锁了。
* 如果state不为0,那是不管怎样都抢不到锁的
*
* 可能会有人问,state仅用volatile修饰,还是线程不安全的。所以,所有关于volatile修饰的变量
* 在这里都是通过compareAndSet来进行修改的
*/
private volatile int state; /**
* 独占锁的线程,这个属性是父类AbstractOwnableSynchronizer的,这里需要注意
* 在线程抢到锁后会将这个属性设成自己的线程
* */
private transient Thread exclusiveOwnerThread; /**
* 请求获取锁。
* AbstractQueuedSynchronizer请求锁是非公平锁。
* 什么是公平锁什么是非公平锁呢?
* 公平锁就是先来先拿锁,后来得排队。
* 非公平锁就是后来者先尝试插队取锁,如果别人还没用完锁并归还,那他就只能乖乖排队。
* 两者的唯一差别就是非公平锁会先进行一次获取锁操作,失败就和公平锁一样排队获取。
* */
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 如果插不了队取锁,就新增一个节点去连表队列尾部排队获取锁
selfInterrupt();
} /**
* 这个方法是子类必须实现的,用于具体获取锁
* */
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
} /**
* 队列节点自旋获取锁。
* 什么叫自旋呢?自旋就是自己一直循环执行
* */
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
/**
* 在队列里获取锁是按先后顺序进行获取的
* 因为队列首节点就是获取到锁的线程,所以首节点后的一个节点就是下一个获取锁的线程,所以这里要求p == head,
* 这样即便多线程下,别的线程因为不是第二个节点,所以也不能去抢锁
* 另外首节点有可能还在执行,也有可能执行完了并释放了锁,所以还要尝试获取一次锁tryAcquire(arg)
* */
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果抢不到锁,他会判断自己这个线程是否需要进行挂起,不浪费CPU资源
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
} /**
* 抢锁失败后判断是否需要挂起当前线程
* 他是根据前一个节点进行判断的
* */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 只有前一个节点为SIGNAL的节点,才会将线程挂起,其他线程等待状态都会在尝试一次
// 绝大多数线程在多次获取不到锁后,都会被置为SIGNAL这个状态,然后挂起
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
} /**
* 挂起线程,让他休眠,有点类似于wait方法,CPU将不再调用他,需要对应的unpark方法进行唤醒,unpark会在获取到锁的线程
* 释放锁或者取消后才调用
* */
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
} /**
* 释放锁,如果释放锁成功了,就会调用unparkSuccessor来唤醒其他休眠的线程
* */
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
} /**
* 释放锁的具体方法,子类必须实现
* */
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
} /***/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0); /*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 从链表尾开始一路往前遍历,找到最前面的非cancelled的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 唤醒线程
if (s != null)
LockSupport.unpark(s.thread);
}
}
ReentrantLock 的一些关键代码:
public class ReentrantLock implements Lock, java.io.Serializable {
/**
* 同步器,继承自AbstractQueuedSynchronizer
* */
private final Sync sync; /**
* 这个锁同步器可以是公平锁也可以非公平锁
* */
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L; /**
* 子类必须实现上锁方法
*/
abstract void lock(); /**
* 非公平锁获取锁
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
} /**
* 释放锁,释放锁没有公平和非公平的说法,都一样
* */
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
} /**
* 非公平锁同步器
* */
static final class NonfairSync extends Sync { /**
* 获取锁,这个方法会一直阻塞,知道获取到锁为止,或者被取消
*/
final void lock() {
// 因为是非公平的,所以会先尝试一次获取锁,失败就调用父类的acquire方法排队获取
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
/**
* 尝试获取锁,只尝试一次,成功就成功,失败就失败
* */
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
/**
* 公平锁同步器
* */
static final class FairSync extends Sync {
/**
* 因为这是公平锁,所以他直接就进入队列进行排队
* */
final void lock() {
/**
* 当然,因为AbstractQueuedSynchronizer的acquire本身就是会尝试先获取一次锁,失败才排队,所以严格来说他还是非公平的
* */
acquire(1);
} /**
* 如果没有别的线程在排队,他才会去获取锁,否则都不获取
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
} /**
* ReentrantLock默认就是非公平锁
* */
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* ReentrantLock也支持公平锁
* */
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
/**
* 获取锁
* */
public void lock() {
sync.lock();
}
/**
* 尝试加锁
* */
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
/**
* 释放锁,这里是直接用AbstractQueuedSynchronizer的释放方法
* */
public void unlock() {
sync.release(1);
}
}

最新文章

  1. 在UWP中页面滑动导航栏置顶
  2. 相克军_Oracle体系_随堂笔记012-undo
  3. 鸟哥的Linux私房菜第零章
  4. 转:VS中的路径宏 vc++中OutDir、ProjectDir、SolutionDir各种路径
  5. win10 phpStudy 80端口被占用
  6. Jetty提交数据时报java.lang.IllegalStateException: Form too large270468&gt;200000问题解决
  7. java特权制度设计篇
  8. JavaScript面向对象基础语法总结
  9. 【PHP】PHP面向对象编程--phpOOP入门
  10. 在http请求中的Content-Type
  11. Python数据分析学习-re正则表达式模块
  12. Java安装和环境变量配置
  13. equals方法的编写建议
  14. Git的思想和基本工作原理2
  15. Azure Machine Learning
  16. 浅析基于ARM的Linux下的系统调用的实现
  17. 我的第一篇paper
  18. ajax操作json的三种方式
  19. POJ.1379.Run Away(模拟退火)
  20. MySql安装完成后设置远程访问的角本

热门文章

  1. Android简单应用程序破解——runtime.apk
  2. ClickHouse源码笔记1:聚合函数的实现
  3. Java实现 LeetCode 191 位1的个数
  4. Java实现WUST 1002: 哈夫曼树
  5. Java实现 洛谷 P1047 校门外的树
  6. java实现圆周率与级数
  7. 重学 Java 设计模式:实战桥接模式(多支付渠道「微信、支付宝」与多支付模式「刷脸、指纹」场景)
  8. Spring Cloud 系列之 Alibaba Nacos 注册中心(一)
  9. Jquery封装:下拉框插件
  10. Bash知识点记录