设为首页 加入收藏

TOP

Apache Spark Jobs 性能调优(二)
2017-10-17 09:21:13 】 浏览:7939
Tags:Apache Spark Jobs 性能
据通过网络传递一遍,而后者只需要根据每个key 局部的 partition 累积结果,在 shuffle 的之后把局部的累积值相加后得到结果。
  • 当输入和输入的类型不一致时,避免使用 reduceByKey。举个例子,我们需要实现为每一个key查找所有不相同的 string。一个方法是利用 map 把每个元素的转换成一个 Set,再使用 reduceByKey 将这些 Set 合并起来
  • rdd.map(kv => (kv._1, new Set[String]() + kv._2))
    .reduceByKey(_ ++ _)

    这段代码生成了无数的非必须的对象,因为每个需要为每个 record 新建一个 Set。这里使用 aggregateByKey 更加适合,因为这个操作是在 map 阶段做聚合。

    val zero = new collection.mutable.Set[String]()
    rdd.aggregateByKey(zero)(
    (set, v) => set += v,
    (set1, set2) => set1 ++= set2)
    • 避免 flatMap-join-groupBy 的模式。当有两个已经按照key分组的数据集,你希望将两个数据集合并,并且保持分组,这种情况可以使用 cogroup。这样可以避免对group进行打包解包的开销。

    什么时候不发生 Shuffle

    当然了解在哪些 transformation 上不会发生 shuffle 也是非常重要的。当前一个 transformation 已经用相同的patitioner 把数据分 patition 了,Spark知道如何避免 shuffle。参考一下代码:

    rdd1 = someRdd.reduceByKey(...)
    rdd2 = someOtherRdd.reduceByKey(...)
    rdd3 = rdd1.join(rdd2)

    因为没有 partitioner 传递给 reduceByKey,所以系统使用默认的 partitioner,所以 rdd1 和 rdd2 都会使用 hash 进行分 partition。代码中的两个 reduceByKey 会发生两次 shuffle 。如果 RDD 包含相同个数的 partition, join 的时候将不会发生额外的 shuffle。因为这里的 RDD 使用相同的 hash 方式进行 partition,所以全部 RDD 中同一个 partition 中的 key的集合都是相同的。因此,rdd3中一个 partiton 的输出只依赖rdd2和rdd1的同一个对应的 partition,所以第三次shuffle 是不必要的。

    举个例子说,当 someRdd 有4个 partition, someOtherRdd 有两个 partition,两个 reduceByKey 都使用3个partiton,所有的 task 会按照如下的方式执行:

    如果 rdd1 和 rdd2 在 reduceByKey 时使用不同的 partitioner 或者使用相同的 partitioner 但是 partition 的个数不同的情况,那么只有一个 RDD (partiton 数更少的那个)需要重新 shuffle。

    相同的 tansformation,相同的输入,不同的 partition 个数:

    当两个数据集需要 join 的时候,避免 shuffle 的一个方法是使用 broadcast variables。如果一个数据集小到能够塞进一个executor 的内存中,那么它就可以在 driver 中写入到一个 hash table中,然后 broadcast 到所有的 executor 中。然后map transformation 可以引用这个 hash table 作查询。

    什么情况下 Shuffle 越多越好

    尽可能减少 shuffle 的准则也有例外的场合。如果额外的 shuffle 能够增加并发那么这也能够提高性能。比如当你的数据保存在几个没有切分过的大文件中时,那么使用 InputFormat 产生分 partition 可能会导致每个 partiton 中聚集了大量的record,如果 partition 不够,导致没有启动足够的并发。在这种情况下,我们需要在数据载入之后使用 repartiton (会导致shuffle)提高 partiton 的个数,这样能够充分使用集群的CPU。

    另外一种例外情况是在使用 recude 或者 aggregate action 聚集数据到 driver 时,如果数据把很多 partititon 个数的数据,单进程执行的 driver merge 所有 partition 的输出时很容易成为计算的瓶颈。为了缓解 driver 的计算压力,可以使用reduceByKey 或者 aggregateByKey 执行分布式的 aggregate 操作把数据分布到更少的 partition 上。每个 partition中的数据并行的进行 merge,再把 merge 的结果发个 driver 以进行最后一轮 aggregation。查看 treeReduce 和treeAggregate 查看如何这么使用的例子。

    这个技巧在已经按照 Key 聚集的数据集上格外有效,比如当一个应用是需要统计一个语料库中每个单词出现的次数,并且把结果输出到一个map中。一个实现的方式是使用 aggregation,在每个 partition 中本地计算一个 map,然后在 driver中把各个 partition 中计算的 map merge 起来。另一种方式是通过 aggregateByKey 把 merge 的操作分布到各个partiton 中计算,然后在简单地通过 collectAsMap 把结果输出到 driver 中。

    二次排序

    还有一个重要的技能是了解接口 repartitionAndSortWithinPartitions transformation。这是一个听起来很晦涩的transformation,但是却能涵盖各种奇怪情况下的排序,这个 transformation 把排序推迟到 shuffle 操作中,这使大量的数据有效的输出,排序操作可以和其他操作合并。

    举例说,Apache Hive on Spark 在join的实现中,使用了这个 transformation 。而且这个操作在 secondary sort 模式中扮演着至关重要的角色。secondary sort 模式是指用户期望数据按照 key 分组,并且希望按照特定的顺序遍历 value。使用 repartitionAndSortWithinPartitions 再加上一部分用户的额外的工作可以实现 secondary sort。

    在这篇文章中,首先完成在 Part I 中提到的一些东西。作者将尽量覆盖到影响 Spark 程序性能的方方面面,你们将会了解到资源调优,或者如何配置 Spark 以压榨出集群每一分资源。然后我们将讲述调试并发度,这是job性能中最难也是最重要的参数。最后,你将了解到数据本身的表达形式,Spark 读取在磁盘的上的形式(主要是Apache Avro和 Apache Parquet)以及当数据需要缓存或者移动的时候内存中的数据形式。

    调试资源分配

    Spark 的用户邮件邮件列表中经常会出现 “我有一个500个节点的集群,为什么但是我的应用一次只有两个 task 在执行”,鉴于 Spark 控制资源使用的参数的数量,这些问题不应该出现。但是在本章中,你将学会压榨出你集群的每一分资源。推荐的配置将根据不同的集群管理系统( YARN、M

    首页 上一页 1 2 3 4 下一页 尾页 2/4/4
    】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
    上一篇使用 Eureka 实现服务注册与发现 下一篇一、JAVA环境变量配置详解——Jav..

    最新文章

    热门文章

    Hot 文章

    Python

    C 语言

    C++基础

    大数据基础

    linux编程基础

    C/C++面试题目