直处于状态S2
事件E3发生,线程T1 使count自增,导致队列元素的数量等于count(1=1),队列转换到状态S1
事件E3发生后、事件E8发生前,队列一直处于状态S1
很多读者可能第一次从状态机的角度来理解并发程序设计,所以猴子选择先写出状态迁移序列,如果能理解上述序列,我们再进行进一步的抽象。实际的状态机定义比下面要严谨的多,不过这里的描述已经足够了。
现在补充定义如下,不考虑入队和出队的区别:
- 队列元素的数量等于count的状态定义为状态S1
- 队列元素的数量大于count的状态定义为状态S2
- enqueue操作定义为状态转换S1->S2
- count自增操作定义为状态转换S2->S1
LinkedBlockingQueue中的同步机制保证了不会有其他线程看到状态S2,即,S1->S2->S1两个状态转换只能由线程T1连续完成,其他线程无法在中间插入状态转换。
在猴子的理解中,并发程序设计的本质是状态机,即维护合法的状态和状态转换。以上是一个极其简单的场景,用状态机举例子就可以描述;然而,复杂场景需要用状态机做数学证明,这使得用状态机描述并发程序设计不太受欢迎(虽然口头描述也不能算严格证明)。不过,理解实现中的各种代码顺序、猛不丁蹦出的trick,这些只是“知其所以然”;通过简单的例子来掌握其状态机本质,才能让我们了解其如何保证线程安全性,自己也能写出类似的实现,做到“知其然而知其所以然”。后面会继续用状态机分析ConcurrentLinkedQueue的源码,敬请期待。
非阻塞方法offer()和poll()
分析了两个阻塞方法put()、take()后,非阻塞方法就简单了。
瞬时版
以offer为例,poll()同理。假设此时队列非空。
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
case1:入队前,队列非满
入队前需得到锁putLock。检查队列非满(隐含表明“无需等待条件notFull”),直接入队。入队后,检查队列非满,随机通知一个生产者(包括使用put()方法的生产者,下同)条件notFull满足。最后,检查入队前队列非空,则无需通知条件notEmpty。
可以看到,瞬时版offer()在队列非满时的行为与put()相同。
case2:入队前,队列满
入队前需得到锁putLock。检查队列满,直接退出try-block。后同case1。
队列满时,offer()与put()的区别就显现出来了。put()通过while循环阻塞,一直等到条件notFull得到满足;而offer()却直接返回。
一个小point:
c在申请锁putLock前被赋值为-1。接下来,如果入队成功,会执行c = count.getAndIncrement();一句,则释放锁后,c的值将大于等于0。于是,这里直接用c是否大于等于0来判断是否入队成功。这种实现牺牲了可读性,只换来了无足轻重的性能或代码量的优化。自己在开发时,不要编写这种代码。
超时版
同上,以offer()为例。假设此时队列非空。
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
该方法同put()很像,12-13行判断nanos超时的情况(吞掉了timeout参数非法的异常情况),所以区别只有14行:将阻塞的notFull.await()换成非阻塞的超时版notFull.awaitNanos(nanos)。
awaitNanos()的实现有点意思,这里不表。其实现类中的Javadoc描述非常干练:“Block until signalled, interrupted, or timed out.”,返回值为剩余时间。剩余时间小于等于参数nanos,表示:
- 条件notFull满足(剩余时间大于0)
- 等待的总时长已超过timeout(剩余时间小于等于0)
nanos首先被初始化为timeout;接下来,消费者线程可能阻塞、收到信号多次,每次收到信号被唤醒,返回的剩余时间都大于0并小于等于参数nanos,再用剩余时间作为下次等待的参数nanos,直到剩余时间小于等于0。以此实现总时长不超过timeout的超时检测。
其他同put()方法。
- 12-13行判断nanos参数非法后,直接返回了false。实现有问题,有可能违反接口声明。
- 根据Javadoc的返回值声明,返回值true表示入队成功,false表示入队失败。但如果传进来的timeout是一个负数,那么5行初始化的nanos也将是一个负数;进而一进入while循环,就在13行返回了false。然而,这是一种参数非法的情况,返回false让人误以为参数正常,只是入队失败。这违反了接口声明,并且非常难以发现。
- 应该在函数头部就将参数非法的情况检查出来,相应抛出IllegalArgumentException。
LinkedBlockingQueue与ArrayBlo