设为首页 加入收藏

TOP

Spark2.x 学习笔记(一) - Spark Core
2018-12-14 01:16:14 】 浏览:65
Tags:Spark2.x 学习 笔记 Spark Core
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/panleiaiying/article/details/84762580

1、Spark 介绍

Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。Spark是Scala编写,方便快速编程; Spark 技术栈中包括 SparkCore,SparkStreaming,SparkSQL,SparkMllib等。
Spark与MapReduce的区别
1. Spark 基于内存迭代处理数据,MR基于磁盘迭代处理数据
2. Spark 粗粒度资源申请,MR是细粒度资源申请
3. MR中只有mapper,reducer,相当于spark中的map和reduceByKey两个算子,在MR业务逻辑要自己实现,Spark中有各种算子对应各种业务
Spark 运行模式分为 :

  • 本地模式 用于本地开发,多用于测试
  • standalone Spark自带的资源调度框架,支持分布式搭建,Spark 可以基于Standalone运行任务
  • yarn Hadoop生态圈中的资源调度框架,Spark也支持在Yarn中运行

2、spark 本地方式运行

导入pom

<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-core_2.11</artifactId>
	<version>2.3.2</version>
</dependency>
public class WordCount2 {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("wc_2");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        JavaRDD<String> text = sc.textFile("test.txt");
        // 将文件中字符串拆分
        JavaRDD<String> word = text.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                String[] str = s.split(" ");
                return Arrays.asList(str).iterator();
            }
        });
        // 转换为map 格式    例如"aa" -> {"aa",1}
     JavaPairRDD<String, Integer> paris = word.mapToPair(new PairFunction<String, String, Integer>() {
         @Override
         public Tuple2<String, Integer> call(String s) throws Exception {
             Tuple2<String,Integer> tuple2 = new Tuple2<>(s,1);
             return tuple2;
         }
     });
     // 根据key 相同的叠加,  先分组再计算
     JavaPairRDD<String,Integer> count =  paris.reduceByKey(new Function2<Integer, Integer, Integer>() {
         @Override
         public Integer call(Integer integer, Integer integer2) throws Exception {
             return integer+integer2;
         }
     });
     // map 中 key 和 value 调换, 调换后key为Integer格式,以便后面根据 key 排序,
     JavaPairRDD<Integer,String> tem =  count.mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>() {
         @Override
         public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple2) throws Exception {
             return new Tuple2<>(tuple2._2,tuple2._1);
         }
     });
        // 排序
        JavaPairRDD<Integer,String> sorted =  tem.sortByKey(false);

        //遍历打印
        sorted.foreach(new VoidFunction<Tuple2<Integer, String>>() {
         @Override
         public void call(Tuple2<Integer, String> integerStringTuple2) throws Exception {
             System.out.println(integerStringTuple2._1+":"+integerStringTuple2._2);
         }
     });
    }
}

spark 是scala 实现的, 用java 感觉麻烦的可以换用scala 去实现; 以下是scala 实现上述功能

object SparkTest {
  def main(args: Array[String]): Unit = {
    var conf = new SparkConf().setMaster("local").setAppName("wc")
    var sc = new SparkContext(conf)
    var text = sc.textFile("test.txt")
    var word = text.flatMap((_.split(" ")))
    var pair = word.map((_,1))
    var result = pair.reduceByKey(_+_)
    var sort:RDD[(Int, String)] = result.map(tuple=>(tuple._2,tuple._1));
    var res =sort.sortByKey()
    res.foreach(println)
  }
}

在上述看到了一堆RDD格式,下面介绍什么是RDD
RDD(Resilient Distributed Dateset),弹性分布式数据集。
RDD 的五大特征:

  1. RDD由一系列partition组成
  2. 算子(函数)是作用在partition上的
  3. RDD 之间有依赖关系
  4. 分区器是作用在K,V格式的RDD上
  5. partition对外提供最佳的计算位置,利于数据处理的本地化

什么是K,V格式的RDD
RDD里面存储的数据都是二元组对象,那么这个RDD我们就叫做K,V格式的RDD,类似Map<Object,Object>。
哪里体现RDD的弹性(容错)?
partition数量,大小没有限制,体现了RDD的弹性。
RDD之间依赖关系,可以基于上一个RDD重新计算出RDD。
哪里体现RDD的分布式?
RDD是由Partition组成,partition是分布在不同节点上的

standalone 环境搭建

node1 为master; node2 node3 node4 为worker

1、下载安装包 http://spark.apache.org/downloads.html
并解压

2、进入安装包conf 文件夹下,配置slave

[root@node2 conf]# mv slaves.template slaves
[root@node2 conf]# vim slaves
node2
node3
node4

3、 配置spark-env.sh

export SPARK_MASTER_HOST=node1
export SPARK_MASTER_PORT=7077
export SPARK_WORKER_CORES=1 
export SPARK_WORKER_MEMORY=1g

export SPARK_MASTER_HOST=node1 设置master 主机
export SPARK_MASTER_PORT=7077 提交任务的端口,默认是7077
export SPARK_WORKER_CORES=1 每个worker从节点能够支配的core的个数
export SPARK_WORKER_MEMORY=1g 每个worker从节点能够支配的内存数

4、 同步 node2 node3 node4 节点, 并启动集群
进入sbin 目录下./start-all.sh

5、 搭建上传任务的客户端(可选)
将spark安装包原封不动的拷贝到一个新的节点上,然后,在新的节点上提交任务即可,也可以在master和worker 上提交

6、 输入node1:8080 端口,可进入webUI 界面
可以查看启动的worker,和每个worker的内存

注意:

  • 8080是Spark WEBUI界面的端口,7077是Spark任务提交的端口。
  • 修改master的WEBUI端口, 修改start-master.sh即可,SPARK_MASTER_WEBUI_PORT=8080

standalone两种任务提交模式:

Standalone-client提交任务方式
以启动安装包中的一个Demo PI为例:

./spark-submit --master spark://node2:7077 --class org.apache.spark.examples.JavaSparkPi /opt/spark-2.3.2-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.2.jar 100
./spark-submit 
--master spark://node1:7077 
--deploy-mode client 
--class org.apache.spark.examples.SparkPi 
../lib/spark-examples-1.6.0-hadoop2.6.0.jar 
100

client 模式任务流程:
在这里插入图片描述
执行流程:

  1. client模式提交任务后,会在客户端启动Driver进程。
  2. Driver会向Master申请启动Application启动的资源。
  3. 资源申请成功,Driver端将task发送到worker端执行。
  4. worker将task执行结果返回到Driver端

client模式适用于测试调试程序。Driver进程是在客户端启动的,这里的客户端就是指提交应用程序的当前节点。在Driver端可以看到task执行的情况。

Standalone-cluster提交任务方式:

  [root@node2 bin]# ./spark-submit --master spark://node2:7077 --deploy-mode cluster --class org.apache.spark.examples.JavaSparkPi /opt/spark-2.3.2-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.2.jar 100

任务流程图:
在这里插入图片描述

执行流程:

  1. cluster模式提交应用程序后,会向Master请求启动Driver.
  2. Master接受请求,随机在集群一台节点启动Driver进程。
  3. Driver启动后为当前的应用程序申请资源。
  4. Driver端发送task到worker节点上执行。
  5. worker将执行情况和执行结果返回给Driver端。

2. Yarn模式两种提交任务方式

在提交任务的节点添加配置

HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

yarn-cilent方式

运行demo :

./spark-submit --master yarn --class org.apache.spark.examples.JavaSparkPi /opt/spark-2.3.2-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.2.jar 100

在这里插入图片描述
执行流程:

  1. 客户端提交一个Application,在客户端启动一个Driver进程。
  2. 应用程序启动后会向RS(ResourceManager)发送请求,启动AM(ApplicationMaster)的资源。
  3. RS收到请求,随机选择一台NM(NodeManager)启动AM。这里的NM相当于Standalone中的Worker节点。
  4. AM启动后,会向RS请求一批container资源,用于启动Executor.
  5. RS会找到一批NM返回给AM,用于启动Executor。
  6. AM会向NM发送命令启动Executor。
  7. Executor启动后,会反向注册给Driver,Driver发送task到Executor,执行情况和结果返回给Driver端

ApplicationMaster的作用:

  1. 为当前的Application申请资源
  2. 给NameNode发送消息启动Executor。
    注意:ApplicationMaster有launchExecutor和申请资源的功能,并没有作业调度的功能。

yarn-cluster提交任务方式:

./spark-submit --master yarn --cluster org.apache.spark.examples.JavaSparkPi /opt/spark-2.3.2-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.2.jar 100

执行流程:
在这里插入图片描述

执行流程

  1. 客户机提交Application应用程序,发送请求到RS(ResourceManager),请求启动AM(ApplicationMaster)。
  2. RS收到请求后随机在一台NM(NodeManager)上启动AM(相当于Driver端)。
  3. AM启动,AM发送请求到RS,请求一批container用于启动Executor。
  4. RS返回一批NM节点给AM。
  5. AM连接到NM,发送请求到NM启动Executor。
  6. Executor反向注册到AM所在的节点的Driver。Driver发送task到Executor。

总结
Yarn-Cluster主要用于生产环境中,因为Driver运行在Yarn集群中某一台nodeManager中,每次提交任务的Driver所在的机器都是随机的,不会产生某一台机器网卡流量激增的现象,缺点是任务提交后不能看到日志。只能通过yarn查看日志。

停止集群任务命令:yarn application -kill applicationID

Spark 算子

1、Transformation 算子 (懒执行,需要Action算子触发)

filter: 过滤字符,返回true 的通过
map(mapToPair):转换为map 格式 例如"aa" -> {“aa”,1}
flatMap: 文件中字符串拆分
reduceByKey:
sortBy/sortByKey : tem.sortByKey(false);
sample: 抽样
join:合并

2、Action 算子

foreach:
count:
collect():会将结果回收到Driver端
foreachPartition:
reduce:
countByKey:
countByValue:

任务: Application-> Job -> Stage -> Task
资源: Master -> Worker -> Executer -> ThreadPool

Spark资源调度和任务调度

在这里插入图片描述

Spark资源调度和任务调度的流程:
启动集群后,Worker节点会向Master节点汇报资源情况,Master掌握了集群资源情况。当Spark提交一个Application后,根据RDD之间的依赖关系将Application形成一个DAG有向无环图。任务提交后,Spark会在Driver端创建两个对象:DAGScheduler和TaskScheduler,DAGScheduler是任务调度的高层调度器,是一个对象。DAGScheduler的主要作用就是将DAG根据RDD之间的宽窄依赖关系划分为一个个的Stage,然后将这些Stage以TaskSet的形式提交给TaskScheduler(TaskScheduler是任务调度的低层调度器,这里TaskSet其实就是一个集合,里面封装的就是一个个的task任务,也就是stage中的并行度task任务),TaskSchedule会遍历TaskSet集合,拿到每个task后会将task发送到计算节点Executor中去执行(其实就是发送到Executor中的线程池ThreadPool去执行)。task在Executor线程池中的运行情况会向TaskScheduler反馈,当task执行失败时,则由TaskScheduler负责重试,将task重新发送给Executor去执行,默认重试3次。如果重试3次依然失败,那么这个task所在的stage就失败了。stage失败了则由DAGScheduler来负责重试,重新发送TaskSet到TaskSchdeuler,Stage默认重试4次。如果重试4次以后依然失败,那么这个job就失败了。job失败了,Application就失败了。
TaskScheduler不仅能重试失败的task,还会重试straggling(落后,缓慢)task(也就是执行速度比其他task慢太多的task)。如果有运行缓慢的task那么TaskScheduler会启动一个新的task来与这个运行缓慢的task执行相同的处理逻辑。两个task哪个先执行完,就以哪个task的执行结果为准。这就是Spark的推测执行机制。在Spark中推测执行默认是关闭的。推测执行可以通过spark.speculation属性来配置。
注意:
对于ETL类型要入数据库的业务要关闭推测执行机制,这样就不会有重复的数据入库。
如果遇到数据倾斜的情况,开启推测执行则有可能导致一直会有task重新启动处理相同的逻辑,任务可能一直处于处理不完的状态。

图解Spark资源调度和任务调度的流程
在这里插入图片描述

粗粒度资源申请和细粒度资源申请

粗粒度资源申请(Spark)
在Application执行之前,将所有的资源申请完毕,当资源申请成功后,才会进行任务的调度,当所有的task执行完成后,才会释放这部分资源。

优点:在Application执行之前,所有的资源都申请完毕,每一个task直接使用资源就可以了,不需要task在执行前自己去申请资源,task启动就快了,task执行快了,stage执行就快了,job就快了,application执行就快了。

缺点:直到最后一个task执行完成才会释放资源,集群的资源无法充分利用。

细粒度资源申请(MapReduce)
Application执行之前不需要先去申请资源,而是直接执行,让job中的每一个task在执行前自己去申请资源,task执行完成就释放资源。

优点:集群的资源可以充分利用。
缺点:task自己去申请资源,task启动变慢,Application的运行就相应的变慢了。

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇spark的基本框架原理 下一篇Spark 以及 spark streaming 核心..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目