设为首页 加入收藏

TOP

多线程编程学习七( Fork/Join 框架).(一)
2019-09-07 07:10:06 】 浏览:68
Tags:线程 编程 学习 Fork/Join 框架

一、介绍

使用 java8 lambda 表达式大半年了,一直都知道底层使用的是 Fork/Join 框架,今天终于有机会来学学 Fork/Join 框架了。

Fork/Join 框架是 Java 7 提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

Fork/Join 的运行流程示意图:

比如,一个 1+2+3+...+100 的工作任务,我们可以把它 Fork 成 10 个子任务,分别计算这 10 个子任务的运行结果。最后再把 10 个子任务的结果 Join 起来,汇总成最后的结果。

为了减少线程间的竞争,通常把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。但是,有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其它线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。线程的这种执行方式,我们称之为“工作窃取”算法。

二、设计

实现 Fork/Join 框架的设计,大抵需要两步:

1. 分割任务

首先我们需要创建一个 ForkJoin 任务,把大任务分割成子任务,如果子任务不够小,则继续往下分,直到分割出的子任务足够小。

在 Java 中我们可以使用 ForkJoinTask 类,它提供在任务中执行 fork() 和 join() 操作的机制,通常情况下,我们只需要继承它的子类:

  • RecursiveAction — 用于没有返回结果的任务
  • RecursiveTask — 用于有返回结果的任务

2. 任务执行并返回结果

分割的子任务分别放在双端队列里,然后启动几个线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。

在 Java 中任务的执行需要通过 ForkJoinPool 来执行。

三、示例

来一个阿里面试题:百万级 Integer 数据量的一个 array 求和。

public class ArrayCountTask extends RecursiveTask<Long> {
    /**
     * 阈值
     */
    private static final Integer THRESHOLD = 10000;

    private Integer[] array;
    private Integer start;
    private Integer end;

    public ArrayCountTask(Integer[] array, Integer start, Integer end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        long sum = 0;
        // 最小子任务计算
        if (end - start <= THRESHOLD) {
            for (int i = start; i < end; i++) {
                sum += array[i];
            }
        } else {
            // 把大于阈值的任务继续往下拆分,有点类似递归的思维。 recursive 就是递归的意思。
            int middle = (start + end) >>> 1;
            ArrayCountTask leftArrayCountTask = new ArrayCountTask(array, start, middle);
            ArrayCountTask rightArrayCountTask = new ArrayCountTask(array, middle, end);
            // 执行子任务
            //leftArrayCountTask.fork();
            //rightArrayCountTask.fork();

            // invokeAll 方法使用
            invokeAll(leftArrayCountTask, rightArrayCountTask);

            //等待子任务执行完,并得到其结果
            Long leftJoin = leftArrayCountTask.join();
            Long rightJoin = rightArrayCountTask.join();
            // 合并子任务的结果
            sum = leftJoin + rightJoin;
        }

        return sum;
    }
}
    public static void main(String[] args) {
        // 1. 造一个 int 类型的百万级别数组
        Integer[] array = new Integer[150000000];
        for (int i = 0; i < array.length; i++) {
            array[i] = new Random().nextInt(100);
        }
        // 2.普通方式计算结果
        long start = System.currentTimeMillis();
        long sum = 0;
        for (int i = 0; i < array.length; i++) {
            sum += array[i];
        }
        long end = System.currentTimeMillis();
        System.out.println("普通方式计算结果:" + sum + ",耗时:" + (end - start));
        long start2 = System.currentTimeMillis();
        // 3.fork/join 框架方式计算结果
        ArrayCountTask arrayCountTask = new ArrayCountTask(array, 0, array.length);
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        sum = forkJoinPool.invoke(arrayCountTask);
        long end2 = System.currentTimeMillis();
        System.out.println("fork/join 框架方式计算结果:" + sum + ",耗时:" + (end2 - start2));

        // 结论:
        // 1. 电脑 i5-4300m,双核四线程
        // 2. 数组量少的时候,fork/join 框架要进行线程创建/切换的操作,性能不明显。
        // 3. 数组量超过 100000000,fork/join 框架的性能才开始体现。

    }

ForkJoinTask 与一般任务的主要区别在于它需要实现 compute 方法,在这个方法里,首先需要判断任务是否足够小,如果足够小就直接执行任务。如果不足够小,就必须分割成两个子任务,每个子任务在调用 fork 方法时,又会进入 compute 方法,看看当前子任务是否需要继续分割成子任务,如果不需要继续分割,则执行当前子任务并返回结果。使用 join 方法会等待子任务执行完并得到其结果。

在执行子任务时调用 fork 方法并不是最佳的选择,最佳的选择是 invokeAll 方法。因为执行 compute() 方法的线程本身也是一个 worker 线程,当对两个子任务调用 fork() 时,这个w

首页 上一页 1 2 下一页 尾页 1/2/2
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇搭建自己的技术博客系列(二)把 .. 下一篇Spring Cloud开发人员如何解决服..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目