设为首页 加入收藏

TOP

spark_core
2018-12-14 01:28:15 】 浏览:109
Tags:spark_core

==>spark是什么?
a) 是一种通用的大数据计算框架

b) Spark Core 离线计算
Spark SQL 交互式查询
Spark Streaming 实时流式计算
Spark MLlib 机器学习
Spark GraphX 图计算

c) 特点:
i. 一站式:一个技术堆栈解决大数据领域的所有的计算问题
ii. 基于内存

d) Spark2009年诞生于伯克利大学的AMPLab实验室
2010年正式开源了Spark项目
2013年Spark成为Apache下的项目
2014年飞速发展,成为Apache的顶级项目
2015年在国内兴起,代替mr,hive,storm等
作者:辛湜(shi)时

e) Spark和Hive:
Spark优点:
i. 速度快
ii. Spark SQL支持大量不同的数据源:hive,rdd,json,parquet,jdbc

f) Spark 和Storm
i. 计算模型不一样
ii. Spark吞吐量大
iii.和spark 生态圈的其他产品方便整合

g) 特点:快,易用,通用,兼容性

h) spark运行模式
i. local(本地)
ii. standalone(集群)
iii.on yarn(由 yarn作为资源调度Spark负责任务调度和计算)
iv. on mesos(由mesos作为资源调度S)

i) 配置步骤
======================= on yarn ====================
【说明】

  1. spark任务运行在hadoop yarn上,由yarn来进行资源调度和管理,spark只负责任务的调度 和计算
  2. 不需要配置和启动spark集群
  3. 只需要在提交任务的节点上安装并配置spark on yarn 模式
  4. 必须找一台节点安装spark
  5. 步骤:
    i. 安装配置JDK
    ii. vi spark-env.sh
  6. export JAVA_HOME=/opt/modules/jdk1.7_6.0
  7. export HADOOP_CONF_DIR = /opt/modules/hadoop/etc/hadoop
    iii. 测试spark on yarn 模式是否安装成功
    iv. 网络测试:http://hadoop-yarn1.beicai.com:8088

===================== sdandalone模式 ==============
【说明】

  1. spark运行在spark 集群上,由spark的master进行资源调度管理,同时还负责任务的调度和计算。(dirver在做任务的调度)
  2. 需要配置和启动spark集群
  3. 步骤:
    i. 安装配置JDK
    ii. 上传并解压Spark
    iii. 建立软连接 ln -s spark spark 或者修改名称
    iv. 配置环境变量
    v. 安装配置Spark,修改spark配置文件(spark-env.sh, slaves)
  4. vi spark-env.sh
    a) export JAVA_HOME=/opt/modules/jdk(jdk位置)
    b) export SPARK_MASTER_IP=hadoop-yarn1.beicai.com
    c) export SPARK_MASTER_PORT=7077
  5. vi slaves(用于指定在哪些节点上启动worker)
    a) hadoop-yarn2.beicai.com
    hadoop-yarn3.beicai.com
    vi. 将spark发送给其他主机
    vii. 启动
    /opt/modules/spark/bin/start-all.sh
    vii. 查看SparkUI界面:http://hadoop-yarn1.beicai.com:8080

j)
==>Spark原理
1、Spark的运行原理
i、分布式
Ii、主要基于内存(少数情况基于磁盘,哪些情况是在磁盘)
Iii、迭代式计算
2、Spark运行原理图

3、Spark 计算模式 VS MapReduce 计算模式对比

Mr这种计算模型比较固定,只有两种阶段,map阶段和reduce阶段,两个阶段结束 后,任务就结束了,这意味着我们的操作很有限,只能在map阶段和reduce阶段, 也同时意味着可能需要多个mr任务才能处理完这个job

Spark 是迭代式计算,一个阶段结束后,后面可以有多个阶段,直至任务计算完 成,也就意味着我们可以做很多的操作,这就是Spark计算模型比mr 强大的地方

==>什么是Spark RDD?

1、什么是RDD?
弹性的,分布式的,数据集

(RDD在逻辑上可以看出来是代表一个HDFS上的文件,他分为多个分区,散落 在Spark的多个节点上)

2、Spark RDD图解

3、RDD----弹性
当RDD的某个分区的数据保存到某个节点上,当这个节点的内存有限,保存不了这个 分区的全部数据时,Spark就会有选择性的将部分数据保存到硬盘上,例如:当worker 的内存只能保存20w条数据时,但是RDD的这个分区有30w条数据,这时候Spark就 会将多余的10w条数据,保存到硬盘上去。Spark的这种有选择性的在内存和硬盘之间 的权衡机制就是RDD的弹性特点所在

4、Spark的容错性
RDD最重要的特性就是,提供了容错性,可以自动的从失败的节点上恢复过来,即如 果某个节点上的RDD partition(数据),因为节点的故障丢了,那么RDD会自动的通过 自己的数据来源重新计算该partition,这一切对使用者来说是透明的

==>Spark的开发

1、图说

2、Spark的开发类型
(1)、核心开发:离线批处理 / 演示性的交互式数据处理
(2)、SQL查询:底层都是RDD和计算操作
(3)、底层都是RDD和计算操作
(4)、机器学习
(5)、图计算

3、Spark 核心开发(Spark-core == Spark-RDD)步骤

(1)、创建初始的RDD

(2)、对初始的RDD进行转换操作形成新的RDD,然后对新的RDD再进行操作,直 至操作计算完成

(3)、将最后的RDD的数据保存到某种介质中(hive、hdfs,MySQL、hbase…)

==>Spark原理

Driver,Master,Worker,Executor,Task各个节点之间的联系

Spark中的各节点的作用:

0、 Spark的 执行流程
①程序会打包到集群
②在提交的节点会启动一个driver进程
③driver进程会向master进行注册,申请计算资源
④master在对应的worker上开启executor进程
⑤executor向driver进行反向注册
⑥driver把对应的计算任务发给worker上的executor
⑦executor接收到任务之后,分批次执行

1、driver的作用:
(1)、构建运行任务的初始化环境
(2) 向master进行任务的注册
(3)、接受该任务的executor的反向注册
(4)、向属于该任务的executor分配任务

2、什么是driver?
我们编写的程序打成jar包后,然后找一台能够连接spark集群的节点做任务的driver,具体的表现为SparkSubmit,具体是在哪里运行得看是client模式还是cluster模式

3、Master的作用:
(1)、监控集群;
(2)、动态感知worker的上下线;
(3)、接受driver端注册请求;
(4)、任务资源的调度

4、Worker的作用:
(1)、定时向master汇报状态;
(2)、接受master资源调度命令,进行资源的调度
(3)、启动任务的容器Executor

5、Executor的作用:
(1)、保存计算的RDD分区数据;
(2)、向Driver反向注册;
(3)、接受Driver端发送来的任务Task,作用在RDD上进行执行

Spark 编程的流程:

1、我们编写的程序打包成jar包,然后调用Spark-Submit 脚本做任务的提交

2、启动driver做任务的初始化

3、Driver会将任务极其参数(core,memory,driver相关的参数)进行封装成ApplicationDescript ,通过taskSchedulerImpl提交给Master

4、Master接受到driver端注册任务请求时,会将请求参数进行解析,并封装成APP,然后进行持久化,并且加入到其任务队列中的waitingAPPs

5、当轮到咱们提交的任务运行时,master会调用schedule()这个方法,做任务资源调度

6、Master将调度好的资源封装成launchExecutor,发送给指定的worker

7、Worker接收到发送来的launchExecutor时,会将其解析并封装成ExecutorRunner,然后调用start方法,启动Executor

8、Executor启动后,会向任务的Driver进行反向注册

9、当属于这个任务的所有executor启动成功并反向注册完之后,driver会结束SparkContext对象的初始化

10、当sc 初始化成功后,意味着运行任务的基本环境已经准备好了,driver会继续运行我们编写好的代码

11、开始注册初始的RDD,并且不断的进行转换操作,当触发了一个action算子时,意味着触发了一个job,此时driver就会将RDD之间的依赖关系划分成一个一个的stage,并将stage封装成taskset,然后将taskset中的每个task进行序列化,封装成launchtask,发送给指定的executor执行

12、Executor接受到driver发送过来的任务task,会对task进行反序列化,然后将对应的算子(flatmap,map,reduceByKey。。。。)作用在RDD分区上

==>RDD详解

1、什么是RDD?
RDD(Resilient Disttibuted Dataset)叫做弹性的分布式的数据集,是spark中最基本的数据抽象,它代表一个不可变,可分区,里面的元素可并行计算的集合

2、RDD的特点:
自动容错
位置感知
伸缩性

3、RDD的属性:
(1)、一组分片(partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度,用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值,默认值就是程序所分配到的CPU Core的数目
(2)、一个计算每个分区的函数。Spark中RDD的计算是以分片为单位的,每个RDD都会实现computer函数以达到这个目的。Computer函数会对迭代器进行复合,不需要保存每次计算的结果。
(3)、RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。
(4)、一个partition,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于hashPartitioner,另外一个是基于范围的RangePartitioner。只有对于key-value的RDD,才会有Partitioner,非key-value的RDD的Partitioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了partition RDD Shuffle输出时的分片数量。
(5)、一个列表,存储存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说。这个列表保存的就是每个Partition所在的快的位置。按照“移动数据不如移动计算”的理念。Spark在进行任务调度的时候,会尽可能的将计算任务分配到所要处理数据块的存储位置。

4、RDD的创建:
进行Spark核心编程时,首先要做的事就是创建一个初始的RDD。Spark Core提供了三种创建RDD的方式:
(1)、使用程序中的集合创建RDD (调用parallelize()方法)
(2)、使用本地文件创建RDD (调用textFile()方法)
(3)、使用HDFS文件创建RDD (调用textFile()方法)

==>算子

1、什么是算子?
是RDD中定义的作用在每一个RDD分片上的函数,可以对RDD中的数据进行操作

2、RDD算子的分类
(1)、Transformation算子,这类算子变换不触发提交作业(特点就是lazy特性)
返回的是一个RDD
(2)、Action算子,这类算子会触发SparkContext提交作业(触发一个spark job的运行,从而触发这个action之前所有的transformation的执行)
返回的是一个spark对象

3、常用的Transformation算子
操作 介绍
map 将RDD中的每个元素传入自定义函数,获取一个新的元素,然后用新的元素组成新的RDD
filter 对RDD中每个元素进行判断,如果返回true则保留,返回false则剔除。
flatMap 与map类似,但是对每个元素都可以返回一个或多个新元素。
gropuByKey 根据key进行分组,每个key对应一个Iterable
reduceByKey 对每个key对应的value进行reduce操作。
sortByKey 对每个key对应的value进行排序操作。
join 对两个包含<key,value>对的RDD进行join操作,每个key join上的pair,都会传入自定义函数进行处理。

4、常用的Action算子
操作 介绍
reduce 将RDD中的所有元素进行聚合操作。第一个和第二个元素聚合,值与第三个元素聚合,值与第四个元素聚合,以此类推。
collect 将RDD中所有元素获取到本地客户端。
count 获取RDD元素总数。
take(n) 获取RDD中前n个元素。
saveAsTextFile 将RDD元素保存到文件中,对每个元素调用toString方法
countByKey 对每个key对应的值进行count计数。
foreach 遍历RDD中的每个元素。
top 获取最大的前几个值(默认情况下会先排序,后取值)
Max,min,sum

==>RDD分区排序

I、分区
两种实现方式:coalesce 和 repartition(底层调用coalesce)

coalesce(numPartitons,isShuffle)
第一个参数是重分区后的数量,第二个参数是是否进行shuffle
如果原来有N个分区,重分区后有M个分区
如果 M > N ,必须将第二参数设置为true(也就是进行shuffle),等价于 repartition(numPartitons) 如果是false将不起作用
如果M < N
100–>10 重分区后的分区数比原来的小的多,那么久需要使用shuffle,也即是设置为true
100–>90 重分区后的分区数和原来的差不多的,那么就不需要使用shuffle,也就是设置为false

II、排序
sortBy(x => x) 这个算子中带有隐式转换参数

x 能够排序(比较大小),那么这个类就必须有比较大小的功能,也就是实现了compareTo 或者compare

实现二次排序有两种方法:
1、继承Comparable 接口 或者 Ordered
2、隐式转换:可以定义隐式转换函数(Ordered)或者隐式转换值(Ordering)
排序分为两种:

==>自定义分区

PartitionBy(自己实现的分区函数)
自定义分区
要求:按照key将对应的value输出到指定的分区中
解释:自定义一个自定义分区类,继承partitioner,实现他的两个方法
1、numPartitions
2、getPartition
具体的功能根据项目的要求自定义实现,然后调用partitionBy方法,new出自定义的类,传入参数即可

代码的实现:

class MyPartitioner(val arr:Array[String]) extends Partitioner{

val map = new mutable.HashMapString,Int
for(i <- 0 until(arr.length)){
map.put(arr(i),i)
}

override def numPartitions: Int = arr.length

override def getPartition(key: Any): Int = {
val k = key.toString
map.getOrElse(k,0)
}
}

//方法的调用
commonRDD.partitionBy(new MyPartitioner(arr)).saveAsTextFile(“E:\tmp\a”+System.currentTimeMillis())

==>RDD持久化原理

1、持久化场景:对于一个rdd会被多次引用到,并且这个rdd计算过程复杂,计算时间特变耗时

2、如何进行持久化,调用rdd.persist方法或cache方法,cache方法底层就是调用persist方法

persist(StorageLevel.MEMORY_ONLY)*
如果对RDD做持久化,默认持久化级别是storageLevel.MEMORY_ONLY ,也就是持久化到内存中去,这种持久化级别是效率最快的,但是由于是纯Java 对象,占用的容量比较大,保存到内存中,那么内存可能保存的数量就会较少
persist(StorageLevel.MEMORY_ONLY_SER)*
如果当我们集群资源有限时,那么我们可以采用MEMORY_ONLY_SER,也就是将Java对象进行序列化之后持久到内存中去,这种持久化的好处是能够持久化更多的数据到内存中,但是由于在持久化时需要序列化,取出来之后又需要反序列化这一过程,这个过程会消耗CPU计算资源,性能相对于MEMORY_ONLY 这种持久化级别来说稍微弱点,但是还是比较高效的

3、如何选择RDD持久化策略?取舍和权衡的思想 —》调优
 Spark提供的多种持久化级别,主要是为了在CPU和内存消耗之间进行取舍,下面是一些通用的持久化级别的选择建议:
1)、优先使用MEMORY_ONLY,如果可以缓存所有数据的话,那么就使用这种策略,因为纯内存速度最快,而且没有序列化,不需要消耗CPU进行反序列化操作
2)、如果MEMORY_ONLY策略,无法存储所有数据的话,那么使用MEMORY_ONLY_SER,将数据进行序列化存储,纯内存操作还是非常快的,只是要消耗CPU进行反序列化
3)、如果需要进行快速的失败恢复,那么就选择带后缀为_2的策略,进行数据的备份,这样在失败时,就不需要重新计算了
4、能不使用DISK相关的策略,就不要使用,有的时候,从磁盘读取数据,还不如重新计算一次

==>共享变量

1、共享变量分为两种:广播变量 和 累加器
外部变量:

广播变量(broadcast)

2、日常所遇问题
在使用外部变量的时候,会为每个task都发送一份,在实际运行中,每个executor 有多个task,这些给很多task发送就会需要大量的网络数据传输,内存的消耗很大,这些task共享executor里面的cpu,内存。如果外部变量的容量比较大,如果只是给executor发送一份,这些task也能只用这份数据。

3、如何解决以上问题,也就是说什么时候使用广播变量
当RDD引用到了一个外部变量并且这个外部变量数据量不小,同时这个RDD对应的task数量特别多,那么此时使用广播共享变量再合适不过了
我们可以将这种大的外部变量做成广播变量,外部变量做成广播变量的时候,那么每个executor的内存中只会有一个外部变量,而这个副本针对所有的task都是共享的,这样的话就减少了网络流量消耗,降低了executor的内存消耗,提高了spark作业运行效率和缩短了运行时间,同时降低了作业失败的概率

4、广播变量的使用流程:
1)、某个executor的第一个task先执行,首先会从自己的blockManager中查找外部变量,如果没有就从邻居的executor的blockManager的内存中获取这个外部变量,如果还是获取不到,就从driver端获取,拷贝这个外部变量到本地的executor的blockManager
2)、当这个executor的其他task执行时,就不需要从外面获取这个外部变量的副本,直接从本地的blockManager中获取即可

5、如何获取广播变量的值?
可以直接调用广播变量的value() 这个方法即可

【注意】广播变量是只读的,不可写

累加器(Accumulator)

Spark提供的Accumulator ,主要用于多个节点对一个变量进行共享性的操作,Accumulator只是提供了累加的功能。但是却给我们提供了多个task对一个变量并行操作的功能,但是task只能对Accumulator进行累加操作
一个外部变量,在driver端声明之后,在executor里面被使用,注意,executor有很多个,都可以对这个外部变量做修改,但是修改的结果不会体现在driver那边。

【注意】task只能对Accumulator进行类加操作,只有Driver程序可以读取Accumulator的值

场景:分布的问题

==>RDD分区排序

1、wordCount相关RDD深度解析

2、RDD 的Lineage血统
RDD只支持粗粒度转换,即在大量记录上执行的单个操作,将创建RDD的一系列Lineage(血统)记录下来。以便恢复丢失的分区

3、RDD的依赖关系
RDD和它的父RDD的关系有两种不同的类型:
1)、窄依赖(一对一,多对一)
形象的比喻:独生子女
2)、宽依赖(多对多,一对多)
形象的比喻:超生

注释:划分stage的依据就是宽依赖,也就是RDD之间是否有shuffle,shuffle过程就是一个宽依赖过程,shuffle之前的tasks就属于一个stage,shuffle之后的也属于一个stage,shuffle之前和之后的操作都是窄依赖
【注意】shuffle过程分为:shuffle Write过程 和 shuffle read过程

4、DAG的生成(有向无环图)和任务的划分
DAG(Directed Acyclic Graph)叫做有向无环图(有方向无循环的图)

5、一个wordCount过程会产生多少个RDD?
一个算子可能会产生多个Rdd。
至少会产生五个RDD,
第一个,从HDFS中加载后得到一个RDD(即使用sc.textFile()算子),即HadoopRDD
在sc.textFile()过程中还会产生一个RDD(调用map算子),产生一个MapPartitionRDD
第二个,使用flatMap算子,得到一个MapPartitionRDD
第三个,使用map算子,得到一个MapPartitionRDD
第四个,使用reduceByKey算子,也就是在经过了shuffle过程后又会得到一个shuffledRDD (至少2个)
第五个,使用saveAsTextFile算子,再产生一个MapPartitionRDD

Rdd 类型:
HadoopRDD
ParallelCollectionRDD
MapPartitionRdd
ShuffledRDD
CoGroupedRDD(join操作)
CoalescedRDD

6*、Spark内核架构原理和任务流程图

Spark任务简介:
Spark-submit—>SparkSubmit–>main–>submit–>doRunMain–>RunMain–>通过反射创建我们编写的主类的实例对象,调用main方法–>开始执行我们编写的代码–>初始化SparkContext对象–>创建初始的RDD–>触发action算子–>提交job–>worker执行任务–>任务结束

Spark任务详解:
1)、将我们编写的程序打成jar包

2)、调用spark-submit脚本提交任务到集群上运行

3)、运行sparkSubmit的main方法,在这个方法中通过反射的方式创建我们编写的主类的实例对象,然后调用main方法,开始执行我们的代码(注意,我们的spark程序中的driver就运行在sparkSubmit进程中)

4)、当代码运行到创建SparkContext对象时,那就开始初始化SparkContext对象了

5)、在初始化SparkContext对象的时候,会创建两个特别重要的对象,分别是:DAGScheduler
和TaskScheduler

【DAGScheduler的作用】将RDD的依赖切分成一个一个的stage,然后将stage作为taskSet提交给DriverActor

6)、在构建taskScheduler的同时,会创建两个非常重要的对象,分别是DriverActor和ClientActor

【clientActor的作用】向master注册用户提交的任务
【DriverActor的作用】接受executor的反向注册,将任务提交给executor

7)、当clientActor启动后,会将用户提交的任务和相关的参数封装到ApplicationDescription对象中,然后提交给master进行任务的注册

8)、当master接受到clientActor提交的任务请求时,会将请求参数进行解析,并封装成Application,然后将其持久化,然后将其加入到任务队列waitingApps中

9)、当轮到我们提交的任务运行时,就开始调用schedule(),进行任务资源的调度

10)、master将调度好的资源封装到launchExecutor中发送给指定的worker

11)、worker接受到Maseter发送来的launchExecutor时,会将其解压并封装到ExecutorRunner中,然后调用这个对象的start(), 启动Executor

12)、Executor启动后会向DriverActor进行反向注册

13)、driverActor会发送注册成功的消息给Executor

14)、Executor接受到DriverActor注册成功的消息后会创建一个线程池,用于执行DriverActor发送过来的task任务

15)、当属于这个任务的所有的Executor启动并反向注册成功后,就意味着运行这个任务的环境已经准备好了,driver会结束SparkContext对象的初始化,也就意味着new SparkContext这句代码运行完成

16)、当初始化sc成功后,driver端就会继续运行我们编写的代码,然后开始创建初始的RDD,然后进行一系列转换操作,当遇到一个action算子时,也就意味着触发了一个job

17)、driver会将这个job提交给DAGScheduler

18)、DAGScheduler将接受到的job,从最后一个算子向前推导,将DAG依据宽依赖划分成一个一个的stage,然后将stage封装成taskSet,并将taskSet中的task提交给DriverActor

19)、DriverActor接受到DAGScheduler发送过来的task,会拿到一个序列化器,对task进行序列化,然后将序列化好的task封装到launchTask中,然后将launchTask发送给指定的Executor

20)、Executor接受到了DriverActor发送过来的launchTask时,会拿到一个反序列化器,对launchTask进行反序列化,封装到TaskRunner中,然后从Executor这个线程池中获取一个线程,将反序列化好的任务中的算子作用在RDD对应的分区上

【注意】
Spark的任务分为为两种:
a、shuffleMapTask:shuffle之前的任务
b、resultTask:shuffle之后的任务

Spark任务的本质:
将RDD的依赖关系切分成一个一个的stage,然后将stage作为TaskSet分批次的发送到Executor上执行

==>Checkpoint

1、使用checkpoint的场景:
某个RDD会被多次引用,计算特别复杂,计算特别耗时
担心中间某些关键的,在后面会反复几次使用的RDD,可能会因为节点的故障,导致持久化数据的丢失

2、如何对RDD进行checkpoint?
1)、设置还原点目录,设置checkpoint目录
2)、调用RDD的checkpoint的方法对该RDD进行checkpoint

sc.setCheckPointDir(“xxxxx”)
Rdd01.checkPoint()

3、checkpoint的原理
1)、RDD调用了checkpoint方法之后,就接受RDDCheckpointData对象的管理
2)、RDDCheckpointData对象会负责将调用了checkpoint的RDD 的状态设置为MarkedForCheckpoint
3)、当这个RDD所在的job运行结束后,会调用最后一个RDD的doCheckpoint,根据其血统向上查找,查找到被标注为MarkedForCheckpoint状态的RDD,将其状态改变为checkpointingInProgress
4)、启动一个单独的job,将血统中标记为checkpointingInProgress的RDD进行checkpoint,也就是将RDD的数据写入到checkpoint的目录中去
5)、当某个节点发生故障,导致包括持久化的数据全部丢失,此时会从还原点目录还原RDD的每个分区的数据,这样就不需要从头开始计算一次

4、checkpoint需要注意的地方
因为RDD在做checkpoint的时候,会单独启动一个job对需要进行checkpoint的RDD进行重新计算,这样就会增加spark作业运行时间,所以spark强烈建议在做checkpoint之前,应该对需要进行checkpoint的RDD进行持久化(即调用 .cache)

5、checkpoint 和持久化的区别
1)、是否改变血统:
持久化(.cache):不会改变RDD的依赖关系,也就是不会改变其血统
Checkpoint:会改变RDD的血统,做了checkpoint的RDD会清除其所有的依赖关系,并将其父RDD强制设置为checkpointRDD,并且将RDD的状态更改为checkpointed

2)、RDD的数据的可靠性:
持久化:只是将RDD的数据持久化到内存或磁盘中,但是如果节点发生故障,那么持久化的数据还是会丢失
Checkpoint:checkpoint的数据保存在第三方高可靠的分布式的文件系统中,机试节点发生故障,数据也不会丢失,所以checkpoint比持久化可靠性更高

6、后续
我们实现了checkpoint 之后,在某个task 又调用了该RDD的iterator() 方法时,就实现了高容错机制,即使RDD的持久化数据丢失,或者压根儿就没有持久化,但是还是可以通过readCheckpointOrComputer() 方法,优先从父RDD-----checkpointRDD中读取,HDFS(外部文件系统)的数据

==> spark shuffle
三种shuffle的情况
前两种都是基于hash (未优化的,优化的–》减少小文件产生的数量)
排序建索引,最终一个partition --》一个磁盘文件。

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇spark IDE:   System memory.. 下一篇sparksql中dataframe的用法

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目