1、spark和hadoop对比优点
1.1、只用搭建一套spark(包括:spark core,spark streaming,graphx,mllib等),便于开发维护,
hadoop要搭建多个平台:hadoop,mapreduce,storm,mahout
1.2、spark速度快,基于内存计算,hadoop基于硬盘计算。速度大约是hadoop的100倍
2、RDD(Resilient Distributed Dataset)弹性分布式数据集,提供了一系列算子。不存储数据,只是一个逻辑上的概念。存指针,指向每个数据块block
五大特性:
3、spark任务执行流程
Driver作用:(1)Driver分发task到数据所在的位置让worker上执行,(2)将计算结果拉回到Driver端
4、transformation算子和action算子
transformation算子:延迟执行--针对RDD操作,只有在遇到action算子的时候才会触发执行
返回值是一个RDD
map:传进来的参数由父RDD的类型决定。输入一条,输出一条
filter:传进来true保留,false过滤掉
flatMap: 输入一条,输出多条
sample:抽样,如果传进来0.5,会随机抽取50%的数据返回
action算子:返回值是 非RDD类型
count等。。
5.1第一个WordCount程序,统计单词出现的次数
object WordCount2 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("wordcount")
conf.setMaster("local")
val sc = new SparkContext(conf)
val lineRDD = sc.textFile("wc")
// wordRDD非kv格式的rdd,wordRDD数据类型是String
val wordRDD = lineRDD.flatMap((line:String) => {
line.split(" ")
})
// 把wordRDD变成kv格式的rdd
val pairRDD = wordRDD.map((word:String) => {
(word, 1)
})
// 按key来分组,然后将每一组内的数据聚合
var resultRDD = pairRDD.reduceByKey((v1:Int, v2:Int) => {
v1 + v2
})
// 打印结果,接收参数是一个二元祖
resultRDD.foreach((x:(String, Int)) => {
println("word:" + x._1 + " count:" + x._2)
})
sc.stop()
}
}
5.2、统计学校名最多的那个,从文件中删除
重要操作:
加载文件
map,获取学校名
map,组成二元组
reduceByKey,统计次数
map,交换位置
sortByKey,排序
获取最多的那个二元组中学校名
filter,过滤
/**
* Created by wxx on 2017/12/9.
* 统计哪一个学校出现的次数最多,包含这个学校的记录,清除掉
* 数据类型:2016-09-02 XX学校 1110 北京
*/
object FilterSchool2 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("filterSchool").setMaster("local")
val sc = new SparkContext(sparkConf)
// 读文件
val rdd1 = sc.textFile("log.txt")
// 获取学校名字
// val rdd2 = rdd1.map((x:String) => {
// val schoolName = x.split("\t")
// schoolName
// })
val sampleRdd = rdd1.sample(true, 0.5, 1)
val rdd2 = sampleRdd.map{_.split("\t")(1)}
// 将每一个学校名,计数为1
// rdd3是一个KV格式的rdd
// K:schoolname,V:1
// val rdd3 = rdd2.map((x:String) => {
// (x, 1)
// })
val rdd3 = rdd2.map((_, 1))
// 统计每个学校出现的次数
// 累加key相同的value
// 结果:(schoolname, count)
// val rdd4 = rdd3.reduceByKey((v1:Int, v2:Int) => {
// v1 + v2
// })
val rdd4 = rdd3.reduceByKey((_+_))
// 交换key和value的位置
// 结果:(count, schoolname)
// val rdd5 = rdd4.map((x:(String, Int)) => {
// x.swap
// })
val rdd5 = rdd4.map(_.swap)
// 按照学校出现的次数降序排序
val rdd6 = rdd5.sortByKey(false)
// 获取第一个元素,第一个元素次数最多的学校
// take是一个action类算子
// arr中只有一条记录,并且这一条记录是二元组类型的
val arr = rdd6.take(1)
val schoolName = arr(0)._2
println("schoolName:" + schoolName)
// 过滤次数最多的学校,过滤rdd1,过滤依据是schoolname
val resultRDD = rdd1.filter((x:String) => {
!schoolName.equals(x.split("\t")(1))
})
// 保存到项目根路径下
resultRDD.saveAsTextFile("result")
sc.stop()
}
}
说明:rdd4到rdd6之间的转换可以替换为:
val rdd6 = rdd4.sortBy((x:(String, Int)) => x._2, false)
不用交换key和value也能完成二元组的排序