版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/mjlfto/article/details/88095526
spark 核心笔记记录
一、spark简介
1.spark是什么:
Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming .
spark使用快速通用的集群计算系统,它提供了java ,scala,python 和R多种语言的api,以及支持通用执行图的优化引擎;同时也提供想spark SQL这样的工具来处理结构化数据, 使用MLlib进行机器学习,Graphx进行图像处理,Spark Streaming进行流处理
2.spark的运行模式:
a.local:这是一种适用于本地开发的运行模式
b.standalone:spark自带的一套资源调度框架,支持分布式搭建,spark可以基于standalone运行任务
c.yarn:hadoop yand的资源调度框架,国内运用广泛
d.mesos:国外被广泛使用的资源调度框架
3.spark与MR的区别
a.spark是基于内存迭代处理数据,MR基于磁盘迭代处理数据
b.Spark中有DAG有向无环图执行引擎,而spark比mr快的原因基本在spark是基于内存和dag计算模型的原因,可以说DAG相比MapReduce在大多数情况下可以减少shuffle的次数。spark中的DAGScheduler相当于一个改进版的MapReduce,如果在计算过程中不涉及节点间数据传输,spark可以直接基于内存运输,数据无需落地磁盘,这样速度会比MapReduce快很多,但是如果涉及多节点间数据传输,那么spark也会将数据落地磁盘。
c.spark是粗粒度申请资源,MR是细粒度申请资源
d.在mr中只有mapper和reducer,相当于spark中的map和reduceByKey两个算子,在MR业务逻辑需要自己实现,而spark中有各种算子对应各种作业
4.spark核心RDD
1.RDD是什么:(Resilient Distributed Dateset)弹性分布式数据集
2.RDD的五大特性
A list of partitions
A function for computing each partition
A list of dependencies on other RDDs
Optionally, a Partitioner for key-value RDDs
Optionally, a list of preferred locations to compute each split on
(1)由一些列Partitions组成
(2)算子(函数)作用于partition上的
(3)RDD之间存在上下级依赖
(4)分区器是作用在(K, V)格式的RDD上
(5)partition对外提供最佳计算位置,有利于实现计算跟着数据移动
3.RDD中不存储数据
4.基本问题:
(1)(K, V)格式的RDD就是在RDD中的原始是二元组
(2)哪里体现了RDD的分布式:RDD中的partition分布在不同的节点上
(3)哪里体现了RDD的弹性:partition的个数可调整,RDD间存在依赖关系
5.RDD的宽窄依赖
Spark中rdd存在父子依赖关系,在依赖关系中又分为宽依赖和窄依赖
窄依赖:窄依赖是指父RDD中与子RDD中的partition之间是一对一关系
宽依赖:窄依赖是指父RDD中与子RDD中的partition之间是一对多关系,当出现宽依赖时,会出现Shuffle现象
二、一个简单的SPARK程序
package com.mjlf.scala_study
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object SparkWordCount {
def main(args: Array[String]): Unit = {
//定义SparkConf配置
val sparkConf = new SparkConf();
//设置运行模式和应用名称,必须
sparkConf.setMaster("local").setAppName("scalaWordCount")
//创建sparkContext上下文
val sc = new SparkContext(sparkConf)
//读取本地磁盘文件,并将其转为为RDD
val lines:RDD[String] = sc.textFile("D:\\project\\scala\\study_test\\src\\com\\mjlf\\scala_study\\words");
//使用flatMap 按" "切割每行
val words:RDD[String] = lines.flatMap(line => {
line.split(" ");
})
//将每个单词转换为为二元组,如 word -> (word, 1)
val pairWords:RDD[(String, Int)] = words.map(word => {
new Tuple2(word, 1)
})
//统计单词数量
val reduce:RDD[(String, Int)] = pairWords.reduceByKey((v1:Int, v2:Int) => {
v1 + v2
})
//执行action算子,计算上边的过程,在遇见action算子之前,transformation算子都不会执行,关于action和transformation算子后续讲解
reduce.foreach(x => {
println(x)
})
//关闭sparkContext上下文
sc.stop()
}
}
三、spark算子
1.Transfromation算子
transfromation算子在spark中是一些列的懒执行算子,也就是这些算子在遇到action算之前都不会执行
filter:过滤算子, filter(f: X => Boolean)),表示传入一个参数,要求返回true|false,true表示留下该元素,false表示过滤该元素:
val rdd = sc.makeRDD(List(1, 3, 3, 3, 2, 2, 1, 5), 3)
rdd.filter( x => {x % 2 == 0}).collect().foreach(println(_))
结果:
1 3 3 3 1 5
map:map(f: x => U),针对rdd中的每个元素进行计算,最后每个元素计算后返回一个值
//对rdd中每个元素求平方
val rdd = sc.makeRDD(List(1, 3, 3, 3, 2, 2, 1, 5), 3)
rdd.map(x => (x*x)).collect().foreach( x => {print(x + " ")})
结果:1 9 9 9 4 4 1 25
mapToPair:在scala中没有该算子,类似于map算子
flatMap:可以将多个元素扁平化,如"hello appache spark"经过float(x=>{x.split(" ")})算子后变成rdd(hello, apache, spark)
val rdd = sc.makeRDD(List("hello appache spark"))
rdd.flatMap(x => (x.split(" "))).collect().foreach( x => {print(x + " ")})
结果:hello appache spark
reduceByKey:一个聚合算子,将相同的rdd中相同key每个元素进行聚合,
val rdd = sc.makeRDD(List(("appach", 1), ("scala", 1), ("appach", 1), ("java ", 1),("appach", 1)))
rdd.reduceByKey(_+_).collect().foreach( x => {print(x + " ")})
结果:(scala,1) (appach,3) (java,1)
sortBy/SortByKey:排序算子
//根据rdd中key进行排序
val rdd = sc.makeRDD(List(("appach", 1), ("scala", 1), ("appach", 1), ("java", 1),("appach", 1)))
rdd.reduceByKey(_+_).collect().sortBy(x => {//根据rdd中key进行排序
x._1
}).foreach( x => {print(x + " ")})
结果:(appach,3) (java,1) (scala,1)
sample:随机算子
参数:
1、withReplacement:元素可以多次抽样(在抽样时替换)
2、fraction:期望样本的大小作为RDD大小的一部分,
当withReplacement=false时:选择每个元素的概率;分数一定是[0,1] ;
当withReplacement=true时:选择每个元素的期望次数; 分数必须大于等于0。
3、seed:随机数生成器的种子
需要知道最后返回的元素数量并不一定等于元素总数 * fraction,只是相近
taskSample:
def takeSample(
withReplacement: Boolean,
num: Int,
seed: Long = Utils.random.nextLong): Array[T] = withScope {...}
参数:
1、withReplacement:元素可以多次抽样(在抽样时替换)
2、num:返回的样本的大小
3、seed:随机数生成器的种子
该函数返回的元素个数固定
join:内链接
leftOuterJoin:左外连接
rightOuterJoin:右外连接
fullOuterJoin:全外连接
union:并集
intersection:两个rdd的交集
subtract:一个rdd减去另一个rdd
distract:对rdd进行去重
cogroup:合并两个rdd中key,将第一个rdd中的相同key对应的value放到一个集合中,另一个rdd中对应key的value放到另一个集合中, 最后将key, 第一个集合,第二个集合组成三元祖返回
如:
val rdd1 = sc.parallelize(Array(("aa",1),("bb",2),("cc",6)))
val rdd2 = sc.parallelize(Array(("aa",3),("dd",4),("aa",5)))
val rdd3 = rdd1.cogroup(rdd2).collect()
结果:
(aa,(CompactBuffer(1),CompactBuffer(3, 5)))
(dd,(CompactBuffer(),CompactBuffer(4)))
(bb,(CompactBuffer(2),CompactBuffer()))
(cc,(CompactBuffer(6),CompactBuffer()))
mapPartitions:和map类似, 但是该算子将每个分区中的元素整体放到了一个迭代器中
repartion:重新分区
coalsesce:
该函数用于将RDD进行重分区,使用HashPartitioner。
第一个参数为重分区的数目,第二个为是否进行shuffle,默认为false;
mapPartitionWithIndex:与mapPartition不同的是, 他会将分区号也带入计算中
var rdd1 = sc.makeRDD(1 to 5,2)
//rdd1有两个分区
var rdd2 = rdd1.mapPartitionsWithIndex{
(x,iter) => {//x表示分区号
var result = List[String]()
var i = 0
while(iter.hasNext){
i += iter.next()
}
result.::(x + "|" + i).iterator
}
}
//rdd2将rdd1中每个分区的数字累加,并在每个分区的累加结果前面加了分区索引
scala> rdd2.collect
res13: Array[String] = Array(0|3, 1|12)
groupByKey:通过key进行分组
zip:zip函数用于将两个RDD组合成Key/Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常。
zipWithIndex:该函数将RDD中的元素和这个元素在RDD中的ID(索引号)组合成键/值对。
scala> var rdd2 = sc.makeRDD(Seq("A","B","R","D","F"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[34] at makeRDD at :21
scala> rdd2.zipWithIndex().collect
res27: Array[(String, Long)] = Array((A,0), (B,1), (R,2), (D,3), (F,4))
...
2.Action算子
foreach:
count:会将结果返回到Driver端
first: = take(1)
take:
foreachPartition:
reduce:
collect:会将结果返回到Driver端
countByKey:
countByValue:
3.持久化算子
cahce(): = persist(MEMORY_ONLY)
persist(StorageLevel):
持久化级别:
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
4.spark中算子的创建方式
Scala:
sc.textFile(path, minNumPartition)
sc.parallelize(xxx, numPartititon)
sc.makeRDD(xxx, numPartition)
Java:
sc.textFile(path, minNumPartition)
sc.parallelize(xx,numpartition)
sc.parallelizePairs(list(tuple2),numpartition)
四、spark集群搭建
1.基于standalone方式
1.选择合适的spark版本下载:http://ftp.jaist.ac.jp/pub/apache/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz
2.解压:tar -xvf spark-2.0.0-bin-hadoop2.7.tgz
3.修改配置:在conf修改配置文件
1.复制文件spark-env.sh.template 为 spark-env.sh (cp spark-env.sh.template spark-env.sh)
2. 添加配置信息:
#jdk位置
export JAVA_HOME=/usr/local/jdk
#主master地址
export SPARK_MASTER_IP=node1
#master 服务端口
export SPARK_MASTER_PORT=7077
#该节点分配的核心数
export SPARK_MASTER_CORES=1
#该节点分配的内存
export SPARK_MASTER_MEMORY=512M
3.在salve文件中添加从节点地址:
node2
node3
4.将spark配置分发到其他节点上:scp -r sparkxxx node2:`pwd`
scp -r sparkxxx node3:`pwd`
5.启动服务器:到sbin目录下执行./start-all.sh
starting org.apache.spark.deploy.master.Master, logging to /opt/spark2/spark1.6.0/logs/spark-root-org.apache.spark.deploy.master.Master-1-node1.out
node2: starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark2/spark1.6.0/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-node2.out
node3: starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark2/spark1.6.0/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-node3.out
node2: failed to launch org.apache.spark.deploy.worker.Worker:
node2: full log in /opt/spark2/spark1.6.0/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-node2.out
node3: failed to launch org.apache.spark.deploy.worker.Worker:
node3: full log in /opt/spark2/spark1.6.0/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-node3.out
测试:访问node1:8080
2.基于yarn模式下任务提交配置
在conf下spark-env.sh添加如下配置信息,配置完成后需求重启spark,但是基于yarn提交任务,需要依赖hadoop,所以在任务提交前需要先启动hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
3.集群执行日志保存到HDFS
需要依赖hdfs,启动前需要先配置并启动hadoop
1.修改配置文件:spark-default.conf,在其中添加如下信息:
spark.eventLog.enabled true
spark.eventLog.dir hdfs://node1:8020/log/
spark.history.fs.logDirectory hdfs://node1:8020/log/
2.分发配置信息到其他节点:
scp ./* node2:`pwd`
scp ./* node3:`pwd`
4.spark 高可用
spark高可用方式有两者模式,一种是将相关信息保存到FileSystem.另一种这是将信息保存到zookeeper,由于第一种方式使用很多少,我们这里也只介绍使用zookeeper的配置方式
1.配置高可用, 需要在spark-env.sh中添加如下信息
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=node3:2181,node4:2181,node5:2181 -Dspark.deploy.zookeeper.dir=/MasterHA0315"
2.分配配置信息到其他节点:
scp ./* node2:`pwd`
scp ./* node3:`pwd`
3.选在当你的主节点挂掉后启动从节点作为主节点的节点, 在其节点修改spark-env.sh
将SPARK_MASTER_IP修改从节点ip
SPARK_MASTER-IP=node2
5.spark基本配置信息:
在spark中存在很多默认配置,可以到spark官网查询:https://spark.apache.org/docs/latest/configuration.html
SPARK_MASTER_WEBUI_PORT=9999配置spark web ui端口,默认8080
五. spark任务提交
1.standalone-client模式提交任务
1.集群启动后,worker会向master注册资源
2.我们向Client提交Application,client收到请求会创建一个Driver
3.Driver创建完成之后,会向Master申请运行Application运行是需要的资源
4.Master接收到Driver的资源申请请求后,会根据worker注册在自己哪儿的数据,判断哪些worker满足Driver的需求,随后会在对应的worker上创建Executor,这些Executor就是实际用来运行Application的任务
5.当worker上的executor创建完成后,driver会向这些executor发送task,同步task执行完毕后,他会回收这些task任务的数据
Application 提交命令:
./spark-submit --master spark://node1:7077 --class classPath jar 参数
./spark-submit --master spark://node1:7077 --deploy-mode client --class classPath jar 参数
2.standalone-cluster模式提交任务
1.集群启动时,worker向master汇报资源
2.向client提交application
3.client收到application请求后,向master申请资源用于启动Driver
4.master收到Driver启动申请后,会选择何是的worker,并在其上边启动Driver
5.Driver启动后,会向Master申请资源启动Exector,用于运行实际的task
6.Master收到请求后,会分析选择出合适的worker,并在其上创建Executor
7.Executor启动后,Driver向Executor发送task,同时接收task产生的数据
Applicationt提交命令
./spark-submit --master spark://node1:7077 --deploy-mode cluster --class classPath jar 参数
3.yarn-client模式提交任务
1.集群启动,Node Manager向Resource Manager注册资源
2.向Client提交Application申请
3.Client收到Application提交申请后,启动Driver, Driver再向ResourceManager提交申请用于启动Application Master
4.Resource Manager收到申请后,选择合适NodeManger启动Application Master
5.Applicaiton Manater启动成功后,再向ResourceManager申请资源,用于启动Executor
6.Resource Manager选择合适的NodeManager返回给Application Master
7.Application Master收到Resource Manager返回的NodeManger信息后,在对应的NodeManager上启动Executor
8.Executor启动成功后,向Driver注册信息
9.Driver收到注册信息后,向Executor发送task,同时接收task返回的数据
Application提交命令:
./spark-submit --master yarn --class ... jar 参数
./spark-submit --master yarn-client --class ..jar 参数
./spark-submit --master yarn --deploy- mode client --class ... jar 参数
4.yarn-cluster默认提交任务
1.集群启动,NodeManager注册资源到Resource Manger
2.提交Application到client
3.client向ResourceManger申请资源启动ApplicationMaster和Driver
4.Resource Manger申请后,选择合适的NodeManger启动ApplicationMaster 和Driver
5.Application Master启动成功后,向Resource Manager申请资源,用于启动Executor
6.Resource Manager收到资源申请后,分析选择合适的NodeManager节点返回给Application Master
7.Application Master收到Resource Manager返回的节点信息后,在对应的NodeManger上启动Executor
8.启动后的Executor向Driver 注册信息
9.Driver向Executor发送Task和接收Task返回的数据
Application提交命令
./spark-submit --master yarn-cluster --classs ..jar 参数
./spark-submit --master yarn --deploy-mode cluster --class ..jar 参数
六.Stage
stage的划分是Spark作业调度的关键一步,它基于DAG确定依赖关系,借此来划分stage,将依赖链断开,每个stage内部可以并行运行,整个作业按照stage顺序依次执行,最终完成整个Job。实际应用提交的Job中RDD依赖关系是十分复杂的,依据这些依赖关系来划分stage自然是十分困难的,Spark此时就利用了前文提到的依赖关系,调度器从DAG图末端出发,逆向遍历整个依赖关系链,遇到ShuffleDependency(宽依赖关系的一种叫法)就断开,遇到NarrowDependency就将其加入到当前stage。stage中task数目由stage末端的RDD分区个数来决定,RDD转换是基于分区的一种粗粒度计算,一个stage执行的结果就是这几个分区构成的RDD。
每个Stage都是由一组并行的task组成,
stage的并行度依赖于最后一个RDD的partition
如果需要提高stage的并行度,需要增加最后一个RDD的partition,可以使用重分区算子
task采用pipline的计算方式,及数据时一个一个的取出计算,而非批量
对于管道中数据,在RDD持久化和shuffle write时才会落地磁盘
未完待续。。。