Java 并发类库AbstractQueuedSynchronizer 分析(二)

2014-11-24 02:40:47 · 作者: · 浏览: 4
if(尝试释放成功){
    unpark等待队列中第一个节点
}else{
    return false
}

要满足以上两个操作,需要以下3点来支持:
1、原子操作同步状态;
2、阻塞或者唤醒一个线程;

3、内部应该维护一个队列。

AQS的实现采用了模板设计模式,在AbstractQueuedSynchronizer类中,定义了

protected boolean tryAcquire(int arg);
protected int tryAcquireShared(int arg);
protected boolean tryRelease(int arg);
protected boolean tryReleaseShared(int arg);
等未具体实现的方法,子类需要实现这些方法,来完成不同的同步器实现。

3.2 获取、释放锁操作

3.2.1 获取操作

获取锁操作的代码如下:

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&   
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt(); //如果获取锁的过程中有中断,则在获取操作完成后,响应中断。
    }

上述逻辑主要包括:
1. 尝试获取(调用tryAcquire更改状态,需要保证原子性);
在tryAcquire方法中使用了同步器提供的对state操作的方法,利用compareAndSet保证只有一个线程能够对状态进行成功修改,而没有成功修改的线程将进入sync队列排队(通过调用addWaiter方法)

addWaiter方法如下:

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure 首先在尾部快速添加,失败后再调用enq方法
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {  //通过CAS操作,来进行入队操作
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

2. 如果获取不到,将当前线程构造成节点Node并加入sync队列;
进入队列的每个线程都是一个节点Node,从而形成了一个双向队列,类似CLH队列,这样做的目的是线程间的通信会被限制在较小规模(也就是两个节点左右)。
3. 再次尝试获取(调用acquireQueued方法),如果没有获取到那么将当前线程从线程调度器上摘下,进入等待状态。

acquireQueued代码如下:

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) { //如果为头结点,且获取锁成功,则退出, Note:head其实保存的是已经获取锁的节点,是哑节点
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt()) //parkAndCheckInterrupt的实现会调用park方法,使当前线程进入等待状态
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
上述逻辑主要包括:
1. 获取当前节点的前驱节点;
需要获取当前节点的前驱节点,而头结点所对应的含义是当前站有锁且正在运行。
2. 当前驱节点是头结点并且能够获取状态,代表该当前节点占有锁;
如果满足上述条件,那么代表能够占有锁,根据节点对锁占有的含义,设置头结点为当前节点。
3. 否则进入等待状态。
如果没有轮到当前节点运行,那么将当前线程从线程调度器上摘下,也就是进入等待状态。


需要注意的是,acquire在执行过程中,并不能及时的对外界中断进行相应,必须等待执行完毕之后,如果由外部中断,则进行中断响应。与acquire方法类似,acquireInterruptibly方法提供了获取状态能力,当然在无法获取状态的情况下会进入sync队列进行排队,这类似acquire,但是和acquire不同的地方在于它能够在外界对当前线程进行中断的时候提前结束获取状态的操作,换句话说,就是在类似synchronized获取锁时,外界能够对当前线程进行中断,并且获取锁的这个操作能够响应中断并提前返回。一个线程处于synchronized块中或者进行同步I/O操作时,对该线程进行中断操作,这时该线程的中断标识位被设置为true,但是线程依旧继续运行。

3.2.2 释放操作

释放操作代码如下:
public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
上述逻辑主要包括:
1. 尝试释放状态;
tryRelease能够保证原子化的将状态设置回去,当然需要使用compareAndSet来保证。如果释放状态成功过之后,将会进入后继节点的唤醒过程。
2. 唤醒当前节点的后继节点所包含的线程。
通过LockSupport的unpark方法将休眠中的线程唤醒,让其继续acquire状态。