设为首页 加入收藏

TOP

Spark05-SparkSQL入门(SparkSession,DataFrame,DataSet)
2019-01-16 13:09:05 】 浏览:60
Tags:Spark05-SparkSQL 入门 SparkSession DataFrame DataSet

SparkSQL的基本认识

Spark SQL是用于结构化数据处理的Spark模块(结构化数据可以来自外部结构化数据源也可以通过RDD获取)。与基本的Spark RDD API不同,Spark SQL提供的接口为Spark提供了有关数据结构和正在执行的计算的更多信息。在内部,Spark SQL使用此额外信息来执行额外的优化。有几种与Spark SQL交互的方法,包括SQL和Dataset API。在计算结果时,使用相同的执行引擎,与您用于表达计算的API /语言无关。这种统一意味着开发人员可以轻松地在不同的API之间来回切换,从而提供表达给定转换的最自然的方式。
下图为SparkSQL的基本架构:
在这里插入图片描述
这里我先给出一个SparkSQL的基本架构图。后面的内容我们会对其中的一些专有名词进行详细讲解,娓娓道来其中的内核。

SparkSQL的特点

  • 支持多种数据源:Hive、RDD、Parquet、JSON、JDBC等。
  • 多种性能优化技术:in-memory columnar storage、byte-code generation、cost model动态评估等。
  • 组件扩展性:对于SQL的语法解析器、分析器以及优化器,用户都可以自己重新开发,并且动态扩展。

Spark SQL的性能优化技术简介

1、内存列存储(in-memory columnar storage)
内存列存储意味着,Spark SQL的数据,不是使用Java对象的方式来进行存储,而是使用面向列的内存存储的方式来进行存储。也就是说,每一列,作为一个数据存储的单位。从而大大优化了内存使用的效率。采用了内存列存储之后,减少了对内存的消耗,也就避免了gc大量数据的性能开销。
2、字节码生成技术(byte-code generation)
Spark SQL在其catalyst模块的expressions中增加了codegen模块,对于SQL语句中的计算表达式,比如select num + num from t这种的sql,就可以使用动态字节码生成技术来优化其性能。
3、Scala代码编写的优化
对于Scala代码编写中,可能会造成较大性能开销的地方,自己重写,使用更加复杂的方式,来获取更好的性能。比如Option样例类、for循环、map/filter/foreach等高阶函数,以及不可变对象,都改成了用null、while循环等来实现,并且重用可变的对象。

Spark SQL主要用于进行结构化数据的处理。它提供的最核心的编程抽象,就是DataFrame和DataSet。同时Spark SQL还可以作为分布式的SQL查询引擎

DataSet和DataFrame

DataSet(数据集)是分布式数据集合[A Dataset is a distributed collection of data]。数据集是Spark 1.6中添加的一个新接口,它提供了RDD的优势(强类型,使用强大的lambda函数的能力)以及Spark SQL优化执行引擎的优点。数据集可以从JVM对象中被构造,然后使用transformation(转换操作)(map,flatMap,filter等等)。数据集API在Scala和 Java中可用。Python没有对Dataset API的支持。但由于Python的动态特性,数据集API的许多好处已经可用(即您可以自然地按名称访问行的字段 row.columnName)。R语言的情况类似。这里,博客中的实现代码大多采用Spark的原生代码scala实现。
DataFrame是组织为命名列的数据集[A DataFrame is a Dataset organized into named columns]。它在概念上等同于关系数据库中的表或R / Python中的数据框,但在引擎盖下具有更丰富的优化。DataFrame可以从多种来源构建,例如:结构化数据文件,Hive中的表,外部数据库或现有RDD。DataFrame API在Scala,Java,Python和R中可用。在Scala和Java中,DataFrame由Rows 的数据集表示。在Scala API中,DataFrame它只是一个类型别名Dataset[Row]。而在Java API中,用户需要使用Dataset来表示DataFrame。
在Spark的官网说明,我们经常将Rows(行)的Scala / Java数据集称为DataFrame。

DataFrame与RDD的区别

简单的来理解:RDD(弹性分布式数据集)是我们在Spark Core中常用的最小基本单位。而DataFrame是我们在进行SparkSQL进行数据查询数据分析时的最小基本单位。那么这两者有何区别联系了,请看下图。
在这里插入图片描述
RDD是分布式的Java对象的集合,比如,RDD[Person]是以Person为类型参数,但Person类的内部结构对于RDD而言确实不可知的。DataFrame是一种以RDD为基础的分布式数据集,也就是分布式的Row对象的集合(每个Row对象代表一行记录),提供了详细的结构信息,也就是我们经常说的模式(schema),SparkSQL可以清楚的知道该数据集中包含哪些列、每列的名称和类型。
和RDD一样,DataFrame的各种变换操作也可以采用惰性机制,只是记录了各种转换的逻辑转换路线图(DAG图),不会发生真正的计算,这个DAG相当于一个逻辑查询计划,最终会被翻译成物理查询计划,生成RDD DAG,按照之前介绍的RDD DAG的执行方式去完成最终的计算得到结果。

总结
1、RDD的局限性
RDD仅表示数据集,RDD没有元数据,也就是说没有字段语义定义

2、什么是DataFrame
可以把DataFrame理解为按列名的方式去组织的一个分布式数据集(RDD)
由于RDD的局限性,Spark产生了DataFrame
DataFrame=RDD+Schema=schemaRDD
其中Schenam就是元数据,是语义描述
特点:

  内部数据无类型,统一为Row
  DataFrame是一种特殊类型的DataSet,DataSet[Row]=DataFrame
  DataFrame自带优化器Catalyst,可以自动优化程序
  DataFrame提供了一整套的Data Source API 

3、DataSet的产生
Row 运行时类型检查,比如salary是字符串类型,下面的语句也只有运行时才进行类型检查

dataframe.filter("salary>1000").show()

由于DataFrame的数据类型统一是Row,所以DataFrame也是有缺点的。

明显缺点:

        1、Row不能直接操作domain对象
        2、函数风格编程,没有面向对象风格的API

所以,Spark SQL引入了DataSet,扩展了DataFrmae的API,提供了编译时类型检查,面向对象风格的API

DataFrame的创建

从spark2.0以上版本开始,Spark使用全新的SparkSession接口替代Spark1.6中的SQLContext及HiveContext接口来实现其对数据加载、转换、处理功能。SparkSession实现了SQLContext及HiveContext所有功能。
SparkSession支持从不同的数据源加载数据,并把数据转换成DataFrame,并且支持把DataFrame,并且支持把DataFrame转换成SQLContext自身中的表,然后使用SQL语句来操作数据。SparkSession提供了HiveSQL以及其他依赖与Hive的功能支持。
案例:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
  * 通过SparkSession创建DataFrame
  * @author xjh 2018.11.26
  */
object CreateDataFrame {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setAppName("CreateDataFrame").setMaster("local")
    val spark=SparkSession.builder().appName("CreateDataFrame").config(conf).getOrCreate()
    val df=spark.read.json("hdfs://m1:9000/spark/people.json")
    df.show()
  }
}

运行结果:
在这里插入图片描述
上面这个例子,实现了最简单的读取目标json的数据并将其显示出来。json经常使用在web开发中,关于这部分的知识这里不做过多描述,读者可自行网上百度学习。除了json之外,SparkSQL还支持多种多种数据源,博文前面已经总结。

DataFrame常见的其他操作

在Spark 2.0中,DataFrames只是RowScala和Java API中的数据集。与“类型转换”相比,这些操作也称为“无类型转换”,带有强类型Scala / Java数据集。

object CreateDataFrame {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setAppName("CreateDataFrame").setMaster("local")
    val spark=SparkSession.builder().appName("CreateDataFrame").config(conf).getOrCreate()
    import spark.implicit._
    val df=spark.read.json("hdfs://m1:9000/spark/people.json")
    //df.show()
    // Print the schema in a tree format
	df.printSchema()
	// root
	// |-- age: long (nullable = true)
	// |-- name: string (nullable = true)
	
	// Select only the "name" column
	df.select("name").show()
	// +-------+
	// |   name|
	// +-------+
	// |Michael|
	// |   Andy|
	// | Justin|
	// +-------+
	
	// Select everybody, but increment the age by 1
	df.select($"name", $"age" + 1).show()
	// +-------+---------+
	// |   name|(age + 1)|
	// +-------+---------+
	// |Michael|     null|
	// |   Andy|       31|
	// | Justin|       20|
	// +-------+---------+
	
	// Select people older than 21
	df.filter($"age" > 21).show()
	// +---+----+
	// |age|name|
	// +---+----+
	// | 30|Andy|
	// +---+----+
	
	// Count people by age
	df.groupBy("age").count().show()
	// +----+-----+
	// | age|count|
	// +----+-----+
	// |  19|    1|
	// |null|    1|
	// |  30|    1|
	// +----+-----+
	
	// Register the DataFrame as a SQL temporary view
	df.createOrReplaceTempView("people")
	
	val sqlDF = spark.sql("SELECT * FROM people")
	sqlDF.show()
	// +----+-------+
	// | age|   name|
	// +----+-------+
	// |null|Michael|
	// |  30|   Andy|
	// |  19| Justin|
	// +----+-------+
  }
}

全局临时视图(Global Temporary View)

Spark SQL中的临时视图(TempView)是会话范围的,如果创建它的会话终止,它将消失。例如上面的代码样例,在执行spark.sql之前,注册DataFrame为SQL临时会话。
如果希望拥有一个在所有会话之间共享的临时视图并保持活动状态,直到Spark应用程序终止,您可以创建一个全局临时视图。全局临时视图与系统保留的数据库绑定global_temp,我们必须使用限定名称来引用它,例如SELECT * FROM global_temp.view1。
// 将DataFrame注册为全局临时视图
df.createGlobalTempView(“people”)

创建DataSet(数据集)

数据集与rdd类似,但它不使用Java序列化或Kryo,而是使用专门的编码器序列化对象,以便通过网络进行处理或传输。虽然编码器和标准序列化都负责将对象转换为字节,但编码器是动态生成的代码,使用的格式允许Spark执行许多操作,如过滤、排序和散列,而无需将字节反序列化回对象。

case  class  Person (name : String , age : Long )

//为case类创建编码器
val  caseClassDS  =  Seq (Person (“Andy” , 32 ))。toDS ()
caseClassDS 。show ()
// + ---- + --- + 
// | name | age | 
// + ---- + --- + 
// | Andy | 32 | 
// + ---- + --- +

对于最常见的类型//编码器是通过导入spark.implicits._自动提供
VAL  primitiveDS  =  SEQ (1 , 2 , 3 )。toDS ()
primitiveDS 。地图(_  +  1 )。collect () //返回:数组(2,3,4)

//通过提供类可以将DataFrame转换为数据集。映射将通过名称
val  path  =  “examples / src / main / resources / people.json” 
val  peopleDS  =  spark完成。读。json (路径)。作为[ 人] 
人DS 。show ()
// + ---- + ------- + 
// | 年龄| 名称| 
// + ---- + ------- + 
// | null | Michael | 
// | 30 | 安迪| 
// | 19 | 贾斯汀| 
// + ---- + ------- +

从RDD转换得到DataFrame

Spark官网提供了两种方法来实现从RDD转换得到DataFrame。
第一种方法是,利用反射机制来推断包含特定类型对象的RDD的schema,适用对已知数据结构的RDD转换;
在利用反射机制推断RDD模式时,需要首先定义一个case class,因为case class能被Spark隐式地转换为DataFrame。
Spark SQL的Scala接口支持自动将包含RDD的案例类转换为DataFrame。case类定义表的模式。使用反射读取case类的参数名称,并成为列的名称。案例类也可以嵌套或包含复杂类型,如Seqs或Arrays。可以将此RDD隐式转换为DataFrame,然后将其注册为表。表可以在后续SQL语句中使用。

ase class Person(name: String, age: Long)
object RDDToDataFrame {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setAppName("CreateDataFrame").setMaster("local")
    val spark=SparkSession.builder().appName("CreateDataFrame").config(conf).getOrCreate()
    import spark.implicits._  //这里的spark不是某个包下面的东西,而是我们SparkSession.builder()对应的变量值
    val peopleDF=spark.sparkContext
      .textFile("file:///D:\\softInstall\\BigData\\spark-2.3.1-bin-hadoop2.6\\spark-2.3.1-bin-hadoop2.6\\examples\\src\\main\\resources\\people.txt")
      .map(_.split(",")).map(attributes => Person(attributes(0), attributes(1).trim.toInt)).toDF()
      //从文本文件创建Person对象的RDD,将其转换为DataFrame
    peopleDF.createOrReplaceTempView("people")  //必须注册为临时表才能供下面的查询使用
    val personsRDD=spark.sql("select name,age from people where age > 20")
    personsRDD.map(t => "Name:"+t(0)+","+"Age:"+t(1)).show()
      //DataFrame中的每个元素都是一行记录,包含name和age两个字段,分别用t(0)和t(1)来获取值
  }
}

运行结果:

+-------------------+
|              value|
+-------------------+
|Name:Michael,Age:29|
|   Name:Andy,Age:30|
+-------------------+

第二种方法是,使用编程接口,构造一个schema并将其应用在已知的RDD上。
如果无法提前定义案例类(例如,记录的结构以字符串形式编码,或者文本数据集将被解析,而字段将针对不同的用户进行不同的投影),DataFrame则可以通过三个步骤以编程方式创建a 。

  • Row从原始RDD 创建s的RDD;
  • 创建由与步骤1中创建的RDD中的行结构匹配的StructType表示的模式。
  • Row通过createDataFrame提供的方法将模式应用于s 的RDD SparkSession。
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

/**
  * 编程方式定义RDD模式
  * @author xjh 2018.11.28
  */
object RDDToDataFrame2 {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setAppName("RDDToDataFrame2").setMaster("local")
    val spark=SparkSession.builder().appName("RDDToDataFrame2").config(conf).getOrCreate()
    import spark.implicits._
    //创建RDD
    val peopleRDD=spark.sparkContext.textFile("file:///D:\\softInstall\\BigData\\spark-2.3.1-bin-hadoop2.6\\" +
      "spark-2.3.1-bin-hadoop2.6\\examples\\src\\main\\resources\\people.txt")
    //定义一个模式字符串
    val schemaString="name age"
    //根据模式字符串生成模式schema
    val fields = schemaString.split(" ")
      .map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema=StructType(fields)
    //将RDD(人员)的记录转换为行
    val rowRDD=peopleRDD.map(_.split(",")).map(attributes=>Row(attributes(0), attributes(1).trim))
    //将模式应用于RDD
    val peopleDF=spark.createDataFrame(rowRDD,schema)
    //使用DataFrame创建临时视图
    peopleDF.createOrReplaceTempView("people")
    //SQL可以在使用DataFrames创建的临时视图上运行
    val results = spark.sql("SELECT name,age FROM people")
//    results.map(attributes => "name: " + attributes(0)+","+"age:"+attributes(1))
    results.map(attributes => "name: " + attributes(0)+","+"age:"+attributes(1)).show()
        //官网上这句运行报错,需添加rdd 但是这样又不能写show() 
  }
}

运行结果:

+--------------------+
|               value|
+--------------------+
|name: Michael,age:29|
|   name: Andy,age:30|
| name: Justin,age:19|
+--------------------+

博客参考:
1.http://spark.apache.org/docs/2.3.0/sql-programming-guide.html#datasets-and-dataframes
2.https://blog.csdn.net/weixin_43909426/article/details/86141232#commentBox

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Spark与深度学习框架——H2O、dee.. 下一篇Spark与深度学习框架——H2O、dee..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目