【源码阅读系列】ReentrantLock&AQS底层原理

目录

原理概述

准备调试

涉及关键成员变量

线程A lock()方法

线程Block()方法

tryAcquire()

addWaiter(Node.EXCLUSIVE)方法

acquireQueued()方法

线程A unlock()方法

线程B 唤醒

总结


原理概述

ReentrantLock()的实现是通过使用 State + AQS + Node + SupportLock + 虚拟双向队列

state:表示现在锁的层数 AQS:抽象同步队列框架,是一个构建其他同步组件的基础框架,不用关注更多的线程调度细节 Node:将等待锁的线程封装成一个Node,然后放入队列中 SupportLock:Java中的一个工具类,提供了park,unpark方法 虚拟双向队列:将没有拿到线程锁并且封装成node 的节点放入虚拟的双向队列

准备调试

ReentrantLock的底层是通过AQS抽象同步队列进行实现的,有公平锁和非公平锁两种版本,这里我们测试非公平版本

这里模拟两个线程对同步资源进行加锁竞争的情况

public class AqsTest {
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
​
        new Thread(() -> {
            try {
                lock.lock();
                System.out.println("线程A开始进行业务");
                Thread.sleep(1*1000);
                System.out.println("执行完毕,释放资源");
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                lock.unlock();
            }
        }, "A").start();
​
        new Thread(() -> {
            try {
                System.out.println("线程B开始抢占");
                lock.lock();
                System.out.println("b - 执行完毕,释放资源");
            } finally {
                lock.unlock();
            }
        },"B").start();
    }
}

涉及关键成员变量

public abstract class AbstractQueuedSynchronizer{
    private transient volatile Node head;//头节点
    private transient volatile Node tail;//尾节点
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;
    static final int CANCELLED =  1;  // 设置线程节点的状态为取消获取锁的状态
    static final int SIGNAL    = -1; // 表示节点线程需要去unParking
    static final int CONDITION = -2; 
    static final int PROPAGATE = -3;
    private volatile int state;//资源State变量,默认值为0
    private static final Unsafe unsafe = Unsafe.getUnsafe();//直接操作内存的unsafe工具包
    static final class Node {
        volatile int waitStatus;//线程Node节点状态
        volatile Node prev;//前节点
        volatile Node next;//后节点
        volatile Thread thread;//Node节点的线程
    }
}

线程A lock()方法

在lock打上断点开始执行,进入到lock方法内部,然后选择NonfairSync的非公平锁的方法

这是lock的具体实现,使用compareAndSetState来修改status的值。这里我理解的 state 就是锁的层数,也是实现锁重入的关键。线程A首先进入,所以可以进行修改,将state设置成1。然后使用setExclusiveOwnerThread(Thread.currentThread())方法将锁的持有者设置成线程A

线程Block()方法

这时线程B开始执行然后尝试获取锁,但是此时state已经为1了,不满足0。

所以执行acquire(1)方法,这里一共有三个方法tryAcquire(),acquireQueued(), addWaiter()方法,我以此讲一下三个方法的作用。

tryAcquire()

顾名思义,这是尝试获取锁的方法。这里也使用了经典的Java设计模式——模板模式 可以去看看理解一下,这里如果不实现这个方法的话,直接调用会报错。

这里选择非公平锁的实现方法

进入到 final boolean nonfairTryAcquire(int acquires) 方法,这里我就不截图了,我可以写一下注释更方便了解每一条语句的作用。这里也是表现了为什么是可重入锁?

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    // 获取当前锁的层数
    int c = getState();
    if (c == 0) { // 如果为0 则表示可能执行期间另一个线程已经将锁释放了
        if (compareAndSetState(0, acquires)) { // 这里和上面一样 直接使用CAS操作,获取锁
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) { // 判断当前锁的持有者是否是本线程
        int nextc = c + acquires; // 这里将锁的层数加一层 实现可重入锁
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);  // 设置state 为加锁以后的新值
        return true;
    }
    return false;
}

addWaiter(Node.EXCLUSIVE)方法

Node.EXCLUSIVE 表示当前锁的模式是独占模式,被线程A独占

private Node addWaiter(Node mode) {
    // 由当前线程创建一个新的Node节点 用于后面的等待队列
    Node node = new Node(Thread.currentThread(), mode);
    // 获取队尾的节点,但是这里还没节点在里面
    Node pred = tail;
    // 如果已经有尾节点的话,则使用CAS将当前线程节点添加到尾节点后面
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) { // 并且将当前线程变成尾节点 tail
            pred.next = node;
            return node;
        }
    }
    // 如果没有尾节点的话执行
    enq(node);
    return node;  // 返回线程B节点
}

enq()方法

private Node enq(final Node node) {
    for (;;) {
        // 判断还有没有尾节点
        Node t = tail;
        if (t == null) { // 还没有尾节点则创建一个空的尾节点——哨兵节点
            if (compareAndSetHead(new Node())) 
                tail = head;
        } else {  // 把当前线程的节点添加到哨兵节点后面,使用双向链表连接
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;  // 最后返回空的哨兵节点
            }
        }
    }
}

acquireQueued()方法

主要是将线程进行park()休眠

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 自旋
        for (;;) {
            // 获取线程B节点的前置节点,空的哨兵节点
            final Node p = node.predecessor();
            // 因为一直占用 所以进不去
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //p是哨兵节点 node是node<线程B>
            //判断此时哨兵节点的waitStatus的值是不是不等于0
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

shouldParkAfterFailedAcquire()方法

第一次执行

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // ws的默认值为0
        int ws = pred.waitStatus;
        //SIGNAL的默认值为-1
        if (ws == Node.SIGNAL) // 不成立
            return true;
        if (ws > 0) { // 不成立
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // CAS方法将ws设置为-1
        }
        return false;
    }

第二次执行

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // ws的值为-1
    int ws = pred.waitStatus;
    //SIGNAL的默认值为-1
    if (ws == Node.SIGNAL) // 成立
        return true;  // 返回true
    if (ws > 0) { 
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

parkAndCheckInterrupt()方法

使用LockSupport.park()方法对当前线程进行阻塞,然后等待被其他线程唤醒

LockSupport是concurrent包中一个工具类,不支持构造,提供了一堆static方法,比如park(),unpark()等。

线程A unlock()方法

线程的unlock方法调用sync的arelease方法来释放锁,并且唤醒阻塞的线程

release()方法

public final boolean release(int arg) {
    // 将当前锁的层数减一,实现解锁
    if (tryRelease(arg)) {
      // 获取头节点——哨兵节点
        Node h = head;
        // 如果头节点不为null,并且ws != 0,前面已经将ws设置为-1
        if (h != null && h.waitStatus != 0)
            // 将刚刚阻塞的线程进行唤醒
            unparkSuccessor(h);
        return true;
    }
    return false;
}

进入tryRelase()方法

这里也是使用了模板设计的设计模式,但是我们这里使用的ReentrantLock()类进行加锁,所以点开RL锁的实现方法

protected final boolean tryRelease(int releases) {
    // 当前的锁的层数减一,当前A线程的层数为1,所以现在c = 0
    int c = getState() - releases;
    // 判断当前的线程是否和锁占有线程是否是一个线程
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // c==0代表锁都已经全部都被释放
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    // 设置state的值,表示数据已经解锁
    setState(c);
    return free;
}

unparkSuccessor()方法

release()方法中最后一步就是唤醒阻塞的线程,但是如果这个时候来了新的线程来获取锁,则会直接被新来的线程拿到锁

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    // 此时哨兵节点的ws的值为-1
    if (ws < 0)
        // 使用CAS设置node 的ws值
        compareAndSetWaitStatus(node, ws, 0);
    // 获取哨兵节点的下一个节点也就是B线程节点
    Node s = node.next;
    // 这里判断哨兵节点后面是否还有节点,或者后面的节点的waitstatus状态,如果大于0 有一个
    if (s == null || s.waitStatus > 0) { // 不满足--****//
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // 开始唤醒线程
        LockSupport.unpark(s.thread);
}

unpark()方法

Unsafe 是 Java 中的一个类,位于 sun.misc 包下,提供了一些底层的、不安全的操作,通常被用于 Java 核心库的实现,而不是普通应用程序。这个类并不是标准的 Java API 的一部分,因此它的使用不太推荐,也可能在未来版本中被移除。

Unsafe 类提供了直接内存访问、线程的创建和同步、对象的实例化等底层操作,但这些操作通常是不安全的,可能会导致不稳定的行为和安全性问题。因此,普通的 Java 应用程序在正常情况下不应该直接使用 Unsafe 类。

线程B 唤醒

线程B一直被阻塞在刚刚的parkAndChedkInterrupt()方法,现在被唤醒

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();  // 当前线程已经没有被阻塞了,所以返回的是false
}

Thread.interrupted() 方法是一个静态方法,用于检测当前线程是否被中断,并清除中断状态。

再次返回acquireQueued()方法

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) { // 再次循环for进行抢占锁
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) { // 这里进入tryAcquire()方法进行获取锁,这次能够获取成功
                setHead(node);
                p.next = null; // 设置哨兵节点的next为null,等待Jvm的垃圾回收
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

总结

当两个线程的执行是这样进行加锁和解锁,并且展示了为什么ReentrantLock()是可重入锁。

具体的实现总的来说实现步骤

  • ReentrantLock类的中的 state变量代表是否加锁,有几层锁

  • 使用 AQS 框架进行加锁,解锁

  • 将发生竞争的线程封装成Node节点,节点中的成员变量 waitStatus 来表示当前线程的状态(是否在竞争,还是取消竞争)

  • 再使用 SupportLock 静态工具类来将线程阻塞在同步管理的队列中,这个队列中是使用了FIFO(先进先出)的方式进行管理,所以被唤醒的一直最前面的线程,也是最有机会抢占锁的线程

相关推荐

  1. FutureTask阅读

    2024-01-28 22:38:01       40 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-01-28 22:38:01       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-01-28 22:38:01       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-01-28 22:38:01       18 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-01-28 22:38:01       20 阅读

热门阅读

  1. uniapp微信小程序-前端设计模式学习(中)

    2024-01-28 22:38:01       31 阅读
  2. 南门的树(数组)★★★ c++版本 10分

    2024-01-28 22:38:01       38 阅读
  3. git常见命令

    2024-01-28 22:38:01       29 阅读
  4. 计算机网络(第六版)复习提纲13

    2024-01-28 22:38:01       28 阅读
  5. 在排序数组中查找元素的第一个和最后一个位置

    2024-01-28 22:38:01       31 阅读
  6. 【MyBatis框架】第六章 PageHelper

    2024-01-28 22:38:01       33 阅读