首先声明:
如果你听过阻塞队列,你可能是个小白,但是如果你学会了阻塞队列,你已经是一名一流的Java工程师。
Java有提供阻塞队列API,本文基于环形数组队列实现。
- 先看下某些知识的专业术语,你知道哪些❓
📕 原子变量(AutomicInteger)、可重入锁、双锁、死锁、级联通知思想、线程状态
目录
一、阻塞队列的实现
1. 何时使用
阻塞队列就是向队列添加或者删除元素时,如果为空或者已满,将该线程置为等待状态,等待被唤醒。
⭐插播知识:线程的7中状态(GPT回答)
- 新建状态(New):当线程对象被创建但尚未启动,即线程调用start()方法之前的状态。
- 就绪状态(Runnable):线程对象创建后,其他线程调用了该对象的start()方法,线程就进入就绪状态。就绪状态的线程位于“可运行线程池”中,只等待获取CPU的使用权。
- 运行状态(Running):当就绪状态的线程获取了CPU,开始执行程序代码时,就进入了运行状态。
- 阻塞状态(Blocked):线程因为某种原因(如等待获取锁或执行了sleep()、join()方法,或发出了I/O请求)放弃CPU使用权,暂时停止运行。阻塞状态可以分为等待阻塞、同步阻塞和其他阻塞。
- 等待状态(Waiting):线程在等待另一个线程执行特定动作(如通知或中断)。此状态下,线程不会被分配CPU时间片,直到被其他线程显式唤醒。
- 超时等待状态(Timed_Waiting):线程等待另一个线程的通知或中断,但有一个明确的等待时间。到达时间后,线程会自动从该状态唤醒。
- 死亡状态(Terminated):线程执行完毕或因异常退出run()方法后,进入死亡状态。一旦线程终止,就不能复生。
2. 代码实现(终极版)
双锁 + 级联通知
package Demo3;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class BlockingQueue2<E> implements BlockQueue<E> {
private final E[] arr;
private AtomicInteger size = new AtomicInteger();
private int head = 0;
private int tail = 0;
public BlockingQueue2(int capacity) {
arr = (E[]) new Object[capacity];
}
private ReentrantLock offerLock = new ReentrantLock(); // 新增元素锁
private ReentrantLock pollLock = new ReentrantLock(); // 删除元素锁
private Condition headWaits = pollLock.newCondition(); // 线程等待
private Condition tailWaits = offerLock.newCondition();
public boolean isEmpty() {
return size.get() == 0;
}
public boolean isFull() {
return size.get() == arr.length;
}
@Override
public void offer(E e) throws InterruptedException {
// 加锁
offerLock.lockInterruptibly();
int c;
try {
// 判断队列是否满
while (isFull()) {
// 释放锁 并 加入等待队列
tailWaits.await();
}
// 未满 添加元素
arr[tail] = e;
if (++tail == arr.length) {
tail = 0;
}
c = size.getAndIncrement(); // size++ 返回的是size
if (c + 1 < arr.length) {
// 还有空位 唤醒自己的offer
tailWaits.signal();
}
} finally {
offerLock.unlock();
}
// 唤醒等待线程 - 放在offerLock外面,防止死锁
// 减少加锁和释放锁 当第一次添加元素时,唤醒poll等待线程
if(c == 0){
pollLock.lock();
try {
headWaits.signal();
} finally {
pollLock.unlock();
}
}
}
@Override
public boolean offer(E e, long timeout) throws InterruptedException {
offerLock.lockInterruptibly();
int c;
try {
long t = TimeUnit.MINUTES.toNanos(timeout);
// 判断队列是否满
while (isFull()) {
if (t <= 0) {
return false;
}
// 加入等待队列
t = tailWaits.awaitNanos(t);
}
// 未满 添加元素
arr[tail] = e;
if (++tail == arr.length) {
tail = 0;
}
c = size.getAndIncrement();
if (c + 1 < arr.length) {
// 还有空位 唤醒自己的offer
tailWaits.signal();
}
} finally {
offerLock.unlock();
}
// 当队列由空到有一个元素时,唤醒poll线程
if (c == 0) {
pollLock.lock();
try {
headWaits.signal();
} finally {
pollLock.unlock();
}
}
return true;
}
@Override
public E poll() throws InterruptedException {
E e;
int c;
pollLock.lockInterruptibly();
try {
while (isEmpty()) {
headWaits.await();
}
e = arr[head];
if (++head == arr.length) {
head = 0;
}
c = size.getAndDecrement();
// 如果poll等待队列还有元素 自己唤醒
if (c > 1) {
headWaits.signal();
}
} finally {
pollLock.unlock();
}
// 一定要放在pollLock的外面,防止死锁的发生
// 当队列从 满 -> 不满时,唤醒offer的第一个等待线程
if (c == arr.length) {
pollLock.lock();
try {
tailWaits.signal();
} finally {
pollLock.unlock();
}
}
return e;
}
}
二、代码解析
这里拿offer方法举例
1. 双锁的获取
使用双锁是因为入队和出队互不干扰,性能好。
注意:
所有poll操作的线程是一把锁,所有offer操作的线程是一把锁。
2. 原子变量
3. 唤醒poll线程
级联通知思想:
为了减少加锁和释放锁,只有在第一次offer的时候唤醒poll等待线程。
例如:
当前队列为空,有两个线程来做poll(取)操作,那么他们都会变为等待线程,此时执行两次offer进行入队操作,只有在第一个线程offer的时候,唤醒poll的等待线程,poll的等待线程再唤醒另一个poll的等待线程,因为poll操作的锁是同一把,所以可以直接释放,提高了性能。
三、Java中阻塞队列API
ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。此队列按照 FIFO(先进先出)的原则对元素进行排序。
LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按照 FIFO(先进先出)排序元素。相对于
ArrayBlockingQueue
,它通常具有更高的吞吐量,但在大多数并发应用场景中,其性能略逊于ArrayBlockingQueue
。由于其为链表结构,因此在插入和删除元素时具有比ArrayBlockingQueue
更好的性能。另外,它还有一个可选的容量限制构造函数,如果未指定容量限制,其容量就是Integer.MAX_VALUE
。PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。默认情况下元素采取自然顺序升序排序。也可以自定义类实现
Comparable
接口来定义排序规则,或者初始化PriorityBlockingQueue
时,传入一个自定义的Comparator
来定义排序规则。SynchronousQueue:一个不存储元素的阻塞队列。每一个插入操作必须等待一个相应的删除操作,反之亦然。
这些阻塞队列类都提供了put()
和take()
等方法,这些方法在队列满或空时会阻塞调用线程,直到队列状态改变。同时,它们也提供了非阻塞的offer()
和poll()
等方法,这些方法在无法立即执行时会立即返回。