设为首页 加入收藏

TOP

漫谈RDD
2019-03-26 01:30:24 】 浏览:90
Tags:漫谈 RDD

开篇:spark各种库,sparksqlsparkmachicelearning,等这么多库底层都是封装的RDD。意味着1RDD本身提供了通用的抽象,2spark现在有5个子框架,sqlStreaming,流式处理,机器学习,图计算,sparkR可以根据具体领域的内容建模,建第6个库,第7个库。必须掌握spark的这个通用抽象基石-RDD.

1:RDD,基于工作集的分布式函数编程的应用抽象。MapReduce是基于数据集的。他们的共同特征是位置,感知,容错和负载均衡是数据集和工作集都有的。

基于数据集的处理:工作方式是从物理存储设备上,加载数据,然后操作数据,然后写入物理存储设备。但它不适应的场景有:1)不适合于大量的迭代。2)不适合于交互式查询,每次查询都要从磁盘上读取数据,然后查询,然后写回数据结果、如果复杂的查询有多个步骤,则要多次基于磁盘,这还是次要的,只是速度影响。重点是基于数据流的方式,不能复用曾经的结果或者中间的计算结果,这才是致命的,例如有几千人并发操作一个数据仓库,假如有一百人的查询完全是一样的,它也会重新加载数据,重新查询,而spark会对结果重用,复用中间计算结果,就是前面10个步骤是一样的,算过,数据集不会复用,spark则会复用

RDD(ResillientDistributedDataset)是基于工作集的,有前面讲过的7大弹性:

弹性1:自动的进行内存和磁盘数据存储的切换;

弹性2:基于Lineage的高校容错;

弹性3Task如果失败会自动进行特定次数的重试;

弹性4Stage如果失败会自动进行特定次数的重试,而且重试的时候只会计算失败的分片

弹性5checkpointpersist,链条比较长,计算比较笨重的时候,我们把数据都放在磁盘/HDFS上,这是checkpoint。而persist,是在内存中或者磁盘中对数据进行复用。这是效率和容错的延伸点。

弹性6数据调度弹性,DAGTASK和资源管理无关。

弹性7数据分片的高度弹性,如计算过程中会产生很多数据碎片,这时partition就特别小。每次都消耗一个线程去处理的话,这会降低处理效率。这时会考虑把很多partion合并成一个大的partition提升效率。另一个方面,内存不是那么多,但是partition数据比较大,数据block比较大。会考虑把它变成更小的分片,这样让spark有更多的处理批次,但是不会出现OOM。这样数据分片,我们可以人工提高并行度,降低并行度,是弹性的高度体现,而且它完全有数据本地性。


把多个分片变成一个,不要使用repartition,他会使用shuffle,应该使用colease

而从一万个分片变成10万个分片,则一般可能需要shuffleRDD本身容许用户在执行多个查询时,显示的将工作集缓存在内存中。以后其它人来查询就可以重用工作集。自然极大提升查询速度。

提示:spark的位置感知比hadoop的位置感知好很多,hadoop进行partition的时候,就不管位置在哪里,spark进行partition的时候,进行下一步stage操作,是会确定这个位置的,它更精致化

2Spark Streaming为什么老是用checkpoint,因为经常要用到以前的东西。假设Spark如果有1000RDD,一般不会产生1000个中间结果。假设Stage内部有一千个步骤,它中间不会产生999次中间结果,默认情况下,它只是产生一次中间结果,而hadoop会产生1000次中间结果。由于SparkRDD它本身是只读分区集合,但又为了应对它只对数据标记,不做计算的计算模型,所以它是lazy级别的,所以每次transformation构建的新的RDD,也都是以父RDD为自己的第一个参数传进去的,由此构成了一个链条,在计算的由最后action的时候再触发,所以只有一个中间结果,也所以这就构成了一个从后往前回溯的过程,就是一个函数展开的过程,从源码也看到它是这种从后往前的链条依赖关系,所以它容错的开销会非常低,为什么呢?因为常规的容错方式有1数据检查点(它的工作方式要通过数据中心的网络连接不同的机器,每次操作的时候都要复制整个数据集。每次都有一个拷贝,是要通过网络的,因为要复制到其他机器上,而带宽就是分布式的瓶颈,这对存储资源也是非常大的消耗)以及2记录数据的更新(每次数据变化了,我们都记录下,但这个第一复杂,第2耗性能,重算的时候比较难处理),既然这么多缺点spark为什么在记录数据更新上就这么高效呢1RDD是不可变的所以每次操作就会变成新的RDD+lazy,不存在全局修改的问题,控制难度极大的下降。又产生了链条,可以很方便的容错。2是粗粒度模式最简单的想,RDD就是一个List或者ArrayRDD是分布式函数式编程的抽象。基于RDD编程一般都采用高级函数。

3Stage结束,数据会写磁盘。是粗粒度模式,是为了效率,为了简化。如果是更新力度太细太多,记录成本非常高,效率就不是那么高了。对RDD的具体的数据的改变操作(写操作)都是粗粒度的。RDD的写操作是粗粒度的(限制了它的使用场景,网络爬虫这件事就不适合Rdd去做),但是RDD的读操作,既可以是粗粒度的也可以是细粒度的。Partition本身是一个很普通的数据结构,指向我们的具体的数据本身,即计算时知道数据在哪里。而且这系列数据分片的计算逻辑都是一样的。

4compute为什么所有RDD操作返回都是迭代器?好处是让所有框架无缝集成,结果流处理,机器学习都可以互调,无论是机器学习操作sql,还是sql操作机器学习,还是流处理操作图计算,还是流处理操作sql,大家都是基于RDD,我才不管你是什么东西,只关心你是RDD。第2点,又有可以调用子类的具体东西,所以是不是流处理可以直接调用机器学习的具体功能去训练。因为有this.type(),所以可以通过运行时runtime,来具体把实际的实例赋值给RDD,你转过来就可以去操作它,这样使用了接口,还能调用接口下面的子类。

5Scala中使用了接口,还能调用接口下面的子类。无缝集成的基础上,可以使用各自的功能。产生核裂变:如果我是做金融方面的,开发了一个金融类的子框架,子框架可以直接在代码中调机器学习,调图计算进行什么分享预测,行为分析,模式分析。也可以调sql进行数据挖掘。你写了个子框架,遵循RDD的规范,机器学习转过来可以调用我的金融框架。因为无缝集成,写个电商框架也可以调金融框架,每增强一个,可以让所有的增强。每提出一个新的框架,是不是可以使用其它所有的功能。

6:由于有了PreferedLocation,Spark可以处理一切数据,每次都符合完美的数据本地性。Spark就是要做一体化多元化的数据处理框架,不仅仅只是大数据。兼容一切文件系统,一切操作系统,一切文件格式。任何格式的数据,第一计算更快,第2使用更简单。但是Spark做实时事务性处理,反应没那么快,控制难度大。如银行转账。做实时处理是可以的。除此之外,Spark要一统数据处理的天下!

7RDD的弊端:目前不支持细粒度的写操作(如网络爬虫)以及增量迭代计算(每次迭代的时候,只迭代其中的一部分数据,本身是粗粒度,不能很好的支持增量迭代)


1.RDD的创建方式

Spark应用程序运行过程中,第一个RDD代表了Spark应用程序输入数据的来源,之后通过Trasformation来对RDD进行各种算子的转换,来实现具体的算法

Spark中的基本方式:

1)使用程序中的集合创建

这种方式的实际意义主要用于测试。

2)使用本地文件系统创建

这种方式的实际意义主要用于测试大量数据的文件

3)使用HDFS创建RDD

这种方式为生产环境中最常用的创建RDD的方式

4)基于DB创建

5)基于NoSQL:例如HBase

6)基于S3(SC3)创建

7)基于数据流创建

2.RDD创建实战

1)通过集合创建

代码:

object RDDBasedOnCollection {
def main (args: Array[String]) {
val conf = newSparkConf()//create SparkConf
conf.setAppName("RDDBasedOnCollection")//set app name
conf.setMaster("local")//run local
valsc =new SparkContext(conf)
val numbers = 1 to 100 //创建一个Scala集合
valrdd = sc.parallelize(numbers)
val sum =rdd.reduce(_+_) //1+2=3 3+3=6 6+4=10
println
("1+2+...+99+100"+"="+sum)
}
}

结果:

2)通过本地文件系统创建

代码:

object RDDBasedOnLocalFile {
def main (args: Array[String]) {
val conf = newSparkConf()//create SparkConf
conf.setAppName("RDDBasedOnCollection")//set app name
conf.setMaster("local")//run local
valsc =new SparkContext(conf)
val rdd = sc.textFile("C:/Users/feng/IdeaProjects/WordCount/src/SparkText.txt")
val linesLength=rdd.map(line=>line.length())
val sum = linesLength.reduce(_+_)
println("the total characters of the file"+"="+sum)
}
}

结果:

3)通过HDFS创建RDD

代码:

val wordcount = sc.textFile("/library/wordcount/input/licenses").flatMap(_.split(" ")).map(word=>(word,1)).reduceByKey(_+_).filter(pair=>pair._2>20).collect().foreach(println)



由于RDD的不可修改的特性,导致RDD的操作与正常面向对象的操作不同,RDD的操作基本分为3大类:transformationactioncontoller

1.Transformation

Transformation是通过转化针对已有的RDD创建出新的RDD

map(func):对调用mapRDD数据集中的每个element都使用func,然后返回一个新的RDD,这个返回的数据集是分布式的数据集

filter(func): 对调用filterRDD数据集中的每个元素都使用func,然后返回一个包含使functrue的元素构成的RDD

flatMap(func):map差不多,但是flatMap生成的是多个结果

mapPartitions(func):map很像,但是map是每个element,而mapPartitions是每个partition

mapPartitionsWithSplit(func):mapPartitions很像,但是func作用的是其中一个split上,所以func中应该有index

sample(withReplacement,faction,seed):抽样

union(otherDataset):返回一个新的dataset,包含源dataset和给定dataset的元素的集合

distinct([numTasks]):返回一个新的dataset,这个dataset含有的是源dataset中的distinctelement

groupByKey(numTasks):返回(K,Seq[V]),也就是hadoopreduce函数接受的key-valuelist

reduceByKey(func,[numTasks]):就是用一个给定的reducefunc再作用在groupByKey产生的(K,Seq[V]),比如求和,求平均数

sortByKey([ascending],[numTasks]):按照key来进行排序,是升序还是降序,ascendingboolean类型

join(otherDataset,[numTasks]):当有两个KVdataset(K,V)(K,W),返回的是(K,(V,W))dataset,numTasks为并发的任务数

cogroup(otherDataset,[numTasks]):当有两个KVdataset(K,V)(K,W),返回的是(K,Seq[V],Seq[W])dataset,numTasks为并发的任务数

Transformation特性:

lazy优化:由于Tranformationlazy特性,也就是创建不马上运行,对于框架来说,我有足够的时间查看到尽可能多的步骤,看到的步骤越多,优化的空间就越大。最简单的优化方式就是步骤合并,例如本来的做法是a=b*3;b=c*3;c=d*3;d=3,步骤合并后就是a=3*3*3*3

2.Action

Action操作的目的是得到一个值,或者一个结果

reduce(func):说白了就是聚集,但是传入的函数是两个参数输入返回一个值,这个函数必须是满足交换律和结合律的

collect():一般在filter或者足够小的结果的时候,再用collect封装返回一个数组

count():返回的是dataset中的element的个数

first():返回的是dataset中的第一个元素

take(n):返回前nelements,这个士driverprogram返回的

takeSample(withReplacementnumseed):抽样返回一个dataset中的num个元素,随机种子seed

saveAsTextFilepath):把dataset写到一个textfile中,或者hdfs,或者hdfs支持的文件系统中,spark把每条记录都转换为一行记录,然后写到file

saveAsSequenceFile(path):只能用在key-value对上,然后生成SequenceFile写到本地或者hadoop文件系统

countByKey():返回的是key对应的个数的一个map,作用于一个RDD

foreach(func):dataset中的每个元素都使用func

3.Contoller

Contoller动作主要为持久化RDD,例如cache()persist(),checkpoint();

具体内容在后续刊物中会讲解。

4.Spark WordCount动手实践

本小节通过IDEA具体逐步调试一个WordCount案例,让学员知道各步骤中RDD的具体类型,并为下一节逐步解析做铺垫

(1)使用的wordCount代码如下:

1.object WordCount {

2.def main (args: Array[String]) {

3.val conf = new SparkConf()//create SparkConf

4.conf.setAppName("Wow,My First Spark App")//set app name

5.conf.setMaster("local")//run local

6.val sc =new SparkContext(conf)

7.val lines =sc.textFile("C://Users//feng//IdeaProjects//WordCount//src//SparkText.txt")

8.val words = lines.flatMap{ lines => lines.split(" ") }

9.val pairs =words.map ( word => (word,1) )

10.val reduce = pairs.reduceByKey(_+_)

11.val sort_1 = reduce.map(pair=>(pair._2,pair._1))

12.val sort_2 = sort_1.sortByKey(false)

13.val sort_3=sort_2.map(pair=>(pair._2,pair._1))

14.val filter=sort_3.filter(pair=>pair._2>2)

15.filter.collect.foreach(wordNumberPair => println(wordNumberPair._1+" : "+wordNumberPair._2))

16.sc.stop()

17.}

18.}

(1)程序使用的SparkText.txt文件内容如下

hadoop hadoop hadoop

spark Flink spark

scala scala object

object spark scala

spark spark

Hadoop hadoop

(2)程序WordCount调试结果:

通过IDEA的逐步调试,会在调试窗口显示每一行代码具体操作什么类型的RDD,此RDD通过什么依赖关系依赖于父RDD等重要信息

2.8.2 解析RDD生成的内部机制

本小节基于上小节程序的调试结果,逐条查看调试信息内容,并基于信息内容进行讲解,并在讲解中回顾并复习本章所有内容。

(1)line= sc.textFile()

本语句的作用在于从外部数据中读取数据,并生成MapPartitionsRDD此处需要注意:

如图2-16所示,可以看出次MapPartitionsRDDdeps(dependency,依赖)HadoopRDD,从这里可以发现其实textFile()过程包含两个步骤,第一步骤将文件内容转化为HadoopRDD(key-value形式,key为行号),第二步骤将HadoopRDD转化为MapPartitionsRDD(value形式,将key-value类型的key删去)

(2)words=line.flatMap()

此命令对于RDD采取transformation(转换)操作,作用在于将MapPartitionsRDD中的每一个记录进行以空格为标记的切分,并把每一个RDD的切分的结果放在一个MapPartitionRDD中

(3)pairs=words.map(word=>(word,1))

此命令对于RDD采取transformation(转换)操作,作用在于将MapPartitionsRDD中的每一个记录(例:spark(value类型))转换为key-value类型(: (spark,1)),便于下一步reduceByKey操作

(4)reduce = pairs.reduceByKey(_+_)

此命令对于RDD采取action(动作)操作,作用在于通过shufflepairs中所有的记录按照key相同value相加的规则进行处理,并把结果放到一个shuffleRDD中。例((spark,1),(spark,1))变成((spark,2))

同时需要注意一下两点:首先本步骤实质上分为两个步骤,第一步骤为local级别的reduce,对当前计算机所拥有的数据先进行reduce操作,生成MapPartitionsRDD;第二步骤为shuffle级别的reduce,基于第一步骤的结果,对结果进行shuffle-reduce操作,生成最终的shuffleRDD其次 Action操作进行时,对此操作之前的所有转换操作进行执行,所以调试过程中会出现此前的除textFile操作的执行时间均非常短,说明RDD转换操作不直接进行运算。

(5)sort_1 = reduce.map(pair=>(pair._2,pair._1))

此命令对于RDD采取transformation(转换)操作,作用在于将shuffleRDD中的每一个记录的keyvalue互换,生成一个新的MapPartitionsRDD。例: (spark,2)变为(2,spark)

(6)sort_2 = sort_1.sortByKey(false)

此命令对于RDD采取action(动作)操作,作用在于将MapPartitionsRDD根据key进行排序,并生成shuffleRDD

(7)sort_3=sort_2.map(pair=>(pair._2,pair._1))

此命令对于RDD采取transformation(转换)操作,作用在于将shuffleRDD中的每一个记录的keyvalue互换,生成一个新的MapPartitionsRDD。例: (2,spark)变为(spark,2)

(8)filter=sort_3.filter(pair=>pair._2>2)

此命令对于RDD采取transformation(转换)操作,作用在于根据value值筛选MapPartitionsRDD中的数据,输出value大于2的记录

(9)最后通过collect()方法将结果收集后,使用foreach()方法遍历数据并通过println()方法打印出所有数据。


1.创建RDD

代码:

def sparkContext(name:String)=
{
val conf = new SparkConf().setAppName(name).setMaster("local")
val sc = new SparkContext(conf)
sc
}

2.Map

作用:适用于任何集合,且对其作用的集合中的每一个元素循环遍历,并调用其作为参数的函数对每一个遍历的元素进行具体化处理。

代码:

def mapTransformation(sc:SparkContext): Unit ={
val nums = sc.parallelize(1 to 10)//根据集合创建RDD
val mapped = nums.map(item=> 2 * item)
mapped.collect.foreach(println)
}

结果:

3.Filter

作用:遍历集合中的所有元素,将每个元素作为参数放入函数中进行判断,将判断结果为真的元素筛选出来。

代码:

def filterTransformation(sc:SparkContext): Unit ={
val nums = sc.parallelize(1 to 20)//根据集合创建RDD
val filtered = nums.filter(item => item % 2 == 0)
filtered.collect.foreach(println)
}

4.Flatmap

作用:通过传入的作为参数的函数来作用与RDD的每个字符串进行单词切分,然后把切分后的结果合并成一个大的集合

代码:

def flatmapTransformation(sc:SparkContext): Unit ={
val bigData = Array("scala","spark","java Hadoop","java tachyon")
val bigDataString =sc.parallelize(bigData)
val words= bigDataString.flatMap(line=>line.split(" "))
words.collect.foreach(println)
}

结果:

5.groupByKey

作用:将传入的tuple数组生成为RDD,通过groupByKey方法将RDD通过key进行分组汇总,并生成一个新的RDD

代码:

def groupByKeyTransformation(sc:SparkContext): Unit ={
val data = Array(Tuple2(100,"Spark"),Tuple2(100,"Tachyon"),Tuple2(90,"Hadoop"),Tuple2(80,"Kafka"),Tuple2(70,"Scala"))
val dataRDD = sc.parallelize(data)
val group = dataRDD.groupByKey()
group.collect.foreach(pair=>println(pair._1+":"+pair._2))
}

结果:

6.reduceByKey

作用:对key相同的元素进行value值得相加。

代码:

def reduceByKeyTransformation(sc:SparkContext): Unit ={
val lines =sc.textFile("C://Users//feng//IdeaProjects//WordCount//src//SparkText.txt",1)
val reduce= lines.map(line=>(line,1)).reduceByKey(_+_)
reduce.collect.foreach(pair=>println(pair._1+":"+pair._2))
}


7.Join

作用:根据相同key,把不同的RDD合并为一个RDD
代码:

def joinTransformation(sc:SparkContext): Unit ={
//大数据中最重要的算子
val studentNames=Array(
Tuple2(1,"Spark"),
Tuple2(2,"Tachyon"),
Tuple2(3,"Hadoop")
)
val studentScore=Array(
Tuple2(1,100),
Tuple2(2,95),
Tuple2(3,65),
Tuple2(2,95),
Tuple2(3,65)
)
val names = sc.parallelize(studentNames)
val scores = sc.parallelize(studentScore)
val studentNameAndScore=names.join(scores)
studentNameAndScore.collect.foreach(println)
}
结果:

8.cogroup

作用:协同分组,首先将两个RDD的内容进行join,在此基础上,以IDkey的情况下将改ID内容的所有分数聚合到一起。

代码:

def cogroupTransformation(sc:SparkContext): Unit ={
val nameList = Array(
Tuple2(1,"Spark"),
Tuple2(2,"Scala"),
Tuple2(3,"Hadoop")
)
val scoreList = Array(
Tuple2(1,100),
Tuple2(2,90),
Tuple2(3,87),
Tuple2(1,80),
Tuple2(2,90),
Tuple2(2,60)
)
val names = sc.parallelize(nameList)
val scores =sc.parallelize(scoreList)
val nameScores= names.cogroup(scores)
nameScores.collect.foreach(println)
}






】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Spark Python API 学习(1) 下一篇Spark SQL 源代码分析系列

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目