TOP

Spark学习(1)- Spark及其生态圈概述
2019-03-05 01:11:08 】 浏览:249
Tags:Spark 学习 及其 生态 概述

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/bingdianone/article/details/84105561

Spark概述及特点

Apache Spark是一个统一的分析引擎进行大规模数据处理

  1. 快速
    更快地运行工作负载100倍。
    Apache Spark使用最先进的DAG调度器、查询优化器和物理执行引擎,实现了批处理和流数据的高性能。
  2. 易用
    用Java、Scala、Python、R和SQL快速编写应用程序。
    Spark提供了超过80个高级操作符,使得构建并行应用程序变得容易。可以在Scala、Python、R和SQL shell中交互式地使用它。
  3. 通用
    结合SQL、流和复杂分析。
    Spark支持一系列库,包括SQL和DataFrames、用于机器学习的MLlib、GraphX和Spark流。您可以在同一个应用程序中无缝地组合这些库。
  4. 到处运行
    Spark运行在Hadoop、Apache Mesos、Kubernetes、独立或云中。它可以访问不同的数据源。
    您可以使用Spark的独立集群模式在EC2上、Hadoop YARN上、Mesos上或Kubernetes上运行。访问HDFS、Alluxio、Apache Cassandra、Apache HBase、Apache Hive和其他数百个数据源中的数据。

Spark产生背景

MapReduce的局限性:

  1. 代码繁琐;
  2. 只能够支持map和reduce方法;
  3. 执行效率低下;
  4. 不适合迭代多次、交互式、流式的处理;

框架多样化:

  1. 批处理(离线):MapReduce、Hive、Pig
  2. 流式处理(实时): Storm、JStorm
  3. 交互式计算:Impala

Spark发展历史

在这里插入图片描述
Spark官网详细历史:
http://spark.apache.org/news/index.html

Spark对比Hadoop

Hadoop生态系统

在这里插入图片描述

Spark生态系统 BDAS(BDAS:Berkeley Data Analytics Stack)

在这里插入图片描述

Hadoop和Spark生态圈对比:

在这里插入图片描述
表格解读:

用例 其他 Spark生态圈
批处理 Hadoop中的MapRdeuce(Java操作MR,Pig,Hive) Spark RDDs(java/scala/python调用相关api即可)
SQL查询 Hadoop中的Hive Spark SQL
流处理/实时处理 Storm,Kafka Spark Streaming
机器学习 Mahout(已停止更新) Spark ML Lib
实时查询 NoSQL(Hbase,Cassandra等等) 无相关spark组件;但是Spark可以在NoSQL存储中查询数据(api调用即可)

Hadoop对比Spark:

在这里插入图片描述表格解读:

Hadoop Spark
分布式存储+分布式计算 只关心分布式计算
MapReduce框架 广义的计算(通用的计算(流处理-机器学习等))
通常数据存储在磁盘上(HDFS) 即存储在磁盘上又存储在内存中(可设置存储策略)
不适合迭代工作 擅长迭代工作(机器学习)
批处理 即可批处理又可流处理;磁盘上的数据速度要快到2x-10x,而内存上的数据速度要快到100x
一般使用java 支持Java, Python, Scala,R等

MapReduce对比Spark

iter:作业处理;
MapReduce作业1和作业2之间的数据交互要落地磁盘;速度慢。
Spark作业1和作业2之间的数据交互无需落地;速度快。
在这里插入图片描述

Spark为什么比mapreduce快?

答:1)基于内存计算,减少低效的磁盘交互;2)高效的调度算法,基于DAG;3)容错机制Linage,精华部分就是DAG和Lingae

Spark和Hadoop的协作性

Hadoop优势

  1. 数据规模
    支持多种不同数据源
    支持多种应用
    支持多种用户
  2. 企业级的平台
    高可靠
    多租户
    安全性
  3. 应用范围
    文件数据
    数据库数据
    支持半结构化数据
    在这里插入图片描述

Spark的优势

  1. API简单,支持python,scala,java
  2. 内存计算框架
  3. 综合多个子框架进行使用
    在这里插入图片描述

Hadoop+Spark(相辅相成)

在这里插入图片描述
在这里插入图片描述

Mapreduce和Spark的都是并行计算,那么他们有什么相同和区别

答:两者都是用mr模型来进行并行计算:
1)hadoop的一个作业称为job,job里面分为map task和reduce task,每个task都是在自己的进程中运行的,当task结束时,进程也会结束。
2)spark用户提交的任务成为application,一个application对应一个sparkcontext,app中存在多个job,每触发一次action操作就会产生一个job。这些job可以并行或串行执行,每个job中有多个stage,stage是shuffle过程中DAGSchaduler通过RDD之间的依赖关系划分job而来的,每个stage里面有多个task,组成taskset有TaskSchaduler分发到各个executor中执行,executor的生命周期是和app一样的,即使没有job运行也是存在的,所以task可以快速启动读取内存进行计算。
3)hadoop的job只有map和reduce操作,表达能力比较欠缺而且在mr过程中会重复的读写hdfs,造成大量的io操作,多个job需要自己管理关系。
spark的迭代计算都是在内存中进行的,API中提供了大量的RDD操作如join,groupby等,而且通过DAG图可以实现良好的容错。

Spark术语

Spark框架图如下:
在这里插入图片描述
在这里插入图片描述
Application:用户编写的应用程序,用户自定义的Spark程序,用户提交后,Spark为App分配资源将程序转换并执行。
Driver Program:运行Application的main()函数并且创建SparkContext。
SparkContext:是用户逻辑与Spark集群主要的交互接口,它会和Cluster Manager进行交互,进行资源的申请,任务的分配与监控,SparkContext代表Driver。
Worker Node:从节点,集群中可以运行应用程序的节点,负责控制计算节点,启动Executor或Driver。在YARN模式中为NodeManager,负责计算节点的控制。
Executor:执行器,是为某Application运行在worker node上的一个进程,负责执行task,该进程里面会通过线程池的方式负责运行任务,并负责将数据存在内存或者磁盘上。每个Application拥有独立的一executors。
RDD DAG:当RDD遇到Action算子,将之前的所有算子形成一个有向无环图(DAG)。再在Spark中转化为Job,提交到集群进行执行。一个App可以包含多个Job。
Task:被Executor执行的工作单元,是运行Application最小的单位,多个task组合成一个stage,Task的调度和管理由TaskScheduler负责,一个分区对应一个Task,Task执行RDD中对应Stage中所包含的算子。Task被封装好后放入Executor的线程池中执行。
Job:一个job包含多个RDD及作用于相应RDD上的各种Operation。每执行一个action算子(foreach, count, collect, take, saveAsTextFile)就会生成一个 job。包含多个Task组成的并行计算。
Stage:每个Job的Task被拆分成很多组Task, 每组Task作为一个TaskSet,命名为Stage。一个作业job分为多个阶段stages(shuffle,串行),一个stage包含一系列的tasks(并行)。Stage的调度和划分由DAGScheduler负责。Stage又分为Shuffle Map Stage和Result Stage两种。Stage的边界就在发生Shuffle的地方。
RDD:Spark的基本数据操作抽象,可以通过一系列算子进行操作。RDD是Spark最核心的东西,可以被分区、被序列化、不可变、有容错机制,并且能并行操作的数据集合。存储级别可以是内存,也可以是磁盘。
DAG Scheduler:根据Job构建基于Stage的DAG(有向无环任务图),实现将Spark作业分解成一到多个Stage,每个Stage根据RDD的Partition个数决定Task的个数,然后生成相应的Task set放到TaskScheduler中,并提交Stage给TaskScheduler。
TaskScheduler:将Task分发给Executor执行,并维护Task的运行状态。将Stage提交Worker(集群)运行,每个Executor运行什么在此分配。
SparkEnv:线程级别的上下文,存储运行时的重要组件的引用。
共享变量:Application在整个运行过程中,可能需要一些变量在每个Task中都使用,共享变量用于实现该目的。Spark有两种共享变量:一种缓存到各个节点的广播变量;一种只支持加法操作,实现求和的累加变量。
宽依赖:或称为ShuffleDependency, 宽依赖需要计算好所有父RDD对应分区的数据,然后在节点之间进行Shuffle。
窄依赖:或称为NarrowDependency,指某个RDD,其分区partition x最多被其子RDD的一个分区partion y依赖。窄依赖都是Map任务,不需要发生shuffle。因此,窄依赖的Task一般都会被合成在一起,构成一个Stage。

Spark工作流程

Spark的三种提交模式是什么?

1)本地模式
Spark不一定非要跑在hadoop集群,可以在本地,起多个线程的方式来指定。将Spark应用以多线程的方式直接运行在本地,一般都是为了方便调试,本地模式分三类
· local:只启动一个executor
· local[k]:启动k个executor
2)standalone模式
分布式部署集群, 自带完整的服务,资源管理和任务监控是Spark自己监控,这个模式也是其他模式的基础,
3)Spark on yarn模式
分布式部署集群,资源和任务监控交给yarn管理,但是目前仅支持粗粒度资源分配方式,包含cluster和client运行模式,cluster适合生产,driver运行在集群子节点,具有容错功能,client适合调试,dirver运行在客户端

Spark-standalone工作流程

在这里插入图片描述
1、spark-submit 提交了应用程序的时候,提交spark应用的机器会通过反射的方式,创建和构造一个Driver进程,Driver进程执行Application程序,
2、Driver根据sparkConf中的配置初始化SparkContext,在SparkContext初始化的过程中会启动DAGScheduler和taskScheduler
3、taskSheduler通过后台进程,向Master注册Application,Master接到了Application的注册请求之后,会使用自己的资源调度算法,在spark集群的worker上,通知worker为application启动多个Executor。
4、Executor会向taskScheduler反向注册。
5、Driver完成SparkContext初始化
6、application程序执行到Action时,就会创建Job。并且由DAGScheduler将Job划分多个Stage,每个Stage 由TaskSet 组成
7、DAGScheduler将TaskSet提交给taskScheduler
8、taskScheduler把TaskSet中的task依次提交给Executor
9、Executor在接收到task之后,会使用taskRunner来封装task(TaskRuner主要将我们编写程序,也就是我们编写的算子和函数进行拷贝和反序列化),然后,从Executor的线程池中取出一个线程来执行task。就这样Spark的每个Stage被作为TaskSet提交给Executor执行,每个Task对应一个RDD的partition,执行我们的定义的算子和函数。直到所有操作执行完为止。
在这里插入图片描述
在这里插入图片描述

Spark on yarn工作流程

在这里插入图片描述

client运行模式

1、在客户端通过Spark-submit提交一个Application
2、在客户端上启动一个Driver进程
3、 Driver启动完成后, client会向RS (ResourceManager)发送请求(给我找一台NM,我要启动AM)
4、RS接受到了请求,找到某一台NM了, Rs会向NM进程发送一条消息(给我启动一个Container容器,我要启动AM进程)
5、AM已经启动了, AM会向RS发送请求(给我一批资源,我要运行Application)
6、RS接受了请求,给他找了一批N回给AM
7、AM会向这一批MM发送消息(你给我启动一个Container,我要启动Executor)
8、Executor会反向注册给客户端里启动的Driver进程
9、 Driver就有了一批计算进程(Executor)
10、 Driver就可以发送task到Executor里面去执行了。

总结:Applicationmaster作用:
1、 为当前的Application申请资源
2、 给NM发送消息;启动Container(一组计算单位)Executor

Cluster运行模式

client vs cluster

1、client模式Driver在客户端启动 测试
2、cluster模式Driver是在yarn集群中某一台NM中启动生产环境
3、ApplicationMaster在不同的模式下作用不一样:
  ApplicationMaster在client模式下:
    (1)为当前的Application申请资源
    (2)给NM发送消息, NM启动Container(一组计算资源的单位) Executor
  ApplicationMaster在cluster模式下:
    (1)为当前的Application申请资源
    (2)给NM发送消息, NM启动container(一组计算资源的单位)Executor
    (3)任务调度

注意事项:

1、 yarn集群所在的节点必须有spark的安装包
2、 Spark跑在Yarn集群上,不需要启动Spark standalone集群,不需要master worker这一些节点
master->RS
Worker->NM
在这里插入图片描述

Spark Shuffle

HashShuffleManager:

在这里插入图片描述
shuffle write阶段,默认Mapper阶段会为Reducer阶段的每一个Task单独创建一个文件来保存该Task中要使用的数据。
(1)map task的计算结果,会根据分区器(default:HashPartitioner)来决定写入到哪一个磁盘小文件中
(2)reduce task 会去map端拉取相应的小文件
产生磁盘小文件的个数公式:M(map task的个数)*R(reduce task的个数)

优点:就是操作数据简单。
缺点:但是在一些情况下(例如数据量非常大的情况)会造成大量文件(M*R,其中M代表Mapper中的所有的并行任务数量,R代表Reducer中所有的并行任务数据)大数据的随机磁盘I/O操作且会形成大量的Memory(极易造成OOM)。

磁盘小文件过多会有什么问题?
1、在Shuffle write过程会产生很多的写磁盘的对象
2、在Shuffle read过程会产生很多的读磁盘的对象
3、在数据传输过程中,会有频繁的网络通信
在JVM堆内存中对象过多会造成频繁的GC;GC还是无法解决运行所需要的内存的话,就会oom;频繁的网络通信,会出现通信故障的可能性大大增加了,一旦网络通信出现了故障,就会出现如下的错误
Shuffle file connot find由于这个错误导致的task失败,那么TaskScheduler不负责重试,由DAGScheduler负责重试stage

HashShuffleManager产生的问题:

第一:不能够处理大规模的数据
第二:Spark不能够运行在大规模的分布式集群上!

改进方案:Consolidate机制:
spark.shuffle.consolidateFiles 该参数默认值为false,将其设置为true即可开启优化机制
后来的改善是加入了Consolidate机制来将Shuffle时候产生的文件数量减少到CR个(C代表在Mapper端,同时能够使用的cores数量,R代表Reducer中所有的并行任务数量)。但是此时如果Reducer端的并行数据分片过多的话则CR可能已经过大,此时依旧没有逃脱文件打开过多的厄运!!!Consolidate并没有降低并行度,只是降低了临时文件的数量,此时Mapper端的内存消耗就会变少,所以OOM也就会降低,另外一方面磁盘的性能也会变得更好。

在这里插入图片描述
开启consolidate机制之后,在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘文件了。此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage的task数量是相同的。一个Executor上有多少个CPU core,就可以并行执行多少个task。而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件内。

前提:每个Excutor分配1个cores,假设第二个stage有100个task,第一个stage有50个task,总共还是有10个Executor,每个Executor执行5个task。那么原本使用未经优化的HashShuffleManager时,每个Executor会产生500个磁盘文件,所有Executor会产生5000个磁盘文件的。但是此时经过优化之后,每个Executor创建的磁盘文件的数量的计算公式为:CPU core的数量 * 下一个stage的task数量。也就是说,每个Executor此时只会创建100个磁盘文件,所有Executor只会创建1000个磁盘文件。

当Executor的CPU core执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的shuffleFileGroup,包括其中的磁盘文件。也就是说,此时task会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。因此,consolidate机制允许不同的task复用同一批磁盘文件,这样就可以有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能。

SortShuffle

在Mapper中的每一个ShuffleMapTask中产生两个文件:Data文件和Index文件,其中Data文件是存储当前Task的Shuffle输出的。而index文件中则存储了Data文件中的数据通过Partitioner的分类信息,此时下一个阶段的Stage中的Task就是根据这个Index文件获取自己所要抓取的上一个Stage中的ShuffleMapTask产生的数据的,Reducer就是根据index文件来获取属于自己的数据。

涉及问题:Sorted-based Shuffle:会产生 2*M(M代表了Mapper阶段中并行的Partition的总数量,其实就是ShuffleMapTask的总数量)个Shuffle临时文件。

SortShuffleManager由于有一个磁盘文件merge的过程,因此大大减少了文件数量。比如第一个stage有50个task,总共有10个Executor,每个Executor执行5个task,而第二个stage有100个task。由于每个task最终只有一个磁盘文件,因此此时每个Executor上只有5个磁盘文件,所有Executor只有50个磁盘文件。
在这里插入图片描述
普通机制Sort-based Shuffle的流程
(1) map task的计算结果会写入到一个内存数据结构里面,内存数据结构默认是5M
(2) 在shuffle的时候会有一个定时器,不定期的去估算这个内存数据结构的大小,如果现在内存数据结构的大小是5.01M,那么它会申请5.01*2-5=5.02M内存给内存数据结构
(3) 如果申请成功,不会进行溢写
(4) 如果申请不成功,这个时候就会有溢写的过程
(5) 在溢写之前,会将内存数据结构里面的数据进行排序,以及分区
(6) 然后开始写磁盘,写磁盘是以bacth的形式去写,一个batch是1W条数据
(7) Map task执行完成后,会将这些磁盘小文件合并成一个大的磁盘文件,同时生成一个索引
(8) Reduce task去map端拉数据的时候,首先解析索引文件,根据索引文件再去拉去属于它自己的数据
产生磁盘小文件的公式:2M(M代表了Mapper阶段中并行的Partition的总数量,其实就是ShuffleMapTask的总数量)

默认Sort-based Shuffle的几个缺陷
1)如果Mapper中Task的数量过大,依旧会产生很多小文件,此时在Shuffle传递数据的过程中到Reducer端,reduce会需要同时打开大量的记录来进行反序列化,导致大量的内存消耗和GC的巨大负担,造成系统缓慢甚至崩溃!
2)如果需要在分片内也进行排序的话,此时需要进行Mapper端和Reducer端的两次排序!!!
优化:
可以改造Mapper和Reducer端,改框架来实现一次排序。
在这里插入图片描述
bypass运行机制
1、spark.shuffle.sort.bypassMergeThreshold 默认值为200 ,如果shuffle read task的数量小于这个阀值200,则不会进行排序。
2、或者使用hashbasedshuffle + consolidateFiles 机制

上图说明了bypass SortShuffleManager的原理。bypass运行机制的触发条件如下:

  1. shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。
    这个参数仅适用于SortShuffleManager,如前所述,SortShuffleManager在处理不需要排序的Shuffle操作时,由于排序带来性能的下降。这个参数决定了在这种情况下,当Reduce分区的数量小于多少的时候,在SortShuffleManager内部不使用Merge Sort的方式处理数据,而是与Hash Shuffle类似,直接将分区文件写入单独的文件,不同的是,在最后一步还是会将这些文件合并成一个单独的文件。这样通过去除Sort步骤来加快处理速度,代价是需要并发打开多个文件,所以内存消耗量增加,本质上是相对HashShuffleMananger一个折衷方案。这个参数的默认值是200个分区,如果内存GC问题严重,可以降低这个值。

  2. 不是聚合类的shuffle算子(比如reduceByKey)。
    此时task会为每个下游task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。
    该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,shuffle read的性能会更好。
    而该机制与普通SortShuffleManager运行机制的不同在于:第一,磁盘写机制不同;第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

总结
有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。tungsten-sort慎用,存在bug.

spark shuffle参数调优

spark.shuffle.file.buffer

  • 默认值:32k
  • 参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。
  • 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。

spark.reducer.maxSizeInFlight

  • 默认值:48m
  • 参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。
  • 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
  • Reduce task去map拉数据,reduce 一边拉数据一边聚合 reduce端有一块聚合内存(executor memory)
  • 解决方法:
    (1)增加reduce聚合的内存比例 设置spark.Shuffle.memoryFraction
    (2)增加executor memory的大小 –executor-memory 5G
    (3)减少reduce task每次拉取的数据量 设置
    spark.reducer.maxSizeInFlight 24m

spark.shuffle.io.maxRetries

  • 默认值:3
  • 参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。
  • 调优建议:对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。

spark.shuffle.io.retryWait

  • 默认值:5s
  • 参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是5s。
  • 调优建议:建议加大间隔时长(比如60s),以增加shuffle操作的稳定性。

spark.shuffle.memoryFraction

  • 默认值:0.2
  • 参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%。
  • 调优建议:在资源参数调优中讲解过这个参数。如果内存充足,而且很少使用持久化操作,建议调高这个比例,给shuffle read的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%左右。

spark.shuffle.manager

  • 默认值:sort
  • 参数说明:该参数用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。
  • 调优建议:由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,因为之前发现了一些相应的bug。

spark.shuffle.sort.bypassMergeThreshold

  • 默认值:200
  • 参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。
  • 调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。

spark.shuffle.consolidateFiles

  • 默认值:false
  • 参数说明:如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。
  • 调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。

Spark RDD

RDD机制

rdd是一个弹性的分布式数据集,简单的理解成一种数据结构,是spark框架上的通用货币。
所有算子都是基于rdd来执行的,不同的场景会有不同的rdd实现类,但是都可以进行互相转换。
rdd执行过程中会形成dag图,然后形成lineage保证容错性等。
从物理的角度来看rdd存储的是block和node之间的映射。

RDD弹性的分布式数据集五大特性

  1. RDD有一系列的Partition组成的
  2. 每一个算子实际上是作用在每一个partition上
  3. rdd之间是有依赖关系的
  4. 可选项:分区器作用在KV格式的RDD上
     (1)分区器是在shuffle阶段起作用
     (2) GroupByKey, reduceByKey, join, sortBykey等这些算子会产生shuffle
     (3)这些算子必须作用在KV格式的RDD
  5. RDD会提供一系列最佳计算位置,说白了就是暴露每一个partitior的位置这是数据本地化的基础

RDD持久化原理

不使用RDD的持久化

在这里插入图片描述
1、默认情况下,对于大量数据的action操作都是非常耗时的。可能一个操作就耗时1个小时;
2、在执行action操作的时候,才会触发之前的操作的执行,因此在执行第一次count操作时,就会从hdfs中读取一亿数据,形成lines RDD;
3、第一次count操作之后,我们的确获取到了hdfs文件的行数。但是lines RDD其实会被丢弃掉,数据也会被新的数据丢失;

所以,如果不用RDD的持久化机制,可能对于相同的RDD的计算需要重复从HDFS源头获取数据进行计算,这样会浪费很多时间成本;

使用持久化

在这里插入图片描述

  1. Spark非常重要的一个功能特性就是可以将RDD持久化在内存中。当对RDD执行持久化操作时,每个节点都会将自己操作的RDD的partition持久化到内存中,并且在之后对该RDD的反复使用中,直接使用内存缓存的partition。这样的话,对于针对一个RDD反复执行多个操作的场景,就只要对RDD计算一次即可,后面直接使用该RDD,而不需要反复计算多次该RDD。
  2. 巧妙使用RDD持久化,甚至在某些场景下,可以将spark应用程序的性能提升10倍。对于迭代式算法和快速交互式应用来说,RDD持久化,是非常重要的。
  3. 要持久化一个RDD,只要调用其cache()或者persist()方法即可。在该RDD第一次被计算出来时,就会直接缓存在每个节点中。而且Spark的持久化机制还是自动容错的,如果持久化的RDD的任何partition丢失了,那么Spark会自动通过其源RDD,使用transformation操作重新计算该partition
  4. 1)cache和persist都是用于将一个RDD进行缓存的,这样在之后使用的过程中就不需要重新计算了,可以大大节省程序运行时间;2) cache只有一个默认的缓存级别MEMORY_ONLY ,cache调用了persist,而persist可以根据情况设置其它的缓存级别;3)executor执行的时候,默认60%做cache,40%做task操作,persist最根本的函数,最底层的函数。如果需要从内存中清楚缓存,那么可以使用unpersist()方法。
  5. Spark自己也会在shuffle操作时,进行数据的持久化,比如写入磁盘,主要是为了在节点失败时,避免需要重新计算整个过程。

Spark 中一个很重要的能力是将数据持久化(或称为缓存),在多个操作间都可以访问这些持久化的数据。当持久化一个 RDD 时,每个节点的其它分区都可以使用 RDD在内存中进行计算,在该数据上的其他 action 操作将直接使用内存中的数据。这样会让以后的 action 操作计算速度加快(通常运行速度会加速 10倍)。缓存是迭代算法和快速的交互式使用的重要工具。

RDD 可以使用persist() 方法或 cache() 方法进行持久化。数据将会在第一次 action 操作时进行计算,并缓存在节点的内存中。Spark 的缓存具有容错机制,如果一个缓存的 RDD 的某个分区丢失了,Spark 将按照原来的计算过程,自动重新计算并进行缓存。

另外,每个持久化的 RDD 可以使用不同的存储级别进行缓存,例如,持久化到磁盘、已序列化的 Java 对象形式持久化到内存(可以节省空间)、跨节点间复制、以 off-heap 的方式存储在 Tachyon。这些存储级别通过传递一个 **StorageLevel 对象(Scala、Java、Python)给 persist() 方法进行设置。cache()**方法是使用默认存储级别的快捷设置方法,默认的存储级别是 StorageLevel.MEMORY_ONLY(将反序列化的对象存储到内存中)。详细的存储级别介绍如下 :

  • MEMORY_ONLY : 将 RDD 以反序列化 Java 对象的形式存储在 JVM 中。如果内存空间不够,部分数据分区将不再缓存,在每次需要用到这些数据时重新进行计算。这是默认的级别。
  • MEMORY_AND_DISK : 将 RDD 以反序列化 Java 对象的形式存储在 JVM 中。如果内存空间不够,将未缓存的数据分区存储到磁盘,在需要使用这些分区时从磁盘读取。
  • MEMORY_ONLY_SER : 将 RDD 以序列化的 Java 对象的形式进行存储(每个分区为一个 byte 数组)。这种方式会没有序列化对象的方式节省很多空间,尤其是在使用 fast serializer时会节省更多的空间,但是在读取时会增加 CPU 的计算负担。
  • MEMORY_AND_DISK_SER : 类似于 MEMORY_ONLY_SER ,但是溢出的分区会存储到磁盘,而不是在用到它们时重新计算。
  • DISK_ONLY : 只在磁盘上缓存 RDD。
  • MEMORY_ONLY_2,MEMORY_AND_DISK_2,等等 : 与上面的级别功能相同,只不过每个分区在集群中两个节点上建立副本。
  • OFF_HEAP(实验中): 类似于 MEMORY_ONLY_SER ,但是将数据存储在 off-heap memory,这需要启动 off-heap 内存。

注意,在 Python 中,缓存的对象总是使用 Pickle 进行序列化,所以在 Python 中不关心你选择的是哪一种序列化级别。python 中的存储级别包括 MEMORY_ONLY,MEMORY_ONLY_2,MEMORY_AND_DISK,MEMORY_AND_DISK_2,DISK_ONLY和 DISK_ONLY_2
在 shuffle 操作中(例如 reduceByKey),即便是用户没有调用 persist 方法,Spark 也会自动缓存部分中间数据。这么做的目的是,在 shuffle 的过程中某个节点运行失败时,不需要重新计算所有的输入数据。如果用户想多次使用某个 RDD,强烈推荐在该 RDD 上调用 persist方法。

如何选择存储级别

Spark 的存储级别的选择,核心问题是在内存使用率和 CPU 效率之间进行权衡。建议按下面的过程进行存储级别的选择 :

  • 如果使用默认的存储级别(MEMORY_ONLY),存储在内存中的 RDD 没有发生溢出,那么就选择默认的存储级别。默认存储级别可以最大程度的提高 CPU 的效率,可以使在 RDD 上的操作以最快的速度运行。
  • 如果内存不能全部存储 RDD,那么使用 MEMORY_ONLY_SER,并挑选一个快速序列化库将对象序列化,以节省内存空间。使用这种存储级别,计算速度仍然很快。
  • 除了在计算该数据集的代价特别高,或者在需要过滤大量数据的情况下,尽量不要将溢出的数据存储到磁盘。因为,重新计算这个数据分区的耗时与从磁盘读取这些数据的耗时差不多。
  • 如果想快速还原故障,建议使用多副本存储级别(例如,使用 Spark 作为 web 应用的后台服务,在服务出故障时需要快速恢复的场景下)。所有的存储级别都通过重新计算丢失的数据的方式,提供了完全容错机制。但是多副本级别在发生数据丢失时,不需要重新计算对应的数据库,可以让任务继续运行。

删除RDD

Spark 自动监控各个节点上的缓存使用率,并以最近最少使用的方式(LRU)将旧数据块移除内存。如果想手动移除一个 RDD,而不是等待该 RDD 被 Spark自动移除,可以使用 RDD.unpersist()方法。

checkpoint检查点机制

一个Streaming应用程序要求7天24小时不间断运行,因此必须适应各种导致应用程序失败的场景。Spark Streaming的检查点具有容错机制,有足够的信息能够支持故障恢复。支持两种数据类型的检查点:元数据检查点和数据检查点。

(1)元数据检查点,在类似HDFS的容错存储上,保存Streaming计算信息。这种检查点用来恢复运行Streaming应用程序失败的Driver进程。

(2)数据检查点,在进行跨越多个批次合并数据的有状态操作时尤其重要。在这种转换操作情况下,依赖前一批次的RDD生成新的RDD,随着时间不断增加,RDD依赖链的长度也在增加,为了避免这种无限增加恢复时间的情况,通过周期检查将转换RDD的中间状态进行可靠存储,借以切断无限增加的依赖。使用有状态的转换,如果updateStateByKey或者reduceByKeyAndWindow在应用程序中使用,那么需要提供检查点路径,对RDD进行周期性检查。

元数据检查点主要用来恢复失败的Driver进程,而数据检查点主要用来恢复有状态的转换操作。无论是Driver失败,还是Worker失败,这种检查点机制都能快速恢复。许多Spark Streaming都是使用检查点方式。但是简单的Streaming应用程序,不包含状态转换操作不能运行检查点;从Driver程序故障中恢复可能会造成一些收到没有处理的数据丢失。

为了让一个Spark Streaming程序能够被恢复,需要启用检查点,必须设置一个容错的、可靠的文件系统(如HDFS、S3等)路径保存检查点信息,同时设置时间间隔。

streamingContext.checkpoint(checkpointDirectory)//checkpointDirectory
是一个文件系统路径(最好是一个可靠的比如hdfs://…) dstream.checkpoint(checkpointInterval)//设置时间间隔
当程序第一次启动时,创建一个新的StreamingContext,接着创建所有的数据流,然后再调用start()方法。

//定义一个创建并设置StreamingContext的函数
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...)               //创建StreamingContext实例
val DsSream = ssc.socketTextStream(...)      //创建DStream
...
ssc.checkpoint(checkpointDirectory)           //设置检查点机制
ssc
}
//从检查点数据重建或者新建一个StreamingContext
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreate-Context_)
//在context需要做额外的设置完成,不考虑是否被启动或重新启动
context. ...
//启动context
context.start()
context.awaitTermination()

通过使用getOrCreate创建StreamingContext。
当程序因为异常重启时,如果检查点路径存在,则context将从检查点数据中重建。如果检查点目录不存在(首次运行),将会调用functionToCreateContext函数新建context函数新建context,并设置DStream。

但是,Streaming需要保存中间数据到容错存储系统,这个策略会引入存储开销,进而可能会导致相应的批处理时间变长,因此,检查点的时间间隔需要精心设置。采取小批次时,每批次检查点可以显著减少操作的吞吐量;相反,检查点太少可能会导致每批次任务大小的增加。对于RDD检查点的有状态转换操作,其检查点间隔默认设置成DStream的滑动间隔的5~10倍。

故障恢复可以使用Spark的Standalone模式自动完成,该模式允许任何Spark应用程序的Driver在集群内启动,并在失败时重启。而对于YARN或Mesos这样的部署环境,则必须通过其他的机制重启Driver。

checkpoint和持久化机制的区别

1.持久化只是将数据保存在BlockManager中,而RDD的lineage是不变的。但是checkpoint执行完后,RDD已经没有之前所谓的依赖RDD了,而只有一个强行为其设置的checkpointRDD,RDD的lineage改变了。

2.持久化的数据丢失可能性更大,磁盘、内存都可能会存在数据丢失的情况。但是checkpoint的数据通常是存储在如HDFS等容错、高可用的文件系统,数据丢失可能性较小。

注:默认情况下,如果某个RDD没有持久化,但是设置了checkpoint,会存在问题,本来这个job都执行结束了,但是由于中间RDD没有持久化,checkpoint job想要将RDD的数据写入外部文件系统的话,需要全部重新计算一次,再将计算出来的RDD数据checkpoint到外部文件系统。所以,建议对checkpoint()的RDD使用persist(StorageLevel.DISK_ONLY),该RDD计算之后,就直接持久化到磁盘上。后面进行checkpoint操作时就可以直接从磁盘上读取RDD的数据,并checkpoint到外部文件系统。

RDD的弹性表现在哪几点

1)自动的进行内存和磁盘的存储切换;
2)基于Linage的高效容错;
3)task如果失败会自动进行特定次数的重试;
4)stage如果失败会自动进行特定次数的重试,而且只会计算失败的分片;
5)checkpoint和persist,数据计算之后持久化缓存
6)数据调度弹性,DAG TASK调度和资源无关
7)数据分片的高度弹性,a.分片很多碎片可以合并成大的,b.par

RDD有哪些缺陷

1)不支持细粒度的写和更新操作(如网络爬虫),spark写数据是粗粒度的
所谓粗粒度,就是批量写入数据,为了提高效率。但是读数据是细粒度的也就是说可以一条条的读
2)不支持增量迭代计算,Flink支持

RDD创建有哪几种方式

1).使用程序中的集合创建rdd
2).使用本地文件系统创建rdd
3).使用hdfs创建rdd,
4).基于数据库db创建rdd
5).基于Nosql创建rdd,如hbase
6).基于s3创建rdd,
7).基于数据流,如socket创建rdd
如果只回答了前面三种,是不够的,只能说明你的水平还是入门级的,实践过程中有很多种创建方式。

RDD通过Linage(记录数据更新)的方式为何很高效

1)lazy记录了数据的来源,RDD是不可变的,且是lazy级别的,且rDD之间构成了链条,lazy是弹性的基石。由于RDD不可变,所以每次操作就产生新的rdd,不存在全局修改的问题,控制难度下降,所有有计算链条将复杂计算链条存储下来,计算的时候从后往前回溯900步是上一个stage的结束,要么就checkpoint
2)记录原数据,是每次修改都记录,代价很大如果修改一个集合,代价就很小,官方说rdd是粗粒度的操作,是为了效率,为了简化,每次都是操作数据集合,写或者修改操作,都是基于集合的rdd的写操作是粗粒度的,rdd的读操作既可以是粗粒度的也可以是细粒度,读可以读其中的一条条的记录。
3)简化复杂度,是高效率的一方面,写的粗粒度限制了使用场景,如网络爬虫,现实世界中,大多数写是粗粒度的场景

宽依赖和窄依赖?

Spark中RDD的高效与DAG(有向无环图)有很大的关系,在DAG调度中需要对计算的过程划分Stage,划分的依据就是RDD之间的依赖关系。RDD之间的依赖关系分为两种,宽依赖(wide dependency/shuffle dependency)和窄依赖(narrow dependency)
1.窄依赖
窄依赖就是指父RDD的每个分区只被一个子RDD分区使用,子RDD分区通常只对应常数个父RDD分区,如下图所示【其中每个小方块代表一个RDD Partition】
在这里插入图片描述
窄依赖有分为两种:

  • 一种是一对一的依赖,即OneToOneDependency
  • 还有一个是范围的依赖,即RangeDependency,它仅仅被org.apache.spark.rdd.UnionRDD使用。UnionRDD是把多个RDD合成一个RDD,这些RDD是被拼接而成,即每个parent RDD的Partition的相对顺序不会变,只不过每个parent RDD在UnionRDD中的Partition的起始位置不同

2.宽依赖
宽依赖就是指父RDD的每个分区都有可能被多个子RDD分区使用,子RDD分区通常对应父RDD所有分区,如下图所示【其中每个小方块代表一个RDD Partition】
在这里插入图片描述
3.窄依赖与窄依赖比较

  • 宽依赖往往对应着shuffle操作,需要在运行的过程中将同一个RDD分区传入到不同的RDD分区中,中间可能涉及到多个节点之间数据的传输,而窄依赖的每个父RDD分区通常只会传入到另一个子RDD分区,通常在一个节点内完成。
  • 当RDD分区丢失时,对于窄依赖来说,由于父RDD的一个分区只对应一个子RDD分区,这样只需要重新计算与子RDD分区对应的父RDD分区就行。这个计算对数据的利用是100%的
  • 当RDD分区丢失时,对于宽依赖来说,重算的父RDD分区只有一部分数据是对应丢失的子RDD分区的,另一部分就造成了多余的计算。宽依赖中的子RDD分区通常来自多个父RDD分区,极端情况下,所有父RDD都有可能重新计算。如下图,par4丢失,则需要重新计算par1,par2,par3,产生了冗余数据par5

分区丢失图
在这里插入图片描述
4.宽依赖,窄依赖函数

  • 窄依赖的函数有:
    map, filter, union, join(父RDD是hash-partitioned ), mapPartitions, mapValues
  • 宽依赖的函数有:
    groupByKey, join(父RDD不是hash-partitioned ), partitionBy

Spark学习(1)- Spark及其生态圈概述 https://www.cppentry.com/bencandy.php?fid=116&id=211649

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Spark学习(3)-Spark SQL(1) 下一篇spark 学习5