设为首页 加入收藏

TOP

(十)Spark学习笔记之优化
2019-04-20 01:05:29 】 浏览:81
Tags:Spark 学习 笔记 优化

调优

Spark 应用程序的优化涉及到多个方面,包括 spark 应用程序调优、资源调优、网络调优、硬盘调优等。这里主要考虑 “spark 应用程序调优” 和 “资源调优”。

Spark 应用程序调优

避免重复创建 RDD,尽可能复用RDD
开发 Spark 应用程序时,一般的步骤是:首先基于某个数据源创建一个初始的 RDD,接着对此 RDD 执行某个算子操作,得到下一个 RDD,以此类推,最后调用 action 操作得到想要的结果。在此过程中,多个 RDD 形成了 RDD 的血缘关系链(lineage)

对于同一份数据,只应该创建一个 RDD。如果同一份数据被创建成多个 RDD,Spark 作业进行多次重复计算,增加了作业的性能开销。

//错误的做法
val rdd1 = sc.textFile("filePath")
rdd1.map(...)
val rdd2 = sc.textFile("filePath ")
rdd2.reduce(...)
//正确的做法
val rdd1 = sc.textFile("filePath")
rdd1.map(...)
rdd1.reduce(...)

对重复使用的 RDD 进行持久化
当一个 RDD 在之后的计算中,会被重复用到的话,那么就可以对 RDD 进行持久化操作,这样当再次执行 action 操作时,只会从持久化的 RDD 来开始执行,而不会从开始执行,这样就能节省性能开销。如果数据量太多而内存不足时,通过平衡时间与读写 IO 就需要考虑不同的持久化方式了。

尽量避免使用会触发 shuffle 操作的算子
在 Spark 作业运行过程中,最消耗性能的就是 shuffle 过程。shuffle 过程,就是将分布在集群中多个节点上的相同 key 的数据,拉取到同一个节点上,进行聚合或 join 等操作。

Spark shuffle,就是将数据按照 key 梳理好。shuffle 过程中,各个节点上的相同 key 数据都会先写入本地磁盘文件中,然后其它节点需要通过网络传输拉取到各个节点上的磁盘文件中的相同 key。相同 key 都被拉取到同一个节点进行聚合操作,此时有可能会因为某个节点上处理的 key 过多,导致内存不够存放,进而溢写到磁盘文件中。因此 shuffle 过程中,可能会发生大量的磁盘文件读写的IO操作,以及数据的网络传输操作。磁盘 IO 和网络数据传输是 shuffle 性能较差的主要原因。

因此在开发过程中,尽可能避免使用 reduceByKey、join、distinct、repartition 等会出发 shuffle 的算子,尽可能使用 map 类的非 shuffle 算子,尽可能使用广播变量来避免 shuffle、优先选用 reduceByKey、aggregateByKey、combineByKey 替换 groupByKey,因为 reduceByKey 算子内部使用了预聚合操作。

使用高性能算子

  • 使用 mapPartition 代替 map
    mapPartition 类算子,一次函数调用会处理一个 Partition 所有的数据,而不是一次函数调用处理一次(map)。但是有时如使用 mapPartition 会出现 OOM 的问题,因为每次函数调用就需要处理一个 Partition 中的所有数据,如果内存不够,会频繁 GC,可能出现 OOM 异常。

  • 使用 foreachPartition 替换 foreach
    使用 foreachPartition 函数一次调用会处理一个 Partition 的数据。而 foreach 函数一次调用只能处理一次。这就能提高系统性能。

  • filter 之后使用 coalesce 算子

  • 使用 repartitionAndSortWithinPartition 代替 reparation 与 sort 类操作。

将大变量广播出去
开发过程中出现了使用 “外部变量的场景”,就应该使用 Spark 的广播变量功能来提升性能。

在算子函数中使用到外部变量时,默认情况下,Spark会将该变量复制多个副本,通过网络传输到 task 中,此时每个 task 都有一个变量副本。如果变量本身较大,那么大量的变量副本在网络中传输的性能开销,以及在各个节点的 executor 中占用过多内存导致的频繁 GC 垃圾回收, 会极大地影响性能。使用 Spark 的广播功能,对该变量进行广播。广播后的变量,每个 Executor 只保留一份变量副本,而 Executor 中的 task 执行时共享该 Executor 中变量副本。这样,就可以大大减少变量副本的数量,从而减少网络传输的性能开销,并减少对 Executor 内存的占用开销,降低 GC 的频率。

使用 Kryo 序列化方式
在 Spark 系统中,主要涉及到三个地方的序列化:

  1. 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输,此时需要对此变量进行序列化。
  2. 将自定义的类作为 RDD 的泛型类型时,所有自定义类的对象,都会进行序列化。因此这种情况需要自定义的类必须实现 Serializable 接口。
  3. 使用可序列化的持久化策略时,Spark 会将 RDD 中的每个 Partition 都序列化成一个大的字节数组。

序列化可以大大减少数据在内存,硬盘中占用的空间,减少网络数据传输的开销,但是使用数据中,需要将数据进行反序列化,会消耗 CPU,延长程序执行的时间,从而降低 spark 的性能。所以,序列化实际上利用了 “时间换空间” 的方式。

Spark 默认使 Java 序列化机制来进行序列化和反序列化。 Spark 同时支持使用 Kryo 序列化库,Kyro 序列化库中的性能比 Java 序列化库的性能更高些。但是 Kyro 不支持所有对象的序列化,同时 Kryo 需要用户在使用前注册需要序列化的类型,不够方便。

Kryo 相关配置:
在这里插入图片描述
步骤:

  1. 设置 spark 序列化使用库
//使用Kryo序列化库
sparkconf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");  
  1. 在该库中注册用户定义的类型
//在Kryo序列化库中注册自定义的类集合
sparkconf.set("spark.kryo.registrator", MyKryoRegistrator.class.getName());
  1. MyKryoRegistrator 类实现 KryoRegistrator 接口的 registerClasses方法。
    在这里插入图片描述
    在这里插入图片描述

可以查看默认序列化与Kyro序列化所占空间的差别
Kryo序列化:
在这里插入图片描述
默认序列化:
在这里插入图片描述

进一步压缩
在这里插入图片描述
在这里插入图片描述

使用优化的数据结构
Java 中,有三种类型比较耗费内存:

  • 自定义对象:每个 java 对象都有对象头、引用等额外的信息,因此比较占用内存空间。
  • 字符串:每个字符串内部都有一个字符数组以及长度等额外信息。
  • 集合类型:集合类型内部通常使用内部类来封装集合元素。

因此在 Spark 编码实现中,特别是对于算子函数中的代码,尽量使用字符串替代对象对象,使用原始类型替代字符串,使用数组替代集合类型,这样可以尽可能地减少内存占用,从而降低 GC 频率,提升系能。

数据倾斜调优

现象

如果绝大多数 task 执行都非常快,但是个别 task 执行极慢。比如:总共有 100 个 task,99 个task 都在 1分钟内执行完成,只剩下一个 task 却要更多的时间。这样就可以确认发生了数据倾斜,另外,数据倾斜严重的话,就会发生 OOM 错误,导致 Application 失败。

原因

使用引起 shuffle 的算子,在进行 shuffle 时,必须将各个节点上相同的 key 拉取到某个节点上的一个 task 来进行处理,比如按照 key 进行聚合或 join 等操作。此时如果某个 key 对应的数据量特别大的话,就会发生数据倾斜。比如大部分 key 对应 10 条数据,但是个别 key 却对应了 100 万条数据,那么大部分 task 可能就只会分配到 10 条数据,然后在很短时间内就执行完成了,而个别的 task 可能分配到 100 万条数据,则可能需要运行很久。因此,整个 Spark 作业的运行进度是由运行时间最长的那个 task 决定的。

定位

shuffle 导致了数据倾斜,常见导致 shuffle 的算子:distinct,groupByKey,reduceByKey,aggregateByKey,join,cogroup,reparation 等。因此可以在代码中直接找到相关的算子。

这些算子会产生 shuffle,shuffle 会划分 stage。所以,从 WebUI 中查看发生数据倾斜的 task 发生在哪个 stage中。无论是 spark standalone 模式还是 spark on yarn 模式的应用程序,都可以在 spark history server 中看到详细的执行信息。也可以通过 yarn logs 命令查看详细的日子信息。
在这里插入图片描述
定位了数据倾斜发生后,接着需要分析一个那个执行 shuffle 操作并且导致了数据倾斜的 RDD/Hive 表,查看一下其中 key 的分布情况。 这主要是为了之后选择哪种技术方案提供依据。查看 key 分布的方式:

  1. 如果是 Spark SQL 中的 group by,join语句导致的数据倾斜,那么久查询 SQL 中使用的表的 key 分布情况。
  2. 如果是对 Spark RDD 执行 shuffle 算子导致的数据倾斜,那么可以在 spark 作业中加入查看 key 分布的代码,比如 RDD.countByKey()。然后对统计出来的各个 key 出现的次数,collect/take 到客户端打印,就可以看到 key 的分布情况。

方法

过滤引起数据倾斜的 key

场景:如果发现导致倾斜的 key 就少数几个,而且对计算本身没有太大的影响,那么就适合采用此方法来处理。就比如前文所举例:只有一个 key 对应有 100w 条数据,但是其他数据比之少之又少,从而因为该 key 而导致数据倾斜。

思路:countByKey 确定数据量超多的某个 Key,使用 filter 方法过滤。SparkSQL中使用 where 方法过滤。

此方法实现简单,而且效果也比较好,可以完全避免数据倾斜。但是适用场景不多。在大多数实际情况下,导致倾斜的 key 还是很多,并不是只有少数几个。

提高 shuffle 操作的并行度

场景:无法使用过滤的方法来规避倾斜问题,只有面对数据倾斜的问题。

思路:执行 RDD shuffle 算子时,给 shuffle 算子传入一个参数,比如 reduceByKey(100),该参数设置了这个 shuffle 算子执行时 shuffle read task 的数量。对于 spark Sql 中的 shuffle 类语句,比如 group by,join等,需要设置一个参数,即 spark.sql.shuffle.partititon,该参数代表了 shuffle.read.task 的并行度,该值默认是 200。

此方法虽然实现简单,但是该方法治标不治本。例如某个 key 对应的数据量有 100w,那么无论 task 数量增加多少,这个对应着 100w 数据的 key 肯定还会被分配到一个 task 中来处理,因此还是会发生数据倾斜。

对数据倾斜 key使用随机数,实现两阶段聚合

场景:对 RDD 执行 reduceByKey 等聚合类 shuffle 算子或者在 spark sql 中使用 group by 语句进行分配聚合时,比较使用这种方法。

思路:这个方案核心思路就是进行两段聚合,第一阶段是局部聚合,先给每个 key 都打上一个随机数,比如 10以内的随机数,此时原先一样的 key 就变成不一样了。比如 (hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成 (1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。
第二阶段,接着对打上随机数后的数据,执行 reduceByKey 等聚合操作,进行局部聚合,那么局部聚合的结果,就变成了 (1_hello, 2) (2_hello, 2)。然后将各个 key 的随机数去掉,就会变成 (hello,2)(hello,2),再次进行全局聚合操作,就可以得到最终结果了。比如 (hello, 4)。

如果聚合类的 shuffle 算子导致的数据倾斜,能有效的处理倾斜,但是 join 类的 shuffle 算子就不适合了。

// 第1步,加随机前缀。
JavaPairRDD<String, Long> randomPrefixKeyRdd = pairRdd.mapToPair(
        new PairFunction<Tuple2<Long,Long>, String, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Long> call(Tuple2<Long, Long> tuple)
                    throws Exception {
                Random random = new Random();
                int prefix = random.nextInt(100);
                return new Tuple2<String, Long>(prefix + "_" + tuple._1, tuple._2);
            }
        });

// 第2步,局部聚合。
JavaPairRDD<String, Long> firstAggRdd = randomPrefixKeyRdd.reduceByKey(
        new Function2<Long, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });

// 第3步,去除key的随机前缀。
JavaPairRDD<Long, Long> removedrandomPrefixKeyRdd = firstAggRdd.mapToPair(
        new PairFunction<Tuple2<String,Long>, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Long> call(Tuple2<String, Long> tuple)
                    throws Exception {
                long originalKey = Long.valueOf(tuple._1.split("_")[1]);
                return new Tuple2<Long, Long>(originalKey, tuple._2);
            }
        });

// 第4步全局聚合。
JavaPairRDD<Long, Long> secondAggRdd = removedrandomPrefixKeyRdd.reduceByKey(
        new Function2<Long, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });

将 hash shuffle join 转换成 map join

场景:对 RDD 使用 join 类操作,或者是在 spark sql 中使用 join 语句时,而且 join 操作中的一个 RDD 或表的数据量比较小,比较适合此方案。

思路:不使用join算子进行连接操作,而使用 Broadcast 变量与 map 类算子实现 join 操作,进而完全规避 shuffle 类的操作,彻底避免数据倾斜的发生和出现。–其实这也是利用广播变量来处理。

var list1=List(("zhangsan",20),("lisi",22),("wangwu",26))
var list2=List(("zhangsan","spark"),("lisi","kafka"),("zhaoliu","hive"))
var rdd1=sc.parallelize(list1)
var rdd2=sc.parallelize(list2)

//这种方式存在性能问题,join引起shuffle。如何优化?
//rdd1.join(rdd2).collect().foreach( t =>println(t._1+" : "+t._2._1+","+t._2._2))

//使用广播变量对join进行调优 使用场景:join两边的RDD,有一个RDD的数据量比较小,此时可以使用广播变量,将这个小的RDD广播出去,从而将普通的join,装换为map-side join。
val rdd1Data=rdd1.collectAsMap()
val rdd1BC=sc.broadcast(rdd1Data)

val rdd3=rdd2.mapPartitions(partition => {
    val bc=rdd1BC.value
    for{
        (key,value) <-partition
         if(rdd1Data.contains(key))
     }yield(key,(bc.get(key).getOrElse(""),value))
})

对 join 操作导致的数据倾斜,效果非常好。因为不会发生 shuffle,也就不会发生数据倾斜。但是这种场景一般适合一个大 RDD 和 一个小 RDD的情况。

使用 Partitioner 优化 hash shuffle join

为了对两个 RDD 中的数据进行 join,Spark需要将两个 RDD 上的数据拉取到同一个分区。Spark 中 join 的默认实现是 shuffle hash join: 通过使用与第一个数据集相同的默认分区器对第二个数据集进行分区,从而确保每个分区上的数据将包含相同的 key,从而使两个数据集具有相同 hash 的键值位于同一个分区。虽然这个方法总是可以运行,但是此种操作比较耗费资源,因为需要进行一次 shuffle。

如果两个 RDD 都有一个已知的分区,则可以避免 shuffle,如果它们有相同的分区器,则可以使数据在本地被合并,避免网络传输。因此,建议在 join 两个 RDD 之前,调用 Partitionby方法,并且使用相同的分区器。

val partitioner=new HashPartitioner(10)
agesRDD.partitionBy(partitioner)
addressRDD.partitionBy(partitioner)

综合使用上述方法

如果只是处理较为简单的数据倾斜的场景,使用上述某一种方法即可以解决问题。但是如果要处理一个较为复杂的数据倾斜场景,那么需要将多种方法组合起来一起使用。

资源调优

对 Spark 的资源调优,其主要对 Spark 运行过程中各个使用资源的地方,通过调节各种参数来优化资源使用的效率,从而提升 Spark 作业的执行性能。

主要资源的参数调优,如 driver,executor 的内存,CPU core 进行设置。

JVM 内存管理

Java 的堆内存分为两个区域:新生代和老生代。新生代保存的是生命周期较短的对象;老生代保存生命周期较长的对象。

新生代又可以分为三个区域:Eden、from Survivor、to Survivor。

垃圾回收过程:当 Eden 已满时,Eden 运行一次 minor GC,并将 Eden 和 From Survivor 中存在的对象复制到 to Survivor。此时,from Survivor 和 to Survivor 进行交换。如果一个对象足够老,或者 to Survivor 已满,则会移动带老年代。最后当老年代接近满时,会出发 full GC。

  1. 通过收集垃圾回收信息,判断是否有太多的垃圾回收过程。假如 full gc 在一个 task 完成之前触发了好几次,说明运行 task 的内存空间不足,需要考虑增加内存了。
  2. 配置 JVM 相关信息的位置 spark-default.conf
    spark.executor.extraJavaOptions
    spark.driver.extraJavaOptions
    在这里插入图片描述

driver 和 executor 的 JVM 堆内存的大小通过 driver-memory 和 executor.memory 配置项设置。

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Scala   WordCount 下一篇大数据之   spark介绍和基本..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目