同步辅助类:
CountDownLatch是一个同步辅助类,在jdk5中引入,它允许一个或多个线程等待其他线程操作完成之后才执行。
实现原理 :
CountDownLatch是通过计数器的方式来实现,计数器的初始值为线程的数量。每当一个线程完成了自己的任务之后,就会对计数器减1,当计数器的值为0时,表示所有线程完成了任务,此时等待在闭锁上的线程才继续执行,从而达到等待其他线程完成任务之后才继续执行的目的。
CountDownLatch主要方法:
CountDownLatch具体是通过同步器来实现的,使用AQS状态来表示计数:
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
1、构造函数
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
通过传入一个数值来创建一个CountDownLatch,数值表示线程可以从等待状态恢复,countDown方法必须被调用的次数
2、countDown方法
public void countDown() {
sync.releaseShared(1);
}
线程调用此方法对count进行减1。当count本来就为0,此方法不做任何操作,当count比0大,调用此方法进行减1,当new count为0,释放所有等待当线程。
3、await方法
(1)不带参数
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
调用此方法时,当count为0,直接返回true,当count比0大,线程会一直等待,直到count的值变为0,或者线程被中断(interepted,此时会抛出中断异常)。
(2)带参数
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
调用此方法时,当count为0,直接返回true,当count比0大,线程会等待一段时间,等待时间内如果count的值变为0,返回true;当超出等待时间,返回false;或者等待时间内线程被中断,此时会抛出中断异常。
CountDownLatch实践
司机和工人,工人必须等到司机来了才能装货上车,司机必须得等到所有工人把货物装上车了之后才能把车开走。
工人类:
public class Worker implements Runnable {
private String workerCode;
private CountDownLatch startLatch;
private CountDownLatch latch;
Worker(CountDownLatch startLatch, CountDownLatch latch, String workerCode) {
this.startLatch = startLatch;
this.latch = latch;
this.workerCode = workerCode;
}
public void run() {
try {
startLatch.await();
doWork();
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void doWork() {
System.out.println("Worker " + workerCode + " is loading goods...");
}
}
司机类:
public class Driver {
public static void main(String[] args) throws InterruptedException {
CountDownLatch startLatch = new CountDownLatch(1);
CountDownLatch latch = new CountDownLatch(10);
ExecutorService executor = Executors.newFixedThreadPool(10);
for(int i=0; i<10; i++) {
executor.execute(new Worker(startLatch, latch, "worker" + i));
}
System.out.println("Driver is here.");
startLatch.countDown();
System.out.println("Workers get to work.");
latch.await();
System.out.println("Driver is ready to go.");
executor.shutdown();
}
}
运行结果:
Driver is here.
Workers get to work.
Worker worker0 is loading goods...
Worker worker1 is loading goods...
Worker worker2 is loading goods...