设为首页 加入收藏

TOP

Scala经典案例-------------------------词频统计单机版和分布式实现
2019-03-19 12:45:10 】 浏览:169
Tags:Scala 经典案例 ------------------------- 统计 单机版 分布式 实现

             单机版
scala> var arr = Array("Spark Hadopp Hive", "Hive Hbase", "Sqoop Redis Hadoop")
        arr: Array[String] = Array(Spark Hadopp Hive, Hive Hbase, Sqoop Redis Hadoop)

        #将元素进行拆分, 拆分后每个元素("Spark Hadopp Hive")形成独立的小数组
        scala> var arr2 = arr.map(x => x.split(" "))
        arr2: Array[Array[String]] = Array(Array(Spark, Hadopp, Hive), Array(Hive, Hbase), Array(Sqoop, Redis, Hadoop))

        #压平操作,将子数组的元素压破,flatMap=flatten+map
        scala> var arr4 = arr.flatMap(x => x.split(" "))
        arr4: Array[String] = Array(Spark, Hadopp, Hive, Hive, Hbase, Sqoop, Redis, Hadoop)

        #将每个元素做一次计数
        scala> var arr3 = arr2.map(x => (x, 1))
        arr3: Array[(String, Int)] = Array((Spark,1), (Hadopp,1), (Hive,1), (Hive,1), (Hbase,1), (Sqoop,1), (Redis,1), (Hadoop,1))

        scala> var arr4 = arr3.groupBy(x => (x._1))
        arr4: scala.collection.immutable.Map[String,Array[(String, Int)]] = 
        Map(
            Sqoop -> Array((Sqoop,1)), 
            Hbase -> Array((Hbase,1)), 
            Hive -> Array((Hive,1), (Hive,1)), 
            Hadopp -> Array((Hadopp,1)), 
            Spark -> Array((Spark,1)), 
            Redis -> Array((Redis,1)), 
            Hadoop -> Array((Hadoop,1))
        )

        #对数据进行统计
        scala> val arr5 = arr4.map(x => (x._1, x._2.length))
        arr5: scala.collection.immutable.Map[String,Int] = Map(Sqoop -> 1, Hbase -> 1, Hive -> 2, Hadopp -> 1, Spark -> 1, Redis -> 1, Hadoop -> 1)

                                             分布式实现
/**
  * 使用scala集合及函数实现词频统计WordCount
  * Created by zhangjingcun on 2018/9/11 14:50.
  */
object ScalaWordCount {
  def main(args: Array[String]): Unit = {
    //有一些数据,需要进行分析统计,词频统计。获取每个单词出现的次数,按照次数进行降序排序,取得前3个数据
    //1:准备数据
    val lines = List("hadoop,hbase,spark,,hive,hbase ", "", "sqoop,pig,scala ,", "hbase,yarn",
      "java,sqoop,mysql, mysql ")

    /**
      * 回顾一下:
      * mapreduce是如何进行词频统计:
      * 1:读取数据,一行行读取数据,对每一行的数据进行分割单词
      * 2:进行分组、统计
      * 数据格式:
      * line -> List<word, 1> -> (key, List<value>) -> <key, values.sum>
      */
    //2:数据转换,此处采用链式编程
    //val words = lines.flatMap(line => line.trim().split(",")).map(word => (word, 1))
    //println(words.toBuffer)

    /**
      * 下述采用类似于Mapreduce中map处理的数据及输出
      */
    val mapTuple = lines
      //对每一行数据进行分割
      .flatMap(line => line.trim().split(","))
      //转换,去掉单词空格
      .map(word => word.trim())
      //数据过滤
      .filterNot(word => word.isEmpty)
      //将单词转换为二元组
      .map(word => (word.trim(), 1))
    println(mapTuple.toBuffer)

    /**
      * 上述写法比较麻烦,更简单的方式
      */
    val mapTuple2 = lines.flatMap(line => {
      line.trim().split(",").map(_.trim()).filterNot(_.isEmpty()).map((_, 1))
    })

    println(s"maped:${mapTuple2.toBuffer}")

    /**
      * 进行分组操作,将相同的key和value聚合在一起
      */
    //3:数据分组
    val groupTuple = mapTuple2.groupBy(tuple => tuple._1)

    println(s"grouped:${groupTuple.toBuffer}")

    /**
      * 分组以后
      * 对每一组数据中的value值进行统计
      * 类似于mr中的reduce函数的操作
      */
    //4: 每一组进行数据统计
    val result = groupTuple.map(tuple => {
      //获取单词
      val word = tuple._1
      //计算单词Word对应值
      val count = tuple._2.map(_._2).sum
      //返回结果
      (word, count)
    })

    //5:打印结果
    println(result.toBuffer)

    /**
      * 按照词频频率降序排序,取得出现频率最高的前三条数据
      */
    val sortTuple = result.toList.sortBy(tuple => - tuple._2).take(3)

    println(s"sorted:${sortTuple.toBuffer}")
//一条语句实现

    lines.flatMap(line => line.trim.split(","))
      .map(_.trim).filterNot(_.isEmpty)
      .map((_, 1)).groupBy(_._1)
      .map(tuple => (tuple._1, tuple._2.map(_._2).sum)).toList
      .sortBy(tuple => - tuple._2)
      .take(3)
      .foreach(println)
  }
}
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇hadoop记录篇7-hive常用sql统计 下一篇hive:Access denied for user ..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目