设为首页 加入收藏

TOP

Spark WordCount简单案例(java,scala版)
2019-03-20 13:14:01 】 浏览:66
Tags:Spark WordCount 简单 案例 java scala
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/WYpersist/article/details/79783861

Spark 是什么?

官方文档解释:Apache Spark is a fast and general engine for large-scale data processing.

通俗的理解:Spark是基于内存计算的大数据并行计算框架。Spark基于内存计算,提高了在 大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性,允许用户将Spark 部署在大量廉价硬件之上,形成集群。

扩展了MapReduce计算模型;相比与MapReduce编程模型,Spark提供了更加灵活的DAG(Directed Acyclic Graph) 编程模型, 不仅包含传统的map、reduce接口, 还增加了filter、flatMap、union等操作接口,使得编写Spark程序更加灵活方便。

高效支持多种计算模式;Spark 不仅可以做离线运算,还可以做流式运算以及迭代式运算。

Spark 组成大一统软件栈

接下来直接上代码

Java版






package com.spark.wordcount;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;

public class SparkWordCount {
public static void main(String[] args) {
//参数检查
if (args.length < 2) {
System.err.println("Usage: SparkWordCount <input> <output> ");
System.exit(1);
}
//获取参数
String input = args[0]; //输入参数
String output = args[1]; //输出参数
/**
* 1、创建SparkConf对象,设置Spark应用程序的配置信息
*/
SparkConf conf = new SparkConf()
//设置Spark应用程序的名称
.setAppName("WordCount");

// conf.set("spark.testing.memory", "2147480000");

// 因为jvm无法获得足够的资源 这串数字大于512
/**
* 2、创建SparkContext对象,Java开发使用JavaSparkContext;
* Scala开发使用SparkContext
* 在Spark中,SparkContext负责连接Spark集群,创建RDD、累积量和广播量等。
* Master参数是为了创建TaskSchedule(较低级的调度器,高层次的调度器为DAGSchedule),如下:
* 如果setMaster("local")则创建LocalSchedule;
* 如果setMaster("spark")则创建SparkDeploySchedulerBackend。
* 在SparkDeploySchedulerBackend的start函数,会启动一个Client对象,连接到Spark集群。
*/
//创建java版本的SparkContext,创建上下文对象
JavaSparkContext sc = new JavaSparkContext(conf);

/**
* 3、sc中提供了textFile方法是SparkContext中定义的,如下:
* def textFile(path: String): JavaRDD[String] = sc.textFile(path)
* 用来读取HDFS上的文本文件、集群中节点的本地文本文件或任何支持Hadoop的文件系统上的文本文件,
* 它的返回值是JavaRDD[String],是文本文件每一行
*/
//读取数据
JavaRDD<String> lines = sc.textFile(input);
/**
* 4、将行文本内容拆分为多个单词
* lines调用flatMap这个transformation算子(参数类型是FlatMapFunction接口实现类)返回每一行的每个单词
*
* flatmap与map类似,区别是原RDD中的元素经map处理后只能生成一个元素,
* 而原RDD中的元素经flatmap处理后可生成多个元素来构建新RDD。
* 返回的是一个迭代
*/
////进行相关计算
//----用各种Transformation算子对RDD进行操作-----------------------------------------
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

private static final long serialVersionUID = 1L;

public Iterable<String> call(String line) throws Exception {
// TODO Auto-generated method stub
return Arrays.asList(line.split(" "));
}
});

/**
* 2.2.0版本 Iterator不是 Iterable
*/
// lines.flatMap(new FlatMapFunction<String, Object>() {
// public Iterator call(String lines){
// return Arrays.asList(lines.split(""));
// }
// });

/**
* 5、将每个单词的初始数量都标记为1个
* words调用mapToPair这个transformation算子(参数类型是PairFunction接口实现类,
* PairFunction<String, String, Integer>的三个参数是<输入单词, Tuple2的key, Tuple2的value>),
* 返回一个新的RDD,即JavaPairRDD
*
* Spark为包含键值对类型的RDD提供了一些专有的操作。这些RDD被称为PairRDD
mapToPair函数会对一个RDD中的每个元素调用f函数,其中原来RDD中的每一个元素都是T类型的,
调用f函数后会进行一定的操作把每个元素都转换成一个<K2,V2>类型的对象
Tuple2多元组
*/
JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {

private static final long serialVersionUID = 1L;

public Tuple2<String, Integer> call(String word) throws Exception {
// TODO Auto-generated method stub
// return new Tuple2(word, 1); //也可以
return new Tuple2<String, Integer>(word, 1);
}
});
/**
* 6、计算每个相同单词出现的次数
* pairs调用reduceByKey这个transformation算子(参数是Function2接口实现类)对每个key的value进行reduce操作,
*
* 返回一个JavaPairRDD,这个JavaPairRDD中的每一个Tuple的key是单词、value则是相同单词次数的和
* reduceByKey就是对元素为KV对的RDD中Key相同的元素的Value进行reduce,
*因此,Key相同的多个元素的值被reduce为一个值,然后与原RDD中的Key组成一个新的KV对。
*/
JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {

private static final long serialVersionUID = 1L;

public Integer call(Integer v1, Integer v2) throws Exception {
// TODO Auto-generated method stub
return v1 + v2;
}
});

/**
* 7、使用foreach这个action算子提交Spark应用程序
* 在Spark中,每个应用程序都需要transformation算子计算,最终由action算子触发作业提交
*/
//----------打印信息-----------------------------------
wordCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() {

public void call(Tuple2<String, Integer> wordCount) throws Exception {
// TODO Auto-generated method stub
System.out.println("====" + wordCount._1 + " appeared " + wordCount._2 + " times");
}
});
/**
* 8、将计算结果文件输出到文件系统
* HDFS:
* 使用新版API(org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;)
* wordCount.saveAsNewAPIHadoopFile("hdfs://ns1/spark/wordcount", Text.class, IntWritable.class,
* TextOutputFormat.class, new Configuration());
* 使用旧版API(org.apache.hadoop.mapred.JobConf;org.apache.hadoop.mapred.OutputFormat;)
* wordCount.saveAsHadoopFile("hdfs://ns1/spark/wordcount", Text.class, IntWritable.class,
* OutputFormat.class, new JobConf(new Configuration()));
* 使用默认TextOutputFile写入到HDFS(注意写入HDFS权限,如无权限则执行:hdfs dfs -chmod -R 777 /spark)
* wordCount.saveAsTextFile("hdfs://soy1:9000/spark/wordCount");
*/
wordCounts.saveAsTextFile("G:/test/3");
//将结果直接写入到dfs上
//wordCounts.saveAsTextFile("hdfs://ip:8020/file/" + System.currentTimeMillis());
System.out.println("单词计数完成!!!!!!!!!!!!!!");
/**
* 9、关闭SparkContext容器,结束本次作业
*/
sc.close();
}
}

Scala版



package com.spark.wordcount
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]): Unit = {
/**
* 第1步;创建Spark的配置对象SparkConf,设置Spark程序运行时的配置信息
* 例如 setAppName用来设置应用程序的名称,在程序运行的监控界面可以看到该名称,
* setMaster设置程序运行在本地还是运行在集群中,运行在本地可是使用local参数,也可以使用local[K]/local[*],
* 可以去spark官网查看它们不同的意义。

如果要运行在集群中,以Standalone模式运行的话,需要使用spark:

//HOST:PORT的形式指定master的IP和端口号,默认是7077
*/
val conf = new SparkConf().setAppName("WordCount").setMaster("local")
// val conf = new SparkConf().setAppName("WordCount").setMaster("spark://master:7077") //运行在集群中

/**
* 第2步:创建SparkContext 对象
* SparkContext是Spark程序所有功能的唯一入口
* SparkContext核心作用: 初始化Spark应用程序运行所需要的核心组件,包括DAGScheduler、TaskScheduler、SchedulerBackend
* 同时还会负责Spark程序往Master注册程序
*
* 通过传入SparkConf实例来定制Spark运行的具体参数和配置信息
*/
val sc = new SparkContext(conf)

/**
* 第3步: 根据具体的数据来源(HDFS HBase、Local FS、DB S3等)通过SparkContext来创建RDD
* RDD 的创建基本有三种方式: 根据外部的数据来源(例如HDFS)、根据Scala集合使用SparkContext的parallelize方法、
* 由其他的RDD操作产生
* 数据会被RDD划分成为一系列的Partitions,分配到每个Partition的数据属于一个Task的处理范畴
*/

val lines = sc.textFile("D:/resources/README.md") // 读取本地文件
// 读取HDFS文件,并切分成不同的Partition
// val lines = sc.textFile("/library/wordcount/input")
// 或者明确指明是从HDFS上获取数据
// val lines = sc.textFile("hdfs://master:9000/libarary/wordcount/input")

/**
* 第4步: 对初始的RDD进行Transformation级别的处理,例如 map、filter等高阶函数来进行具体的数据计算
*/
// 拆分单词,并过滤掉空格,当然还可以继续进行过滤,如去掉标点符号
val words = lines.flatMap(_.split(" ")).filter(word => word != " ")

// 在单词拆分的基础上对每个单词实例计数为1, 也就是word => (word, 1)
val pairs = words.map(word => (word, 1))

// 在每个单词实例计数为1的基础之上统计每个单词在文件中出现的总次数, 即key相同的value相加
val wordscount = pairs.reduceByKey(_ + _)
// val wordscount = pairs.reduceByKey((v1, v2) => v1 + v2) // 等同于

// 打印结果,使用collect会将集群中的数据收集到当前运行drive的机器上,需要保证单台机器能放得下所有数据
wordscount.collect.foreach(println)

sc.stop() // 释放资源

}
}

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Spark与深度学习框架——H2O、dee.. 下一篇spark IDE:   System memory..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目