设为首页 加入收藏

TOP

Java并发之Condition的实现分析(一)
2018-10-11 16:13:01 】 浏览:341
Tags:Java 并发 Condition 实现 分析

一、Condition的概念

介绍

回忆 synchronized 关键字,它配合 Object 的 wait()、notify() 系列方法可以实现等待/通知模式。

对于 Lock,通过 Condition 也可以实现等待/通知模式。

Condition 是一个接口。
Condition 接口的实现类是 Lock(AQS)中的 ConditionObject。
Lock 接口中有个 newCondition() 方法,通过这个方法可以获得 Condition 对象(其实就是 ConditionObject)。
因此,通过 Lock 对象可以获得 Condition 对象。

Lock lock  = new ReentrantLock();
Condition c1 = lock.newCondition();
Condition c2 = lock.newCondition();

二、Condition的实现分析

实现

ConditionObject 类是 AQS 的内部类,实现了 Condition 接口。

public class ConditionObject implements Condition, java.io.Serializable {
        private transient Node firstWaiter;
        private transient Node lastWaiter;
        ...

可以看到,等待队列和同步队列一样,使用的都是同步器 AQS 中的节点类 Node。
同样拥有首节点和尾节点,
每个 Condition 对象都包含着一个 FIFO 队列。
结构图:

等待

调用 Condition 的 await() 方法会使线程进入等待队列,并释放锁,线程状态变为等待状态。

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    //释放同步状态(锁)
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    //判断节点是否放入同步对列
    while (!isOnSyncQueue(node)) {
        //阻塞
        LockSupport.park(this);
        //如果已经中断了,则退出
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
}

分析上述方法的大概过程:

  1. 将当前线程创建为节点,加入等待队列;
  2. 释放锁,唤醒同步队列中的后继节点;
  3. while循环判断节点是否放入同步队列:
  • 没有放入,则阻塞,继续 while 循环(如果已经中断了,则退出)
  • 放入,则退出 while 循环,执行后面的判断
  1. 退出 while 说明节点已经在同步队列中,调用 acquireQueued() 方法加入同步状态竞争。
  2. 竞争到锁后从 await() 方法返回,即退出该方法。

addConditionWaiter() 方法:

private Node addConditionWaiter() {
    Node t = lastWaiter;
    if (t != null && t.waitStatus != Node.CONDITION) {
        //清除条件队列中所有状态不为Condition的节点
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    //将该线程创建节点,放入等待队列
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

过程分析:同步队列的首节点移动到等待队列。加入尾节点之前会清除所有状态不为 Condition 的节点。

通知

调用 Condition 的 signal() 方法,可以唤醒等待队列的首节点(等待时间最长),唤醒之前会将该节点移动到同步队列中。

public final void signal() {
    //判断是否获取了锁
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

过程:

  1. 先判断当前线程是否获取了锁;
  2. 然后对首节点调用 doSignal() 方法。
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
       (first = firstWaiter) != null);
}

过程:

  1. 修改首节点;
  2. 调用 transferForSignal() 方法将节点移动到同步队列。
final boolean transferForSignal(Node node) {
    //将节点状态变为0   
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    //将该节点加入同步队列
    Node p = enq(node);
    int ws = p.waitStatus;
    //如果结点p的状态为cancel 或者修改waitStatus失败,则直接唤醒
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

调用同步器的 enq 方法,将节点移动到同步队列,
满足条件后使用 LockSupport 唤醒该线程。

当 Condition 调用 signalAll() 方法:

public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first !=
首页 上一页 1 2 下一页 尾页 1/2/2
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇搭建大众点评CAT监控平台 下一篇ActiveMQ结合Spring收发消息

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目