版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/panleiaiying/article/details/84762580
1、Spark 介绍
Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。Spark是Scala编写,方便快速编程; Spark 技术栈中包括 SparkCore,SparkStreaming,SparkSQL,SparkMllib等。
Spark与MapReduce的区别
1. Spark 基于内存迭代处理数据,MR基于磁盘迭代处理数据
2. Spark 粗粒度资源申请,MR是细粒度资源申请
3. MR中只有mapper,reducer,相当于spark中的map和reduceByKey两个算子,在MR业务逻辑要自己实现,Spark中有各种算子对应各种业务
Spark 运行模式分为 :
本地模式 用于本地开发,多用于测试
standalone Spark自带的资源调度框架,支持分布式搭建,Spark 可以基于Standalone运行任务
yarn Hadoop生态圈中的资源调度框架,Spark也支持在Yarn中运行
2、spark 本地方式运行
导入pom
< dependency>
< groupId> org.apache.spark</ groupId>
< artifactId> spark-core_2.11</ artifactId>
< version> 2.3.2</ version>
</ dependency>
public class WordCount2 {
public static void main ( String[ ] args) {
SparkConf sparkConf = new SparkConf ( ) . setMaster ( "local" ) . setAppName ( "wc_2" ) ;
JavaSparkContext sc = new JavaSparkContext ( sparkConf) ;
JavaRDD< String> text = sc. textFile ( "test.txt" ) ;
JavaRDD< String> word = text. flatMap ( new FlatMapFunction < String, String> ( ) {
@Override
public Iterator< String> call ( String s) throws Exception {
String[ ] str = s. split ( " " ) ;
return Arrays. asList ( str) . iterator ( ) ;
}
} ) ;
JavaPairRDD< String, Integer> paris = word. mapToPair ( new PairFunction < String, String, Integer> ( ) {
@Override
public Tuple2< String, Integer> call ( String s) throws Exception {
Tuple2< String, Integer> tuple2 = new Tuple2 < > ( s, 1 ) ;
return tuple2;
}
} ) ;
JavaPairRDD< String, Integer> count = paris. reduceByKey ( new Function2 < Integer, Integer, Integer> ( ) {
@Override
public Integer call ( Integer integer, Integer integer2) throws Exception {
return integer+ integer2;
}
} ) ;
JavaPairRDD< Integer, String> tem = count. mapToPair ( new PairFunction < Tuple2< String, Integer> , Integer, String> ( ) {
@Override
public Tuple2< Integer, String> call ( Tuple2< String, Integer> tuple2) throws Exception {
return new Tuple2 < > ( tuple2. _2, tuple2. _1) ;
}
} ) ;
JavaPairRDD< Integer, String> sorted = tem. sortByKey ( false ) ;
sorted. foreach ( new VoidFunction < Tuple2< Integer, String> > ( ) {
@Override
public void call ( Tuple2< Integer, String> integerStringTuple2) throws Exception {
System. out. println ( integerStringTuple2. _1+ ":" + integerStringTuple2. _2) ;
}
} ) ;
}
}
spark 是scala 实现的, 用java 感觉麻烦的可以换用scala 去实现; 以下是scala 实现上述功能
object SparkTest {
def main ( args: Array[ String] ) : Unit = {
var conf = new SparkConf ( ) . setMaster ( "local" ) . setAppName ( "wc" )
var sc = new SparkContext ( conf)
var text = sc. textFile ( "test.txt" )
var word = text. flatMap ( ( _. split ( " " ) ) )
var pair = word. map ( ( _, 1 ) )
var result = pair. reduceByKey ( _+ _)
var sort: RDD[ ( Int, String) ] = result. map ( tuple= > ( tuple. _2, tuple. _1) ) ;
var res = sort. sortByKey ( )
res. foreach ( println)
}
}
在上述看到了一堆RDD格式,下面介绍什么是RDD
RDD(Resilient Distributed Dateset),弹性分布式数据集。
RDD 的五大特征:
RDD由一系列partition组成
算子(函数)是作用在partition上的
RDD 之间有依赖关系
分区器是作用在K,V格式的RDD上
partition对外提供最佳的计算位置,利于数据处理的本地化
什么是K,V格式的RDD
RDD里面存储的数据都是二元组对象,那么这个RDD我们就叫做K,V格式的RDD,类似Map<Object,Object>。
哪里体现RDD的弹性(容错)?
partition数量,大小没有限制,体现了RDD的弹性。
RDD之间依赖关系,可以基于上一个RDD重新计算出RDD。
哪里体现RDD的分布式?
RDD是由Partition组成,partition是分布在不同节点上的
standalone 环境搭建
node1 为master; node2 node3 node4 为worke r
1、下载安装包 http://spark.apache.org/downloads.html
并解压
2、进入安装包conf 文件夹下,配置slave
[ root@node2 conf] # mv slaves. template slaves
[ root@node2 conf] # vim slaves
node2
node3
node4
3、 配置spark-env.sh
export SPARK_MASTER_HOST=node1
export SPARK_MASTER_PORT=7077
export SPARK_WORKER_CORES=1
export SPARK_WORKER_MEMORY=1g
export SPARK_MASTER_HOST=node1
设置master 主机
export SPARK_MASTER_PORT=7077
提交任务的端口,默认是7077
export SPARK_WORKER_CORES=1
每个worker从节点能够支配的core的个数
export SPARK_WORKER_MEMORY=1g
每个worker从节点能够支配的内存数
4、 同步 node2 node3 node4 节点, 并启动集群
进入sbin 目录下./start-all.sh
5、 搭建上传任务的客户端(可选)
将spark安装包原封不动的拷贝到一个新的节点上,然后,在新的节点上提交任务即可,也可以在master和worker 上提交
6、 输入node1:8080 端口,可进入webUI 界面
注意:
8080是Spark WEBUI界面的端口,7077是Spark任务提交的端口。
修改master的WEBUI端口, 修改start-master.sh即可,SPARK_MASTER_WEBUI_PORT=8080
standalone两种任务提交模式:
Standalone-client提交任务方式
以启动安装包中的一个Demo PI为例:
. /spark- submit -- master spark: / / node2: 7077 -- class org. apache. spark. examples. JavaSparkPi / opt/ spark- 2.3 .2 - bin- hadoop2. 7 / examples/ jars/ spark- examples_2. 11 - 2.3 .2 . jar 100
./spark-submit
--master spark://node1:7077
--deploy-mode client
--class org.apache.spark.examples.SparkPi
../lib/spark-examples-1.6.0-hadoop2.6.0.jar
100
client 模式任务流程:
执行流程:
client模式提交任务后,会在客户端启动Driver进程。
Driver会向Master申请启动Application启动的资源。
资源申请成功,Driver端将task发送到worker端执行。
worker将task执行结果返回到Driver端
client模式适用于测试调试程序。Driver进程是在客户端启动的,这里的客户端就是指提交应用程序的当前节点。在Driver端可以看到task执行的情况。
Standalone-cluster提交任务方式:
[root@node2 bin]# ./spark-submit --master spark://node2:7077 --deploy-mode cluster --class org.apache.spark.examples.JavaSparkPi /opt/spark-2.3.2-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.2.jar 100
任务流程图:
执行流程:
cluster模式提交应用程序后,会向Master请求启动Driver.
Master接受请求,随机在集群一台节点启动Driver进程。
Driver启动后为当前的应用程序申请资源。
Driver端发送task到worker节点上执行。
worker将执行情况和执行结果返回给Driver端。
2. Yarn模式两种提交任务方式
在提交任务的节点添加配置
HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
yarn-cilent方式
运行demo :
. /spark- submit -- master yarn -- class org. apache. spark. examples. JavaSparkPi / opt/ spark- 2.3 .2 - bin- hadoop2. 7 / examples/ jars/ spark- examples_2. 11 - 2.3 .2 . jar 100
执行流程:
客户端提交一个Application,在客户端启动一个Driver进程。
应用程序启动后会向RS(ResourceManager)发送请求,启动AM(ApplicationMaster)的资源。
RS收到请求,随机选择一台NM(NodeManager)启动AM。这里的NM相当于Standalone中的Worker节点。
AM启动后,会向RS请求一批container资源,用于启动Executor.
RS会找到一批NM返回给AM,用于启动Executor。
AM会向NM发送命令启动Executor。
Executor启动后,会反向注册给Driver,Driver发送task到Executor,执行情况和结果返回给Driver端
ApplicationMaster的作用:
为当前的Application申请资源
给NameNode发送消息启动Executor。
注意:ApplicationMaster有launchExecutor和申请资源的功能,并没有作业调度的功能。
yarn-cluster提交任务方式:
./spark-submit --master yarn --cluster org.apache.spark.examples.JavaSparkPi /opt/spark-2.3.2-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.2.jar 100
执行流程:
执行流程
客户机提交Application应用程序,发送请求到RS(ResourceManager),请求启动AM(ApplicationMaster)。
RS收到请求后随机在一台NM(NodeManager)上启动AM(相当于Driver端)。
AM启动,AM发送请求到RS,请求一批container用于启动Executor。
RS返回一批NM节点给AM。
AM连接到NM,发送请求到NM启动Executor。
Executor反向注册到AM所在的节点的Driver。Driver发送task到Executor。
总结
Yarn-Cluster主要用于生产环境中,因为Driver运行在Yarn集群中某一台nodeManager中,每次提交任务的Driver所在的机器都是随机的,不会产生某一台机器网卡流量激增的现象,缺点是任务提交后不能看到日志。只能通过yarn查看日志。
停止集群任务命令:yarn application -kill applicationID
Spark 算子
1、Transformation 算子 (懒执行,需要Action算子触发)
filter:
过滤字符,返回true 的通过
map(mapToPair):
转换为map 格式 例如"aa" -> {“aa”,1}
flatMap:
文件中字符串拆分
reduceByKey
:
sortBy/sortByKey : tem.sortByKey(false)
;
sample:
抽样
join:
合并
2、Action 算子
foreach:
count:
collect():会将结果回收到Driver端
foreachPartition:
reduce:
countByKey:
countByValue:
任务: Application-> Job -> Stage -> Task
资源: Master -> Worker -> Executer -> ThreadPool
Spark资源调度和任务调度
Spark资源调度和任务调度的流程:
启动集群后,Worker节点会向Master节点汇报资源情况,Master掌握了集群资源情况。当Spark提交一个Application后,根据RDD之间的依赖关系将Application形成一个DAG有向无环图。任务提交后,Spark会在Driver端创建两个对象:DAGScheduler和TaskScheduler,DAGScheduler是任务调度的高层调度器,是一个对象。DAGScheduler的主要作用就是将DAG根据RDD之间的宽窄依赖关系划分为一个个的Stage,然后将这些Stage以TaskSet的形式提交给TaskScheduler(TaskScheduler是任务调度的低层调度器,这里TaskSet其实就是一个集合,里面封装的就是一个个的task任务,也就是stage中的并行度task任务),TaskSchedule会遍历TaskSet集合,拿到每个task后会将task发送到计算节点Executor中去执行(其实就是发送到Executor中的线程池ThreadPool去执行)。task在Executor线程池中的运行情况会向TaskScheduler反馈,当task执行失败时,则由TaskScheduler负责重试,将task重新发送给Executor去执行,默认重试3次。如果重试3次依然失败,那么这个task所在的stage就失败了。stage失败了则由DAGScheduler来负责重试,重新发送TaskSet到TaskSchdeuler,Stage默认重试4次。如果重试4次以后依然失败,那么这个job就失败了。job失败了,Application就失败了。
TaskScheduler不仅能重试失败的task,还会重试straggling(落后,缓慢)task(也就是执行速度比其他task慢太多的task)。如果有运行缓慢的task那么TaskScheduler会启动一个新的task来与这个运行缓慢的task执行相同的处理逻辑。两个task哪个先执行完,就以哪个task的执行结果为准。这就是Spark的推测执行机制。在Spark中推测执行默认是关闭的。推测执行可以通过spark.speculation属性来配置。
注意:
对于ETL类型要入数据库的业务要关闭推测执行机制,这样就不会有重复的数据入库。
如果遇到数据倾斜的情况,开启推测执行则有可能导致一直会有task重新启动处理相同的逻辑,任务可能一直处于处理不完的状态。
图解Spark资源调度和任务调度的流程
粗粒度资源申请和细粒度资源申请
粗粒度资源申请(Spark)
在Application执行之前,将所有的资源申请完毕,当资源申请成功后,才会进行任务的调度,当所有的task执行完成后,才会释放这部分资源。
优点:在Application执行之前,所有的资源都申请完毕,每一个task直接使用资源就可以了,不需要task在执行前自己去申请资源,task启动就快了,task执行快了,stage执行就快了,job就快了,application执行就快了。
缺点:直到最后一个task执行完成才会释放资源,集群的资源无法充分利用。
细粒度资源申请(MapReduce)
Application执行之前不需要先去申请资源,而是直接执行,让job中的每一个task在执行前自己去申请资源,task执行完成就释放资源。
优点:集群的资源可以充分利用。
缺点:task自己去申请资源,task启动变慢,Application的运行就相应的变慢了。