Java 7之多线程并发容器 - ArrayBlockingQueue(一)

2014-11-24 02:36:22 · 作者: · 浏览: 4

Java中有些多线程编程模式在很大程序上都依赖于Queue实现的线程安全性,所以非常有必要认识,首先来看一下接口定义,如下:

public interface Queue
  
    extends Collection
   
     { // 向队列中添加元素 boolean add(E e); boolean offer(E e); // 删除队列元素 E remove(); E poll(); // 检查队列元素 E element(); E peek(); } 
   
  
BlockingQueue类继承了如上的接口,定义如下:

public interface BlockingQueue
  
    extends Queue
   
     { boolean add(E e); boolean offer(E e); void put(E e) throws InterruptedException; boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; E take() throws InterruptedException; E poll(long timeout, TimeUnit unit) throws InterruptedException; int remainingCapacity(); boolean remove(Object o); public boolean contains(Object o); int drainTo(Collection
     c); int drainTo(Collection
     c, int maxElements); } 
   
  
这个接口中本身定义的方法,加上从Queue接口中继承的方法后,可以将BlockingQueue方法大概分为4种形式,如下:

vcbky/u1xM/fs8zKubbTwdDW2NDCseS1w7/Vz9DG8MC0o6zI57TTttPB0NbQ0saz/dK7uPa78tXftuC49tSqy9ijrLvy1d/N6sirx+W/1bbTwdChozxicj4Kz8LD5sC0xKPE4tK7uPbX6Mj7ttPB0LXEvPK1pcq1z9ajrMjnz8KjujwvcD4KPHA+PC9wPgo8cHJlIGNsYXNzPQ=="brush:java;">public class BlockingQueue { private List queue = new LinkedList(); private int limit = 10; public BlockingQueue(int limit) { this.limit = limit; } public synchronized void enqueue(Object item) throws InterruptedException { while (this.queue.size() == this.limit) { wait(); } if (this.queue.size() == 0) { notifyAll(); // 通知所有的线程来取出,如果是加入线程则继续等待 } this.queue.add(item); } public synchronized Object dequeue() throws InterruptedException { while (this.queue.size() == 0) { wait(); } if (this.queue.size() == this.limit) { notifyAll(); // 通知所有的线程来加入,如果是取出线程则继续等待 } return this.queue.remove(0); } }必须注意到,在enqueue和dequeue方法内部,只有队列的大小等于上限(limit)或者下限(0)时,才调用notifyAll方法。如果队列的大小既不等于上限,也不等于下限,任何线程调用enqueue或者dequeue方法时,都不会阻塞,都能够正常的往队列中添加或者移除元素。

Java提供了BlockingQueue接口的两个基本实现:LinkedBlockingQueue和ArrayBlockingQueue。他们都是FIFO队列,二者分别与LinkedList和ArrayList类似,但比同步List拥有更好的并发性能。他们的用法之间稍有区别,如已知队列的大小而能确定合适的边界时,用ArrayBlockingQueue非常高效。

下面来看ArrayBlockingQueue类的最主要的一个构造函数,如下:

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}
Lock的作用是提供独占锁机制,来保护竞争资源;而Condition是为了更加精细的对锁进行控制,它依赖于Lock,通过某个条件对多线程进行控制。
notEmpty表示锁的非空条件。当某线程想从队列中取数据时,而此时又没有数据,则该线程通过notEmpty.await()进行等待;当其它线程向队列中插入了元素之后,就调用notEmpty.signal()唤醒之前通过notEmpty.await()进入等待状态的线程。同理,notFull表示“锁的满条件”。当某线程想向队列中插入元素,而此时队列已满时,该线程等待;当其它线程从队列中取出元素之后,就唤醒该等待的线程。

1、添加元素


    public boolean add(E e) {
        return super.add(e);
    }
    public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                insert(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            insert(e);
        } finally {
            lock.unlock();
        }
    }
    public boolean offer(E e, long timeout, TimeUnit unit)  throws InterruptedException {

        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            insert(e);
            return true;
        } finally {
            lock.unlock();
        }
    }
(1)add(E e)方法会调用AbstractQueue类中的方法,代码如下:

   public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
还是