设为首页 加入收藏

TOP

Spark知识点总结--持续更新
2018-12-14 09:06:23 】 浏览:198
Tags:Spark 知识点 总结 持续 更新
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/bingdianone/article/details/84785019

spark有哪些组件

(1)master:管理集群和节点,不参与计算。
(2)worker:计算节点,进程本身不参与计算,和master汇报。
(3)Driver:运行程序的main方法,创建spark context对象。
(4)spark context:控制整个application的生命周期,包括dagsheduler和task scheduler等组件。
(5)client:用户提交程序的入口。

spark中的模块

Spark Core:

  • 包含Spark基本功能,包括任务调度,内存管理,容错机制等
  • 内部定义了RDDs(弹性分布式数据集)
  • 提供了许多APIs来创建和操作这些RDDs
  • 为其他组件提供底层服务

Spark SQL

  • Spark处理结构化数据的库,类似Hive SQL

Spark Streaming

  • 提供了API处理实时数据流
  • 企业中用来从Kafka接收数据做实时统计

Mlib

  • 机器学习包
  • 支持集群上的横向扩展

Graphx

  • 图处理库,进行图的并行计算
  • 提供了常用的图算法,eg:PageRank

Cluster Managers

  • 集群管理,Spark自带一个集群管理是单独调度器
  • 常见的集群管理包括Hadoop YARN,Apache Mesos

spark工作机制

答:用户在client端提交作业后,会由Driver运行main方法并创建spark context上下文。
执行add算子,形成dag图输入dagscheduler,按照add之间的依赖关系划分stage输入task scheduler。 task scheduler会将stage划分为task set分发到各个节点的executor中执行。

1. Spark术语

Spark框架图如下:
在这里插入图片描述
Application:用户编写的应用程序,用户自定义的Spark程序,用户提交后,Spark为App分配资源将程序转换并执行。
Driver Program:运行Application的main()函数并且创建SparkContext。
SparkContext:是用户逻辑与Spark集群主要的交互接口,它会和Cluster Manager进行交互,进行资源的申请,任务的分配与监控,SparkContext代表Driver。
Worker Node:从节点,集群中可以运行应用程序的节点,负责控制计算节点,启动Executor或Driver。在YARN模式中为NodeManager,负责计算节点的控制。
Executor:执行器,是为某Application运行在worker node上的一个进程,负责执行task,该进程里面会通过线程池的方式负责运行任务,并负责将数据存在内存或者磁盘上。每个Application拥有独立的一组executors。
RDD DAG:当RDD遇到Action算子,将之前的所有算子形成一个有向无环图(DAG)。再在Spark中转化为Job,提交到集群进行执行。一个App可以包含多个Job。
Task:被Executor执行的工作单元,是运行Application最小的单位,多个task组合成一个stage,Task的调度和管理由TaskScheduler负责,一个分区对应一个Task,Task执行RDD中对应Stage中所包含的算子。Task被封装好后放入Executor的线程池中执行。
Job:一个RDD Graph触发的作业,往往由Spark Action算子触发,在SparkContext中通过runJob()向Spark提交Job。包含多个Task组成的并行计算。
Stage:每个Job的Task被拆分成很多组Task, 作为一个TaskSet,命名为Stage。Stage的调度和划分由DAGScheduler负责。Stage又分为Shuffle Map Stage和Result Stage两种。Stage的边界就在发生Shuffle的地方。
RDD:Spark的基本数据操作抽象,可以通过一系列算子进行操作。RDD是Spark最核心的东西,可以被分区、被序列化、不可变、有容错机制,并且能并行操作的数据集合。存储级别可以是内存,也可以是磁盘。
DAG Scheduler:根据Job构建基于Stage的DAG(有向无环任务图),并提交Stage给TaskScheduler。
TaskScheduler:将Task分发给Executor执行。将Stage提交给Worker(集群)运行,每个Executor运行什么在此分配。
SparkEnv:线程级别的上下文,存储运行时的重要组件的引用。
共享变量:Application在整个运行过程中,可能需要一些变量在每个Task中都使用,共享变量用于实现该目的。Spark有两种共享变量:一种缓存到各个节点的广播变量;一种只支持加法操作,实现求和的累加变量。
宽依赖:或称为ShuffleDependency, 宽依赖需要计算好所有父RDD对应分区的数据,然后在节点之间进行Shuffle。
窄依赖:或称为NarrowDependency,指某个RDD,其分区partition x最多被其子RDD的一个分区partion y依赖。窄依赖都是Map任务,不需要发生shuffle。因此,窄依赖的Task一般都会被合成在一起,构成一个Stage。

2.工作原理

Spark基本工作原理,这里我们从宏观讲解Spark的基本工作原理,帮助你全面了解布局,站在一个高度去理解每个算子任务的操作原理,才能有效的把握变化中的状态,通过实际原理图来说明,来理解程序入口的客户端、集群处理流程、读取数据的来源、最终计算结果何去何从等问题。
在这里插入图片描述
1、客户端:
客户端也就是专业们常说的Client端,这里的是表示我们在本地编写Spark程序,然后必须找一个能够连接Spark集群,并提交程序进行运行的机器。

2、读取数据:
在准备运行Spark程序的同时,是不是也要有数据来源进行处理的呢,这里我们介绍几种常见的读取数据来源,是Hadoop集群中的HDFS、Hive也有可能是搭建在集群上的HBase;还有MySQL等DB数据库;或者是在程序中我们设置的集合数据。
在这里插入图片描述

3、Spark分布式集群:
Spark集群是一种分布式计算、是一种迭代式计算、是一种基于内存计算。
分布式计算,这是Spark最基本的特征,计算时候数据会分布存放到各个集群节点,来并行分布式计算。如图的第一个操作map,是对于节点1、2、3上面的数据进行map算子操作,处理后的数据可能会转移到其他节点的内存中,这里假设到了4、5、6节点,处理后的数据有可能多或是变少,这个需要看我们具体的处理方式。第二个操作reduce,是将map处理后的数据再次进行处理。
这也就得到Spark是一种迭代式计算模型,一次计算逻辑中可以分为N个阶段,上一个阶段结果数据成为了下一个阶段的输入数据,这样就不只是像Hadoop中的MapReduce计算一样了,只有两个阶段map和reduce,就结束一个job任务的运行,得落地到HDFS。而Spark在各个阶段计算转换中一直保持基于内存迭代式计算,所以Spark相对于MapReduce来说计算模型可以提供更加强大的计算逻辑功能,从而也大大的提高计算效率。

4、结果数据输出:
这里我们介绍几种输出方式,基于Hadoop的HDFS、Hive或是HBase;MySQL等DB数据;或是直接输出返回给客户端。

Spark工作的一个流程

在这里插入图片描述
1、spark-submit 提交了应用程序的时候,提交spark应用的机器会通过反射的方式,创建和构造一个Driver进程,Driver进程执行Application程序,
2、Driver根据sparkConf中的配置初始化SparkContext,在SparkContext初始化的过程中会启动DAGScheduler和taskScheduler
3、taskSheduler通过后台进程,向Master注册Application,Master接到了Application的注册请求之后,会使用自己的资源调度算法,在spark集群的worker上,通知worker为application启动多个Executor。
4、Executor会向taskScheduler反向注册。
5、Driver完成SparkContext初始化
6、application程序执行到Action时,就会创建Job。并且由DAGScheduler将Job划分多个Stage,每个Stage 由TaskSet 组成
7、DAGScheduler将TaskSet提交给taskScheduler
8、taskScheduler把TaskSet中的task依次提交给Executor
9、Executor在接收到task之后,会使用taskRunner来封装task(TaskRuner主要将我们编写程序,也就是我们编写的算子和函数进行拷贝和反序列化),然后,从Executor的线程池中取出一个线程来执行task。就这样Spark的每个Stage被作为TaskSet提交给Executor执行,每个Task对应一个RDD的partition,执行我们的定义的算子和函数。直到所有操作执行完为止。
在这里插入图片描述

Spark运行原理图

在这里插入图片描述

Spark的Shuffle原理及调优?

spark的shuffleManager是负责shuffle过程的执行、计算和处理的组件。shuffleManager是trait,主要实现类有两个:HashShuffleManager和SortShuffleManager。

val shortShuffleMgrNames =Map(
"hash"->"org.apache.spark.shuffle.hash.HashShuffleManager",
"sort"->"org.apache.spark.shuffle.sort.SortShuffleManager",
"tungsten-sort"->"org.apache.spark.shuffle.sort.SortShuffleManager")
val shuffleMgrName = conf.get("spark.shuffle.manager","sort")
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

HashShuffleManager:

在这里插入图片描述
shuffle write阶段,默认Mapper阶段会为Reducer阶段的每一个Task单独创建一个文件来保存该Task中要使用的数据。
(1)map task的计算结果,会根据分区器(default:HashPartitioner)来决定写入到哪一个磁盘小文件中
(2)reduce task 会去map端拉取相应的小文件
产生磁盘小文件的个数公式:M(map task的个数)*R(reduce task的个数)

优点:就是操作数据简单。
缺点:但是在一些情况下(例如数据量非常大的情况)会造成大量文件(M*R,其中M代表Mapper中的所有的并行任务数量,R代表Reducer中所有的并行任务数据)大数据的随机磁盘I/O操作且会形成大量的Memory(极易造成OOM)。

磁盘小文件过多会有什么问题?
1、在Shuffle write过程会产生很多的写磁盘的对象
2、在Shuffle read过程会产生很多的读磁盘的对象
3、在数据传输过程中,会有频繁的网络通信
在JVM堆内存中对象过多会造成频繁的GC;GC还是无法解决运行所需要的内存的话,就会oom;频繁的网络通信,会出现通信故障的可能性大大增加了,一旦网络通信出现了故障,就会出现如下的错误
Shuffle file connot find由于这个错误导致的task失败,那么TaskScheduler不负责重试,由DAGScheduler负责重试stage

HashShuffleManager产生的问题:

第一:不能够处理大规模的数据
第二:Spark不能够运行在大规模的分布式集群上!

改进方案:Consolidate机制:
spark.shuffle.consolidateFiles 该参数默认值为false,将其设置为true即可开启优化机制
后来的改善是加入了Consolidate机制来将Shuffle时候产生的文件数量减少到CR个(C代表在Mapper端,同时能够使用的cores数量,R代表Reducer中所有的并行任务数量)。但是此时如果Reducer端的并行数据分片过多的话则CR可能已经过大,此时依旧没有逃脱文件打开过多的厄运!!!Consolidate并没有降低并行度,只是降低了临时文件的数量,此时Mapper端的内存消耗就会变少,所以OOM也就会降低,另外一方面磁盘的性能也会变得更好。

在这里插入图片描述
开启consolidate机制之后,在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘文件了。此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage的task数量是相同的。一个Executor上有多少个CPU core,就可以并行执行多少个task。而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件内。

前提:每个Excutor分配1个cores,假设第二个stage有100个task,第一个stage有50个task,总共还是有10个Executor,每个Executor执行5个task。那么原本使用未经优化的HashShuffleManager时,每个Executor会产生500个磁盘文件,所有Executor会产生5000个磁盘文件的。但是此时经过优化之后,每个Executor创建的磁盘文件的数量的计算公式为:CPU core的数量 * 下一个stage的task数量。也就是说,每个Executor此时只会创建100个磁盘文件,所有Executor只会创建1000个磁盘文件。

当Executor的CPU core执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的shuffleFileGroup,包括其中的磁盘文件。也就是说,此时task会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。因此,consolidate机制允许不同的task复用同一批磁盘文件,这样就可以有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能。

SortShuffle

在Mapper中的每一个ShuffleMapTask中产生两个文件:Data文件和Index文件,其中Data文件是存储当前Task的Shuffle输出的。而index文件中则存储了Data文件中的数据通过Partitioner的分类信息,此时下一个阶段的Stage中的Task就是根据这个Index文件获取自己所要抓取的上一个Stage中的ShuffleMapTask产生的数据的,Reducer就是根据index文件来获取属于自己的数据。

涉及问题:Sorted-based Shuffle:会产生 2*M(M代表了Mapper阶段中并行的Partition的总数量,其实就是ShuffleMapTask的总数量)个Shuffle临时文件。

SortShuffleManager由于有一个磁盘文件merge的过程,因此大大减少了文件数量。比如第一个stage有50个task,总共有10个Executor,每个Executor执行5个task,而第二个stage有100个task。由于每个task最终只有一个磁盘文件,因此此时每个Executor上只有5个磁盘文件,所有Executor只有50个磁盘文件。
在这里插入图片描述
普通机制Sort-based Shuffle的流程
(1) map task的计算结果会写入到一个内存数据结构里面,内存数据结构默认是5M
(2) 在shuffle的时候会有一个定时器,不定期的去估算这个内存数据结构的大小,如果现在内存数据结构的大小是5.01M,那么它会申请5.01*2-5=5.02M内存给内存数据结构
(3) 如果申请成功,不会进行溢写
(4) 如果申请不成功,这个时候就会有溢写的过程
(5) 在溢写之前,会将内存数据结构里面的数据进行排序,以及分区
(6) 然后开始写磁盘,写磁盘是以bacth的形式去写,一个batch是1W条数据
(7) Map task执行完成后,会将这些磁盘小文件合并成一个大的磁盘文件,同时生成一个索引
(8) Reduce task去map端拉数据的时候,首先解析索引文件,根据索引文件再去拉去属于它自己的数据
产生磁盘小文件的公式:2M(M代表了Mapper阶段中并行的Partition的总数量,其实就是ShuffleMapTask的总数量)

默认Sort-based Shuffle的几个缺陷
1)如果Mapper中Task的数量过大,依旧会产生很多小文件,此时在Shuffle传递数据的过程中到Reducer端,reduce会需要同时打开大量的记录来进行反序列化,导致大量的内存消耗和GC的巨大负担,造成系统缓慢甚至崩溃!
2)如果需要在分片内也进行排序的话,此时需要进行Mapper端和Reducer端的两次排序!!!
优化:
可以改造Mapper和Reducer端,改框架来实现一次排序。
在这里插入图片描述
bypass运行机制
1、spark.shuffle.sort.bypassMergeThreshold 默认值为200 ,如果shuffle read task的数量小于这个阀值200,则不会进行排序。
2、或者使用hashbasedshuffle + consolidateFiles 机制

上图说明了bypass SortShuffleManager的原理。bypass运行机制的触发条件如下:

  1. shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。
    这个参数仅适用于SortShuffleManager,如前所述,SortShuffleManager在处理不需要排序的Shuffle操作时,由于排序带来性能的下降。这个参数决定了在这种情况下,当Reduce分区的数量小于多少的时候,在SortShuffleManager内部不使用Merge Sort的方式处理数据,而是与Hash Shuffle类似,直接将分区文件写入单独的文件,不同的是,在最后一步还是会将这些文件合并成一个单独的文件。这样通过去除Sort步骤来加快处理速度,代价是需要并发打开多个文件,所以内存消耗量增加,本质上是相对HashShuffleMananger一个折衷方案。这个参数的默认值是200个分区,如果内存GC问题严重,可以降低这个值。

  2. 不是聚合类的shuffle算子(比如reduceByKey)。
    此时task会为每个下游task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。
    该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,shuffle read的性能会更好。
    而该机制与普通SortShuffleManager运行机制的不同在于:第一,磁盘写机制不同;第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

总结
有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。tungsten-sort慎用,存在bug.

spark shuffle参数调优

spark.shuffle.file.buffer

  • 默认值:32k
  • 参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。
  • 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。

spark.reducer.maxSizeInFlight

  • 默认值:48m
  • 参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。
  • 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
  • Reduce task去map拉数据,reduce 一边拉数据一边聚合 reduce端有一块聚合内存(executor memory)
  • 解决方法:
    (1)增加reduce聚合的内存比例 设置spark.Shuffle.memoryFraction
    (2)增加executor memory的大小 –executor-memory 5G
    (3)减少reduce task每次拉取的数据量 设置
    spark.reducer.maxSizeInFlight 24m

spark.shuffle.io.maxRetries

  • 默认值:3
  • 参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。
  • 调优建议:对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。

spark.shuffle.io.retryWait

  • 默认值:5s
  • 参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是5s。
  • 调优建议:建议加大间隔时长(比如60s),以增加shuffle操作的稳定性。

spark.shuffle.memoryFraction

  • 默认值:0.2
  • 参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%。
  • 调优建议:在资源参数调优中讲解过这个参数。如果内存充足,而且很少使用持久化操作,建议调高这个比例,给shuffle read的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%左右。

spark.shuffle.manager

  • 默认值:sort
  • 参数说明:该参数用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。
  • 调优建议:由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,因为之前发现了一些相应的bug。

spark.shuffle.sort.bypassMergeThreshold

  • 默认值:200
  • 参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。
  • 调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。

spark.shuffle.consolidateFiles

  • 默认值:false
  • 参数说明:如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。
  • 调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。

spark如何保证宕机迅速恢复

处于Standby状态的Master在接收到org.apache.spark.deploy.master.ZooKeeperLeaderElectionAgent发送的ElectedLeader消息后,就开始通过ZK中保存的Application,Driver和Worker的元数据信息进行故障恢复了,它的状态也从RecoveryState.STANDBY变为RecoveryState.RECOVERING了。当然了,如果没有任何需要恢复的数据,Master的状态就直接变为RecoveryState.ALIVE,开始对外服务了。
一方面Master通过 恢复Application,Driver和Worker的状态,
beginRecovery(storedApps, storedDrivers, storedWorkers)
一方面通过在60s后主动向自己发送CompleteRecovery的消息,开始恢复数据完成后的操作。

recoveryCompletionTask = context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis, self,CompleteRecovery)

首先看一下如何通过ZooKeeperLeaderElectionAgent提供的接口恢复数据。
在这里插入图片描述
获取了原来的Master维护的Application,Driver和Worker的列表后,当前的Master通过beginRecovery来恢复它们的状态。 恢复Application的步骤: 置待恢复的Application的状态为UNKNOWN,向AppClient发送MasterChanged的消息 AppClient收到后改变其保存的Master的信息,包括URL和Master actor的信息,回复MasterChangeAcknowledged(appId) Master收到后通过appId后将Application的状态置为WAITING 检查如果所有的worker和Application的状态都不是UNKNOWN,那么恢复结束,调用completeRecovery() 恢复Worker的步骤: 重新注册Worker(实际上是更新Master本地维护的数据结构),置状态为UNKNOWN 向Worker发送MasterChanged的消息 Worker收到消息后,向Master回复 消息WorkerSchedulerStateResponse,并通过该消息上报executor和driver的信息。 Master收到消息后,会置该Worker的状态为ALIVE,并且会检查该Worker上报的信息是否与自己从ZK中获取的数据一致,包括executor和driver。一致的executor和driver将被恢复。对于Driver,其状态被置为RUNNING。 检查如果所有的worker和Application的状态都不是UNKNOWN,那么恢复结束,调用completeRecovery() beginRecovery的源码实现:

  def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],
      storedWorkers: Seq[WorkerInfo]) {
    for (app <- storedApps) { // 逐个恢复Application
      logInfo("Trying to recover app: " + app.id)
      try {
        registerApplication(app)
        app.state = ApplicationState.UNKNOWN
        app.driver ! MasterChanged(masterUrl, masterWebUiUrl) //向AppClient发送Master变化的消息,AppClient会回复MasterChangeAcknowledged
      } catch {
        case e: Exception => logInfo("App " + app.id + " had exception on reconnect")
      }
    }
 
    for (driver <- storedDrivers) {
      // Here we just read in the list of drivers. Any drivers associated with now-lost workers
      // will be re-launched when we detect that the worker is missing.
      drivers += driver // 在Worker恢复后,Worker会主动上报运行其上的executors和drivers从而使得Master恢复executor和driver的信息。
    }
 
    for (worker <- storedWorkers) { //逐个恢复Worker
      logInfo("Trying to recover worker: " + worker.id)
      try {
        registerWorker(worker) //重新注册Worker
        worker.state = WorkerState.UNKNOWN
        worker.actor ! MasterChanged(masterUrl, masterWebUiUrl) //向Worker发送Master变化的消息,Worker会回复WorkerSchedulerStateResponse
      } catch {
        case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")
      }
    }
  }

通过下面的流程图可以更加清晰的理解这个过程
在这里插入图片描述
如何判断恢复是否结束? 在上面介绍Application和Worker的恢复时,提到了每次收到他们的回应,都要检查是否当前所有的Worker和Application的状态都不为UNKNOWN,如果是,那么恢复结束,调用completeRecovery()。这个机制并不能完全起作用,如果有一个Worker恰好也是宕机了,那么该Worker的状态会一直是UNKNOWN,那么会导致上述策略一直不会起作用。这时候第二个判断恢复结束的标准就其作用了:超时机制,选择是设定了60s得超时,在60s后,不管是否有Worker或者AppClient未返回相应,都会强制标记当前的恢复结束。对于那些状态仍然是UNKNOWN的app和worker,Master会丢弃这些数据。具体实现如下:
在这里插入图片描述
但是对于一个拥有几千个节点的集群来说,60s设置的是否合理?毕竟现在没有使用Standalone模式部署几千个节点的吧?因此硬编码60s看上去也十分合理,毕竟都是逻辑很简单的调用,如果一些节点60S没有返回,那么下线这部分机器也是合理的。 通过设置spark.worker.timeout,可以自定义超时时间。

spark基本工作原理

在这里插入图片描述
driver向worker进程提交资源请求,worker会启动多个executor进程为driver分配资源,executor启动后会向driver进行反注册,以便driver知道自己启动的资源的情况。
driver向executor提交task(map/reduce等),executor启动多个task线程来执行转换/动作算子。

Spark on yarn

在这里插入图片描述

client运行模式

1、在客户端通过Spark-submit提交一个Application
2、在客户端上启动一个Driver进程
3、 Driver启动完成后, client会向RS (ResourceManager)发送请求(给我找一台NM,我要启动AM)
4、RS接受到了请求,找到某一台NM了, Rs会向NM进程发送一条消息(给我启动一个Container容器,我要启动AM进程)
5、AM已经启动了, AM会向RS发送请求(给我一批资源,我要运行Application)
6、RS接受了请求,给他找了一批N回给AM
7、AM会向这一批MM发送消息(你给我启动一个Container,我要启动Executor)
8、Executor会反向注册给客户端里启动的Driver进程
9、 Driver就有了一批计算进程(Executor)
10、 Driver就可以发送task到Executor里面去执行了。

总结:Applicationmaster作用:
1、 为当前的Application申请资源
2、 给NM发送消息;启动Container(一组计算单位)Executor

Cluster运行模式

client vs cluster

1、client模式Driver在客户端启动 测试
2、cluster模式Driver是在yarn集群中某一台NM中启动生产环境
3、ApplicationMaster在不同的模式下作用不一样:
  ApplicationMaster在client模式下:
    (1)为当前的Application申请资源
    (2)给NM发送消息, NM启动Container(一组计算资源的单位) Executor
  ApplicationMaster在cluster模式下:
    (1)为当前的Application申请资源
    (2)给NM发送消息, NM启动container(一组计算资源的单位)Executor
    (3)任务调度

注意事项:

1、 yarn集群所在的节点必须有spark的安装包
2、 Spark跑在Yarn集群上,不需要启动Spark standalone集群,不需要master worker这一些节点
master->RS
Worker->NM
在这里插入图片描述

RDD机制

rdd分布式弹性数据集,简单的理解成一种数据结构,是spark框架上的通用货币。
所有算子都是基于rdd来执行的,不同的场景会有不同的rdd实现类,但是都可以进行互相转换。
rdd执行过程中会形成dag图,然后形成lineage保证容错性等。
从物理的角度来看rdd存储的是block和node之间的映射。

RDD弹性的分布式数据集五大特性
1、他有一系列的Partition组成的
2、每一个算子作用在每一个partition上
3、 rdd之间是有依赖关系的
4、可选项:分区器作用在KV格式的RDD上
 (1)分区器是在shuffle阶段起作用
 (2) GroupByKey, reduceByKey, join, sortBykey等这些算子会产生shuffle
 (3)这些算子必须作用在KV格式的RDD
5、 RDD会提供一系列最佳计算位置,说白了就是暴露每一个partitior的位置这是数据本地化的基础
在这里插入图片描述

RDD持久化原理

Spark 中一个很重要的能力是将数据持久化(或称为缓存),在多个操作间都可以访问这些持久化的数据。当持久化一个 RDD 时,每个节点的其它分区都可以使用 RDD在内存中进行计算,在该数据上的其他 action 操作将直接使用内存中的数据。这样会让以后的 action 操作计算速度加快(通常运行速度会加速 10倍)。缓存是迭代算法和快速的交互式使用的重要工具。

RDD 可以使用persist() 方法或 cache() 方法进行持久化。数据将会在第一次 action 操作时进行计算,并缓存在节点的内存中。Spark 的缓存具有容错机制,如果一个缓存的 RDD 的某个分区丢失了,Spark 将按照原来的计算过程,自动重新计算并进行缓存。

另外,每个持久化的 RDD 可以使用不同的存储级别进行缓存,例如,持久化到磁盘、已序列化的 Java 对象形式持久化到内存(可以节省空间)、跨节点间复制、以 off-heap 的方式存储在 Tachyon。这些存储级别通过传递一个 **StorageLevel 对象(Scala、Java、Python)给 persist() 方法进行设置。cache()**方法是使用默认存储级别的快捷设置方法,默认的存储级别是 StorageLevel.MEMORY_ONLY(将反序列化的对象存储到内存中)。详细的存储级别介绍如下 :

  • MEMORY_ONLY : 将 RDD 以反序列化 Java 对象的形式存储在 JVM 中。如果内存空间不够,部分数据分区将不再缓存,在每次需要用到这些数据时重新进行计算。这是默认的级别。
  • MEMORY_AND_DISK : 将 RDD 以反序列化 Java 对象的形式存储在 JVM 中。如果内存空间不够,将未缓存的数据分区存储到磁盘,在需要使用这些分区时从磁盘读取。
  • MEMORY_ONLY_SER : 将 RDD 以序列化的 Java 对象的形式进行存储(每个分区为一个 byte 数组)。这种方式会没有序列化对象的方式节省很多空间,尤其是在使用 fast serializer时会节省更多的空间,但是在读取时会增加 CPU 的计算负担。
  • MEMORY_AND_DISK_SER : 类似于 MEMORY_ONLY_SER ,但是溢出的分区会存储到磁盘,而不是在用到它们时重新计算。
  • DISK_ONLY : 只在磁盘上缓存 RDD。
  • MEMORY_ONLY_2,MEMORY_AND_DISK_2,等等 : 与上面的级别功能相同,只不过每个分区在集群中两个节点上建立副本。
  • OFF_HEAP(实验中): 类似于 MEMORY_ONLY_SER ,但是将数据存储在 off-heap memory,这需要启动 off-heap 内存。

注意,在 Python 中,缓存的对象总是使用 Pickle 进行序列化,所以在 Python 中不关心你选择的是哪一种序列化级别。python 中的存储级别包括 MEMORY_ONLY,MEMORY_ONLY_2,MEMORY_AND_DISK,MEMORY_AND_DISK_2,DISK_ONLY和 DISK_ONLY_2
在 shuffle 操作中(例如 reduceByKey),即便是用户没有调用 persist 方法,Spark 也会自动缓存部分中间数据。这么做的目的是,在 shuffle 的过程中某个节点运行失败时,不需要重新计算所有的输入数据。如果用户想多次使用某个 RDD,强烈推荐在该 RDD 上调用 persist方法。

如何选择存储级别

Spark 的存储级别的选择,核心问题是在内存使用率和 CPU 效率之间进行权衡。建议按下面的过程进行存储级别的选择 :

  • 如果使用默认的存储级别(MEMORY_ONLY),存储在内存中的 RDD 没有发生溢出,那么就选择默认的存储级别。默认存储级别可以最大程度的提高 CPU 的效率,可以使在 RDD 上的操作以最快的速度运行。
  • 如果内存不能全部存储 RDD,那么使用 MEMORY_ONLY_SER,并挑选一个快速序列化库将对象序列化,以节省内存空间。使用这种存储级别,计算速度仍然很快。
  • 除了在计算该数据集的代价特别高,或者在需要过滤大量数据的情况下,尽量不要将溢出的数据存储到磁盘。因为,重新计算这个数据分区的耗时与从磁盘读取这些数据的耗时差不多。
  • 如果想快速还原故障,建议使用多副本存储级别(例如,使用 Spark 作为 web 应用的后台服务,在服务出故障时需要快速恢复的场景下)。所有的存储级别都通过重新计算丢失的数据的方式,提供了完全容错机制。但是多副本级别在发生数据丢失时,不需要重新计算对应的数据库,可以让任务继续运行。

删除数据

Spark 自动监控各个节点上的缓存使用率,并以最近最少使用的方式(LRU)将旧数据块移除内存。如果想手动移除一个 RDD,而不是等待该 RDD 被 Spark自动移除,可以使用 RDD.unpersist()方法。

不使用RDD的持久化
在这里插入图片描述
1、默认情况下,对于大量数据的action操作都是非常耗时的。可能一个操作就耗时1个小时;
2、在执行action操作的时候,才会触发之前的操作的执行,因此在执行第一次count操作时,就会从hdfs中读取一亿数据,形成lines RDD;
3、第一次count操作之后,我们的确获取到了hdfs文件的行数。但是lines RDD其实会被丢弃掉,数据也会被新的数据丢失;

所以,如果不用RDD的持久化机制,可能对于相同的RDD的计算需要重复从HDFS源头获取数据进行计算,这样会浪费很多时间成本;

RDD持久化的原理

在这里插入图片描述

  1. Spark非常重要的一个功能特性就是可以将RDD持久化在内存中。当对RDD执行持久化操作时,每个节点都会将自己操作的RDD的partition持久化到内存中,并且在之后对该RDD的反复使用中,直接使用内存缓存的partition。这样的话,对于针对一个RDD反复执行多个操作的场景,就只要对RDD计算一次即可,后面直接使用该RDD,而不需要反复计算多次该RDD。
  2. 巧妙使用RDD持久化,甚至在某些场景下,可以将spark应用程序的性能提升10倍。对于迭代式算法和快速交互式应用来说,RDD持久化,是非常重要的。
  3. 要持久化一个RDD,只要调用其cache()或者persist()方法即可。在该RDD第一次被计算出来时,就会直接缓存在每个节点中。而且Spark的持久化机制还是自动容错的,如果持久化的RDD的任何partition丢失了,那么Spark会自动通过其源RDD,使用transformation操作重新计算该partition
  4. cache()和persist()的区别在于,cache()是persist()的一种简化方式,cache()的底层就是调用的persist()的无参版本,同时就是调用persist(MEMORY_ONLY),将数据持久化到内存中。如果需要从内存中清楚缓存,那么可以使用unpersist()方法。
  5. Spark自己也会在shuffle操作时,进行数据的持久化,比如写入磁盘,主要是为了在节点失败时,避免需要重新计算整个过程。

checkpoint检查点机制

一个Streaming应用程序要求7天24小时不间断运行,因此必须适应各种导致应用程序失败的场景。Spark Streaming的检查点具有容错机制,有足够的信息能够支持故障恢复。支持两种数据类型的检查点:元数据检查点和数据检查点。

(1)元数据检查点,在类似HDFS的容错存储上,保存Streaming计算信息。这种检查点用来恢复运行Streaming应用程序失败的Driver进程。

(2)数据检查点,在进行跨越多个批次合并数据的有状态操作时尤其重要。在这种转换操作情况下,依赖前一批次的RDD生成新的RDD,随着时间不断增加,RDD依赖链的长度也在增加,为了避免这种无限增加恢复时间的情况,通过周期检查将转换RDD的中间状态进行可靠存储,借以切断无限增加的依赖。使用有状态的转换,如果updateStateByKey或者reduceByKeyAndWindow在应用程序中使用,那么需要提供检查点路径,对RDD进行周期性检查。

元数据检查点主要用来恢复失败的Driver进程,而数据检查点主要用来恢复有状态的转换操作。无论是Driver失败,还是Worker失败,这种检查点机制都能快速恢复。许多Spark Streaming都是使用检查点方式。但是简单的Streaming应用程序,不包含状态转换操作不能运行检查点;从Driver程序故障中恢复可能会造成一些收到没有处理的数据丢失。

为了让一个Spark Streaming程序能够被恢复,需要启用检查点,必须设置一个容错的、可靠的文件系统(如HDFS、S3等)路径保存检查点信息,同时设置时间间隔。

streamingContext.checkpoint(checkpointDirectory)//checkpointDirectory
是一个文件系统路径(最好是一个可靠的比如hdfs://…) dstream.checkpoint(checkpointInterval)//设置时间间隔
当程序第一次启动时,创建一个新的StreamingContext,接着创建所有的数据流,然后再调用start()方法。

//定义一个创建并设置StreamingContext的函数
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...)               //创建StreamingContext实例
val DsSream = ssc.socketTextStream(...)      //创建DStream
...
ssc.checkpoint(checkpointDirectory)           //设置检查点机制
ssc
}
//从检查点数据重建或者新建一个StreamingContext
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreate-Context_)
//在context需要做额外的设置完成,不考虑是否被启动或重新启动
context. ...
//启动context
context.start()
context.awaitTermination()

通过使用getOrCreate创建StreamingContext。
当程序因为异常重启时,如果检查点路径存在,则context将从检查点数据中重建。如果检查点目录不存在(首次运行),将会调用functionToCreateContext函数新建context函数新建context,并设置DStream。

但是,Streaming需要保存中间数据到容错存储系统,这个策略会引入存储开销,进而可能会导致相应的批处理时间变长,因此,检查点的时间间隔需要精心设置。采取小批次时,每批次检查点可以显著减少操作的吞吐量;相反,检查点太少可能会导致每批次任务大小的增加。对于RDD检查点的有状态转换操作,其检查点间隔默认设置成DStream的滑动间隔的5~10倍。

故障恢复可以使用Spark的Standalone模式自动完成,该模式允许任何Spark应用程序的Driver在集群内启动,并在失败时重启。而对于YARN或Mesos这样的部署环境,则必须通过其他的机制重启Driver。

checkpoint和持久化机制的区别

1.持久化只是将数据保存在BlockManager中,而RDD的lineage是不变的。但是checkpoint执行完后,RDD已经没有之前所谓的依赖RDD了,而只有一个强行为其设置的checkpointRDD,RDD的lineage改变了。

2.持久化的数据丢失可能性更大,磁盘、内存都可能会存在数据丢失的情况。但是checkpoint的数据通常是存储在如HDFS等容错、高可用的文件系统,数据丢失可能性较小。

注:默认情况下,如果某个RDD没有持久化,但是设置了checkpoint,会存在问题,本来这个job都执行结束了,但是由于中间RDD没有持久化,checkpoint job想要将RDD的数据写入外部文件系统的话,需要全部重新计算一次,再将计算出来的RDD数据checkpoint到外部文件系统。所以,建议对checkpoint()的RDD使用persist(StorageLevel.DISK_ONLY),该RDD计算之后,就直接持久化到磁盘上。后面进行checkpoint操作时就可以直接从磁盘上读取RDD的数据,并checkpoint到外部文件系统。

Spark streaming以及基本工作原理

接收实时输入数据流,然后将数据拆分成多个batch,比如每收集1秒的数据封装为一个batch,然后将每个batch交给Spark的计算引擎进行处理,最后会生产出一个结果数据流,其中的数据,也是由一个一个的batch所组成的。
在这里插入图片描述

DStream以及基本工作原理

DStream是Spark Streaming提供的一种高级抽象,英文全称为Discretized Stream,中文翻译为离散流,它代表了一个持续不断的数据流。DStream可以通过输入数据源(比如从Flume、Kafka中)来创建,也可以通过对其他DStream应用高阶函数(map,flatmap)来创建。

在内部实现上,DStream由一组时间序列上连续的RDD来表示,RDD是Spark Core的核心抽象,即不可变的、分布式的数据集,DStream中的每个RDD都包含了一个时间段内的数据 对DStream应用的算子,在底层会被转换为对DStream中每个RDD的操作 底层原理为,对DStream中每个时间段的RDD都应用一遍算子操作,然后生成新的RDD,即作为新的DStream中的那个时间段的RDD 经过一系列算子操作之后,最终可以将实时计算的结果存储到相关介质中,如Redis、HBase、MySQL。

根据这个流程也可以得出Spark Streaming编程的步骤:

  1. 创建输入的数据流Dstream
  2. 对DStream进行各种算子操作,得到新的DStream
  3. 将处理完的结果存储到存储介质中

批处理时间间隔
Spark Streaming中,数据采集是逐条进行的,而数据处理是按批进行的 Spark Streaming中会先设置好批处理的时间间隔。当达到批处理时间间隔的时候就会把采集到的数据汇总起来成为一批数据交给系统去处理

spark核心编程原理

1、首先我们搭建好了spark集群
2、客户端与spark集群建立连接,之后才能提交spark应用程序
3、spark提交应用程序到spark集群上
4、Spark与MapReduce最大的不同在于,迭代式计算模型: MapReduce,分为两个阶段,map和reduce,两个阶段完了,就结束了。所以我们在一个job里能做的处理很有限,只能在map和reduce里处理。
Spark,计算模型,可以分为n个阶段,因为它是内存迭代式的。我们在处理完一个阶段以后,可以继续往下处理很多个阶段,而不只是两个阶段。所以,Spark相较于MapReduce来说,计算模型可以提供更强大的功能。

Spark的核心编程是什么?

其实,就是: 首先,
第一,定义初始的RDD,就是说,你要定义第一个RDD是从哪里,读取数据,hdfs、linux本地文件、程序中的集合。
第二,定义对RDD的计算操作,这个在spark里称之为算子,map、reduce、flatMap、groupByKey,比mapreduce提供的map和reduce强大的太多太多了。 第三,其实就是循环往复的过程,第一个计算完了以后,数据可能就会到了新的一批节点上,也就是变成一个新的RDD。然后再次反复,针对新的RDD定义计算操作。。。。
第四,最后,就是获得最终的数据,将数据保存起来。 每一批节点上的每一批数据,实际上就是一个RDD!!!一个RDD是分布式的,所以数据都散落在一批节点上了,每个节点都存储了RDD的部分partition。
在这里插入图片描述

RDD的弹性表现在哪几点

1)自动的进行内存和磁盘的存储切换;
2)基于Linage的高效容错;
3)task如果失败会自动进行特定次数的重试;
4)stage如果失败会自动进行特定次数的重试,而且只会计算失败的分片;
5)checkpoint和persist,数据计算之后持久化缓存
6)数据调度弹性,DAG TASK调度和资源无关
7)数据分片的高度弹性,a.分片很多碎片可以合并成大的,b.par

RDD有哪些缺陷

1)不支持细粒度的写和更新操作(如网络爬虫),spark写数据是粗粒度的
所谓粗粒度,就是批量写入数据,为了提高效率。但是读数据是细粒度的也就是说可以一条条的读
2)不支持增量迭代计算,Flink支持

RDD创建有哪几种方式

1).使用程序中的集合创建rdd
2).使用本地文件系统创建rdd
3).使用hdfs创建rdd,
4).基于数据库db创建rdd
5).基于Nosql创建rdd,如hbase
6).基于s3创建rdd,
7).基于数据流,如socket创建rdd
如果只回答了前面三种,是不够的,只能说明你的水平还是入门级的,实践过程中有很多种创建方式。

RDD通过Linage(记录数据更新)的方式为何很高效

1)lazy记录了数据的来源,RDD是不可变的,且是lazy级别的,且rDD之间构成了链条,lazy是弹性的基石。由于RDD不可变,所以每次操作就产生新的rdd,不存在全局修改的问题,控制难度下降,所有有计算链条将复杂计算链条存储下来,计算的时候从后往前回溯900步是上一个stage的结束,要么就checkpoint
2)记录原数据,是每次修改都记录,代价很大如果修改一个集合,代价就很小,官方说rdd是粗粒度的操作,是为了效率,为了简化,每次都是操作数据集合,写或者修改操作,都是基于集合的rdd的写操作是粗粒度的,rdd的读操作既可以是粗粒度的也可以是细粒度,读可以读其中的一条条的记录。
3)简化复杂度,是高效率的一方面,写的粗粒度限制了使用场景,如网络爬虫,现实世界中,大多数写是粗粒度的场景

spark性能优化有哪些

通过spark-env文件、程序中sparkconf和set property设置。
(1)计算量大,形成的lineage过大应该给已经缓存了的rdd添加checkpoint,以减少容错带来的开销。
(2)小分区合并,过小的分区造成过多的切换任务开销,使用repartition。

Overview

Spark的瓶颈一般来自于集群(standalone, yarn, mesos, k8s)的资源紧张,CPU,网络带宽,内存。通过都会将数据序列化,降低其内存memory和网络带宽shuffle的消耗。
Spark的性能,想要它快,就得充分利用好系统资源,尤其是内存和CPU:核心思想就是能用内存cache就别spill落磁盘,CPU 能并行就别串行,数据能local就别shuffle。

开发调优

  1. 避免创建重复的RDD
     比如多次读可以persist;但如果input太大,persist可能得不偿失
  2. 尽可能复用同一个RDD
     但是如果rdd的lineage太长,最好checkpoint下来,避免长重建
  3. 对多次使用的RDD进行持久化
     持久化级别(SER,MEM,DISK,_N)
  4. 尽量避免使用shuffle类算子
     shuffle算子如distinct(实际调用reduceByKey)、reduceByKey、aggregateByKey、sortByKey、groupByKey、join、cogroup、repartition等,入参中会有一个并行度参数numPartitions
     shuffle过程中,各个节点上的相同key都会先写入本地磁盘文件中,然后其他节点需要通过网络传输拉取各个节点上的磁盘文件中的相同key
  5. 使用map-side预聚合的shuffle操作
     reduceByKey(combiner),groupByKey(没有combiner)
    在这里插入图片描述
    without combiner
    在这里插入图片描述
  6. 使用高性能的算子
     使用reduceByKey/aggregateByKey替代groupByKey
     使用mapPartitions替代普通map
      特别是在写DB的时候,避免每条写记录都new一个connection;推荐是每个partition new一个connection;更好的是new connection池,每个partition从中取即可,减少partitionNum个new的消耗
     使用foreachPartitions替代foreach
     使用filter之后进行coalesce操作
      减少小文件数量
     使用repartitionAndSortWithinPartitions替代repartition与sort类操作
      一边进行重分区的shuffle操作,一边进行排序
  7. 广播大变量
     广播变量是executor内所有task共享的,避免了每个task自己维护一个变量,OOM
  8. 使用Kryo优化序列化性能
  9. 优化数据结构
     原始类型(Int, Long)
     字符串,每个字符串内部都有一个字符数组以及长度等额外信息
     对象,每个Java对象都有对象头、引用等额外的信息,因此比较占用内存空间
     集合类型,比如HashMap、LinkedList等,因为集合类型内部通常会使用一些内部类来封装集合元素,比如Map.Entry
     尽量使用字符串替代对象,使用原始类型(比如Int、Long)替代字符串,使用数组替代集合类型,这样尽可能地减少内存占用,从而降低GC频率,提升性能

资源参数调优

在这里插入图片描述
spark runtime architecture From Spark in Action
Client:客户端进程,负责提交作业
Driver/SC:运行应用程序/业务代码的main()函数并且创建SparkContext,其中创建SparkContext的目的是为了准备Spark应用程序的运行环境。在Spark中由SparkContext负责和ClusterManager/ResourceManager通信,进行资源的申请、任务的分配和监控等;当Executor部分运行完毕后,Driver负责将SparkContext关闭。通常用SparkContext代表Drive
o SparkContext:整个应用程序的上下文,控制应用的生命周期
o DAGScheduler: 实现将Spark作业分解成一到多个Stage,每个Stage根据RDD的Partition个数决定Task的个数,然后生成相应的Task set放到TaskScheduler中
o TaskScheduler:分配Task到Executor上执行,并维护Task的运行状态
Executor:应用程序Application运行在Worker节点上的一个进程,该进程负责运行Task,并且负责将数据存在内存或者磁盘上。在Spark on Yarn模式下,其进程名称为CoarseGrainedExecutorBackend,负责将Task包装成taskRunner,并从线程池中抽取出一个空闲线程运行Task。每个CoarseGrainedExecutorBackend能并行运行Task的数量就取决于分配给它的CPU的个数
Job:一个job包含多个RDD及作用于相应RDD上的各种Operation。每执行一个action算子(foreach, count, collect, take, saveAsTextFile)就会生成一个 job
Stage:每个Job会被拆分很多组Task,每组Task被称为Stage,亦称TaskSet。一个作业job分为多个阶段stages(shuffle,串行),一个stage包含一系列的tasks(并行)
Task:被送往各个Executor上的执行的内容,task之间无状态传递,可以并行执行

  1. client向YARN的ResourceManager/RM申请启动ApplicationMaster/AM(单个应用程序/作业的资源管理和任务监控)
  2. RM收到请求后,在集群中选择一个NodeManager,为该应用程序分配第一个Container,spark在此启动其AM,其中AM进行SparkContext/SC/Driver初始化启动并创建RDD Object、DAGScheduler、TASKScheduler
  3. SC根据RDD的依赖关系构建DAG图,并将DAG提交给DAGScheduler解析为stage。Stages以TaskSet的形式提交给TaskScheduler,TaskScheduler维护所有TaskSet,当Executor向Driver发送心跳时,TaskScheduler会根据其资源剩余情况分配相应的Task,另外TaskScheduler还维护着所有Task的运行状态,重试失败了的Task
  4. AM向RM申请container资源,资源到位后便与NodeManager通信,要求它在获得的Container中(executor)启动CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend启动后会向AM中的SC注册并申请Task
  5. AM中的SC分配Task给CoarseGrainedExecutorBackend/executor执行,CoarseGrainedExecutorBackend运行Task并向AM汇报运行的状态和进度,以便让AM随时掌握各个task的运行状态,从而可以在任务失败时重新启动任务或者推测执行
  6. 应用程序运行完成后,AM向RM申请注销并关闭自己

调优

  1. executor配置
    spark.executor.memory
    spark.executor.instances
    spark.executor.cores
  2. driver配置
    spark.driver.memory(如果没有collect操作,一般不需要很大,1~4g即可)
    spark.driver.cores
  3. 并行度
    spark.default.parallelism (used for RDD API)
    spark.sql.shuffle.partitions (usef for DataFrame/DataSet API)
  4. 网络超时
    spark.network.timeout (所有网络交互的默认超时)
  5. 数据本地化
    spark.locality.wait
  6. JVM/gc配置
    spark.executor.extraJavaOptions
    spark.driver.extraJavaOptions

数据倾斜调优

在这里插入图片描述
数据倾斜,key=hello过多

  1. 使用Hive ETL预处理数据
    治标不治本(利用了mr的走disk特性),还多了一条skew pipeline
  2. 过滤少数导致倾斜的key
    但有些场景倾斜是常态
  3. 提高shuffle操作的并行度
    让每个task处理比原来更少的数据(之前可能task会%parNum分到2个key),但是如果单key倾斜,方法失效
    在这里插入图片描述
    单个task分到的key少了
  4. 两阶段聚合(局部聚合+全局聚合)
    附加随机前缀 -> 局部聚合 -> 去除随机前缀 -> 全局聚合
    适用于聚合类shuffle(计算sum,count),但是对于join类shuffle不适用在这里插入图片描述
    两阶段聚合
  5. 将reduce join转为map join
    适用于join类shuffle,因为shuffle变成map操作了
    只适用于一个大表和一个小表,将小表广播,并不适合两个都是大表
  6. 使用随机前缀和扩容RDD进行join
    leftDf添加随机前缀(1~N的);复制rightDf每条record至N条并依次打上前缀(1~N)
    缺点是复制后的rightDf增大了N-1倍

Shuffle调优

shuffle原理

  • Spark在DAG阶段以宽依赖shuffle为界,划分stage,上游stage做map task,每个map task将计算结果数据分成多份,每一份对应到下游stage的每个partition中,并将其临时写到磁盘,该过程叫做shuffle write
  • 下游stage做reduce task,每个reduce task通过网络拉取上游stage中所有map task的指定分区结果数据,该过程叫做shuffle read,最后完成reduce的业务逻辑
  • 下图中,上游stage有3个map task,下游stage有4个reduce task,那么这3个map task中每个map task都会产生4份数据。而4个reduce task中的每个reduce task都会拉取上游3个map task对应的那份数据

在这里插入图片描述
shuffle From iteblog_hadoop
shuffle演进

  1. <0.8 hashBasedShuffle
    每个map端的task为每个reduce端的partition/task生成一个文件,通常会产生大量的文件,伴随大量的随机磁盘IO操作与大量的内存开销M*R
  2. 0.8.1 引入文件合并File Consolidation机制
    每个executor为每个reduce端的partition生成一个文件E*R
  3. 0.9 引入External AppendOnlyMap
    combine时可以将数据spill到磁盘,然后通过堆排序merge
  4. 1.1 引入sortBasedShuffle
    每个map task不会为每个reducer task生成一个单独的文件,而是会将所有的结果写到一个文件里,同时会生成一个index文件,reducer可以通过这个index文件取得它需要处理的数据M
  5. 1.4 引入Tungsten-Sort Based Shuffle
    亦称unsafeShuffle,将数据记录用序列化的二进制方式存储,把排序转化成指针数组的排序,引入堆外内存空间和新的内存管理模型
  6. 1.6 Tungsten-sort并入Sort Based Shuffle
    由SortShuffleManager自动判断选择最佳Shuffle方式,如果检测到满足Tungsten-sort条件会自动采用Tungsten-sort Based Shuffle,否则采用Sort Based Shuffle
  7. 2.0 hashBasedShuffle退出历史舞台
    从此Spark只有sortBasedShuffle

调优

shuffle是一个涉及到CPU(序列化反序列化)、网络IO(跨节点数据传输)以及磁盘IO(shuffle中间结果落盘)的操作。所以用户在编写Spark应用程序的过程中应当尽可能避免shuffle算子和考虑shuffle相关的优化,提升spark应用程序的性能。
要减少shuffle的开销,主要有两个思路,
减少shuffle次数,尽量不改变key,把数据处理在local完成
减少shuffle的数据规模

  1. 先去重,再合并
    A.union(B).distinct() vs. A.distinct().union(B.distinct()).distinct()
  2. 用broadcast + filter来代替join
  3. spark.shuffle.file.buffer
    设置shuffle write task的buffer大小,将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘
  4. spark.reducer.maxSizeInFlight
    设置shuffle read task的buffer大小,决定了每次能够拉取pull多少数据。减少拉取数据的次数,也就减少了网络传输的次数
  5. spark.shuffle.sort.bypassMergeThreshold
    shuffle read task的数量小于这个阈值(默认是200),则map-side/shuffle write过程中不会进行排序操作

Spark的join类型

Shuffled Hash Join
Sort Merge Join
Broadcast Join

sql joins From JAMES CONNER

其他优化项

  1. 使用DataFrame/DataSet
    spark sql 的catalyst优化器,
    堆外内存(有了Tungsten后,感觉off-head没有那么明显的性能提升了)
    在这里插入图片描述

spark api演进

在这里插入图片描述

宽依赖和窄依赖?

Spark中RDD的高效与DAG(有向无环图)有很大的关系,在DAG调度中需要对计算的过程划分Stage,划分的依据就是RDD之间的依赖关系。RDD之间的依赖关系分为两种,宽依赖(wide dependency/shuffle dependency)和窄依赖(narrow dependency)
1.窄依赖
窄依赖就是指父RDD的每个分区只被一个子RDD分区使用,子RDD分区通常只对应常数个父RDD分区,如下图所示【其中每个小方块代表一个RDD Partition】
在这里插入图片描述
窄依赖有分为两种:

  • 一种是一对一的依赖,即OneToOneDependency
  • 还有一个是范围的依赖,即RangeDependency,它仅仅被org.apache.spark.rdd.UnionRDD使用。UnionRDD是把多个RDD合成一个RDD,这些RDD是被拼接而成,即每个parent RDD的Partition的相对顺序不会变,只不过每个parent RDD在UnionRDD中的Partition的起始位置不同

2.宽依赖
宽依赖就是指父RDD的每个分区都有可能被多个子RDD分区使用,子RDD分区通常对应父RDD所有分区,如下图所示【其中每个小方块代表一个RDD Partition】
在这里插入图片描述
3.窄依赖与窄依赖比较

  • 宽依赖往往对应着shuffle操作,需要在运行的过程中将同一个RDD分区传入到不同的RDD分区中,中间可能涉及到多个节点之间数据的传输,而窄依赖的每个父RDD分区通常只会传入到另一个子RDD分区,通常在一个节点内完成。
  • 当RDD分区丢失时,对于窄依赖来说,由于父RDD的一个分区只对应一个子RDD分区,这样只需要重新计算与子RDD分区对应的父RDD分区就行。这个计算对数据的利用是100%的
  • 当RDD分区丢失时,对于宽依赖来说,重算的父RDD分区只有一部分数据是对应丢失的子RDD分区的,另一部分就造成了多余的计算。宽依赖中的子RDD分区通常来自多个父RDD分区,极端情况下,所有父RDD都有可能重新计算。如下图,par4丢失,则需要重新计算par1,par2,par3,产生了冗余数据par5

分区丢失图
在这里插入图片描述
4.宽依赖,窄依赖函数

  • 窄依赖的函数有:
    map, filter, union, join(父RDD是hash-partitioned ), mapPartitions, mapValues
  • 宽依赖的函数有:
    groupByKey, join(父RDD不是hash-partitioned ), partitionBy

Spark的三种提交模式是什么?

1)本地模式
Spark不一定非要跑在hadoop集群,可以在本地,起多个线程的方式来指定。将Spark应用以多线程的方式直接运行在本地,一般都是为了方便调试,本地模式分三类
· local:只启动一个executor
· local[k]:启动k个executor
2)standalone模式
分布式部署集群, 自带完整的服务,资源管理和任务监控是Spark自己监控,这个模式也是其他模式的基础,
3)Spark on yarn模式
分布式部署集群,资源和任务监控交给yarn管理,但是目前仅支持粗粒度资源分配方式,包含cluster和client运行模式,cluster适合生产,driver运行在集群子节点,具有容错功能,client适合调试,dirver运行在客户端

spark 实现高可用性:High Availability?

1)配置zookeeper
2)修改spark_env.sh文件,spark的master参数不在指定,添加如下代码到各个master节点

export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=zk01:2181,zk02:2181,zk03:2181 -Dspark.deploy.zookeeper.dir=/spark"

3 ) 将spark_env.sh分发到各个节点
4)找到一个master节点,执行./start-all.sh,会在这里启动主master,其他的master备节点,启动master命令: ./sbin/start-master.sh
5)提交程序的时候指定master的时候要指定三台master,例如

./spark-shell --master spark://master01:7077,master02:7077,master03:7077

Spark master使用zookeeper进行HA的,有哪些元数据保存在Zookeeper?

答:spark通过这个参数spark.deploy.zookeeper.dir指定master元数据在zookeeper中保存的位置,包括Worker,Driver和Application以及Executors。standby节点要从zk中,获得元数据信息,恢复集群运行状态,才能对外继续提供服务,作业提交资源申请等,在恢复前是不能接受请求的。另外,Master切换需要注意2点
1)在Master切换的过程中,所有的已经在运行的程序皆正常运行!因为Spark Application在运行前就已经通过Cluster Manager获得了计算资源,所以在运行时Job本身的调度和处理和Master是没有任何关系的!
2) 在Master的切换过程中唯一的影响是不能提交新的Job:一方面不能够提交新的应用程序给集群,因为只有Active Master才能接受新的程序的提交请求;另外一方面,已经运行的程序中也不能够因为Action操作触发新的Job的提交请求;

Spark master HA 主从切换过程不会影响集群已有的作业运行,为什么?

答:因为程序在运行之前,已经申请过资源了,driver和Executors通讯,不需要和master进行通讯的。

Spark on Mesos中,什么是的粗粒度分配,什么是细粒度分配,各自的优点和缺点是什么?

答:1)粗粒度:启动时就分配好资源, 程序启动,后续具体使用就使用分配好的资源,不需要再分配资源;好处:作业特别多时,资源复用率高,适合粗粒度;不好:容易资源浪费,假如一个job有1000个task,完成了999个,还有一个没完成,那么使用粗粒度,999个资源就会闲置在那里,资源浪费。2)细粒度分配:用资源的时候分配,用完了就立即回收资源,启动会麻烦一点,启动一次分配一次,会比较麻烦。

driver的功能是什么?

答: 1)一个Spark作业运行时包括一个Driver进程,也是作业的主进程,具有main函数,并且有SparkContext的实例,是程序的人口点;2)功能:负责向集群申请资源,向master注册信息,负责了作业的调度,,负责作业的解析、生成Stage并调度Task到Executor上。包括DAGScheduler,TaskScheduler。

Spark中Work的主要工作是什么?

答:主要功能:管理当前节点内存,CPU的使用状况,接收master分配过来的资源指令,通过ExecutorRunner启动程序分配任务,worker就类似于包工头,管理分配新进程,做计算的服务,相当于process服务。需要注意的是:1)worker会不会汇报当前信息给master,worker心跳给master主要只有workid,它不会发送资源信息以心跳的方式给mater,master分配的时候就知道work,只有出现故障的时候才会发送资源。2)worker不会运行代码,具体运行的是Executor是可以运行具体appliaction写的业务逻辑代码,操作代码的节点,它不会运行程序的代码的。

Spark为什么比mapreduce快?

答:1)基于内存计算,减少低效的磁盘交互;2)高效的调度算法,基于DAG;3)容错机制Linage,精华部分就是DAG和Lingae

简单说一下hadoop和spark的shuffle相同和差异?

答:1)从 high-level 的角度来看,两者并没有大的差别。 都是将 mapper(Spark 里是 ShuffleMapTask)的输出进行 partition,不同的 partition 送到不同的 reducer(Spark 里 reducer 可能是下一个 stage 里的 ShuffleMapTask,也可能是 ResultTask)。Reducer 以内存作缓冲区,边 shuffle 边 aggregate 数据,等到数据 aggregate 好以后进行 reduce() (Spark 里可能是后续的一系列操作)。
2)从 low-level 的角度来看,两者差别不小。 Hadoop MapReduce 是 sort-based,进入 combine() 和 reduce() 的 records 必须先 sort。这样的好处在于 combine/reduce() 可以处理大规模的数据,因为其输入数据可以通过外排得到(mapper 对每段数据先做排序,reducer 的 shuffle 对排好序的每段数据做归并)。目前的 Spark 默认选择的是 hash-based,通常使用 HashMap 来对 shuffle 来的数据进行 aggregate,不会对数据进行提前排序。如果用户需要经过排序的数据,那么需要自己调用类似 sortByKey() 的操作;如果你是Spark 1.1的用户,可以将spark.shuffle.manager设置为sort,则会对数据进行排序。在Spark 1.2中,sort将作为默认的Shuffle实现。
3)从实现角度来看,两者也有不少差别。 Hadoop MapReduce 将处理流程划分出明显的几个阶段:map(), spill, merge, shuffle, sort, reduce() 等。每个阶段各司其职,可以按照过程式的编程思想来逐一实现每个阶段的功能。在 Spark 中,没有这样功能明确的阶段,只有不同的 stage 和一系列的 transformation(),所以 spill, merge, aggregate 等操作需要蕴含在 transformation() 中。
如果我们将 map 端划分数据、持久化数据的过程称为 shuffle write,而将 reducer 读入数据、aggregate 数据的过程称为 shuffle read。那么在 Spark 中,问题就变为怎么在 job 的逻辑或者物理执行图中加入 shuffle write 和 shuffle read 的处理逻辑?以及两个处理逻辑应该怎么高效实现?
Shuffle write由于不要求数据有序,shuffle write 的任务很简单:将数据 partition 好,并持久化。之所以要持久化,一方面是要减少内存存储空间压力,另一方面也是为了 fault-tolerance。

Mapreduce和Spark的都是并行计算,那么他们有什么相同和区别

答:两者都是用mr模型来进行并行计算:
1)hadoop的一个作业称为job,job里面分为map task和reduce task,每个task都是在自己的进程中运行的,当task结束时,进程也会结束。
2)spark用户提交的任务成为application,一个application对应一个sparkcontext,app中存在多个job,每触发一次action操作就会产生一个job。这些job可以并行或串行执行,每个job中有多个stage,stage是shuffle过程中DAGSchaduler通过RDD之间的依赖关系划分job而来的,每个stage里面有多个task,组成taskset有TaskSchaduler分发到各个executor中执行,executor的生命周期是和app一样的,即使没有job运行也是存在的,所以task可以快速启动读取内存进行计算。
3)hadoop的job只有map和reduce操作,表达能力比较欠缺而且在mr过程中会重复的读写hdfs,造成大量的io操作,多个job需要自己管理关系。
spark的迭代计算都是在内存中进行的,API中提供了大量的RDD操作如join,groupby等,而且通过DAG图可以实现良好的容错。

spark-submit的时候如何引入外部jar包

方法一:spark-submit –jars
根据spark官网,在提交任务的时候指定–jars,用逗号分开。这样做的缺点是每次都要指定jar包,如果jar包少的话可以这么做,但是如果多的话会很麻烦。
命令:spark-submit --master yarn-client --jars ***.jar, ***.jar
方法二:extraClassPath
提交时在spark-default中设定参数,将所有需要的jar包考到一个文件里,然后在参数中指定该目录就可以了,较上一个方便很多:
spark.executor.extraClassPath=/home/hadoop/wzq_workspace/lib/
spark.driver.extraClassPath=/home/hadoop/wzq_workspace/lib/

需要注意的是,你要在所有可能运行spark任务的机器上保证该目录存在,并且将jar包考到所有机器上。这样做的好处是提交代码的时候不用再写一长串jar了,缺点是要把所有的jar包都拷一遍。

cache和pesist的区别

答:1)cache和persist都是用于将一个RDD进行缓存的,这样在之后使用的过程中就不需要重新计算了,可以大大节省程序运行时间;2) cache只有一个默认的缓存级别MEMORY_ONLY ,cache调用了persist,而persist可以根据情况设置其它的缓存级别;3)executor执行的时候,默认60%做cache,40%做task操作,persist最根本的函数,最底层的函数

Spark有哪些聚合类的算子,我们应该尽量避免什么类型的算子?

答:在我们的开发过程中,能避免则尽可能避免使用reduceByKey、join、distinct、repartition等会进行shuffle的算子,尽量使用map类的非shuffle算子。这样的话,没有shuffle操作或者仅有较少shuffle操作的Spark作业,可以大大减少性能开销。

Spark并行度怎么设置比较合适

答:spark并行度,每个core承载24个partition,如,32个core,那么64128之间的并行度,也就是设置64~128个partion,并行读和数据规模无关,只和内存使用量和cpu使用时间有关

Spark中数据的位置是被谁管理的?

答:每个数据分片都对应具体物理位置,数据的位置是被blockManager,无论
数据是在磁盘,内存还是tacyan,都是由blockManager管理

Spark的数据本地性有哪几种?

答:Spark中的数据本地性有三种:
a.PROCESS_LOCAL是指读取缓存在本地节点的数据
b.NODE_LOCAL是指读取本地节点硬盘数据
c.ANY是指读取非本地节点数据
通常读取数据PROCESS_LOCAL>NODE_LOCAL>ANY,尽量使数据以PROCESS_LOCAL或NODE_LOCAL方式读取。其中PROCESS_LOCAL还和cache有关,如果RDD经常用的话将该RDD cache到内存中,注意,由于cache是lazy的,所以必须通过一个action的触发,才能真正的将该RDD cache到内存中。

rdd有几种操作类型?

1)transformation,rdd由一种转为另一种rdd
2)action,
3)cronroller,crontroller是控制算子,cache,persist,对性能和效率的有很好的支持三种类型,不要回答只有2中操作

Spark如何处理不能被序列化的对象?

将不能序列化的内容封装成object

collect功能是什么,其底层是怎么实现的?

答:driver通过collect把集群中各个节点的内容收集过来汇总成结果,collect返回结果是Array类型的,collect把各个节点上的数据抓过来,抓过来数据是Array型,collect对Array抓过来的结果进行合并,合并后Array中只有一个元素,是tuple类型(KV类型的)的。

Spark程序执行,有时候默认为什么会产生很多task,怎么修改默认task执行个数?

答:1)因为输入数据有很多task,尤其是有很多小文件的时候,有多少个输入
block就会有多少个task启动;2)spark中有partition的概念,每个partition都会对应一个task,task越多,在处理大规模数据的时候,就会越有效率。不过task并不是越多越好,如果平时测试,或者数据量没有那么大,则没有必要task数量太多。3)参数可以通过spark_home/conf/spark-default.conf配置文件设置:
spark.sql.shuffle.partitions 50 spark.default.parallelism 10
第一个是针对spark sql的task数量
第二个是非spark sql程序设置生效

为什么Spark Application在没有获得足够的资源,job就开始执行了,可能会导致什么什么问题发生

答:会导致执行该job时候集群资源不足,导致执行job结束也没有分配足够的资源,分配了部分Executor,该job就开始执行task,应该是task的调度线程和Executor资源申请是异步的;如果想等待申请完所有的资源再执行job的:需要将spark.scheduler.maxRegisteredResourcesWaitingTime设置的很大;spark.scheduler.minRegisteredResourcesRatio 设置为1,但是应该结合实际考虑
否则很容易出现长时间分配不到资源,job一直不能运行的情况。

map与flatMap的区别

map:对RDD每个元素转换,文件中的每一行数据返回一个数组对象
flatMap:对RDD每个元素转换,然后再扁平化
将所有的对象合并为一个对象,文件中的所有行数据仅返回一个数组
对象,会抛弃值为null的值

Spark为什么要持久化,一般什么场景下要进行persist操作?

为什么要进行持久化?
spark所有复杂一点的算法都会有persist身影,spark默认数据放在内存,spark很多内容都是放在内存的,非常适合高速迭代,1000个步骤
只有第一个输入数据,中间不产生临时数据,但分布式系统风险很高,所以容易出错,就要容错,rdd出错或者分片可以根据血统算出来,如果没有对父rdd进行persist 或者cache的化,就需要重头做。
以下场景会使用persist
1)某个步骤计算非常耗时,需要进行persist持久化
2)计算链条非常长,重新恢复要算很多步骤,很好使,persist
3)checkpoint所在的rdd要持久化persist,
lazy级别,框架发现有checnkpoint,checkpoint时单独触发一个job,需要重算一遍,checkpoint前
要持久化,写个rdd.cache或者rdd.persist,将结果保存起来,再写checkpoint操作,这样执行起来会非常快,不需要重新计算rdd链条了。checkpoint之前一定会进行persist。
4)shuffle之后为什么要persist,shuffle要进性网络传输,风险很大,数据丢失重来,恢复代价很大
5)shuffle之前进行persist,框架默认将数据持久化到磁盘,这个是框架自动做的。

介绍一下join操作优化经验?

答:join其实常见的就分为两类: map-side join 和 reduce-side join。当大表和小表join时,用map-side join能显著提高效率。将多份数据进行关联是数据处理过程中非常普遍的用法,不过在分布式计算系统中,这个问题往往会变的非常麻烦,因为框架提供的 join 操作一般会将所有数据根据 key 发送到所有的 reduce 分区中去,也就是 shuffle 的过程。造成大量的网络以及磁盘IO消耗,运行效率极其低下,这个过程一般被称为 reduce-side-join。如果其中有张表较小的话,我们则可以自己实现在 map 端实现数据关联,跳过大量数据进行 shuffle 的过程,运行时间得到大量缩短,根据不同数据可能会有几倍到数十倍的性能提升。
备注:这个题目面试中非常非常大概率见到,务必搜索相关资料掌握,这里抛砖引玉。

介绍一下cogroup rdd实现原理,你在什么场景下用过这个rdd?

答:cogroup的函数实现:这个实现根据两个要进行合并的两个RDD操作,生成一个CoGroupedRDD的实例,这个RDD的返回结果是把相同的key中两个RDD分别进行合并操作,最后返回的RDD的value是一个Pair的实例,这个实例包含两个Iterable的值,第一个值表示的是RDD1中相同KEY的值,第二个值表示的是RDD2中相同key的值.由于做cogroup的操作,需要通过partitioner进行重新分区的操作,因此,执行这个流程时,需要执行一次shuffle的操作(如果要进行合并的两个RDD的都已经是shuffle后的rdd,同时他们对应的partitioner相同时,就不需要执行shuffle,),
场景:表关联查询

Spark使用parquet文件存储格式能带来哪些好处?

  1. 如果说HDFS 是大数据时代分布式文件系统首选标准,那么parquet则是整个大数据时代文件存储格式实时首选标准
  2. 速度更快:从使用spark sql操作普通文件CSV和parquet文件速度对比上看,绝大多数情况
    会比使用csv等普通文件速度提升10倍左右,在一些普通文件系统无法在spark上成功运行的情况
    下,使用parquet很多时候可以成功运行
  3. parquet的压缩技术非常稳定出色,在spark sql中对压缩技术的处理可能无法正常的完成工作
    (例如会导致lost task,lost executor)但是此时如果使用parquet就可以正常的完成
  4. 极大的减少磁盘I/o,通常情况下能够减少75%的存储空间,由此可以极大的减少spark sql处理
    数据的时候的数据输入内容,尤其是在spark1.6x中有个下推过滤器在一些情况下可以极大的
    减少磁盘的IO和内存的占用,(下推过滤器)
  5. spark 1.6x parquet方式极大的提升了扫描的吞吐量,极大提高了数据的查找速度spark1.6和spark1.5x相比而言,提升了大约1倍的速度,在spark1.6X中,操作parquet时候cpu也进行了极大的优化,有效的降低了cpu
  6. 采用parquet可以极大的优化spark的调度和执行。我们测试spark如果用parquet可以有效的减少stage的执行消耗,同时可以优化执行路径

Executor之间如何共享数据?

答:基于hdfs或者基于tachyon

Spark累加器有哪些特点?

1)累加器在全局唯一的,只增不减,记录全局集群的唯一状态
2)在exe中修改它,在driver读取
3)executor级别共享的,广播变量是task级别的共享
两个application不可以共享累加器,但是同一个app不同的job可以共享

如何在一个不确定的数据规模的范围内进行排序?

为了提高效率,要划分划分,划分的范围并且是有序的
要么有序,要么降序?
水塘抽样:目的是从一个集合中选取,集合非常答,适合内存
无法容纳数据的时候使用
从N中抽取出K个,N是随机数

spark hashParitioner的弊端是什么?

答:HashPartitioner分区的原理很简单,对于给定的key,计算其hashCode,并除于分区的个数取余,如果余数小于0,则用余数+分区的个数,最后返回的值就是这个key所属的分区ID;弊端是数据不均匀,容易导致数据倾斜,极端情况下某几个分区会拥有rdd的所有数据

RangePartitioner分区的原理

答:RangePartitioner分区则尽量保证每个分区中数据量的均匀,而且分区与分区之间是有序的,也就是说一个分区中的元素肯定都是比另一个分区内的元素小或者大;但是分区内的元素是不能保证顺序的。简单的说就是将一定范围内的数映射到某一个分区内。其原理是水塘抽样。
介绍parition和block有什么关联关系?
答:1)hdfs中的block是分布式存储的最小单元,等分,可设置冗余,这样设计有一部分磁盘空间的浪费,但是整齐的block大小,便于快速找到、读取对应的内容;2)Spark中的partion是弹性分布式数据集RDD的最小单元,RDD是由分布在各个节点上的partion组成的。partion是指的spark在计算过程中,生成的数据在计算空间内最小单元,同一份数据(RDD)的partion大小不一,数量不定,是根据application里的算子和最初读入的数据分块数量决定;3)block位于存储空间、partion位于计算空间,block的大小是固定的、partion大小是不固定的,是从2个不同的角度去看数据。

Spark应用程序的执行过程是什么?

1)构建Spark Application的运行环境(启动SparkContext),SparkContext向资源管理器(可以是Standalone、Mesos或YARN)注册并申请运行Executor资源;
2).资源管理器分配Executor资源并启动StandaloneExecutorBackend,Executor运行情况将随着心跳发送到资源管理器上;
3).SparkContext构建成DAG图,将DAG图分解成Stage,并把Taskset发送给Task Scheduler。Executor向SparkContext申请Task,Task Scheduler将Task发放给Executor运行同时SparkContext将应用程序代码发放给Executor。
4).Task在Executor上运行,运行完毕释放所有资源。

hbase预分区个数和spark过程中的reduce个数相同么

答:和spark的map个数相同,reduce个数如果没有设置和reduce前的map数相同。

如何理解Standalone模式下,Spark资源分配是粗粒度的?

答:spark默认情况下资源分配是粗粒度的,也就是说程序在提交时就分配好资源,后面执行的时候
使用分配好的资源,除非资源出现了故障才会重新分配。比如Spark shell启动,已提交,一注册,哪怕没有任务,worker都会分配资源给executor。

Spark如何自定义partitioner分区器?

答:1)spark默认实现了HashPartitioner和RangePartitioner两种分区策略,我们也可以自己扩展分区策略,自定义分区器的时候继承org.apache.spark.Partitioner类,实现类中的三个方法
def numPartitions: Int:这个方法需要返回你想要创建分区的个数;
def getPartition(key: Any): Int:这个函数需要对输入的key做计算,然后返回该key的分区ID,范围一定是0到numPartitions-1;
equals():这个是Java标准的判断相等的函数,之所以要求用户实现这个函数是因为Spark内部会比较两个RDD的分区是否一样。
2)使用,调用parttionBy方法中传入自定义分区对象
参考:http://blog.csdn.net/high2011/article/details/68491115

窄依赖父RDD的partition和子RDD的parition是不是都是一对一的关系?

答:不一定,除了一对一的窄依赖,还包含一对固定个数的窄依赖(就是对父RDD的依赖的Partition的数量不会随着RDD数量规模的改变而改变),比如join操作的每个partiion仅仅和已知的partition进行join,这个join操作是窄依赖,依赖固定数量的父rdd,因为是确定的partition关系

Hadoop中,Mapreduce操作的mapper和reducer阶段相当于spark中的哪几个算子?

答:相当于spark中的map算子和reduceByKey算子,当然还是有点区别的,MR会自动进行排序的,spark要看你用的是什么partitioner

不需要排序的hash shuffle是否一定比需要排序的sort shuffle速度快?

答:不一定!!当数据规模小,Hash shuffle快于Sorted Shuffle数据规模大的时候;当数据量大,sorted Shuffle会比Hash shuffle快很多,因为数量大的有很多小文件,不均匀,甚至出现数据倾斜,消耗内存大,1.x之前spark使用hash,适合处理中小规模,1.x之后,增加了Sorted shuffle,Spark更能胜任大规模处理了。

Spark shell启动时会启动derby

答: spark shell启动会启动spark sql,spark sql默认使用derby保存元数据,但是尽量不要用derby,它是单实例,不利于开发。会在本地生成一个文件metastore_db,如果启动报错,就把那个文件给删了 ,derby数据库是单实例,不能支持多个用户同时操作,尽量避免使用

spark.default.parallelism这个参数有什么意义,实际生产中如何设置?

答:1)参数用于设置每个stage的默认task数量。这个参数极为重要,如果不设置可能会直接影响你的Spark作业性能;2)很多人都不会设置这个参数,会使得集群非常低效,你的cpu,内存再多,如果task始终为1,那也是浪费,spark官网建议task个数为CPU的核数*executor的个数的2~3倍。

spark.storage.memoryFraction参数的含义,实际生产中如何调优?

答:1)用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6,,默认Executor 60%的内存,可以用来保存持久化的RDD数据。根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写入磁盘。2)如果持久化操作比较多,可以提高spark.storage.memoryFraction参数,使得更多的持久化数据保存在内存中,提高数据的读取性能,如果shuffle的操作比较多,有很多的数据读写操作到JVM中,那么应该调小一点,节约出更多的内存给JVM,避免过多的JVM gc发生。在web ui中观察如果发现gc时间很长,可以设置spark.storage.memoryFraction更小一点。

spark.shuffle.memoryFraction参数的含义,以及优化经验?

答:1)spark.shuffle.memoryFraction是shuffle调优中 重要参数,shuffle从上一个task拉去数据过来,要在Executor进行聚合操作,聚合操作时使用Executor内存的比例由该参数决定,默认是20%
如果聚合时数据超过了该大小,那么就会spill到磁盘,极大降低性能;2)如果Spark作业中的RDD持久化操作较少,shuffle操作较多时,建议降低持久化操作的内存占比,提高shuffle操作的内存占比比例,避免shuffle过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能。此外,如果发现作业由于频繁的gc导致运行缓慢,意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值

介绍一下你对Unified Memory Management内存管理模型的理解?

答:Spark中的内存使用分为两部分:执行(execution)与存储(storage)。执行内存主要用于shuffles、joins、sorts和aggregations,存储内存则用于缓存或者跨节点的内部数据传输。1.6之前,对于一个Executor,内存都有哪些部分构成:
1)ExecutionMemory。这片内存区域是为了解决 shuffles,joins, sorts and aggregations 过程中为了避免频繁IO需要的buffer。 通过spark.shuffle.memoryFraction(默认 0.2) 配置。
2)StorageMemory。这片内存区域是为了解决 block cache(就是你显示调用dd.cache, rdd.persist等方法), 还有就是broadcasts,以及task results的存储。可以通过参数 spark.storage.memoryFraction(默认0.6)。设置
3)OtherMemory。给系统预留的,因为程序本身运行也是需要内存的。 (默认为0.2).
传统内存管理的不足:
1).Shuffle占用内存0.2*0.8,内存分配这么少,可能会将数据spill到磁盘,频繁的磁盘IO是很大的负担,Storage内存占用0.6,主要是为了迭代处理。传统的Spark内存分配对操作人的要求非常高。(Shuffle分配内存:ShuffleMemoryManager, TaskMemoryManager,ExecutorMemoryManager)一个Task获得全部的Execution的Memory,其他Task过来就没有内存了,只能等待。
2).默认情况下,Task在线程中可能会占满整个内存,分片数据特别大的情况下就会出现这种情况,其他Task没有内存了,剩下的cores就空闲了,这是巨大的浪费。这也是人为操作的不当造成的。
3).MEMORY_AND_DISK_SER的storage方式,获得RDD的数据是一条条获取,iterator的方式。如果内存不够(spark.storage.unrollFraction),unroll的读取数据过程,就是看内存是否足够,如果足够,就下一条。unroll的space是从Storage的内存空间中获得的。unroll的方式失败,就会直接放磁盘。
4). 默认情况下,Task在spill到磁盘之前,会将部分数据存放到内存上,如果获取不到内存,就不会执行。永无止境的等待,消耗CPU和内存。
在此基础上,Spark提出了UnifiedMemoryManager,不再分ExecutionMemory和Storage Memory,实际上还是分的,只不过是Execution Memory访问Storage Memory,Storage Memory也可以访问Execution Memory,如果内存不够,就会去借。

提交任务时,如何指定Spark Application的运行模式?

1)cluster模式:./spark-submit --class xx.xx.xx --master yarn --deploy-mode cluster xx.jar
2) client模式:./spark-submit --class xx.xx.xx --master yarn --deploy-mode client xx.jar

不启动Spark集群Master和work服务,可不可以运行Spark程序?

答:可以,只要资源管理器第三方管理就可以,如由yarn管理,spark集群不启动也可以使用spark;spark集群启动的是work和master,这个其实就是资源管理框架,yarn中的resourceManager相当于master,NodeManager相当于worker,做计算是Executor,和spark集群的work和manager可以没关系,归根接底还是JVM的运行,只要所在的JVM上安装了spark就可以。

spark on yarn Cluster 模式下,ApplicationMaster和driver是在同一个进程么?

答:是,driver 位于ApplicationMaster进程中。该进程负责申请资源,还负责监控程序、资源的动态情况。

如何使用命令查看application运行的日志信息

答:yarn logs -applicationId

Spark on Yarn 模式有哪些优点?

1)与其他计算框架共享集群资源(eg.Spark框架与MapReduce框架同时运行,如果不用Yarn进行资源分配,MapReduce分到的内存资源会很少,效率低下);资源按需分配,进而提高集群资源利用等。
2)相较于Spark自带的Standalone模式,Yarn的资源分配更加细致
3)Application部署简化,例如Spark,Storm等多种框架的应用由客户端提交后,由Yarn负责资源的管理和调度,利用Container作为资源隔离的单位,以它为单位去使用内存,cpu等。
4)Yarn通过队列的方式,管理同时运行在Yarn集群中的多个服务,可根据不同类型的应用程序负载情况,调整对应的资源使用量,实现资源弹性管理。

Executor启动时,资源通过哪几个参数指定?

1)num-executors是executor的数量
2)executor-memory 是每个executor使用的内存
3)executor-cores 是每个executor分配的CPU

一个task的map数量由谁来决定?

一般情况下,在输入源是文件的时候,一个task的map数量由splitSize来决定的,那么splitSize是由以下几个来决定的
goalSize = totalSize / mapred.map.tasks
inSize = max {mapred.min.split.size, minSplitSize}
splitSize = max (minSize, min(goalSize, dfs.block.size))
一个task的reduce数量,由partition决定。

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Spark与深度学习框架——H2O、dee.. 下一篇spark相关概念

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目