转自https://www.jianshu.com/p/a27f5f5f14e5; https://blog.csdn.net/feloxx/article/details/72819964
一、简介 Spark SQL是Spark中处理结构化数据的模块。与基础的Spark RDD API不同,Spark SQL的接口提供了更多关于数据的结构信息和计算任务的运行时信息。在Spark内部,Spark SQL会能够用于做优化的信息比RDD API更多一些。Spark SQL如今有了三种不同的API:SQL语句、DataFrame API和最新的Dataset API。不过真正运行计算的时候,无论你使用哪种API或语言,Spark SQL使用的执行引擎都是同一个。这种底层的统一,使开发者可以在不同的API之间来回切换,你可以选择一种最自然的方式,来表达你的需求。 (本文针对spark1.6版本,示例语言为Scala)
二、概念 1. SQL。 Spark SQL的一种用法是直接执行SQL查询语句,你可使用最基本的SQL语法,也可以选择HiveQL语法。Spark SQL可以从已有的Hive中读取数据。更详细的请参考Hive Tables 这一节。如果用其他编程语言运行SQL,Spark SQL将以DataFrame 返回结果。你还可以通过命令行command-line 或者 JDBC/ODBC 使用Spark SQL。
2. DataFrame。 是一种分布式数据集合,每一条数据都由几个命名字段组成。概念上来说,她和关系型数据库的表 或者 R和Python中的data frame等价,只不过在底层,DataFrame采用了更多优化。DataFrame可以从很多数据源(sources )加载数据并构造得到,如:结构化数据文件,Hive中的表,外部数据库,或者已有的RDD。 DataFrame API支持Scala , Java , Python , and R 。
3. Datasets。 是Spark-1.6新增的一种API,目前还是实验性的。Dataset想要把RDD的优势(强类型,可以使用lambda表达式函数)和Spark SQL的优化执行引擎的优势结合到一起。Dataset可以由JVM对象构建(constructed )得到,而后Dataset上可以使用各种transformation算子(map,flatMap,filter 等)。 Dataset API 对Scala 和Java 的支持接口是一致的,但目前还不支持Python,不过Python自身就有语言动态特性优势(例如,你可以使用字段名来访问数据,row.columnName)。对Python的完整支持在未来的版本会增加进来。
三、创建并操作DataFrame Spark应用可以用SparkContext创建DataFrame,所需的数据来源可以是已有的RDD(existing RDD ),或者Hive表,或者其他数据源(data sources .)以下是一个从JSON文件创建并操作DataFrame的小例子:
val sc: SparkContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.json("examples/src/main/resources/people.json" )
df.show()
df.printSchema()
df.select("name" ).show()
df.select(df("name" ), df("age" ) + 1 ).show()
df.filter(df("age" ) > 21 ).show()
df.groupBy("age" ).count().show()
SQLContext.sql可以执行一个SQL查询,并返回DataFrame结果。
val sqlContext = ...
val df = sqlContext.sql("SELECT * FROM table" )
三、spark SQL与RDD互操作 Spark SQL有两种方法将RDD转为DataFrame。分别为反射机制 和编程方式 。
1. 利用反射推导schema。#### Spark SQL的Scala接口支持自动将包含case class对象的RDD转为DataFrame。对应的case class定义了表的schema。case class的参数名通过反射,映射为表的字段名。case class还可以嵌套一些复杂类型,如Seq和Array。RDD隐式转换成DataFrame后,可以进一步注册成表。随后,你就可以对表中数据使用 SQL语句查询了。
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
case class Person(name: String, age: Int)
val people = sc.textFile("examples/src/main/resources/people.txt" ).map (_.split("," )).map (p => Person(p(0 ), p(1 ).trim.toInt)).toDF()
people.registerTempTable("people" )
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19" )
teenagers.map (t => "Name: " + t(0 )).collect().foreach(println)
teenagers.map (t => "Name: " + t.getAs[String]("name" )).collect().foreach(println)
teenagers.map (_.getValuesMap[Any](List("name" , "age" ))).collect().foreach(println)
2. 编程方式定义Schema。#### 如果不能事先通过case class定义schema(例如,记录的字段结构是保存在一个字符串,或者其他文本数据集中,需要先解析,又或者字段对不同用户有所不同),那么你可能需要按以下三个步骤,以编程方式的创建一个DataFrame:
从已有的RDD创建一个包含Row对象的RDD,用StructType创建一个schema,和步骤1中创建的RDD的结构相匹配,把得到的schema应用于包含Row对象的RDD,调用这个方法来实现这一步:SQLContext.createDataFrame 例如:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val people = sc.textFile("examples/src/main/resources/people.txt" )
val schemaString = "name age"
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.{StructType,StructField,StringType};
val schema =
StructType(
schemaString.split(" " ).map(fieldName => StructField(fieldName, StringType, true )))
val rowRDD = people.map(_.split("," )).map(p => Row(p(0 ), p(1 ).trim))
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
peopleDataFrame.registerTempTable("people" )
val results = sqlContext.sql("SELECT name FROM people" )
results.map(t => "Name: " + t(0 )).collect().foreach(println)
四、spark SQL与其它数据源的连接与操作 Spark SQL支持基于DataFrame操作一系列不同的数据源。DataFrame既可以当成一个普通RDD来操作,也可以将其注册成一个临时表来查询。把 DataFrame注册为table之后,你就可以基于这个table执行SQL语句了。本节将描述加载和保存数据的一些通用方法,包含了不同的 Spark数据源,然后深入介绍一下内建数据源可用选项。 在最简单的情况下,所有操作都会以默认类型 数据源来加载数据(默认是Parquet,除非修改了spark.sql.sources.default 配置)。
val df = sqlContext.read.load("examples/src/main/resources/users.parquet" )
df.select("name" , "favorite_color" ).write.save("namesAndFavColors.parquet" )
你也可以手动指定数据源 ,并设置一些额外的选项参数。数据源可由其全名指定(如,org.apache.spark.sql.parquet),而 对于内建支持的数据源,可以使用简写名(json, parquet, jdbc)。任意类型数据源创建的DataFrame都可以用下面这种语法转成其他类型数据格式。
val df = sqlContext.read.format("json" ).load("examples/src/main/resources/people.json" )
df.select("name" , "age" ).write.format("parquet" ).save("namesAndAges.parquet" )
Spark SQL还支持直接对文件使用SQL查询,不需要用read方法把文件加载进来。
val df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`" )
1. 连接JSON数据集#### Spark SQL在加载JSON数据的时候,可以自动推导其schema并返回DataFrame。用SQLContext.read.json读取一个包含String的RDD或者JSON文件,即可实现这一转换。
注意,通常所说的json文件只是包含一些json数据的文件,而不是我们所需要的JSON格式文件。JSON格式文件必须每一行是一个独立、完整的的JSON对象。因此,一个常规的多行json文件经常会加载失败。
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val path = "examples/src/main/resources/people.json"
val people = sqlContext.read.json(path)
people.printSchema()
people.registerTempTable("people" )
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19" )
val anotherPeopleRDD = sc.parallelize(
"" "{" name":" Yin"," address":{" city":" Columbus"," state":" Ohio"}}" "" :: Nil)
val anotherPeople = sqlContext.read.json(anotherPeopleRDD)
2. 连接Hive表 Spark SQL支持从Apache Hive 读 写数据。然而,Hive依赖项太多,所以没有把Hive包含在默认的Spark发布包里。要支持Hive,需要在编译spark的时候增加-Phive和 -Phive-thriftserver标志。这样编译打包的时候将会把Hive也包含进来。注意,hive的jar包也必须出现在所有的worker节 点上,访问Hive数据时候会用到(如:使用hive的序列化和反序列化SerDes时)。 Hive配置在conf/目录下hive-site.xml,core-site.xml(安全配置),hdfs-site.xml(HDFS配 置)文件中。请注意,如果在YARN cluster(yarn-cluster mode)模式下执行一个查询的话,lib_mananged/jar/下面的datanucleus 的jar包,和conf/下的hive-site.xml必须在驱动器(driver)和所有执行器(executor)都可用。一种简便的方法是,通过 spark-submit命令的–jars和–file选项来提交这些文件。 如果使用Hive,则必须构建一个HiveContext,HiveContext是派生于SQLContext的,添加了在Hive Metastore里查询表的支持,以及对HiveQL的支持。用户没有现有的Hive部署,也可以创建一个HiveContext。如果没有在 hive-site.xml里配置,那么HiveContext将会自动在当前目录下创建一个metastore_db目录,再根据HiveConf设置 创建一个warehouse目录(默认/user/hive/warehourse)。所以请注意,你必须把/user/hive/warehouse的 写权限赋予启动spark应用程序的用户。
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)" )
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src" )
sqlContext.sql("FROM src SELECT key, value" ).collect().foreach (println)
3. 用JDBC连接其他数据库#### Spark SQL也可以用JDBC访问其他数据库。这一功能应该优先于使用JdbcRDD。因为它返回一个DataFrame,而DataFrame在Spark SQL中操作更简单,且更容易和来自其他数据源的数据进行交互关联。JDBC数据源在java 和python 中用起来也很简单,不需要用户提供额外的 ClassTag。(注意,这与Spark SQL JDBC server不同,Spark SQL JDBC server允许其他应用执行Spark SQL查询) 首先,你需要在spark classpath中包含对应数据库的JDBC driver,下面这行包括了用于访问postgres的数据库driver SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/spark-shell
val jdbcDF = sqlContext.read.format("jdbc" ).options(
Map ("url" -> "jdbc:postgresql:dbserver" ,
"dbtable" -> "schema.tablename" )).load()
注意:
JDBC driver class必须在所有client session或者executor上,对java 的原生classloader可见。这是因为Java的DriverManager在打开一个连接之 前,会做安全检查,并忽略所有对原声classloader不可见的driver。最简单的一种方法,就是在所有worker节点上修改 compute_classpath.sh,并包含你所需的driver jar包。 一些数据库,如H2,会把所有的名字转大写。对于这些数据库,在Spark SQL中必须也使用大写。 五、 spark SQL示例 1、需要准备好四张表,既四个文本文件逗号分隔
2、为这四张表创建好schema,并注册成表
3、时间处理有小部分改动
准备的四张表
表(一)Student (学生表)
字段名
数据类型
可否为空
含 义
Sno
Varchar2(3)
否
学号(主键)
Sname
Varchar2(8)
否
学生姓名
Ssex
Varchar2(2)
否
学生性别
Sbirthday
Date
可
学生出生年月
SClass
Varchar2(5)
可
学生所在班级
表(二)Course(课程表)
属性名
数据类型
可否为空
含 义
Cno
Varchar2(5)
否
课程号(主键)
Cname
Varchar(10)
否
课程名称
Tno
Varchar2(3)
否
教工编号(外键)
表(三)Score(成绩表)
属性名
数据类型
可否为空
含 义
Sno
Varchar2(3)
否
学号(外键)
Cno
Varchar2(5)
否
课程号(外键)
Degree
Number(4,1)
可
成绩
表(四)Teacher(教师表)
属性名
数据类型
可否为空
含 义
Tno
Varchar2(3)
否
教工编号(主键)
Tname
Varchar2(4)
否
教工姓名
Tsex
Varchar2(2)
否
教工性别
Tbirthday
Date
可
教工出生年月
Prof
Varchar2(6)
可
职称
Depart
Varchar(10)
否
教工所在部门
四张表中的数据
例子代码,粘贴可用,注意注释掉不需要的地方即可
package com.cdpsql1 import org.apache.spark.sql.{Row,SparkSession} import org.apache.spark.sql.types._ import scala.collection.mutable import java.text.SimpleDateFormat objectSparkSqlExample1{ defmain(args:Array[String]):Unit={ valspark=SparkSession .builder() .master("local" ) .appName("test" ) .config("spark.sql.shuffle.partitions" , "5" ) .getOrCreate() valStudentSchema:StructType=StructType(mutable.ArraySeq( StructField("Sno" ,StringType,nullable= false ), StructField("Sname" ,StringType,nullable= false ), StructField("Ssex" ,StringType,nullable= false ), StructField("Sbirthday" ,StringType,nullable= true ), StructField("SClass" ,StringType,nullable= true ) )) valCourseSchema:StructType=StructType(mutable.ArraySeq( StructField("Cno" ,StringType,nullable= false ), StructField("Cname" ,StringType,nullable= false ), StructField("Tno" ,StringType,nullable= false ) )) valScoreSchema:StructType=StructType(mutable.ArraySeq( StructField("Sno" ,StringType,nullable= false ), StructField("Cno" ,StringType,nullable= false ), StructField("Degree" ,IntegerType,nullable= true ) )) valTeacherSchema:StructType=StructType(mutable.ArraySeq( StructField("Tno" ,StringType,nullable= false ), StructField("Tname" ,StringType,nullable= false ), StructField("Tsex" ,StringType,nullable= false ), StructField("Tbirthday" ,StringType,nullable= true ), StructField("Prof" ,StringType,nullable= true ), StructField("Depart" ,StringType,nullable= false ) )) defgetDate(time:String)={ valnow:Long=System.currentTimeMillis() vardf:SimpleDateFormat=new SimpleDateFormat(time) df.format(now) } valStudentData=spark.sparkContext.textFile("input/sqltable/Student" ).map{ lines=> valline=lines.split("," ) Row(line(0 ),line( 1 ),line( 2 ),line( 3 ),line( 4 )) } valCourseData=spark.sparkContext.textFile("input/sqltable/Course" ).map{ lines=> valline=lines.split("," ) Row(line(0 ),line( 1 ),line( 2 )) } valScoreData=spark.sparkContext.textFile("input/sqltable/Score" ).map{ lines=> valline=lines.split("," ) Row(line(0 ),line( 1 ),line( 2 ).toInt) } valTeacherData=spark.sparkContext.textFile("input/sqltable/Teacher" ).map{ lines=> valline=lines.split("," ) Row(line(0 ),line( 1 ),line( 2 ),line( 3 ),line( 4 ),line( 5 )) } valStudentTable=spark.createDataFrame(StudentData,StudentSchema) StudentTable.createOrReplaceTempView("Student" ) valCourseTable=spark.createDataFrame(CourseData,CourseSchema) CourseTable.createOrReplaceTempView("Course" ) valScoreTable=spark.createDataFrame(ScoreData,ScoreSchema) ScoreTable.createOrReplaceTempView("Score" ) valTeacherTable=spark.createDataFrame(TeacherData,TeacherSchema) TeacherTable.createOrReplaceTempView("Teacher" ) spark.sql("SELECTsname,ssex,sclassFROMStudent" ).show() spark.sql("SELECTDISTINCTdepartFROMTeacher" ).show() spark.sql("SELECT*FROMStudent" ).show() spark.sql("SELECT*FROMScoreWHEREdegree>=60anddegree<=80" ).show() spark.sql("SELECT*FROMScoreWHEREdegree='85'ORdegree='86'ORdegree='88'" ).show() spark.sql("SELECT*FROMStudentWHEREsclass='95031'ORssex='female'" ).show() spark.sql("SELECT*FROMStudentORDERBYsclassDESC" ).show() spark.sql("SELECT*FROMStudentORDERBYsclass" ).show() spark.sql("SELECT*FROMScoretORDERBYt.snoASC,t.degreeDESC" ).show() spark.sql("SELECTt.sclasstotalnumFROMStudenttWHEREsclass='95031'" ).show() spark.sql("SELECTt.sclassAStotalnumFROMStudenttWHEREsclass='95031'" ).show() spark.sql("SELECT*FROM(SELECT*FROMScoreORDERBYdegreeDESCLIMIT1)" ).show() spark.sql("SELECTt.sno,t.cnoFROMScoretORDERBYdegreeDESC" ).show() spark.sql("SELECT*FROMScoreWHEREdegreeIN(SELECTMAX(degree)FROMScoret)" ).show() spark.sql("SELECTAVG(degree)averageFROMScoretWHEREcno='3-245'" ).show() spark.sql("SELECTAVG(degree)averageFROMScoreWHEREcno='3-105'" ).show() spark.sql("SELECTAVG(degree)averageFROMScoreWHEREcno='6-166'" ).show() spark.sql("SELECTcno,AVG(degree)FROMScoretGROUPBYcno" ).show() spark.sql("SELECTcno,AVG(degree)FROMScoreWHEREcnoLIKE'3%'GROUPBYcnoHAVINGCOUNT(1)>=5" ).show() spark.sql("SELECTsnoFROMScoreWHEREdegreeBETWEEN70AND90" ).show() spark.sql("SELECTs.sname,t.cno,t.degreeFROMScoret,StudentsWHEREt.sno=s.sno" ).show() spark.sql("SELECTs.sname,t.cno,t.degreeFROMScoretJOINStudentsONt.sno=s.sno" ).show() spark.sql("SELECTs.sname,t.cno,t.degreeFROMScoretJOINStudentsONt.sno=s.sno" ).show() spark.sql("SELECTs.sname,t.degree,c.cnameFROMScoret,Students,CoursecWHEREt.sno=s.snoANDt.cno=c.cno" ).show() spark.sql("SELECTs.sname,t.degree,c.cnameFROMScoret" + "JOINStudentsont.sno=s.sno" + "JOINCourseconc.cno=t.cno" ).show() spark.sql("SELECTAVG(degree)averageFROMScoreWHEREsnoIN(SELECTsnoFROMStudentWHEREsclass='95033')" ).show() spark.sql("SELECT*FROMScoreWHEREcno='3-105'ANDdegree>(SELECTdegreeFROMscoreWHEREsno='109'ANDcno='3-105')" ).show() spark.sql("SELECT*FROMScoreWHEREsnoIN" + "(SELECTsnoFROMScoretGROUPBYt.snoHAVINGCOUNT(1)>1)ANDdegree!=(SELECTMAX(degree)FROMScore)" ).show() spark.sql("SELECT*FROMScoreWHEREdegree!=(SELECTMAX(degree)FROMScore)" ).show() spark.sql("SELECT*FROMScoretWHEREt.degree>(SELECTdegreeFROMScoreWHEREsno='109'ANDcno='3-105')" ).show() spark.sql("SELECTsno,sname,sbirthday" + "FROMStudent" + "WHEREsubstring(sbirthday,0,4)=(" + "SELECTsubstring(t.sbirthday,0,4)" + "FROMStudentt" + "WHEREsno='108')" ).show() spark.sql("SELECTt.tno,c.cno,c.cname,s.degreeFROMTeachert" + "JOINCoursecONt.tno=c.tno" + "JOINScoresONc.cno=s.cnoWHEREt.tname='Zhangxu'" ).show() spark.sql("SELECTtnameFROMTeachere" + "JOINCoursecONe.tno=c.tno" + "JOIN(SELECTcnoFROMScoreGROUPBYcnoHAVINGCOUNT(cno)>5)tONc.cno=t.cno" ).show() spark.sql("SELECT*FROMStudentWHEREsclassIN('95031','95033')" ).show() spark.sql("SELECT*FROMStudentWHEREsclassLIKE'9503%'" ).show() spark.sql("SELECTcnoFROMScoreWHEREdegree>85GROUPBYcno" ).show() spark.sql("SELECTt.sno,t.cno,t.degreeFROMScoret" + "JOINCoursecONt.cno=c.cno" + "JOINTeachereONc.tno=e.tnoWHEREe.depart='departmentofcomputer'" ).show() spark.sql("SELECTtname,prof" + "FROMTeacher" + "WHEREprofNOTIN(SELECTa.prof" + "FROM(SELECTprof" + "FROMTeacher" + "WHEREdepart='departmentofcomputer'" + ")a" + "JOIN(SELECTprof" + "FROMTeacher" + "WHEREdepart='departmentofelectronicengineering'" + ")bONa.prof=b.prof)" ).show() spark.sql("SELECTtname,prof" + "FROMTeacher" + "WHEREdepart='departmentofelectronicengineering'" + "ANDprofNOTIN(SELECTprof" + "FROMTeacher" + "WHEREdepart='departmentofcomputer')" + "ORdepart='departmentofcomputer'" + "ANDprofNOTIN(SELECTprof" + "FROMTeacher" + "WHEREdepart='departmentofelectronicengineering')" ).show() spark.sql("SELECTt.sno,t.cno,degree" + "FROMSCOREt" + "WHEREdegree>(" + "SELECTMIN(degree)" + "FROMscore" + "WHEREcno='3-245'" + ")" + "ANDt.cno='3-105'" + "ORDERBYdegreeDESC" ).show() spark.sql("SELECTt.sno,t.cno,t.degreeFROMScoretWHEREt.degree>(SELECTMAX(degree)FROMScoreWHEREcno='3-245')ANDt.cno='3-105'" ).show() spark.sql("SELECTsname,ssex,sbirthdayFROMStudent" + "UNIONSELECTtname,tsex,tbirthdayFROMTeacher" ).show() spark.sql("SELECTsname,ssex,sbirthday" + "FROMStudent" + "WHEREssex='female'" + "UNION" + "SELECTtname,tsex,tbirthday" + "FROMTeacher" + "WHEREtsex='female'" ).show() spark.sql("SELECTs.*" + "FROMscores" + "WHEREs.degree<(" + "SELECTAVG(degree)" + "FROMscorec" + "WHEREs.cno=c.cno)" ).show() spark.sql("SELECTtname,depart" + "FROMteachert" + "WHEREt.tnoIN(" + "SELECTtno" + "FROMcoursec" + "WHEREc.cnoIN(" + "SELECTcno" + "FROMscore))" ).show() spark.sql("SELECTtname,depart" + "FROMteachert" + "WHEREt.tnoNOTIN(" + "SELECTtno" + "FROMcoursec" + "WHEREc.cnoIN(" + "SELECTcno" + "FROMscore))" ).show() spark.sql("SELECTSClass" + "FROMStudentt" + "WHERESsex='male'" + "GROUPBYSClass" + "HAVINGCOUNT(Ssex)>=2" ).show() spark.sql("SELECT*FROMStudenttWHERESnameNOTLIKE('Wang%')" ).show() spark.sql("SELECTSname,(" +getDate( "yyyy" )+ "-substring(sbirthday,0,4))ASageFROMSTUDENTt" ).show() spark.sql("SELECTSname,(CAST(" +getDate( "yyyy" )+ "ASINT)-CAST(substring(sbirthday,0,4)ASINT))ASage" + "FROMStudentt" ).show() spark.sql("SELECTMAX(t.sbirthday)ASmaximumFROMStudentt" ).show() spark.sql("SELECTMIN(t.sbirthday)ASminimumFROMStudentt" ).show() spark.sql("SELECT*" + "FROMStudent" + "ORDERBYSClassDESC,CAST(" +getDate( "yyyy" )+ "ASINT)-CAST(substring(Sbirthday,0,4)ASINT)DESC" ).show() spark.sql("SELECTTSex,CName" + "FROMTeachert" + "JOINcoursecONt.tno=c.tno" + "WHERETSex='male'" ).show() spark.sql("SELECT*" + "FROMScore" + "WHEREdegree=(" + "SELECTMAX(degree)" + "FROMSCOREt)" ).show() spark.sql("SELECTsname" + "FROMSTUDENTt" + "WHEREssexIN(" + "SELECTssex" + "FROMstudent" + "WHEREsname='LiuJun')" ).show() spark.sql("SELECTsname" + "FROMStudentt" + "WHEREssexIN(" + "SELECTssex" + "FROMstudent" + "WHEREsname='LiuJun')" + "ANDsclassIN(SELECTsclass" + "FROMstudent" + "WHEREsname='LiuJun')" ).show() spark.sql("SELECTt.sno,t.cno,t.degree" + "FROMScoret" + "JOINCoursecONt.cno=c.cno" + "JOINStudentsONs.sno=t.sno" + "WHEREs.SSex='male'" + "ANDc.CName='Introductiontocomputer'" ).show() } }