设为首页 加入收藏

TOP

并发一枝花之 BlockingQueue(二)
2017-10-27 09:06:54 】 浏览:511
Tags:并发 花之 BlockingQueue
个元素,后续只有P1入队1个元素,显然此时队列非满。因此,14-15行“入队完成时的通知”是必要的,保证了只要队列非满,每次入队后都能唤醒1个阻塞的生产者,来等待锁释放后竞争锁。即,P1完成入队后,如果检查到队列非满,会随机唤醒一个生产者P2,让P2在P1释放锁putLock后竞争锁,继续入队,P3同理。

case3:入队前,队列空

入队前需得到锁putLock。检查队列空,则无需等待条件notFull,直接入队。入队后,如果队列非满,则同case1;如果队列满,则同case2。最后,假设检查入队前队列空(队列非空的情况同case1),则随机通知一个消费者条件notEmpty满足。

注意点:

  • 只有入队前队列空的情况下,才需要通知条件notEmpty满足。因为如果队列非空,则出队操作不会阻塞在条件notEmpty上。另一方面,虽然已经有生产者完成了入队,但可能有消费者在生产者释放锁putLock后、通知条件notEmpty满足前,使队列变空;不过这没有影响,take()方法的while循环能够在线程竞争到锁之后再次确认。
  • 通过入队和出队前检查队列长度(while+await),隐含保证了队列空时只允许入队操作,不存在竞争队列。

case4:入队前,队列长度为1

case4是一个特殊情况,分析方法类似于case1,但可能入队与出队之间存在竞争,我们稍后分析。

阻塞的出队操作TAKE()

在队头入队。takeLock和notEmpty配合完成同步。

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

依旧是四种case,put()和take()是对偶的,很容易分析,不赘述。

“CASE4 队列长度为1”时的特殊情况

队列长度为1时,到底入队和出队之间存在竞争吗?这取决于LinkedBlockingQueue的底层数据结构。

最简单的是使用朴素链表,可以自己实现,也可以使用JDK提供的非线程安全集合类,如LinkedList等。但是,队列长度为1时,朴素链表中的head、tail指向同一个节点,从而入队、出队更新同一个节点时存在竞争。

朴素链表:一个节点保存一个元素,不加任何控制和trick。典型如LinkedList。

增加dummy node可解决该问题(或者叫哨兵节点什么的)。定义Node(item, next),描述如下:

  • 初始化链表时,创建dummy node:
dummy = new Node(null, null)
head = dummy.next // head 为 null <=> 队列空
tail = dummy // tail.item 为 null <=> 队列空
  • 在队尾入队时,tail后移:
tail.next = new Node(newItem, null)
tail = tail.next
  • 在队头出队时,dummy后移,同步更新head:
oldItem = head.item
dummy = dummy.next
dummy.item = null
head = dummy.next
return oldItem

在新的数据结构中,更新操作发生在dummy和tail上,head仅仅作为示意存在,跟随dummy节点更新。队列长度为1时,虽然head、tail仍指向同一个节点,但dummy、tail指向不同的节点,从而更新dummy和tail时不存在竞争。

源码中的head即为dummy,first即为head:

...
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}
...
private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}
...
private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}
...

ENQUEUE和COUNT自增的先后顺序

以put()为例,count自增一定要晚于enqueue执行,否则take()方法的while循环检查会失效。

用一个最简单的场景来分析,只有一个生产者线程T1,一个消费者线程T2。

如果先count自增再enqueue

假设目前队列长度0,则事件发生顺序:

  • T1线程:count 自增
  • T2线程:while 检查 count > 0,无需等待条件 notEmpty
  • T2线程:dequeue 执行
  • T1线程:enqueue 执行

很明显,在事件1发生后事件4发生前,虽然count>0,但队列中实际是没有元素的。因此,事件3 dequeue会执行失败(预计抛出NullPointerException)。事件4也就不会发生了。

如果先enqueue再count自增

如果先enqueue再count自增,就不会存在该问题。

仍假设目前队列长度0,则事件发生顺序:

  • T1线程:enqueue 执行
  • T2线程:while 检查 count == 0,等待条件 notEmpty
  • T1线程:count 自增
  • T1线程:通知条件notFull满足
  • T1线程:通知条件notEmpty满足
  • T2线程:收到条件notEmpty
  • T2线程:while 检查 count > 0,无需等待条件 notEmpty
  • T2线程:dequeue 执行

换个方法,用状态机来描述:

  • 事件E1发生前,队列处于状态S1
  • 事件E1发生,线程T1 增加了一个队列元素,导致队列元素的数量大于count(1>0),队列转换到状态S2
  • 事件E1发生后、直到事件E3发生前,队列一
首页 上一页 1 2 3 4 下一页 尾页 2/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇注解的那点事儿 下一篇Java 实现生产者 – 消费者模型

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目