下面这个例子使用synchronized关键字和wait() 、notifyAll()方法实现同步。
public abstract class BaseBoundedBuffer{ private final V[] buf; private int tail; private int head; private int count; protected BaseBoundedBuffer(int capacity) { this.buf = (V[]) new Object[capacity]; } protected synchronized final void doPut(V v) { buf[tail] = v; if (++tail == buf.length) tail = 0; ++count; } protected synchronized final V doTake() { V v = buf[head]; buf[head] = null; if (++head == buf.length) head = 0; --count; return v; } public synchronized final boolean isFull() { return count == buf.length; } public synchronized final boolean isEmpty() { return count == 0; } }
public class BoundedBuffer下面来使用Condition来实现。extends BaseBoundedBuffer { // CONDITION PREDICATE: not-full (!isFull()) // CONDITION PREDICATE: not-empty (!isEmpty()) public BoundedBuffer() { this(100); } public BoundedBuffer(int size) { super(size); } // BLOCKS-UNTIL: not-full public synchronized void put(V v) throws InterruptedException { while (isFull()) wait(); doPut(v); notifyAll(); } // BLOCKS-UNTIL: not-empty public synchronized V take() throws InterruptedException { while (isEmpty()) wait(); V v = doTake(); notifyAll(); return v; } // BLOCKS-UNTIL: not-full // Alternate form of put() using conditional notification public synchronized void alternatePut(V v) throws InterruptedException { while (isFull()) wait(); boolean wasEmpty = isEmpty(); doPut(v); if (wasEmpty) notifyAll(); } }
Condition接口中定义的方法如下所示:
public interface Condition {
// 造成当前线程在接到信号或被中断之前一直处于等待状态。
void await();
// 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
boolean await(long time, TimeUnit unit);
// 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
long awaitNanos(long nanosTimeout);
// 造成当前线程在接到信号之前一直处于等待状态。
void awaitUninterruptibly();
// 造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。
boolean awaitUntil(Date deadline);
void signal(); // 唤醒一个等待线程
void signalAll(); // 唤醒所有等待线程
}在Condition对象中,与wait()、notifyAll()和notify()方法相对应的分别为await、signalAll和signal。
举个具体的例子,如下:
class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition(); // 不满
final Condition notEmpty = lock.newCondition(); // 不空
final Object[] items = new Object[5];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock(); // 获取锁
try {
// 如果缓冲已满,则等待;直到缓冲不是满的,才将x添加到缓冲中
while (count == items.length)
notFull.await();
items[putptr] = x; // 将x添加到缓冲中
// 将put统计数putptr+1;如果缓冲已满,则设putptr为0。
if (++putptr == items.length) putptr = 0;
++count; // 将缓冲数量+1
notEmpty.signal(); // 唤醒take线程,因为take线程通过notEmpty.await()等待
// 打印写入的数据
System.out.println(Thread.currentThread().getName() + " put "+ (Integer)x);
} finally {
lock.unlock(); // 释放锁
}
}
public Object take() throws InterruptedException {
lock.lock(); // 获取锁
try {
while (count == 0) // 如果缓冲为空,则等待;直到缓冲不为空,才将x从缓冲中取出
notEmpty.await();
Object x = items[takeptr]; // 将x从缓冲中取出
// 将take统计数takeptr+1;如果缓冲为空,则设takeptr为0。
if (++takeptr == items.length)
takeptr = 0;
--count; // 将缓冲数量-1
notFull.signal(); // 唤醒put线程,因为put线程通过notFull.await()等待
// 打印取出的数据
System.out.println(Thread.currentThread().getName() + " take "+ (Integer)x);
return x;
} finally {
lock.unlock(); // 释放锁
}
}
}
如上使用了ReentrantLock独占锁和Condition对象给出了有界缓存的实现,即