JUC AQS(AbstractQueuedSynchronizer)源码

以下源码以公平锁为例, 理解公平锁后,对AQS、CAS、CLH队列、独占锁、共享锁都会清晰,再去理解非公平锁就会比较简单。
注:本文是基于对ReentrantLock有基本使用经验,至少知道lock和unlock的使用。 建议在Idea里同步参考AbstractQueuedSynchronizer源码,跳转着看调用关系更直观。
 

几个基本概念

1. AQS:AbstractQueuedSynchronizer类
    AQS是JUC中管理“锁”的抽象类,锁的许多公共方法都是在这个类中实现,AQS是独占锁和共享锁的公共父类。

2. AQS锁的类别 -- 分为“独占锁”和“共享锁”两种。
    (01) 独占锁 -- 锁在一个时间点只能被一个线程锁占有。根据锁的获取机制,它又划分为“公平锁”和“非公平锁”。
    (02) 共享锁 -- 能被多个线程同时拥有,能被共享的锁。

3.ReentrantLock与AQS的关系

ReentrantLock与sync是组合关系,ReentrantLock中包含了Sync对象。

Sync是AQS的子类,Sync有两个子类FairSync(公平锁)和NonFairSync(非公平锁)。

ReentrantLock是一个独占锁,至于它到底是公平锁还是非公平锁,就取决于sync对象是FairSync的实例还是NonFairSync的实例。

4. CLH队列

双端队列,每个线程在自己节点自旋,直到获取锁或者取消。

5.AQS中CLH队列节点,每个节点代表一个排队的线程。

private transient volatile Node head;    // CLH队列的队首
private transient volatile Node tail;    // CLH队列的队尾

// CLH队列的节点
static final class Node {
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;

       // 线程已被取消
    static final int CANCELLED =  1;
    
       // 后继线程需要被unpark(唤醒)
       // 一般发生情况是:当前线程的后继线程处于阻塞状态,而当前线程被release或cancel掉,因此需要唤醒当前线程的后继线程。
    static final int SIGNAL  = -1;
    
       // 线程(处在Condition休眠状态)在等待Condition唤醒
    static final int CONDITION = -2;
    
        // (共享锁)其它线程获取到“共享锁”
    static final int PROPAGATE = -3;

        // 若waitStatus=0,则意味着当前线程不属于上面的任何一种状态。
    volatile int waitStatus;

    // 前一节点
    volatile Node prev;

    // 后一节点
    volatile Node next;

    // 节点所对应的线程
    volatile Thread thread;

    // nextWaiter是“区别当前CLH队列是 ‘独占锁’队列 还是 ‘共享锁’队列 的标记”
    // 若nextWaiter=SHARED,则CLH队列是“独占锁”队列;
    // 若nextWaiter=EXCLUSIVE,(即nextWaiter=null),则CLH队列是“共享锁”队列。
    Node nextWaiter;

    // “共享锁”则返回true,“独占锁”则返回false。
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    // 返回前一节点
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    // 构造函数。thread是节点所对应的线程,mode是用来表示thread的锁是“独占锁”还是“共享锁”。
    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    // 构造函数。thread是节点所对应的线程,waitStatus是线程的等待状态。
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

一,获取锁

1,ReentrantLock.lock

获取锁的入口方法,通过调用sync的lock实现。
public void lock() {
    sync.lock();
}

2,sync.lock

final void lock() {
      acquire(1);
}

3,acquire

public final void acquire(int arg) {
    if ( !tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))        
            selfInterrupt();
}

4,tryAcquire

protected final boolean tryAcquire(int acquires) {
    // 获取“当前线程”
    final Thread current = Thread.currentThread();
    
    // 获取“独占锁”的状态
    int c = getState();
    
    // c=0意味着“锁没有被任何线程锁拥有”,
    if (c == 0) {
        // 若“锁没有被任何线程锁拥有”,
        // 则判断“当前线程”是不是CLH队列中的第一个线程,
        // 若是的话,则获取该锁,设置锁的状态,并且设置锁的拥有者为“当前线程”。
        if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        // 如果“独占锁”的拥有者已经为“当前线程”,
        // 则将更新锁的状态。
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

5, addWaiter

private Node addWaiter(Node mode) {
    // 新建一个Node节点,节点对应的线程是“当前线程”,“当前线程”的锁的模型是mode。
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    // 若CLH队列不为空,则将“当前线程”添加到CLH队列末尾
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 若CLH队列为空,则调用enq()新建CLH队列,然后再将“当前线程”添加到CLH队列中。
    enq(node);
    return node;
}

6,acquireQueued

acquireQueued()的作用就是当前线程在这个方法自旋或阻塞(回忆CLH队列),直到获取锁返回。
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        // interrupted表示在CLH队列的调度中,
        // “当前线程”在休眠时,有没有被中断过。
        boolean interrupted = false;
        for (;;) {
            // 获取上一个节点。
            // node是“当前线程”对应的节点,这里就意味着“获取上一个等待锁的线程”。
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                // 前继节点释放锁后,本节点获取到锁,并维护CLH队列头节点 
                setHead(node);
                p.next = null; // help GC
                failed = false;
                // 当前线程只有前置节点是head,且成功(唤醒)获取锁才返回!
                return interrupted;
            }
            //  当前线程parkAndCheckInterrupt处阻塞,直到唤醒
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
注意: 线程解除阻塞,可能是由于其它线程调用了该线程的unpark()函数,也 可能是由于线程被中断!
即使线程被中断而获取到cpu执行权利,而如果该线程前面还有其它等待锁的线程,根据公平性原则,该线程依然无法获取到锁。它会再次阻塞! 

7,shouldParkAfterFailedAcquire

判断“当前线程”是否需要被阻塞。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;

// 如果前继节点状态为SIGNAL,表明当前节点需要(将来某个时间)被unpark,此时则返回true,阻塞当前线程!
    if (ws == Node.SIGNAL)
        return true;

// 如果前继节点状态为CANCELLED(ws>0),说明前继节点已经被取消,则通过先前回溯找到一个有效(非CANCELLED状态)的节点,并返回false。
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;

// 如果前继节点状态为非SIGNAL、非CANCELLED,则设置前继的状态为SIGNAL,并返回false。
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
当一个线程刚被加入队列尾部,waitstatus是0,前继节点waitstatus也是0;
② shouldParkAfterFailedAcquire会走到compareAndSetWaitStatus,设置前置waitstatus为-1;
③ 再返回acquireQueued去循环一次,再进入shouldParkAfterFailedAcquire的时候就会返回true了,当前线程就会原地阻塞,等待唤醒;
当前节点waitstatus被修改的时机(主要):
  1. 等后继节点加入CLH队列,通过AQS的shouldParkAfterFailedAcquire方法修改为Node.SIGNAL;
  2. 当前节点释放锁,通过AQS的unparkSuccessor方法修改为0;

8, parkAndCheckInterrupt

private final boolean parkAndCheckInterrupt() {
    // 通过LockSupport的park()阻塞“当前线程”。
    LockSupport.park(this);
    // 返回线程的中断状态。
    return Thread.interrupted();
}
阻塞当前线程, 直到被唤醒,并且返回“线程被唤醒之后”的中断状态。
线程被阻塞之后,一般有2种情况唤醒:
1. unpark() 唤醒:前继节点使用完锁之后,通过unpark()唤醒当前线程。
2. 中断唤醒:其它线程通过interrupt()中断当前线程。
LockSupport()中的park()、unpark()的作用和Object中的wait()、notify()作用类似,是阻塞/唤醒。它们的用法不同,park(), unpark()是轻量级的,而wait(), notify()是必须先通过Synchronized获取同步锁。
注意:如果线程是阻塞在LockSupport.park()上,给线程中断后,不会抛出异常,线程唤醒继续执行,且线程中断标志为true。可以参考另篇关于中断的博客。

二,释放锁

1, unlock

public void unlock() {
    sync.release(1);
}

2, release

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
如果锁完全释放,唤醒后继线程后,后继线程在 acquireQueued自旋获得锁,同时把后继线程设置为head!

3, tryRelease

protected final boolean tryRelease(int releases) {
    // c是本次释放锁之后的状态
    int c = getState() - releases;
    // 如果“当前线程”不是“锁的持有者”,则抛出异常!
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();

    boolean free = false;
    
    // 如果“锁”已经被当前线程彻底释放,则设置“锁”的持有者为null,即锁是可获取状态。
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    // 设置当前线程的锁的状态。
    setState(c);
    return free;
}

4, unparkSuccessor

private void unparkSuccessor(Node node) {
    // 获取当前线程的状态
    int ws = node.waitStatus;
    // 如果状态<0,则设置状态=0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    //获取当前节点的“有效的后继节点(线程状态<=0)”,无效的话,则通过for循环进行获取。
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 唤醒“后继节点对应的线程”
    if (s != null)
        LockSupport.unpark(s.thread);
}

三,CLH队列建立、获取、释放锁

相关推荐

  1. 安装git

    2024-06-12 13:36:22       37 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-06-12 13:36:22       16 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-06-12 13:36:22       16 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-06-12 13:36:22       15 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-06-12 13:36:22       18 阅读

热门阅读

  1. iPadOS 18支持的设备列表

    2024-06-12 13:36:22       6 阅读
  2. Python基础学习笔记(十)——初探正则

    2024-06-12 13:36:22       6 阅读
  3. QT 中文乱码 以及 tr 的使用

    2024-06-12 13:36:22       5 阅读
  4. 【docker实战】如何登陆到自己的私有仓库?

    2024-06-12 13:36:22       9 阅读
  5. vue获取用户的mac地址

    2024-06-12 13:36:22       6 阅读
  6. oracle 查询分隔符分隔开的所有数据

    2024-06-12 13:36:22       4 阅读
  7. 了解 XML HttpRequest 及其在 Web 开发中的应用

    2024-06-12 13:36:22       10 阅读