JDK版本:oracle java 1.8.0_102
继续阅读之前,需确保你对锁和条件队列的使用方法烂熟于心,特别是条件队列,否则你可能无法理解以下源码的精妙之处,甚至基本的正确性。本篇暂不涉及此部分内容,需读者自行准备。
接口定义
BlockingQueue继承自Queue,增加了阻塞的入队、出队等特性:
public interface BlockingQueue<E> extends Queue<E> { boolean add(E e); void put(E e) throws InterruptedException; // can extends from Queue. i don't know why overriding here boolean offer(E e); boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; E take() throws InterruptedException; // extends from Queue // E poll(); E poll(long timeout, TimeUnit unit) throws InterruptedException; int remainingCapacity(); boolean remove(Object o); public boolean contains(Object o); int drainTo(Collection<? super E> c); int drainTo(Collection<? super E> c, int maxElements); }
为了方便讲解,我调整了部分方法的顺序,还增加了注释辅助说明。
需要关注的是两对方法:
- 阻塞方法BlockingQueue#put()和BlockingQueue#take():如果入队(或出队,下同)失败(如希望入队但队列满,下同),则等待,一直到满足入队条件,入队成功。
- 非阻塞方法BlockingQueue#offer()和BlockingQueue#poll(),及它们的超时版本:非超时版本是瞬时动作,如果入队当前入队失败,则立刻返回失败;超时版本可在此基础上阻塞一段时间,相当于限时的BlockingQueue#put()和BlockingQueue#take()。
实现类
BlockingQueue有很多实现类。根据github的code results排名,最常用的是LinkedBlockingQueue(253k)和ArrayBlockingQueue(95k)。LinkedBlockingQueue的性能在大部分情况下优于ArrayBlockingQueue,本文主要介绍LinkedBlockingQueue,文末会简要提及二者的对比。
LinkedBlockingQueue
阻塞方法put()和take()
两个阻塞方法相对简单,有助于理解LinkedBlockingQueue的核心思想:在队头和队尾各持有一把锁,入队和出队之间不存在竞争。
前面在Java实现生产者-消费者模型中循序渐进的引出了BlockingQueue#put()和BlockingQueue#take()的实现,可以先去复习一下,了解为什么LinkedBlockingQueue要如此设计。以下是更细致的讲解。
阻塞的入队操作PUT()
在队尾入队。putLock和notFull配合完成同步。
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }
现在触发一个入队操作,分情况讨论。
case1:入队前,队列非空非满(长度大于等于2)
入队前需得到锁putLock。检查队列非满,无需等待条件notFull,直接入队。入队后,检查队列非满(精确说是入队前“将满”,但不影响理解),随机通知一个生产者条件notFull满足。最后,检查入队前队列非空,则无需通知条件notEmpty。
注意点:
- 入队前队列非空非满(长度大于等于2),则head和tail指向的节点不同,入队与出队操作不会同时更新同一节点也就不存在竞争。因此,分别用两个锁同步入队、出队操作才能是线程安全的。进一步的,由于入队已经由锁putLock保护,则enqueue内部实现不需要加锁。
- 条件notFull可以只随机通知一个等待该条件的生产者线程(即,使用signal()而不是signalAll()。这顺便会减少无效竞争,提升性能)。这不会产生Object#notify()那样的缺陷,因为只有生产者在等待该条件,而且除非队列满,否则每次生产者入队后都会通知条件notFull,同时每次消费者出队后,如果队列不满,也会通知条件notFull满足(见take()方法)。
- 条件通知方法singal()是近乎“幂等”的:如果有线程在等待该条件,则随机选择一个线程通知;如果没有线程等待,则什么都不做,不会造成什么恶劣影响。
case2:入队前,队列满
入队前需得到锁putLock。检查队列满,则等待条件notFull。条件notFull可能由出队成功触发(必要的),也可能由入队成功触发(也是必要的,避免“丢失信号”的问题)。条件notFull满足后,入队。入队后,假设检查队列满(队列非满的情况同case1),则无需通知条件notFull。最后,检查入队前队列非空,则无需通知条件notEmpty。
注意点:
- “丢失信号”问题:基于锁和条件队列的机制,在等待条件时会释放锁,这会产生一些令人迷惑的问题,增加了基于条件队列设计并发程序的难度。假设队列满时,存在3个生产者P1-P3(多于一个就可以)同时阻塞在10行;如果此时5个消费者C1-C5(多于一个就可以)快速、连续的出队并通知条件notFull满足,速度快到阻塞的3个生产者都被唤醒,但还没来得及开始竞争锁putLock,则相当于只发出了一个信号,其他信号“丢失”了——因为只有最后竞争到锁的那一个生产者P1收到的信号发挥了作用,其他生产者P2-P3会再次进入阻塞状态,看起来好像没有被唤醒过。然而,C1-C5出队了5