设为首页 加入收藏

TOP

FutureTask源码解析(一)
2017-09-30 12:05:32 】 浏览:4399
Tags:FutureTask 源码 解析
站在使用者的角度,future是一个经常在多线程环境下使用的Runnable,使用它的好处有两个:
  1. 线程执行结果带有返回值
  2. 提供了一个线程超时的功能,超过超时时间抛出异常后返回。

那,怎么实现future这种超时控制呢?来看看代码:

public class FutureTask<V> implements RunnableFuture<V> {
    /** Synchronization control for FutureTask */
    private final Sync sync;

FutureTask的实现只是依赖了一个内部类Sync实现的,Sync是AQS (AbstractQueuedSynchronizer)的子类,这个类承担了所有future的功 能,AbstractQueuedSynchronizer的作者是大名鼎鼎的并发编程大师Doug Lea,它的作用远远不止实现一个Future这么简单,后面在说。

下面,我们从一个future提交到线程池开始,直到future超时或者执行结束来看看future都做了些什么。怎么做的。

首先,向线程池ThreadPoolExecutor提交一个future:

future = exec.submit( new WebDivideFuture (cookieUtils, jediusUtil, request, selectFactory, result, testInfos) );

ThreadPoolExecutor将提交的任务用FutureTask包装一下:

public <T> Future<T> submit (Callable<T> task) {
    if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
    return ftask;
}

protected <T> RunnableFuture<T> newTaskFor (Callable<T> callable) {
    return new FutureTask<T>(callable);
}

然后尝试将包装后的Future用Thread类包装下后启动,

红色标记的地方表示,当当前线程池的大小小于corePoolSize时,将任务提交,否则将该任务加入到workQueue中去,如果workQueue装满了,则尝试在线程数小于MaxPoolSize的条件下提交该任务。

顺便说明下,我们使用线程池时,常常看到有关有界队列,无界队列作为工作队列的字眼:使用无界队列时,线程池的大小永远不大于corePoolSize,使用有界队列时的maxPoolSize才有效,原因就在这里,如果是

无界队列,红框中的add永远为true 下方的addIfUnderMaximumPoolSize怎么也走不到了,也就不会有线程数量大于MaxPoolSize的情况

言归正传,看看addIfUnderCorePoolSize 中做了什么事:

new了一个Thread,将我们提交的任务包装下后就直接启动了

private boolean addIfUnderCorePoolSize (Runnable firstTask) {
    Thread t = null;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (poolsize < corePoolSize && runState == RUNNING)
            t = addThread(firstTask);
    } finally {
        mainLock.unlock();
    }
    if (t == null)
        return false;
    t.start();
    return true;
}

我们知道,线程的start方法会调用我们runnable接口的run方法,因此不难猜测FutureTask也是实现了Runnable接口的

public class FutureTask<V> implements RunnableFuture<V> {
    /** Synchronization control for FutureTask */

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

FutureTask的run()方法中是这么写:

public void run() {
    sync.innerRun();
}

innerRun方法先使用原子方式更改了一下自己的一个标志位state(用于标示任务的执行情况)

然后红色框的方法 实现回调函数call的调用,并且将返回值作为参数传递下去,放置在一个叫做result的泛型变量中,

然后future只管等待一段时间后去拿result这个变量的值就可以了。   至于怎么实现的“等待一段时间再去拿” 后面马上说明。

innerSet在经过一系列的状态判断后,最终将V这个call方法返回的值赋值给了result

说到这里,我们知道,future是通过将call方法的返回值放在一个叫做result的变量中,经过一段时间的等待后再去拿出来返回就可以了。

怎么实现这个 “等一段时间”呢?

要从Sync的父类AbstractQueuedSynchronizer这个类说起:

我 们知道AbstractQueuedSynchronizer 后者的中文名字叫做 同步器,顾名思义,是用来控制资源占用的一种方式。对于FutureTask来说,“资源”就是result,线程执行的结果。思路就是通过控制对 result这个资源的访问来决定是否需要马上去取得result这个结果,当超时时间未到,或者线程未执行结束时,是不能去取result的。当线程正 常执行结束后,一系列的标志位会被修改,并告诉等待future执行结果的各个线程,可以来获取result了。

这里会涉及到 独占锁和共享锁的概念。

独占锁:同一时间只有一个线程获取锁。再有线程尝试加锁,将失败。 典型例子 reentrantLock

共享锁:同一时间可以有多个线程获取锁。 典型例子,本例中的FutureTask

为什么说他们?因为Sync本质上就是想完成一个共享锁的功能,所以Sync继承了AbstractQueuedSynchronizer 所以Sync的方法使用的是Abstra

首页 上一页 1 2 3 下一页 尾页 1/3/3
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇FutureTask源码解析 下一篇解码OutOfMemoryError:PermGen S..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目