设为首页 加入收藏

TOP

聊聊高并发(二十四)解析java.util.concurrent各个组件(六) 深入理解AQS(四)
2019-05-13 14:11:41 】 浏览:11
Tags:聊聊 并发 二十四 解析 java.util.concurrent 各个 组件 深入 理解 AQS

最近整体过了下AQS的结构,也在网上看了一些讲AQS的文章,大部分的文章都是泛泛而谈。重新看了下AQS的代码,把一些新的要点拿出来说一说。


AQS是一个管程,提供了一个基本的同步器的能力,包含了一个状态,修改状态的原子操作,以及同步线程的一系列操作。它是CLHLock的变种,CLHLock是一个基于队列锁的自旋锁算法。AQS也采用了队列来作为同步线程的结构,它维护了两个队列,一个是作为线程同步的同步队列,另一个是基于Unsafe来进行阻塞/唤醒操作的条件队列。所以理解队列操作是理解AQS的关键。

1. 理解 head, tail引用

2. 理解 next, prev引用

3. 理解队列节点何时入队,何时出队


关于head引用,需要记住的是

1. head引用始终指向获得了锁的节点,它不会被取消。acquire操作成功就表示获得了锁,acquire过程中如果中断,那么acquire就失败了,这时候head就会指向下一个节点。

* because the head node is never cancelled: A node becomes
         * head only as a result of successful acquire. A
         * cancelled thread never succeeds in acquiring, and a thread only
         * cancels itself, not any other node.

而获得了锁的之后,如果线程中断了,那么就需要release来释放head节点。如果线程中断了不释放锁,就有可能造成问题。所以使用显式锁时,必须要在finally里面释放锁

Lock lock = new ReentrantLock();
		lock.lock();
		try{
			// 如果中断,可以处理获得抛出,要保证在finally里面释放锁
		}finally{
			lock.unlock();
		}

再来看看获得锁时对head引用的处理,只有节点的前驱节点是head时,它才有可能获得锁,而获得锁之后,要把自己设置为head节点,同时把老的head的next设置为null。

这里有几层含义:

1. 始终从head节点开始获得锁

2. 新的线程获得锁之后,之前获得锁的节点从队列中出队

3. 一旦获得了锁,acquire方法肯定返回,这个过程中不会被中断

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

关于tail引用,它负责无锁地实现一个链式结构,采用CAS + 轮询的方式。节点的入队操作都是在tail节点

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }


next引用在队列中扮演了很重要的作用,它出现的频率很高。关于next引用,它有几种值的情况

1. next = null

2. next指向非null的下一个节点

3. next = 节点自己


next = null的情况有三种

1. 队尾节点,队尾节点的next没有显式地设置,所以为null

2. 队尾节点入队列时的上一个队尾节点next节点有可能为null,因为enq不是原子操作,CAS之后是复合操作

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    // 这个期间next可能为null
                   t.next = node;
                    return t;
                }
            }
        }
    }

3. 获取锁时,之前获取锁的节点的next设置为null

if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }

next指向非null的下一个节点,这种情况就是正常的在同步队列中等待的节点,入队操作时设置了前一个节点的next值,这样可以在释放锁时,通知下一个节点来获取锁

 private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

next指向自己,这个是取消操作时,会把节点的前一个节点指向它的后一个节点,最后把next域设置为自己

private void cancelAcquire(Node node) {
 // Ignore if node doesn't exist
 if (node == null)
 return;

 node.thread = null;

 // Skip cancelled predecessors
 Node pred = node.prev;
 while (pred.waitStatus > 0)
 node.prev = pred = pred.prev;

 // predNext is the apparent node to unsplice. CASes below will
 // fail if not, in which case, we lost race vs another cancel
 // or signal, so no further action is necessary.
 Node predNext = pred.next;

 // Can use unconditional write instead of CAS here.
 // After this atomic step, other Nodes can skip past us.
 // Before, we are free of interference from other threads.
 node.waitStatus = Node.CANCELLED;

 // If we are the tail, remove ourselves.
 if (node == tail && compareAndSetTail(node, pred)) {
 compareAndSetNext(pred, predNext, null);
 } else {
 // If successor needs signal, try to set pred's next-link
 // so it will get one. Otherwise wake it up to propagate.
 int ws;
 if (pred != head &&
 ((ws = pred.waitStatus) == Node.SIGNAL ||
 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
 pred.thread != null) {
 Node next = node.next;
 if (next != null && next.waitStatus <= 0)
 compareAndSetNext(pred, predNext, next);
 } else {
 unparkSuccessor(node);
 }

 node.next = node; // help GC
 }
 }

prev引用比较简单,它主要是维护链表结构。CLHLock是在前一个节点的状态自旋,AQS里面的节点不是在前一个状态等待,而是释放的时候由前一个节点通知队列来查找下一个要被唤醒的节点。


最后说说节点进入队列和出队列的情况。


节点入队列只有一种情况,那就是它的tryAcquire操作失败,没有获得锁,就进入同步队列等待,如果tryAcquire成功了,就不需要进入同步队列等待了。AQS提供了充分的灵活性,它提供了tryAcquire和tryRelase方法给子类扩展,基类负责维护队列操作,子类可以自己决定是否要进入队列。

所以实际子类扩展的时候有两种类型,一种是公平的同步器,一种是非公平的同步器。这里需要注意的是,所谓的非公平,不是说不使用队列来维护阻塞操作,而是说在获取竞争时,不考虑先来的线程,后来的线程可以直接竞争资源。非公平和公平的同步器竞争失败后,都需要进入AQS的同步队列进行等待,而同步队列是先来先服务的公平的队列。

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

    /**
     * Fair version
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

出队列有两种情况,

1. 后一个线程获得锁是,head引用指向当前获得锁的线程,前一个获得锁的节点自动出队列

2. 取消操作时,节点出队列,取消只有两种情况,一种是线程被中断,还有一种是等待超时



编程开发网
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇ICC副本>>>>(logback.. 下一篇flume读取日志数据写入kafka &nbs..

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

array(4) { ["type"]=> int(8) ["message"]=> string(24) "Undefined variable: jobs" ["file"]=> string(32) "/mnt/wp/cppentry/do/bencandy.php" ["line"]=> int(214) }