设为首页 加入收藏

TOP

spark sql基础与示例
2019-03-19 13:08:53 】 浏览:45
Tags:spark sql 基础 示例

转自https://www.jianshu.com/p/a27f5f5f14e5; https://blog.csdn.net/feloxx/article/details/72819964

一、简介

Spark SQL是Spark中处理结构化数据的模块。与基础的Spark RDD API不同,Spark SQL的接口提供了更多关于数据的结构信息和计算任务的运行时信息。在Spark内部,Spark SQL会能够用于做优化的信息比RDD API更多一些。Spark SQL如今有了三种不同的API:SQL语句、DataFrame API和最新的Dataset API。不过真正运行计算的时候,无论你使用哪种API或语言,Spark SQL使用的执行引擎都是同一个。这种底层的统一,使开发者可以在不同的API之间来回切换,你可以选择一种最自然的方式,来表达你的需求。
(本文针对spark1.6版本,示例语言为Scala)

二、概念

1. SQL。Spark SQL的一种用法是直接执行SQL查询语句,你可使用最基本的SQL语法,也可以选择HiveQL语法。Spark SQL可以从已有的Hive中读取数据。更详细的请参考Hive Tables 这一节。如果用其他编程语言运行SQL,Spark SQL将以DataFrame返回结果。你还可以通过命令行command-line 或者 JDBC/ODBC 使用Spark SQL。

2. DataFrame。是一种分布式数据集合,每一条数据都由几个命名字段组成。概念上来说,她和关系型数据库的表 或者 R和Python中的data frame等价,只不过在底层,DataFrame采用了更多优化。DataFrame可以从很多数据源(sources)加载数据并构造得到,如:结构化数据文件,Hive中的表,外部数据库,或者已有的RDD。
DataFrame API支持Scala, Java, Python, and R

3. Datasets。是Spark-1.6新增的一种API,目前还是实验性的。Dataset想要把RDD的优势(强类型,可以使用lambda表达式函数)和Spark SQL的优化执行引擎的优势结合到一起。Dataset可以由JVM对象构建(constructed)得到,而后Dataset上可以使用各种transformation算子(map,flatMap,filter 等)。
Dataset API 对ScalaJava的支持接口是一致的,但目前还不支持Python,不过Python自身就有语言动态特性优势(例如,你可以使用字段名来访问数据,row.columnName)。对Python的完整支持在未来的版本会增加进来。







三、创建并操作DataFrame

Spark应用可以用SparkContext创建DataFrame,所需的数据来源可以是已有的RDD(existing RDD
),或者Hive表,或者其他数据源(data sources.)以下是一个从JSON文件创建并操作DataFrame的小例子:

val sc: SparkContext // 已有的 SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val df = sqlContext.read.json("examples/src/main/resources/people.json")

// 将DataFrame内容打印到stdout
df.show()
// age  name
// null Michael
// 30   Andy
// 19   Justin

// 打印数据树形结构
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// select "name" 字段
df.select("name").show()
// name
// Michael
// Andy
// Justin

// 展示所有人,但所有人的 age 都加1
df.select(df("name"), df("age") + 1).show()
// name    (age + 1)
// Michael null
// Andy    31
// Justin  20

// 筛选出年龄大于21的人
df.filter(df("age") > 21).show()
// age name
// 30  Andy

// 计算各个年龄的人数
df.groupBy("age").count().show()
// age  count
// null 1
// 19   1
// 30   1

SQLContext.sql可以执行一个SQL查询,并返回DataFrame结果。

val sqlContext = ... // 已有一个 SQLContext 对象
val df = sqlContext.sql("SELECT * FROM table")

三、spark SQL与RDD互操作

Spark SQL有两种方法将RDD转为DataFrame。分别为反射机制编程方式

1. 利用反射推导schema。####

Spark SQL的Scala接口支持自动将包含case class对象的RDD转为DataFrame。对应的case class定义了表的schema。case class的参数名通过反射,映射为表的字段名。case class还可以嵌套一些复杂类型,如Seq和Array。RDD隐式转换成DataFrame后,可以进一步注册成表。随后,你就可以对表中数据使用 SQL语句查询了。

// sc 是已有的 SparkContext 对象
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// 为了支持RDD到DataFrame的隐式转换
import sqlContext.implicits._

// 定义一个case class.
// 注意:Scala 2.10的case class最多支持22个字段,要绕过这一限制,
// 你可以使用自定义class,并实现Product接口。当然,你也可以改用编程方式定义schema
case class Person(name: String, age: Int)

// 创建一个包含Person对象的RDD,并将其注册成table
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")

// sqlContext.sql方法可以直接执行SQL语句
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

// SQL查询的返回结果是一个DataFrame,且能够支持所有常见的RDD算子
// 查询结果中每行的字段可以按字段索引访问:
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

// 或者按字段名访问:
teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)

// row.getValuesMap[T] 会一次性返回多列,并以Map[String, T]为返回结果类型
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
// 返回结果: Map("name" -> "Justin", "age" -> 19)

2. 编程方式定义Schema。####

如果不能事先通过case class定义schema(例如,记录的字段结构是保存在一个字符串,或者其他文本数据集中,需要先解析,又或者字段对不同用户有所不同),那么你可能需要按以下三个步骤,以编程方式的创建一个DataFrame:

从已有的RDD创建一个包含Row对象的RDD,用StructType创建一个schema,和步骤1中创建的RDD的结构相匹配,把得到的schema应用于包含Row对象的RDD,调用这个方法来实现这一步:SQLContext.createDataFrame
例如:

// sc 是已有的SparkContext对象
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// 创建一个RDD
val people = sc.textFile("examples/src/main/resources/people.txt")

// 数据的schema被编码与一个字符串中
val schemaString = "name age"

// Import Row.
import org.apache.spark.sql.Row;

// Import Spark SQL 各个数据类型
import org.apache.spark.sql.types.{StructType,StructField,StringType};

// 基于前面的字符串生成schema
val schema =
  StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// 将RDD[people]的各个记录转换为Rows,即:得到一个包含Row对象的RDD
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

// 将schema应用到包含Row对象的RDD上,得到一个DataFrame
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

// 将DataFrame注册为table
peopleDataFrame.registerTempTable("people")

// 执行SQL语句
val results = sqlContext.sql("SELECT name FROM people")

// SQL查询的结果是DataFrame,且能够支持所有常见的RDD算子
// 并且其字段可以以索引访问,也可以用字段名访问
results.map(t => "Name: " + t(0)).collect().foreach(println)

四、spark SQL与其它数据源的连接与操作

Spark SQL支持基于DataFrame操作一系列不同的数据源。DataFrame既可以当成一个普通RDD来操作,也可以将其注册成一个临时表来查询。把 DataFrame注册为table之后,你就可以基于这个table执行SQL语句了。本节将描述加载和保存数据的一些通用方法,包含了不同的 Spark数据源,然后深入介绍一下内建数据源可用选项。
  在最简单的情况下,所有操作都会以默认类型数据源来加载数据(默认是Parquet,除非修改了spark.sql.sources.default 配置)。

val df = sqlContext.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

你也可以手动指定数据源,并设置一些额外的选项参数。数据源可由其全名指定(如,org.apache.spark.sql.parquet),而 对于内建支持的数据源,可以使用简写名(json, parquet, jdbc)。任意类型数据源创建的DataFrame都可以用下面这种语法转成其他类型数据格式。

val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json")
df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

Spark SQL还支持直接对文件使用SQL查询,不需要用read方法把文件加载进来。

val df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

1. 连接JSON数据集####

Spark SQL在加载JSON数据的时候,可以自动推导其schema并返回DataFrame。用SQLContext.read.json读取一个包含String的RDD或者JSON文件,即可实现这一转换。

注意,通常所说的json文件只是包含一些json数据的文件,而不是我们所需要的JSON格式文件。JSON格式文件必须每一行是一个独立、完整的的JSON对象。因此,一个常规的多行json文件经常会加载失败。

// sc是已有的SparkContext对象
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// 数据集是由路径指定的
// 路径既可以是单个文件,也可以还是存储文本文件的目录
val path = "examples/src/main/resources/people.json"
val people = sqlContext.read.json(path)

// 推导出来的schema,可由printSchema打印出来
people.printSchema()
// root
//  |-- age: integer (nullable = true)
//  |-- name: string (nullable = true)

// 将DataFrame注册为table
people.registerTempTable("people")

// 跑SQL语句吧!
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// 另一种方法是,用一个包含JSON字符串的RDD来创建DataFrame
val anotherPeopleRDD = sc.parallelize(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.read.json(anotherPeopleRDD)
2. 连接Hive表

Spark SQL支持从Apache Hive读 写数据。然而,Hive依赖项太多,所以没有把Hive包含在默认的Spark发布包里。要支持Hive,需要在编译spark的时候增加-Phive和 -Phive-thriftserver标志。这样编译打包的时候将会把Hive也包含进来。注意,hive的jar包也必须出现在所有的worker节 点上,访问Hive数据时候会用到(如:使用hive的序列化和反序列化SerDes时)。
  Hive配置在conf/目录下hive-site.xml,core-site.xml(安全配置),hdfs-site.xml(HDFS配 置)文件中。请注意,如果在YARN cluster(yarn-cluster mode)模式下执行一个查询的话,lib_mananged/jar/下面的datanucleus 的jar包,和conf/下的hive-site.xml必须在驱动器(driver)和所有执行器(executor)都可用。一种简便的方法是,通过 spark-submit命令的–jars和–file选项来提交这些文件。
  如果使用Hive,则必须构建一个HiveContext,HiveContext是派生于SQLContext的,添加了在Hive Metastore里查询表的支持,以及对HiveQL的支持。用户没有现有的Hive部署,也可以创建一个HiveContext。如果没有在 hive-site.xml里配置,那么HiveContext将会自动在当前目录下创建一个metastore_db目录,再根据HiveConf设置 创建一个warehouse目录(默认/user/hive/warehourse)。所以请注意,你必须把/user/hive/warehouse的 写权限赋予启动spark应用程序的用户。

// sc是一个已有的SparkContext对象
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// 这里用的是HiveQL
sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)

3. 用JDBC连接其他数据库####

Spark SQL也可以用JDBC访问其他数据库。这一功能应该优先于使用JdbcRDD。因为它返回一个DataFrame,而DataFrame在Spark SQL中操作更简单,且更容易和来自其他数据源的数据进行交互关联。JDBC数据源在javapython中用起来也很简单,不需要用户提供额外的 ClassTag。(注意,这与Spark SQL JDBC server不同,Spark SQL JDBC server允许其他应用执行Spark SQL查询)
  首先,你需要在spark classpath中包含对应数据库的JDBC driver,下面这行包括了用于访问postgres的数据库driver
SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/spark-shell

val jdbcDF = sqlContext.read.format("jdbc").options(
  Map("url" -> "jdbc:postgresql:dbserver",
  "dbtable" -> "schema.tablename")).load()

注意:

  • JDBC driver class必须在所有client session或者executor上,对java的原生classloader可见。这是因为Java的DriverManager在打开一个连接之 前,会做安全检查,并忽略所有对原声classloader不可见的driver。最简单的一种方法,就是在所有worker节点上修改 compute_classpath.sh,并包含你所需的driver jar包。
  • 一些数据库,如H2,会把所有的名字转大写。对于这些数据库,在Spark SQL中必须也使用大写。


五、spark SQL示例

1、需要准备好四张表,既四个文本文件逗号分隔

2、为这四张表创建好schema,并注册成表

3、时间处理有小部分改动


准备的四张表

表(一)Student (学生表)

字段名

数据类型

可否为空

含 义

Sno

Varchar2(3)

学号(主键)

Sname

Varchar2(8)

学生姓名

Ssex

Varchar2(2)

学生性别

Sbirthday

Date

学生出生年月

SClass

Varchar2(5)

学生所在班级

表(二)Course(课程表)

属性名

数据类型

可否为空

含 义

Cno

Varchar2(5)

课程号(主键)

Cname

Varchar(10)

课程名称

Tno

Varchar2(3)

教工编号(外键)

表(三)Score(成绩表)

属性名

数据类型

可否为空

含 义

Sno

Varchar2(3)

学号(外键)

Cno

Varchar2(5)

课程号(外键)

Degree

Number(4,1)

成绩

表(四)Teacher(教师表)

属性名

数据类型

可否为空

含 义

Tno

Varchar2(3)

教工编号(主键)

Tname

Varchar2(4)

教工姓名

Tsex

Varchar2(2)

教工性别

Tbirthday

Date

教工出生年月

Prof

Varchar2(6)

职称

Depart

Varchar(10)

教工所在部门


四张表中的数据


例子代码,粘贴可用,注意注释掉不需要的地方即可


  1. packagecom.cdpsql1
  2. importorg.apache.spark.sql.{Row,SparkSession}
  3. importorg.apache.spark.sql.types._
  4. importscala.collection.mutable
  5. importjava.text.SimpleDateFormat
  6. /**
  7. *2017/05/26
  8. *CDP
  9. */
  10. objectSparkSqlExample1{
  11. defmain(args:Array[String]):Unit={
  12. /*****************************************************************************************************************
  13. *sparksession
  14. */
  15. valspark=SparkSession
  16. .builder()
  17. .master("local")
  18. .appName("test")
  19. .config("spark.sql.shuffle.partitions","5")
  20. .getOrCreate()
  21. /*****************************************************************************************************************
  22. *表结构
  23. */
  24. valStudentSchema:StructType=StructType(mutable.ArraySeq(//学生表
  25. StructField("Sno",StringType,nullable=false),//学号
  26. StructField("Sname",StringType,nullable=false),//学生姓名
  27. StructField("Ssex",StringType,nullable=false),//学生性别
  28. StructField("Sbirthday",StringType,nullable=true),//学生出生年月
  29. StructField("SClass",StringType,nullable=true)//学生所在班级
  30. ))
  31. valCourseSchema:StructType=StructType(mutable.ArraySeq(//课程表
  32. StructField("Cno",StringType,nullable=false),//课程号
  33. StructField("Cname",StringType,nullable=false),//课程名称
  34. StructField("Tno",StringType,nullable=false)//教工编号
  35. ))
  36. valScoreSchema:StructType=StructType(mutable.ArraySeq(//成绩表
  37. StructField("Sno",StringType,nullable=false),//学号(外键)
  38. StructField("Cno",StringType,nullable=false),//课程号(外键)
  39. StructField("Degree",IntegerType,nullable=true)//成绩
  40. ))
  41. valTeacherSchema:StructType=StructType(mutable.ArraySeq(//教师表
  42. StructField("Tno",StringType,nullable=false),//教工编号(主键)
  43. StructField("Tname",StringType,nullable=false),//教工姓名
  44. StructField("Tsex",StringType,nullable=false),//教工性别
  45. StructField("Tbirthday",StringType,nullable=true),//教工出生年月
  46. StructField("Prof",StringType,nullable=true),//职称
  47. StructField("Depart",StringType,nullable=false)//教工所在部门
  48. ))
  49. /*****************************************************************************************************************
  50. *获取当前时间函数
  51. */
  52. defgetDate(time:String)={
  53. valnow:Long=System.currentTimeMillis()
  54. vardf:SimpleDateFormat=newSimpleDateFormat(time)
  55. df.format(now)
  56. }
  57. /*****************************************************************************************************************
  58. *读取数据
  59. */
  60. valStudentData=spark.sparkContext.textFile("input/sqltable/Student").map{
  61. lines=>
  62. valline=lines.split(",")
  63. Row(line(0),line(1),line(2),line(3),line(4))
  64. }
  65. valCourseData=spark.sparkContext.textFile("input/sqltable/Course").map{
  66. lines=>
  67. valline=lines.split(",")
  68. Row(line(0),line(1),line(2))
  69. }
  70. valScoreData=spark.sparkContext.textFile("input/sqltable/Score").map{
  71. lines=>
  72. valline=lines.split(",")
  73. Row(line(0),line(1),line(2).toInt)
  74. }
  75. valTeacherData=spark.sparkContext.textFile("input/sqltable/Teacher").map{
  76. lines=>
  77. valline=lines.split(",")
  78. Row(line(0),line(1),line(2),line(3),line(4),line(5))
  79. }
  80. /*****************************************************************************************************************
  81. *转换成表
  82. */
  83. valStudentTable=spark.createDataFrame(StudentData,StudentSchema)
  84. StudentTable.createOrReplaceTempView("Student")
  85. valCourseTable=spark.createDataFrame(CourseData,CourseSchema)
  86. CourseTable.createOrReplaceTempView("Course")
  87. valScoreTable=spark.createDataFrame(ScoreData,ScoreSchema)
  88. ScoreTable.createOrReplaceTempView("Score")
  89. valTeacherTable=spark.createDataFrame(TeacherData,TeacherSchema)
  90. TeacherTable.createOrReplaceTempView("Teacher")
  91. /*****************************************************************************************************************
  92. *走sql节奏
  93. *表名,字段名,区分大小写
  94. */
  95. ////1、查询Student表中的所有记录的Sname、Ssex和Class列。
  96. spark.sql("SELECTsname,ssex,sclassFROMStudent").show()
  97. ////2、查询教师所有的单位即不重复的Depart列。
  98. spark.sql("SELECTDISTINCTdepartFROMTeacher").show()
  99. ////3、查询Student表的所有记录
  100. spark.sql("SELECT*FROMStudent").show()
  101. ////4、查询Score表中成绩在60到80之间的所有记录。
  102. //spark.sql("SELECT*FROMScoreWHEREdegreeBETWEEN60and80").show()
  103. spark.sql("SELECT*FROMScoreWHEREdegree>=60anddegree<=80").show()
  104. ////5、查询Score表中成绩为85,86或88的记录。
  105. spark.sql("SELECT*FROMScoreWHEREdegree='85'ORdegree='86'ORdegree='88'").show()
  106. ////6、查询Student表中“95031”班或性别为“女”的同学记录。
  107. spark.sql("SELECT*FROMStudentWHEREsclass='95031'ORssex='female'").show()
  108. ////7、以Class降序,升序查询Student表的所有记录。
  109. spark.sql("SELECT*FROMStudentORDERBYsclassDESC").show()
  110. spark.sql("SELECT*FROMStudentORDERBYsclass").show()
  111. ////8、以Cno升序、Degree降序查询Score表的所有记录。
  112. spark.sql("SELECT*FROMScoretORDERBYt.snoASC,t.degreeDESC").show()
  113. ////9、查询“95031”班的学生人数。
  114. spark.sql("SELECTt.sclasstotalnumFROMStudenttWHEREsclass='95031'").show()
  115. spark.sql("SELECTt.sclassAStotalnumFROMStudenttWHEREsclass='95031'").show()
  116. ////10、查询Score表中的最高分的学生学号和课程号。(子查询或者排序)
  117. ////oracle=>WHERErownum=1
  118. ////sparksql=>LIMIT1
  119. spark.sql("SELECT*FROM(SELECT*FROMScoreORDERBYdegreeDESCLIMIT1)").show()
  120. spark.sql("SELECTt.sno,t.cnoFROMScoretORDERBYdegreeDESC").show()
  121. spark.sql("SELECT*FROMScoreWHEREdegreeIN(SELECTMAX(degree)FROMScoret)").show()
  122. ////11、查询每门课的平均成绩。
  123. spark.sql("SELECTAVG(degree)averageFROMScoretWHEREcno='3-245'").show()
  124. spark.sql("SELECTAVG(degree)averageFROMScoreWHEREcno='3-105'").show()
  125. spark.sql("SELECTAVG(degree)averageFROMScoreWHEREcno='6-166'").show()
  126. spark.sql("SELECTcno,AVG(degree)FROMScoretGROUPBYcno").show()
  127. ////12、查询Score表中至少有5名学生选修的并以3开头的课程的平均分数。
  128. spark.sql("SELECTcno,AVG(degree)FROMScoreWHEREcnoLIKE'3%'GROUPBYcnoHAVINGCOUNT(1)>=5").show()
  129. ////13、查询分数大于70,小于90的Sno列。
  130. spark.sql("SELECTsnoFROMScoreWHEREdegreeBETWEEN70AND90").show()
  131. ////14、查询所有学生的Sname、Cno和Degree列。
  132. spark.sql("SELECTs.sname,t.cno,t.degreeFROMScoret,StudentsWHEREt.sno=s.sno").show()
  133. spark.sql("SELECTs.sname,t.cno,t.degreeFROMScoretJOINStudentsONt.sno=s.sno").show()
  134. ////15、查询所有学生的Sno、Cname和Degree列。
  135. spark.sql("SELECTs.sname,t.cno,t.degreeFROMScoretJOINStudentsONt.sno=s.sno").show()
  136. ////16、查询所有学生的Sname、Cname和Degree列。
  137. spark.sql("SELECTs.sname,t.degree,c.cnameFROMScoret,Students,CoursecWHEREt.sno=s.snoANDt.cno=c.cno").show()
  138. spark.sql("SELECTs.sname,t.degree,c.cnameFROMScoret"+
  139. "JOINStudentsont.sno=s.sno"+
  140. "JOINCourseconc.cno=t.cno").show()
  141. ////17、查询“95033”班学生的平均分。
  142. spark.sql("SELECTAVG(degree)averageFROMScoreWHEREsnoIN(SELECTsnoFROMStudentWHEREsclass='95033')").show()
  143. ////19、查询选修“3-105”课程的成绩高于“109”号同学成绩的所有同学的记录。
  144. spark.sql("SELECT*FROMScoreWHEREcno='3-105'ANDdegree>(SELECTdegreeFROMscoreWHEREsno='109'ANDcno='3-105')").show()
  145. ////20、查询score中选学多门课程的同学中分数为非最高分成绩的记录。
  146. spark.sql("SELECT*FROMScoreWHEREsnoIN"+
  147. "(SELECTsnoFROMScoretGROUPBYt.snoHAVINGCOUNT(1)>1)ANDdegree!=(SELECTMAX(degree)FROMScore)").show()
  148. spark.sql("SELECT*FROMScoreWHEREdegree!=(SELECTMAX(degree)FROMScore)").show()
  149. ////21、查询成绩高于学号为“109”、课程号为“3-105”的成绩的所有记录。
  150. spark.sql("SELECT*FROMScoretWHEREt.degree>(SELECTdegreeFROMScoreWHEREsno='109'ANDcno='3-105')").show()
  151. ////22、查询和学号为108的同学同年出生的所有学生的Sno、Sname和Sbirthday列。
  152. ////oracle=>to_char(t.sbirthday,'yyyy')
  153. ////sparksql=>substring(t.sbirthday,0,4)
  154. spark.sql("SELECTsno,sname,sbirthday"+
  155. "FROMStudent"+
  156. "WHEREsubstring(sbirthday,0,4)=("+
  157. "SELECTsubstring(t.sbirthday,0,4)"+
  158. "FROMStudentt"+
  159. "WHEREsno='108')").show()
  160. ////23、查询“张旭“教师任课的学生成绩。
  161. spark.sql("SELECTt.tno,c.cno,c.cname,s.degreeFROMTeachert"+
  162. "JOINCoursecONt.tno=c.tno"+
  163. "JOINScoresONc.cno=s.cnoWHEREt.tname='Zhangxu'").show()
  164. ////24、查询选修某课程的同学人数多于5人的教师姓名。
  165. spark.sql("SELECTtnameFROMTeachere"+
  166. "JOINCoursecONe.tno=c.tno"+
  167. "JOIN(SELECTcnoFROMScoreGROUPBYcnoHAVINGCOUNT(cno)>5)tONc.cno=t.cno").show()
  168. ////25、查询95033班和95031班全体学生的记录。
  169. spark.sql("SELECT*FROMStudentWHEREsclassIN('95031','95033')").show()
  170. spark.sql("SELECT*FROMStudentWHEREsclassLIKE'9503%'").show()
  171. ////26、查询存在有85分以上成绩的课程Cno.
  172. spark.sql("SELECTcnoFROMScoreWHEREdegree>85GROUPBYcno").show()
  173. ////27、查询出“计算机系“教师所教课程的成绩表。
  174. spark.sql("SELECTt.sno,t.cno,t.degreeFROMScoret"+
  175. "JOINCoursecONt.cno=c.cno"+
  176. "JOINTeachereONc.tno=e.tnoWHEREe.depart='departmentofcomputer'").show()
  177. ////28、查询“计算机系”与“电子工程系“不同职称的教师的Tname和Prof。
  178. spark.sql("SELECTtname,prof"+
  179. "FROMTeacher"+
  180. "WHEREprofNOTIN(SELECTa.prof"+
  181. "FROM(SELECTprof"+
  182. "FROMTeacher"+
  183. "WHEREdepart='departmentofcomputer'"+
  184. ")a"+
  185. "JOIN(SELECTprof"+
  186. "FROMTeacher"+
  187. "WHEREdepart='departmentofelectronicengineering'"+
  188. ")bONa.prof=b.prof)").show()
  189. spark.sql("SELECTtname,prof"+
  190. "FROMTeacher"+
  191. "WHEREdepart='departmentofelectronicengineering'"+
  192. "ANDprofNOTIN(SELECTprof"+
  193. "FROMTeacher"+
  194. "WHEREdepart='departmentofcomputer')"+
  195. "ORdepart='departmentofcomputer'"+
  196. "ANDprofNOTIN(SELECTprof"+
  197. "FROMTeacher"+
  198. "WHEREdepart='departmentofelectronicengineering')").show()
  199. ////29、查询选修编号为“3-105“课程且成绩至少高于选修编号为“3-245”的同学的Cno、Sno和Degree,并按Degree从高到低次序排序。
  200. spark.sql("SELECTt.sno,t.cno,degree"+
  201. "FROMSCOREt"+
  202. "WHEREdegree>("+
  203. "SELECTMIN(degree)"+
  204. "FROMscore"+
  205. "WHEREcno='3-245'"+
  206. ")"+
  207. "ANDt.cno='3-105'"+
  208. "ORDERBYdegreeDESC").show()
  209. ////30、查询选修编号为“3-105”且成绩高于选修编号为“3-245”课程的同学的Cno、Sno和Degree.
  210. //oracle方式spark.sql("selectt.sno,t.cno,t.degreefromSCOREtwheret.degree>selectdegreefromscorewherecno='3-245'orcno='3-105'").show()
  211. spark.sql("SELECTt.sno,t.cno,t.degreeFROMScoretWHEREt.degree>(SELECTMAX(degree)FROMScoreWHEREcno='3-245')ANDt.cno='3-105'").show()
  212. ////31、查询所有教师和同学的name、sex和birthday.
  213. spark.sql("SELECTsname,ssex,sbirthdayFROMStudent"+
  214. "UNIONSELECTtname,tsex,tbirthdayFROMTeacher").show()
  215. ////32、查询所有“女”教师和“女”同学的name、sex和birthday.union
  216. spark.sql("SELECTsname,ssex,sbirthday"+
  217. "FROMStudent"+
  218. "WHEREssex='female'"+
  219. "UNION"+
  220. "SELECTtname,tsex,tbirthday"+
  221. "FROMTeacher"+
  222. "WHEREtsex='female'").show()
  223. ////33、查询成绩比该课程平均成绩低的同学的成绩表。
  224. spark.sql("SELECTs.*"+
  225. "FROMscores"+
  226. "WHEREs.degree<("+
  227. "SELECTAVG(degree)"+
  228. "FROMscorec"+
  229. "WHEREs.cno=c.cno)").show()
  230. ////34、查询所有任课教师的Tname和Depart.in
  231. spark.sql("SELECTtname,depart"+
  232. "FROMteachert"+
  233. "WHEREt.tnoIN("+
  234. "SELECTtno"+
  235. "FROMcoursec"+
  236. "WHEREc.cnoIN("+
  237. "SELECTcno"+
  238. "FROMscore))").show()
  239. ////35、查询所有未讲课的教师的Tname和Depart.notin
  240. spark.sql("SELECTtname,depart"+
  241. "FROMteachert"+
  242. "WHEREt.tnoNOTIN("+
  243. "SELECTtno"+
  244. "FROMcoursec"+
  245. "WHEREc.cnoIN("+
  246. "SELECTcno"+
  247. "FROMscore))").show()
  248. ////36、查询至少有2名男生的班号。groupby,havingcount
  249. spark.sql("SELECTSClass"+
  250. "FROMStudentt"+
  251. "WHERESsex='male'"+
  252. "GROUPBYSClass"+
  253. "HAVINGCOUNT(Ssex)>=2").show()
  254. ////37、查询Student表中不姓“王”的同学记录。notlike
  255. spark.sql("SELECT*FROMStudenttWHERESnameNOTLIKE('Wang%')").show()
  256. ////38、查询Student表中每个学生的姓名和年龄。
  257. ////将函数运用到sparksql中去计算,可以直接拿String的类型计算不需要再转换成数值型默认是会转换成Double类型计算
  258. spark.sql("SELECTSname,("+getDate("yyyy")+"-substring(sbirthday,0,4))ASageFROMSTUDENTt").show()
  259. ////浮点型转整型
  260. spark.sql("SELECTSname,(CAST("+getDate("yyyy")+"ASINT)-CAST(substring(sbirthday,0,4)ASINT))ASage"+
  261. "FROMStudentt").show()
  262. ////39、查询Student表中最大和最小的Sbirthday日期值。时间格式最大值,最小值
  263. spark.sql("SELECTMAX(t.sbirthday)ASmaximumFROMStudentt").show()
  264. spark.sql("SELECTMIN(t.sbirthday)ASminimumFROMStudentt").show()
  265. ////40、以班号和年龄从大到小的顺序查询Student表中的全部记录。查询结果排序
  266. spark.sql("SELECT*"+
  267. "FROMStudent"+
  268. "ORDERBYSClassDESC,CAST("+getDate("yyyy")+"ASINT)-CAST(substring(Sbirthday,0,4)ASINT)DESC").show()
  269. ////41、查询“男”教师及其所上的课程。selectjoin
  270. spark.sql("SELECTTSex,CName"+
  271. "FROMTeachert"+
  272. "JOINcoursecONt.tno=c.tno"+
  273. "WHERETSex='male'").show()
  274. ////42、查询最高分同学的Sno、Cno和Degree列。子查询
  275. spark.sql("SELECT*"+
  276. "FROMScore"+
  277. "WHEREdegree=("+
  278. "SELECTMAX(degree)"+
  279. "FROMSCOREt)").show()
  280. ////43、查询和“李军”同性别的所有同学的Sname.
  281. spark.sql("SELECTsname"+
  282. "FROMSTUDENTt"+
  283. "WHEREssexIN("+
  284. "SELECTssex"+
  285. "FROMstudent"+
  286. "WHEREsname='LiuJun')").show()
  287. ////44、查询和“李军”同性别并同班的同学Sname.
  288. spark.sql("SELECTsname"+
  289. "FROMStudentt"+
  290. "WHEREssexIN("+
  291. "SELECTssex"+
  292. "FROMstudent"+
  293. "WHEREsname='LiuJun')"+
  294. "ANDsclassIN(SELECTsclass"+
  295. "FROMstudent"+
  296. "WHEREsname='LiuJun')").show()
  297. ////45、查询所有选修“计算机导论”课程的“男”同学的成绩表。
  298. spark.sql("SELECTt.sno,t.cno,t.degree"+
  299. "FROMScoret"+
  300. "JOINCoursecONt.cno=c.cno"+
  301. "JOINStudentsONs.sno=t.sno"+
  302. "WHEREs.SSex='male'"+
  303. "ANDc.CName='Introductiontocomputer'").show()
  304. }
  305. }




】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇sparklyr-R语言访问Spark的另外一.. 下一篇Spark分析SRS日志,以及Zookeeper..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目