原文出处:
猴子007
线程池的实现核心之一是FutureTask。在提交任务时,用户实现的Callable实例task会被包装为FutureTask实例ftask;提交后任务异步执行,无需用户关心;当用户需要时,再调用FutureTask#get()获取结果——或异常。
随之而来的问题是,如何优雅的获取ftask的结果并处理异常?本文讨论使用FutureTask的正确姿势。
今天换个风格。
源码分析
从提交一个Callable实例task开始。
submit()
ThreadPoolExecutor直接继承AbstractExecutorService的实现。
public abstract class AbstractExecutorService implements ExecutorService { ... public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } ... }
后续流程可参考源码|从串行线程封闭到对象池、线程池。最终会在ThreadPoolExecutor#runWorker()中执行task.run()。
task即5行创建的ftask,看newTaskFor()。
newTaskFor()
AbstractExecutorService#newTaskFor()创建一个RunnableFuture类型的FutureTask。
public abstract class AbstractExecutorService implements ExecutorService { ... protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); } ... }
看FutureTask的实现。
FutureTask
public class FutureTask<V> implements RunnableFuture<V> { ... private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; ... public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } ... }
构造方法的重点是初始化ftask状态为NEW。
状态机
状态转换比较少,直接给状态序列:
* NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED
状态在后面有用.
run()
简化如下:
public class FutureTask<V> implements RunnableFuture<V> { ... public void run() { try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { runner = null; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } ... }
如果执行时未抛出异常
如果未抛出异常,则ran==true,FutureTask#set()设置结果。
public class FutureTask<V> implements RunnableFuture<V> { ... protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } } ... }
- outcome中保存结果result
- 连续两步设置状态到NORMAL
- finishCompletion()执行一些清理
记住outcome。
相当于4行获取独占锁,5-6行执行锁中的操作(注意,7行是不加锁的)。
如果执行时抛出了异常
如果运行时抛出了异常,则被12行catch捕获,FutureTask#setException()设置结果;同时,ran==false,因此不执行FutureTask#set()。
public class FutureTask<V> implements RunnableFuture<V> { ... protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } } ... }
- outcome中保存异常t
- 连续两步设置状态到EXCEPTIONAL
- finishCompletion()执行一