以下源码以公平锁为例, 理解公平锁后,对AQS、CAS、CLH队列、独占锁、共享锁都会清晰,再去理解非公平锁就会比较简单。
注:本文是基于对ReentrantLock有基本使用经验,至少知道lock和unlock的使用。 建议在Idea里同步参考AbstractQueuedSynchronizer源码,跳转着看调用关系更直观。
几个基本概念
1. AQS:AbstractQueuedSynchronizer类
AQS是JUC中管理“锁”的抽象类,锁的许多公共方法都是在这个类中实现,AQS是独占锁和共享锁的公共父类。
2. AQS锁的类别 -- 分为“独占锁”和“共享锁”两种。
(01) 独占锁 -- 锁在一个时间点只能被一个线程锁占有。根据锁的获取机制,它又划分为“公平锁”和“非公平锁”。
(02) 共享锁 -- 能被多个线程同时拥有,能被共享的锁。
3.ReentrantLock与AQS的关系
ReentrantLock与sync是组合关系,ReentrantLock中包含了Sync对象。
Sync是AQS的子类,Sync有两个子类FairSync(公平锁)和NonFairSync(非公平锁)。
ReentrantLock是一个独占锁,至于它到底是公平锁还是非公平锁,就取决于sync对象是FairSync的实例还是NonFairSync的实例。
4. CLH队列
双端队列,每个线程在自己节点自旋,直到获取锁或者取消。
5.AQS中CLH队列节点,每个节点代表一个排队的线程。
private transient volatile Node head; // CLH队列的队首
private transient volatile Node tail; // CLH队列的队尾
// CLH队列的节点
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
// 线程已被取消
static final int CANCELLED = 1;
// 后继线程需要被unpark(唤醒)
// 一般发生情况是:当前线程的后继线程处于阻塞状态,而当前线程被release或cancel掉,因此需要唤醒当前线程的后继线程。
static final int SIGNAL = -1;
// 线程(处在Condition休眠状态)在等待Condition唤醒
static final int CONDITION = -2;
// (共享锁)其它线程获取到“共享锁”
static final int PROPAGATE = -3;
// 若waitStatus=0,则意味着当前线程不属于上面的任何一种状态。
volatile int waitStatus;
// 前一节点
volatile Node prev;
// 后一节点
volatile Node next;
// 节点所对应的线程
volatile Thread thread;
// nextWaiter是“区别当前CLH队列是 ‘独占锁’队列 还是 ‘共享锁’队列 的标记”
// 若nextWaiter=SHARED,则CLH队列是“独占锁”队列;
// 若nextWaiter=EXCLUSIVE,(即nextWaiter=null),则CLH队列是“共享锁”队列。
Node nextWaiter;
// “共享锁”则返回true,“独占锁”则返回false。
final boolean isShared() {
return nextWaiter == SHARED;
}
// 返回前一节点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
// 构造函数。thread是节点所对应的线程,mode是用来表示thread的锁是“独占锁”还是“共享锁”。
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
// 构造函数。thread是节点所对应的线程,waitStatus是线程的等待状态。
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
一,获取锁
1,ReentrantLock.lock
获取锁的入口方法,通过调用sync的lock实现。
public void lock() {
sync.lock();
}
2,sync.lock
final void lock() {
acquire(1);
}
3,acquire
public final void acquire(int arg) {
if ( !tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
4,tryAcquire
protected final boolean tryAcquire(int acquires) {
// 获取“当前线程”
final Thread current = Thread.currentThread();
// 获取“独占锁”的状态
int c = getState();
// c=0意味着“锁没有被任何线程锁拥有”,
if (c == 0) {
// 若“锁没有被任何线程锁拥有”,
// 则判断“当前线程”是不是CLH队列中的第一个线程,
// 若是的话,则获取该锁,设置锁的状态,并且设置锁的拥有者为“当前线程”。
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 如果“独占锁”的拥有者已经为“当前线程”,
// 则将更新锁的状态。
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
5, addWaiter
private Node addWaiter(Node mode) {
// 新建一个Node节点,节点对应的线程是“当前线程”,“当前线程”的锁的模型是mode。
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// 若CLH队列不为空,则将“当前线程”添加到CLH队列末尾
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 若CLH队列为空,则调用enq()新建CLH队列,然后再将“当前线程”添加到CLH队列中。
enq(node);
return node;
}
6,acquireQueued
acquireQueued()的作用就是当前线程在这个方法自旋或阻塞(回忆CLH队列),直到获取锁返回。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// interrupted表示在CLH队列的调度中,
// “当前线程”在休眠时,有没有被中断过。
boolean interrupted = false;
for (;;) {
// 获取上一个节点。
// node是“当前线程”对应的节点,这里就意味着“获取上一个等待锁的线程”。
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
// 前继节点释放锁后,本节点获取到锁,并维护CLH队列头节点
setHead(node);
p.next = null; // help GC
failed = false;
// 当前线程只有前置节点是head,且成功(唤醒)获取锁才返回!
return interrupted;
}
// 当前线程parkAndCheckInterrupt处阻塞,直到唤醒
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
注意: 线程解除阻塞,可能是由于其它线程调用了该线程的unpark()函数,也 可能是由于线程被中断!
即使线程被中断而获取到cpu执行权利,而如果该线程前面还有其它等待锁的线程,根据公平性原则,该线程依然无法获取到锁。它会再次阻塞!
7,shouldParkAfterFailedAcquire
判断“当前线程”是否需要被阻塞。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 如果前继节点状态为SIGNAL,表明当前节点需要(将来某个时间)被unpark,此时则返回true,阻塞当前线程!
if (ws == Node.SIGNAL)
return true;
// 如果前继节点状态为CANCELLED(ws>0),说明前继节点已经被取消,则通过先前回溯找到一个有效(非CANCELLED状态)的节点,并返回false。
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
// 如果前继节点状态为非SIGNAL、非CANCELLED,则设置前继的状态为SIGNAL,并返回false。
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
① 当一个线程刚被加入队列尾部,waitstatus是0,前继节点waitstatus也是0;
② shouldParkAfterFailedAcquire会走到compareAndSetWaitStatus,设置前置waitstatus为-1;
③ 再返回acquireQueued去循环一次,再进入shouldParkAfterFailedAcquire的时候就会返回true了,当前线程就会原地阻塞,等待唤醒;
当前节点waitstatus被修改的时机(主要):
-
等后继节点加入CLH队列,通过AQS的shouldParkAfterFailedAcquire方法修改为Node.SIGNAL;
-
当前节点释放锁,通过AQS的unparkSuccessor方法修改为0;
![](https://img-blog.csdnimg.cn/direct/4ebd45e2441b4ab1b61562e43c5ba468.png)
8, parkAndCheckInterrupt
private final boolean parkAndCheckInterrupt() {
// 通过LockSupport的park()阻塞“当前线程”。
LockSupport.park(this);
// 返回线程的中断状态。
return Thread.interrupted();
}
阻塞当前线程, 直到被唤醒,并且返回“线程被唤醒之后”的中断状态。
线程被阻塞之后,一般有2种情况唤醒:
1. unpark() 唤醒:前继节点使用完锁之后,通过unpark()唤醒当前线程。
2. 中断唤醒:其它线程通过interrupt()中断当前线程。
LockSupport()中的park()、unpark()的作用和Object中的wait()、notify()作用类似,是阻塞/唤醒。它们的用法不同,park(), unpark()是轻量级的,而wait(), notify()是必须先通过Synchronized获取同步锁。
注意:如果线程是阻塞在LockSupport.park()上,给线程中断后,不会抛出异常,线程唤醒继续执行,且线程中断标志为true。可以参考另篇关于中断的博客。
二,释放锁
1, unlock
public void unlock() {
sync.release(1);
}
2, release
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
如果锁完全释放,唤醒后继线程后,后继线程在 acquireQueued自旋获得锁,同时把后继线程设置为head!
3, tryRelease
protected final boolean tryRelease(int releases) {
// c是本次释放锁之后的状态
int c = getState() - releases;
// 如果“当前线程”不是“锁的持有者”,则抛出异常!
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 如果“锁”已经被当前线程彻底释放,则设置“锁”的持有者为null,即锁是可获取状态。
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// 设置当前线程的锁的状态。
setState(c);
return free;
}
4, unparkSuccessor
private void unparkSuccessor(Node node) {
// 获取当前线程的状态
int ws = node.waitStatus;
// 如果状态<0,则设置状态=0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//获取当前节点的“有效的后继节点(线程状态<=0)”,无效的话,则通过for循环进行获取。
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 唤醒“后继节点对应的线程”
if (s != null)
LockSupport.unpark(s.thread);
}
三,CLH队列建立、获取、释放锁
![](https://img-blog.csdnimg.cn/direct/2c231a9c60d14ec092dfd316c8e1a46c.jpeg)