TOP

Spark WordCount 两种运行方式
2019-02-12 01:21:54 】 浏览:67
Tags:Spark WordCount 运行 方式

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/xsg448457111/article/details/82990844

用Scala编写WordCount程序,在IDEA中可以通过(1)新建maven项目,在pom文件中引入Spark,Scala,Hadoop相关的依赖包来开发;(2)新建普通的Scala项目,然后将相关的jar包导入到项目中,同样可以来开发。

一般来说,采用maven的方式来进行开发会比较方便,Spark WordCount的pom文件如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.whua</groupId>
  <artifactId>word-count</artifactId>
  <version>1.0-SNAPSHOT</version>
  <inceptionYear>2008</inceptionYear>
  <properties>
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      <spark.version>2.3.0</spark.version>
      <scala.version>2.11.0</scala.version>
      <hadoop.version>2.7.0</hadoop.version>
  </properties>

  <repositories>
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </repository>
  </repositories>

  <pluginRepositories>
    <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
  </pluginRepositories>

    <dependencies>
      <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>${scala.version}</version>
      </dependency>
      <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_2.11</artifactId>
          <version>2.3.0</version>
      </dependency>
      <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
      </dependency>
    </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <args>
            <arg>-target:jvm-1.5</arg>
          </args>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <configuration>
          <downloadSources>true</downloadSources>
          <buildcommands>
            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <additionalProjectnatures>
            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
          </additionalProjectnatures>
          <classpathContainers>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
          </classpathContainers>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <reporting>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </reporting>
</project>

Spark WordCount 的源码如下:

package com.whua

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * @author: whua 
  * @create: 2018/10/09 19:37
  */
object SparkWC {
  def main(args: Array[String]): Unit = {
    //设置Spark配置
    val conf: SparkConf = new SparkConf().setAppName("SparkWC") //.setMaster("local[*]")
    //创建Spark上下文
    val sc: SparkContext = new SparkContext(conf)
    //读取输入数据
    val lines = sc.textFile(args(0))
    //处理数据
    val words: RDD[String] = lines.flatMap(_.split(" "))
    val pair: RDD[(String, Int)] = words.map((_, 1))
    val reduced: RDD[(String, Int)] = pair.reduceByKey(_ + _)
    val ans = reduced.sortBy(_._2, false)

    ans.saveAsTextFile(args(1))
    //    println(ans.collect().toBuffer)

    //关闭
    sc.stop()
  }
}

在IDEA中执行需要编辑运行配置,这里主要是给一个输入文件的路径,为hdfs上的/input路径。

在hdfs上的/input路径中要保证有输入数据:

运行方式

1、在IDEA中运行

直接在IDEA中运行程序,可以看到程序运行成功:

2、打成jar包,上传到集群上运行

点击Maven projects,然后双击package,IDEA自动开始打包,过程结束之后,会生成一个target目录,其中会生成一个jar包,然后将jar包上传在集群上。

在Spark master上,进入到$SPARK_HOME/bin目录下面,里面有一个spark-submit的脚本,我们将使用这个脚本来进行任务提交。提交任务的命令如下:

 ./spark-submit --class com.whua.SparkWC --master spark://master:7077 --executor-memory 1g --total-executor-cores 2 /home/whua/Documents/word-count-1.0-SNAPSHOT.jar hdfs://master:9000/input hdfs://master:9000/output

命令中的参数分别为:运行的类名(com.whua.SparkWC),运行任务的集群的master(--master spark://master:7077)(一个机器上可以有多个集群的进程),每个机器的内存(--executor-memory 1g ),运行改任务总共的最大核数(--total-executor-cores 2),jar包的路径(/home/whua/Documents/word-count-1.0-SNAPSHOT.jar),输入数据的路径和输出数据的路径。参考链接

作业开始运行,运行完成之后如下图,可以看到,和我们在命令中配置的参数相同。

同时,在hdfs上可以看到输出的数据,因为有三个节点,所以输出的结果分成了三个part。

至此,Spark WordCount程序及其两种运行方式介绍完毕。


Spark WordCount 两种运行方式 https://www.cppentry.com/bencandy.php?fid=116&id=208035

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇基于Spark MLlib的余弦相似度计算.. 下一篇Spark 之 内置函数 JSON