TOP

                    Spark SQL玩起来  
2019-05-08 01:18:28 】 浏览:450
Tags:  Spark SQL 起来

标签(空格分隔): Spark


[toc]

前言

Spark SQL的介绍只包含官方文档的Getting Started、DataSource、Performance Tuning和Distributed SQL Engine部分。不含其他的迁移和PySpark等部分。

Spark SQL介绍

Spark SQL是一个Spark模块用于结构化数据处理。与基本的Spark RDD API不同,Spark SQL提供的接口为Spark提供了有关数据结构和正在执行的计算的更多信息。 在内部,Spark SQL使用此额外信息来执行额外的优化。 有几种与Spark SQL交互的方法,包括SQL和Dataset API。 在使用相同的执行引擎计算结果时,与使用表达计算的API或者语言无关。 这种统一意味着开发人员可以轻松地在不同的API之间来回切换,从而提供表达给定转换的最自然的方式。

SQL

Spark SQL的一个用途是执行SQL查询。Spark SQL还可用于从现有Hive中读取数据。 有关如何配置此功能的更多信息,请参阅Hive Tables部分。 从其他编程语言中运行SQL时,结果将作为Dataset/DataFrame返回。 还可以使用命令行或JDBC/ODBC与SQL接口进行交互。

Dataset和DataFrame

Dataset数据集是分布式数据集合。数据集是Spark 1.6中添加的一个新接口,它提供了RDD的优势(强类型,使用强大的lambda函数的能力)和Spark SQL优化执行引擎的优点。 数据集可以从JVM对象构造,然后使用功能转换(map,flatMap,filter等)进行操作。 数据集API在Scala和Java中可用。 Python没有对Dataset API的支持。 但由于Python的动态特性,数据集API的许多好处已经可用(即可以通过名称自然地访问行的字段row.columnName)。 R的情况类似。

DataFrame是一个组织成命名列的数据集。 它在概念上等同于关系数据库中的表或R / Python中的数据框,但在底层具有更丰富的优化。 DataFrame可以从多种来源构建,例如:结构化数据文件,Hive中的表,外部数据库或现有RDD。 DataFrame API在Scala,Java,Python和R中可用。在Scala和Java中,DataFrame由行数据集表示。 在Scala API中,DataFrame只是Dataset[Row]的类型别名。 而在Java API中,用户需要使用Dataset<Row>来表示DataFrame。

Spark SQL入门知识

SparkSession

Spark中所有功能的入口点是SparkSession类。通过类似下面的代码来创建:

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._

Spark 2.0中的SparkSession为Hive功能提供内置支持,包括使用HiveQL编写查询,访问Hive UDF以及从Hive表读取数据的功能。 要使用这些功能,并不需拥有现有的Hive设置。

创建DataFrame

使用SparkSession,应用程序可以从现有RDD,Hive表或Spark数据源创建DataFrame。作为示例,以下内容基于JSON文件的内容创建DataFrame:

val df = spark.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

无类型数据集操作(又名DataFrame操作)

DataFrames为Scala,Java,Python和R中的结构化数据操作提供一种DSL的语言。如上所述,在Spark 2.0中,DataFrames只是Scala和Java API中的行数据集。与“类型转换”相比,这些操作也称为“无类型转换”,带有强类型Scala / Java数据集。这里展示使用数据集进行结构化数据处理的一些基本示例:

// This import is needed to use the $-notation
import spark.implicits._
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+

编程方式运行SQL查询

SparkSession上的sql函数使应用程序能够以编程方式运行SQL查询并将结果作为DataFrame返回。

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

全局临时视图

Spark SQL中的临时视图是会话范围的,如果创建它的会话终止,它将消失。 如果希望拥有一个在所有会话之间共享的临时视图并保持活动状态,直到Spark应用程序终止,您可以创建一个全局临时视图。 全局临时视图与系统保留的数据库global_temp绑定,我们必须使用限定名称来引用它,例如 SELECT * FROM global_temp.view1

// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")

// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

创建数据集

数据集与RDD类似,但是,它们不使用Java序列化或Kryo,而是使用专用的编码器来序列化对象以便通过网络进行处理或传输。 虽然编码器和标准序列化都负责将对象转换为字节,但编码器是动态生成的代码,并使用一种格式,允许Spark执行许多操作,如过滤,排序和散列,而无需将字节反序列化为对象。

case class Person(name: String, age: Long)

// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

与RDD交互操作

Spark SQL支持两种不同的方法将现有RDD转换为数据集。 第一种方法使用反射来推断包含特定类型对象的RDD的schema。 这种基于反射的方法可以提供更简洁的代码,并且在您编写Spark应用程序时已经了解schema时可以很好地工作。

创建数据集的第二种方法是通过编程接口,这种方法允许你构建模式,然后将其应用于现有RDD。 虽然此方法更繁琐一些,但它允许在直到运行时才知道列及其类型时构造数据集。

利用反射推断的方法

Spark SQL的Scala接口支持自动将包含RDD的case class转换为DataFrame。 case class用来定义表的模式。 case类的参数名称是通过反射读取的,这些名称会成为列的名称。 case类也可以被嵌套或包含复杂类型,如Seqs或Arrays。 此RDD可以隐式转换为DataFrame,然后注册为表。而这个表可以在后续SQL语句中使用。

// For implicit conversions from RDDs to DataFrames
import spark.implicits._

// Create an RDD of Person objects from a text file, convert it to a Dataframe
val peopleDF = spark.sparkContext
  .textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
  .toDF()
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by Spark
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

// The columns of a row in the result can be accessed by field index
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// or by field name
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// No pre-defined encoders for Dataset[Map[K,V]], define explicitly
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// Primitive types and case classes can be also defined as
// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array(Map("name" -> "Justin", "age" -> 19))

利用编程接口声明schema的方法

如果无法提前定义case类(例如,记录的结构以字符串形式编码,或者文本数据集将被解析,字段将针对不同的用户进行不同的映射),则可以通过三个步骤以编程方式创建DataFrame。

  1. 从原始RDD创建行RDD;
  2. 创建由与步骤1中创建的RDD中的行结构匹配的StructType表示的schema。
  3. 通过SparkSession提供的createDataFrame方法将schema应用于行RDD。
import org.apache.spark.sql.types._

// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim))

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
results.map(attributes => "Name: " + attributes(0)).show()
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+

聚合

内置的DataFrames函数提供常见的聚合,如count(),countDistinct(),avg(),max(),min()等。虽然这些函数是为DataFrames设计的,但Spark SQL也有类型安全的版本 其中一些在Scala和Java中使用强类型数据集。 此外,用户不限于使用预定义的聚合函数,也可以创建自己的聚合函数。

无类型的UDAF

用户必须扩展UserDefinedAggregateFunction抽象类以实现自定义无类型聚合函数。 例如,用户定义的平均值可能如下所示:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._

object MyAverage extends UserDefinedAggregateFunction {
  // Data types of input arguments of this aggregate function
  def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
  // Data types of values in the aggregation buffer
  def bufferSchema: StructType = {
    StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
  }
  // The data type of the returned value
  def dataType: DataType = DoubleType
  // Whether this function always returns the same output on the identical input
  def deterministic: Boolean = true
  // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
  // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
  // the opportunity to update its values. Note that arrays and maps inside the buffer are still
  // immutable.
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 0L
  }
  // Updates the given aggregation buffer `buffer` with new input data from `input`
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getLong(0) + input.getLong(0)
      buffer(1) = buffer.getLong(1) + 1
    }
  }
  // Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }
  // Calculates the final result
  def eva luate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}

// Register the function to access it
spark.udf.register("myAverage", MyAverage)

val df = spark.read.json("examples/src/main/resources/employees.json")
df.createOrReplaceTempView("employees")
df.show()
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+

类型安全的用户定义聚合函数

强类型数据集的用户定义聚合通过Aggregator抽象类来实现。 例如,类型安全的用户定义平均值可能如下所示:

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)

object MyAverage extends Aggregator[Employee, Average, Double] {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  def zero: Average = Average(0L, 0L)
  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  def reduce(buffer: Average, employee: Employee): Average = {
    buffer.sum += employee.salary
    buffer.count += 1
    buffer
  }
  // Merge two intermediate values
  def merge(b1: Average, b2: Average): Average = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }
  // Transform the output of the reduction
  def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
  // Specifies the Encoder for the intermediate value type
  def bufferEncoder: Encoder[Average] = Encoders.product
  // Specifies the Encoder for the final output value type
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
ds.show()
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

// Convert the function to a `TypedColumn` and give it a name
val averageSalary = MyAverage.toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+

数据源

Spark SQL支持通过DataFrame接口对各种数据源进行操作。 DataFrame可以使用关系型转换操作进行操作,也可以用于创建临时视图。 将DataFrame注册为临时视图允许您对其数据运行SQL查询。 下面的部分会介绍使用Spark数据源加载和保存数据的一般方法,然后介绍可用于内置数据源的特定配置选项。

通用加载/保存功能

在最简单的形式中,默认数据源(parquet文件,除非另外由spark.sql.sources.default配置指定)将用于所有操作。

val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

手动指定选项

你还可以手动指定将要使用的数据源以及要传递给数据源的任何其他选项。 数据源由其完全限定名称(即org.apache.spark.sql.parquet)指定,但对于内置源,你还可以使用其短名称(json,parquet,jdbc,orc,libsvm,csv,text)。 从任何数据源类型加载的DataFrame都可以使用此语法转换为其他类型。
加载一个json文件可以用如下方法:

val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

而加载一个csv可以这样:

val peopleDFCsv = spark.read.format("csv")
  .option("sep", ";")
  .option("inferSchema", "true")
  .option("header", "true")
  .load("examples/src/main/resources/people.csv")

在写操作期间也使用额外选项。 例如,您可以控制ORC数据源的bloom过滤器和字典编码。 以下ORC示例将在favorite_color上创建bloom过滤器,并对name和favorite_color使用字典编码。 对于Parquet,也存在parquet.enable.dictionary。 要查找有关额外ORC / Parquet选项的更多详细信息,请访问官方Apache ORC / Parquet网站。

usersDF.write.format("orc")
  .option("orc.bloom.filter.columns", "favorite_color")
  .option("orc.dictionary.key.threshold", "1.0")
  .save("users_with_options.orc")

直接在文件上运行SQL

您可以直接使用SQL查询该文件,而不是使用读取API将文件加载到DataFrame并进行查询。

val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

保存模式

保存操作可以有选择地使用SaveMode,不同选项模式指定如何处理现有数据(如果存在)。 重要的是要意识到这些保存模式不使用任何锁定并且不是原子的。 此外,执行覆盖时,将在写出新数据之前删除数据。

Scala/Java Any Language Meaning
SaveMode.ErrorIfExists (default) "error" or "errorifexists" (default) 将DataFrame保存到数据源时,如果数据已存在,则会引发异常。
SaveMode.Append "append" 将DataFrame保存到数据源时,如果数据/表已存在,则DataFrame的内容应附加到现有数据。
SaveMode.Overwrite "overwrite" 覆盖模式意味着在将DataFrame保存到数据源时,如果数据/表已经存在,则预期现有数据将被DataFrame的内容覆盖。
SaveMode.Ignore "ignore" 忽略模式意味着在将DataFrame保存到数据源时,如果数据已存在,则预期保存操作不会保存DataFrame的内容而不会更改现有数据。 这类似于SQL中的CREATE TABLE IF NOT EXISTS

保存到持久表

也可以使用saveAsTable命令将DataFrames作为持久表保存到Hive Metastore中。请注意,使用此功能不需要现有的Hive部署。 Spark将为您创建默认的本地Hive Metastore(使用Derby)。 与createOrReplaceTempView命令不同,saveAsTable将实现DataFrame的内容并创建指向Hive Metastore中数据的指针。 只要您保持与同一Metastore的连接,即使您的Spark程序重新启动后,持久表仍然存在。 可以通过使用表的名称调用SparkSession上的table方法来创建持久表的DataFrame。
对于基于文件的数据源,例如 text,parquet,json等,你可以通过路径选项指定自定义表路径,例如 df.write.option(“path”,“/ some / path”).saveAsTable(“t”)。 删除表时,将不会删除自定义表路径,并且表数据仍然存在。 如果未指定自定义表路径,则Spark会将数据写入仓库目录下的默认表路径。 删除表时,也将删除默认表路径。

从Spark 2.1开始,持久数据源表将每个分区元数据存储在Hive Metastore中。 这带来了几个好处:

  • 由于Metastore只能返回查询所需的分区,因此不再需要在表的第一个查询中发现所有分区。
  • 现在,对于使用Datasource API创建的表,可以使用ALTER TABLE PARTITION ... SET LOCATION等Hive DDL。

请注意,在创建外部数据源表(具有路径选项的表)时,默认情况下不会收集分区信息。 要同步Metastore中的分区信息,可以调用MSCK REPAIR TABLE

分桶、排序和分区

对于基于文件的数据源,也可以对输出进行分桶和排序或分区。 分桶和排序仅适用于持久表:

peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")

分区可以在使用数据集API时与savesaveAsTable一起使用。

usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")

可以对单个表同时使用分区和分桶:

usersDF
  .write
  .partitionBy("favorite_color")
  .bucketBy(42, "name")
  .saveAsTable("users_partitioned_bucketed")

partitionBy会创建一个目录结构,如”分区发现“这一章所述。 因此,它对具有高基数的列的适用性有限。 相比之下,bucketBy可以在固定数量的桶中分配数据,并且可以在出现许多无界的唯一值时使用。

Parquet文件

Parquet是一种面向列的存储格式,许多数据处理系统都支持它。Spark SQL支持读取和写入Parquet文件,这些文件自动保留原始数据的schema。 在写Parquet文件时,出于兼容性原因,所有列都会自动转换为可为空(nullable)模式。

以编程方式加载数据

使用如下的例子来实现:

// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._

val peopleDF = spark.read.json("examples/src/main/resources/people.json")

// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("people.parquet")

// Read in the parquet file created above
// Parquet files are self-describing so the schema is preserved
// The result of loading a Parquet file is also a DataFrame
val parquetFileDF = spark.read.parquet("people.parquet")

// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

分区发现

表分区是Hive等系统中常用的优化方法。 在分区表中,数据通常存储在不同的目录中,分区列值被编码为每个分区目录路径。所有内置文件源(包括Text / CSV / JSON / ORC / Parquet)都能够自动发现和推断分区信息。 例如,我们可以使用以下目录结构将所有以前使用的人口数据存储到分区表中,并将两个额外的列(性别和国家)作为分区列:

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...

通过将 path/to/table 传递给SparkSession.read.parquetSparkSession.read.load,Spark SQL将自动从路径中提取分区信息。 现在返回的DataFrame的schema变为:

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

请注意,分区列的数据类型是自动推断的。 目前,支持数字数据类型,日期,时间戳和字符串类型。 有时,用户可能不希望自动推断分区列的数据类型。 对于这些用例,可以通过spark.sql.sources.partitionColumnTypeInference.enabled配置自动类型推断,默认为true。 禁用类型推断时,字符串类型将用于分区列。
从Spark 1.6.0开始,分区发现默认只查找给定路径下的分区。 对于上面的示例,如果用户将path/to/table/gender=male传递给SparkSession.read.parquetSparkSession.read.load,则不会将性别视为分区列。 如果用户需要指定分区发现应该从哪个基本路径开始,则可以在数据源选项中设置basePath。 例如,当path/to/table/gender=male是数据的路径并且用户将basePath设置为path/to/table/时,gender将是分区列。

模式合并Schema Merging

与Protocol Buffer,Avro和Thrift一样,Parquet也支持模式演变。 用户可以从简单模式开始,并根据需要逐渐向模式添加更多列。 通过这种方式,用户可能最终得到具有不同但相互兼容的模式的多个Parquet文件。 Parquet数据源现在能够自动检测这种情况并合并所有这些文件的模式。
由于模式合并是一项相对昂贵的操作,并且在大多数情况下不是必需的,因此我们默认从1.5.0开始关闭它。 您可以通过以下两种方式启用它:

  1. 在读取Parquet文件时将数据源选项mergeSchema设置为true(如下面的示例所示),或
  2. 将全局SQL选项spark.sql.parquet.mergeSchema设置为true。
// This is used to implicitly convert an RDD to a DataFrame.
import spark.implicits._

// Create a simple DataFrame, store into a partition directory
val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
cubesDF.write.parquet("data/test_table/key=2")

// Read the partitioned table
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()

// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths
// root
//  |-- value: int (nullable = true)
//  |-- square: int (nullable = true)
//  |-- cube: int (nullable = true)
//  |-- key: int (nullable = true)

Hive Metastore Parquet表转换

在读取和写入Hive Metastore Parquet表时,Spark SQL将尝试使用自己的Parquet支持而不是Hive SerDe来获得更好的性能。 此行为由spark.sql.hive.convertMetastoreParquet配置控制,默认情况下处于打开状态。

Hive/Parquet Schema Reconciliation

从表模式处理的角度来看,Hive和Parquet之间存在两个主要区别。

  1. Hive不区分大小写,而Parquet则区分大小写
  2. Hive认为所有列都可以为空,而Parquet中的可空性设定很重要

由于这个原因,在将Hive Metastore Parquet表转换为Spark SQL Parquet表时,我们必须将Hive Metastore模式与Parquet模式进行协调。 相应的规则是:

  1. 两个模式中具有相同名称的字段必须具有相同的数据类型,而不管可空性如何。 协调字段应具有Parquet端的数据类型,以便遵循可为空性。
  2. 协调的模式要精准的包含Hive Metastore模式中定义的那些字段。
  • 仅出现在Parquet模式中的任何字段都将在协调的模式中被放弃。
  • 仅出现在Hive Metastore模式中的任何字段都将在协调模式中添加为可空字段。
元数据刷新Metadata Refreshing

Spark SQL缓存Parquet元数据以获得更好的性能。 启用Hive Metastore Parquet表转换后,还会缓存这些转换表的元数据。 如果这些表由Hive或其他外部工具更新,则需要手动刷新它们以确保元数据一致。

// spark is an existing SparkSession
spark.catalog.refreshTable("my_table")

配置

可以使用SparkSession上的setConf方法或使用SQL运行SET key = value命令来完成Parquet的配置。

Property Name Default Meaning
spark.sql.parquet.binaryAsString false 其他一些Parquet生产系统,特别是Impala,Hive和旧版本的Spark SQL在写出Parquet模式时,不要区分二进制数据和字符串。 这个flag告诉Spark SQL将二进制数据解释为字符串,以提供与这些系统的兼容性。
spark.sql.parquet.int96AsTimestamp true 一些Parquet生产系统,特别是Impala和Hive,将时间戳存储到INT96中。 这个flag告诉Spark SQL将INT96数据解释为时间戳,以提供与这些系统的兼容性。
spark.sql.parquet.compression.codec snappy 设置编写Parquet文件时使用的压缩编解码器。 如果是compressionparquet.compression在声明表的选项/属性中指定声明,优先级为compressionparquet.compressionspark.sql.parquet.compression.codec。 可接受的值包括:none,uncompressed,snappy,gzip,lzo,brotli,lz4,zstd。注意zstd需要在Hadoop 2.9.0之前安装ZStandardCodecbrotli需要要安装BrotliCodec
spark.sql.parquet.filterPushdown true 设置为true时启用Parquet过滤器下推优化。
spark.sql.hive.convertMetastoreParquet true 设置为false时,Spark SQL将使用Hive SerDe作为Parquet而不是内置支持。
spark.sql.parquet.mergeSchema false

如果为true,则Parquet数据源合并从所有数据文件收集的模式,否则从摘要文件选取模式,如果没有可用的摘要文件,则从随机数据文件中选取模式。

spark.sql.optimizer.metadataOnly true

如果为true,则利用表的元数据来做仅元数据查询优化生成分区列而不是表扫描。 它适用于扫描的所有列都是分区列,并且查询具有满足distinct语义的聚合运算符的情况。

spark.sql.parquet.writeLegacyFormat false 如果为true,则数据将以Spark 1.4及更早版本的方式写入。 例如,十进制值将以Apache Parquet的固定长度字节数组格式编写,供其他系统如Apache Hive和Apache Impala使用。如果为false,将使用Parquet中的较新格式。例如,十进制数将以基于int的格式编写。如果打算使用Parquet输出的对应系统不支持此新格式,请设置为true。

ORC Files

从Spark 2.3开始,Spark使用新ORC文件格式的向量化的ORC reader来支持ORC文件。为此,新添加了以下配置。 当spark.sql.orc.impl设置为native并且spark.sql.orc.enableVectorizedReader设置为true时,向量化reader用于原生ORC表(例如,使用USING ORC子句创建的表)。对于Hive ORC serde表(例如,使用USING HIVE OPTIONS(fileFormat'ORC')子句创建的表),当spark.sql.hive.convertMetastoreOrc也设置为true时,使用向量化reader。

Property Name Default Meaning
spark.sql.orc.impl native ORC实现的名称。 它可以是 native hive 之一。 native 表示在Apache ORC 1.4上构建的原生ORC支持。 hive表示Hive 1.2.1中的ORC库。
spark.sql.orc.enableVectorizedReader true native 实现中启用矢量化orc解码。如果 false ,则在 native 实现中使用新的非向量化ORC reader。 对于 hive 实现,这将被忽略。

JSON Files

Spark SQL可以自动推断JSON数据集的模式,并将其作为Dataset[Row]加载。 可以使用Dataset[String]或JSON文件上的SparkSession.read.json()完成此转换。

请注意,作为json文件提供的文件不是典型的JSON文件。 每行必须包含一个单独的,自包含的有效JSON对象。 有关更多信息,请参阅JSON Lines文本格式,也称为换行符分隔的JSON。
For a regular multi-line JSON file, set the multiLine option to true.
对于一个常规的多行JSON文件,设置multiLine选项为true。

// Primitive types (Int, String, etc) and Product types (case classes) encoders are
// supported by importing this when creating a Dataset.
import spark.implicits._

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
val path = "examples/src/main/resources/people.json"
val peopleDF = spark.read.json(path)

// The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by spark
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
// +------+
// |  name|
// +------+
// |Justin|
// +------+

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// a Dataset[String] storing one JSON object per string
val otherPeopleDataset = spark.createDataset(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val otherPeople = spark.read.json(otherPeopleDataset)
otherPeople.show()
// +---------------+----+
// |        address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+

Hive表Hive Tables

Spark SQL还支持读取和写入存储在Apache Hive中的数据。 但是,由于Hive具有大量依赖项,而这些依赖项不包含在默认的Spark分发版本中。如果可以在类路径上找到Hive依赖项,Spark将自动加载它们。 请注意,这些Hive依赖项也必须存在于所有工作节点上,因为它们需要访问Hive序列化和反序列化库(SerDes)才能访问存储在Hive中的数据。
通过在conf/中放置hive-site.xml,core-site.xml(用于安全性配置)和hdfs-site.xml(用于HDFS配置)文件来完成Hive的配置。
使用Hive时,必须使用Hive支持来实例化SparkSession,包括连接到持久化的Hive Metastore,支持Hive serdes和Hive用户定义函数。 没有现有Hive部署的用户仍可以启用Hive支持。 当未由hive-site.xml配置时,上下文会自动在当前目录中创建metastore_db,并创建一个由spark.sql.warehouse.dir配置的目录,该目录默认为当前目录中的spark-warehouse目录,Spark应用程序从此开始。 请注意,自Spark 2.0.0起,不推荐使用hive-site.xml中的hive.metastore.warehouse.dir属性。 而是使用spark.sql.warehouse.dir指定仓库中数据库的默认位置。 您可能需要向启动Spark应用程序的用户授予写入权限。

import java.io.File

import org.apache.spark.sql.{Row, SaveMode, SparkSession}

case class Record(key: Int, value: String)

// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = new File("spark-warehouse").getAbsolutePath

val spark = SparkSession
  .builder()
  .appName("Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._
import spark.sql

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sql("SELECT * FROM src").show()
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Aggregation queries are also supported.
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// |    500 |
// +--------+

// The results of SQL queries are themselves DataFrames and support all normal functions.
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

// The items in DataFrames are of type Row, which allows you to access each column by ordinal.
val stringsDS = sqlDF.map {
  case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// +--------------------+
// |               value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...

// You can also use DataFrames to create temporary views within a SparkSession.
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")

// Queries can then join DataFrame data with data stored in Hive.
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// |  2| val_2|  2| val_2|
// |  4| val_4|  4| val_4|
// |  5| val_5|  5| val_5|
// ...

// Create a Hive managed Parquet table, with HQL syntax instead of the Spark SQL native syntax
// `USING hive`
sql("CREATE TABLE hive_records(key int, value string) STORED AS PARQUET")
// Save DataFrame to the Hive managed table
val df = spark.table("src")
df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
// After insertion, the Hive managed table has data now
sql("SELECT * FROM hive_records").show()
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Prepare a Parquet data directory
val dataDir = "/tmp/parquet_data"
spark.range(10).write.parquet(dataDir)
// Create a Hive external Parquet table
sql(s"CREATE EXTERNAL TABLE hive_ints(key int) STORED AS PARQUET LOCATION '$dataDir'")
// The Hive external table should already have data
sql("SELECT * FROM hive_ints").show()
// +---+
// |key|
// +---+
// |  0|
// |  1|
// |  2|
// ...

// Turn on flag for Hive Dynamic Partitioning
spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
// Create a Hive partitioned table using DataFrame API
df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")
// Partitioned column `key` will be moved to the end of the schema.
sql("SELECT * FROM hive_part_tbl").show()
// +-------+---+
// |  value|key|
// +-------+---+
// |val_238|238|
// | val_86| 86|
// |val_311|311|
// ...

spark.stop()

指定Hive表的存储格式

创建Hive表时,需要定义此表应如何从/向文件系统读取/写入数据,即“输入格式”和“输出格式”。 您还需要定义此表如何将数据反序列化为行,或将行序列化为数据,即“serde”。 以下选项可用于指定存储格式(“serde”,“输入格式”,“输出格式”),例如, CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')。 默认情况下,我们将表文件作为纯文本读取。 请注意,创建表时尚不支持Hive存储handler,您可以使用Hive端的存储handler创建表,并使用Spark SQL读取它。

Property Name Meaning
fileFormat fileFormat是一种存储格式规范包,包括“serde”,“input format”和“output format”。 目前我们支持6种fileFormats:'sequencefile','rcfile','orc','parquet','textfile'和'avro'。
inputFormat, outputFormat 这两个选项将相应的InputFormatOutputFormat类的名称指定为字符串文字,例如org.apache.hadoop.hive.ql.io.orc.OrcInputFormat。 这两个选项必须成对出现,如果已经指定了fileFormat选项,你不能请再指定它们。
serde 此选项指定serde类的名称。 指定fileFormat选项时,如果给定的fileFormat已经包含了serde的信息则请勿再指定此选项。 目前“sequencefile”,“textfile”和“rcfile”不包含serde信息,您可以将此选项与这3个fileFormats一起使用。
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim 这些选项只能与“textfile”文件格式一起使用。 它们定义了如何将文件内容分隔为行。

与不同版本的Hive Metastore交互

Spark SQL的Hive支持最重要的部分之一是与Hive Metastore的交互,这使得Spark SQL能够访问Hive表的元数据。从Spark 1.4.0开始,可以使用单个二进制构建的Spark SQL来查询不同版本的Hive Metastores,使用下面描述的配置。 请注意,独立于用于与Metastore通信的Hive版本,Spark SQL将针对Hive 1.2.1进行编译作为内部实现,并使用这些类进行内部执行(serdes,UDF,UDAF等)。
下面的选项用来配置Hive的版本,从而检索元数据。

Property Name Default Meaning
spark.sql.hive.metastore.version 1.2.1 Hive metastore的版本。可选的配置从0.12.02.3.3
spark.sql.hive.metastore.jars builtin 用来实例化HiveMetastoreClient的jar包的地址。可以是一下3个选项之一:
  1. builtin 当使用
-Phive时,使用Hive 1.2.1,这是与Spark绑定的版本。当选择该项时,spark.sql.hive.metastore.version必须是1.2.1或者无定义。 maven 使用从Maven仓库中下载的指定的Hive jar包。该配置在生产环境不推荐。 JVM标准格式的类路径。 此类路径必须包含Hive及其所有依赖项,包括正确版本的Hadoop。 这些jar包只需要存在于驱动程序中,但如果您以yarn集群模式运行,则必须确保它们与您的应用程序一起打包。
spark.sql.hive.metastore.sharedPrefixes com.mysql.jdbc,
org.postgresql,
com.microsoft.sqlserver,
oracle.jdbc

以逗号分隔的类前缀列表,应使用在Spark SQL和特定版本的Hive之间共享的类加载器加载。 举个应该被共享的类的示例是与Metastore进行通信所需的JDBC驱动程序。 其他需要共享的类是与已共享的类交互的类。 例如,log4j使用的自定义appender。

spark.sql.hive.metastore.barrierPrefixes (empty)

以逗号分隔的类前缀列表,应为Spark SQL在与每个Hive版通信时需要显式重新加载的类。 例如,在前缀中声明的Hive的UDF就是典型的需要被共享的。(例如 org.apache.spark.*

JDBC To Other Databases

Spark SQL还包括一个可以使用JDBC从其他数据库读取数据的数据源。 与使用JdbcRDD相比,此功能应该更受欢迎。 这是因为这样操作的结果作为DataFrame返回,可以在Spark SQL中轻松处理,也可以与其他数据源连接。 JDBC数据源也更易于在Java或Python中使用,因为它不需要用户提供ClassTag。 (请注意,这与Spark SQL JDBC服务器不同,后者允许其他应用程序使用Spark SQL运行查询)。
首先,您需要在spark类路径中包含特定数据库的JDBC驱动程序。 例如,要从Spark Shell连接到postgres,您将运行以下命令:

bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar

可以使用Data Sources API将远程数据库中的表加载为DataFrame或Spark SQL临时视图。用户可以在数据源选项中指定JDBC连接属性。 用户名和密码通常作为登录数据源的连接属性提供。 除连接属性外,Spark还支持以下不区分大小写的选项:

Property Name Meaning
url JDBC连接串URL。特定源的连接属性以URL的形式声明。比如jdbc:postgresql://localhost/testuser=fred&password=secret
dbtable 应该读取或写入的JDBC表。 请注意,在读取路径中使用它时,可以使用SQL查询的 FROM 子句中有效的任何内容。 例如,您也可以在括号中使用子查询,而不是完整的表。 不允许同时指定dbtablequery选项。
query 将数据读入Spark的查询。指定的查询将被括起来并用作 FROM 子句中的子查询。 Spark还会为子查询子句分配别名。 例如,spark将向JDBC Source发出以下形式的查询。

SELECT <columns> FROM (<user_specified_query>) spark_gen_alias

使用此选项时,以下是一些限制。
  1. 不允许同时指定dbtablequery选项。
  2. 不允许同时指定querypartitionColumn选项。 当需要指定partitionColumn选项时,可以使用dbtable选项指定子查询,并且可以使用作为dbtable一部分提供的子查询别名来限定分区列。
    例如:
    spark.read.format("jdbc")
    .option("dbtable", "(select c1, c2 from t1) as subq")
    .option("partitionColumn", "subq.c1")
    .load()
driver JDBC驱动的类名。
partitionColumn, lowerBound, upperBound 如果指定了任何选项,则必须全部指定这些选项。 此外,必须指定 numPartitions 。 它们描述了在从多个工作者并行读取时如何对表进行分区。 partitionColumn 必须是相关表中的数字、日期或时间戳列。 请注意, lowerBound upperBound 仅用于决定分区步幅,而不是用于过滤表中的行。 因此,表中的所有行都将被分区并返回。 此选项仅适用于读数据。
numPartitions 可用于并行读取和写入的表的最大分区数。还确定了最大并发JDBC连接数。如果要写入的分区数超过此限制,我们通过在写入之前调用coalesce(numPartitions)将其减少到此限制。
queryTimeout 驱动程序等待Statement对象执行到指定秒数的超时时长。 0意味着没有限制。在写入路径中,此选项取决于JDBC驱动程序如何实现 setQueryTimeout 这个API,例如,h2 JDBC驱动程序检查每个查询的超时而不是整个JDBC批处理。它默认为 0
fetchsize JDBC的fetch大小,用于确定每次读取回合要获取的行数。这有助于JDBC驱动程序的性能,默认为低fetch大小(例如,Oracle是10行)。 此选项仅适用于读取。
batchsize JDBC批处理大小,用于确定每次IO往返要插入的行数。 这有助于JDBC驱动程序的性能。此选项仅适用于写入。默认为 1000
isolationLevel 事务隔离级别,适用于当前连接。它可以是 NONE READ_COMMITTED READ_UNCOMMITTED REPEATABLE_READ SERIALIZABLE 之一 ,对应于JDBC的Connection对象定义的标准事务隔离级别,默认为 READ_UNCOMMITTED 。此选项仅适用于写入。 请参阅 java.sql.Connection 中的文档。
sessionInitStatement 在向远程数据库打开每个数据库会话之后,在开始读取数据之前,此选项将执行自定义SQL语句(或PL/SQL块)。使用它来实现会话初始化代码。 示例:option("sessionInitStatement", """BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;""")
truncate 这是JDBC writer相关选项。启用 SaveMode.Overwrite code>时,此选项会导致Spark截断现有表,而不是删除并重新创建它。 这可以更有效,并且防止删除表元数据(例如,索引)。 但是,在某些情况下,例如新数据具有不同的schema时,它将无法工作。 它默认为 false 。 此选项仅适用于写入。
cascadeTruncate 这是JDBC writer相关选项。 如果JDBC数据库(目前是PostgreSQL和Oracle)启用并支持,则此选项允许执行 TRUNCATE TABLE t CASCADE (在PostgreSQL的情况下, TRUNCATE TABLE ONLY t CASCADE 以防止无意中截断下层的表)。这将影响其他表,因此应谨慎使用。 此选项仅适用于写入。它默认为当前配置的JDBC数据库的默认级联截断行为,在每个JDBCDialect中的 isCascadeTruncate 中指定。
createTableOptions 这是JDBC writer相关选项。如果指定,则此选项允许在创建表时设置特定于数据库的表和分区选项(例如,CREATE TABLE t (name string) ENGINE=InnoDB)。此选项仅适用于写入。
createTableColumnTypes 创建表时要使用的数据库列的数据类型而不是默认值。应以与CREATE TABLE列语法相同的格式指定数据类型信息(例如:"name CHAR(64), comments VARCHAR(1024)")。指定的类型应该是有效的spark sql数据类型。此选项仅适用于写入。
customSchema 用于从JDBC连接器读取数据的自定义schema。例如,"id DECIMAL(38, 0), name STRING"。 您还可以只指定部分字段,其他字段使用默认类型映射。 例如,"id DECIMAL(38, 0)"。 列名应与JDBC表的相应列名相同。用户可以指定Spark SQL的相应数据类型,而不是使用默认值。此选项仅适用于读取。
pushDownPredicate 这个选项用于在JDBC数据源启用或禁用谓词下推。默认值为true,在这种情况下,Spark会尽可能地将过滤条件下推到JDBC数据源。否则,如果设置为false,则不会将过滤条件下推到JDBC数据源,因此所有过滤条件都将由Spark处理。当Spark能够比JDBC数据源更快地执行谓词过滤时,谓词下推通常会被关闭。
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load()

val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
val jdbcDF2 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Specifying the custom data types of the read schema
connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
val jdbcDF3 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Saving data to a JDBC source
jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save()

jdbcDF2.write
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Specifying create table column data types on write
jdbcDF.write
  .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

Apache Avro 数据源

从Spark 2.4后,Spark SQL提供对于读写Apache Avro数据的内置支持。

部署

spark-avro模块是外置的,默认情况下不包含在spark-submit或spark-shell中。

与任何Spark应用程序一样,spark-submit用于启动您的应用程序。 使用--packages可以将spark-avro_2.11及其依赖项直接添加到spark-submit,例如,

./bin/spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.0 ...

对于在spark-shell上进行试验,您也可以使用--packages直接添加org.apache.sparkspark-avro_2.11及其依赖项,

./bin/spark-shell --packages org.apache.spark:spark-avro_2.11:2.4.0 ...

Load and Save Functions

由于spark-avro模块是外部的,因此DataFrameReader或DataFrameWriter中没有.avro API。

要以Avro格式加载/保存数据,您需要将数据源选项格式指定为avro(或org.apache.spark.sql.avro)。

val usersDF = spark.read.format("avro").load("examples/src/main/resources/users.avro")
usersDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")

to_avro() and from_avro()

Avro软件包提供了to_avro函数,可以将列编码为Avro格式的二进制文件,from_avro()将Avro二进制数据解码为列。两个函数都将一列转换为另一列,输入/输出SQL数据类型可以是复杂类型或基本类型。

在读取或写入像Kafka这样的流式数据源时,将Avro记录作为列非常有用。 每个Kafka键值记录都会增加一些元数据,例如Kafka的摄取时间戳,Kafka的偏移量等。

  • 如果包含数据的“value”字段位于Avro中,则可以使用from_avro()提取数据,丰富数据,清理数据,然后再将其推送到Kafka下游或将其写入文件。
  • to_avro()可用于将结构体转换为Avro记录。 在将数据写入Kafka时,如果要将多个列重新编码为单个列,此方法特别有用。
    这两个方法目前仅支持Scala和Java。
import org.apache.spark.sql.avro._

// `from_avro` requires Avro schema in JSON string format.
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

// 1. Decode the Avro data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Avro format.
val output = df
  .select(from_avro('value, jsonFormatSchema) as 'user)
  .where("user.favorite_color == \"red\"")
  .select(to_avro($"user.name") as 'value)

val query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start()

数据源选项

Avro的数据源选项可以通过DataFrameReader或者DataFrameWriter的.option方法来设置。

Property Name Default Meaning Scope
avroSchema None 用户以JSON格式提供可选的Avro schema。记录字段的日期类型和命名应匹配输入的Avro数据或Catalyst数据,否则读/写操作将失败。 read and write
recordName topLevelRecord 在写入结果时的顶层记录名字,这在Avro的spec是需要的 write
recordNamespace "" 写入结果的记录命名空间 write
ignoreExtension true 该选项控制在读取时忽略没有 .avro 扩展名的文件。
如果启用该选项,则加载所有文件(带有和不带 .avro 扩展名)。
read
compression snappy compression 选项允许指定write中使用的压缩编解码器
目前支持的编解码器有 uncompressed snappy deflate bzip2 xz
如果未设置该选项,则要考虑配置spark.sql.avro.compression.codec
write

配置

可以使用SparkSession的setConf方法或使用SQL运行SET key = value命令来完成Avro的配置。

Property Name Default Meaning
spark.sql.legacy.replaceDatabricksSparkAvro.enabled true 如果设置为true,则数据源提供者 com.databricks.spark.avro 将映射到内置的外部Avro数据源模块,以实现向后兼容性。
spark.sql.avro.compression.codec snappy 用于编写AVRO文件的压缩编解码器。支持的编解码器:uncompressed,deflate,snappy,bzip2和xz。默认编解码器是snappy。
spark.sql.avro.deflate.level -1 用于编写AVRO文件的deflate编解码器的压缩级别。 有效值必须介于1到9之间(包括1或9)或-1。 默认值为-1,对应于当前实现中的6级。

Compatibility with Databricks spark-avro

此Avro数据源模块最初来自Databricks的开源存储库spark-avro并与之兼容。

默认情况下,启用SQL配置spark.sql.legacy.replaceDatabricksSparkAvro.enabled,数据源提供者com.databricks.spark.avro将映射到此内置Avro模块。对于在目录元数据库中使用Provider属性创建的Spark表作为com.databricks.spark.avro,如果您使用此内置Avro模块,则映射对于加载这些表至关重要。

请注意,在Databricks的spark-avro中,为快捷函数.avro()创建了隐式类AvroDataFrameWriter和AvroDataFrameReader。在这个内置但外部的模块中,两个隐式类都被删除了。请改用DataFrameWriter或DataFrameReader中的.format(“avro”),它应该干净且足够好。

如果您更喜欢使用自己构建的spark-avro jar文件,则只需禁用配置spark.sql.legacy.replaceDatabricksSparkAvro.enabled,并在部署应用程序时使用选项--jars。有关详细信息,请阅读“应用程序提交指南”中的“高级依赖关系管理”部分。

Supported types for Avro -> Spark SQL conversion

目前,Spark支持在Avro记录下读取所有原始类型和复杂类型。

Avro type Spark SQL type
boolean BooleanType
int IntegerType
long LongType
float FloatType
double DoubleType
string StringType
enum StringType
fixed BinaryType
bytes BinaryType
record StructType
array ArrayType
map MapType
union See below

除了上面列出的类型,它还支持读取联合类型。 以下三种类型被视为基本联合类型:

  1. union(int, long)将映射到LongType。
  2. union(float, double)将映射到DoubleType。
  3. union(something, null),其中something是任何支持的Avro类型。这将被映射到与something相同的Spark SQL类型,并将nullable设置为true。所有其他联合类型都被认为是复杂的 根据union的成员,它们将映射到StructType,其中字段名称是member0,member1等。 这与Avro和Parquet之间的转换行为一致。

它还支持读取以下Avro逻辑类型:

Avro logical type Avro type Spark SQL type
date int DateType
timestamp-millis long TimestampType
timestamp-micros long TimestampType
decimal fixed DecimalType
decimal bytes DecimalType

目前,它忽略了Avro文件中存在的文档,别名和其他属性。

Supported types for Spark SQL -> Avro conversion

Spark支持将所有Spark SQL类型写入Avro。 对于大多数类型,从Spark类型到Avro类型的映射很简单(例如,IntegerType转换为int); 但是,下面列出了一些特殊情况:

Spark SQL type Avro type Avro logical type
ByteType int
ShortType int
BinaryType bytes
DateType int date
TimestampType long timestamp-micros
DecimalType fixed decimal

您还可以使用选项avroSchema指定整个输出Avro schema,以便可以将Spark SQL类型转换为其他Avro类型。 默认情况下不应用以下转换,并且需要用户指定的Avro schema:

Spark SQL type Avro type Avro logical type
BinaryType fixed
StringType enum
TimestampType long timestamp-millis
DecimalType bytes decimal

故障排除Troubleshooting

  • JDBC驱动程序类必须对客户端会话和所有执行程序上的原始类加载器可见。 这是因为Java的DriverManager类进行了安全检查,导致它忽略了当打开连接时原始类加载器不可见的所有驱动程序。 一种方便的方法是修改所有工作节点上的compute_classpath.sh以包含驱动程序JAR。
  • 某些数据库(如H2)会将所有名称转换为大写。您需要使用大写字母在Spark SQL中引用这些名称。
  • 用户可以在数据源选项中指定特定于供应商的JDBC连接属性以进行特殊处理。例如,spark.read.format("jdbc").option("url", oracleJdbcUrl).option("oracle.jdbc.mapDateToTimestamp", "false")。 oracle.jdbc.mapDateToTimestamp默认为true,用户通常需要禁用此标志以避免Oracle日期被解析为时间戳。

性能调优

对于某些工作负载,可以通过在内存中缓存数据或打开一些实验选项来提高性能。

Caching Data In Memory

Spark SQL可以通过调用spark.catalog.cacheTable("tableName")或dataFrame.cache()使用内存中的列式格式来缓存表。 然后,Spark SQL将仅扫描所需的列,并自动调整压缩以最小化内存使用和GC压力。 您可以调用spark.catalog.uncacheTable("tableName")从内存中删除表。

可以使用SparkSession的setConf方法或使用SQL运行SET key = value命令来完成内存中缓存的配置。

Property Name Default Meaning
spark.sql.inMemoryColumnarStorage.compressed true 设置为true时,Spark SQL将根据数据统计信息自动为每列选择压缩编解码器。
spark.sql.inMemoryColumnarStorage.batchSize 10000 控制列存缓存的批次大小。较大的批处理大小可以提高内存利用率和压缩率,但在缓存数据时存在OOM风险。

其他配置项

以下选项也可用于调整查询执行的性能。由于更多优化会自动执行,因此在将来的版本中可能会弃用这些选项。

Property Name Default Meaning
spark.sql.files.maxPartitionBytes 134217728 (128 MB) 读取文件时打包到单个分区的最大字节数。
spark.sql.files.openCostInBytes 4194304 (4 MB) 打开文件的估计成本是通过可以在同一时间扫描的字节数测量的。这在将多个文件放入分区时是有用的。最好是做过度估计,这样使用较小文件的分区将比较大文件的分区(首先安排的分区)更快。
spark.sql.broadcastTimeout 300

广播连接中广播等待时间的超时(以秒为单位)

spark.sql.autoBroadcastJoinThreshold 10485760 (10 MB) 配置在执行join时将广播到所有工作节点的表的最大大小(以字节为单位)。 通过将此值设置为-1,可以禁用广播。请注意,当前的统计信息仅支持Hive Metastore表,并且其中命令ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan已经运行。
spark.sql.shuffle.partitions 200 配置在为join或聚合shuffle数据时要使用的分区数。

Broadcast Hint for SQL Queries

BROADCAST hint指导Spark在将其与另一个表或视图join时广播每个指定的表。 当Spark决定join方法时,广播散列连接(broadcast hash join即BHJ)是首选,即使统计信息高于spark.sql.autoBroadcastJoinThreshold配置的。当join的两端都被指定时,Spark会广播具有较低统计信息的那一方。 注意Spark并不保证始终选择BHJ,因为并非所有情况(例如全外连接)都支持BHJ。 当选择广播嵌套循环连接(broadcast nested loop join)时,我们仍然听从hint的。

import org.apache.spark.sql.functions.broadcast
broadcast(spark.table("src")).join(spark.table("records"), "key").show()

分布式SQL引擎Distributed SQL Engine

Spark SQL还可以使用其JDBC/ODBC或命令行界面充当分布式查询引擎。 在此模式下,最终用户或应用程序可以直接与Spark SQL交互以运行SQL查询,而无需编写任何代码。

Running the Thrift JDBC/ODBC server

此处实现的Thrift JDBC/ODBC服务器对应于Hive 1.2.1中的HiveServer2。 您可以使用Spark或Hive 1.2.1附带的beeline脚本测试JDBC服务器。

要启动JDBC / ODBC服务器,请在Spark目录中运行以下命令:

./sbin/start-thriftserver.sh

此脚本接受所有bin/spark-submit命令行选项,以及--hiveconf选项以指定Hive属性。 您可以运行./sbin/start-thriftserver.sh --help以获取所有可用选项的完整列表。默认情况下,服务器监听localhost:10000。您可以通过任一环境变量覆盖此行为,例如:

export HIVE_SERVER2_THRIFT_PORT=<listening-port>
export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
./sbin/start-thriftserver.sh \
  --master <master-uri> \
  ...

或者系统属性system properties

./sbin/start-thriftserver.sh \
  --hiveconf hive.server2.thrift.port=<listening-port> \
  --hiveconf hive.server2.thrift.bind.host=<listening-host> \
  --master <master-uri>
  ...

现在您可以使用beeline来测试Thrift JDBC/ODBC服务器:

./bin/beeline

使用以下方式直接连接到JDBC/ODBC服务器:

beeline> !connect jdbc:hive2://localhost:10000

Beeline会询问您的用户名和密码。在非安全模式下,只需在您的计算机上输入用户名和空白密码即可。对于安全模式,请按照beeline文档中的说明进行操作。

通过将hive-site.xml,core-site.xml和hdfs-site.xml文件放在conf/中来完成Hive的配置。

您也可以使用Hive附带的beeline脚本。

Thrift JDBC服务器还支持通过HTTP传输发送thrift RPC消息。使用以下设置将HTTP模式作为系统属性或在conf/中的hive-site.xml文件中启用:

hive.server2.transport.mode - Set this to value: http
hive.server2.thrift.http.port - HTTP port number to listen on; default is 10001
hive.server2.http.endpoint - HTTP endpoint; default is cliservice

要进行测试,请使用beeline以http模式连接到JDBC/ODBC服务器:

beeline> !connect jdbc:hive2://<host>:<port>/<database>hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>

Running the Spark SQL CLI

Spark SQL CLI是一种方便的工具,可以在本地模式下运行Hive Metastore服务,并执行从命令行输入的查询。 请注意,Spark SQL CLI无法与Thrift JDBC服务器通信。

要启动Spark SQL CLI,请在Spark目录中运行以下命令:

./bin/spark-sql

通过将hive-site.xml,core-site.xml和hdfs-site.xml文件放在conf/中来完成Hive的配置。 您可以运行./bin/spark-sql --help以获取所有可用选项的完整列表。


                    Spark SQL玩起来   https://www.cppentry.com/bencandy.php?fid=116&id=222568

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Apache Spark 之 SparkSQL(章节六) 下一篇        ..