再看一下RecursiveTask的定义
public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
private static final long serialVersionUID = 5232453952276485270L;
/**
* The result of the computation.
*/
V result;
/**
* The main computation performed by this task.
* @return the result of the computation
*/
protected abstract V compute(); //我们实现的处理逻辑
public final V getRawResult() { //获取返回计算结果
return result;
}
protected final void setRawResult(V value) {
result = value;
}
/**
* Implements execution conventions for RecursiveTask.
*/
protected final boolean exec() {
result = compute(); //存储计算结果
return true;
}
}
在代码中我们看到任务的真正执行链路是 doExec -> exec -> compute -> 最后设置status 和 result。既然定义状态status并且还是volatile类型我们可以推断出workerThread在获取到执行任务之后都会先判断status是不是已完成或者异常状态,才决定要不要处理该任务。
下面看一下任务真正的处理逻辑代码!
Integer rightResult = rightTask.join()
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
//执行处理前先判断staus是不是已完成,如果完成了就直接返回
//因为这个任务可能被其它线程窃取过去处理完了
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}
代码的调用链是从上到下。整体处理逻辑如下:
线程是workerThread:
先判断任务是否已经处理完成,任务完成直接返回,没有则直接尝试出队tryUnpush(this) 然后执行任务处理doExec()。如果没有出队成功或者处理成功,则执行wt.pool.awaitJoin(w, this, 0L)。wt.pool.awaitJoin(w, this, 0L)的处理逻辑简单来说也是在一个for(;;)中不断的轮询任务的状态是不是已完成,完成就直接退出方法。否就继续尝试出队处理。直到任务完成或者超时为止。
线程不是workerThread:
直接进行入externalAwaitDone()
private int externalAwaitDone() {
int s = ((this instanceof CountedCompleter) ? // try helping
ForkJoinPool.common.externalHelpComplete(
(CountedCompleter<?>)this, 0) :
ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
if (s >= 0 && (s = status) >= 0) {
boolean interrupted = false;
do {
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
synchronized (this) {
if (status >= 0) {
try {
wait(0L);
} catch (InterruptedException ie) {
interrupted = true;
}
}
else
notifyAll();
}
}
} while ((s = status) >= 0);
if (interrupted)
Thread.currentThread().interrupt();
}
return s;
externalAwaitDone的处理逻辑其实也比较简单,当前线程自己先尝试把任务出队ForkJoinPool.common.tryExternalUnpush(this) ? doExec()然后处理掉,如果不成功就交给workerThread去处理,然后利用object/wait的经典方法去监听任务status的状态变更。
6、任务的窃取
一直说fork/join的任务是work-stealing(工作窃取),那任务究竟是怎么被窃取的呢。我们分析一下任务是由workThread来窃取的,workThread是一个线程。线程的所有逻辑都是由run()方法执行,所以任务的窃取逻辑一定在run()方法中可以找到!
public void run() { //线程run方法
if (workQueue.array == null) { // only run once
Throwable exception = null;
try {
onStart();
pool.runWorker(workQueue); //在这里处理任务队列!
} catch (Throwable ex) {
exception = ex;
} finally {
try {
onTermination(exception);
} catch (Throwable ex) {
if (exception == null)
exc