// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface
case class Person(name: String, age: Long)

// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

The complete example code can be found in "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.
10. Data Sources (Read and Write Operations)
Spark SQL supports operating on a variety of data sources through the DataFrame interface. A DataFrame can be operated on using relational transformations, and can also be used to create a temporary view. Registering a DataFrame as a temporary view allows you to run SQL queries over its data. This section describes the general methods for loading and saving data using the Spark data sources, and then goes into the specific options that are available for the built-in data sources.
val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name","age").write.format("parquet").save("namesAndAges.parquet")
13. Run SQL on Files Directly
Instead of using the read API to load a file into a DataFrame and then querying it, you can also query that file directly with SQL.
val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
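The format.`path` syntax is not limited to Parquet; the same pattern works for other built-in sources such as json. A brief sketch against the people.json file used earlier (the variable name jsonSqlDF is illustrative):

// Query a JSON file directly, without loading it into a DataFrame first
val jsonSqlDF = spark.sql("SELECT * FROM json.`examples/src/main/resources/people.json`")
jsonSqlDF.show()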
14. Parquet Files
Parquet is a columnar format that is supported by many other data processing systems. Spark SQL supports both reading and writing Parquet files, automatically preserving the schema of the original data. When writing Parquet files, all columns are automatically converted to be nullable for compatibility reasons.
Loading Data Programmatically
// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._
val peopleDF = spark.read.json("examples/src/main/resources/people.json")
// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("people.parquet")
// Read in the parquet file created above
// Parquet files are self-describing so the schema is preserved
// The result of loading a Parquet file is also a DataFrame
val parquetFileDF = spark.read.parquet("people.parquet")
// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
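To observe the nullability conversion mentioned above, you can inspect the schema of the Parquet file written earlier. A quick sketch (the nullable flags shown are what we would expect, assuming the people.json data from the examples):

// Columns written to Parquet come back as nullable, regardless of the source schema
parquetFileDF.printSchema()
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)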