设为首页 加入收藏

TOP

Spark Shuffle原理讲解以及调优【小二讲堂】
2019-04-29 01:08:18 】 浏览:66
Tags:Spark Shuffle 原理 讲解 以及 讲堂
版权声明:本文为博主原创文章,转载请注明出处!!!小二学堂:https://blog.csdn.net/Mirror_w https://blog.csdn.net/Mirror_w/article/details/89637705

一、Spark Shuffle

在DAG调度的过程中,Stage阶段的划分是根据是否有shuffle过程,也就是存在ShuffleDependency宽依赖的时候,需要进行shuffle,这时候会将作业job划分成多个Stage;并且在划分Stage的时候,构建ShuffleDependency的时候进行shuffle注册,获取后续数据读取所需要的ShuffleHandle,最终每一个job提交后都会生成一个ResultStage和若干个ShuffleMapStage,其中ResultStage表示生成作业的最终结果所在的Stage. ResultStage与ShuffleMapStage中的task分别对应着ResultTask与ShuffleMapTask。一个作业,除了最终的ResultStage外,其他若干ShuffleMapStage中各个ShuffleMapTask都需要将最终的数据根据相应的Partitioner对数据进行分组,然后持久化分区的数据。

  • shuffle
    就是在不同stage上的相同key的数据文件都汇聚成一个文件,当然不同节点上都会形成,reduce task拉取数据的时候,将不同节点不同分区中相同key的数据拉取到一起,形成一个文件,这个文件中所有的key是相同的,是所有节点上相同key的集合。
  • 聚合类型
    – shuffle write:上一个stage的map task就必须保障自己处理的当前分区的数据相同的key写入到同一个分区文件中
    – shuffle read:reduce task就会从上一个stage的所有task所在的节点上寻找属于自己的分区文件。这样就可以保证每一个key,所对应的value,都会汇聚到同一个节点上去处理。
    在这里插入图片描述

上图是由task任务执行的stage中每个partition中的数据最后溢写成不同的小文件
在HashShuffle没有优化之前,每一个ShufflleMapTask会为每一个ReduceTask创建一个bucket缓存,并且会为每一个bucket创建一个文件。这个bucket存放的数据就是经过Partitioner操作(默认是HashPartitioner)之后找到对应的bucket然后放进去,最后将数据刷新bucket缓存的数据到磁盘上,即对应的block file。
然后ShuffleMapTask将输出作为MapStatus发送到DAGScheduler的MapOutputTrackerMaster,每一个MapStatus包含了每一个ResultTask要拉取的数据的位置和大小

ResultTask然后去利用BlockStoreShuffleFetcher向MapOutputTrackerMaster获取MapStatus,看哪一份数据是属于自己的,然后底层通过BlockManager将数据拉取过来

拉取过来的数据会组成一个内部的ShuffleRDD,优先放入内存,内存不够用则放入磁盘,然后ResulTask开始进行聚合,最后生成我们希望获取的那个MapPartitionRDD
缺点:
如上图所示:在这里有1个worker,2个executor,每一个executor运行2个ShuffleMapTask,有三个ReduceTask,所以总共就有4 * 3=12个bucket和12个block file。
#如果数据量较大,将会生成MR个小文件,比如ShuffleMapTask有100个,ResultTask有100个,这就会产生100100=10000个小文件
#bucket缓存很重要,需要将ShuffleMapTask所有数据都写入bucket,才会刷到磁盘,那么如果Map端数据过多,这就很容易造成内存溢出,尽管后面有优化,bucket写入的数据达到刷新到磁盘的阀值之后,就会将数据一点一点的刷新到磁盘,但是这样磁盘I/O就多了
在这里插入图片描述
在task任务进行向内存中溢写的时候,首先在executor中会根据reduce的个数分配三个缓冲区,task处理的数据,进入executor时会写入buffer缓冲区中,这里会通过key.hashCode%numBuffer,用key的哈希值对Buffer缓冲区的个数取模进行分配数据取那个缓冲区,这个Buffer缓冲区的大小默认是32k,当buffer缓冲区中的数据写满32k时,会将数据进行溢写到磁盘上,溢写成一个磁盘文件,这样当一个task执行完毕时,就会形成很多个task磁盘文件,当task很多时,这样形成的小文件就是特别多的,这样就会造成很多问题:产生的文件个数:MR(map个数reduce个数)

1.造成了IO量的剧增,当reduce进行拉取数据时,效率也就低下了。
2.初次之外还在进行溢写磁盘文件时,肯定会创建大量对象的,这里GC压力过大,导致OOM。
3.进行拉取数据时,首先可定会进行大量的连接,大连接数过多时,肯定会造成连接的中断问题过多。这样会造成taskScheduler进行重试(默认重试3次),三次不成功,DAGScheduler进行重新计算分配Task任务,重新跑task任务,这样的效率是极为低下的。

二、Spark 1.6.x

对上面问题进行的优化
1.HashShuffle
在这里插入图片描述
由上图可以看出运行原理基本和上面相同,就是在进行想缓冲去中写文件时进行了优化,由一个executor中的两个task分别共用一个buffer,这样两个task任务会将取模后相同的buffer缓冲区的数据写到一块 ,最后溢写成对应的一个磁盘小文件。减少了至少一半的磁盘小文件。虽然减少了磁盘小文件但是,在面对大数据的巨量数据集下,这种优化还是有着很大的问题的,和上面一样的问题。
磁盘文件个数:ER(一个Executor产生的小文件个数Reduce的个数)。

三、Sort Shuffle(Spark 2.3.x,Spark 1.6.x)

1.普通运行机制

在这里插入图片描述
在task进行运行的时候,有一个估算机制,会加你估算执行task任务之前所需要的缓冲区大小。
在申请内存的时候,估算机制是:2*估算值-当前。executor会专门开启一块内存用于后续内存的申请。
首先task会将数据一块内存中,然后达到一定的阈值之后进行溢写文件,溢写文件的时候,将数据向事先估算好的内存缓冲中,但是在这之前会将数据的key的hashCode对缓冲区的个数进行取模,这样根据取模的结果进行分配不同的数据到不同的缓冲区中,写到之前首先会进行排序,这里的排序和MapReduce中的分区排序一样。将数据加载到不同的内存缓冲区中。并且到内存缓冲区的值达到一定阈值后会溢写成响应的磁盘文件,最后当map task 执行完毕的时候,会将所有缓冲区中的文件进行溢写到对应的磁盘文件上。当mpa task执行完毕之后会通过归并排序将所有的数据文件合并成一个大文件,这里比如有0,1,2好对应的的文件,则会将不同的分区文件拉取到一起并且0号对应的文件会在大文件的前面,1号在后面,并且归并拉取的同时会创建一个索引文件,来记录数据的位置。
最后由reduce task会将0号对应的磁盘文件拉取过来,包括不同节点上的磁盘文件。

  • 总结:
    在执行过程中map task产生的磁盘小文件是2*M个,即一个map task会产生两个磁盘小文件,即一个是数据文件、另一个是数据文件对应的索引文件。
    这样大大减少了磁盘文件,磁盘IO访问量,以及数据拉取时的网络IO访问量。

2…bypass优化机制

在这里插入图片描述
和sortshuffle机制的运行原理相同,只不过少了排序的过程,这里是针对不需要进行排序的需求进行的运行机制。比如repartition,它只是进行重新分区而不需要文件排序。
这里的运行效率就大大提高了

  • 使用bypass的条件:
    1.reduce端不能有聚合类型的操作。
    2.reduce task的个数必须要小于spark.shuffle.sort.bypassMergeThreshold 参数的值默认是200个,如果reducetask个数过多时,可以设置这个参数的值。

四、Spark Shuffle总结

Shuffle 过程本质上都是将 Map 端获得的数据使用分区器进行划分,并将数据发送给对应的 Reducer 的过程。
shuffle作为处理连接map端和reduce端的枢纽,其shuffle的性能高低直接影响了整个程序的性能和吞吐量。map端的shuffle一般为shuffle的Write阶段,reduce端的shuffle一般为shuffle的read阶段。Hadoop和spark的shuffle在实现上面存在很大的不同,spark的shuffle分为两种实现,分别为HashShuffle和SortShuffle。

HashShuffle又分为普通机制和合并机制,普通机制因为其会产生MR个数的巨量磁盘小文件而产生大量性能低下的Io操作,从而性能较低,因为其巨量的磁盘小文件还可能导致OOM,HashShuffle的合并机制通过重复利用buffer从而将磁盘小文件的数量降低到CoreR个,但是当Reducer 端的并行任务或者是数据分片过多的时候,依然会产生大量的磁盘小文件。

SortShuffle也分为普通机制和bypass机制,普通机制在内存数据结构(默认为5M)完成排序,会产生2M个磁盘小文件。而当shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。或者算子不是聚合类的shuffle算子(比如reduceByKey)的时候会触发SortShuffle的bypass机制,SortShuffle的bypass机制不会进行排序,极大的提高了其性能。

在Spark 1.6以前,默认的shuffle计算引擎是HashShuffleManager,因为HashShuffleManager会产生大量的磁盘小文件而性能低下,在Spark 1.6以后的版本中,默认的ShuffleManager改成了SortShuffleManager。SortShuffleManager相较于HashShuffleManager来说,有了一定的改进。主要就在于,每个Task在进行shuffle操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。

细节解释

  • shuffle中的定时器:
    定时器会检查内存数据结构的大小,如果内存数据结构空间不够,那么会申请额外的内存,申请的大小满足如下公式:
    applyMemory=nowMenory2-oldMemory
    申请的内存=当前的内存情况
    2-上一次的内嵌情况
    意思就是说内存数据结构的大小的动态变化,如果存储的数据超出内存数据结构的大小,将申请内存数据结构存储的数据*2-内存数据结构的设定值的内存大小空间。申请到了,内存数据结构的大小变大,内存不够,申请不到,则发生溢写。
  • 排序
    在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。
  • 溢写
    排序过后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式分批写入磁盘文件。写入磁盘文件是通过Java的BufferedOutputStream实现的。BufferedOutputStream是Java的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘IO次数,提升性能。
  • merge
    一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,这就是merge过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个task就只对应一个磁盘文件,也就意味着该task为Reduce端的stage的task准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。
    SortShuffleManager由于有一个磁盘文件merge的过程,因此大大减少了文件数量。比如第一个stage有50个task,总共有10个Executor,每个Executor执行5个task,而第二个stage有100个task。由于每个task最终只有一个磁盘文件,因此此时每个Executor上只有5个磁盘文件,所有Executor只有50个磁盘文件。
    1)block file= 2M
    一个map task会产生一个索引文件和一个数据大文件
  1. m*r>2m(r>2):SortShuffle会使得磁盘小文件的个数再次的减少

五、Spark shuffle调优

  • spark.shuffle.file.buffer
    默认值:32k
    参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。
    调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
  • spark.reducer.maxSizeInFlight
    默认值:48m
    参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。
    调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
  • spark.shuffle.io.maxRetries
    默认值:3
    参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。
    调优建议:对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。
  • spark.shuffle.io.retryWait
    默认值:5s
    参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是5s。
    调优建议:建议加大间隔时长(比如60s),以增加shuffle操作的稳定性。
  • spark.shuffle.memoryFraction
    默认值:0.2
    参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%。
    调优建议:在资源参数调优中讲解过这个参数。如果内存充足,而且很少使用持久化操作,建议调高这个比例,给shuffle read的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%左右。
  • spark.shuffle.manager
    默认值:sort
    参数说明:该参数用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。
    调优建议:由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,因为之前发现了一些相应的bug。
  • spark.shuffle.sort.bypassMergeThreshold
    默认值:200
    参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。
    调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。
  • spark.shuffle.consolidateFiles
    默认值:false
    参数说明:如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。
    调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。

MapReduce shuffle讲解:https://blog.csdn.net/Mirror_w/article/details/89421705
小二讲堂:https://blog.csdn.net/Mirror_w
Spark讲堂:https://blog.csdn.net/Mirror_w/article/details/89408567

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇        .. 下一篇Spark之   Spark Streaming..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目