设为首页 加入收藏

TOP

[Spark优化]--spark-1.6.0调优指南
2019-05-11 02:27:32 】 浏览:61
Tags:Spark 优化 --spark-1.6.0 指南

Spark优化


数据序列化

其他考虑事项

总结

由于大多数Spark计算的内存性质,Spark程序可能会受到群集中任何资源(CPU,网络带宽或内存)的瓶颈。大多数情况下,如果数据在内存中,瓶颈就是网络带宽,但有时候,您还需要进行一些调整,例如序列化形式存储RDD,以减少内存使用量。本指南将涵盖两个主要主题:数据序列化,这对于良好的网络性能至关重要,并且还可以减少内存使用以及内存调整。我们还描述了几个较小的主题。

数据序列化

序列化在任何分布式应用程序的性能中都扮演着重要的角色。将对象序列化或者消耗大量字节的格式会大大减慢计算速度。通常,这将是您应该优化Spark应用程序的第一件事。Spark旨在在便利性(允许您使用操作中的任何Java类型)和性能之间取得平衡。它提供了两个序列化库:

  • Java序列化:默认情况下,Spark使用JavaObjectOutputStream框架对对象进行序列化,并且可以与您创建的实现的任何类一起使用java.io.Serializable。您还可以通过扩展来更紧密地控制序列化的性能java.io.ExternalizableJava序列化是灵活的,但通常非常缓慢,并导致许多类的大型序列化格式。
  • Kryo序列化Spark还可以使用Kryo库(版本2)更快地序列化对象。KryoJava序列化速度更快且更紧凑(通常高达10倍),但不支持所有Serializable类型,并且需要您事先注册将在程序中使用的类以获得最佳性能。

通过使用SparkConf初始化作业并调用,

您可以切换到使用Kryoconf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")。此设置将序列化器配置为不仅在工作站节点之间混洗数据,而且还将RDD序列化到磁盘时使用。Kryo不是默认的唯一原因是由于自定义注册要求,但我们建议在任何网络密集型应用程序中尝试它。

Spark自动包含Kryo序列化程序,用于从Twitter chill库中AllScalaRegistrar中涵盖的许多常用核心Scala类。

要使用Kryo注册您自己的自定义类,请使用该registerKryoClasses方法。

val conf =newSparkConf().setMaster(...).setAppName(...)

conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

val sc =newSparkContext(conf)


所述KRYO文档描述了更先进的注册选项,如添加自定义序列的代码。

如果你的对象很大,你可能还需要增加spark.kryoserializer.buffer配置。该值需要足够大以容纳您要序列化的最大对象。

最后,如果你没有注册你的自定义类,Kryo仍然可以工作,但它将不得不为每个对象存储完整的类名,这是浪费的。

内存调优

使用内存调优有三个方面的考虑:你的对象所使用的内存的(你可能希望你的整个数据集,以适应在内存中),访问这些对象的成本,和垃圾回收的开销(如果你有高成交对象的条款)。

默认情况下,Java对象的访问速度很快,但与其字段中的原始数据相比,可以轻松消耗多达2-5倍的空间。这是由于几个原因:

  • 每个不同的Java对象都有一个对象头,大约有16个字节,并包含诸如指向其类的指针等信息。对于数据非常少的对象(比如说一个Int字段),这可能比数据更大。
  • JavaString在原始字符串数据上有大约40个字节的开销(因为它们将它存储在一个Chars数组中,并保留诸如长度的额外数据),并且由于UTF-16的内部使用而将每个字符存储为两个字节String编码。因此一个10个字符的字符串可以很容易地消耗60个字节
  • 通用集合类,例如HashMapLinkedList,使用链接数据结构,其中每个条目都有一个包装器对象(例如Map.Entry)。这个对象不仅有一个头,而且还有指向列表中下一个对象的指针(通常每个8个字节)。
  • 基本类型的集合通常将它们存储为盒装对象,如java.lang.Integer

本节首先概述Spark的内存管理,然后讨论用户可以在他/她的应用程序中更有效地使用内存的具体策略。具体来说,我们将介绍如何确定对象的内存使用情况,以及如何改进它 - 可以通过更改数据结构,也可以通过以序列化格式存储数据。然后我们将介绍调整Spark的缓存大小和Java垃圾回收器。

内存管理概述

Spark中的内存使用大部分属于两类:执行和存储。执行内存是指用于在混洗,连接,排序和聚合中进行计算的内存,而存储内存指的是用于跨群集缓存和传播内部数据的内存。在Spark中,执行和存储共享统一区域(M)。当不使用执行内存时,存储可以获取所有可用内存,反之亦然。如有必要,执行可能会驱逐存储空间,但只能在总存储内存使用量低于特定阈值(R)时才执行。换句话说,R描述了M缓存块永远不会被驱逐的分区域。由于实施的复杂性,存储可能不会执行。

这种设计确保了几种理想的性能首先,不使用缓存的应用程序可以使用整个空间执行,避免不必要的磁盘溢出。其次,使用高速缓存的应用程序可以保留最小的存储空间(R),使其数据块不会被驱逐。最后,这种方法为各种工作负载提供了合理的开箱即用性能,而不需要用户在内部划分内存的专业知识。

虽然有两种相关配置,但一般用户不需要调整它们,因为默认值适用于大多数工作负载:

  • spark.memory.fraction表示MJVM堆空间 - 300MB)的一部分(默认值为0.75)的大小。其余空间(25%)保留给用户数据结构,Spark中的内部元数据,以及在稀疏和异常大的记录情况下防止OOM错误。
  • spark.memory.storageFraction表示大小RM(默认0.5)的一部分。RM缓存块不受执行驱逐的缓存块的存储空间。

确定内存消耗

调整数据集所需的内存消耗量的最佳方式是创建RDD,将其放入缓存,然后查看Web UI中的存储页面。该页面会告诉你RDD占用了多少内存。

要估计特定对象的内存消耗,请使用SizeEstimator'sestimate方法这对于尝试使用不同的数据布局来调整内存使用情况以及确定广播变量在每个执行程序堆上占用的空间量非常有用。

优化数据结构

减少内存消耗的第一种方法是避免增加开销的Java功能,例如基于指针的数据结构和包装对象。做这件事有很多种方法:

1.设计你的数据结构来优先选择对象数组和基本类型,而不是标准的JavaScala集合类(例如HashMap)。该fastutil库提供方便的集合类基本类型是与Java标准库兼容。

2.尽可能避免使用大量小对象和指针的嵌套结构。

3.考虑使用数字ID或枚举对象而不是键的字符串。

4.如果RAM少于32 GB,请将JVM标志设置-XX:+UseCompressedOops为使指针为4个字节而不是8个。您可以在这些选项中添加spark-env.sh

http://spark.apache.org/docs/1.6.0/spark-standalone.html#cluster-launch-scripts

序列化RDD存储

如果您的对象仍然过大,无法进行高效存储,但是使用RDD持久性API的序列化存储级别(例如,)来减少内存使用的一种更简单的方法是将它们以序列化的形式存储。Spark然后将每个RDD分区存储为一个大字节数组。以序列化格式存储数据的唯一缺点是访问时间较慢,这是由于必须快速反序列化每个对象。如果你想以序列化的形式缓存数据,我们强烈推荐使用Kryo,因为它比Java序列化(当然也比原始的Java对象)要小得多。MEMORY_ONLY_SER

垃圾收集调优

当您的程序存储RDD时,JVM垃圾收集可能会成为问题。(一次读取RDD然后对其执行许多操作的程序通常不是问题。)当Java需要驱逐旧对象以腾出空间给新的对象时,它需要跟踪所有的Java对象并查找未使用的。这里要记住的要点是垃圾回收的成本与Java对象的数量成正比,因此使用较少对象的数据结构(例如Ints而不是a的数组LinkedList)会大大降低此成本。更好的方法是以序列化的形式保存对象,如上所述:现在只有一个对象(一个字节数组)对每个RDD分区。在尝试其他技术之前,如果GC是一个问题,首先要尝试使用序列化缓存

由于任务的工作内存(运行任务所需的空间量)与缓存在节点上的RDD之间的干扰,GC也可能成为问题。我们将讨论如何控制分配给RDD缓存的空间来缓解这一问题。

测量GC的影响

GC调优的第一步是收集统计垃圾收集的频率和GC花费的时间。这可以通过添加-verbose:gc-XX:+PrintGCDetails -XX:+PrintGCTimeStampsJava选项来完成。(有关将Java选项传递给Spark作业的信息,请参阅配置指南。)下次运行Spark作业时,每次发生垃圾回收时都会在工作人员的日志中看到消息。请注意,这些日志将位于群集的工作节点上(stdout位于其工作目录中的文件中),而不是驱动程序中。

高级GC调整

为了进一步调整垃圾收集,我们首先需要了解JVM中有关内存管理的一些基本信息:

  • Java堆空间分为两个区域,分别是YoungOld。年轻一代是为了容纳短寿命的物体,而老一代则是为了寿命更长的物体。
  • 年轻一代进一步分为三个区域[Eden, Survivor1, Survivor2]
  • 垃圾收集过程的简单描述:Eden已满时,在Eden上运行次要GC,并将从EdenSurvivor 1中存活的物件复制到Survivor 2。Survivor区域被交换。如果对象足够旧或Survivor2已满,则将其移至Old。最后,当Old接近满时,将调用完整的GC。
  • SparkSpark中进行调优的目标是确保只有长寿命的RDD才被存储在老一代中,并且Young generation的大小足以存储短期对象。这将有助于避免完整的GC收集任务执行期间创建的临时对象。一些可能有用的步骤是:
  • 通过收集GC统计信息来检查是否有太多的垃圾收集。如果在任务完成之前多次调用完整的GC,则意味着没有足够的内存可用于执行任务。
  • 在打印的GC统计信息中,如果OldGen接近满,则通过降低来减少用于缓存的内存量spark.memory.storageFraction;缓存更少的对象比减慢任务执行更好!
  • 如果有太多次要收集,但没有太多主要GC,为Eden分配更多内存将会有所帮助。您可以将Eden的大小设置为高估每个任务需要多少内存。如果确定Eden的规模E,那么您可以使用该选项来设置年轻一代的规模-Xmn=4/3*E。(比例增加4/3是为了解释Survivor地区的空间。)
  • 例如,如果您的任务正在从HDFS读取数据,则可以使用从HDFS读取的数据块的大小来估计该任务使用的内存量。请注意,解压缩块的大小通常是块大小的23倍。因此,如果我们希望有34个任务的工作空间值,并且HDFS块大小为64 MB,我们可以估计Eden的大小4*3*64MB
  • 监视垃圾收集所花费的频率和时间如何随新设置发生变化。

我们的经验表明,GC调整的效果取决于您的应用程序和可用内存量。在线描述的调谐选项还有很多,但在较高层次上,管理全面GC发生的频率可以帮助减少开销。

其他考虑事项

并行度水平

除非您将每项操作的并行度设置得足够高,否则集群将无法充分利用。Spark根据其大小自动设置每个文件上运行的映射任务的数量(尽管可以通过可选参数SparkContext.textFile等来控制它),而对于分布式的“reduce”操作,比如groupByKeyreduceByKey,它使用最大的父级RDD的分区数量。您可以将并行性级别作为第二个参数传递(请参阅spark.PairRDDFunctions文档),或者将config属性设置spark.default.parallelism为更改默认值。一般来说,我们建议您的群集中每个CPU核心有2-3个任务。

减少任务的内存使用

有时候,你会得到一个OutOfMemoryError,不是因为你的RDD不适合内存,而是因为你的一个任务的工作集,比如其中的一个reduce任务groupByKey,太大了。spark的洗排操作(sortByKeygroupByKeyreduceByKeyjoin,等)建立每个任务中的哈希表来进行分组,而这往往是大的。这里最简单的解决方法是增加并行度,以便每个任务的输入集合更小。Spark可以有效地支持短至200毫秒的任务,因为它可以在多个任务中重复使用一个执行器JVM,并且任务启动成本较低,因此您可以安全地将并行度提高到超过集群内核的数量。

广播大变量

使用可用的广播功能SparkContext可以大大减少每个序列化任务的大小以及通过群集启动作业的成本。如果你的任务使用了驱动程序中的任何大对象(如静态查找表),考虑把它变成一个广播变量。Spark会在主服务器上打印每个任务的序列化大小,因此您可以查看以确定您的任务是否太大;一般来说大于20KB的任务可能值得优化。

数据本地化

数据局部性可能会对Spark作业的性能产生重大影响。如果数据和在其上运行的代码在一起,那么计算就会很快。但是,如果代码和数据是分开的,就必须转移到另一个。通常,由于代码大小比数据小得多,因此将序列化代码从一个地方运送到另一个地方比一个数据块更快。Spark围绕这个数据局部性的一般原理构建调度表。

数据局部性是数据与代码处理的距离。根据数据的当前位置有几个级别的地点。从最近到最远的顺序:

  • PROCESS_LOCAL数据与运行代码在同一个JVM中。这是可能的最佳地点
  • NODE_LOCAL数据在同一个节点上。示例可能位于同一节点上的HDFS中,也可能位于同一节点上的另一个执行器中。这比PROCESS_LOCAL数据必须在进程之间传输要慢一点
  • NO_PREF数据可以从任何地方快速访问,并且没有地区偏好
  • RACK_LOCAL数据位于同一台服务器上。数据位于同一机架上的不同服务器上,因此需要通过网络进行发送,通常通过一台交换机进行发送
  • ANY数据位于网络上的其他位置,而不在同一个机架中

Spark更喜欢在最好的地点级别安排所有任务,但这并非总是可行。在任何空闲执行程序中没有未处理的数据的情况下,Spark会切换到较低的本地级别。有两种选择:

a)等待一个繁忙的CPU释放,以便在同一台服务器上的数据上启动一项任务

b)立即在远离需要移动数据的地方开始一项新任务。

Spark通常所做的就是等待繁忙CPU释放的希望。一旦该超时到期,它就开始将数据从远处移动到空闲的CPU。每个级别之间回退的等待超时可以单独配置或全部一起配置为一个参数;有关详细信息,请参阅配置页面spark.locality上的参数。如果您的任务很长并且看到地点不好,则应该增加这些设置,但默认情况下通常效果不错。

总结

这是一个简短的指南,指出您在调整Spark应用程序时应该了解的主要问题 - 最重要的是数据序列化和内存调优。对于大多数程序,切换到Kryo序列化并以序列化形式保存数据将解决最常见的性能问题。随意在Spark邮件列表上询问其他调整最佳实践。

原文:http://spark.apache.org/docs/1.6.0/tuning.html

参考:https://databricks.com/blog/2015/05/28/tuning-java-garbage-collection-for-spark-applications.html

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇java之快速排序 下一篇maven常用命令

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目