设为首页 加入收藏

TOP

spark的认识(一)
2019-03-14 13:06:13 】 浏览:20
Tags:spark 认识
版权声明:未经本人同意不得转载和商用。 https://blog.csdn.net/Perfecti/article/details/88545466

spark的认识(一)

1、主要内容

  • 1、掌握spark相关概念

  • 2、掌握搭建一个spark集群

  • 3、掌握编写简单的spark应用程序

2、spark概述

2.1 spark是什么

  • Apache Spark is a unified analytics engine for large-scale data processing.

  • spark是一个针对于大规模数据处理的统一分析引擎。

    spark是一个基于内存计算的大数据处理框架,处理非常快。但是这里仅仅只涉及到数据的计算,并没有涉及到数据的存储,后期我们想要通过spark来处理数据,就需要跟外部的数据源进行对接。比如要处理hdfs上的数据。

2.2 为什么要学习spark

  • spark计算速度比mapreduce快很多。

2.3 spark的四大特性

  • 1、速度快

    • spark比mapreduce在内存中快100倍,比mapreduce在磁盘中快10倍

    • 快的主要2个原因

      (1) spark在处理任务的时候,job的输出结果可以保存在内存中,而mapreduce它在处理任务的时候,job的输出结果是只能够保存在磁盘里面。后续又有其他的job需要依赖于前面job的输出结果,spark中可以直接从内存中获取得到,而mapreduce必须要进行大量的磁盘io操作才能够获取得到,这样一来,spark中的job就可以大大减少磁盘的io操作,最后提升性能。
      
      select name age from (select * from user where age >30 and age <40)
      
      
      (2) mapreduce任务它是以进程的方式运行在yarn集群中,比如一个mapreduce任务有100个MapTask,这个时候要处理这100个MapTask就需要启动100个进程。spark任务它是以线程的方式运行在进程中。比如一个spark的任务有100个MapTask,这个时候要处理这100个MapTask可以只启动一个进程,在这一个进程中运行100个线程就可以了。这里启动一个进程跟启动一个线程它的代价肯定是不一样。启动一个进程比启动一个线程需要的时间和资源都是大大增加。spark任务需要的时间和资源是减少了,任务的性能得到了提升。
  • 2、易用性

    • 可以快速写一个spark应用程序通过java、scala、python、R、sql语言来进行代码开发

  • 3、通用性

    • spark框架就是一个生态系统,可以通过它的一些子项目sparksql、sparkStreaming、Mlib、Graphx来使用到不同的应用场景。

  • 4、兼容性

    • spark程序就是一个计算任务的程序,哪里可以给当前这个任务提供计算的资源,我们就可以把spark程序提交到哪里去运行。

      • standalone

        • 它是spark集群自带的集群模式,整个任务的资源分配由Master负责。

      • yarn

        • 可以把spark程序提交到yarn中去运行,整个任务的资源分配由ResourceManager负责

      • mesos

        • 它是一个apache类似于yarn的资源调度框架

3、spark集群安装部署

  • 1、下载对应spark版本安装包

  • 2、规划安装目录

    • /export/servers

  • 3、上传安装包到服务器中

  • 4、解压安装包到指定的规划目录

    • tar -zxvf spark-2.1.3-bin-hadoop2.7.tgz -C /export/servers

  • 5、重命名解压目录

    • mv spark-2.1.3-bin-hadoop2.7 spark

  • 6、修改配置文件

    • 进入到spark/conf目录下

      • vim spark-env.sh (mv spark-env.sh.template spark-env.sh)

        #添加java环境变量
        export JAVA_HOME=/export/servers/jdk
        #配置master的IP地址
        export SPARK_MASTER_HOST=node1
        #配置master的端口
        export SPARK_MASTER_PORT=7077
      • vim slaves (mv slaves.template slaves)

        #指定哪些节点是worker(把默认的值localhost干掉)
        node2
        node3
  • 7、配置spark的环境变量

    • vim /etc/profile

      export SPARK_HOME=/export/servers/spark
      export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
  • 8、分发spark目录和spark环境变量到其他节点

    scp -r spark node2:/export/servers
    scp -r spark node3:/export/servers
    
    scp /etc/profile node2:/etc
    scp /etc/profile node3:/etc
  • 9、让所有spark节点的环境变量生效

    • 在所有spark节点上执行

      • source /etc/profile

4、spark集群启动和停止

  • 1、启动spark集群

    • 可以在主节点上进入到spark/sbin 执行

      • ./start-all.sh

  • 2、停止spark集群

    • 可以在主节点上进入到spark/sbin 执行

      • ./stop-all.sh

5、spark集群web管理界面

  • 1、先启动spark集群

  • 2、访问地址

    • master主机名:8080

    • 进去之后可以看到整个spark集群所有信息,这些信息主要包括:整个spark集群所有的资源信息,已经使用的spark集群资源信息、还剩的spark集群资源信息、还有worker相关的信息、还有正在运行的任务信息、还有已经完成的任务信息。

6、基于zookeeper构建spark高可用集群

  • 1、需要安装zk集群

  • 2、修改配置文件

    • vim spark-env.sh

      #需要注释掉手动指定的masterip地址配置
      #这个时候整个集群就有很多个master,活着的master不在由于我们自己手动指定,而是通过zk选举产生
      #export SPARK_MASTER_HOST=node1
      
      #引入zk 构建spark高可用集群
      export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER  -Dspark.deploy.zookeeper.url=node1:2181,n
      ode2:2181,node3:2181  -Dspark.deploy.zookeeper.dir=/spark"
  • 3、分发spark-env.sh 到其他节点

  • 4、先启动zk集群

  • 5、启动spark集群

    • 1、可以在任意一台机器启动(前提条件:就是需要实现所有机器两两之间它的ssh免密登陆)

      • start-all.sh

      • 在哪里执行上面这个脚本,它就会在当前机器启动一个Master进程

      • 整个spark集群来说worker进程由slaves文件决定

  • 6、可以在其他机器单独启动master

    • start-master.sh

  • 7、高可用说明

    引入zk之后,整个spark集群就有很多个master,其中有一个master被zk选举成活着的master,其他多个master都处于standBy状态,启动完成之后这个时候它会把整个spark集群的元数据信息通过zk中的节点进行保存, 活着的master它会提供服务,处于standBy的master不会提供服务。
    
    当前活着的master挂掉了,首先zk会感知到,接下来会在所有处于standBy中的master进行选举,然后再来产生一个活着的新的Master,它会读取zk中保存集群的元数据信息,最后恢复到上一次master挂掉的状态。整个恢复的过程需要一定的时间,一般就是1-2分钟。
    
    
    当前整个spark集群中的master挂掉了,挂掉了之后到恢复结束整个过程对任务有什么影响?
    (1)对于当前正在运行的任务,由于这个任务既然可以运行,说明它已经获取得到了任务计算需要的资源,对于它来有资源就可以继续运行,它是不受任何影响。
    
    (2)对于后面要提交的任务,由于没有这样一个活着的master这个角色,整个任务就获取不到资源,它就无法运行。
    
    

7、spark角色介绍

  • 1、Driver

    • 它会运行客户端写好的main方法,并且会创建SparkContext对象,该对象是所有spark程序的执行入口。

  • 2、Application

    • 它就是一个计算任务的应用程序,它是包括了Driver端的代码以及整个任务在运行的时候需要的资源信息。

  • 3、Cluster Manager

    • 它就是一个可以获取到外部资源的服务

      • standAlone

        • 它是spark自带的集群模式,整个任务的资源分配由Master负责

      • yarn

        • spark程序可以提交到yarn去运行,整个任务的资源分配由ResourceManager负责

      • mesos

        • 它是一个apache开源类似于yarn的资源调度平台

  • 4、Master

    • 它就是整个spark集群的老大,它负责任务资源的分配

  • 5、Worker

    • 它就是整个spark集群的小弟,它会负责任务的真正计算。

  • 6、Executor

    • 它就是一个进程,它会在worker节点来启动进程去运行任务

  • 7、Task

    • 它就是一个线程,它会以线程的方式运行在worker节点的executor进程中

8、初识spark程序

  • 1、以普通模式提交任务到spark集群中运行(就是我们已经知道了整个spark集群活着的master地址)

    bin/spark-submit \
    --class org.apache.spark.examples.SparkPi \
    --master spark://node1:7077 \
    --executor-memory 1G \
    --total-executor-cores 2 \
    examples/jars/spark-examples_2.11-2.1.3.jar \
    10
  • 2、以高可用模式提交任务到spark集群中运行( 就是我们并不知道哪一个master是活着的master)

bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://node1:7077,node2:7077,node3:7077 \
--executor-memory 1G \
--total-executor-cores 2 \
examples/jars/spark-examples_2.11-2.1.3.jar \
10

在高可用模式下提交任务,需要把所有的master都罗列出来 spark://node1:7077,node2:7077,node3:7077
整个任务会轮训master列表,最后找到活着的master,然后向这个活着的master申请资源。

9、spark-shell使用

9.1 通过spark-shell --master local[N]读取本地数据文件实现单词统计

  • --master local[N]

  • local 表示本地运行,跟spark集群没有任何关系,N表示一个正整数,在这里表示本地采用N个线程去跑任务

  • 提交脚本

    • spark-shell --master local[2]

      sc.textFile("file:///root/words.txt").flatMap(x=>x.split(" ")).map(x=>(x,1)).reduceByKey((x,y)=>x+y).collect
      
      sc.textFile("file:///root/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect

9.2 通过spark-shell --master local[N]读取HDFS上数据文件实现单词统计

  • 提交脚本

    • spark-shell --master local[2]

    • spark整合HDFS

      • vim spark-env.sh

        #spark整合hdfs
        export HADOOP_CONF_DIR=/export/servers/hadoop/etc/hadoop
    sc.textFile("hdfs://node1:9000/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
    
    sc.textFile("/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect

9.3 通过 spark-shell 指定spark集群中活着的master

  • 提交脚本

    • spark-shell --master spark://node1:7077 --executor-memory 1g --total-executor-cores 2

      sc.textFile("/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("/out")

10、基于IDEA开发spark的程序

  • 1、引入pom依赖

     <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_2.11</artifactId>
          <version>2.11.8</version>
    </dependency>

10.1 利用scala语言实现spark的wordcount程序(本地运行)

  • 1、代码开发

    package demo.spark
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    //todo:利用scala语言去开发spark的wordcount程序
    object WordCount {
      def main(args: Array[String]): Unit = {
       //1、创建SparkConf对象 设置applicationName和master地址  local[2]表示本地采用2个线程运行任务
        val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[2]")
    
       //2、创建SparkContext对象 它是所有spark程序的入口,它内部会创建DAGScheduler和TaskScheduler
        val sc = new SparkContext(sparkConf)
    
       //3、读取数据文件
        val data: RDD[String] = sc.textFile("E:\\words.txt")
    
       //4、切分每一行,获取所有的单词
       val words: RDD[String] = data.flatMap(_.split(" "))
    
       //5、每个单词计为1
        val wordAndOne: RDD[(String, Int)] = words.map((_,1))
    
       //6、相同单词出现的1累加
        val result: RDD[(String, Int)] = wordAndOne.reduceByKey(_+_)
    
         //按照单词出现的次数降序排列  sortBy默认是升序,给false就是降序
         val sortRDD: RDD[(String, Int)] = result.sortBy(x=>x._2,false)
    
       //7、收集打印
        val finalResult: Array[(String, Int)] = sortRDD.collect()
        finalResult.foreach(println)
    
       //8、关闭sparkcontext
        sc.stop()
      }
    }
    

10.2 利用scala语言实现spark的wordcount程序(集群运行)

  • 1、代码开发

    package demo.spark
    
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    
    //todo:利用scala语言开发spark的wordcount(集群运行)
    object WordCount_Online {
     def main(args: Array[String]): Unit = {
      //1、创建SparkConf对象 设置applicationName
      val sparkConf: SparkConf = new SparkConf().setAppName("WordCount_Online")
    
      //2、创建SparkContext对象 它是所有spark程序的入口,它内部会创建DAGScheduler和TaskScheduler
      val sc = new SparkContext(sparkConf)
    
      //3、读取数据文件
      val data: RDD[String] = sc.textFile(args(0))
    
      //4、切分每一行,获取所有的单词
      val words: RDD[String] = data.flatMap(_.split(" "))
    
      //5、每个单词计为1
      val wordAndOne: RDD[(String, Int)] = words.map((_,1))
    
      //6、相同单词出现的1累加
      val result: RDD[(String, Int)] = wordAndOne.reduceByKey(_+_)
    
      //7、把结果数据保存到hdfs上
      result.saveAsTextFile(args(1))
    
      //8、关闭sparkcontext
      sc.stop()
      }
    }
    
    • 2、打成jar包提交到集群运行

      spark-submit --master spark://node1:7077 --class cn.itcast.spark.WordCount_Online --executor-memory 1g --total-executor-cores 4 original-spark_class12-1.0-SNAPSHOT.jar /words.txt  /out_spark

10.3 利用java 语言开发spark的wordcount程序(本地运行)

  • 1、代码开发

    package demo.spark;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;
    
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    
    //todo:利用java实现spark的wordcount程序
    public class WordCount_Java {
      public static void main(String[] args) {
        //1、创建SparkConf对象
        SparkConf sparkConf = new SparkConf().setAppName("WordCount_Java").setMaster("local[2]");
    
        //2、创建JavaSparkContext对象
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
    
        //3、读取数据文件
        JavaRDD<String> data = jsc.textFile("E:\\words.txt");
    
        //4、切分每一行,获取所有的单词
        JavaRDD<String> words = data.flatMap(new FlatMapFunction<String, String>() {
          public Iterator<String> call(String line) throws Exception {
            String[] words = line.split(" ");
            return Arrays.asList(words).iterator();
           }
         });
    
        //5、每个单词计为1
        JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(new PairFunction<String, String, Integer>() {
          public Tuple2<String, Integer> call(String word) throws Exception {
            return new Tuple2<String, Integer>(word, 1);
           }
         });
    
        //6、相同单词出现的1累加
        JavaPairRDD<String, Integer> result = wordAndOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
          public Integer call(Integer v1, Integer v2) throws Exception {
            return v1 + v2;
           }
         });
    
        //按照单词出现的次数降序 (单词,次数)------->(次数,单词).sortByKey----->(单词,次数)
    
        JavaPairRDD<Integer, String> reverseJavaPairRDD = result.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
          public Tuple2<Integer, String> call(Tuple2<String, Integer> t) throws Exception {
            return new Tuple2<Integer, String>(t._2, t._1);
           }
         });
    
        JavaPairRDD<String, Integer> sortedJavaPairRDD = reverseJavaPairRDD.sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
          public Tuple2<String, Integer> call(Tuple2<Integer, String> t) throws Exception {
            return new Tuple2<String, Integer>(t._2, t._1);
           }
         });
    
        //7、收集打印
        List<Tuple2<String, Integer>> finalResult = sortedJavaPairRDD.collect();
        for (Tuple2<String, Integer> tuple : finalResult) {
          System.out.println("单词:"+ tuple._1+" 次数:"+tuple._2);
         }
    
        //8、关闭jsc
          jsc.stop();
       }
    }
    

更多内容请关注我的公众号:


编程开发网
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Spark与深度学习框架——H2O、dee.. 下一篇史上最简单的spark教程第三章-深..

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

array(4) { ["type"]=> int(8) ["message"]=> string(24) "Undefined variable: jobs" ["file"]=> string(32) "/mnt/wp/cppentry/do/bencandy.php" ["line"]=> int(214) }