设为首页 加入收藏

TOP

FutureTask在线程池中应用和源码解析(二)
2018-11-20 12:08:18 】 浏览:268
Tags:FutureTask 线程 池中 应用 源码 解析
ption { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); }
public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

解析如下:

以上两个方法,原理一样,其中一个设置超时时间,支持最多阻塞多长时间。
状态如果小于 COMPLETING,说明还没到最终状态,(不管是否是成功、异常、取消)。
调用 awaitDone 方法阻塞线程,最终调用 report 方法返回结果。

awaitDone 方法

private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            //线程可中断,如果当前阻塞获取结果线程执行interrupt()方法,则从队列中移除该节点,并抛出中断异常
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
            int s = state;
            // 如果已经是最终状态,退出返回
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            //这里做了个优化,competiting到最终状态时间很短,通过yield比挂起响应更快。
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            // 初始化该阻塞节点
            else if (q == null)
                q = new WaitNode();
            // cas方式写到阻塞waiters栈中
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            // 这里做阻塞时间处理。
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                // 阻塞线程,有超时时间
                LockSupport.parkNanos(this, nanos);
            }
            else
                // 阻塞线程
                LockSupport.park(this);
        }
    }

解析如下:

整体流程已写到注解中,整体实现是放在一个死循环中,唯一出口,是达到最终状态。
然后是构建节点元素,并将该节点入栈,同时阻塞当前线程等待运行主任务的线程唤醒该节点。

report 方法

private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

然后是report方法,如果是正常结束,返回结果,如果不是正常结束(取消,中断)抛出异常。

最后分析下取消流程。

cancel 方法

public boolean cancel(boolean mayInterruptIfRunning) {
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}

解析如下:

mayInterruptIfRunning参数是是否允许运行中被中断取消。

  1. 根据入参是否为true,CAS设置状态为INTERRUPTING或CANCELLED,设置成功,继续第二步,否则直接返回false。
  2. 如果允许运行中被中断取消,调用runner.interupt()进行中断取消,设置状态为INTERRUPTED。
  3. 唤醒所有在get()方法等待的线程。

此处有两种状态转换:

  1. 如果mayInterruptIfRunning为true:status状态转换为 new -> INTERRUPTING->INTERRUPTED。主动去中断执行线程,然后唤醒所有等待结果的线程。
  2. 如果mayInterruptIfRunning为false:status状态转换为 new -> CANCELLED。

不会去中断执行线程,直接唤醒所有等待结果的线程,从 awaitDone 方法中可以看到,唤醒等待线程后,直接从跳转回 get 方法,然后把结果返回给获取结果的线程,当然此时的结果是 null。

总结

以上就是 FutureTask 的源码简单解析,实现比较简单,FutureTask 就是一个实现 Future 模式,支持取消的异步处理器。

首页 上一页 1 2 下一页 尾页 2/2/2
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇谈谈 Java 类加载机制 下一篇Java字节码结构剖析三:方法表

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目