原文出处:
等你归去来
谈到阻塞,相信大家都不会陌生了。阻塞的应用场景真的多得不要不要的,比如 生产-消费模式,限流统计等等。什么 ArrayBlockingQueue、 LinkedBlockingQueue、DelayQueue 等等,都是阻塞队列的实现啊,多简单!
阻塞,一般有两个特性很亮眼:1. 不耗 CPU 等待;2. 线程安全;
额,要这么说也 OK 的。毕竟,我们遇到的问题,到这里就够解决了。但是有没有想过,这容器的阻塞又是如何实现的呢?
好吧,翻开源码,也很简单了:(比如 ArrayBlockingQueue 的 take、put….)
// ArrayBlockingQueue /** * Inserts the specified element at the tail of this queue, waiting * for space to become available if the queue is full. * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) // 阻塞的点 notFull.await(); enqueue(e); } finally { lock.unlock(); } } /** * Inserts the specified element at the tail of this queue, waiting * up to the specified wait time for space to become available if * the queue is full. * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0) return false; // 阻塞的点 nanos = notFull.awaitNanos(nanos); } enqueue(e); return true; } finally { lock.unlock(); } } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) // 阻塞的点 notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
看来,最终都是依赖了 AbstractQueuedSynchronizer 类(著名的AQS)的 await 方法,看起来像那么回事。那么这个同步器的阻塞又是如何实现的呢?
Java的代码总是好跟踪的:
// AbstractQueuedSynchronizer.await()
/** * Implements interruptible condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled or interrupted. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * </ol> */ public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { // 此处进行真正的阻塞 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
如上,可以看到,真正的阻塞工作又转交给了另一个工具类: LockSupport 的 park 方法了,这回跟锁扯上了关系,看起来已经越来越接近事实了:
// LockSupport.park()
/** * Disables the current thread for thread scheduling purposes unless the * permit is available. * * <p>If the permit is available then it is consumed and the