版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/answer100answer/article/details/78743053
1. Spark RDD
1.1 spark RDD简介
RDD是spark的基石 。
RDD提供了通用的抽象。
现在Spark有5个子框架SparkStreaming 、SparkSQL 、SparkML 、GraphX 、SparkR
RDD弹性:
1. 自动的进行内存和磁盘数据存储的切换
2. 基于lineage的高效容错
3. task失败会自动进行特定次数的重试
4. stage如果失败会自动进行特定次数的重试而且重试时只会试算失败的分片。
5. checkpoint和persist,是效率和容错的延伸。
6. 数据调度弹性:DAGTASK和资源管理无关
7. 数据分片的高度弹性
1.1 数据分片partition
计算过程中有很多数据碎片,那么Partition就会非常小,每个Partition都会由一个线程处理,就会降低处理效率。这时就要考虑把小文件合并成一个大文件。另外一个方面,如果内存不多,而每个Partition比较大(数据Block大),就要考虑变成更小的分片,Sparke有更多的处理批次但不会出现OOM。
所以说根据数据分片的大小来提高并行度或降低并行度也是Spark高度弹性的表现。同时需要指出的是,不管是提高并行度还是降低并行度,仍具有数据本地性。当然,提高并行度还是降低度行度都是人工通过代码来调整的。
coalesce 假设有一百万个数据分片,每个数据分片都非常小(1K或10KB),如果要把数据分片调整为一万个,如果使用repartition ,就需要Shuffle,这是不可取的,而要调用coalesce ,coalesce默认的shuffle 为 false 。
RDD.scala 源码:
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true )
}
def coalesce(numPartitions: Int, shuffle: Boolean = false )(implicit ord: Ordering[T] = null)
: RDD[T] = withScope {
... ...
1.2 缓存cache
RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,如果有其他人执行同样的查询的话就可以存入工作集,这就极大地提高了效率。特别是在数据仓库中,如果有一千人做同样的查询,第一个人在查询之后,每二个人查询时就可以直接从缓存中取结果。退一步说,如果一千人执行的查询只有前 10 个步骤是一样的,如果第一个人计算完成后,后面的人的前 10 步就不需要再计算了。这就极大地提升了查询速度。 如果是Hadoop 的话一千个人执行同样的查询,就需要重复计算一千次。
缓存随时可以清理掉,如果内存或磁盘不足就需要根据优先度将不常使用的缓存内容清理掉。 RDD 的 cache 是直接放在内存中的。 RDD 的 cache 通过 checkpoint 来清除。但 checkpoint 是重量级的。SparkStreaming经常进行 Checkpoint ,原因是经常要用到以前的内容。假设要统计一段时间的内容,那就需要以前的数据。
如果 Spark 的一个 Stage 中有一千个步骤的话,默认只会产生一次结果。如果是 HadoopMR 就会产生 999 次中间结果,如果数据量很大的话,内存和磁盘都可能存不下。
Spark 本身就是 RDD 的内容, RDD 是只读分区的集合。 RDD 是数据集合,可以简单理解为 List 或 Array 。
Spark的每一步操作都是对 RDD 进行操作,而 RDD 是只读分区的集合。
※ 由于每一次操作都是只读的,而操作会改变数据,那么产生中间结果怎么办?
=>不能立即计算。这就是 lazy:不用时不算,用时才计算,所以不会产生中间结果
RDD的核心之一就是它的Lazy,因为这不计算,开始时只是对数据处理进行标记而已。例如WordCount中的map、flatmap其实并不计算数据,只是对数据操作的标记而已。
/**
* Return a new RDD by first applying a function to all elements of this
* RDD, and then flattening the results.
*/
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
可以看出在flatMap 中创建了一个 MapPartitionsRDD ,但第一个参数是 this ,这个 this 是指它依赖的父 RDD 。每次创建新的 RDD 都会把父 RDD 作为第一个参数传入。而这只是对数据处理的标记而已。可以看出在flatMap中创建了一个 MapPartitionsRDD ,但第一个参数是 this ,这个 this 是指它依赖的父 RDD 。每次创建新的 RDD 都会把父 RDD 作为第一个参数传入。而这只是对数据处理的标记而已。
1.3 函数展开式回溯
每次创建新的 RDD 都会把父 RDD 作为第一个参数传入,这就类比函数展开式操作:
f(x)=x+2
x=y+1
y=z+3
执行时只是函数展开。
所以RDD每次构建对象都依赖于父RDD,最后就是函数展开。所以如果一个Stage中有一千个步骤的话,不会产生999次中间结果。可以看出在flatMap中创建了一个 MapPartitionsRDD ,但第一个参数是 this ,这个 this 是指它依赖的父 RDD 。每次创建新的 RDD 都会把父 RDD 作为第一个参数传入。而这只是对数据处理的标记而已
由于RDD是只读的,为了应对计算模型,RDD又是lazy级别的。每次操作都会产生RDD,每次构建新的RDD都是把父RDD作为第一个参数传入,这就构成了一个链条。在最后Action时才触发,这就构成了一个从后往前回溯的过程,其实就是函数展开的过程。
由于这种从后往前的回溯机制,Spark的容错的开销会非常低。
※ 以前有人说spark不适合大规模计算,当时确实有其道理,主要有两点原因:
1)Spark一直基于内存迭代会消耗大量内存。例如如一千个步骤虽然不产生中间结果,但如果要复用别人的结果时就需要手动persist或cache,这确实非常消耗内存。
2)主要是因为shuffle机制。但现在Shuffle支持很多种机制,如hashShuffle、sortBasedShuffle、钨丝计划等,而且现在Shuffle只是一个接口,一个插件,可以自定义,所以可以应对任意规模的数据处理。
Spark1.2以前确实有规模的限制,但1.3以后就无限制。因为Spark1.3引入了dataframe ,这是里程碑式的。
可以看出在flatMap中创建了一个 MapPartitionsRDD ,但第一个参数是 this ,这个 this 是指它依赖的父 RDD 。每次创建新的 RDD 都会把父 RDD 作为第一个参数传入。而这只是对数据处理的标记而已
1.4 RDD与迭代器
如果想让Spark直接操作MYSQL的数据或者操作HBASE数据就需要复写RDD。
RDD的数据分片上运行的计算逻辑都是一样的。对于每个计算逻辑都有计算函数compute
def compute(split: Partition, context: TaskContext): Iterator[T]