设为首页 加入收藏

TOP

Spark复习
2019-03-14 01:19:47 】 浏览:34
Tags:Spark 复习

1. Spark概述

Apache Spark是是一个开源的快速通用的计算框架。Spark是由UC Berkeley AMP lab所开源的类MapReduce的通用内存并行计算框架。Spark的一个重要特点就是能够在内存中计算,因而更快,即使依赖磁盘进行计算,Spark依然比MapReduce执行效率更高。

2. Spark运行模式

2.1 Local模式

在本机上执行,主要用于测试代码,运行资源仅限于本台机器。

2.2 Standalone模式

利用Spark自带的资源管理器与调度器管理Spark集群,采用Master/Slaver结构。

2.3 Yarn模式

集群运行在Yarn资源管理器上,资源管理由yarn负责,spark只负责调度和计算。
Spark on Yarn模式中,分为yarn-client和yarn-cluster模式,两者的区别在于:yarn-client的driver端在本地。yarn-cluster的driver由yarn进行分配。

2.4 Mesos模式

集群运行在Mesos资源管理器上,资源管理由Mesos负责,spark只负责调度和计算。

3. Spark运行流程

3.1 Spark中的基本概念

  • ClusterManager:集群资源管理器,在Standalone模式中即为Master节点,在Yarn中就是ResourceManager.
  • Application:代表你的应用程序
  • Driver:主节点,运行main函数并且初始化SparkContext的节点。由SparkContext和ClusterManager通信,进行资源申请,同时对任务进行分配和监控。运行结束后关闭SparkContext。
  • Worker:计算节点,负责启动Driver或者Executor,在Yarn中就是NodeManager节点。
  • Executor:执行器,Application运行在Worker节点上的一个进程,该进程负责运行某些task,并且负责将数据存在内存或者磁盘上,和Yarn中的Container功能差不多。在Spark on Yarn模式中每个Excutor运行Task的并行数由分配的CPU个数控制。
  • SparkContext:整个Spark程序的入口,控制应用的生命周期,2.0版本后,叫做SparkSession。
  • RDD: Spark的最基本的数据抽象。
  • Task:在Executor中执行任务的工作单元,多个Task组成一个Stage。
  • Stage:阶段,每个Job会拆分成几个阶段执行,每个阶段代表很多Task,作为一个TaskSet,称作为Stage。
  • Job:作业,由Action行为触发,例如调用RDD的一个Action算子。
  • DAGScheduler:根据Job构建基于Stage的DAG,并提交Stage到TaskScheduler,划分Stage的依据是RDD之间的依赖关系。流程为:1.得到Job,2计算DAG,3划分Stage,4提交给TaskScheduler,5对Stage级别的错误进行重跑
  • TaskScheduler:将Task提交给集群运行,每个Executor执行什么task由其控制。Task级别的错误由其控制

3.2 Spark基本运行流程

  1. Client提交任务。
  2. ClusterManager找到一个worker启动driver。
  3. driver向ClusterManager申请Executor资源。
  4. sparkcontext构建DAG图,将DAG分解为Stage,由DAGScheduler将TaskSet发送给TaskScheduler。
  5. TaskScheduer将Task分配给Executor.
  6. Task在Executor上运行完毕,给driver返回状态,释放资源。

3.3 Spark运行架构特点

  1. 每个Application有自己的executor,每个driver调度自己的任务,每个Task运行在不同的JVM,这种隔离机制使程序之间互不干扰,但这也意味着不能跨应用共享数据。
  2. Driver应该靠近worker节点,因为SparkContext与Executor之间有大量的信息交换。
  3. Task采用了数据本地性和推测执行的优化机制。

4. Spark组件

4.1 SparkCore

4.1.1 RDD

1. 什么是RDD
RDD(Resilient Distributed DataSet)弹性分布式数据集,是Spark最基本的数据抽象。RDD是一个分布在集群的节点上的不可变、可分区、可以并行计算的容错数据集合。注意:RDD存储的不是真实的数据,只是存储数据的获取方法、分区方法以及数据类型等信息。

2. 为什么叫弹性

  • 自动的进行内存和磁盘的存储切换。
  • 基于血统的高效容错,当节点出错时,会尝试获取上游依赖数据重新计算出错数据。
  • 数据分片高度弹性,可以人工自由的设置分片(repartition)。

3. RDD的属性

  • Partition,一组分区,即数据集的基本组成单位。对于RDD来说,每个分区都由一个计算任务处理,并决定并行计算的粒度。可以在创建RDD的时候指定分区个数,如果没有指定,采用默认值,默认值为程序分配到的CPU个数。
  • Iterator和Compute,一个计算每个分区的函数,表示RDD是如何通过父RDD计算得到的。RDD的计算是以分区为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算结果。
  • Dependencies与Lineage,RDD之间的依赖和血缘关系。RDD每次转换都会生成一个新的RDD,所以RDD之间就会形成一个类似流水线一样的前后依赖关系。当部分分区数据丢失时,Spark可以依据这个依赖关系,重新计算丢失的数据,而不必对RDD所有的分区进行计算。依赖关系中又分为窄依赖和宽依赖,如果父类RDD的每个分区最多只能被子RDD的一个分区使用,称为窄依赖(NarrowDependency);如果父类RDD的每个分区可以被子RDD的多个分区使用,称为宽依赖(ShuffleDpendency),顾名思义,这其中还需要shuffle操作。
  • Partitioner,RDD的分片函数,决定了RDD的分区方式。当前Spark实现了两种类型的分片函数,一个基于哈希的HashPartitioner和一个基于范围的RangePartitioner。Hash是以key作为分区条件散列分布,分区数据不连续,极端情况下散列到少数几个分区上,导致数据不均等,range按Key的排序进行均衡分布,分区内数据连续,大小也相对均等。只有key-value的RDD,才有Partitioner。Partitioner决定了RDD的分区数量,也决定了Shuffle输出时分区的数量。
  • PreferredLocation,存储每个Partition优先位置的列表。对于HDFS文件来说,这个列表就是每个partition存储的block的位置。按照“移动数据不如移动计算”的理念,spark在计算时,会尽量将计算任务分配在其处理的数据块的存储位置。
  • CheckPoint,一种缓存机制。尽管当RDD出现问题时,可以由它的依赖关系进行恢复,但是对于较长依赖关系的RDD来说,恢复起来耗时太久。ChekPoint是Spark提供的一种缓存机制,当计算的RDD过多时,为避免重复计算以前的RDD,可以对RDD做CheckPoint处理,会将当前RDD保存到一个目录中,由于做checkPoint操作时会将所有依赖的父级RDD清除掉,所以建议先做persist(持久化)操作。CheckPoint不会马上执行,触发action时才会执行。与另一种缓存机制cache相比,cache缓存由executor管理,当executor消失了,需要重新计算,而CheckPoint将数据保存到HDFS,job可以从检查点继续计算。
4.1.2 Transformation/Action

RDD常用的算子有两种类型:
(1) Transformation(转换算子):根据原有RDD创建一个新的RDD,不触发提交作业,lazy模式,遇到action才开始计算。
(2) Action(行动算子):对RDD操作后将结果返回给driver,会直接触发计算,其计算结果不再是RDD。
常用的算子:

Transformation 含义
map(func) 对原来的RDD每个元素进行func运算,然后返回一个新的RDD。
filter(func) 过滤器,对原来的RDD每个元素进行func运算,返回运算结果是true的元素。
flatmap(func) 类似于map,每一个输入元素可以被映射为0个或多个元素,并将生成的RDD每个集合中的元素合并为一个集合
mapPartitions(func) 与map类似,区别在于map是对RDD每一个元素运算,mapPartitions是对RDD的每个分区进行计算,由于每次处理的是一个分区,减少了拉取数据次数,所以mapPartitions通常是一个高性能算子。
mapParitionsWithIndex(func) 为func提供了表示分区index的整数值((Int,Iterator)=>Iterator),与mapParitions的区别在于,这个算子返回的RDD带有分区index
sample(withReplacement,fraction,seed) 从RDD中抽取样本,第一个参数控制有无放回抽样,即是否将之前抽取的结果放回RDD继续抽取,第二个控制抽取比例,第三个是随机种子,相同的随机种子,抽取的结果必然是相同的,这其中涉及到算法中的伪随机数。
union(otherDataSet) 对源RDD和参数RDD求并集后,返回一个新的RDD
intersection(otherDataSet) 对源RDD和参数RDD求交集后,返回一个新的RDD
distinct([numPartitions]) 对源RDD去重后返回一个新的RDD
groupByKey([numPartitions]) 根据Key值进行聚合,在一个(K,V)的RDD上调用,返回一个(K,iterator(V))的RDD。默认情况下,输出的并行度取决于父RDD的分区数,可以传递一个numPartitions来设置不同的分区个数。
reduceByKey(func,[numParititions]]) 当对一个(K,V)的RDD上使用时,将K值相同的分为同组,将同组内的V值根据func函数进行聚合,最终返回一个(K,V)的RDD
aggregateByKey(zeroValue)(seqOp,CombOp,[numParitions]) 先按分区聚合,每个分区的元素和初始值交流后,最终分区之间再总的聚合,例如:RDD([(you,1),(jump,1)],[(i,1),(jump,1)]).aggregateByKey(1)(_+_,_+_),先分区内按Key聚合,结果为RDD([(you,1),(jump,1)],[(i,1),(jump,1)]),接着执行第一个 _+_(初始值此时会加在每个key值中,即RDD([(you,(1,1)),(jump,(1,1))],[(i,(1,1)),(jump,(1,1))])),结果为RDD([(you,2),(jump,2)],[(i,2),(jump,2)]),最终分区之间再聚合,聚合规则为第二个_+_,最终结果为RDD((you,2),(i,2),(jump,4)),详细解析可以参考 注1 的网址。
sortByKey([ascending],[numPartitions]) 在一个K,V的RDD上调用,对Key进行排序,可以依据ascending参数决定是升序还是降序。内部实现使用了RangePartitioner,可以使相应范围内key数据分到同一个partition里,每个partition用到了标准的sort机制,避免了大量数据shuffle
sortBy(func,[ascending],[numPartitions]) 与sortByKey相似,但是不同处在与更灵活,前者只能依靠Key进行排序,sortBy根据func来决定排序规则。
join(otherDataSet,[numPartitions]) 两个(K,V)的RDD求交集。当对(K,V)和(K,W)数据集调用时,返回一个(K,(V,W))数据集,相当于内连接,同时还有leftOuterJoin、rightOuterJoin和fullOuterJoin。
cogroup(otherDataSet,[numPartitions]) 在类型(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))的RDD。
cartesian(otherDataSet) 返回一个两个RDD笛卡尔积,即所有元素的关联组合
pipe(command,[envVars]) 调用外部程序,RDD将每个分区执行command(shell命令或者perl、bash脚本),RDD的每个元素作为每个command的输入,command的输出以字符串返回给RDD
coalesce(numPartitions,[shuffle]) 重分区,默认是分区由多变少,不涉及shuffle操作,如果要分区由少变多,第二个参数需要设置为true,内部将使用HashPartitioner将数据散列分布在新的分区上。
repartition(numPartitions) 内部实现为coalesce(numPartitions, shuffle = true),不管增大还是减少分区,其必然涉及到shuffle操作,减少分区时建议使用coalesce,避免shuffle操作。
repartitionAndSortWithinPartitions(partitioner) 根据指定的partitioner对RDD进行重新分区,并且在分区的时候排序。适用于(K,V)的RDD,性能比先repartition再排序效率高,因为它可以将排序在shuffle阶段完成。
cache/persist RDD缓存机制,可以将RDD缓存到指定位置,避免重复计算从而减少时间,cache内部调用了persist方法,cache默认就一个缓存级别(内存),persist提供了多种缓存级别
Action 含义
reduce(func) 通过func方法,将RDD中所有元素聚合起来。当运行时,func函数接受2个参数,返回一个值,然后这个值会和RDD中的另一个元素再传入func函数中,直到最后剩一个元素。
collect() 在Driver端,将RDD的所有元素以数组的形式返回。通常在filter以后或者足够小的数据子集后再操作,否则容易导致Driver端OOM
count() 返回RDD中的元素个数
first() 返回RDD的第一个元素,内部实现为take(1)
take(n) 返回RDD的前n个元素组成的数组
takeSample(withReplacement,num,[seed]) 返回一个数组,该数组由从RDD中随机采样的num个元素组成,withReplacement控制有无放回抽样,seed参数为随机种子,详细解释见Transformation的sample算子。
takeOrdered(n,[ordering]) 获取RDD按自然排序的前n个元素,或者可以自定义ordering排序
saveAsTextFile(path) 将RDD中的所有元素以textfile的形式,保存在本地环境、HDFS或其他Hadoop支持的文件系统上,对于RDD中的每个元素,Spark会调用toString方法,将其转换为文件中的一行文本。
saveAsSequenceFile(path) 将RDD中的所有元素以sequencefile的形式,保存在本地环境、HDFS或其他Hadoop支持的文件系统上
saveAsObjectFile(path) 内部实现为将RDD序列化后使用saveAsSequenceFile保存。
countByKey() 针对(K,V)类型的RDD,返回一个(K,Int)的map,统计每个key对应的元素个数
foreach(func) 对RDD的每个元素运行函数func,没有返回值。
4.1.3 广播变量和计数器

1. 广播变量

  • 如果我们在分布式计算里分发大对象,例如字段、集合、黑白名单等,这个都会由driver端分发。如果这个变量不是广播变量,那么每个task都要从driver端获取一次,这在task量非常多的情况下,driver端的带宽,将是瓶颈。如果这个变量为广播变量,那么每个executor上将会存有一份,这个executor上启动的task会共享这个变量,减少通讯成本。
  • 不能将RDD广播出去,可以将RDD的结果广播出去,原因是RDD存储的并不是真实的数据,只是存储数据的获取方法、分区方法以及数据类型等信息。
  • 广播变量只能在driver端定义。
  • 广播变量只能在driver端修改,executor只能读。
  • 不使用广播变量,executor端有多少task就有多少个变量副本,使用后,executor端只会有一个变量副本。

2. 累加器

  • 一种分布式的变量机制,原理类似mapreduce,即分布式的改变,再聚合这些改变。通常用于在调试时对作业执行过程中的事件进行计数。
  • 累加器只能在driver端构建,并只能从driver端读取最终结果,在task端只能累加。
  • Spark原生只支持数字类型的累加器,2.0版本后可以继承AccumulatorV2来实现自定义类型的累加器。
  • 累加器不会改变spark的lazy模式,所以只有在触发action操作后,累加器才会工作。

4.2 SparkSQL,DataSet/DataFrame

4.2.1 SparkSQL

1. SparkSQL发展:
 1) 1.0以前: Shark
 2) 1.1.x:SparkSQL(测试性的)
 3) 1.3.x:SparkSQL(正式版)+ DataFrame
 4) 1.5.x:SparkSQL钨丝计划
 5) 1.6.x:SparkSQL + DataFrame + DataSet(测试版本)
 6) 2.x:SparkSQL + DataFrame(整合到DataSet中) + DataSet(正式版本)
 
2.什么是SparkSQL:

  • SparkSQL是Spark的一个模块,主要对结构化数据进行处理,SparkSQL提供了SQL查询功能,并将结果集以DataSet/DataFrame形式返回。
  • Spark支持HiveQL语法以及Hive SerDes和UDF,允许访问现有的Hive仓库。只需在classPath中增加hive-site.xml配置文件,即可轻松连接hive,需要注意的是,每台executor都需要有这个配置文件。
  • SparkSQL的元数据默认是存储在derby,通常SparkSQL建立在Hive之上实现的,所以Hive的元数据在哪,SparkSQL的元数据就在哪。
  • 运行原理就是将SQL解析为DAG,最终变成RDD的计算。
4.2.2 DataSet/DataFrame
  • RDD、DataSet和DataFrame都是Spark平台下的分布式弹性数据集。
  • 三者都有Lazy机制,在进行转换时不会立即执行,只有遇到action的时候,才会触发操作。
  • 都有partition的概念。
  • DataSet是1.6版本增加的一个接口,在2.0版本,DataSet和DataFrame统一了接口。
  • DataFrame只是DataSet[Row]的一个别名。DataSet和DataFrame底层还是调用RDD,只不过它增加了一些执行计划、优化器,它会对RDD优化后再执行,所以DataSet号称比RDD快100倍。
  • DataSet是强类型数据集,带有schema元信息,里面每一个字段都有一个类型和名字,类似于关系数据库中的表,而RDD里面默认装的都是String类型,切分完数据后还要根据不同类型去转换,DataSet则不用考虑这些问题。
  • DataFrame和DataSet均支持SparkSQL操作,例如groupby、select等,还能注册临时表和视图,进行sql操作。

4.3 SparkStreaming

4.3.1 什么是SparkStreaming

SparkStreaming是Spark用于处理流式数据的模块,可以实现高吞吐量的、具备容错机制的实时流数据的处理(伪实时,其核心还是按时间片处理,另一种实时处理Storm是以消息驱动)。支持从多种数据源获取数据,例如:Kafka、Flume、Kinesis、TCP sockets等。从数据源获取数据后,可以使用map、reduce、window等高级函数对数据进行处理。最后还可以将数据写入到文件系统、数据库或其他地方。

4.3.2 SparkStreaming机制

处理机制: 接收实时流的数据,并根据一定的时间分隔(batch duration)成一批批的数据(Discretized Stream),对应的批数据,在Spark内核中对应一个个RDD,因此可以将对应的流数据看成是一组RDDs。在流数据分为一批批后,通过一个先进先出的队列,Spark Engine从队列里依次取出一个个数据,转换成RDD,然后进行处理。
运行流程:
 1. 在client上提交任务,资源管理器生成一个application,开启一个driver,然后初始化SparkStreaming的入口StreamingContext;
 2. 资源管理器为application分配资源,在集群的worker上启动executor。
 3. driver端向executor发送多个receiver。receiver是一个接收器,用来接收消息,在executor上运行时,相当于一个task。
 4. receiver接收到数据后,按照设定的时间间隔,每隔一段时间生成一个block块,也就是一个RDD的一个分区,block块会存储在executor里面,存储级别为Memory_And_Disk_2(优先内存,内存不够写磁盘,另存副本)。
 5. receiver产生了这些块信息后,会把block块的信息发送给StreamingContext。
 6. SteamingContext接收到这些块信息后,会根据一定规则将这些块定义成一个RDD。

4.3.3 SparkStreaming的一些术语定义
  • 离散流(Discretized steam)或DStream: 这是Spark Steaming对内部持续的实时数据流的一个抽象描述,我们处理的实时流在SparkStreaming中就是一个DStream。
  • 批数据(batch data): 将实时数据以时间分片为单位进行分批。
  • 时间片或者批处理时间间隔(batch interval): 批次之间的时间间隔,以时间片为我们拆分数据的依据。
  • 窗口长度(window length): 一个窗口函数覆盖的流数据的时间长度,必须是时间片的倍数。
  • 滑动时间间隔: 前一个窗口到下一个窗口所经过的时间长度,必须是时间片的倍数。
  • input DStream: 特殊的DStream,将SparkStreaming连接到外部的数据源来读取数据。
4.3.4 如何使用SparkStreaming
  1. 创建StreamingContext对象: 同Spark初始化需要创建SparkContext一样,SparkStreaming也需要创建StreamingContext对象。创建StreamingContext需要SparkConf和interval参数,SparkConf指定了一些Spark的配置,interval指定Streaming的时间间隔,此参数需要根据用户的需求和集群的处理能力来设置,最好能在间隔时间内将批次处理完毕,以免数据积压。
  2. 创建InputDStream: 指明Spark Streaming的数据源,SparkStreaming支持多种数据源,例如Kafka、Flume、TCP sockets等
  3. 操作DStream: 对于从数据源得到的DStream,可以对其进行各种RDD操作。例如分隔、统计、输出等等。
  4. 启动SparkStreaming: 之前所有步骤只是创建了执行流程,程序没有真正的连上数据源,也没有对数据做任何操作,当ssc.start()启动后,程序才会真正的进行所有预期的操作。一但启动了ssc,就不能增加新的流计算。一个JVM中,只允许一个ssc处于活动状态。
  5. 等待处理停止: 使用ssc.awaitTermination()方法等待处理结束。
  6. 停止: 可以使用ssc.stop()停止任务,一单停止ssc,就无法再重启它。需要注意的是,默认情况下停止StreamingContext时也会将SparkContext关闭,如果仅想停止ssc,可以将stop中的stopSparkContext参数设置为false。
4.3.5 调优思路
  1. 优化运行时间

    • 增加并行度: 确保使用整个集群的资源,而不是把任务集中在几个任务点上。对于shuffle操作,增加并行度可以确保充分利用集群资源。
    • 减少数据序列化,反序列化的负担: SparkStreaming默认将接到的数据序列化后存储在内存中,以减少内存的使用。因此更高效的序列化方法可以更高效的使用CPU。
    • 设置合理的批次时间: SparkStreaming中的Job处于一个先进先出的队列,后面的Job必须确保前面的运行完成后才能运行。如果前一个执行时间超过批次时间间隔,后面的Job会因此延误,造成后续的Job阻塞。因此需要设置一个合理的批处理间隔以确保作业能够在这个批次时间内处理完成。
    • 减少因任务提交和分发带来的负担: 通常情况下,任务提交和分发的耗时是基本没有感知的,但是当批处理间隔非常小时(500ms),提交和分发的延迟就变得无法接受了,使用一个合理的资源管理器可以解决一部分问题。
  2. 优化内存时间

    • 控制batch size: SparkStreaming会把批次内接收到的所有数据存储在可用内存内,超出部分将会写在磁盘,会大大降低运行速度,因此必须确保当前executor中Spark的可用内存中能容纳下这个批次处理时间间隔内的所有数据。
    • 及时清理不使用的数据: 由于会将接收的全部数据存储在可用内存内,因此需要及时对不再使用的数据进行清理,释放资源,确保拥有富余的内存资源。可以设置spark.cleaner.ttl参数来清理超时无用数据,但是需谨慎设置,以免将可能需要用的数据被超时处理掉。
    • 观察及适当调整GC策略: GC会对Job造成一定的影响,导致Job时间延长等一系列不可预期的问题,因此需要根据GC的运行情况,采用不同的GC策略以减少内存回收对Job的影响。

待完善

shuffle相关
Mlib相关

参考资料

http://spark.apache.org/docs/2.3.0
https://blog.csdn.net/jiaotongqu6470/article/details/78457966(注1)
https://www.cnblogs.com/miqi1992/p/5621268.html
https://www.cnblogs.com/haozhengfei/p/039dfec24294b39a2035b915dc96ef4c.html
https://www.cnblogs.com/qingyunzong/p/8945933.html
https://www.cnblogs.com/qingyunzong/p/9026429.html
https://www.cnblogs.com/shishanyuan/p/4747735.html
https://www.cnblogs.com/liuliliuli2017/p/6809094.html


编程开发网
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Spark ShuffleManager内存缓冲器B.. 下一篇秦凯新技术社区-大数据实战系列滚..

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

array(4) { ["type"]=> int(8) ["message"]=> string(24) "Undefined variable: jobs" ["file"]=> string(32) "/mnt/wp/cppentry/do/bencandy.php" ["line"]=> int(217) }