设为首页 加入收藏

TOP

spark 计算引擎
2019-03-21 01:31:01 】 浏览:64
Tags:spark 计算 引擎
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u013928917/article/details/76155198

spark 计算引擎(一)

spark 的计算是一个层层迭代的过程,迭代即上一轮的输出是下一轮计算的输入,RDD是spark计算的核心,是spark对各种计算数据的统一抽象模型,关于RDD的特性,将在今后的博客中陆续更新,今天要给大家展示的是spark的计算引擎模型。

迭代计算

MappedRDD 的iterator 方法实际是父RDD的iterator方法,如果任务是初次执行,此时尚未缓存,会调用computeOfReadCheckpoint方法,迭代方法的容错处理过程,如果在计算的过程中某个分区任务执行失败了,但其他分区任务执行成功了,可以利用DAG重新调度。失败的任务将从检查点恢复状态,而那些成功执行的任务的结果已经存储到存储体系中了,所以调用CacheManager的getOrCompute方法即可获取,不需再次执行。iterator方法如下:

final def iterator(split:Partition,Context:TaskContext):Iterator[T]={
  if(storageLevel!=StorageLevel.NONE){
    SparkEnv.get.cacheManager.getOrcompute(this,split,context,storageLevel)
  }else{
    computeOrReadCheckPoint(split,contxt)
  }
}

Mappped rdd的compute方法首先调用firstparent 找到父RDD,经过层层调用最终达到hadoop-RDD,即最初的计算入口。(spark是惰性计算的,在spark中对RDD的操作主要是两种transformation和action,只有action才会触发真正的计算,然后层层迭代求出最终的结果,transformation是不会触发计算的)。

shuffle

shuflle是所有Mapreduce框架所必须经过的阶段,shuffle 是连接map任务和reduce任务的桥梁,map任务的输出结果按照key值哈希分配后给某reduce个任务spark早期版本的shuffle过程如下:

这里写图片描述

从上图可以知道:
* map任务会为每个reduce任务创建一个part文件(这些part文件会存放在bucket)。如果有M个map任务和N个reduce任务最终将生成M*N个part文件;
* map任务的结果将按照partition的不同写入不同的bucket中;
* reduce任务从本地或者远端的map任务所在的BlockManager中获取对应的bucket作为输入;

通过对shuffle的过程的了解发现,map任务中间结果先存入内存,然后才写入磁盘。这样容易导致内存紧张,进而发生溢出。其次,每个map任务都有M个bucket相对应,虽然bucket的的本身并不是很大,但是当bucket的数量太多时会频繁造成网络IO,成为性能的瓶颈。

所以在后期的spark中对以上问题进行了改进,改进主要包括以下几个方面:
1、 将map任务给每个partition的reduce任务任务输出的bucket合并到同一个文件中,这样可以解决bucket数量多而造成频繁网络IO的问题;
2、 map任务逐条输出计算结果,而不是一次性输出到内存,并使用AppendOnlyMAp缓存机器局和算法对中间结果进行聚合,这样可大大减少中间结果所占用的内存。
3、 reduce在拉取map任务的中间输出时也是逐条读取,而不是一次性读入内存,并且在内存中进行聚合和排序操作,这样可以减小数据所占的内存。
4、 reduce任务将要拉取的Block按照blockmanager地址划分,然后将同一BlockManager地址中的Block累积为少量的网络请求,减少网络IO;

Map端计算结果缓存处理及持久化

Map端对计算结果的缓存有三种方式:
* map端对计算结果在缓存中进行聚合和排序;
* map不使用缓存,也不执行聚合和排序,而是直接调用spillToPartitionFiles将各个文件patition直接写到自己的存储文件中,最后由reduce进行聚合排序;
* map端进行简单缓存

对计算结果的聚合和排序能够大大的节省IO开销,从而提升系统性能(一般通过patitionID和key进行排序和聚合)。

reduce端读取中间计算结果

当map任务相关的Stage都执行完的时候,会唤起下游Stage的提交及任务执行,ResultTask的计算是由RDD的iterator方法驱动的,所以最终计算过程最终会落到ShuffledRDD的compute方法,compute方法首相调用SortShuffleManager的getReader方法创建HassShuffleReader,通过其read方法读取依赖的中间结果。
spark 可以通过MapoutputTracker的getServerStatus来获取map任务执行状态信息,shuffledBlockFetcherIterator是读取中间结果的关键,获取过程其实是创建了一个netty服务,通过这个服务进行上传和查找中间结果所在的位置。

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇第一个spark程序----WordCount 下一篇Spark学习-SparkSQL--02-Spark hi..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目