开篇:spark各种库,sparksql,sparkmachicelearning,等这么多库底层都是封装的RDD。意味着1:RDD本身提供了通用的抽象,2:spark现在有5个子框架,sql,Streaming,流式处理,机器学习,图计算,sparkR。可以根据具体领域的内容建模,建第6个库,第7个库。必须掌握spark的这个通用抽象基石-RDD.
1:RDD,基于工作集的分布式函数编程的应用抽象。MapReduce是基于数据集的。他们的共同特征是位置,感知,容错和负载均衡是数据集和工作集都有的。
基于数据集的处理:工作方式是从物理存储设备上,加载数据,然后操作数据,然后写入物理存储设备。但它不适应的场景有:1)不适合于大量的迭代。2)不适合于交互式查询,每次查询都要从磁盘上读取数据,然后查询,然后写回数据结果、如果复杂的查询有多个步骤,则要多次基于磁盘,这还是次要的,只是速度影响。重点是基于数据流的方式,不能复用曾经的结果或者中间的计算结果,这才是致命的,例如有几千人并发操作一个数据仓库,假如有一百人的查询完全是一样的,它也会重新加载数据,重新查询,而spark会对结果重用,复用中间计算结果,就是前面10个步骤是一样的,算过,数据集不会复用,spark则会复用。
而RDD(ResillientDistributedDataset)是基于工作集的,有前面讲过的7大弹性:
弹性1:自动的进行内存和磁盘数据存储的切换;
弹性2:基于Lineage的高校容错;
弹性3:Task如果失败会自动进行特定次数的重试;
弹性4:Stage如果失败会自动进行特定次数的重试,而且重试的时候只会计算失败的分片,
弹性5:checkpoint和persist,链条比较长,计算比较笨重的时候,我们把数据都放在磁盘/HDFS上,这是checkpoint。而persist,是在内存中或者磁盘中对数据进行复用。这是效率和容错的延伸点。
弹性6:数据调度弹性,DAGTASK和资源管理无关。
弹性7:数据分片的高度弹性,如计算过程中会产生很多数据碎片,这时partition就特别小。每次都消耗一个线程去处理的话,这会降低处理效率。这时会考虑把很多partion合并成一个大的partition提升效率。另一个方面,内存不是那么多,但是partition数据比较大,数据block比较大。会考虑把它变成更小的分片,这样让spark有更多的处理批次,但是不会出现OOM。这样数据分片,我们可以人工提高并行度,降低并行度,是弹性的高度体现,而且它完全具有数据本地性。
把多个分片变成一个,不要使用repartition,他会使用shuffle,应该使用colease
而从一万个分片变成10万个分片,则一般可能需要shuffle。RDD本身容许用户在执行多个查询时,显示的将工作集缓存在内存中。以后其它人来查询就可以重用工作集。自然极大提升查询速度。
提示:spark的位置感知比hadoop的位置感知好很多,hadoop进行partition的时候,就不管位置在哪里,spark进行partition的时候,进行下一步stage操作,是会确定这个位置的,它更精致化。
2:Spark Streaming为什么老是用checkpoint,因为经常要用到以前的东西。假设Spark如果有1000个RDD,一般不会产生1000个中间结果。假设Stage内部有一千个步骤,它中间不会产生999次中间结果,默认情况下,它只是产生一次中间结果,而hadoop会产生1000次中间结果。由于Spark的RDD它本身是只读分区的集合,但又为了应对它只对数据标记,不做计算的计算模型,所以它是lazy级别的,所以每次transformation构建的新的RDD,也都是以父RDD为自己的第一个参数传进去的,由此构成了一个链条,在计算的由最后action的时候再触发,所以只有一个中间结果,也所以这就构成了一个从后往前回溯的过程,就是一个函数展开的过程,从源码也看到它是这种从后往前的链条依赖关系,所以它容错的开销会非常低,为什么呢?因为常规的容错方式有1:数据检查点(它的工作方式要通过数据中心的网络连接不同的机器,每次操作的时候都要复制整个数据集。每次都有一个拷贝,是要通过网络的,因为要复制到其他机器上,而带宽就是分布式的瓶颈,这对存储资源也是非常大的消耗)以及2:记录数据的更新(每次数据变化了,我们都记录下,但这个第一复杂,第2耗性能,重算的时候比较难处理),既然这么多缺点spark为什么在记录数据更新上就这么高效呢1)RDD是不可变的所以每次操作就会变成新的RDD+lazy,不存在全局修改的问题,控制难度极大的下降。又产生了链条,可以很方便的容错。2:是粗粒度模式,最简单的想,RDD就是一个List或者Array。RDD是分布式函数式编程的抽象。基于RDD编程一般都采用高级函数。
3:Stage结束,数据会写磁盘。是粗粒度模式,是为了效率,为了简化。如果是更新力度太细太多,记录成本非常高,效率就不是那么高了。对RDD的具体的数据的改变操作(写操作)都是粗粒度的。RDD的写操作是粗粒度的(限制了它的使用场景,网络爬虫这件事就不适合Rdd去做),但是RDD的读操作,既可以是粗粒度的也可以是细粒度的。Partition本身是一个很普通的数据结构,指向我们的具体的数据本身,即计算时知道数据在哪里。而且这系列数据分片的计算逻辑都是一样的。
4:compute为什么所有RDD操作返回都是迭代器?好处是让所有框架无缝集成,结果流处理,机器学习都可以互调,无论是机器学习操作sql,还是sql操作机器学习,还是流处理操作图计算,还是流处理操作sql,大家都是基于RDD,我才不管你是什么东西,只关心你是RDD。第2点,又有可以调用子类的具体东西,所以是不是流处理可以直接调用机器学习的具体功能去训练。因为有this.type(),所以可以通过运行时runtime,来具体把实际的实例赋值给RDD,你转过来就可以去操作它,这样使用了接口,还能调用接口下面的子类。
5:Scala中使用了接口,还能调用接口下面的子类。无缝集成的基础上,可以使用各自的功能。产生核裂变:如果我是做金融方面的,开发了一个金融类的子框架,子框架可以直接在代码中调机器学习,调图计算进行什么分享预测,行为分析,模式分析。也可以调sql进行数据挖掘。你写了个子框架,遵循RDD的规范,机器学习转过来可以调用我的金融框架。因为无缝集成,写个电商框架也可以调金融框架,每增强一个,可以让所有的增强。每提出一个新的框架,是不是可以使用其它所有的功能。
6:由于有了PreferedLocation,Spark可以处理一切数据,每次都符合完美的数据本地性。Spark就是要做一体化多元化的数据处理框架,不仅仅只是大数据。兼容一切文件系统,一切操作系统,一切文件格式。任何格式的数据,第一计算更快,第2使用更简单。但是Spark做实时事务性处理,反应没那么快,控制难度大。如银行转账。做实时处理是可以的。除此之外,Spark要一统数据处理的天下!
7:RDD的弊端:目前不支持细粒度的写操作(如网络爬虫)以及增量迭代计算(每次迭代的时候,只迭代其中的一部分数据,本身是粗粒度,不能很好的支持增量迭代)。
1.RDD的创建方式
Spark应用程序运行过程中,第一个RDD代表了Spark应用程序输入数据的来源,之后通过Trasformation来对RDD进行各种算子的转换,来实现具体的算法
Spark中的基本方式:
1)使用程序中的集合创建
这种方式的实际意义主要用于测试。
2)使用本地文件系统创建
这种方式的实际意义主要用于测试大量数据的文件
3)使用HDFS创建RDD
这种方式为生产环境中最常用的创建RDD的方式
4)基于DB创建
5)基于NoSQL:例如HBase
6)基于S3(SC3)创建
7)基于数据流创建
2.RDD创建实战
1)通过集合创建
代码:
object RDDBasedOnCollection {
def main (args: Array[String]) {
val conf = newSparkConf()//create SparkConf
conf.setAppName("RDDBasedOnCollection")//set app name
conf.setMaster("local")//run local
valsc =new
SparkContext(conf)
val numbers =
1 to 100 //创建一个Scala集合
valrdd = sc.parallelize(numbers)
val sum =rdd.reduce(_+_) //1+2=3 3+3=6 6+4=10
println("1+2+...+99+100"+"="+sum)
}
}
结果:
2)通过本地文件系统创建
代码:
object RDDBasedOnLocalFile {
def main (args: Array[String]) {
val conf = newSparkConf()//create SparkConf
conf.setAppName("RDDBasedOnCollection")//set app name
conf.setMaster("local")//run local
valsc =new
SparkContext(conf)
val rdd = sc.textFile("C:/Users/feng/IdeaProjects/WordCount/src/SparkText.txt")
val linesLength=rdd.map(line=>line.length())
val sum = linesLength.reduce(_+_)
println("the total characters of the file"+"="+sum)
}
}
结果:
3)通过HDFS创建RDD
代码:
val wordcount = sc.textFile("/library/wordcount/input/licenses").flatMap(_.split(" ")).map(word=>(word,1)).reduceByKey(_+_).filter(pair=>pair._2>20).collect().foreach(println)
由于RDD的不可修改的特性,导致RDD的操作与正常面向对象的操作不同,RDD的操作基本分为3大类:transformation,action,contoller
1.Transformation
Transformation是通过转化针对已有的RDD创建出新的RDD
map(func):对调用map的RDD数据集中的每个element都使用func,然后返回一个新的RDD,这个返回的数据集是分布式的数据集
filter(func): 对调用filter的RDD数据集中的每个元素都使用func,然后返回一个包含使func为true的元素构成的RDD
flatMap(func):和map差不多,但是flatMap生成的是多个结果
mapPartitions(func):和map很像,但是map是每个element,而mapPartitions是每个partition
mapPartitionsWithSplit(func):和mapPartitions很像,但是func作用的是其中一个split上,所以func中应该有index
sample(withReplacement,faction,seed):抽样
union(otherDataset):返回一个新的dataset,包含源dataset和给定dataset的元素的集合
distinct([numTasks]):返回一个新的dataset,这个dataset含有的是源dataset中的distinct的element
groupByKey(numTasks):返回(K,Seq[V]),也就是hadoop中reduce函数接受的key-valuelist
reduceByKey(func,[numTasks]):就是用一个给定的reducefunc再作用在groupByKey产生的(K,Seq[V]),比如求和,求平均数
sortByKey([ascending],[numTasks]):按照key来进行排序,是升序还是降序,ascending是boolean类型
join(otherDataset,[numTasks]):当有两个KV的dataset(K,V)和(K,W),返回的是(K,(V,W))的dataset,numTasks为并发的任务数
cogroup(otherDataset,[numTasks]):当有两个KV的dataset(K,V)和(K,W),返回的是(K,Seq[V],Seq[W])的dataset,numTasks为并发的任务数
Transformation特性:
lazy优化:由于Tranformation的lazy特性,也就是创建不马上运行,对于框架来说,我有足够的时间查看到尽可能多的步骤,看到的步骤越多,优化的空间就越大。最简单的优化方式就是步骤合并,例如本来的做法是a=b*3;b=c*3;c=d*3;d=3,步骤合并后就是a=3*3*3*3。
2.Action
Action操作的目的是得到一个值,或者一个结果
reduce(func):说白了就是聚集,但是传入的函数是两个参数输入返回一个值,这个函数必须是满足交换律和结合律的
collect():一般在filter或者足够小的结果的时候,再用collect封装返回一个数组
count():返回的是dataset中的element的个数
first():返回的是dataset中的第一个元素
take(n):返回前n个elements,这个士driverprogram返回的
takeSample(withReplacement,num,seed):抽样返回一个dataset中的num个元素,随机种子seed
saveAsTextFile(path):把dataset写到一个textfile中,或者hdfs,或者hdfs支持的文件系统中,spark把每条记录都转换为一行记录,然后写到file中
saveAsSequenceFile(path):只能用在key-value对上,然后生成SequenceFile写到本地或者hadoop文件系统
countByKey():返回的是key对应的个数的一个map,作用于一个RDD
foreach(func):对dataset中的每个元素都使用func
3.Contoller
Contoller动作主要为持久化RDD,例如cache(),persist(),checkpoint();
具体内容在后续刊物中会讲解。
本小节通过IDEA具体逐步调试一个WordCount案例,让学员知道各步骤中RDD的具体类型,并为下一节逐步解析做铺垫
(1)使用的wordCount代码如下:
1.object WordCount {
2.def main (args: Array[String]) {
3.val conf = new SparkConf()//create SparkConf
4.conf.setAppName("Wow,My First Spark App")//set app name
5.conf.setMaster("local")//run local
6.val sc =new SparkContext(conf)
7.val lines =sc.textFile("C://Users//feng//IdeaProjects//WordCount//src//SparkText.txt")
8.val words = lines.flatMap{ lines => lines.split(" ") }
9.val pairs =words.map ( word => (word,1) )
10.val reduce = pairs.reduceByKey(_+_)
11.val sort_1 = reduce.map(pair=>(pair._2,pair._1))
12.val sort_2 = sort_1.sortByKey(false)
13.val sort_3=sort_2.map(pair=>(pair._2,pair._1))
14.val filter=sort_3.filter(pair=>pair._2>2)
15.filter.collect.foreach(wordNumberPair => println(wordNumberPair._1+" : "+wordNumberPair._2))
16.sc.stop()
17.}
18.}
(1)程序使用的SparkText.txt文件内容如下
hadoop hadoop hadoop
spark Flink spark
scala scala object
object spark scala
spark spark
Hadoop hadoop
(2)程序WordCount调试结果:
通过IDEA的逐步调试,会在调试窗口显示每一行代码具体操作什么类型的RDD,此RDD通过什么依赖关系依赖于父RDD等重要信息
本小节基于上小节程序的调试结果,逐条查看调试信息内容,并基于信息内容进行讲解,并在讲解中回顾并复习本章所有内容。
(1)line= sc.textFile()
本语句的作用在于从外部数据中读取数据,并生成MapPartitionsRDD。此处需要注意:
如图2-16所示,可以看出次MapPartitionsRDD的deps(dependency,依赖)为HadoopRDD,从这里可以发现其实textFile()过程包含两个步骤,第一步骤将文件内容转化为HadoopRDD(key-value形式,key为行号),第二步骤将HadoopRDD转化为MapPartitionsRDD(value形式,将key-value类型的key删去)
(2)words=line.flatMap()
此命令对于RDD采取transformation(转换)操作,作用在于将MapPartitionsRDD中的每一个记录进行以空格为标记的切分,并把每一个RDD的切分的结果放在一个MapPartitionRDD中
(3)pairs=words.map(word=>(word,1))
此命令对于RDD采取transformation(转换)操作,作用在于将MapPartitionsRDD中的每一个记录(例:spark(value类型))转换为key-value类型(例:
(spark,1)),便于下一步reduceByKey操作
(4)reduce = pairs.reduceByKey(_+_)
此命令对于RDD采取action(动作)操作,作用在于通过shuffle将pairs中所有的记录按照key相同value相加的规则进行处理,并把结果放到一个shuffleRDD中。例((spark,1),(spark,1))变成((spark,2))。
同时需要注意一下两点:首先本步骤实质上分为两个步骤,第一步骤为local级别的reduce,对当前计算机所拥有的数据先进行reduce操作,生成MapPartitionsRDD;第二步骤为shuffle级别的reduce,基于第一步骤的结果,对结果进行shuffle-reduce操作,生成最终的shuffleRDD。其次
Action操作进行时,对此操作之前的所有转换操作进行执行,所以调试过程中会出现此前的除textFile操作的执行时间均非常短,说明RDD转换操作不直接进行运算。
(5)sort_1 = reduce.map(pair=>(pair._2,pair._1))
此命令对于RDD采取transformation(转换)操作,作用在于将shuffleRDD中的每一个记录的key和value互换,生成一个新的MapPartitionsRDD。例:
(spark,2)变为(2,spark)
(6)sort_2 = sort_1.sortByKey(false)
此命令对于RDD采取action(动作)操作,作用在于将MapPartitionsRDD根据key进行排序,并生成shuffleRDD
(7)sort_3=sort_2.map(pair=>(pair._2,pair._1))
此命令对于RDD采取transformation(转换)操作,作用在于将shuffleRDD中的每一个记录的key和value互换,生成一个新的MapPartitionsRDD。例:
(2,spark)变为(spark,2)
(8)filter=sort_3.filter(pair=>pair._2>2)
此命令对于RDD采取transformation(转换)操作,作用在于根据value值筛选MapPartitionsRDD中的数据,输出value大于2的记录
(9)最后通过collect()方法将结果收集后,使用foreach()方法遍历数据并通过println()方法打印出所有数据。
1.创建RDD
代码:
def sparkContext(name:String)=
{
val conf = new SparkConf().setAppName(name).setMaster("local")
val sc = new SparkContext(conf)
sc
}
2.Map
作用:适用于任何集合,且对其作用的集合中的每一个元素循环遍历,并调用其作为参数的函数对每一个遍历的元素进行具体化处理。
代码:
def mapTransformation(sc:SparkContext): Unit ={
val nums = sc.parallelize(1 to 10)//根据集合创建RDD
val mapped = nums.map(item=> 2 * item)
mapped.collect.foreach(println)
}
结果:
3.Filter
作用:遍历集合中的所有元素,将每个元素作为参数放入函数中进行判断,将判断结果为真的元素筛选出来。
代码:
def filterTransformation(sc:SparkContext): Unit ={
val nums = sc.parallelize(1 to 20)//根据集合创建RDD
val filtered = nums.filter(item => item % 2 == 0)
filtered.collect.foreach(println)
}
4.Flatmap
作用:通过传入的作为参数的函数来作用与RDD的每个字符串进行单词切分,然后把切分后的结果合并成一个大的集合
代码:
def flatmapTransformation(sc:SparkContext): Unit ={
val bigData = Array("scala","spark","java Hadoop","java tachyon")
val bigDataString =sc.parallelize(bigData)
val words= bigDataString.flatMap(line=>line.split(" "))
words.collect.foreach(println)
}
结果:
5.groupByKey
作用:将传入的tuple数组生成为RDD,通过groupByKey方法将RDD通过key进行分组汇总,并生成一个新的RDD
代码:
def groupByKeyTransformation(sc:SparkContext): Unit ={
val data = Array(Tuple2(100,"Spark"),Tuple2(100,"Tachyon"),Tuple2(90,"Hadoop"),Tuple2(80,"Kafka"),Tuple2(70,"Scala"))
val dataRDD = sc.parallelize(data)
val group = dataRDD.groupByKey()
group.collect.foreach(pair=>println(pair._1+":"+pair._2))
}
结果:
6.reduceByKey
作用:对key相同的元素进行value值得相加。
代码:
def reduceByKeyTransformation(sc:SparkContext): Unit ={
val lines =sc.textFile("C://Users//feng//IdeaProjects//WordCount//src//SparkText.txt",1)
val reduce= lines.map(line=>(line,1)).reduceByKey(_+_)
reduce.collect.foreach(pair=>println(pair._1+":"+pair._2))
}
7.Join
作用:根据相同key,把不同的RDD合并为一个RDD
代码:
def joinTransformation(sc:SparkContext): Unit ={
//大数据中最重要的算子
val studentNames=Array(
Tuple2(1,"Spark"),
Tuple2(2,"Tachyon"),
Tuple2(3,"Hadoop")
)
val studentScore=Array(
Tuple2(1,100),
Tuple2(2,95),
Tuple2(3,65),
Tuple2(2,95),
Tuple2(3,65)
)
val names = sc.parallelize(studentNames)
val scores = sc.parallelize(studentScore)
val studentNameAndScore=names.join(scores)
studentNameAndScore.collect.foreach(println)
}
结果:
8.cogroup
作用:协同分组,首先将两个RDD的内容进行join,在此基础上,以ID为key的情况下将改ID内容的所有分数聚合到一起。
代码:
def cogroupTransformation(sc:SparkContext): Unit ={
val nameList = Array(
Tuple2(1,"Spark"),
Tuple2(2,"Scala"),
Tuple2(3,"Hadoop")
)
val scoreList = Array(
Tuple2(1,100),
Tuple2(2,90),
Tuple2(3,87),
Tuple2(1,80),
Tuple2(2,90),
Tuple2(2,60)
)
val names = sc.parallelize(nameList)
val scores =sc.parallelize(scoreList)
val nameScores= names.cogroup(scores)
nameScores.collect.foreach(println)
}