上篇文章12分钟从Executor自顶向下彻底搞懂线程池中我们聊到线程池,而线程池中包含阻塞队列
这篇文章我们主要聊聊并发包下的阻塞队列
阻塞队列
什么是队列?
队列的实现可以是数组、也可以是链表,可以实现先进先出的顺序队列,也可以实现先进后出的栈队列
那什么是阻塞队列?
在经典的生产者/消费者模型中,生产者们将生产的元素放入队列,而消费者们从队列获取元素消费
当队列已满,我们会手动阻塞生产者,直到消费者消费再来手动唤醒生产者
当队列为空,我们会手动阻塞消费者,直到生产者生产再来手动唤醒消费者
在这个过程中由于使用的是普通队列,阻塞与唤醒我们需要手动操作,保证同步机制
阻塞队列在队列的基础上提供等待/通知功能,用于线程间的通信,避免线程竞争死锁
生产者可以看成往线程池添加任务的用户线程,而消费者则是线程池中的工作线程
当阻塞队列为空时阻塞工作线程获取任务,当阻塞队列已满时阻塞用户线程向队列中添加任务(创建非核心线程、拒绝策略)
API
阻塞队列提供一下四种添加、删除元素的API,我们常用阻塞等待/超时阻塞等待的API
方法名 | 抛出异常 | 返回true/false | 阻塞等待 | 超时阻塞等待 |
---|---|---|---|---|
添加 | add(Object) | offer(Object) | put(Object) | offer(Object,long,TimeUnit) |
删除 | remove() | poll() | take() | poll(long,TimeUnit) |
- 抛出异常:队满add 抛出异常
IllegalStateExceptio
;队空remove 抛出异常NoSuchElementException
- 返回值: 队满offer返回false,队空poll返回null
- 阻塞等待: 队满时put会阻塞线程 或 队空时take会阻塞线程
- 超时阻塞等待: 在阻塞等待、返回true/false的基础上增加超时等待(等待一定时间就退出等待)
阻塞队列的公平与不公平
什么是阻塞队列的公平与不公平?
当阻塞队列已满时,如果是公平的,那么阻塞的线程根据先后顺序从阻塞队列中获取元素,不公平则反之
实际上阻塞队列的公平与不公平,要看实现阻塞队列的锁是否公平
阻塞队列一般默认使用不公平锁
ArrayBlockingQueue
从名称看就可以知道它是数组实现的,我们先来看看它有哪些重要字段
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
?
//存储元素的数组
final Object[] items;
?
//记录元素出队的下标
int takeIndex;
?
//记录元素入队的下标
int putIndex;
?
//队列中元素数量
int count;
?
//使用的锁
final ReentrantLock lock;
?
//出队的等待队列,作用于消费者
private final Condition notEmpty;
?
//入队的等待队列,作用于生产者
private final Condition notFull;
}
看完关键字段,我们可以知道:ArrayBlockingQueue
由数组实现、使用并发包下的可重入锁、同时用两个等待队列作用生产者和消费者
为什么出队、入队要使用两个下标记录?
实际上它是一个环形数组,在初始化后就不改变大小,后续查看源码自然能明白它是环形数组
在构造器中、初始化数组容量,同时使用非公平锁
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
?
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
//锁是否为公平锁
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
ArrayBlockingQueue的公平性是由ReentrantLock来实现的
我们来看看入队方法,入队方法都大同小异,我们本文都查看支持超时、响应中断的方法
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
//检查空指针
checkNotNull(e);
//获取超时纳秒
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
//加锁
lock.lockInterruptibly();
&nb