设为首页 加入收藏

TOP

spark核心题
2019-03-20 13:05:24 】 浏览:51
Tags:spark 核心

Spark 面试题

1.Spark内置模块包括哪些?请分别简述其功能。

  1. Spark Core:实现了Spark 的基本功能:

    1. 任务调度
    2. 内存调度
    3. 错误恢复
    4. 与存储系统交互
    5. RDD(Resilient Distributed DataSet)的API的定义
  2. Spark SQL:是Spark用来操作结构化数据的程序包. 可以通过SQL或Hive版本的(HQL)查询数据.

  3. Spark Streaming:是Spark提供的对实时数据进行流式计算的组件.提供了用于操作数据流的API,且与Spark Core中的RDD API高度对应

  4. Spark MLlib: 提供了常见的机器学习(ML)功能的程序库,括分类、回归、聚类、协同过滤等,还提供了模型评估、数据 导入等额外的支持功能。

  5. 集群管理器:Spark 设计为可以高效地在一个计算节点到数千个计算节点之间伸缩计 算。为了实现这样的要求,同时获得最大灵活性,Spark支持在各种集群管理器(Cluster Manager)上运行,包括Hadoop YARN、Apache Mesos,以及Spark自带的一个简易调度 器,叫作独立调度器。

    在这里插入图片描述

2.Spark的特点有哪些?简述。

  1. 快:基于内存,又联合了硬盘(内存不足时可以使用硬盘),比MR的运算要快100倍以上

  2. 易用, Spark支持java,Python,scala的API支持多种高级算法,可以非常方便的使用集群解决问题

  3. 通用: spark提供了统一的解决方案.可以使批处理, 交互式查询(spark sql),实时流处理,机器学习和图计算在

    统一个应用中无缝使用,减少成本

  4. 兼容性: spark可以很容易和其他开源产品融合,比如,spark可以使用hadoop的Yarn和Mesos作为他的资源管理器,并且可以处理所有hadoop支持的数据, HDFS, HBase

3.Spark中核心的进程有哪些?分别说明其主要功能。

  1. Master: 资源调度与管理
  2. Work:提供给executer运行环境
  3. Executer Block: 可以看做可执行任务的容器
  4. SparkSubmit: 提交任务的进程

4.Spark有几种运行模式,如何指定在不同的模式下运行?

spark有四种运行模式: 指定 --master 参数

  1. Local模式 默认就是本地模式,可以省略
  2. Standalone模式 --master spark://hadoop102:7077
  3. Yarn模式 --master yarn
  4. Mesos模式 --master mesos://hadoop102:7077

5.如何提交一个Spark任务?主要参数有哪些?

#本地模式
bin/spark-submit \

--class org.apache.spark.examples.SparkPi \  <-- 全类名

--executor-memory 1G \   <--内存

--total-executor-cores 2 \  <-- 内核 

./examples/jars/spark-examples_2.11-2.1.1.jar \ <---路径

100  <---命令行参数

# standalone模式

bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://hadoop100:7077 \          <---运行模式
--executor-memory 1G \
--total-executor-cores 2 \
./examples/jars/spark-examples_2.11-2.1.1.jar \
100


# yarn模式
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \                             <---运行模式
--deploy-mode client \
./examples/jars/spark-examples_2.11-2.1.1.jar \
100

6.画出在Standalone-Client模式下提交任务的流程图。

在这里插入图片描述

上图是把Driver放在了spark集群中, 若把Driver放在客户端中,客户端和集群会不断地产生大量数据交互,最终会导致客户端崩溃,该模式可以做测试使用

7.画出在Yarn-Cluster模式下提交任务的流程图。

在这里插入图片描述

8.简述你所理解的不同运行模式之间的区别。

  1. 本地模式的资源调度和任务调度都在本机上,

  2. 独立模式中有Master和Worker,Master负责资源调度,Worker负责任务调度

  3. yarn模式中 由RM负责资源调度, 由NM负责任务调度

9.编写WordCount(读取一个本地文件),并打包到集群运行,说明需要添加的主要参数。

//第一种
scala> val rdd = sc.textFile("file:/opt/module/spark/input")
rdd: org.apache.spark.rdd.RDD[String] = file:/opt/module/spark/input MapPartitionsRDD[5] at textFile at <console>:24
val words = rdd.flatMap(_.split(" "))

scala> words.map((_,1)).reduceByKey(_+_).collect
res7: Array[(String, Int)] = Array((thanks,1), (welcome,1), (world,2), (hello,2), (spark,2), (to,1), (friends,1))

//第二种 使用groupBy

scala> words.groupBy(x=>x).map(t=>{(t._1,t._2.size)}).collect
res10: Array[(String, Int)] = Array((thanks,1), (welcome,1), (world,2), (hello,2), (spark,2), (to,1), (friends,1))

//第三种 使用groupByKey  先转换为(key,value)
scala> words.map(x=>(x,x)).groupByKey().map(t=>(t._1,t._2.toList.length)).collect
res12: Array[(String, Int)] = Array((thanks,1), (welcome,1), (world,2), (hello,2), (spark,2), (to,1), (friends,1))


//第四种  使用aggregateByKey()()  同样需要先转换
scala> words.map((_,1)).aggregateByKey(0)(_+_,_+_).collect
res13: Array[(String, Int)] = Array((thanks,1), (welcome,1), (world,2), (hello,2), (spark,2), (to,1), (friends,1))

//第五种 foldByKey()()  aggregateByKey()()的简化版
val wo = words.map((_,1))

scala> wo.foldByKey(0)(_+_).collect
res18: Array[(String, Int)] = Array((thanks,1), (welcome,1), (world,2), (hello,2), (spark,2), (to,1), (friends,1))

//第六种 combineByKey () 注意:该算子需要注明类型

scala> wo.combineByKey(x=>x,(x:Int,y:Int)=>x+y,(x:Int,y:Int)=>x+y).collect
res2: Array[(String, Int)] = Array((thanks,1), (welcome,1), (world,2), (hello,2), (spark,2), (to,1), (friends,1))


10.RDD的属性。

*  - A list of partitions  							一组分区
*  - A function for computing each split  			一个计算分区的函数
*  - A list of dependencies on other RDDs  			RDD之间的依赖关系
*  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)  一个Partitioner 						即RDD的分片函数
*  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for   									优先存储顺序
*    an HDFS file)

11.RDD的特点。

RDD表示只读的数据集合,对RDD改动,只能通过RDD的转换操作,新的RDD包含了从其他RDD衍生所必须的所有信息,即RDD之间存在了依赖关系. RDD的执行是按照血缘关系延时计算的. 如果血缘关系较长,可以通过持久化RDD切断血缘关系.

11.如何创建一个RDD,有几种方式,举例说明

  1. 从集合中创建 val rdd=sc.makeRDD(1 to 10)
  2. 在外部文件中创建 val rdd = sc.textFile("hdfs://hadoop100:9000/RELEASE")
  3. 其他RDD转换 val rdd1 = rdd.map((_,1)).reduceByKey(_+_)

12.创建一个RDD,使其一个分区的数据转变为一个String。例如(Array(“a”,“b”,“c”,“d”),2)=>(“ab”,“cd”)

scala> val rdd =(Array("a","b","c","d"),2)
rdd: (Array[String], Int) = (Array(a, b, c, d),2)

scala> val aaa = rdd.glom.map(t=>{t.fold("")(_+_)})
aaa: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[14] at map at <console>:26

scala> aaa.collect
res28: Array[String] = Array(ab, cd)

13.map与mapPartitions的区别。

  • map : 每次处理一条数据
  • mapPartiitons : 每次处理一个分区的数据, 这个分区的数据处理完后,原RDD中分区的数据才能释放,可能导致OOM(内存溢出)

14.coalesce与repartition两个算子的作用以及区别与联系。

  • coalesce : 重新分区,通过设置参数觉得是否进行shuffle阶段

  • respartition: 重新分区,实际上是调用的coalesce,默认是进行shuffle

15.使用zip算子时需要注意的是什么(即哪些情况不能使用)。

两个RDD分区数量一致,并且元素数量也要一致.

16.reduceByKey跟groupByKey之间的区别。

  1. groupByKey: 按照key分组,直接进行shuffle
  2. reduceByKey: 按照key进行聚合, 在shuffle之前有个预聚合,返回结果是RDD[k,v].

17.reduceByKey跟aggregateByKey之间的区别与联系。

  1. reduceByKey:使用指定的reduce函数,将相同key的值聚合到一起,reduce的任务可以通过第二个可选的参数来设置
  2. aggregateByKey:在kv对的RDD中,,按key将value进行分组合并,合并时,将每个value和初始值作为seq函数的参数,进行计算,返回的结果作为一个新的kv对,然后再将结果按照key进行合并,最后将每个分组的value传递给combine函数进行计算(先将前两个value进行计算,将返回结果和下一个value传给combine函数,以此类推),将key与计算结果作为一个新的kv对输出。
  3. 后者比前者粒度更细一些,因为后者的参数更多,更容易控制.

18.combineByKey的参数作用,说明其参数调用时机。

参数:(createCombiner: V => C, mergeva lue: (C, V) => C, mergeCombiners: (C, C) => C)

createCombiner : 如果这是一个新的元素,combineByKey()会使用一个叫作createCombiner()的函数来创建那个键对应的累加器的初始值

mergeva lue:如果这是一个在处理当前分区之前已经遇到的键,它会使用mergeva lue()方法将该键的累加器对应的当前值与这个新的值进行合并

mergeCombiner(): 如果有两个或者更多的分区都有对应同一个键的累加器,就需要使用用户提供的 mergeCombiners() 方法将各个分区的结果进行合并。

19.使用RDD实现Join的多种方式。

  1. join()算子:
  2. cogroup()算子:
  3. 广播变量:

20.aggregateByKey与aggregate之间的区别与联系。

  1. aggregateByKey: 分区间不会与初始值进行计算
  2. aggregate: 不仅分区内与初始值做计算, 分区间与初始也要计算

21.创建一个RDD,自定义一种分区规则并实现?spark中是否可以按照Value分区。

  1. class MyPatition() extends Partiitoner{},重写方法,构建自己的分区规则.

    object Spark04_SelfPartitioner {
    
        def main(args: Array[String]): Unit = {
            val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Partition")
    
            // 构建Spark上下文对象
            val sc: SparkContext = new SparkContext(conf)
    
            val rdd = sc.makeRDD(Array(("a", 1),("b", 2), ("c", 3), ("d", 4)), 4)
    
            val pRdd: RDD[(String, Int)] = rdd.partitionBy(new MyPartitioner(4))
    
            val result: RDD[(Int, (String, Int))] = pRdd.mapPartitionsWithIndex( (ind, items) => { items.map((ind, _)) } )
    
            result.collect().foreach(println)
    
            // 释放资源
            sc.stop()
        }
    }
    class MyPartitioner (partitions: Int) extends Partitioner {
        override def numPartitions: Int = partitions
    
        // 获取分区号
        override def getPartition(key: Any): Int = {
            1
        }
    }
    
  2. 不能按照Value分区,因为分区号是使用keyhashcode进行计算出的

22.说说你对RDD血缘关系的理解。

  • 血缘关系体现在一种设计模式----装饰者设计模式,即一个新的RDD是在老的RDD基础上转换过来的,又或者说把老的RDD放在了新的RDD中

  • 所以, 能够体现出血缘关系,即便新的RDD丢失了数据,也能依靠血缘关系找到数据

    DAG( 有向无环图):RDD通过一系列的转换就就形成了DAG,根据RDD之间的依赖关系的不同将DAG划分成不同的Stage,对于窄依赖,partition的转换处理在Stage中完成计算。对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的依据

23.Spark是如何进行任务切分的,请说明其中涉及到的相关概念。

任务划分

RDD任务切分中间分为:Application、Job、Stage和Task

  • Application:初始化一个SparkContext即生成一个Application

  • Job:一个Action算子就会生成一个Job

  • Stage:根据RDD之间的依赖关系的不同将Job划分成不同的Stage,遇到一个宽依赖则划分一个Stage。

  • Task:Stage是一个TaskSet,将Stage划分的结果发送到不同的Executor执行即为一个Task。

    注意:

    Application->Job->Stage->Task每一层都是1对n的关系

    在这里插入图片描述

24.RDD的cache和checkPoint的区别和联系。

  1. cache: 存储在内存的一个断点,一旦重启,缓存就没了,无法恢复.
  2. checkPoint: 永久化的一个文件, 随时都可以恢复.(前提是文件不删除)

25.Spark读取HDFS文件默认的切片机制。

取文件数量总和,然后除以总的分区数,然后看每个切片的是不是大于Block块的1.1倍,偌大于,则在开启一个切片

26.说说你对广播变量的理解。

  1. 举例: 两个RDD做 join使用广播变量,相当于通过一个map(窄依赖),把一个RDD转换为一个共享变量, 减少shuffle阶段,提高效率.

  2. 注意: 广播变量是依赖于内存的, 所以必须控制数据的大小.

    在这里插入图片描述

27.自定义一个累加器,实现计数功能。

object SelfAdd {

    /**
      * 自定义累加器
      *
      */
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SelfAdd")

        val sc = new SparkContext(conf)

        // 使用累加器
        val rdd = sc.makeRDD(Array("abc", "bcd", "efg"))

        val accumulator = new MyBlcakListAccumulator()
        //注册
        sc.register(accumulator)

        val mpRdd: RDD[String] = rdd.map(item => {
            accumulator.add(item)
            item
        })
        mpRdd.collect()
        println(accumulator.value)
    }
}

//查询黑名单
class MyBlcakListAccumulator extends AccumulatorV2[String,java.util.ArrayList[String]]{
    var blackList = new java.util.ArrayList[String]()
    override def isZero: Boolean = blackList.isEmpty

    override def copy(): AccumulatorV2[String, util.ArrayList[String]] = {
        val acc = new MyBlcakListAccumulator()
        acc
    }

    override def reset(): Unit = {
        blackList.clear()
    }

    override def add(v: String): Unit ={
        if(v.contains("b")){
            blackList.add(v)
        }
    }

    override def merge(other: AccumulatorV2[String, util.ArrayList[String]]): Unit = {
        blackList.addAll(other.value)
    }

    override def value: util.ArrayList[String] = {
        blackList
    }
}

//累加和
class MyAccumulator extends AccumulatorV2[Int,Int]{
    var sum =0
    override def isZero: Boolean = {
        sum==0
    }

    override def copy(): AccumulatorV2[Int, Int] = {
        val mine = new MyAccumulator();
        mine.synchronized{
            mine.sum = 0;
        }
        mine
    }

    override def reset(): Unit = sum=0

    override def add(v: Int): Unit = {
        sum+=v
    }

    override def merge(other: AccumulatorV2[Int,Int]): Unit = {
        sum= sum+other.value
    }

    override def value: Int= {
        sum
    }
}
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Spark(3) 下一篇Scala   WordCount

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目