设为首页 加入收藏

TOP

fork/join 全面剖析(二)
2019-09-17 18:08:02 】 浏览:109
Tags:fork/join 全面 剖析
立的小任务,然后两两求和合并。

       其实显然易见负责整个fork/join初始化工作的就是ForkJoinPool!初始化代码就是那一行 ForkJoinPool forkJoinPool = new ForkJoinPool(),点进去查看源码。

ForkJoinPool forkJoinPool = new ForkJoinPool();
//最终调用到这段代码 
public ForkJoinPool(int parallelism, 
                    ForkJoinWorkerThreadFactory factory, 
                    UncaughtExceptionHandler handler, 
                    boolean asyncMode) { 
    this(checkParallelism(parallelism), //并行度,当前机器的cpu核数
            checkFactory(factory), //工作线程创建工厂
            handler, //异常处理handler
            asyncMode ? FIFO_QUEUE : LIFO_QUEUE, //任务队列出队模式 异步:先进先出,同步:后进先出
            "ForkJoinPool-" + nextPoolId() + "-worker-");
    checkPermission();
}

    看完初始化的代码我们可以知道原来创建ForkJoinPool创建workerThread的工作都是统一由一个叫ForkJoinWorkerThreadFactory的工厂去创建,创建出来的线程都有一个统一的前辍名称"ForkJoinPool-" + nextPoolId() + "-worker-".队列出队模式是LIFO(后进先出),那这样后面的入队的任务是会被先处理的。所以上面提到对代码做了一些修改就是先处理rightTask,再处理leftTask。这其实是对代码的一种优化!

    //执行子任务
     leftTask.fork();
     rightTask.fork();
 
     Integer rightResult = rightTask.join();
     Integer leftResult = leftTask.join();

 

4、任务的提交逻辑?

    fork/join其实大部分逻辑处理操作都集中在提交任务和处理任务这两块,了解任务的提交基本上后面就很容易理解了。

    fork/join提交任务主要分为两种:

    第一种:第一次提交到forkJoinPool 

     ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(countTask);

  第二种:任务切分之后的提交

     leftTask.fork();
     rightTask.fork();

      

   提交到forkJoinPool :

   代码调用路径 submit(ForkJoinTask<T> task) ->  externalPush(ForkJoinTask<?> task)  ->  externalSubmit(ForkJoinTask<?> task)

   下面贴上externalSubmit的详细代码,着重留意注释的部分。

 private void externalSubmit(ForkJoinTask<?> task) {
        int r;                                    // initialize caller's probe
        if ((r = ThreadLocalRandom.getProbe()) == 0) {
            ThreadLocalRandom.localInit();
            r = ThreadLocalRandom.getProbe();
        }
        for (;;) { //采用循环入队的方式
            WorkQueue[] ws; WorkQueue q; int rs, m, k;
            boolean move = false;
            if ((rs = runState) < 0) {
                tryTerminate(false, false);     // help terminate 
                throw new RejectedExecutionException();
            }
            else if ((rs & STARTED) == 0 ||     // initialize 初始化操作
                     ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
                int ns = 0;
                rs = lockRunState();
                try {
                    if ((rs & STARTED) == 0) {
                        U.compareAndSwapObject(this, STEALCOUNTER, null,
                                               new AtomicLong());
                        // create workQueues array with size a power of two 
                        int p = config & SMASK; // ensure at least 2 slots //config就是cpu的核数
                        int n = (p > 1) ? p - 1 : 1;
                        n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
                        n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1; //算出workQueues的大小n,n一定是2的次方数
                        workQueues = new WorkQueue[n];  //初始化队列,然后跳到最外面的循环继续把任务入队~
                        ns = STARTED;
                    }
                } finally {
                    unlockRunState(rs, (rs & ~RSLOCK) | ns);
                }
            }
            else if ((q = ws[k = r & m & SQMASK]) != null) { //选中了一个一个非空队列
                if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { //利用cas操作加锁成功!
                    ForkJoinTask<?>[] a = q.array;
                    int s = q.top;
                    boolean submitted = false; // initial submission or resizing
                    try {                      // locked version of push
                        if ((a != null && a.length > s + 1 - q.base) ||
                            (a = q.growArray()) != null) {
                            int j = (((a.length - 1) & s) << ASHIFT) + ABASE; //计算出任务在队列中的位置
                            U.putOrderedObject(a, j, task);  //把任务放在队列中
                            U.
首页 上一页 1 2 3 4 5 6 下一页 尾页 2/6/6
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇每天学点SpringCloud(七):路由.. 下一篇Activiti6.0 spring5 工作流引擎 ..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目