设为首页 加入收藏

TOP

spark算子--action篇
2019-02-28 01:04:36 】 浏览:59
Tags:spark 算子 --action篇

spark算子分为两大种,一种是transformation算子,另一种是action算子。

transformation又叫转换算子,它从一个RDD到另一个RDD是延迟执行的,不会马上触发作业的提交,只有在后续遇到某个action算子时才执行;

action算子会触发SparkContext提交Job,并将数据输出spark系统。今天举例讲解一下action算子。

1) count

就是统计RDD中元素个数的算子。

举个栗子:

valrdd =sc.parallelize(List("hello","world!","hi","beijing"))

println(rdd.count())

输出:

4

2) collect

把RDD中的元素提取到driver内存中,返回数组形式。

举个栗子:

valrdd =sc.parallelize(List("hello","world!","hi","beijing"),2)

valarr: Array[String] = rdd.collect()

println(arr)

arr.foreach(println)

输出:

[Ljava.lang.String;@760e8cc0

hello

world!

hi

beijing

3) foreach

遍历RDD中的每一个元素,无返回值。此算子用法参考上下文。

4) saveAsTextFile

把RDD中的数据以文本的形式保存

举个栗子:

valrdd =sc.parallelize(List(5,4,7,1,9),3)

rdd.saveAsTextFile("/home/myname/test")

5) saveAsSequenceFile

是个k-v算子,把RDD中的数据以序列化的形式保存。使用此算子的前提是RDD中元素是键值对格式。

举个栗子:

valrdd =sc.parallelize(List(("a",1),("b",2),("c",3),("c",4)),2)

rdd.saveAsSequenceFile("/home/myname/test")

6) countByKey

是个k-v算子,按key统计各key的次数,返回Map

举个栗子:

valrdd =sc.parallelize(List(("a",1),("b",2),("c",3),("c",4)),2)

valres: Map[String, Long] = rdd.countByKey()

res.foreach(println)

输出:

(b,1)

(a,1)

(c,2)

7) collectAsMap

把RDD中元素以Map形式提取到driver端。需要注意的是如果存在多个相同key,后面出现的会覆盖前面的。

举个栗子:

valrdd =sc.parallelize(List(("a",1),("b",2),("c",3),("c",4)),2)

valres: Map[String, Int] = rdd.collectAsMap()

res.foreach(println)

输出:

(b,2)

(a,1)

(c,4)

8) take

从RDD中取下标前n个元素,不排序。返回数组。

举个栗子:

val rdd =sc.parallelize(List(5,4,7,1,9),3)

valtake: Array[Int] = rdd.take(2)

take.foreach(println)

输出:

5

4

9) takeSample

从指定RDD中抽取样本。第一个参数为false表示取过的元素不再取,为true表示取过的元素可以再次被抽样;第二个参数表示取样数量;第三个参数不好把握建议默认值

举个栗子:

valrdd =sc.makeRDD(Array("aaa","bbb","ccc","ddd","eee"))

valsample: Array[String] = rdd.takeSample(false,2)

sample.foreach(println)

输出:

eee

bbb

10) first

返回RDD中第一个元素。

举个栗子:

val rdd =sc.parallelize(List(5,4,7,1,9),3)

valfirst:Int= rdd.first()

println(first)

输出:

5

11) top

从RDD中按默认顺序(降序)或指定顺序取n个元素

举个栗子:

val rdd =sc.parallelize(List(5,4,7,1,9),3)

valtake: Array[Int] = rdd.top(2)

take.foreach(println)

输出:

9

7

12) takeOrdered

从RDD中取n个元素,与top算子不同的是它是以和top相反的顺序返回元素。

举个栗子:

val rdd =sc.parallelize(List(5,4,7,1,9),3)

valtake: Array[Int] = rdd.takeOrdered(2)

take.foreach(println)

输出:

1

4

13) saveAsObjectFile

把RDD中元素序列化并保存,底层依赖saveAsSequenceFile

举个栗子:

val rdd =sc.parallelize(List(5,4,7,1,9),3)

rdd.saveAsObjectFile("/home/myname/test")

14)reduce

reduce参数是一个函数,把RDD中的元素两两传递给此函数,然后进行计算产生一个结果,结果再与下一个值计算,如此迭代。

举个栗子:

valrdd =sc.makeRDD(List(1,2,3,4,5))

valresult:Int= rdd.reduce((x,y) => x + y)

println(result)

输出:

15

15) lookup

是个k-v算子,指定key值,返回此key对应的所有v值

举个栗子:

val rdd1 =sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))

valrdd2:Seq[Int] = rdd1.lookup("A")

rdd2.foreach(println)

输出:

0

2

16) aggregate

aggregate用户聚合RDD中的元素,先指定初始值,再对RDD中元素进行局部求和,最后全局求和。此算子理解起来不是特别直观,大家感受一下。

举个栗子:

valrdd =sc.parallelize(List(1,2,3,4))

valres:Int= rdd.aggregate(2)(_+_,_+_)

println(res)

输出:

14

17) fold

fold是aggregate的简化

举个栗子:

valrdd =sc.parallelize(List(1,2,3,4))

valres:Int= rdd.fold(2)((x,y) => x + y)

println(res)

输出:

14

喜欢的话请微信扫描下方二维码关注我!

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇spark 学习7 下一篇spark join操作

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目