科普文:图解并发下的AQS原理及其实现的锁

简介

AQS(抽象队列同步器)是一个用来构建锁和同步器的框架,它维护了一个volatile修饰的state(共享资源)和一个CLH(FIFO:先进先出)双向队列。

可参考:科普文:深入理解并发下的AQS机制-CSDN博客

AQS支持两种资源共享方式:

  • 独占式:同一时间只有一个线程可以获取到资源(如:ReentrantLock)。

  • 共享式:同一时间可以有多个线程获取到资源(如:CountDownLatch、Semaphore等)。

核心思想

AQS的核心思想:

  • 如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。

  • 如果被请求的共享资源被占用,则通过一套线程阻塞-唤醒机制,将暂时获取不到锁的线程加入到CLH队列中,等待被持有锁的线程在释放锁后唤醒。

整体流程

AQS整体流程,如图所示:

处理流程:

  • 1)多个线程竞争锁时,如果有线程获取到锁,则将exclusiveOwnerThread(即:占用锁的线程)设置为获取到锁的线程。

  • 2)将其他没有获取到锁的线程封装成Node节点追加到CLH队列(虚拟双向队列)的末尾,等待被唤醒。

  • 3)持有锁的线程调用await()方法释放锁,并将线程封装成Node节点追加到条件队列(单向链表)的末尾。

  • 4)持有锁的线程调用signal()方法唤醒条件队列的头节点,并该头节点转移到CLH队列的末尾,等待被唤醒。

  • 5)持有锁的线程释放锁,唤醒其后继节点获取到锁。

其中,有一个非常巧妙的设计:

  • 同步队列被设计成一个双向队列(CLH队列),获取锁时,从头到尾遍历CLH队列;释放锁时,从尾到头遍历CLH队列。

  • 条件队列被设计成一个单项链表,唤醒条件队列中的节点时,从头到尾遍历条件队列。

实现原理

数据结构

AbstractQueuedSynchronizer中定义的重要属性及数据结构:

// AbstractQueuedSynchronizer继承了AbstractOwnableSynchronizer并实现了Serializable接口
//    AbstractOwnableSynchronizer:记录独占模式下获得锁的线程
//    Serializable:序列化
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {    
// 同步状态,0-表示没有线程获得锁,如果线程获得锁,则将state值+1;如果线程释放锁,则将state值-1    

    private volatile int state;    // CLH队列的头节点
    private transient volatile Node head;    // CLH队列的尾节点
    private transient volatile Node tail;    // 同步队列(CLH)和条件队列中的Node节点,用于封装线程 
   static final class Node {        // 共享模式标记   
     static final Node SHARED = new Node();        // 独占模式标记     
   static final Node EXCLUSIVE = null;        // 节点状态:已取消。当一个线程等待获取锁的过程中被中断或超时时,节点的状态可能被设置为CANCELLED
        static final int CANCELLED =  1;        // 节点状态:待唤醒。当一个节点释放锁时,会唤醒该节点的后继节点,SIGNAL就用于表示这个需要唤醒的状态 
       static final int SIGNAL    = -1;        // 节点状态:节点处于条件队列中。当一个线程在等待条件变量时,会被放入条件队列,节点的状态被设置为CONDITION
        static final int CONDITION = -2;        // 节点状态:向后传播(用于共享模式),在共享模式下,可能需要通过PROPAGATE来通知其他线程继续获取共享资源
        static final int PROPAGATE = -3;        // 节点状态(初始状态为0)
        volatile int waitStatus;        // 同步队列中节点的前驱节点   
     volatile Node prev;        // 同步队列中节点的后继节点
        volatile Node next;        // 节点对应的线程 
       volatile Thread thread;        // 条件队列中节点的后继节点 
       Node nextWaiter;    }    // 条件队列 
   public class ConditionObject implements Condition, java.io.Serializable { 
       // 条件队列的头节点
        private transient Node firstWaiter;        // 条件队列的尾节点 
       private transient Node lastWaiter;  
  }
}

state

state是volatile修饰的int类型的变量,用于表示当前同步状态(即:共享资源占用状态),初始值为0,如果线程获得锁,则将state值+1;如果线程释放锁,则将state值-1。

state有三种访问方式:

  • getState():获取同步状态。

  • setState():设置同步状态。

  • compareAndSetState():通过CAS方式设置同步状态。

Node节点

Node节点是AbstractQueuedSynchronizer类的静态内部类,主要作用是将暂时获取不到锁的封装成Node节点加入到同步队列(CLH)或条件队列中。

Node节点主要属性:

  • SHARED:共享模式标记。

  • EXCLUSIVE:独占模式标记。

  • thread:节点对应的线程。

  • waitStatus:节点状态。

  • prev:同步队列中节点的前驱节点。

  • next:同步队列中节点的后继节点。

  • nextWaiter:条件队列中节点的后继节点。

其中:

  • prev和next作用于同步队列。

  • nextWaiter作用于条件队列。

Node节点状态:

  • 0:初始状态,在等待队列中的节点,如果还没有进入到同步队列中等待获取锁,其状态为初始状态。

  • CANCELLED(1):已取消状态,当一个线程等待获取锁的过程中被中断或超时时,节点的状态可能被设置为CANCELLED。

  • SIGNAL(-1):待唤醒状态,当一个节点释放锁时,会唤醒该节点的后继节点,SIGNAL就用于表示这个需要唤醒的状态

  • CONDITION(-2):节点处于条件队列中,当一个线程在等待条件变量时,会被放入条件队列,节点的状态被设置为CONDITION。

  • PROPAGATE(-3):向后传播(用于共享模式),在共享模式下,可能需要通过PROPAGATE来通知其他线程继续获取共享资源。

CLH队列

CLH队列是一个基于先进先出(FIFO)原则的虚拟双向队列,用于存储因暂时获取不到锁而进入阻塞的线程。

CLH队列存储结构,如图所示:

其中:

  • head节点为哑节点(即:head节点中的thread属性值为null)。可以将head理解为当前持有锁的线程对应的节点。

  • Node节点中的nextWaiter属性在CLH队列中无效。

条件队列

条件队列是一个单链表结构,用于存储暂时不满足条件的线程。

条件队列存储结构,如图所示:

其中:

  • Node节点中的prev、next属性在条件队列中无效。

核心方法

AQS支持独占和共享两种资源共享模式。其核心方法如下:

// 获取独占锁
public final void acquire(int arg) {}// 获取可中断的独占锁
public final void acquireInterruptibly(int arg) throws InterruptedException {}// 获取可超时的独占锁
public final boolean tryAcquireNanos(int arg, long nanosTimeout)throws InterruptedException {}// 释放独占锁
public final boolean release(int arg) {}// 获取共享锁
public final void acquireShared(int arg) {}// 获取可中断的共享锁
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {}// 获取可超时的共享锁
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {}// 释放共享锁
public final boolean releaseShared(int arg) {}// 尝试获取锁(由继承AbstractQueuedSynchronizer类的子类实现)
protected boolean tryAcquire(int arg) {    throw new UnsupportedOperationException();}// 尝试释放锁(由继承AbstractQueuedSynchronizer类的子类实现)
protected boolean tryRelease(int arg) {    throw new UnsupportedOperationException();}// 判断当前线程是否正在独享资源(由继承AbstractQueuedSynchronizer类的子类实现)
protected boolean isHeldExclusively() {    throw new UnsupportedOperationException();}

AQS采用了模版设计模式,AbstractQueuedSynchronizer类中定义了获取锁、释放锁的抽象方法,具体实现由继承AbstractQueuedSynchronizer类的子类实现。

其中:

  • tryAcquire():尝试获取锁的抽象方法,由继承AbstractQueuedSynchronizer类的子类实现。

  • tryRelease():尝试释放锁的抽象方法,由继承AbstractQueuedSynchronizer类的子类实现。

  • isHeldExclusively():判断当前线程是否正在独享资源(true-独享;false-非独享),只有用到condition才需要实现该方法,由继承AbstractQueuedSynchronizer类的子类实现。

独占模式

独占模式主要包含获取独占锁和释放独占锁。

获取独占锁

实现原理

获取独占锁执行流程,如图所示:

处理流程:

  • 1)调用AbstractQueuedSynchronizer#tryAcquire方法尝试获取锁(由继承AbstractQueuedSynchronizer类的子类实现,如:ReentrantLock):

  • 如果获取锁成功,则可以在子类中进行自定义处理(如:将state值+1,并设置exclusiveOwnerThread的值为当前线程)。

  • 如果获取锁失败,则判断CLH队列中的尾节点是否为空(即:CLH队列是否已初始化):

  • 不为空,则通过CAS方式将Node节点追加到CLH队列的末尾,判断Node节点追加到CLH队列的末尾是否成功:

    • 追加失败,则通过自旋方式将Node节点追加到CLH队列的末尾(保证追加成功)。

  • 为空,则创建尾节点(即:初始化CLH队列),并通过自旋方式将Node节点追加到CLH队列的末尾(保证追加成功)

  • 2)将Node节点成功追加到CLH队列后,判断该Node节点的前驱节点是否为头节点:

    • 如果前驱节点是头节点,则再次尝试获取独占锁,获取独占锁成功,则将当前Node节点设置成头节点,同时可以在子类中进行自定义处理(如:将state值+1,并设置exclusiveOwnerThread的值为当前线程)。

    • 如果前驱节点不是头节点或者再次获取锁失败,则逆序遍历CLH队列,找到可以唤醒自己的节点,最后将该自己挂起。

源码解析

获取独占锁通过AbstractQueuedSynchronizer#acquire方法实现。

源码分析:

// java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire// 获取独占锁
public final void acquire(int arg) {   
 // tryAcquire:尝试获取锁(由继承AbstractQueuedSynchronizer类的子类实现,如:ReentrantLock)
    // acquireQueued:如果获取锁失败,则将当前线程封装成Node节点(独占模式)追加到CLH队列的末尾    
// addWaiter:将封装当前线程的Node节点追加到CLH队列的末尾(保证追加成功),其中Node.EXCLUSIVE表示独占模式
    if (!tryAcquire(arg) &&        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))        // 线程中断 
       selfInterrupt();}

// java.util.concurrent.locks.AbstractQueuedSynchronizer#addWaiter// 将封装当前线程的Node节点追加到CLH队列的末尾(保证追加成功)
private Node addWaiter(Node mode) {    // 将当前线程封装成Node节点,并设置资源共享模式 
   Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) { 
       node.prev = pred;        // 通过CAS方式将Node节点追加到CLH队列的末尾(多线程竞争不激烈的情况)  
      if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        } 
   }
    // 通过自旋(死循环)方式将Node节点追加到CLH队列的末尾(多线程竞争激烈的情况)
    enq(node);
    return node;}

// java.util.concurrent.locks.AbstractQueuedSynchronizer#enq// 通过自旋(死循环)方式将Node节点追加到CLH队列的末尾
private Node enq(final Node node) {    // 通过自旋(死循环)方式将Node节点追加到CLH队列的末尾(保证追加成功)
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize 
           if (compareAndSetHead(new Node())) 
               tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }}

// java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued
// acquireQueued方法主要做以下事情:
//   1)将Node节点追加到CLH队列的末尾后,再次判断该Node节点的前驱节点是否为头节点。
//   2)如果前驱节点是头节点,说明该Node节点是第一个加入同步队列的节点,则再次尝试获取锁。
//   3)如果获取锁成功,则将自己设置成头节点。
//   4)如果前驱节点不是头节点或者再次获取锁失败,则逆序遍历CLH队列,找到可以唤醒自己的节点,最后将自己挂起。
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 获取Node节点的前驱节点
            final Node p = node.predecessor();            // 如果前驱节点是头节点,则再次尝试获取锁。
            if (p == head && tryAcquire(arg)) {                // 获取锁成功,将自己设置成头节点。
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // shouldParkAfterFailedAcquire:如果前驱节点不是头节点或者获取锁失败,则逆序遍历CLH队列,找到可以唤醒自己的节点 
           // parkAndCheckInterrupt:将自己挂起
            if (shouldParkAfterFailedAcquire(p, node) &&                parkAndCheckInterrupt()) 
               interrupted = true;
        }
    } finally {
        if (failed) 
           cancelAcquire(node); 
   }}

// java.util.concurrent.locks.AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire
// 逆序遍历CLH队列,找到可以唤醒自己的节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;    // 如果前驱节点的状态为SIGNAL,则表示找到可以唤醒自己的节点
    if (ws == Node.SIGNAL) 
       return true;    // 如果前驱节点的状态为CANCELLED(1),则继续向前遍历CLH队列
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev; 
       } while (pred.waitStatus > 0);
        pred.next = node; 
   } else {
        // 找到状态不为CANCELLED的前驱节点,将该节点的状态设置为SIGNAL   
     compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    } 
   return false;
}

释放独占锁

实现原理

释放独占锁执行流程,如图所示:

处理流程:

  • 调用AbstractQueuedSynchronizer#tryRelease方法尝试释放锁(由继承AbstractQueuedSynchronizer类的子类实现,如:ReentrantLock):

  • 如果释放锁成功,则判断是否存在后继节点:

  • 如果存在后继节点,则重置头节点,并判断后继节点状态是否为null或已取消:

    • 如果后继节点不为null且状态不是已取消,则唤醒该后继节点,并返回释放锁成功。

    • 如果后继节点为null或状态为已取消,则逆序遍历CLH队列,找到一个有效状态的节点,并唤醒找到的有效节点,最后返回释放锁成功。

  • 如果不存在后继节点,则直接返回释放锁成功。

  • 如果释放锁失败,则直接返回释放锁失败。

源码解析

释放独占锁通过AbstractQueuedSynchronizer#release方法实现。

源码分析:

// java.util.concurrent.locks.AbstractQueuedSynchronizer#release
// 释放独占锁
public final boolean release(int arg) {
    // tryRelease:尝试释放锁(由继承AbstractQueuedSynchronizer类的子类实现,如:ReentrantLock)
    if (tryRelease(arg)) {
        Node h = head;        // 如果CLH队列中存在后继节点,则唤醒其后继节点
        if (h != null && h.waitStatus != 0)            // 唤醒其后继节点   
         unparkSuccessor(h);
        return true;
    } 
   return false;}

// java.util.concurrent.locks.AbstractQueuedSynchronizer#unparkSuccessor
// 唤醒后继节点
private void unparkSuccessor(Node node) { 
   int ws = node.waitStatus;    // 如果头节点状态不是CANCELLED状态,则重置头节点状态为初始状态    
if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0); 
   Node s = node.next;    // 如果后继节点为null或者状态为CANCELLED状态,则逆序遍历CLH队列,找到一个有效状态的节点
    if (s == null || s.waitStatus > 0) {
        s = null;        // 逆序遍历CLH队列,找到一个有效状态的节点
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0) 
               s = t; 
   } 
   if (s != null)        // 唤醒找到的有效节点 
       LockSupport.unpark(s.thread);}

共享模式

独占模式主要包含获取共享锁和释放共享锁。

获取共享锁

实现原理

获取共享锁的实现原理与获取独占锁基本一致,不再重复说明。

源码解析

获取共享锁通过AbstractQueuedSynchronizer#acquireShared方法实现。

源码分析:

// java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireShared
// 获取共享锁
public final void acquireShared(int arg) {
    // 尝试获取共享锁
    if (tryAcquireShared(arg) < 0)
        // 如果获取锁失败,则将当前线程封装成Node节点(共享模式)追加到CLH队列的末尾        
doAcquireShared(arg);}

// java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireShared
// 将当前线程封装成Node节点(共享模式)追加到CLH队列的末尾
// 其实现原理与独占锁基本一致
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 设置当前节点尾head节点,如果存在剩余资源,则唤醒下一个相邻的后继节点(即:向后传播)
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted) 
                       selfInterrupt(); 
                   failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&                parkAndCheckInterrupt()) 
               interrupted = true;
        }
    } finally { 
       if (failed) 
           cancelAcquire(node);
    }}

// java.util.concurrent.locks.AbstractQueuedSynchronizer#setHeadAndPropagate
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);    // 如果存在剩余资源,则唤醒下一个相邻的后继节点(即:向后传播)
    if (propagate > 0 || h == null || h.waitStatus < 0 ||        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared(); 
   }}

其中,AbstractQueuedSynchronizer#tryAcquireShared方法由继承AbstractQueuedSynchronizer类的子类实现(如:CountDownLatch、Semaphore等),其返回值:

  • 返回正数:表示获取共享锁成功,正数代码剩余资源数。

  • 返回0:表示获取共享锁成功,没有剩余资源。

  • 返回负数:表示获取共享锁失败。

获取共享锁的执行流程与获取独占锁基本一致,主要区别是当前线程获取到共享锁后,如果存在剩余资源,则会向后传播唤醒下一个相邻的后继节点。注意:如果相邻的后继节点所需要的资源数大于剩余资源数,即使剩余资源数满足其他后继节点所需要的资源数,也不会唤醒其他后继节点。

释放共享锁

实现原理

释放共享锁的实现原理与释放独占锁基本一致,不再重复说明。

源码解析

释放共享锁通过AbstractQueuedSynchronizer#releaseShared方法实现。

源码分析:

// java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared
// 释放共享锁
public final boolean releaseShared(int arg) {
    // 尝试释放共享锁(由继承AbstractQueuedSynchronizer类的子类实现)
    if (tryReleaseShared(arg)) {
        // 唤醒后继节点 
       doReleaseShared();
        return true;
    }
    return false;}

// java.util.concurrent.locks.AbstractQueuedSynchronizer#doReleaseShared
// 唤醒后继节点
private void doReleaseShared() {
    // 自旋
    for (;;) {
        // 复制头节点快照
        Node h = head;
        // 头节点不为空且头节点不是尾节点,说明存在等待唤醒的后继节点
        if (h != null && h != tail) {
            // 获取头节点状态
            int ws = h.waitStatus;
            // 如果头节点状态为SIGNAL(即:需要被唤醒的节点)
            if (ws == Node.SIGNAL) { 
               /**
                 * unparkSuccessor()方法中,当节点状态小于0时,会重置节点状态(即waitStatus=0)
                 * 如果更新通过CAS更新state失败,则重试
                 * 因为释放共享锁存在两个入口setHeadAndPropagate和releaseShared,避免两次unpark
                 */
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                 /**
                 * 头节点状态为SIGNAL,将状态设置为0后,唤醒后继节点,
                 * 此时头节点和其后继节点都被唤醒,头节点的后继节点会与头节点竞争锁, 
                * 如果头节点的后继节点获取竞争到锁,会将head设置为当前头节点的后继节点(即:当前head发生变化),
                 * 当前head发生变化后会继续循环唤醒其后继节点(即:该方法最底下那行代码) 
                */ 
               unparkSuccessor(h);
            } 
           /**
             * 如果头节点处于初始状态,则需要将节点状态设置为PROPAGATE,表示向后传播 
            * 如果设置PROPAGATE状态失败,则重试
             */
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))  
              continue;
         }
        // 头节点未发生变更,则退出循环
        if (h == head) 
            break;
    }}

释放共享锁的执行流程与释放独占锁基本一致,主要区别:

  • 释放独占锁时,只有当前线程释放锁全部资源(即:state=0)后才会去唤醒后继节点。

  • 释放共享锁时,当前线程释放部分资源后就可以唤醒后继节点。示例:资源总量为16,线程A(5)和线程B(8)分别获取到资源后并发运行,线程C(7)获取共享锁时只剩下3个资源,需要等待持有资源的线程释放资源。线程A执行过程中释放2个资源,唤醒线程C,此时可用资源数为3+2=5,不满足线程C所需要的资源数(7),线程C继续等待,线程B执行过程中释放3个资源,唤醒线程C,此时可用资源数为3+2+3=8,满足线程C所需要的资源数,则线程C会有线程A和线程B一起运行。

等待-唤醒机制

await等待

实现原理

await等待执行流程,如图所示:

处理流程:

  • 1)持有锁的线程调用AbstractQueuedSynchronizer.ConditionObject#await方法,将当前线程封装成Node节点追加到条件队列的末尾。

  • 2)释放锁。

  • 3)判断节点是否处于CLH队列中,如果节点不在CLH队列中,则将自己挂起。

  • 4)如果节点已经转移到CLH队列,则尝试获取锁。

  • 5)清除条件队列中状态为已取消的节点。

源码解析

await等待通过AbstractQueuedSynchronizer.ConditionObject#await方法实现。

源码分析:

// java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await()
// await等待
public final void await() throws InterruptedException {
    // 线程中断,则抛出InterruptedException异常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 将当前线程封装成Node节点追加到条件队列的末尾
    Node node = addConditionWaiter();
    // 释放锁
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // Node节点是否处于CLH队列中(可能刚加入条件队列,就被转移到了CLH队列中)
    while (!isOnSyncQueue(node)) {
        // 将自己挂起(即:使线程进入等待状态)
        LockSupport.park(this); 
       if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 如果节点已经转移到CLH队列,则尝试获取锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 
       interruptMode = REINTERRUPT; 
   if (node.nextWaiter != null)
 // clean up if cancelled
        // 清除条件队列中状态为已取消的节点
        unlinkCancelledWaiters();
    if (interruptMode != 0) 
       reportInterruptAfterWait(interruptMode);}

//java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#addConditionWaiter
// 将当前线程封装成Node节点追加到条件队列的末尾
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // 如果条件队列的尾节点状态不是CONDITION,则将该节点从条件队列中清除
    if (t != null && t.waitStatus != Node.CONDITION) {
        // 从条件队列中清除状态不是CONDITION的节点
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // 将当前线程封装成Node节点,并设置节点状态为CONDITION,表示节点处于条件队列中
    Node node = new Node(Thread.currentThread(), Node.CONDITION); 
   // 将Node节点追加到条件队列的末尾
    if (t == null)
        firstWaiter = node; 
   else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;}

signal唤醒

实现原理

signal唤醒执行流程,如图所示:

处理流程:

  • 1)调用AbstractQueuedSynchronizer.ConditionObject#signal方法,判断是否为当前线程持有锁:

    • 不是当前线程持有锁,则抛出InterruptedException异常。

    • 是当前线程持有锁,则从头开始,从条件队列中找到有效的节点,并从条件队列中移除头节点。

  • 2)重置节点状态。

  • 3)通过自旋的方式将节点追加到CLH队列的末尾。

  • 4)将该节点的前驱节点状态设置为SIGNAL。

  • 5)将自己挂起。

源码解析

signal唤醒通过AbstractQueuedSynchronizer.ConditionObject#signal或AbstractQueuedSynchronizer.ConditionObject#signalAll方法实现。

以AbstractQueuedSynchronizer.ConditionObject#signal为例,源码分析:

// java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signal
// signal唤醒
public final void signal() {
    // 如果当前线程不是持有锁的线程,则抛出IllegalMonitorStateException异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter; 
   if (first != null)
        // 唤醒节点
        doSignal(first);}

// java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#doSignal
// 唤醒节点
private void doSignal(Node first) {
    do {
        // 从条件队列中移除头节点
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    // 转移节点(死循环,保证转移一个节点到CLH队列)
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);}

// java.util.concurrent.locks.AbstractQueuedSynchronizer#transferForSignal
// 转移节点
final boolean transferForSignal(Node node) {
    // 通过CAS将节点状态从CONDITION改成0 
   if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false; 
           // 通过自旋的方式将节点追加到CLH队列的末尾
    Node p = enq(node);
    int ws = p.waitStatus;
    // 将该节点的前驱节点状态设置为SIGNAL
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))  
      LockSupport.unpark(node.thread); 
   return true;}

相关推荐

  1. 并发

    2024-07-16 06:28:05       38 阅读
  2. 关于ListView使用及其实现原理

    2024-07-16 06:28:05       28 阅读
  3. JPA乐观实现并发执行SQL案例

    2024-07-16 06:28:05       46 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-16 06:28:05       67 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-16 06:28:05       72 阅读
  3. 在Django里面运行非项目文件

    2024-07-16 06:28:05       58 阅读
  4. Python语言-面向对象

    2024-07-16 06:28:05       69 阅读

热门阅读

  1. [计网初识2]web的3个核心标准html,url,http

    2024-07-16 06:28:05       27 阅读
  2. 微服务边界守卫:Eureka中服务隔离策略的实现

    2024-07-16 06:28:05       21 阅读
  3. 仓工具—Hive语法之宏(Macro)

    2024-07-16 06:28:05       22 阅读
  4. 016.自定义指纹chromium-随机tls指纹(ja4指纹)

    2024-07-16 06:28:05       24 阅读
  5. PHP基础语法

    2024-07-16 06:28:05       21 阅读
  6. 向量数据量milvus k8s helm 对接外部安装部署流程

    2024-07-16 06:28:05       16 阅读
  7. ChatGPT对话:有关花卉数据集

    2024-07-16 06:28:05       21 阅读
  8. lvs集群

    lvs集群

    2024-07-16 06:28:05      25 阅读