ption {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
解析如下:
以上两个方法,原理一样,其中一个设置超时时间,支持最多阻塞多长时间。
状态如果小于 COMPLETING,说明还没到最终状态,(不管是否是成功、异常、取消)。
调用 awaitDone 方法阻塞线程,最终调用 report 方法返回结果。
awaitDone 方法
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
//线程可中断,如果当前阻塞获取结果线程执行interrupt()方法,则从队列中移除该节点,并抛出中断异常
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
// 如果已经是最终状态,退出返回
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
//这里做了个优化,competiting到最终状态时间很短,通过yield比挂起响应更快。
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
// 初始化该阻塞节点
else if (q == null)
q = new WaitNode();
// cas方式写到阻塞waiters栈中
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 这里做阻塞时间处理。
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
// 阻塞线程,有超时时间
LockSupport.parkNanos(this, nanos);
}
else
// 阻塞线程
LockSupport.park(this);
}
}
解析如下:
整体流程已写到注解中,整体实现是放在一个死循环中,唯一出口,是达到最终状态。
然后是构建节点元素,并将该节点入栈,同时阻塞当前线程等待运行主任务的线程唤醒该节点。
report 方法
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
然后是report方法,如果是正常结束,返回结果,如果不是正常结束(取消,中断)抛出异常。
最后分析下取消流程。
cancel 方法
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
解析如下:
mayInterruptIfRunning参数是是否允许运行中被中断取消。
- 根据入参是否为true,CAS设置状态为INTERRUPTING或CANCELLED,设置成功,继续第二步,否则直接返回false。
- 如果允许运行中被中断取消,调用runner.interupt()进行中断取消,设置状态为INTERRUPTED。
- 唤醒所有在get()方法等待的线程。
此处有两种状态转换:
- 如果mayInterruptIfRunning为true:status状态转换为 new -> INTERRUPTING->INTERRUPTED。主动去中断执行线程,然后唤醒所有等待结果的线程。
- 如果mayInterruptIfRunning为false:status状态转换为 new -> CANCELLED。
不会去中断执行线程,直接唤醒所有等待结果的线程,从 awaitDone 方法中可以看到,唤醒等待线程后,直接从跳转回 get 方法,然后把结果返回给获取结果的线程,当然此时的结果是 null。
总结
以上就是 FutureTask 的源码简单解析,实现比较简单,FutureTask 就是一个实现 Future 模式,支持取消的异步处理器。