前言
Spark的核心抽象是RDD,Spark程序中RDD对象无处不在,因此在基于Spark进行编程开发之前,需要对RDD的特征和基本操作有所了解,以便能顺利进行Spark程序的编程开发。
Spark程序依赖的运行环境
1)已安装好Spark集群环境(单机版或集群版均可,正式环境都是集群版)
2)已安装好Hadoop集群环境以及相关组件(如hive)
RDD创建方式
sc =sparkContext() #sc代表Spark的上下文环境
hc = HiveContext(sc) #hc代表Spark中的Hive对象
1)读取外部数据源创建sc =sparkContext()
(1)sc.textFile(filePath) #filePath为文件路径
(2)hc.sql(sqltext) #hive查询的结果数据,返回类型为dataFrame (dataFrame为RDD的封装类型,在Spark SQL中将进行详细介绍)。
2)自定义数据对象
sc.parallelize(A,size) 变量A为列表或元组类型,size为RDD分区值,
3)RDD之间的相互转换
RDD类型划分
(Spark中“算子”表示RDD的操作)
1)Transformation RDD(转换类RDD)
(1)映射型算子(map)#变换操作,func表示具体的操作过程
例如:map(func),flatmap(func),filter(func)
(2)集合型算子(union)#合并操作
例如:join(RDD),union(RDD)等
(3)聚合型算子(group)#统计分类操作
例如:groupByKey(), reduceByKey(func)
2)Action RDD (执行类RDD)
(1)输出型算子
例如:saveAsTextFile(path)
(2)统计型算子
例如:count()
(3)显示型算子
例如:taken(n),collect(),first(),reduce(func)
RDD算子分类
1)value 类transformation 如:map()
2)key-value 类transformation 如:groupByKey()
3)action算子
因此,在Spark编程过程中需基于实际需求明确每个阶段要选择什么类型的算子,然后结合算子的输入输出数据类型将各种算子组合在一起,最终指定结果的输出方式即可。
编程实例
Spark目前提供了四种编程语言(Scala、Java、Python和R)的API文档,我们可以选择自己擅长的语言进行Spark程序的编程开发(个人看法:Spark虽然提供了Java的API接口,但Java不支持函数式编程范式,使得Java编写的Spark程序代码十分冗长,可读性差)。 本文实例采用Python语言,具体如下所示:
–功能说明:读取外部数据源,进行分类统计汇总操作,并将结果写入到指定路径的文件中
from pyspark import SparkContext,SparkConf
from pyspark.sql import HiveContext
conf = SparkConf()
sc = SparkContext(conf=conf)
hc = HiveContext(sc)
path =”/home/spark/router”
logtext =”select dnum, mac from router where date=’2017-09-28’ ”
sqlRDD = hc.sql(logtext)
mapRDD = sqlRDD.map(lambda x: (x[0],x[1].split(‘:’)[0]))
groupRDD = mapRDD.group ByKey()
–结果输出
groupRDD.saveAsTextFile(path)