原文出处:
hcy0411
FutureTask 是一个支持取消的异步处理器,一般在线程池中用于异步接受callable返回值。
主要实现分三部分:
- 封装 Callable,然后放到线程池中去异步执行->run。
- 获取结果-> get。
- 取消任务-> cancel。
接下来主要学习下该模型如何实现。
举例说明FutureTask在线程池中的应用
// 第一步,定义线程池, ExecutorService executor = new ThreadPoolExecutor( minPoolSize, maxPollSize, keepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<>()); // 第二步,放到线程池中执行,返回FutureTask FutureTask task = executor.submit(callable); // 第三步,获取返回值 T data = task.get();
学习FutureTask实现
类属性
//以下是FutureTask的各种状态 private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; private Callable<V> callable; //执行的任务 private Object outcome; //存储结果或者异常 private volatile Thread runner;//执行callable的线程 private volatile WaitNode waiters; //调用get方法等待获取结果的线程栈 其中各种状态存在 最终状态 status>COMPLETING 1)NEW -> COMPLETING -> NORMAL(有正常结果) 2) NEW -> COMPLETING -> EXCEPTIONAL(结果为异常) 3) NEW -> CANCELLED(无结果) 4) NEW -> INTERRUPTING -> INTERRUPTED(无结果)
类方法
从上面举例说明开始分析。
run()方法
FutureTask 继承 Runnable,ExecutorService submit 把提交的任务封装成 FutureTask 然后放到线程池 ThreadPoolExecutor 的 execute 执行。
public void run() { //如果不是初始状态或者cas设置运行线程是当前线程不成功,直接返回 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { // 执行callable任务 这里对异常进行了catch result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); // 封装异常到outcome } if (ran) set(result); } } finally { runner = null; int s = state; // 这里如果是中断中,设置成最终状态 if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
以上是 run 方法源码实现很简单,解析如下:
- 如果不是始状态或者 cas 设置运行线程是当前线程不成功,直接返回,防止多个线程重复执行。
- 执行 Callable 的 call(),即提交执行任务(这里做了catch,会捕获执行任务的异常封装到 outcome 中)。
- 如果成功执行 set 方法,封装结果。
set 方法
protected void set(V v) { //cas方式设置成completing状态,防止多个线程同时处理 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; // 封装结果 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 最终设置成normal状态 finishCompletion(); } }
解析如下:
- cas方式设置成completing状态,防止多个线程同时处理
- 封装结果到outcome,然后设置到最终状态normal
- 执行finishCompletion方法。
finishCompletion方法
// state > COMPLETING; 不管异常,中断,还是执行完成,都需要执行该方法来唤醒调用get方法阻塞的线程 private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { // cas 设置waiters为null,防止多个线程执行。 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { // 循环唤醒所有等待结果的线程 for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; //唤醒线程 LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } //该方法为空,可以被重写 done(); callable = null; // to reduce footprint }
解析如下:
遍历waiters中的等待节点,并通过 LockSupport 唤醒每一个节点,通知每个线程,该任务执行完成(可能是执行完成,也可能 cancel,异常等)。
以上就是执行的过程,接下来分析获取结果的过程->get。
get 方法
public V get() throws InterruptedException, ExecutionExce