设为首页 加入收藏

TOP

FutureTask在线程池中应用和源码解析(一)
2018-11-20 12:08:18 】 浏览:269
Tags:FutureTask 线程 池中 应用 源码 解析

FutureTask 是一个支持取消的异步处理器,一般在线程池中用于异步接受callable返回值。

主要实现分三部分:

  1. 封装 Callable,然后放到线程池中去异步执行->run。
  2. 获取结果-> get。
  3. 取消任务-> cancel。

接下来主要学习下该模型如何实现。

举例说明FutureTask在线程池中的应用

// 第一步,定义线程池,
ExecutorService executor = new ThreadPoolExecutor(
        minPoolSize,
        maxPollSize,
        keepAliveTime,
        TimeUnit.SECONDS,
        new SynchronousQueue<>());

// 第二步,放到线程池中执行,返回FutureTask
FutureTask  task = executor.submit(callable);

// 第三步,获取返回值
T data = task.get();

学习FutureTask实现

类属性

//以下是FutureTask的各种状态
private volatile int state; 
private static final int NEW          = 0; 
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

private Callable<V> callable; //执行的任务
private Object outcome; //存储结果或者异常
private volatile Thread runner;//执行callable的线程
private volatile WaitNode waiters; //调用get方法等待获取结果的线程栈

其中各种状态存在 最终状态 status>COMPLETING
1)NEW -> COMPLETING -> NORMAL(有正常结果)
2) NEW -> COMPLETING -> EXCEPTIONAL(结果为异常) 
3) NEW -> CANCELLED(无结果) 
4) NEW -> INTERRUPTING -> INTERRUPTED(无结果)

类方法

从上面举例说明开始分析。

run()方法

FutureTask 继承 Runnable,ExecutorService submit 把提交的任务封装成 FutureTask 然后放到线程池 ThreadPoolExecutor 的 execute 执行。

public void run() {
    //如果不是初始状态或者cas设置运行线程是当前线程不成功,直接返回
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
              // 执行callable任务 这里对异常进行了catch
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex); // 封装异常到outcome
            }
            if (ran)
                set(result);
        }
    } finally {
        runner = null;
        int s = state;
        // 这里如果是中断中,设置成最终状态
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

以上是 run 方法源码实现很简单,解析如下:

  1. 如果不是始状态或者 cas 设置运行线程是当前线程不成功,直接返回,防止多个线程重复执行。
  2. 执行 Callable 的 call(),即提交执行任务(这里做了catch,会捕获执行任务的异常封装到 outcome 中)。
  3. 如果成功执行 set 方法,封装结果。

set 方法

protected void set(V v) {
    //cas方式设置成completing状态,防止多个线程同时处理
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v; // 封装结果
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 最终设置成normal状态

        finishCompletion();
    }
}

解析如下:

  1. cas方式设置成completing状态,防止多个线程同时处理
  2. 封装结果到outcome,然后设置到最终状态normal
  3. 执行finishCompletion方法。

finishCompletion方法

// state > COMPLETING; 不管异常,中断,还是执行完成,都需要执行该方法来唤醒调用get方法阻塞的线程
private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        // cas 设置waiters为null,防止多个线程执行。
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            // 循环唤醒所有等待结果的线程
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    //唤醒线程
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
   //该方法为空,可以被重写
    done();
    callable = null;        // to reduce footprint
}

解析如下:

遍历waiters中的等待节点,并通过 LockSupport 唤醒每一个节点,通知每个线程,该任务执行完成(可能是执行完成,也可能 cancel,异常等)。

以上就是执行的过程,接下来分析获取结果的过程->get。

get 方法

public V get() throws InterruptedException, ExecutionExce
首页 上一页 1 2 下一页 尾页 1/2/2
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇谈谈 Java 类加载机制 下一篇Java字节码结构剖析三:方法表

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目