设为首页 加入收藏

TOP

简单Spark作业编写与提交执行
2019-02-15 01:28:47 】 浏览:50
Tags:简单 Spark 作业 编写 提交 执行
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u012842205/article/details/53160171

转载请注明出处: http://blog.csdn.net/u012842205/article/details/53160171

声明: 关于以上环境的搭建,配置,本文不作叙述。直接进入正题。本文实例是编写一个简单的spark作业,用于统计输入文件的长度总和。并使用spark-submit脚本提交给spark计算平台运行。


1 环境

操作系统: Ubuntu 16.04 x86_64

Spark: Apache spark 2.0.0-bin-hadoop2.7

hadoop: Apache Hadoop 2.7.1

JDK: Oracle JDK 1.8.1_101

Scala环境: 2.11.8

IDE: scala ide 4.4.1


这里说明一下。笔者亲自尝试过,只要jar文件引用全了,只运行Master为local(非集群方式,本地运行)的spark作业,是可以不需要安装hadoop和spark环境的。本文的spark示例作业所需要的包见下文(通过maven依赖插件dependency获取到)。而没有spark环境意味着没有$SPARK_HOME这个环境变量,也可能会报错,我之前报错出现端口绑定异常,在程序中加入了spark.driver.host变量,设置为localhost即可。部分此错误堆栈如下:

16/01/04 13:49:40 WARN Utils: Service 'sparkDriver' could not bind on port 0. Attempting port 1.
16/01/04 13:49:40 WARN Utils: Service 'sparkDriver' could not bind on port 0. Attempting port 1.
16/01/04 13:49:40 WARN Utils: Service 'sparkDriver' could not bind on port 0. Attempting port 1.
16/01/04 13:49:40 WARN Utils: Service 'sparkDriver' could not bind on port 0. Attempting port 1.
16/01/04 13:49:40 WARN Utils: Service 'sparkDriver' could not bind on port 0. Attempting port 1.
16/01/04 13:49:40 WARN Utils: Service 'sparkDriver' could not bind on port 0. Attempting port 1.
16/01/04 13:49:40 WARN Utils: Service 'sparkDriver' could not bind on port 0. Attempting port 1.
16/01/04 13:49:40 WARN Utils: Service 'sparkDriver' could not bind on port 0. Attempting port 1.
16/01/04 13:49:40 WARN Utils: Service 'sparkDriver' could not bind on port 0. Attempting port 1.
16/01/04 13:49:40 WARN Utils: Service 'sparkDriver' could not bind on port 0. Attempting port 1.
16/01/04 13:49:40 WARN Utils: Service 'sparkDriver' could not bind on port 0. Attempting port 1.
16/01/04 13:49:40 WARN Utils: Service 'sparkDriver' could not bind on port 0. Attempting port 1.
16/01/04 13:49:40 WARN Utils: Service 'sparkDriver' could not bind on port 0. Attempting port 1.
16/01/04 13:49:40 WARN Utils: Service 'sparkDriver' could not bind on port 0. Attempting port 1.
16/01/04 13:49:40 WARN Utils: Service 'sparkDriver' could not bind on port 0. Attempting port 1.
16/01/04 13:49:40 WARN Utils: Service 'sparkDriver' could not bind on port 0. Attempting port 1.
16/01/04 13:49:40 ERROR SparkContext: Error initializing SparkContext.
java.net.BindException: Can't assign requested address: Service 'sparkDriver' failed after 16 retries!
    at sun.nio.ch.Net.bind0(Native Method)
    at sun.nio.ch.Net.bind(Net.java:444)
    at sun.nio.ch.Net.bind(Net.java:436)
    at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)
    at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
    at io.netty.channel.socket.nio.NioServerSocketChannel.doBind(NioServerSocketChannel.java:125)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.bind(AbstractChannel.java:485)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.bind(DefaultChannelPipeline.java:1089)
    at io.netty.channel.AbstractChannelHandlerContext.invokeBind(AbstractChannelHandlerContext.java:430)
    at io.netty.channel.AbstractChannelHandlerContext.bind(AbstractChannelHandlerContext.java:415)
    at io.netty.channel.DefaultChannelPipeline.bind(DefaultChannelPipeline.java:903)
    at io.netty.channel.AbstractChannel.bind(AbstractChannel.java:198)
    at io.netty.bootstrap.AbstractBootstrap$2.run(AbstractBootstrap.java:348)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at java.lang.Thread.run(Thread.java:745)


2 Spark作业编写

简单叙述下Spark作业编写步骤:

2.1 初始化SparkContext

SparkContext是所有Spark程序运行的核心,即可以看作是一个Spark库调用的上下文句柄。以下是初始化过程,通过SparkConf辅助SparkContext进行初始化,当然,也可以直接new一个SparkContext,具体可以参看Spark API (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext)。

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

// ...

val sparkConf = new SparkConf ()
  .setMaster ("local[4]")
  .setAppName ("dt.ez.sparkhbase.test.LengthCount")
  
val sc = new SparkContext (sparkConf)
注意,此处设置了Master(用于指定使用什么方式将作业提交给Spark平台),这里使用的是local方式,除此之外还有standalong、 yarn、 mesos等方式。在使用spark-submit提交作业时,有个参数master,若代码中配置了master,则脚本的master参数不起作用,所以当要用脚本提交代码时,最好将这行setMaster注释掉。


2.2 输入数据

众所周知,Spark中所有数据为达到并行计算都使用一个叫RDD的弹性分布式数据集合(Resilient Distributed Dataset),所有输入数据都将被读取到这个数据集合中,并完成相应的转换(如将RDD中每个元素的结构改变),运算(对RDD中每个元素进行一些计算)。输入元素也将被读入到RDD中。本文中spark程序使用的数据是HDFS文件系统中的文件,spark已经有现成的API供调用: SparkContext.textFile (),这个函数将产生RDD[String],读出文件的每一行,作为RDD中的一个元素。这里提一句,RDD是惰性求值,也就是说,这里创建的RDD也只是产生了一个数据依赖关系图(DAG,有向无环图),这个图描述了获取输入的整个过程,当真正产生RDD计算时,才会真正从文件中读取数据。

var textRdd : RDD [String] = sc.emptyRDD

for (file <- args) {
  /* Note we didn't check the existence of files there. */
  println ("File: " + file)
  textRdd ++= sc.textFile (file)
}
程序可以接受多个文件名输入,输入在args中,所以这里定义一个空的RDD,每个文件产生的RDD将union在这个RDD上。


2.3 计算

Spark有两类API,用于描述在RDD上的计算。一类是转化API,一类是动作API,转化API不会触发真正的计算操作,只会在RDD数据依赖图中产生更复杂的依赖描述。而动作API将产生真正的IO和运算操作,也即是会遍历整个DAG,并求值,其中有缓存的数据则直接使用。

val totalLength = textRdd.map {t => t.length}.reduce (_ + _)
上面进行了一次map转化,又在后面完成了一次reduce动作。

map将从文件中读入数据产生的RDD[String]转化成一个RDD[Int],每个元素表示每行读入数据的长度。

reduce动作将每个元素两两相加,最后产生结果。返回给totalLength变量。同理,上文已说明了,map作为转化,不会产生真正的迭代动作,而是在运行到reduce时,才会真正进行数据操作(读取,计算长度,两两相加)。

注意: reduce操作将返回一个具体的元素,对本例来说最后操作了RDD[Int](由map产生),所以reduce将返回Int,这是具体类型。假如reduce作用的RDD[T],而这个T是不可序列化的(比如org.apache.hadoop.hbase.client.Result,hbase查询结果),那么这时候使用reduce将无法返回,且抛出不能序列化的异常。


3 提交作业

根据以上步骤编写了代码,并编译成jar文件后,则可以使用spark自带的工具spark-submit完成作业提交:

bin/spark-submit --class dt.ez.sparkhbase.test.LengthCount --master local[8] ./LengthCount.jar /path/to/input/file1 # ...
这里编译的类是dt.ez.sparkhbase.test.LengthCount,指定了master为local方式,并使用8线程。 在最后写上能传入作业的文件名。在运行这行shell命令前,请将路径转换到$SPARK_HOME

注意:若使用其他的提交方式,比如standalong,最好将jar文件拷贝到各个节点的相同路径下。之前有想过将jar放在HDFS上,但这样貌似用spark-submit提交时会找不到。


4 实例代码

package dt.ez.sparkhbase.test

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object LengthCount extends App {
  
  if (args.length < 1) {
    println ("Error in reading files.")
    System.exit (0)
  }
  
  val sparkConf = new SparkConf ()
//    .setMaster ("local[4]")
    .setAppName ("dt.ez.sparkhbase.test.LengthCount")
    
  val sc = new SparkContext (sparkConf)
  
  var textRdd : RDD [String] = sc.emptyRDD
  
  for (file <- args) {
    /* Note we didn't check the existence of files there. */
    println ("File: " + file)
    textRdd ++= sc.textFile (file)
  }
  
  val totalLength = textRdd.map {t => t.length}.reduce (_ + _)
  println (new java.util.Date ().toString + " " + totalLength)
  
  sc.stop
  
}

5 jar依赖文件

编译运行Spark程序所依赖的多是Spark和Hadoop的API。这里我使用了maven的maven-dependencyo-plugin获取所有依赖包,但创建的是java maven项目,至于scala的maven项目产生依赖有什么区别,这里我没有试过。如下:

activation-1.1.jar
aopalliance-repackaged-2.4.0-b34.jar
apacheds-i18n-2.0.0-M15.jar
apacheds-kerberos-codec-2.0.0-M15.jar
api-asn1-api-1.0.0-M20.jar
api-util-1.0.0-M20.jar
avro-1.7.4.jar
avro-ipc-1.7.7.jar
avro-ipc-1.7.7-tests.jar
avro-mapred-1.7.7-hadoop2.jar
chill_2.11-0.8.0.jar
chill-java-0.8.0.jar
commons-beanutils-1.7.0.jar
commons-beanutils-core-1.8.0.jar
commons-cli-1.2.jar
commons-codec-1.3.jar
commons-collections-3.2.1.jar
commons-compress-1.4.1.jar
commons-configuration-1.6.jar
commons-digester-1.8.jar
commons-httpclient-3.1.jar
commons-io-2.4.jar
commons-lang-2.6.jar
commons-lang3-3.3.2.jar
commons-logging-1.1.3.jar
commons-math3-3.4.1.jar
commons-net-2.2.jar
compress-lzf-1.0.3.jar
curator-client-2.7.1.jar
curator-framework-2.4.0.jar
curator-recipes-2.4.0.jar
dependency
gson-2.2.4.jar
guava-14.0.1.jar
hadoop-annotations-2.7.1.jar
hadoop-auth-2.7.1.jar
hadoop-client-2.7.1.jar
hadoop-common-2.7.1.jar
hadoop-hdfs-2.7.1.jar
hadoop-mapreduce-client-app-2.7.1.jar
hadoop-mapreduce-client-common-2.7.1.jar
hadoop-mapreduce-client-core-2.7.1.jar
hadoop-mapreduce-client-jobclient-2.7.1.jar
hadoop-mapreduce-client-shuffle-2.7.1.jar
hadoop-yarn-api-2.7.1.jar
hadoop-yarn-client-2.7.1.jar
hadoop-yarn-common-2.7.1.jar
hadoop-yarn-server-common-2.7.1.jar
hk2-api-2.4.0-b34.jar
hk2-locator-2.4.0-b34.jar
hk2-utils-2.4.0-b34.jar
htrace-core-3.1.0-incubating.jar
httpclient-4.2.5.jar
httpcore-4.2.4.jar
ivy-2.4.0.jar
jackson-annotations-2.6.5.jar
jackson-core-2.6.5.jar
jackson-core-asl-1.9.13.jar
jackson-databind-2.6.5.jar
jackson-jaxrs-1.9.13.jar
jackson-mapper-asl-1.9.13.jar
jackson-module-paranamer-2.6.5.jar
jackson-module-scala_2.11-2.6.5.jar
jackson-xc-1.9.13.jar
javassist-3.18.1-GA.jar
javax.annotation-api-1.2.jar
javax.inject-2.4.0-b34.jar
javax.servlet-api-3.1.0.jar
javax.ws.rs-api-2.0.1.jar
jaxb-api-2.2.2.jar
jcl-over-slf4j-1.7.16.jar
jersey-client-1.9.jar
jersey-client-2.22.2.jar
jersey-common-2.22.2.jar
jersey-container-servlet-2.22.2.jar
jersey-container-servlet-core-2.22.2.jar
jersey-core-1.9.jar
jersey-guava-2.22.2.jar
jersey-media-jaxb-2.22.2.jar
jersey-server-2.22.2.jar
jets3t-0.7.1.jar
jetty-util-6.1.26.jar
json4s-ast_2.11-3.2.11.jar
json4s-core_2.11-3.2.11.jar
json4s-jackson_2.11-3.2.11.jar
jsp-api-2.1.jar
jsr305-1.3.9.jar
jul-to-slf4j-1.7.16.jar
kryo-shaded-3.0.3.jar
leveldbjni-all-1.8.jar
log4j-1.2.17.jar
lz4-1.3.0.jar
mesos-0.21.1-shaded-protobuf.jar
metrics-core-3.1.2.jar
metrics-graphite-3.1.2.jar
metrics-json-3.1.2.jar
metrics-jvm-3.1.2.jar
minlog-1.3.0.jar
netty-3.8.0.Final.jar
netty-all-4.0.29.Final.jar
objenesis-2.1.jar
oro-2.0.8.jar
osgi-resource-locator-1.0.1.jar
paranamer-2.6.jar
protobuf-java-2.5.0.jar
py4j-0.10.3.jar
pyrolite-4.9.jar
RoaringBitmap-0.5.11.jar
scala-compiler-2.11.0.jar
scala-library-2.11.8.jar
scalap-2.11.0.jar
scala-parser-combinators_2.11-1.0.1.jar
scala-reflect-2.11.7.jar
scalatest_2.11-2.2.6.jar
scala-xml_2.11-1.0.2.jar
servlet-api-2.5.jar
slf4j-api-1.7.16.jar
slf4j-log4j12-1.7.16.jar
snappy-java-1.1.2.6.jar
spark-core_2.11-2.0.1.jar
spark-launcher_2.11-2.0.1.jar
spark-network-common_2.11-2.0.1.jar
spark-network-shuffle_2.11-2.0.1.jar
spark-tags_2.11-2.0.1.jar
spark-unsafe_2.11-2.0.1.jar
stax-api-1.0-2.jar
stream-2.7.0.jar
unused-1.0.0.jar
validation-api-1.1.0.Final.jar
xbean-asm5-shaded-4.4.jar
xercesImpl-2.9.1.jar
xml-apis-1.3.04.jar
xmlenc-0.52.jar
xz-1.0.jar
zookeeper-3.4.5.jar

maven依赖配置:

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.0.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.7.1</version>
    </dependency>





】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Spark 部署及示例代码讲解 下一篇Spark+Hadoop环境搭建

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目