个元素,后续只有P1入队1个元素,显然此时队列非满。因此,14-15行“入队完成时的通知”是必要的,保证了只要队列非满,每次入队后都能唤醒1个阻塞的生产者,来等待锁释放后竞争锁。即,P1完成入队后,如果检查到队列非满,会随机唤醒一个生产者P2,让P2在P1释放锁putLock后竞争锁,继续入队,P3同理。
case3:入队前,队列空
入队前需得到锁putLock。检查队列空,则无需等待条件notFull,直接入队。入队后,如果队列非满,则同case1;如果队列满,则同case2。最后,假设检查入队前队列空(队列非空的情况同case1),则随机通知一个消费者条件notEmpty满足。
注意点:
- 只有入队前队列空的情况下,才需要通知条件notEmpty满足。因为如果队列非空,则出队操作不会阻塞在条件notEmpty上。另一方面,虽然已经有生产者完成了入队,但可能有消费者在生产者释放锁putLock后、通知条件notEmpty满足前,使队列变空;不过这没有影响,take()方法的while循环能够在线程竞争到锁之后再次确认。
- 通过入队和出队前检查队列长度(while+await),隐含保证了队列空时只允许入队操作,不存在竞争队列。
case4:入队前,队列长度为1
case4是一个特殊情况,分析方法类似于case1,但可能入队与出队之间存在竞争,我们稍后分析。
阻塞的出队操作TAKE()
在队头入队。takeLock和notEmpty配合完成同步。
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
依旧是四种case,put()和take()是对偶的,很容易分析,不赘述。
“CASE4 队列长度为1”时的特殊情况
队列长度为1时,到底入队和出队之间存在竞争吗?这取决于LinkedBlockingQueue的底层数据结构。
最简单的是使用朴素链表,可以自己实现,也可以使用JDK提供的非线程安全集合类,如LinkedList等。但是,队列长度为1时,朴素链表中的head、tail指向同一个节点,从而入队、出队更新同一个节点时存在竞争。
朴素链表:一个节点保存一个元素,不加任何控制和trick。典型如LinkedList。
增加dummy node可解决该问题(或者叫哨兵节点什么的)。定义Node(item, next),描述如下:
dummy = new Node(null, null)
head = dummy.next // head 为 null <=> 队列空
tail = dummy // tail.item 为 null <=> 队列空
tail.next = new Node(newItem, null)
tail = tail.next
oldItem = head.item
dummy = dummy.next
dummy.item = null
head = dummy.next
return oldItem
在新的数据结构中,更新操作发生在dummy和tail上,head仅仅作为示意存在,跟随dummy节点更新。队列长度为1时,虽然head、tail仍指向同一个节点,但dummy、tail指向不同的节点,从而更新dummy和tail时不存在竞争。
源码中的head即为dummy,first即为head:
...
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
...
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
...
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
...
ENQUEUE和COUNT自增的先后顺序
以put()为例,count自增一定要晚于enqueue执行,否则take()方法的while循环检查会失效。
用一个最简单的场景来分析,只有一个生产者线程T1,一个消费者线程T2。
如果先count自增再enqueue
假设目前队列长度0,则事件发生顺序:
- T1线程:count 自增
- T2线程:while 检查 count > 0,无需等待条件 notEmpty
- T2线程:dequeue 执行
- T1线程:enqueue 执行
很明显,在事件1发生后事件4发生前,虽然count>0,但队列中实际是没有元素的。因此,事件3 dequeue会执行失败(预计抛出NullPointerException)。事件4也就不会发生了。
如果先enqueue再count自增
如果先enqueue再count自增,就不会存在该问题。
仍假设目前队列长度0,则事件发生顺序:
- T1线程:enqueue 执行
- T2线程:while 检查 count == 0,等待条件 notEmpty
- T1线程:count 自增
- T1线程:通知条件notFull满足
- T1线程:通知条件notEmpty满足
- T2线程:收到条件notEmpty
- T2线程:while 检查 count > 0,无需等待条件 notEmpty
- T2线程:dequeue 执行
换个方法,用状态机来描述:
- 事件E1发生前,队列处于状态S1
- 事件E1发生,线程T1 增加了一个队列元素,导致队列元素的数量大于count(1>0),队列转换到状态S2
- 事件E1发生后、直到事件E3发生前,队列一