countDown()
# 片段1 java.util.concurrent.CountDownLatch#countDown
public void countDown() {
// 将共享状态State的值减一
sync.releaseShared(1);
}
# 片段 2 java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared
public final boolean releaseShared(int arg) {
// tryReleaseShared(arg)方法是CountDownLatch 重写的方法,该方法见片段3
if (tryReleaseShared(arg)) {
// 当同步状态为0时会执行这个方法
doReleaseShared();
return true;
}
return false;
}
# 片段 3 java.util.concurrent.CountDownLatch.Sync#tryReleaseShared 模板方法
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
// 自旋+CAS 更新同步状态
for (;;) {
// 获得同步状态
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
// 如果同步更新后的值为0放回true
return nextc == 0;
}
}
# 片段 4 java.util.concurrent.locks.AbstractQueuedSynchronizer#doReleaseShared
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
// 获得同步队列头节点
Node h = head;
// 如果头节点存在且不等于尾节点(tail),进入if里
if (h != null && h != tail) {
// 获得头节点的等待状态
int ws = h.waitStatus;
// 判断等待状态是否为Node.SIGNAL,等待状态为SIGNAL表示需要唤醒后继节点
if (ws == Node.SIGNAL) {
//使用compareAndSetWaitStatus(h, Node.SIGNAL, 0)方法来尝试将头节点的等待状态从Node.SIGNAL设置为0。如果设置成功,则调用unparkSuccessor(h)方法唤醒后继节点。如果设置失败,则继续循环重新检查情况。
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 唤醒后续节点 见代码片段5
unparkSuccessor(h);
}
//如果等待状态为0,并且使用compareAndSetWaitStatus(h, 0, Node.PROPAGATE)方法尝试将头节点的等待状态从0设置为Node.PROPAGATE,则继续循环。
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
//头节点不变退出循环
if (h == head) // loop if head changed
break;
}
}
# 片段5 java.util.concurrent.locks.AbstractQueuedSynchronizer#unparkSuccessor
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
// 如果等待状态ws小于0,尝试将等待状态清除为0,忽略修改失败和其他线程可能对等待状态的更改。
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
// 获得头节点的下一个节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 循环遍历从尾节点开始,直到找到非取消状态的节点或者已经遍历到给定节点node为止。
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 对首个等待线程进行唤醒
LockSupport.unpark(s.thread);
}
2.3 方法countDownLatch.await()
# 片段1 java.util.concurrent.CountDownLatch#await()
public void await() throws InterruptedException {
// 见代码片段2
sync.acquireSharedInterruptibly(1);
}
# 片段2 java.util.concurrent.locks.Ab