设为首页 加入收藏

TOP

Spark 基础知识
2019-01-11 01:06:29 】 浏览:24
Tags:Spark 基础知识
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq_1018944104/article/details/85629580

目录

1、Spark的产生背景

1.1、MapReduce的发展

1.1.1、MRv1的缺陷

1.1.2、MRv2的缺陷

1.1.3、Spark的产生

2、Spark概念

3、Spark特点

3.1、Speed:快速高效

3.2、Ease of Use:简洁使用

3.3、Generally:全栈式数据处理

3.4、Runs Everywhere:兼容

4、Spark应用场景

5、Spark集群安装

5.1、Spark版本选择

5.2、Spark编译

5.3、Spark依赖环境

5.4、安装JDK

5.5、安装Scala

5.6、安装Spark

5.6.1、Spark分布式集群

5.6.2、Spark高可用集群

5.6.3、配置Spark HistoryServer

6、Spark基本使用

6.1、执行第一个Spark程序

6.2、启动Spark Shell

6.3、在Spark Shell中编写WordCount程序

6.4、在IDEA中编写WordCount程序

7、修改Spark的日志级别

7.1、临时修改

7.2、永久修改

8、Spark的WordCount

8.1、Scala版本的WordCount

8.2、Java7版本的WordCount

8.3、Java8 Lambda表达式版本的WordCount


1、Spark的产生背景

1.1、MapReduce的发展

1.1.1、MRv1的缺陷

早在 Hadoop1.x 版本,当时采用的是 MRv1 版本的 MapReduce 编程模型。MRv1 版本的实现都封装在 org.apache.hadoop.mapred 包中,MRv1 的 Map 和 Reduce 是通过接口实现的。MRv1包括三个部分:

  • 运行时环境(JobTracker 和 TaskTracker)
  • 编程模型(MapReduce)
  • 数据处理引擎(MapTask

MRv1 存在以下不足:

可扩展性差:在运行时,JobTracker 既负责资源管理又负责任务调度,当集群繁忙时,JobTracker 很容易成为瓶颈,最终导致它的可扩展性问题。


可用性差:采用了单节点的 Master,没有备用 Master 及选举操作,这导致一旦 Master 出现故障,整个集群将不可用。单点故障


资源利用率低:TaskTracker 使用“slot”等量划分本节点上的资源量。“slot”代表计算资源(CPU、内存等)。一个 Task 获取到一个 slot 后才有机会运行,Hadoop 调度器负责将各个 TaskTracker 上的空闲 slot 分配给 Task 使用。一些 Task 并不能充分利用 slot,而其他 Task也无法使用这些空闲的资源。slot 分为 Map slot 和 Reduce slot 两种,分别供 MapTask 和ReduceTask 使用。有时会因为作业刚刚启动等原因导致 MapTask 很多,而 Reduce Task 任务还没有调度的情况,这时 Reduce slot 也会被闲置。

不能支持多种 MapReduce 框架:无法通过可插拔方式将自身的 MapReduce 框架替换为其他实现,如 Spark、Storm 等。

1.1.2、MRv2的缺陷

Apache 为了解决 MRv1 中的缺陷,对 Hadoop 进行了升级改造。MRv2 就诞生了。

MRv2 中,重用了 MRv1 中的编程模型和数据处理引擎。但是运行时环境被重构了。JobTracker被拆分成了通用的

  • 资源调度平台(ResourceManager,简称 RM)
  • 节点管理器(NodeManager)
  • 负责各个计算框架的任务调度模型(ApplicationMaster,简称 AM)

ResourceManager 依然负责对整个集群的资源管理,但是在任务资源的调度方面只负责将资源封装为 Container 分配给 ApplicationMaster 的一级调度,二级调度的细节将交给ApplicationMaster 去完成,这大大减轻了 ResourceManager 的压力,使得 ResourceManager更加轻量。NodeManager 负责对单个节点的资源管理,并将资源信息、Container 运行状态、健康状况等信息上报给 ResourceManager。ResourceManager 为了保证 Container 的利用率,会监控 Container,如果 Container 未在有限的时间内使用,ResourceManager 将命令NodeManager杀死Container,以便于将资源分配给其他任务。MRv2的核心不再是MapReduce框架,而是 YARN。在以 YARN 为核心的 MRv2 中,MapReduce 框架是可插拔的,完全可以替换为其他分布式计算模型实现,比如 Spark、Storm 等。

Hadoop MRv2 虽然解决了 MRv1 中的一些问题,但是由于对 HDFS 的频繁操作(包括计算结果持久化、数据备份、资源下载及 Shuffle 等)导致磁盘 I/O 成为系统性能的瓶颈,因此只适用于离线数据处理或批处理,而不能支持对迭代式、交互式、流式数据的处理

重点概念:离线处理,批处理,实时处理,流式处理

1.1.3、Spark的产生

Spark 看到 MRv2 的问题,对 MapReduce 做了大量优化,总结如下:

减少磁盘 I/O:随着实时大数据应用越来越多,Hadoop 作为离线的高吞吐、低响应框架已不能满足这类需求。Hadoop MapReduce 的 map 端将中间输出和结果存储在磁盘中,reduce 端又需要从磁盘读写中间结果,势必造成磁盘 IO 成为瓶颈。Spark 允许将 map 端的中间输出和结果存储在内存中,reduce 端在拉取中间结果时避免了大量的磁盘 I/O。Hadoop YARN 中的 ApplicationMaster 申请到 Container 后,具体的任务需要利用 NodeManager 从 HDFS 的不同节点下载任务所需的资源(如 Jar 包),这也增加了磁盘 I/O。Spark 将应用程序上传的资源文件缓冲到 Driver 本地文件服务的内存中,当 Executor 执行任务时直接从 Driver 的内存中读取,也节省了大量的磁盘 I/O。

增加并行度:由于将中间结果写到磁盘与从磁盘读取中间结果属于不同的环节,Hadoop 将它们简单的通过串行执行衔接起来。Spark 把不同的环节抽象为 Stage,允许多个 Stage 既可以串行执行,又可以并行执行。

避免重新计算:当 Stage 中某个分区的 Task 执行失败后,会重新对此 Stage 调度,但在重新调度的时候会过滤已经执行成功的分区任务,所以不会造成重复计算和资源浪费。

可选的 Shuffle 和排序:Hadoop MapReduce 在 Shuffle 之前有着固定的排序操作(只能对 key排字典顺序),而 Spark 则可以根据不同场景选择在 map 端排序或者 reduce 端排序。

灵活的内存管理策略:Spark 将内存分为堆上的存储内存、堆外的存储内存、堆上的执行内存、堆外的执行内存 4 个部分。Spark 既提供了执行内存和存储内存之间是固定边界的实现,又提供了执行内存和存储内存之间是“软”边界的实现。Spark 默认使用“软”边界的实现,执行内存或存储内存中的任意一方在资源不足时都可以借用另一方的内存,最大限度的提高资源的利用率,减少对资源的浪费。Spark 由于对内存使用的偏好,内存资源的多寡和使用率就显得尤为重要,为此 Spark 的内存管理器提供的 Tungsten 实现了一种与操作系统的内存Page 非常相似的数据结构,用于直接操作操作系统内存,节省了创建的 Java 对象在堆中占用的内存,使得 Spark 对内存的使用效率更加接近硬件。Spark 会给每个 Task 分配一个配套的任务内存管理器,对 Task 粒度的内存进行管理。Task 的内存可以被多个内部的消费者消费,任务内存管理器对每个消费者进行 Task 内存的分配与管理,因此 Spark 对内存有着更细粒度的管理。

基于以上所列举的优化,Spark 官网声称性能比 Hadoop 快 100 倍。即便是内存不足需要磁盘 I/O 时,其速度也是 Hadoop 的 10 倍以上。

Spark 会取代 Hadoop 么?
Spark 是 MapReduce 的替代方案,而且兼容 HDFS、Hive,可融入 Hadoop 的生态系统,以弥补 MapReduce 的不足。

2、Spark概念

官网:http://spark.apache.org/

Spark 是一种快速、通用、可扩展的大数据分析引擎
2009 年诞生于加州大学伯克利分校 AMPLab
2010 年开源
2013 年 6 月成为 Apache 孵化项目
2014 年 2 月成为 Apache 顶级项目

Spark 生态圈也称为 BDAS(伯克利数据分析栈),是伯克利 APMLab 实验室打造的,力图在算法(Algorithms)、机器(Machines)、人(People)之间通过大规模集成来展现大数据应用的一个平台。伯克利 AMPLab 运用大数据、云计算、通信等各种资源以及各种灵活的技术方案,对海量不透明的数据进行甄别并转化为有用的信息,以供人们更好的理解世界。该生态
圈已经涉及到机器学习、数据挖掘、数据库、信息检索、自然语言处理和语音识别等多个领域。

Spark 生态圈以 SparkCore 为核心,从 HDFS、Amazon S3 或者 HBase 等持久层读取数据,以MESOS、YARN 和自身携带的 Standalone 为资源管理器调度 Job 完成 Spark 应用程序的计算。这些应用程序可以来自于不同的组件,如 SparkShell/SparkSubmit 的批处理、SparkStreaming的实时处理应用、SparkSQL 的结构化数据处理/即席查询、BlinkDB 的权衡查询、MLlib/MLbase的机器学习、GraphX 的图处理和 PySpark 的数学/科学计算和 SparkR 的数据分析等等。

目前,Spark 生态系统已经发展成为一个包含多个子项目的集合,其中包含 Spark SQL、Spark Streaming、GraphX、MLlib 等子项目,Spark 是基于内存计算的大数据并行计算框架。Spark 基于内存计算,提高了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性,允许用户将 Spark 部署在大量廉价硬件之上,形成集群。Spark 得到了众多大数据公司的支持,这些公司包括 Hortonworks、IBM、Intel、Cloudera、MapR、Pivotal、百度、阿里、腾讯、京东、携程、优酷土豆。当前百度的 Spark 已应用于凤巢、大搜索、直达号、百度大数据等业务;阿里利用 GraphX 构建了大规模的图计算和图挖掘系统,实现了很多生产系统的推荐算法;腾讯 Spark 集群达到 8000 台的规模,是当前已知的世界上最大的 Spark集群。

3、Spark特点

3.1、Speed:快速高效

随着实时大数据应用越来越多,Hadoop 作为离线的高吞吐、低响应框架已不能满足这类需求。Hadoop MapReduce 的 Job 将中间输出和结果存储在 HDFS 中,读写 HDFS 造成磁盘 IO 成为瓶颈。Spark 允许将中间输出和结果存储在内存中,节省了大量的磁盘 IO。Apache Spark使用最先进的 DAG 调度程序,查询优化程序和物理执行引擎,实现批量和流式数据的高性能。同时 Spark 自身的 DAG 执行引擎也支持数据在内存中的计算。Spark 官网声称性能比Hadoop 快 100 倍。即便是内存不足需要磁盘 IO,其速度也是 Hadoop 的 10 倍以上。

3.2、Ease of Use:简洁使用

Spark 现在支持 Java、Scala、Python 和 R 等编程语言编写应用程序,大大降低了使用者的门槛。自带了 80 多个高等级操作符,允许在 Scala,Python,R 的 shell 中进行交互式查询,可以非常方便的在这些 Shell 中使用 Spark 集群来验证解决问题的方法。

3.3、Generally:全栈式数据处理

Spark 提供了统一的解决方案。Spark 统一的解决方案非常具有吸引力,毕竟任何公司都想用统一的平台去处理遇到的问题,减少开发和维护的人力成本和部署平台的物力成本。

支持批处理(Spark Core)。Spark Core 是 Spark 的核心功能实现,包括:SparkContext 的初始化(DriverApplication 通过 SparkContext 提交)、部署模式、存储体系、任务提交与执行、计算引擎等。

支持交互式查询(Spark SQL)。Spark SQL 是 Spark 来操作结构化数据的程序包,可以让我们使用 SQL 语句的方式来查询数据,Spark 支持多种数据源,包含 Hive 表,parquet 以及 JSON等内容。

支持流式计算(Spark Streaming)。与 MapReduce 只能处理离线数据相比,Spark 还支持实时的流计算。Spark 依赖 Spark Streaming 对数据进行实时的处理。

支持机器学习(Spark MLlib)。提供机器学习相关的统计、分类、回归等领域的多种算法实现。其一致的 API 接口大大降低了用户的学习成本。

支持图计算(Spark GraghX)。提供图计算处理能力,支持分布式, Pregel 提供的 API 可以解决图计算中的常见问题。

支持 Python 操作--PySpark

支持 R 语言--SparkR

3.4、Runs Everywhere:兼容

可用性高。Spark 也可以不依赖于第三方的资源管理和调度器,它实现了 Standalone 作为其内置的资源管理和调度框架,这样进一步降低了 Spark 的使用门槛,使得所有人都可以非常容易地部署和使用 Spark,此模式下的 Master 可以有多个,解决了单点故障问题。当然,此模式也完全可以使用其他集群管理器替换,比如 YARN、Mesos、Kubernetes、EC2 等。

丰富的数据源支持。Spark 除了可以访问操作系统自身的本地文件系统和 HDFS 之外,还可以访问 Cassandra、HBase、Hive、Tachyon 以及任何 Hadoop 的数据源。这极大地方便了已经使用 HDFS、HBase 的用户顺利迁移到 Spark。

Spark 支持的几种部署方案:
Mesos:Spark 可以运行在 Mesos 里面(Mesos 类似于 YARN 的一个资源调度框架)
Standalone:Spark 自己可以给自己分配资源(Master,Worker)
YARN:Spark 可以运行在 Hadoop 的 YARN 上面
Kubernetes:Spark 接收 Kubernetes 的资源调度

4、Spark应用场景

目前大数据处理场景有以下几个类型:

1、复杂的批量处理(Batch Data Processing),偏重点在于处理海量数据的能力,至于处理速度可忍受,通常的时间可能是在数十分钟到数小时;
2、基于历史数据的交互式查询(Interactive Query),通常的时间在数十秒到数十分钟之间
3、基于实时数据流的数据处理(Streaming Data Processing),通常在数百毫秒到数秒之间

目前对以上三种场景需求都有比较成熟的处理框架:
第一种情况可以用 Hadoop 的 MapReduce 来进行批量海量数据处理
第二种情况可以 Impala、Kylin 进行交互式查询
第三中情况可以用 Storm 分布式处理框架处理实时流式数据

以上三者都是比较独立,各自一套维护成本比较高,而 Spark 的出现能够一站式平台满意以上需求。
第一种情况使用 Spark Core 解决
第二种情况使用 Spark SQL 解决
第三种情况使用 Spark Streaming 解决

通过以上分析,总结 Spark 场景有以下几个:

1、Spark 是基于内存的迭代计算框架,适用于需要多次操作特定数据集的应用场合。需要反复操作的次数越多,所需读取的数据量越大,受益越大,数据量小但是计算密集度较大的场合,受益就相对较小
2、由于 RDD 的特性,Spark 不适用那种异步细粒度更新状态的应用,例如 web 服务的存储或者是增量的 web 爬虫和索引。就是对于那种增量修改的应用模型不适合
3、数据量不是特别大,但是要求实时统计分析需求

典型行业应用场景:

1、Yahoo 将 Spark 用在 Audience Expansion 中的应用,进行点击预测和即席查询
2、淘宝技术团队使用了 Spark 来解决多次迭代的机器学习算法、高计算复杂度的算法等。应用于内容推荐、社区发现
3、腾讯大数据精准推荐借助 Spark 快速迭代的优势,实现了在“数据实时采集、算法实时训练、系统实时预测”的全流程实时并行高维算法,最终成功应用于广点通 PCTR 投放系统上。

4、优酷土豆将 Spark 应用于视频推荐(图计算)、广告业务,主要实现机器学习、图计算等迭代计算。

5、Spark集群安装

5.1、Spark版本选择

三大主要版本:
Spark-0.X
Spark-1.X(主要 Spark-1.3 和 Spark-1.6)
Spark-2.X(最新 Spark-2.3)

官网首页:http://spark.apache.org/downloads.html

或者其他镜像站:

https://mirrors.tuna.tsinghua.edu.cn/apache/spark/

https://www.apache.org/dyn/closer.lua/spark/spark-2.3.0/spark-2.3.0-bin-hadoop2.7.tgz

https://www.apache.org/dyn/closer.lua/spark/

我们选择的版本:spark-2.3.0-bin-hadoop2.7.tgz

5.2、Spark编译

自行利用搜索引擎解决,可做可不做

官网:http://spark.apache.org/docs/latest/building-spark.html

5.3、Spark依赖环境

在官网文档中有一句话:

所以总结:Spark-2.3 需要依赖:Java 8+ Python 2.7+/3.4+ Scala 2.11R 3.1+

5.4、安装JDK

略。

5.5、安装Scala

略。

5.6、安装Spark

5.6.1、Spark分布式集群

Spark 也是一个主从架构的分布式计算引擎。主节点是 Master,从节点是 Worker。所以集群规划:

Server Master Worker
hadoop02
hadoop03
hadoop04
hadoop05

详细安装步骤:

1、上传下载好的 Spark 到集群中的一个节点,比如是 hadoop05
put c:/spark-2.3.0-bin-hadoop2.7.tgz

2、使用之前安装 hadoop 集群相同的 hadoop 用户安装 spark 集群,现在规划安装目录
/home/hadoop/apps/,解压缩进行安装:
tar -zxvf spark-2.3.0-bin-hadoop2.7.tgz -apps /home/hadoop/apps/

3、修改配置文件 spark-env.sh
进入 SPARK_HOME 的 conf 目录中,进行如下更改:
cd /home/hadoop/apps/spark-2.3.0-bin-hadoop2.7/conf
mv spark-env.sh.template spark-env.sh

然后修改 spark-env.sh:
export JAVA_HOME=/usr/local/java/jdk1.8.0_73
export SPARK_MASTER_HOST=hadoop02
export SPARK_MASTER_PORT=7077

4、修改配置文件 slave
进入 SPARK_HOME 的 conf 目录中,进行如下更改:
cd /home/hadoop/apps/spark-2.3.0-bin-hadoop2.7/conf
mv slaves.template slaves

在 slaves 的最后添加所有 worker 节点的主机名
hadoop02
hadoop03
hadoop04

hadoop05

5、将 spark 安装包 copy 到所有安装节点
scp -r spark-2.3.0-bin-hadoop2.7 hadoop02:/home/hadoop/apps/
scp -r spark-2.3.0-bin-hadoop2.7 hadoop03:/home/hadoop/apps/

scp -r spark-2.3.0-bin-hadoop2.7 hadoop04:/home/hadoop/apps/

6、配置环境变量
vim ~/.bashrc
export SPARK_HOME=/home/hadoop/apps/spark-2.3.0-bin-hadoop2.7
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin

source ~/.bashrc
千万注意:HADOOP_HOME/sbin 和 SPARK_HOME/sbin 目录中都包含 start-all.sh 和 stopall.sh 脚本。所以会有冲突。所以在使用有冲突的命令等要千万注意。
如果区分不清楚,那么不推荐配置环境变量

7、启动 Spark 集群
[hadoop@hadoop02 ~]$ cd /home/hadoop/apps/spark-2.3.0-bin-hadoop2.7
[hadoop@hadoop02 spark-2.3.0-bin-hadoop2.7]$ sbin/start-all.sh

8、验证集群启动是否成功
8.1、验证每个节点上的对应进程是否都启动正常

8.2、验证 Spark Web UI
打开浏览器访问:http://hadoop02:8080/
hadoop02 就是 master 所在的服务器

8.3、测试能否运行一个 Spark 例子程序
提交一个 spark 程序:
[hadoop@hadoop03 ~]$ run-example SparkPi 10
最后结果:

或者:

[hadoop@hadoop3 ~]$ ~/apps/spark-2.3.0-bin-hadoop2.7/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://hadoop02:7077 \
--executor-memory 512m \

--total-executor-cores 1 \
~/apps/spark-2.3.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.0.jar \
100

--master spark://hadoop02:7077 指定 Master 的地址
--executor-memory 512m 指定每个 worker 可用内存为 500m
--total-executor-cores 1 指定整个集群使用的 CPU 核数为 1 个

8.4、进入 Spark Shell 提交 wordcount 程序:
数据准备:

进入 Spark Shell:
[hadoop@hadoop2 ~] spark-shell
或者
[hadoop@hadoop2 ~]$ ~/apps/spark-2.3.0-bin-hadoop2.7/bin/spark-shell \

> --master spark://hadoop02:7077 \
> --executor-memory 512m \
> --total-executor-cores 1

执行程序:sc.textFile("/home/hadoop/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).foreach(println)

如果是 Spark-1.6.3,那么启动的 spark-shell 如下:

注意:
如果启动 Spark Shell 时没有指定 master 地址,但是也可以正常启动 Spark Shell 和执行 SparkShell 中的程序,其实是启动了 Spark 的 local 模式,该模式仅在本机启动一个进程,没有与集群建立联系。

Spark Shell 中已经默认将 SparkContext 类初始化为对象 sc。用户代码如果需要用到,则直接应用 sc 即可。

Spark Shell 中已经默认将 Spark Session 类初始化为对象 spark。用户代码如果需要用到,则直接应用 spark 即可。

注意 Spark2 和 Spark1 的区别。

5.6.2、Spark高可用集群

在上面的 4.6.1 中的安装的 Spark 集群是一个普通的分布式集群,存在 master 节点的单点故障问题。Hadoop 在 2.X 版本开始,已经利用 ZooKeeper 解决了单点故障问题。同样的策略,Spark 也利用 ZooKeeper 解决 Spark 集群的单点故障问题。

集群规划:

Server Master Worker
hadoop02
hadoop03
hadoop04
hadoop05

具体步骤:

1、停止 Spark 集群

[hadoop@hadoop02 ~]$ cd /home/hadoop/apps/spark-2.3.0-bin-hadoop2.7
[hadoop@hadoop02 ~]$ sbin/stop-all.sh

2、配置 ZooKeeper 集群,并且启动好 ZooKeeper 集群

3、修改 SPARK_HOME/conf 目录中的 spark-env.sh 配置文件:
删掉:export SPARK_MASTER_HOST=hadoop02

增加一行:export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -
Dspark.deploy.zookeeper.url=hadoop02,hadoop03,hadoop04 -
Dspark.deploy.zookeeper.dir=/spark"

解释:

-Dspark.deploy.recoveryMode=ZOOKEEPER
说明整个集群状态是通过zookeeper来维护的,整个集群状态的恢复也是通过zookeeper来维护的。就是说用 zookeeper 做了 Spark 的 HA 配置,Master(Active)挂掉的话,Master(standby)要想变成 Master(Active)的话,Master(Standby)就要像 zookeeper 读取整个集群状态信息,然后进行恢复所有 Worker 和 Driver 的状态信息,和所有的Application 状态信息。

-Dspark.deploy.zookeeper.url=hadoop2:hadoop03:hadoop04
#将所有配置了 zookeeper,并且在这台机器上有可能做 master(Active)的机器都配置进来(我用了 3 台,就配置了 3 台)。

-Dspark.deploy.zookeeper.dir=/spark
这里的 dir 和 zookeeper 配置文件 zoo.cfg 中的 dataDir 的区别???-Dspark.deploy.zookeeper.dir 是保存 spark 的元数据,保存了 spark 的作业运行状态;zookeeper 会保存 spark 集群的所有的状态信息,包括所有的 Workers 信息,所有的Applactions 信息,所有的 Driver 信息,如果集群。

4、如果是高可用的 Hadoop 集群,一定要把 core-site.xml 和 hdfs-site.xml 放置在$SPARK_HOME/conf 目录中。然后所有节点要同步

5、同步配置文件
[hadoop@hadoop02 conf]$ scp -r spark-env.sh hadoop03:$PWD
[hadoop@hadoop02 conf]$ scp -r spark-env.sh hadoop04:$PWD
[hadoop@hadoop02 conf]$ scp -r spark-env.sh hadoop05:$PWD

6、启动集群
在 hadoop02 上执行:
[hadoop@hadoop02 ~]$ cd /home/hadoop/apps/spark-2.3.0-bin-hadoop2.7
[hadoop@hadoop02 spark-2.3.0-bin-hadoop2.7]$ sbin/start-all.sh

此时,通过观察启动日志,或者检查 hadoop04 上是否包含有 master 进程等都可以得知hadoop04 上的 master 并不会自动启动,所以需要手动启动,那么在 hadoop04 执行命令进行启动:

7、验证高可用
这是正常情况:
Hadoop02 是 spark 集群的 active master 节点
Hadoop04 是 spark 集群的 standby master 节点

通过杀掉 active master 观察是否 hadoop04 能启动切换为 active 状态。结果:

5.6.3、配置Spark HistoryServer

具体步骤:https://blog.csdn.net/qq_1018944104/article/details/85170496

6、Spark基本使用

6.1、执行第一个Spark程序

利用 Spark 自带的例子程序执行一个求 PI(蒙特卡洛算法)的程序:

$SPARK_HOME/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://hadoop02:7077 \
--executor-memory 512m \

-total-executor-cores 2 \
$SPARK_HOME/examples/jars/spark-examples_2.11-2.3.0.jar \
100

6.2、启动Spark Shell

启动命令:

$SPARK_HOME/bin/spark-shell \
--master spark://hadoop02:7077,hadoop04:7077 \
--executor-memory 512M \
--total-executor-cores 2

注意上图中的 cores 参数,是 0,那么以后这个 spark shell 中运行的代码是不能执行成功的。千万注意。必要要把 cpu cores 和 memory 设置合理
1、 executor memory 不能超过虚拟机的内存
2、 cpu cores 不要超过 spark 集群能够提供的总 cpu cores,否则会使用全部。最好不要使用全部。否则其他程序由于没有 cpu core 可用,就不能正常运行。

参数说明:
--master spark://hadoop02:7077 指定 Master 的地址
--executor-memory 2G 指定每个 worker 可用内存为 2G
--total-executor-cores 2 指定整个集群使用的 cup 核数为 2 个

注意:
如果启动 spark shell 时没有指定 master 地址,但是也可以正常启动 spark shell 和执行 sparkshell 中的程序,其实是启动了 spark 的 local 模式,该模式仅在本机启动一个进程,没有与集群建立联系。

Spark-2.X:
Spark Shell 中已经默认将 SparkContext 类初始化为对象 sc。
Spark Shell 中已经默认将 SparkSession 类初始化为对象 spark。
用户代码如果需要用到,则直接应用 sc,spark 即可

Spark-1.X:
Spark Shell 中已经默认将 SparkContext 类初始化为对象 sc。
Spark Shell 中已经默认将 SQLContext 类初始化为对象 sqlContext。
用户代码如果需要用到,则直接应用 sc,sqlContext 即可

6.3、在Spark Shell中编写WordCount程序

在提交 WordCount 程序之前,先在 HDFS 集群中的准备一个文件用于做单词统计:

words.txt 内容如下:

hello huangbo
hello xuzheng
hello wangbaoqiang

把该文件上传到 HDFS 文件系统中:

[hadoop@hadoop05 ~]$ hadoop fs -mkdir -p /spark/wc/input
[hadoop@hadoop05 ~]$ hadoop fs -put words.txt /spark/wc/input

在 Spark Shell 中提交 WordCOunt 程序:

sc.textFile("hdfs://myha01/spark/wc/input/words.txt").flatMap(_.split("
")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://myha01/spark/wc/output")

执行最后的结果:

说明:
sc 是 SparkContext 对象,该对象时提交 spark 程序的入口
textFile("hdfs://myha01/spark/wc/input/words.txt")是从 HDFS 中读取数据
flatMap(_.split(" "))先 map 再压平
map((_,1))将单词和1 构成元组(word,1)
reduceByKey(_+_)按照 key 进行 reduce,并将 value 累加
saveAsTextFile("hdfs://myha01/spark/wc/output")将结果写入到 HDFS 对应输出目录中

6.4、在IDEA中编写WordCount程序

Spark Shell 仅在测试和验证我们的程序时使用的较多,在生产环境中,通常会在 IDEA 中编制程序,然后打成 jar 包,然后提交到集群,最常用的是创建一个 Maven 项目,利用 Maven来管理 jar 包的依赖。

1、创建一个 IDEA 的 maven 项目

2、选择 Maven 项目,然后点击 next

3、填写 maven 的 GAV,然后点击 next

4、填写项目名称,然后点击 finish

5、创建好 maven 项目后,点击 Enable Auto-Import

6、配置 maven 的 pom.xml 文件

<xml version="1.0" encoding="UTF-8">
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>com.mazh.spark</groupId>
    <artifactId>Spark_WordCount</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.3.0</spark.version>
        <hadoop.version>2.7.5</hadoop.version>
        <scala.compat.version>2.11</scala.compat.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    </dependencies>
    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excluudes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

7、将 src/main/java 和 src/test/java 分别修改成 src/main/scala 和 src/test/scala,与 pom.xml中的配置保持一致

8、新建一个 Scala Class 类型为 Object,编写 WordCount 程序

package com.mazh.spark
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
    def main(args: Array[String]): Unit = {
        // 创建一个 SparkConf 对象,并设置程序的名称
        val conf = new SparkConf().setAppName("WordCount")
        // 创建一个 SparkContext 对象
        val sc = new SparkContext(conf)
        // 读取 HDFS 上的文件构建一个 RDD
        val fileRDD = sc.textFile(args(0))
        // 构建一个单词 RDD
        val wordAndOneRDD = fileRDD.flatMap(_.split(" ")).map((_, 1))
        // 进行单词的聚合
        val resultRDD = wordAndOneRDD.reduceByKey(_+_)
        // 对 resultRDD 进行单词出现次数的降序排序,然后写出结果到 HDFS
        resultRDD.sortBy(_._2, false).saveAsTextFile(args(1))
        sc.stop()
    }
}

9、使用 maven 进行打包
点击右侧的 maven project 选项。先点击 clean 再点击 package 进行打包

10、启动 HDFS 集群和 Spark 集群
启动操作略

11、上传打好的 jar 包到 spark 集群中的用来提交任务的节点
put c:/Spark_WordCount-1.0-SNAPSHOT.jar
执行命令:

$SPARK_HOME/bin/spark-submit \
--class com.mazh.spark.WordCount \
--master spark://hadoop02:7077 \
--executor-memory 512m \
--total-executor-cores 4 \
/home/hadoop/Spark_WordCount-1.0-SNAPSHOT.jar \
hdfs://myha01/spark/wc/input \
hdfs://myha01/spark/wc/output_11

12、验证结果

7、修改Spark的日志级别

7.1、临时修改

7.2、永久修改

从我们运行的 spark 程序运行的情况来看,可以看到大量的 INFO 级别的日志信息。淹没了我们需要运行输出结果。可以通过修改 Spark 配置文件来 Spark 日志级别。

以下是详细步骤:

第一步:先进入 conf 目录
[hadoop@hadoop05 conf]$ cd $SPARK_HOME/conf

第二步:准备 log4j.properties
[hadoop@hadoop05 conf]$ cp log4j.properties.template log4j.properties

第三步:配置日志级别:
把 INFO 改成你想要的级别:主要有 ERROR, WARN, INFO, DEBUG 几种

8、Spark的WordCount

8.1、Scala版本的WordCount

package com.mazh.spark
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
    def main(args: Array[String]): Unit = {
        // 创建一个 SparkConf 对象,并设置程序的名称
        val conf = new SparkConf().setAppName("WordCount")
        conf.setMaster("local")
        // 创建一个 SparkContext 对象
        val sc = new SparkContext(conf)
        // 读取 HDFS 上的文件构建一个 RDD
        val fileRDD = sc.textFile("hdfs://myha01/spark/wc/input")
        // val fileRDD = sc.textFile(args(0))
        // 构建一个单词 RDD
        val wordAndOneRDD = fileRDD.flatMap(_.split(" ")).map((_, 1))
        // 进行单词的聚合
        val resultRDD = wordAndOneRDD.reduceByKey(_+_)
        // 对 resultRDD 进行单词出现次数的降序排序,然后写出结果到 HDFS
        resultRDD.sortBy(_._2, false).saveAsTextFile("hdfs://myha01/spark/wc/output_spark33")
        // resultRDD.sortBy(_._2, false).saveAsTextFile(args(1))
        sc.stop()
    }
}

8.2、Java7版本的WordCount

package com.mazh.spark.wc;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class JavaWordCount {
    public static void main(String[] args){
        if(args.length!=2){
            System.out.println("Usage:JavaWordCount<input><output>");
            System.exit(1);
        }
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName(JavaWordCount.class.getSimpleName());
        JavaSparkContext jsc = new JavaSparkContext(conf);
        JavaRDD<String> line = jsc.textFile(args[0]);
        //切割压平 flatMap() 两个参数,一个输入类型,一个输出类型
        JavaRDD<String> jrdd1 = line.flatMap(new FlatMapFunction<String, String>() {
            @Override
             public Iterator<String> call(String s) throws Exception {
                 //该方法的返回值类型是 Iterator,需要把 Array 类型的结果转换为迭代器类型的
                 return Arrays.asList(s.split(" ")).iterator();
             }
        });
        //和 1 组合成元组 mapToPair() 第一个参数,输入数据类型,第二个参数是元组的 key 类型,第三个参数是元组的 value 类型
        JavaPairRDD<String, Integer> javaPairRDD = jrdd1.mapToPair(new PairFunction<String, String, Integer>() { 
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                 return new Tuple2<String, Integer>(s, 1);
            }
        });
        //分组聚合 reduceByKey() (a,b)=>a+b 第三个参数:返回值的类型
        JavaPairRDD<String, Integer> result = javaPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                 return v1 + v2;
            }
        });
        //先在本地测试一下
         /* result.foreach(new VoidFunction<Tuple2<String, Integer>>() {
             @Override
             public void call(Tuple2<String, Integer> tuple) throws Exception {
                 System.out.println(tuple);
             }
         });*/
        //可以进行排序
        JavaPairRDD<Integer, String> res1 = result.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> t) throws Exception {
                 return t.swap();
            }
        });
        //排序,默认是升序,如果需要降序,参数 false
        JavaPairRDD<Integer, String> res2 = res1.sortByKey(false);
        JavaPairRDD<String, Integer> finalRes = res2.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> t) throws Exception {
                 return t.swap();
            }            
        });
        //保存
        finalRes.saveAsTextFile(args[1]);
        //释放资源
        jsc.close();
    }
}

8.3、Java8 Lambda表达式版本的WordCount

package com.mazh.spark.wc;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class JavaLambdaWordCount {
    public static void main(String[] args){
        if(args.length!=2){
            System.out.println("Usage JavaLambdaWordCount<input><output>");
            System.exit(1);
        }
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName(JavaLambdaWordCount.class.getSimpleName());
        JavaSparkContext jsc = new JavaSparkContext(conf);
        //读取数据
        JavaRDD<String> jrdd = jsc.textFile(args[0]);
        //切割压平
        JavaRDD<String> jrdd2 = jrdd.flatMap(t -> Arrays.asList(t.split("")).iterator());
        //和 1 组合
        JavaPairRDD<String, Integer> jprdd = jrdd2.mapToPair(t -> new Tuple2<String, Integer>(t, 1));
        //分组聚合
        JavaPairRDD<String, Integer> res = jprdd.reduceByKey((a, b) -> a + b);
        //保存
        res.saveAsTextFile(args[1]);
        //释放资源
        jsc.close();
    }
}


编程开发网
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Spark概念解析 下一篇spark简述

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

array(4) { ["type"]=> int(8) ["message"]=> string(24) "Undefined variable: jobs" ["file"]=> string(32) "/mnt/wp/cppentry/do/bencandy.php" ["line"]=> int(214) }