[数据结构]——阻塞队列 多线程

首先声明:

        如果你听过阻塞队列,你可能是个小白,但是如果你学会了阻塞队列,你已经是一名一流的Java工程师。

        Java有提供阻塞队列API,本文基于环形数组队列实现。  

  • 先看下某些知识的专业术语,你知道哪些❓

📕 原子变量(AutomicInteger)、可重入锁、双锁、死锁、级联通知思想、线程状态

目录

一、阻塞队列的实现

1. 何时使用

2. 代码实现(终极版)

 

二、代码解析

1. 双锁的获取

 2. 原子变量

3. 唤醒poll线程

三、Java中阻塞队列API


一、阻塞队列的实现

1. 何时使用

        阻塞队列就是向队列添加或者删除元素时,如果为空或者已满,将该线程置为等待状态,等待被唤醒。

⭐插播知识:线程的7中状态(GPT回答)

  1. 新建状态(New):当线程对象被创建但尚未启动,即线程调用start()方法之前的状态。
  2. 就绪状态(Runnable):线程对象创建后,其他线程调用了该对象的start()方法,线程就进入就绪状态。就绪状态的线程位于“可运行线程池”中,只等待获取CPU的使用权。
  3. 运行状态(Running):当就绪状态的线程获取了CPU,开始执行程序代码时,就进入了运行状态。
  4. 阻塞状态(Blocked):线程因为某种原因(如等待获取锁或执行了sleep()、join()方法,或发出了I/O请求)放弃CPU使用权,暂时停止运行。阻塞状态可以分为等待阻塞、同步阻塞和其他阻塞。
  5. 等待状态(Waiting):线程在等待另一个线程执行特定动作(如通知或中断)。此状态下,线程不会被分配CPU时间片,直到被其他线程显式唤醒
  6. 超时等待状态(Timed_Waiting):线程等待另一个线程的通知或中断,但有一个明确的等待时间。到达时间后,线程会自动从该状态唤醒。
  7. 死亡状态(Terminated):线程执行完毕或因异常退出run()方法后,进入死亡状态。一旦线程终止,就不能复生。

2. 代码实现(终极版)

双锁 + 级联通知

package Demo3;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class BlockingQueue2<E> implements BlockQueue<E> {

    private final E[] arr;
    private AtomicInteger size = new AtomicInteger();
    private int head = 0;
    private int tail = 0;

    public BlockingQueue2(int capacity) {
        arr = (E[]) new Object[capacity];
    }

    private ReentrantLock offerLock = new ReentrantLock(); // 新增元素锁
    private ReentrantLock pollLock = new ReentrantLock(); // 删除元素锁
    private Condition headWaits = pollLock.newCondition(); // 线程等待
    private Condition tailWaits = offerLock.newCondition();


    public boolean isEmpty() {
        return size.get() == 0;
    }

    public boolean isFull() {
        return size.get() == arr.length;
    }

    @Override
    public void offer(E e) throws InterruptedException {
        // 加锁
        offerLock.lockInterruptibly();
        int c;
        try {
            // 判断队列是否满
            while (isFull()) {
                // 释放锁 并 加入等待队列
                tailWaits.await();
            }
            // 未满 添加元素
            arr[tail] = e;
            if (++tail == arr.length) {
                tail = 0;
            }
            c = size.getAndIncrement(); // size++ 返回的是size
            if (c + 1 < arr.length) {
                // 还有空位 唤醒自己的offer
                tailWaits.signal();
            }
        } finally {
            offerLock.unlock();
        }
        // 唤醒等待线程 - 放在offerLock外面,防止死锁
        // 减少加锁和释放锁 当第一次添加元素时,唤醒poll等待线程
        if(c == 0){
            pollLock.lock();
            try {
                headWaits.signal();
            } finally {
                pollLock.unlock();
            }
        }

    }

    @Override
    public boolean offer(E e, long timeout) throws InterruptedException {
        offerLock.lockInterruptibly();
        int c;
        try {
            long t = TimeUnit.MINUTES.toNanos(timeout);
            // 判断队列是否满
            while (isFull()) {
                if (t <= 0) {
                    return false;
                }
                // 加入等待队列
                t = tailWaits.awaitNanos(t);
            }
            // 未满 添加元素
            arr[tail] = e;
            if (++tail == arr.length) {
                tail = 0;
            }
            c = size.getAndIncrement();
            if (c + 1 < arr.length) {
                // 还有空位 唤醒自己的offer
                tailWaits.signal();
            }
        } finally {
            offerLock.unlock();
        }
        // 当队列由空到有一个元素时,唤醒poll线程
        if (c == 0) {
            pollLock.lock();
            try {
                headWaits.signal();
            } finally {
                pollLock.unlock();
            }
        }
        return true;
    }

    @Override
    public E poll() throws InterruptedException {
        E e;
        int c;
        pollLock.lockInterruptibly();
        try {
            while (isEmpty()) {
                headWaits.await();
            }
            e = arr[head];
            if (++head == arr.length) {
                head = 0;
            }
            c = size.getAndDecrement();
            // 如果poll等待队列还有元素 自己唤醒
            if (c > 1) {
                headWaits.signal();
            }
        } finally {
            pollLock.unlock();
        }

        // 一定要放在pollLock的外面,防止死锁的发生
        // 当队列从 满 -> 不满时,唤醒offer的第一个等待线程
        if (c == arr.length) {
            pollLock.lock();
            try {
                tailWaits.signal();
            } finally {
                pollLock.unlock();
            }
        }

        return e;
    }
}

 

二、代码解析

这里拿offer方法举例

1. 双锁的获取

使用双锁是因为入队和出队互不干扰,性能好。

注意:

        所有poll操作的线程是一把锁,所有offer操作的线程是一把锁。

 2. 原子变量

3. 唤醒poll线程

级联通知思想:

        为了减少加锁和释放锁,只有在第一次offer的时候唤醒poll等待线程。

例如:

        当前队列为空,有两个线程来做poll(取)操作,那么他们都会变为等待线程,此时执行两次offer进行入队操作,只有在第一个线程offer的时候,唤醒poll的等待线程,poll的等待线程再唤醒另一个poll的等待线程因为poll操作的锁是同一把,所以可以直接释放,提高了性能。

 

三、Java中阻塞队列API

  1. ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。此队列按照 FIFO(先进先出)的原则对元素进行排序。

  2. LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按照 FIFO(先进先出)排序元素。相对于ArrayBlockingQueue,它通常具有更高的吞吐量,但在大多数并发应用场景中,其性能略逊于ArrayBlockingQueue。由于其为链表结构,因此在插入和删除元素时具有比ArrayBlockingQueue更好的性能。另外,它还有一个可选的容量限制构造函数,如果未指定容量限制,其容量就是Integer.MAX_VALUE

  3. PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。默认情况下元素采取自然顺序升序排序。也可以自定义类实现Comparable接口来定义排序规则,或者初始化PriorityBlockingQueue时,传入一个自定义的Comparator来定义排序规则。

  4. SynchronousQueue:一个不存储元素的阻塞队列。每一个插入操作必须等待一个相应的删除操作,反之亦然。

这些阻塞队列类都提供了put()take()等方法,这些方法在队列满或空时会阻塞调用线程,直到队列状态改变。同时,它们也提供了非阻塞的offer()poll()等方法,这些方法在无法立即执行时会立即返回。

相关推荐

最近更新

  1. TCP协议是安全的吗?

    2024-03-21 11:36:03       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-03-21 11:36:03       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-03-21 11:36:03       18 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-03-21 11:36:03       20 阅读

热门阅读

  1. 2024. 1华为od机试C卷【传递悄悄话】Python

    2024-03-21 11:36:03       19 阅读
  2. PDF文件底层数据结构

    2024-03-21 11:36:03       16 阅读
  3. AT+MQTTUSERCFG报错ERROR

    2024-03-21 11:36:03       18 阅读
  4. is ignored, because it exists, maybe from xml file

    2024-03-21 11:36:03       18 阅读
  5. web学习笔记(四十一)

    2024-03-21 11:36:03       18 阅读