阻塞队列是一种常用的并发编程工具,它能够在多线程环境下提供一种安全而高效的数据传输机制。本文将介绍阻塞队列的原理和使用场景,并通过实例演示其在多线程编程中的应用。
一、什么是阻塞队列
阻塞队列是一种特殊的队列,它具有以下几个特点:
- 阻塞特性:当队列为空时,从队列中获取元素的操作将会被阻塞,直到队列中有新的元素被添加;当队列已满时,向队列中添加元素的操作将会被阻塞,直到队列中有空的位置,这就是等待唤醒机制。
- 线程安全:阻塞队列内部通过锁或其他同步机制来保证多线程环境下的数据一致性。
- 有界性:阻塞队列可以设置容量上限,当队列满时,后续的元素将无法添加。
- 公平性:阻塞队列可以选择公平或非公平的策略来决定线程的获取顺序。公平队列会按照线程的请求顺序进行处理(线程按先来后到顺序排队获取元素),而非公平队列则允许新的线程插队执行(线程竞争)。比如:SynchronousQueue。
阻塞队列常用于解决生产者-消费者问题,它能够有效地衔接生产者和消费者之间的速度差异,提供一种协调和安全的数据交互方式。
阻塞队列底层一般采用数组和链表这两种数据结构存储元素,ArrayBlockingQueue和PriorityBlockingQueue底层都是采用数组存储的,但是ArrayBlockingQueue是必须指定数组大小,不能扩容,而PriorityBlockingQueue可以进行动态扩容(扩容的最大长度也是Integer.MAX_VALUE),LinkedBlockingQueue底层是链表结构存储,虽然是链表,但是也有长度限制,默认是Integer.MAX_VALUE,一般认为的无界阻塞队列,其实最大的队列长度也就是Integer.MAX_VALUE。
二、阻塞队列的核心方法
- 添加
方法 | 描述 | 是否阻塞 |
---|---|---|
add方法 | 往队列尾部添加元素,内部是调用offer方法 | 否 |
put方法 | 往队列尾部添加元素,如果队列已满,则阻塞等待 | 是 |
offer方法 | 往队列尾部添加元素,如果队列已满,则返回false,不会阻塞 | 否 |
- 获取
方法 | 描述 | 是否阻塞 |
---|---|---|
take方法 | take方法:移除并返回队列头部的元素,如果队列为空,则阻塞等待 | 是 |
poll方法 | 移除并返回队列头部的元素,如果队列为空,则返回null,不会阻塞 | 否 |
peek方法 | 返回队列头部的元素(不移除),如果队列为空,则返回null,不会阻塞 | 否 |
三、常见的阻塞队列实现
通过图中可以看到,BlockingQueue集成了Queue接口的功能,有多种子类实现,常用的如下:
- ArrayBlockingQueue:基于数组实现的有界阻塞队列,它的容量在创建时指定,并且不能动态扩展。
- LinkedBlockingQueue:基于链表实现的有界阻塞队列,链表的长度可以通过构造函数显式指定,如果使用默认的构造函数,则默认大小是Integer.MAX_VALUE。
- PriorityBlockingQueue:基于优先级堆排序实现的阻塞队列(可扩容),元素按照优先级顺序进行排序。
- SynchronousQueue:不存储元素的阻塞队列,每个插入操作都必须等待一个相应的删除操作,反之亦然。
四、阻塞队列的原理
常用的阻塞队列,比如:ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue底层都是采用ReentrantLock锁来实现线程的互斥,而ReentrantLock底层采用了AQS框架实现线程队列的同步,线程的阻塞是调用LockSupport.park实现,唤醒是调用LockSupport.unpark实现,具体可以看我之前的文章,SynchronousQueue底层虽然没有用AQS框架,但也用的是LockSupport实现线程的阻塞与唤醒。
一文读懂LockSupport
AQS源码分析
阻塞队列的原理可以通过两个关键组件来解释:锁和条件变量。
- 锁
阻塞队列使用锁来保护共享资源,控制线程的互斥访问。在队列为空或已满时,线程需要等待相应的条件满足才能继续执行。
- 条件变量
条件变量是锁的一个补充,在某些特定的条件下,线程会进入等待状态。当条件满足时,其他线程会通过调用条件变量的唤醒方法(比如signal()或signalAll())来通知等待的线程进行下一步操作。
当一个线程试图从空的阻塞队列中获取元素时,它会获取队列的锁,并检查队列是否为空。如果为空,这个线程将进入等待状态,直到其他线程向队列中插入元素并通过条件变量唤醒它。当一个线程试图向已满的阻塞队列插入元素时,它会获取队列的锁,并检查队列是否已满。如果已满,这个线程将进入等待状态,直到其他线程从队列中获取元素并通过条件变量唤醒它。
接下来我们看下阻塞队列的获取元素和插入元素的核心代码:
ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue的带阻塞的插入和获取方法都是基于ReentrantLock锁+条件变量的等待和通知来实现的。
主要看看ArrayBlockingQueue带阻塞的插入和获取元素的主要方法吧。
/**
* 插入元素,带阻塞
*/
public void put(E e) throws InterruptedException {
checkNotNull(e);
// 这里使用的是ReentrantLock锁
final ReentrantLock lock = this.lock;
// 获取锁并支持响应中断,注意:获取锁的过程中不响应中断,是在获取到锁后根据当前线程的中断标识来处理。
lock.lockInterruptibly();
try {
// 元素大小等于数组长度时阻塞,说明放满了,生产者需要暂停,阻塞在条件变量上,等待被唤醒
while (count == items.length)
notFull.await();
// 放入元素到数组指定的下标处
enqueue(e);
} finally {
// 释放锁
lock.unlock();
}
}
/**
* 插入元素,唤醒等待获取元素的线程
*/
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
// 放入元素后,通知消费线程继续获取元素
notEmpty.signal();
}
/**
* 获取元素,带阻塞
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 数组无元素时阻塞,阻塞在条件变量上,等待被唤醒
// 元素大小等于0时阻塞,说明数组被取空了,消费者需要暂停,阻塞在条件变量上,等待被唤醒
wh