设为首页 加入收藏

TOP

4、spark入门和应用算子举例
2019-03-20 13:10:33 】 浏览:75
Tags:spark 入门 应用 算子 举例

1、spark和hadoop对比优点
1.1、只用搭建一套spark(包括:spark core,spark streaming,graphx,mllib等),便于开发维护,
hadoop要搭建多个平台:hadoop,mapreduce,storm,mahout
1.2、spark速度快,基于内存计算,hadoop基于硬盘计算。速度大约是hadoop的100倍

2、RDD(Resilient Distributed Dataset)弹性分布式数据集,提供了一系列算子。不存储数据,只是一个逻辑上的概念。存指针,指向每个数据块block
五大特性:

  • 由多个partition组成
  • 每个算子实际上作用在partition上
  • 有一系列的依赖关系, 依赖其他RDD(可以容错)
  • 一个分区器(作用:决定处理的结果存放在哪个磁盘小文件上)必须作用在kv格式的RDD上(kv格式的RDD:RDD中存储的元素是二元祖类型)
  • 提供一系列最佳计算位置(调用RDD的接口,就知道每个partition需要作用在哪个节点上,然后发送过去)


    4369144-cb4f5da69b4e37cc.png
    image.png

3、spark任务执行流程
Driver作用:(1)Driver分发task到数据所在的位置让worker上执行,(2)将计算结果拉回到Driver端


4369144-9e811e3c2d0866bd.png
image.png

4、transformation算子和action算子
transformation算子:延迟执行--针对RDD操作,只有在遇到action算子的时候才会触发执行
返回值是一个RDD
  map:传进来的参数由父RDD的类型决定。输入一条,输出一条
  filter:传进来true保留,false过滤掉
  flatMap: 输入一条,输出多条
  sample:抽样,如果传进来0.5,会随机抽取50%的数据返回
action算子:返回值是 非RDD类型
  count等。。

5.1第一个WordCount程序,统计单词出现的次数

object WordCount2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("wordcount")
    conf.setMaster("local")
    val sc = new SparkContext(conf)

    val lineRDD = sc.textFile("wc")
    // wordRDD非kv格式的rdd,wordRDD数据类型是String
    val wordRDD = lineRDD.flatMap((line:String) => {
      line.split(" ")
    })

    // 把wordRDD变成kv格式的rdd
    val pairRDD = wordRDD.map((word:String) => {
      (word, 1)
    })

    // 按key来分组,然后将每一组内的数据聚合
    var resultRDD = pairRDD.reduceByKey((v1:Int, v2:Int) => {
      v1 + v2
    })

    // 打印结果,接收参数是一个二元祖
    resultRDD.foreach((x:(String, Int)) => {
      println("word:" + x._1 + " count:" + x._2)
    })

    sc.stop()
  }
}

5.2、统计学校名最多的那个,从文件中删除
重要操作:
  加载文件
  map,获取学校名
  map,组成二元组
  reduceByKey,统计次数
  map,交换位置
  sortByKey,排序
  获取最多的那个二元组中学校名
  filter,过滤

/**
  * Created by wxx on 2017/12/9.
  * 统计哪一个学校出现的次数最多,包含这个学校的记录,清除掉
  * 数据类型:2016-09-02 XX学校    1110    北京
  */
object FilterSchool2 {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("filterSchool").setMaster("local")
    val sc = new SparkContext(sparkConf)

    // 读文件
    val rdd1 = sc.textFile("log.txt")
    // 获取学校名字
//    val rdd2 = rdd1.map((x:String) => {
//      val schoolName = x.split("\t")
//      schoolName
//    })

    val sampleRdd = rdd1.sample(true, 0.5, 1)
    val rdd2 = sampleRdd.map{_.split("\t")(1)}


    // 将每一个学校名,计数为1
    // rdd3是一个KV格式的rdd
    // K:schoolname,V:1
//    val rdd3 = rdd2.map((x:String) => {
//      (x, 1)
//    })
    val rdd3 = rdd2.map((_, 1))

    // 统计每个学校出现的次数
    // 累加key相同的value
    // 结果:(schoolname, count)
//    val rdd4 = rdd3.reduceByKey((v1:Int, v2:Int) => {
//      v1 + v2
//    })
    val rdd4 = rdd3.reduceByKey((_+_))

    // 交换key和value的位置
    // 结果:(count, schoolname)
//    val rdd5 = rdd4.map((x:(String, Int)) => {
//      x.swap
//    })
    val rdd5 = rdd4.map(_.swap)

    // 按照学校出现的次数降序排序
    val rdd6 = rdd5.sortByKey(false)

    // 获取第一个元素,第一个元素次数最多的学校
    // take是一个action类算子
    // arr中只有一条记录,并且这一条记录是二元组类型的
    val arr = rdd6.take(1)
    val schoolName = arr(0)._2
    println("schoolName:" + schoolName)

    // 过滤次数最多的学校,过滤rdd1,过滤依据是schoolname
    val resultRDD = rdd1.filter((x:String) => {
      !schoolName.equals(x.split("\t")(1))
    })

    // 保存到项目根路径下
    resultRDD.saveAsTextFile("result")
    sc.stop()
  }
}

说明:rdd4到rdd6之间的转换可以替换为:
val rdd6 = rdd4.sortBy((x:(String, Int)) => x._2, false)
不用交换key和value也能完成二元组的排序

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇spark IDE:   System memory.. 下一篇Spark一千篇旅游日记0001 之 简..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目