设为首页 加入收藏

TOP

fork/join 全面剖析(四)
2019-09-17 18:08:02 】 浏览:103
Tags:fork/join 全面 剖析
putOrderedInt(q, QTOP, s + 1); //更新一次存放的位置 submitted = true; } } finally { U.compareAndSwapInt(q, QLOCK, 1, 0); //利用cas操作释放锁! } if (submitted) { signalWork(ws, q); return; //任务入队成功了!跳出循环! } } move = true; // move on failure } else if (((rs = runState) & RSLOCK) == 0) { // create new queue 选中的队列是空,初始化完队列,然后继续入队! q = new WorkQueue(this, null); q.hint = r; q.config = k | SHARED_QUEUE; q.scanState = INACTIVE; rs = lockRunState(); // publish index if (rs > 0 && (ws = workQueues) != null && k < ws.length && ws[k] == null) ws[k] = q; // else terminated unlockRunState(rs, rs & ~RSLOCK); } else move = true; // move if busy if (move) r = ThreadLocalRandom.advanceProbe(r); } }

   通过对externalSubmit方法的代码进行分析,我们知道了第一次提交任务给forkJoinPool时是在无限循环for (;;)中入队。第一步先检查workQueues是不是还没有创建,如果没有,则进行创建。之后跳到外层for循环并随机选取workQueues里面一个队列,并判断队列是否已创建。没有创建,则进行创建!后又跳到外层for循环直到选到一个非空队列并且加锁成功!这样最后才把任务入队~。

     所以我们知道fork/join的任务队列workQueues并不是初始化的时候就创建好了,而是在有任务提交的时候才创建!并且每次入队时都需要利用cas操作来进行加锁和释放锁!

 

     任务切分之后的提交:

  public final ForkJoinTask<V> fork() {
        Thread t;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            ((ForkJoinWorkerThread)t).workQueue.push(this); //workerThread直接入自己的workQueue
        else
            ForkJoinPool.common.externalPush(this);
        return this;
    }

  

final void externalPush(ForkJoinTask<?> task) {
        WorkQueue[] ws; WorkQueue q; int m;
        int r = ThreadLocalRandom.getProbe();
        int rs = runState;
        if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
            (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
            U.compareAndSwapInt(q, QLOCK, 0, 1)) { //随机选取了一个非空队列,并且加锁成功!下面是普通的入队过程~
            ForkJoinTask<?>[] a; int am, n, s;
            if ((a = q.array) != null &&
                (am = a.length - 1) > (n = (s = q.top) - q.base)) {
                int j = ((am & s) << ASHIFT) + ABASE;
                U.putOrderedObject(a, j, task);
                U.putOrderedInt(q, QTOP, s + 1);
                U.putIntVolatile(q, QLOCK, 0);
                if (n <= 1)
                    signalWork(ws, q);
                return; //结束方法
            }
            U.compareAndSwapInt(q, QLOCK, 1, 0); //一定要释放锁!
        }
//这个就是上面的externalSummit方法,逻辑是一样的~ externalSubmit(task); }

  从代码中我们知道了提交一个fork任务的过程和第一次提交到forkJoinPool的过程是大同小异的。主要区分了提交任务的线程是不是workerThread,如果是,任务直接入workerThread当前的workQueue,不是则尝试选中一个workQueue q。如果q非空并且加锁成功则进行入队,否则执行与第一次任务提交到forkJoinPool差不多的逻辑~。

     

     5、任务的消费

        提交到任务的最终目的,是为了消费任务并最终获取到我们想要的结果。介绍任务消费之前我们先了解一个我们的任务ForkJoinTask有哪些关键属性和方法。

    /** The run status of this task */
    volatile int status; // accessed directly by pool and workers
    static final int DONE_MASK   = 0xf0000000;  // mask out non-completion bits
    static final int NORMAL      = 0xf0000000;  // must be negative
    static final int CANCELLED   = 0xc0000000;  // must be < NORMAL
    static final int EXCEPTIONAL = 0x80000000;  // must be < CANCELLED
    static final int SIGNAL      = 0x00010000;  // must be >= 1 << 16
    static final int SMASK       = 0x0000ffff;  // short bits for tags

  

  final int doExec() { //任务的执行入口
    int s; boolean completed;
    if ((s = status) >= 0) {
        try {
            completed = exec();
        } catch (Throwable rex) {
            return setExceptionalCompletion(rex);
        }
        if (completed)
            s = setCompletion(NORMAL);
    }
    return s;
   }
首页 上一页 1 2 3 4 5 6 下一页 尾页 4/6/6
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇每天学点SpringCloud(七):路由.. 下一篇Activiti6.0 spring5 工作流引擎 ..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目