单机版
scala> var arr = Array("Spark Hadopp Hive", "Hive Hbase", "Sqoop Redis Hadoop")
arr: Array[String] = Array(Spark Hadopp Hive, Hive Hbase, Sqoop Redis Hadoop)
#将元素进行拆分, 拆分后每个元素("Spark Hadopp Hive")形成独立的小数组
scala> var arr2 = arr.map(x => x.split(" "))
arr2: Array[Array[String]] = Array(Array(Spark, Hadopp, Hive), Array(Hive, Hbase), Array(Sqoop, Redis, Hadoop))
#压平操作,将子数组的元素压平,flatMap=flatten+map
scala> var arr4 = arr.flatMap(x => x.split(" "))
arr4: Array[String] = Array(Spark, Hadopp, Hive, Hive, Hbase, Sqoop, Redis, Hadoop)
#将每个元素做一次计数
scala> var arr3 = arr4.map(x => (x, 1))
arr3: Array[(String, Int)] = Array((Spark,1), (Hadopp,1), (Hive,1), (Hive,1), (Hbase,1), (Sqoop,1), (Redis,1), (Hadoop,1))
scala> var arr4 = arr3.groupBy(x => (x._1))
arr4: scala.collection.immutable.Map[String,Array[(String, Int)]] =
Map(
Sqoop -> Array((Sqoop,1)),
Hbase -> Array((Hbase,1)),
Hive -> Array((Hive,1), (Hive,1)),
Hadopp -> Array((Hadopp,1)),
Spark -> Array((Spark,1)),
Redis -> Array((Redis,1)),
Hadoop -> Array((Hadoop,1))
)
#对数据进行统计
scala> val arr5 = arr4.map(x => (x._1, x._2.length))
arr5: scala.collection.immutable.Map[String,Int] = Map(Sqoop -> 1, Hbase -> 1, Hive -> 2, Hadopp -> 1, Spark -> 1, Redis -> 1, Hadoop -> 1)
函数式实现(单机模拟 MapReduce 流程,并非真正的分布式)
/**
 * Word-frequency count (WordCount) implemented with plain Scala collections
 * and functions, mirroring the map / group / reduce phases of a MapReduce job.
 * Created by zhangjingcun on 2018/9/11 14:50.
 */
object ScalaWordCount {
  def main(args: Array[String]): Unit = {
    // Sample input: comma-separated tokens with stray blanks and padding spaces.
    val lines = List("hadoop,hbase,spark,,hive,hbase ", "", "sqoop,pig,scala ,", "hbase,yarn",
      "java,sqoop,mysql, mysql ")
    /**
     * MapReduce refresher:
     * 1: read the data line by line, splitting each line into words
     * 2: group, then count
     * data flow:
     * line -> List<word, 1> -> (key, List<value>) -> <key, values.sum>
     */

    // "Map" phase, spelled out step by step:
    // split each line, clean each token, drop blanks, pair every word with 1.
    val mapTuple = for {
      line  <- lines
      piece <- line.trim().split(",")
      word  = piece.trim()
      if !word.isEmpty()
    } yield (word, 1)
    println(mapTuple.toBuffer)

    // Same result, built inside a single flatMap pass per line.
    val mapTuple2 = lines.flatMap { line =>
      line.trim().split(",").toList.map(_.trim()).filter(_.nonEmpty).map(w => (w, 1))
    }
    println(s"maped:${mapTuple2.toBuffer}")

    // "Shuffle" phase: gather all (word, 1) pairs under their word key.
    val groupTuple = mapTuple2.groupBy { case (word, _) => word }
    println(s"grouped:${groupTuple.toBuffer}")

    // "Reduce" phase: sum the 1s of each group,
    // analogous to the reduce function in MapReduce.
    val result = groupTuple.map { case (word, pairs) =>
      val total = pairs.foldLeft(0)((acc, pair) => acc + pair._2)
      (word, total)
    }
    println(result.toBuffer)

    // Top three words by descending frequency.
    val sortTuple = result.toList.sortBy(_._2)(Ordering[Int].reverse).take(3)
    println(s"sorted:${sortTuple.toBuffer}")

    // The whole pipeline again, condensed into one expression chain.
    lines.flatMap(_.trim.split(","))
      .map(_.trim)
      .filter(_.nonEmpty)
      .map(w => (w, 1))
      .groupBy(_._1)
      .map { case (w, ones) => (w, ones.map(_._2).sum) }
      .toList
      .sortBy(_._2)(Ordering[Int].reverse)
      .take(3)
      .foreach(println)
  }
}