设为首页 加入收藏

TOP

Apache Spark Jobs 性能调优(四)
2017-10-17 09:21:13 】 浏览:7936
Tags:Apache Spark Jobs 性能
RDD 呢? 由 textFile 或者 hadoopFile 生成的 RDD 的 partition 个数由它们底层使用的 MapReduce InputFormat 决定的。一般情况下,每读到的一个 HDFS block 会生成一个 partition。通过 parallelize 接口生成的 RDD 的 partition 个数由用户指定,如果用户没有指定则由参数 spark.default.parallelism 决定。

要想知道 partition 的个数,可以通过接口 rdd.partitions().size() 获得。

这里最需要关心的问题在于 task 的个数太小。如果运行时 task 的个数比实际可用的 slot 还少,那么程序解没法使用到所有的 CPU 资源。

过少的 task 个数可能会导致在一些聚集操作时, 每个 task 的内存压力会很大。任何 join,cogroup,*ByKey 操作都会在内存生成一个 hash-map或者 buffer 用于分组或者排序。join, cogroup ,groupByKey 会在 shuffle 时在 fetching 端使用这些数据结构, reduceByKey ,aggregateByKey 会在 shuffle 时在两端都会使用这些数据结构。

当需要进行这个聚集操作的 record 不能完全轻易塞进内存中时,一些问题会暴露出来。首先,在内存 hold 大量这些数据结构的 record 会增加 GC的压力,可能会导致流程停顿下来。其次,如果数据不能完全载入内存,Spark 会将这些数据写到磁盘,这会引起磁盘 IO和排序。在 Cloudera 的用户中,这可能是导致 Spark Job 慢的首要原因。

那么如何增加你的 partition 的个数呢?如果你的问题 stage 是从 Hadoop 读取数据,你可以做以下的选项:
- 使用 repartition 选项,会引发 shuffle;
- 配置 InputFormat 用户将文件分得更小;
- 写入 HDFS 文件时使用更小的block。

如果问题 stage 从其他 stage 中获得输入,引发 stage 边界的操作会接受一个 numPartitions 的参数,比如

val rdd2 = rdd1.reduceByKey(_ + _, numPartitions = X)

X 应该取什么值?最直接的方法就是做实验。不停的将 partition 的个数从上次实验的 partition 个数乘以1.5,直到性能不再提升为止。

同时也有一些原则用于计算 X,但是也不是非常的有效是因为有些参数是很难计算的。这里写到不是因为它们很实用,而是可以帮助理解。这里主要的目标是启动足够的 task 可以使得每个 task 接受的数据能够都塞进它所分配到的内存中。

每个 task 可用的内存通过这个公式计算:spark.executor.memory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction)/spark.executor.cores 。 memoryFraction 和 safetyFractio 默认值分别 0.2 和 0.8.

在内存中所有 shuffle 数据的大小很难确定。最可行的是找出一个 stage 运行的 Shuffle Spill(memory) 和 Shuffle Spill(Disk) 之间的比例。在用所有shuffle 写乘以这个比例。但是如果这个 stage 是 reduce 时,可能会有点复杂:

在往上增加一点因为大多数情况下 partition 的个数会比较多。

试试在,在有所疑虑的时候,使用更多的 task 数(也就是 partition 数)都会效果更好,这与 MapRecuce 中建议 task 数目选择尽量保守的建议相反。这个因为 MapReduce 在启动 task 时相比需要更大的代价。

压缩你的数据结构

Spark 的数据流由一组 record 构成。一个 record 有两种表达形式:一种是反序列化的 Java 对象另外一种是序列化的二进制形式。通常情况下,Spark 对内存中的 record 使用反序列化之后的形式,对要存到磁盘上或者需要通过网络传输的record 使用序列化之后的形式。也有计划在内存中存储序列化之后的 record。

spark.serializer 控制这两种形式之间的转换的方式。Kryo serializer,org.apache.spark.serializer.KryoSerializer 是推荐的选择。但不幸的是它不是默认的配置,因为 KryoSerializer 在早期的 Spark 版本中不稳定,而 Spark 不想打破版本的兼容性,所以没有把 KryoSerializer 作为默认配置,但是 KryoSerializer 应该在任何情况下都是第一的选择。

你的 record 在这两种形式切换的频率对于 Spark 应用的运行效率具有很大的影响。去检查一下到处传递数据的类型,看看能否挤出一点水分是非常值得一试的。

过多的反序列化之后的 record 可能会导致数据到处到磁盘上更加频繁,也使得能够 Cache 在内存中的 record 个数减少。点击这里查看如何压缩这些数据。

过多的序列化之后的 record 导致更多的 磁盘和网络 IO,同样的也会使得能够 Cache 在内存中的 record 个数减少,这里主要的解决方案是把所有的用户自定义的 class 都通过 SparkConf#registerKryoClasses 的API定义和传递的。

数据格式

任何时候你都可以决定你的数据如何保持在磁盘上,使用可扩展的二进制格式比如:Avro,Parquet,Thrift或者Protobuf,从中选择一种。当人们在谈论在Hadoop上使用Avro,Thrift或者Protobuf时,都是认为每个 record 保持成一个 Avro/Thrift/Protobuf 结构保存成 sequence file。而不是JSON。

每次当时试图使用JSON存储大量数据时,还是先放弃吧…

原文地址:

  • http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-1/
  • http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/
首页 上一页 1 2 3 4 下一页 尾页 4/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇使用 Eureka 实现服务注册与发现 下一篇一、JAVA环境变量配置详解——Jav..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目