设为首页 加入收藏

TOP

spark core 笔记
2019-03-04 13:19:10 】 浏览:110
Tags:spark core 笔记
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/mjlfto/article/details/88095526

spark 核心笔记记录

一、spark简介

1.spark是什么:

Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.

spark使用快速通用的集群计算系统,它提供了java,scala,python和R多种语言的api,以及支持通用执行图的优化引擎;同时也提供想spark SQL这样的工具来处理结构化数据, 使用MLlib进行机器学习,Graphx进行图像处理,Spark Streaming进行流处理

2.spark的运行模式:

a.local:这是一种适用于本地开发的运行模式

b.standalone:spark自带的一套资源调度框架,支持分布式搭建,spark可以基于standalone运行任务

c.yarn:hadoop yand的资源调度框架,国内运用广泛

d.mesos:国外被广泛使用的资源调度框架

3.spark与MR的区别

a.spark是基于内存迭代处理数据,MR基于磁盘迭代处理数据

b.Spark中有DAG有向无环图执行引擎,而spark比mr快的原因基本在spark是基于内存和dag计算模型的原因,可以说DAG相比MapReduce在大多数情况下可以减少shuffle的次数。spark中的DAGScheduler相当于一个改进版的MapReduce,如果在计算过程中不涉及节点间数据传输,spark可以直接基于内存运输,数据无需落地磁盘,这样速度会比MapReduce快很多,但是如果涉及多节点间数据传输,那么spark也会将数据落地磁盘。

c.spark是粗粒度申请资源,MR是细粒度申请资源

d.在mr中只有mapper和reducer,相当于spark中的map和reduceByKey两个算子,在MR业务逻辑需要自己实现,而spark中有各种算子对应各种作业

4.spark核心RDD

1.RDD是什么:(Resilient Distributed Dateset)弹性分布式数据集

2.RDD的五大特性

A list of partitions
A function for computing each partition
A list of dependencies on other RDDs
Optionally, a Partitioner for key-value RDDs
Optionally, a list of preferred locations to compute each split on

(1)由一些列Partitions组成

(2)算子(函数)作用于partition上的

(3)RDD之间存在上下级依赖

(4)分区器是作用在(K, V)格式的RDD上

(5)partition对外提供最佳计算位置,有利于实现计算跟着数据移动

3.RDD中不存储数据

4.基本问题:

(1)(K, V)格式的RDD就是在RDD中的原始是二元组

(2)哪里体现了RDD的分布式:RDD中的partition分布在不同的节点上

(3)哪里体现了RDD的弹性:partition的个数可调整,RDD间存在依赖关系

5.RDD的宽窄依赖

Spark中rdd存在父子依赖关系,在依赖关系中又分为宽依赖和窄依赖

窄依赖:窄依赖是指父RDD中与子RDD中的partition之间是一对一关系

宽依赖:窄依赖是指父RDD中与子RDD中的partition之间是一对多关系,当出现宽依赖时,会出现Shuffle现象

在这里插入图片描述

二、一个简单的SPARK程序

package com.mjlf.scala_study

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object SparkWordCount {
  def main(args: Array[String]): Unit = {
    //定义SparkConf配置
    val sparkConf = new SparkConf();
    //设置运行模式和应用名称,必须
    sparkConf.setMaster("local").setAppName("scalaWordCount")
    
    //创建sparkContext上下文
    val sc = new SparkContext(sparkConf)
   //读取本地磁盘文件,并将其转为为RDD
    val lines:RDD[String] = sc.textFile("D:\\project\\scala\\study_test\\src\\com\\mjlf\\scala_study\\words");
	
     //使用flatMap 按" "切割每行
    val words:RDD[String] = lines.flatMap(line => {
      line.split(" ");
    })

	//将每个单词转换为为二元组,如 word -> (word, 1)
    val pairWords:RDD[(String, Int)] = words.map(word => {
      new Tuple2(word, 1)
    })
	
      //统计单词数量
    val reduce:RDD[(String, Int)] = pairWords.reduceByKey((v1:Int, v2:Int) => {
      v1 + v2
    })
	
      //执行action算子,计算上边的过程,在遇见action算子之前,transformation算子都不会执行,关于action和transformation算子后续讲解
    reduce.foreach(x => {
      println(x)
    })

      //关闭sparkContext上下文
    sc.stop()
  }
}

三、spark算子

1.Transfromation算子

transfromation算子在spark中是一些列的懒执行算子,也就是这些算子在遇到action算之前都不会执行

filter:过滤算子, filter(f: X => Boolean)),表示传入一个参数,要求返回true|false,true表示留下该元素,false表示过滤该元素:
    val rdd = sc.makeRDD(List(1, 3, 3, 3, 2, 2, 1, 5), 3)
    rdd.filter( x => {x % 2 == 0}).collect().foreach(println(_))
结果:
	1 3 3 3 1 5

map:map(f: x => U),针对rdd中的每个元素进行计算,最后每个元素计算后返回一个值
    //对rdd中每个元素求平方
	val rdd = sc.makeRDD(List(1, 3, 3, 3, 2, 2, 1, 5), 3)
    rdd.map(x => (x*x)).collect().foreach( x => {print(x + " ")})
结果:1 9 9 9 4 4 1 25

mapToPair:在scala中没有该算子,类似于map算子

flatMap:可以将多个元素扁平化,如"hello appache spark"经过float(x=>{x.split(" ")})算子后变成rdd(hello, apache, spark)
    val rdd = sc.makeRDD(List("hello appache spark"))
    rdd.flatMap(x => (x.split(" "))).collect().foreach( x => {print(x + " ")})
结果:hello appache spark

reduceByKey:一个聚合算子,将相同的rdd中相同key每个元素进行聚合,
	    val rdd = sc.makeRDD(List(("appach", 1), ("scala", 1), ("appach", 1), ("java", 1),("appach", 1)))
    rdd.reduceByKey(_+_).collect().foreach( x => {print(x + " ")})
结果:(scala,1) (appach,3) (java,1)

sortBy/SortByKey:排序算子
		//根据rdd中key进行排序
	    val rdd = sc.makeRDD(List(("appach", 1), ("scala", 1), ("appach", 1), ("java", 1),("appach", 1)))
    rdd.reduceByKey(_+_).collect().sortBy(x => {//根据rdd中key进行排序
      x._1
    }).foreach( x => {print(x + " ")})
结果:(appach,3) (java,1) (scala,1)

sample:随机算子
	参数:
	1、withReplacement:元素可以多次抽样(在抽样时替换)		
	2、fraction:期望样本的大小作为RDD大小的一部分, 
		当withReplacement=false时:选择每个元素的概率;分数一定是[0,1] ; 
		当withReplacement=true时:选择每个元素的期望次数; 分数必须大于等于0。
	3、seed:随机数生成器的种子
	需要知道最后返回的元素数量并不一定等于元素总数 * fraction,只是相近

taskSample:
	def takeSample(
    withReplacement: Boolean,
    num: Int,
    seed: Long = Utils.random.nextLong): Array[T] = withScope {...}
	参数:
	1、withReplacement:元素可以多次抽样(在抽样时替换)	
	2、num:返回的样本的大小
	3、seed:随机数生成器的种子
	该函数返回的元素个数固定

join:内链接
leftOuterJoin:左外连接
rightOuterJoin:右外连接
fullOuterJoin:全外连接
union:并集

intersection:两个rdd的交集
subtract:一个rdd减去另一个rdd
distract:对rdd进行去重
cogroup:合并两个rdd中key,将第一个rdd中的相同key对应的value放到一个集合中,另一个rdd中对应key的value放到另一个集合中, 最后将key, 第一个集合,第二个集合组成三元祖返回
如:
val rdd1 = sc.parallelize(Array(("aa",1),("bb",2),("cc",6)))
val rdd2 = sc.parallelize(Array(("aa",3),("dd",4),("aa",5)))
val rdd3 = rdd1.cogroup(rdd2).collect()
结果:
(aa,(CompactBuffer(1),CompactBuffer(3, 5)))
(dd,(CompactBuffer(),CompactBuffer(4)))
(bb,(CompactBuffer(2),CompactBuffer()))
(cc,(CompactBuffer(6),CompactBuffer()))

mapPartitions:和map类似, 但是该算子将每个分区中的元素整体放到了一个迭代器中
repartion:重新分区

coalsesce:
	该函数用于将RDD进行重分区,使用HashPartitioner。
	第一个参数为重分区的数目,第二个为是否进行shuffle,默认为false;

mapPartitionWithIndex:与mapPartition不同的是, 他会将分区号也带入计算中
	var rdd1 = sc.makeRDD(1 to 5,2)
    //rdd1有两个分区
    var rdd2 = rdd1.mapPartitionsWithIndex{
            (x,iter) => {//x表示分区号
              var result = List[String]()
                var i = 0
                while(iter.hasNext){
                  i += iter.next()
                }
                result.::(x + "|" + i).iterator

            }
          }
    //rdd2将rdd1中每个分区的数字累加,并在每个分区的累加结果前面加了分区索引
    scala> rdd2.collect
    res13: Array[String] = Array(0|3, 1|12)

groupByKey:通过key进行分组
zip:zip函数用于将两个RDD组合成Key/Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常。
zipWithIndex:该函数将RDD中的元素和这个元素在RDD中的ID(索引号)组合成键/值对。
    scala> var rdd2 = sc.makeRDD(Seq("A","B","R","D","F"),2)
    rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[34] at makeRDD at :21

    scala> rdd2.zipWithIndex().collect
    res27: Array[(String, Long)] = Array((A,0), (B,1), (R,2), (D,3), (F,4))
...

2.Action算子

foreach:
count:会将结果返回到Driver端
first: = take(1)
take:
foreachPartition:
reduce:
collect:会将结果返回到Driver端
countByKey:
countByValue:

3.持久化算子

cahce(): = persist(MEMORY_ONLY)
persist(StorageLevel):
持久化级别:
	 val NONE = new StorageLevel(false, false, false, false)
 	 val DISK_ONLY = new StorageLevel(true, false, false, false)
 	 val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  	 val MEMORY_ONLY = new StorageLevel(false, true, false, true)
 	 val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  	 val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
 	 val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
 	 val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
 	 val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
 	 val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
 	 val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
 	 val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

4.spark中算子的创建方式

Scala:
	sc.textFile(path, minNumPartition)
	sc.parallelize(xxx, numPartititon)
	sc.makeRDD(xxx, numPartition)
Java:
	sc.textFile(path, minNumPartition)
	sc.parallelize(xx,numpartition)
	sc.parallelizePairs(list(tuple2),numpartition)

四、spark集群搭建

1.基于standalone方式

1.选择合适的spark版本下载:http://ftp.jaist.ac.jp/pub/apache/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz

2.解压:tar -xvf spark-2.0.0-bin-hadoop2.7.tgz

3.修改配置:在conf修改配置文件

1.复制文件spark-env.sh.template 为 spark-env.sh (cp spark-env.sh.template spark-env.sh)
2. 添加配置信息:
	#jdk位置
	export JAVA_HOME=/usr/local/jdk
	#主master地址
	export SPARK_MASTER_IP=node1
	#master 服务端口
	export SPARK_MASTER_PORT=7077
	#该节点分配的核心数
	export SPARK_MASTER_CORES=1
	#该节点分配的内存
	export SPARK_MASTER_MEMORY=512M
3.在salve文件中添加从节点地址:
	node2
	node3
4.将spark配置分发到其他节点上:scp -r sparkxxx node2:`pwd`
						   scp -r sparkxxx node3:`pwd`
5.启动服务器:到sbin目录下执行./start-all.sh
starting org.apache.spark.deploy.master.Master, logging to /opt/spark2/spark1.6.0/logs/spark-root-org.apache.spark.deploy.master.Master-1-node1.out
node2: starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark2/spark1.6.0/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-node2.out
node3: starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark2/spark1.6.0/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-node3.out
node2: failed to launch org.apache.spark.deploy.worker.Worker:
node2: full log in /opt/spark2/spark1.6.0/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-node2.out
node3: failed to launch org.apache.spark.deploy.worker.Worker:
node3: full log in /opt/spark2/spark1.6.0/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-node3.out

测试:访问node1:8080

在这里插入图片描述

2.基于yarn模式下任务提交配置

在conf下spark-env.sh添加如下配置信息,配置完成后需求重启spark,但是基于yarn提交任务,需要依赖hadoop,所以在任务提交前需要先启动hadoop

export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

3.集群执行日志保存到HDFS

需要依赖hdfs,启动前需要先配置并启动hadoop

1.修改配置文件:spark-default.conf,在其中添加如下信息:

spark.eventLog.enabled  true
spark.eventLog.dir      hdfs://node1:8020/log/
spark.history.fs.logDirectory hdfs://node1:8020/log/

2.分发配置信息到其他节点:

scp ./* node2:`pwd`
scp ./* node3:`pwd`

4.spark 高可用

spark高可用方式有两者模式,一种是将相关信息保存到FileSystem.另一种这是将信息保存到zookeeper,由于第一种方式使用很多少,我们这里也只介绍使用zookeeper的配置方式

1.配置高可用, 需要在spark-env.sh中添加如下信息

export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=node3:2181,node4:2181,node5:2181 -Dspark.deploy.zookeeper.dir=/MasterHA0315"

2.分配配置信息到其他节点:

scp ./* node2:`pwd`
scp ./* node3:`pwd`

3.选在当你的主节点挂掉后启动从节点作为主节点的节点, 在其节点修改spark-env.sh

将SPARK_MASTER_IP修改从节点ip
SPARK_MASTER-IP=node2

5.spark基本配置信息:

在spark中存在很多默认配置,可以到spark官网查询:https://spark.apache.org/docs/latest/configuration.html

SPARK_MASTER_WEBUI_PORT=9999配置spark web ui端口,默认8080

五. spark任务提交

1.standalone-client模式提交任务

在这里插入图片描述

1.集群启动后,worker会向master注册资源

2.我们向Client提交Application,client收到请求会创建一个Driver

3.Driver创建完成之后,会向Master申请运行Application运行是需要的资源

4.Master接收到Driver的资源申请请求后,会根据worker注册在自己哪儿的数据,判断哪些worker满足Driver的需求,随后会在对应的worker上创建Executor,这些Executor就是实际用来运行Application的任务

5.当worker上的executor创建完成后,driver会向这些executor发送task,同步task执行完毕后,他会回收这些task任务的数据

Application 提交命令:

./spark-submit --master spark://node1:7077 --class classPath jar 参数
./spark-submit --master spark://node1:7077 --deploy-mode client --class classPath jar 参数

2.standalone-cluster模式提交任务

在这里插入图片描述

1.集群启动时,worker向master汇报资源

2.向client提交application

3.client收到application请求后,向master申请资源用于启动Driver

4.master收到Driver启动申请后,会选择何是的worker,并在其上边启动Driver

5.Driver启动后,会向Master申请资源启动Exector,用于运行实际的task

6.Master收到请求后,会分析选择出合适的worker,并在其上创建Executor

7.Executor启动后,Driver向Executor发送task,同时接收task产生的数据

Applicationt提交命令

./spark-submit --master spark://node1:7077 --deploy-mode cluster --class classPath jar 参数

3.yarn-client模式提交任务

在这里插入图片描述

1.集群启动,Node Manager向Resource Manager注册资源

2.向Client提交Application申请

3.Client收到Application提交申请后,启动Driver, Driver再向ResourceManager提交申请用于启动Application Master

4.Resource Manager收到申请后,选择合适NodeManger启动Application Master

5.Applicaiton Manater启动成功后,再向ResourceManager申请资源,用于启动Executor

6.Resource Manager选择合适的NodeManager返回给Application Master

7.Application Master收到Resource Manager返回的NodeManger信息后,在对应的NodeManager上启动Executor

8.Executor启动成功后,向Driver注册信息

9.Driver收到注册信息后,向Executor发送task,同时接收task返回的数据

Application提交命令:

./spark-submit --master yarn --class ... jar 参数
./spark-submit  --master yarn-client --class ..jar 参数
./spark-submit --master yarn --deploy- mode client --class ... jar 参数

4.yarn-cluster默认提交任务

在这里插入图片描述
1.集群启动,NodeManager注册资源到Resource Manger

2.提交Application到client

3.client向ResourceManger申请资源启动ApplicationMaster和Driver

4.Resource Manger申请后,选择合适的NodeManger启动ApplicationMaster 和Driver

5.Application Master启动成功后,向Resource Manager申请资源,用于启动Executor

6.Resource Manager收到资源申请后,分析选择合适的NodeManager节点返回给Application Master

7.Application Master收到Resource Manager返回的节点信息后,在对应的NodeManger上启动Executor

8.启动后的Executor向Driver 注册信息

9.Driver向Executor发送Task和接收Task返回的数据

Application提交命令

./spark-submit --master yarn-cluster --classs ..jar 参数
./spark-submit --master yarn --deploy-mode cluster --class ..jar 参数

六.Stage

stage的划分是Spark作业调度的关键一步,它基于DAG确定依赖关系,借此来划分stage,将依赖链断开,每个stage内部可以并行运行,整个作业按照stage顺序依次执行,最终完成整个Job。实际应用提交的Job中RDD依赖关系是十分复杂的,依据这些依赖关系来划分stage自然是十分困难的,Spark此时就利用了前文提到的依赖关系,调度器从DAG图末端出发,逆向遍历整个依赖关系链,遇到ShuffleDependency(宽依赖关系的一种叫法)就断开,遇到NarrowDependency就将其加入到当前stage。stage中task数目由stage末端的RDD分区个数来决定,RDD转换是基于分区的一种粗粒度计算,一个stage执行的结果就是这几个分区构成的RDD。

在这里插入图片描述

每个Stage都是由一组并行的task组成,

stage的并行度依赖于最后一个RDD的partition

如果需要提高stage的并行度,需要增加最后一个RDD的partition,可以使用重分区算子

task采用pipline的计算方式,及数据时一个一个的取出计算,而非批量

对于管道中数据,在RDD持久化和shuffle write时才会落地磁盘

未完待续。。。

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇spark 学习5 下一篇185、Spark 2.0之SparkSession、D..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目