设为首页 加入收藏

TOP

Spark WordCount 两种运行方式
2019-02-12 01:21:54 】 浏览:33
Tags:Spark WordCount 运行 方式
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/xsg448457111/article/details/82990844

用Scala编写WordCount程序,在IDEA中可以通过(1)新建maven项目,在pom文件中引入Spark,Scala,Hadoop相关的依赖包来开发;(2)新建普通的Scala项目,然后将相关的jar包导入到项目中,同样可以来开发。

一般来说,采用maven的方式来进行开发会比较方便,Spark WordCount的pom文件如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.whua</groupId>
  <artifactId>word-count</artifactId>
  <version>1.0-SNAPSHOT</version>
  <inceptionYear>2008</inceptionYear>
  <properties>
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      <spark.version>2.3.0</spark.version>
      <scala.version>2.11.0</scala.version>
      <hadoop.version>2.7.0</hadoop.version>
  </properties>

  <repositories>
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name&g
		    

t; <url>http://scala-tools.org/repo-releases</url> </repository> </repositories> <pluginRepositories> <pluginRepository> <id>scala-tools.org</id> <name>Scala-Tools Maven2 Repository</name> <url>http://scala-tools.org/repo-releases</url> </pluginRepository> </pluginRepositories> <dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.3.0</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> </dependencies> <build> <sourceDirectory>src/main/scala</sourceDirectory> <testSourceDirectory>src/test/scala</testSourceDirectory> <plugins> <plugin> <groupId>org.scala-tools</groupId> <artifactId>maven-scala-plugin</artifactId> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> <configuration> <scalaVersion>${scala.version}</scalaVersion> <args> <arg>-target:jvm-1.5</arg> </args> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-eclipse-plugin</artifactId> <configuration> <downloadSources>true</downloadSources> <buildcommands> <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand> </buildcommands> <additionalProjectnatures> <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature> </additionalProjectnatures> <classpathContainers> <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer> <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer> </classpathContainers> </configuration> </plugin> </plugins> </build> <reporting> <plugins> <plugin> <groupId>org.scala-tools</groupId> <artifactId>maven-scala-plugin</artifactId> <configuration> <scalaVersion>${scala.version}</scalaVersion> </configuration> </plugin> </plugins> </reporting> </project>

Spark WordCount 的源码如下:

package com.whua

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * @author: whua 
  * @create: 2018/10/09 19:37
  */
object SparkWC {
  def main(args: Array[String]): Unit = {
    //设置Spark配置
    val conf: SparkConf = new SparkConf().setAppName("SparkWC") //.setMaster("local[*]")
    //创建Spark上下文
    val sc: SparkContext = new SparkContext(conf)
    //读取输入数据
    val lines = sc.textFile(args(0))
    //处理数据
    val words: RDD[String] = lines.flatMap(_.split(" "))
    val pair: RDD[(String, Int)] = words.map((_, 1))
    val reduced: RDD[(String, Int)] = pair.reduceByKey(_ + _)
    val ans = reduced.sortBy(_._2, false)

    ans.saveAsTextFile(args(1))
    //    println(ans.collect().toBuffer)

    //关闭
    sc.stop()
  }
}

在IDEA中执行需要编辑运行配置,这里主要是给一个输入文件的路径,为hdfs上的/input路径。

在hdfs上的/input路径中要保证有输入数据:

运行方式

1、在IDEA中运行

直接在IDEA中运行程序,可以看到程序运行成功:

2、打成jar包,上传到集群上运行

点击Maven projects,然后双击package,IDEA自动开始打包,过程结束之后,会生成一个target目录,其中会生成一个jar包,然后将jar包上传在集群上。

在Spark master上,进入到$SPARK_HOME/bin目录下面,里面有一个spark-submit的脚本,我们将使用这个脚本来进行任务提交。提交任务的命令如下:

 ./spark-submit --class com.whua.SparkWC --master spark://master:7077 --executor-memory 1g --total-executor-cores 2 /home/whua/Documents/word-count-1.0-SNAPSHOT.jar hdfs://master:9000/input hdfs://master:9000/output

命令中的参数分别为:运行的类名(com.whua.SparkWC),运行任务的集群的master(--master spark://master:7077)(一个机器上可以有多个集群的进程),每个机器的内存(--executor-memory 1g ),运行改任务总共的最大核数(--total-executor-cores 2),jar包的路径(/home/whua/Documents/word-count-1.0-SNAPSHOT.jar),输入数据的路径和输出数据的路径。参考链接

作业开始运行,运行完成之后如下图,可以看到,和我们在命令中配置的参数相同。

同时,在hdfs上可以看到输出的数据,因为有三个节点,所以输出的结果分成了三个part。

至此,Spark WordCount程序及其两种运行方式介绍完毕。


编程开发网
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇基于Spark MLlib的余弦相似度计算.. 下一篇Spark 之 内置函数 JSON

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

array(4) { ["type"]=> int(8) ["message"]=> string(24) "Undefined variable: jobs" ["file"]=> string(32) "/mnt/wp/cppentry/do/bencandy.php" ["line"]=> int(217) }