AQS解密:深入理解 CountDownLatch, Semaphore 和 CyclicBarrier

1. AQS(AbstractQueuedSynchronizer)框架简介

AQS是java.util.concurrent.locks包中的一个抽象类,是实现同步器(如锁和其他多线程同步工具)的一个框架。Doug Lea设计了这个框架,旨在让开发者通过使用其提供的方法来构建线程之间的同步器,而不是直接处理线程本身。

1.1 AQS概念与重要性

AQS利用一个int成员变量表示同步状态,并通过一个FIFO线程等待队列来管理多个线程之间的协调工作,这些线程可能会竞争对同步状态的访问。AQS定义了一系列保护共享资源访问的方法,可以实现独占访问(Exclusive)和共享访问(Shared)两种模式。
AQS非常重要,因为它简化了并发编程的难度,使得开发者能够通过使用简单的回调方法(如tryAcquire、tryRelease等)来实现复杂的并发控制逻辑。许多并发工具类,如ReentrantLock, Semaphores, CountDownLatches和CyclicBarriers等,都是基于AQS构建的。

1.2 AQS在并发控制中的作用

AQS提供内部支持,用于实现大范围的同步器。而同步器的功能归结于:在多线程访问某个资源时,能够确保任一时刻,资源被单线程访问(排他性),或者被多线程按照特定的方式访问(如某些线程获取资源后必须释放才能让其他线程继续获取)。AQS通过其状态变量和等待队列,管理和调度线程,从而完成这一任务。
接下来,我们将讨论由AQS派生的几个主要同步器,并通过代码示例详细说明它们的使用和工作机制。

2. CountDownLatch:并发协调工具

2.1 CountDownLatch简介

CountDownLatch是一种同步工具类,它允许一个或多个线程等待其他线程完成一组操作之后再继续执行。CountDownLatch通过一个计数器实现,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减一。当计数器值降至零时,意味着所有的线程都完成了任务,等待的线程就可以恢复执行任务。

2.2 CountDownLatch的工作机制

CountDownLatch的工作机制非常简单。创建时需指定一个整数,在CountDownLatch上调用await()方法的线程会被阻塞,直到其他线程调用countDown()方法足够次数,计数器减至零,所有等待的线程才会被释放,并继续执行。

2.3 使用场景与案例

CountDownLatch的一个常见使用场景是控制并发测试,在多个线程准备好之前阻塞测试的主线程,直至所有线程都报告说它们准备完毕,主线程才继续执行。
另一个使用场景是等待服务的一部分初始化。例如,应用程序的启动过程可能会发起多个服务,而主服务需要等待所有其他服务都完成初始化之后才能启动。

2.4 代码示例与解析

import java.util.concurrent.CountDownLatch;

public class ServiceStarter {
    public static void main(String[] args) throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(3);
        
        // 启动第一个服务
        new Thread(() -> {
            System.out.println("Starting service 1");
            // 模拟服务启动耗时
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("Service 1 started");
            latch.countDown();
        }).start();

        // 启动第二个服务
        new Thread(() -> {
            System.out.println("Starting service 2");
            // 模拟服务启动耗时
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("Service 2 started");
            latch.countDown();
        }).start();

        // 启动第三个服务
        new Thread(() -> {
            System.out.println("Starting service 3");
            // 模拟服务启动耗时
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("Service 3 started");
            latch.countDown();
        }).start();

        // 主线程等待其他服务启动
        System.out.println("Main thread waiting for services to start.");
        latch.await();
        System.out.println("All services started. Main thread continuing execution.");
    }
}

在这个例子中,主线程创建了一个初始计数为3的CountDownLatch,并启动了三个服务线程。每个服务在启动过程结束后调用countDown(),主线程在调用await()后被阻塞,直到三个服务都已启动。

3. Semaphore:信号量控制

3.1 Semaphore简介

Semaphore,也称为信号量,是用于控制同时访问某个特定资源的操作数量的同步工具,它管理一系列许可(permits)。Semaphore通常用于资源池,如数据库连接池,其中有限数量的资源需要被多个线程共享。
Semaphore的核心方法包括acquire()用于获取许可,如果无可用许可则阻塞线程,以及release()用于释放许可,增加可用许可数量。

3.2 Semaphore的工作原理

Semaphore内部维护了一定数量的许可。线程通过acquire()方法请求许可,如果Semaphore内的许可数大于0,则允许该线程执行且内部可用许可数减一;如果许可数为0,则阻塞该线程直至有许可可用。当线程使用完资源后,通过调用release()来归还许可,许可数增一。

3.3 使用场景与案例

Semaphore主要用于限流,如网络连接数、数据库连接数的限制等。例如,在数据库连接池中可能会使用Semaphore来限制池中的连接数。
另一个使用场景是实现某些资源的公平访问。在多线程环境中,线程可能需要以有序的方式访问资源,利用Semaphore可以保证没有单一线程会连续无限制地占有资源。

3.4 代码示例与解析

import java.util.concurrent.Semaphore;

public class ResourceAccessControl {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(10); // 假设只允许10个线程同时访问资源

        for (int i = 0; i < 100; i++) {
            final int threadNumber = i;
            new Thread(() -> {
                try {
                    semaphore.acquire(); // 请求资源
                    System.out.println("Thread " + threadNumber + " is accessing the resource.");
                    // 模拟资源访问时长
                    Thread.sleep((long)(Math.random() * 1000));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    semaphore.release(); // 释放资源
                }
            }).start();
        }
    }
}

在这个例子中,我们创建了一个带有10个许可的Semaphore,然后启动了100个线程。这些线程在执行资源访问之前尝试获取许可,如果获取成功,则模拟资源访问完成后释放许可,如果许可不足则线程会等待,直到有许可变为可用。

4. CyclicBarrier:循环屏障

4.1 CyclicBarrier简介

CyclicBarrier,字面意思为循环屏障,是一种同步工具,它允许一组线程相互等待,达到一个公共屏障点(Common Barrier Point)后再继续执行。CyclicBarrier在完成一组线程间的相互等待后,可以重置再次使用,这是其“循环”(Cyclic)的由来。

4.2 CyclicBarrier的工作方式

CyclicBarrier的构造函数接受一个整型数值,指定等待的线程数量,当指定数量的线程都执行了await()方法后,这些线程会被释放并允许继续执行。CyclicBarrier也支持一个可选的Runnable命令,在屏障被触发时执行,常用于屏障点后的一些集体操作。

4.3 CyclicBarrier与CountDownLatch的对比

CyclicBarrier与CountDownLatch都用于线程间的协调,但主要区别在于CyclicBarrier可重用,而CountDownLatch不能。CountDownLatch是一次性的,计数值达到零便无法再次使用,而CyclicBarrier在等待的线程释放后可以重置计数,以便下一轮的使用。

4.4 使用场景与案例

CyclicBarrier适用于这样的场景:多个线程必须同时到达预设屏障点才能继续执行,例如,多个玩家必须都准备好后才能开始游戏。
它也常用于多阶段计算的场景中,计算过程被分为多个步骤,而每个步骤的执行依赖于前一个步骤的完成,即分阶段的并行计算。

4.5 代码示例与解析

import java.util.concurrent.CyclicBarrier;

public class ParallelTask {
    // 创建一个新的CyclicBarrier,当4个参与者线程到达时将触发Runnable
    private static final CyclicBarrier barrier = new CyclicBarrier(4, () ->
        System.out.println("All threads reached the barrier point, let's continue!")
    );

    public static void main(String[] args) {
        for (int i = 0; i < 4; i++) {
            new Thread(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + " reached the barrier.");
                    barrier.await(); // 等待其他线程
                    // 以下代码将在所有线程都到达屏障后执行
                    System.out.println(Thread.currentThread().getName() + " crossed the barrier.");
                } catch (Exception e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
    }
}

在这个例子中,我们设置了一个CyclicBarrier,等待四个线程。每个线程在执行到barrier.await()时将会等待,直到所有四个线程都到达了屏障点。达到屏障点之后,首先执行Barrier的Runnable命令,然后所有线程将同时继续执行。

相关推荐

  1. 解密Python中的“==”“is”:深入理解对等比较

    2024-05-04 05:42:05       16 阅读
  2. 深入理解Rust语句表达式

    2024-05-04 05:42:05       36 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-05-04 05:42:05       16 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-05-04 05:42:05       16 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-05-04 05:42:05       15 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-05-04 05:42:05       18 阅读

热门阅读

  1. 如何看待AIGC技术?【模板】

    2024-05-04 05:42:05       9 阅读
  2. python和R对比记忆

    2024-05-04 05:42:05       8 阅读
  3. Vue 2 组件创建全指南:一步一步学习

    2024-05-04 05:42:05       12 阅读
  4. NLP自然语言处理和应用场景介绍

    2024-05-04 05:42:05       7 阅读
  5. react 列表渲染 key解析和 vue的key解析的底层逻辑

    2024-05-04 05:42:05       14 阅读
  6. C++11 设计模式6. 建造者模式,也叫做生成器模式

    2024-05-04 05:42:05       11 阅读
  7. Python基础学习之数据结构

    2024-05-04 05:42:05       10 阅读
  8. 指针(1)

    指针(1)

    2024-05-04 05:42:05      8 阅读
  9. Mybatis扩展

    2024-05-04 05:42:05       9 阅读
  10. 彻底理解-进程的 概念、 组成、特征

    2024-05-04 05:42:05       12 阅读