meUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 队列为空 超时返回空,否则等待
while (count.get() == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
//出队
x = dequeue();
c = count.getAndDecrement();
//队列中除了当前线程获取的任务外还有任务就去唤醒消费者消费
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
//原来队列已满就去唤醒生产者 生产
if (c == capacity)
signalNotFull();
return x;
}
LinkedBlockingQueue
与ArrayBlockingQueue
的出队、入队实现类似
只不过LinkedBlockingQueue
入队、出队获取/释放的锁不同,并且在此过程中不同情况回去唤醒其他的生产者、消费者从而进一步提升并发性能
LinkedBlockingQueue 由单向链表实现的阻塞队列,记录首尾节点;默认是无界、非公平的阻塞队列(初始化时要设置容量否则可能OOM),使用两把锁、两个等待队列,分别操作入队、出队的生产者、消费者,在入队、出队操作期间不同情况还会去唤醒生产者、消费者,从而进一步提升并发性能,适用于并发量大的场景
LinkedBlockingDeque
LinkedBlockingDeque
实现与LinkedBlockQueue
类似,在LinkedBlockQueue
的基础上支持从队头、队尾进行添加、删除的操作
它是一个双向链表,带有一系列First、Last的方法,比如:offerLast
、pollFirst
由于LinkedBlockingDeque
双向,常用其来实现工作窃取算法,从而减少线程的竞争
什么是工作窃取算法?
比如多线程处理多个阻塞队列的任务(一一对应),每个线程从队头获取任务处理,当A线程处理完它负责的阻塞队列所有任务时,它再从队尾窃取其他阻塞队列的任务,这样就不会发生竞争,除非队列中只剩一个任务,才会发生竞争
ForkJoin
框架就使用其来充当阻塞队列,我们后文再聊这个框架
PriorityBlockingQueue
PriorityBlockingQueue是优先级排序的无界阻塞队列,阻塞队列按照优先级进行排序
使用堆排序,具体排序算法由Comparable
或Comparator
实现比较规则
- 默认:泛型中的对象需要实现
Comparable
比较规则 ,根据compareTo方法规则排序
- 构造器中指定比较器
Comparator
根据比较器规则排序
@Test
public void testPriorityBlockingQeque() {
//默认使用Integer实现Comparable的升序
PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>(6);
queue.offer(99);
queue.offer(1099);
queue.offer(299);
queue.offer(992);
queue.offer(99288);
queue.offer(995);
//99 299 992 995 1099 99288
while (!queue.isEmpty()){
Sy