设为首页 加入收藏

TOP

fork/join 全面剖析(六)
2019-09-17 18:08:02 】 浏览:106
Tags:fork/join 全面 剖析
eption = ex; } finally { pool.deregisterWorker(this, exception); } } } } /** * Top-level runloop for workers, called by ForkJoinWorkerThread.run. */ final void runWorker(WorkQueue w) { w.growArray(); // allocate queue 进行队列的初始化 int seed = w.hint; // initially holds randomization hint int r = (seed == 0) ? 1 : seed; // avoid 0 for xorShift for (ForkJoinTask<?> t;;) { //又是无限循环处理任务! if ((t = scan(w, r)) != null) //在这里获取任务! w.runTask(t); else if (!awaitWork(w, r)) break; r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift } }

 

  其实只要看下面的英文注释就知道了大概scan(WorkQueue w, int r)就是用来窃取任务的!

/**
     * Scans for and tries to steal a top-level task. Scans start at a
     * random location, randomly moving on apparent contention,
     * otherwise continuing linearly until reaching two consecutive
     * empty passes over all queues with the same checksum (summing
     * each base index of each queue, that moves on each steal), at
     * which point the worker tries to inactivate and then re-scans,
     * attempting to re-activate (itself or some other worker) if
     * finding a task; otherwise returning null to await work.  Scans
     * otherwise touch as little memory as possible, to reduce
     * disruption on other scanning threads.
     *
     * @param w the worker (via its WorkQueue)
     * @param r a random seed
     * @return a task, or null if none found
     */
    private ForkJoinTask<?> scan(WorkQueue w, int r) {
        WorkQueue[] ws; int m;
        if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
            int ss = w.scanState;                     // initially non-negative
            for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
                WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
                int b, n; long c;
                if ((q = ws[k]) != null) {   //随机选中了非空队列 q
                    if ((n = (b = q.base) - q.top) < 0 &&
                        (a = q.array) != null) {      // non-empty
                        long i = (((a.length - 1) & b) << ASHIFT) + ABASE;  //从尾部出队,b是尾部下标
                        if ((t = ((ForkJoinTask<?>)
                                  U.getObjectVolatile(a, i))) != null &&
                            q.base == b) {
                            if (ss >= 0) {
                                if (U.compareAndSwapObject(a, i, t, null)) { //利用cas出队
                                    q.base = b + 1;
                                    if (n < -1)       // signal others
                                        signalWork(ws, q); 
                                    return t;  //出队成功,成功窃取一个任务!
                                }
                            }
                            else if (oldSum == 0 &&   // try to activate 队列没有激活,尝试激活
                                     w.scanState < 0)
                                tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
                        }
                        if (ss < 0)                   // refresh
                            ss = w.scanState;
                        r ^= r << 1; r ^= r >>> 3; r ^= r << 10; 
                        origin = k = r & m;           // move and rescan
                        oldSum = checkSum = 0;
                        continue;
                    }
                    checkSum += b;
                }
//k = k + 1表示取下一个队列 如果(k + 1) & m == origin表示 已经遍历完所有队列了 if ((k = (k + 1) & m) == origin) { // continue until stable if ((ss >= 0 || (ss == (ss = w.scanState))) && oldSum == (oldSum = checkSum)) { if (ss < 0 || w.qlock < 0) // already inactive break; int ns = ss | INACTIVE; // try to inactivate long nc = ((SP_MASK & ns) | (UC_MASK & ((c = ctl) - AC_UNIT))); w.stackPred = (int)c; // hold prev stack top U.putInt(w, QSCANSTATE, ns); if (U.compareAndSwapLong(this, CTL, c, nc)) ss = ns; else w.scanState = ss; // back out } checkSum = 0; } } } return null; }

  所以我们知道任务的窃取从workerThread运行的那一刻就已经开始了!先随机选中一条队列看能不能窃取到任务,取不到则窃取下一条队列,直接遍历完一遍所有的队列,如果都窃取不到就返回null。

 

    以上就是我阅读fork/join源码之后总结出来一些心得,写了那么多我觉得也只是描述了个大概而已,真正详细有用的东西还需要仔细去阅读里面的代码才行。如果大家有兴趣的话,不妨也去尝试一下吧-。-~

        

   &n

首页 上一页 3 4 5 6 下一页 尾页 6/6/6
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇每天学点SpringCloud(七):路由.. 下一篇Activiti6.0 spring5 工作流引擎 ..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目