设为首页 加入收藏

TOP

fork/join 全面剖析(五)
2019-09-17 18:08:02 】 浏览:105
Tags:fork/join 全面 剖析

  

  再看一下RecursiveTask的定义  

public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
    private static final long serialVersionUID = 5232453952276485270L;

    /**
     * The result of the computation.
     */
    V result;

    /**
     * The main computation performed by this task.
     * @return the result of the computation
     */
    protected abstract V compute(); //我们实现的处理逻辑

    public final V getRawResult() { //获取返回计算结果
        return result;
    }

    protected final void setRawResult(V value) {
        result = value;
    }
    /**
     * Implements execution conventions for RecursiveTask.
     */
    protected final boolean exec() {
        result = compute(); //存储计算结果
        return true;
    }

}

  

在代码中我们看到任务的真正执行链路是 doExec -> exec -> compute -> 最后设置status 和 result。既然定义状态status并且还是volatile类型我们可以推断出workerThread在获取到执行任务之后都会先判断status是不是已完成或者异常状态,才决定要不要处理该任务。

      下面看一下任务真正的处理逻辑代码!      

  Integer rightResult = rightTask.join()
   
    public final V join() {
        int s;
        if ((s = doJoin() & DONE_MASK) != NORMAL)
            reportException(s);
        return getRawResult();
     }
 
    //执行处理前先判断staus是不是已完成,如果完成了就直接返回
   //因为这个任务可能被其它线程窃取过去处理完了
    private int doJoin() {
        int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
        return (s = status) < 0 ? s :
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            (w = (wt = (ForkJoinWorkerThread)t).workQueue).
            tryUnpush(this) && (s = doExec()) < 0 ? s :
            wt.pool.awaitJoin(w, this, 0L) :
            externalAwaitDone();
    }  
 

    代码的调用链是从上到下。整体处理逻辑如下:

     线程是workerThread:

     先判断任务是否已经处理完成,任务完成直接返回,没有则直接尝试出队tryUnpush(this) 然后执行任务处理doExec()。如果没有出队成功或者处理成功,则执行wt.pool.awaitJoin(w, this, 0L)。wt.pool.awaitJoin(w, this, 0L)的处理逻辑简单来说也是在一个for(;;)中不断的轮询任务的状态是不是已完成,完成就直接退出方法。否就继续尝试出队处理。直到任务完成或者超时为止。

   线程不是workerThread:

   直接进行入externalAwaitDone()

 private int externalAwaitDone() {
        int s = ((this instanceof CountedCompleter) ? // try helping
                 ForkJoinPool.common.externalHelpComplete(
                     (CountedCompleter<?>)this, 0) :
                 ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
        if (s >= 0 && (s = status) >= 0) {
            boolean interrupted = false;
            do {
                if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
                    synchronized (this) {
                        if (status >= 0) {
                            try {
                                wait(0L);
                            } catch (InterruptedException ie) {
                                interrupted = true;
                            }
                        }
                        else
                            notifyAll();
                    }
                }
            } while ((s = status) >= 0);
            if (interrupted)
                Thread.currentThread().interrupt();
        }
        return s;
   
 
 

    externalAwaitDone的处理逻辑其实也比较简单,当前线程自己先尝试把任务出队ForkJoinPool.common.tryExternalUnpush(this) ? doExec()然后处理掉,如果不成功就交给workerThread去处理,然后利用object/wait的经典方法去监听任务status的状态变更。

    

    6、任务的窃取

         一直说fork/join的任务是work-stealing(工作窃取),那任务究竟是怎么被窃取的呢。我们分析一下任务是由workThread来窃取的,workThread是一个线程。线程的所有逻辑都是由run()方法执行,所以任务的窃取逻辑一定在run()方法中可以找到!

 public void run() { //线程run方法
        if (workQueue.array == null) { // only run once
            Throwable exception = null;
            try {
                onStart();
                pool.runWorker(workQueue);  //在这里处理任务队列!
            } catch (Throwable ex) {
                exception = ex;
            } finally {
                try {
                    onTermination(exception);
                } catch (Throwable ex) {
                    if (exception == null)
                        exc
首页 上一页 2 3 4 5 6 下一页 尾页 5/6/6
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇每天学点SpringCloud(七):路由.. 下一篇Activiti6.0 spring5 工作流引擎 ..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目