DataFrame
Spark DataFrame is derived from the RDD class, but it offers far more powerful data-manipulation capabilities, above all support for SQL-like operations.
A DataFrame is a distributed dataset built on RDDs, analogous to a table in a traditional relational database. The key difference from an RDD is that a DataFrame carries schema metadata: every column of the dataset it represents has a name and a data type. It is precisely this schema metadata that lets the Spark SQL query optimizer perform targeted optimizations.
A few key points about Spark DataFrame:
Comparing RDD and DataFrame (a minimal sketch follows this list):
# An RDD carries no schema information; a DataFrame does.
# Spark cannot see the internal structure of the elements stored in an RDD, so Spark Core can only perform simple optimizations at the stage level; a DataFrame has schema information, so the Spark SQL query optimizer can perform targeted optimizations.
# RDDs are driven through a functional API that is concise, but it keeps creating new objects and rarely reuses old ones, which puts pressure on the GC; a DataFrame reuses objects as much as possible.
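A minimal sketch of the contrast (e.g. in the spark-shell, assuming spark is an existing SparkSession; the Person data is purely illustrative):
import spark.implicits._
case class Person(name: String, age: Int)
val data = Seq(Person("Alice", 25), Person("Bob", 30))
// As an RDD, Spark only knows the element type, not the column structure
val personRDD = spark.sparkContext.parallelize(data)
// As a DataFrame, every column has a name and a type that the optimizer can exploit
val personDF = data.toDF()
personDF.printSchema()
// root
//  |-- name: string (nullable = true)
//  |-- age: integer (nullable = false)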
A situation that comes up often in real work is filtering two datasets, merging them, and writing the result back to storage.
First the datasets are loaded; when you only want to inspect the first few rows, the limit function is what you reach for.
Merging is done with union; writing back means registering the result as a table with registerTempTable and then writing it into Hive.
The DataFrame API makes this kind of work remarkably easy.
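A rough sketch of that workflow (assuming spark is a Hive-enabled SparkSession; the table names are placeholders, and createOrReplaceTempView is the newer name for registerTempTable):
// Load and filter two datasets (hypothetical Hive tables)
val df1 = spark.table("db.table_a").filter("age > 18")
val df2 = spark.table("db.table_b").filter("age > 18")
// Peek at the first few rows
df1.limit(5).show()
// Merge the two datasets, which share the same schema
val merged = df1.union(df2)
// Register as a temporary view and write back into Hive
merged.createOrReplaceTempView("merged_tmp")
spark.sql("INSERT OVERWRITE TABLE db.result_table SELECT * FROM merged_tmp")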
Ways to create a DataFrame
Like a table in a relational database, a DataFrame is Spark's abstraction for row/column data with a schema. DataFrames are widely used in scenarios where big data is processed with SQL. There are many ways to create a DataFrame, for example from a local list, from an RDD, or from a data source; three of them are briefly described below.
Method 1: create a DataFrame with toDF
By importing the Spark SQL implicits, a local sequence (Seq), an array, or an RDD can be converted to a DataFrame, as long as data types can be assigned to its contents.
Example: local Seq + toDF:
import sqlContext.implicits._
val df = Seq(
(1, "First Value", java.sql.Date.valueOf("2010-01-01")),
(2, "Second Value", java.sql.Date.valueOf("2010-02-01"))
).toDF("int_column", "string_column", "date_column")
Note: if you call toDF() without specifying column names, the default names are "_1", "_2", ...
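For example (continuing with the implicits imported above):
// Without explicit names, toDF() falls back to _1, _2, _3
val dfDefault = Seq(
  (1, "First Value", java.sql.Date.valueOf("2010-01-01"))
).toDF()
dfDefault.printSchema()
// root
//  |-- _1: integer (nullable = false)
//  |-- _2: string (nullable = true)
//  |-- _3: date (nullable = true)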
Example: case class + toDF:
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)
// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")
// Run SQL statements with sqlContext.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
// Note: sql() also returns a DataFrame, which supports the usual RDD operations.
// The columns of a row in the result can be accessed by ordinal.
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
Method 2: create a DataFrame with createDataFrame
createDataFrame on SQLContext can also be used to build a DataFrame. As with toDF, the input data can be a local collection or an RDD.
Example: Row + schema:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
val schema = StructType(List(
StructField("integer_column", IntegerType, nullable = false),
StructField("string_column", StringType, nullable = true),
StructField("date_column", DateType, nullable = true)
))
val rdd = sc.parallelize(Seq(
Row(1, "First Value", java.sql.Date.valueOf("2010-01-01")),
Row(2, "Second Value", java.sql.Date.valueOf("2010-02-01"))
))
val df = sqlContext.createDataFrame(rdd, schema)
Method 3: create a DataFrame directly from a file
From a Parquet file:
val df = sqlContext.read.parquet("hdfs:/path/to/file")
From a JSON file:
val df = spark.read.json("examples/src/main/resources/people.json")
// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
import org.apache.spark.sql.SparkSession
/**
* Basic operations with the DataFrame API
*/
object DataFrameApp {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("DataFrameApp")
.master("local[2]")
.getOrCreate()
// Load a JSON file into a DataFrame
val peopleDF = spark.read
.format("json")
.load("people.json")
// Print the schema of the DataFrame
peopleDF.printSchema()
// Show the first 20 rows of the dataset
peopleDF.show()
// Select all values of one column: select name from table
peopleDF.select("name").show()
// Select several columns and compute on a column: select name, age+10 as age2 from table
peopleDF
.select(peopleDF.col("name"), (peopleDF.col("age") + 10).as("age2"))
.show()
// Filter on a column value: select * from table where age > 19
peopleDF.filter(peopleDF.col("age") > 19).show()
// Group by a column, then aggregate: select age, count(1) from table group by age
peopleDF.groupBy("age").count().show()
spark.stop()
}
}
import org.apache.spark.sql.SparkSession
/**
* RDD to DataFrame, and DataFrame operations
*/
object DataFrameCase {
case class Student(id: Int, name: String, phone: String, email: String)
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("DataFrameRDDApp")
.master("local[2]")
.getOrCreate()
// RDD ==> DataFrame
val rdd =
spark.sparkContext.textFile("student.data")
// Note: the implicit conversions need to be imported
import spark.implicits._
val studentDF = rdd
.map(_.split("\\|"))
.map(line => Student(line(0).toInt, line(1), line(2), line(3)))
.toDF()
// show displays only the first 20 rows by default
studentDF.show
studentDF.show(30)
studentDF.show(30, false)
studentDF.take(10)
studentDF.first()
studentDF.head(3)
studentDF.select("email").show(30, false)
studentDF.filter("name=''").show
studentDF.filter("name='' OR name='NULL'").show
// people whose name starts with M
studentDF.filter("SUBSTR(name,0,1)='M'").show
studentDF.sort(studentDF("name")).show
studentDF.sort(studentDF("name").desc).show
studentDF.sort("name", "id").show
studentDF.sort(studentDF("name").asc, studentDF("id").desc).show
studentDF.select(studentDF("name").as("student_name")).show
val studentDF2 = rdd
.map(_.split("\\|"))
.map(line => Student(line(0).toInt, line(1), line(2), line(3)))
.toDF()
studentDF
.join(studentDF2, studentDF.col("id") === studentDF2.col("id"))
.show
spark.stop()
}
}
package spark_basic
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{
StringType,
IntegerType,
StructField,
StructType
}
/**
* Interoperating between DataFrame and RDD
*/
object DataFrameRDDApp {
case class Info(id: Int, name: String, age: Int)
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("DataFrameRDDApp")
.master("local[2]")
.getOrCreate()
inferReflection(spark)
program(spark)
spark.stop()
}
def inferReflection(spark: SparkSession): Unit = {
// RDD ==> DataFrame
val rdd = spark.sparkContext.textFile("file:///Users/rocky/data/infos.txt")
// Note: the implicit conversions need to be imported
import spark.implicits._
val infoDF = rdd
.map(_.split(","))
.map(line => Info(line(0).toInt, line(1), line(2).toInt))
.toDF()
infoDF.show()
infoDF.filter(infoDF.col("age") > 30).show
infoDF.createOrReplaceTempView("infos")
spark.sql("select * from infos where age > 30").show()
}
def program(spark: SparkSession): Unit = {
// RDD ==> DataFrame
val rdd = spark.sparkContext.textFile("file:///Users/rocky/data/infos.txt")
val infoRDD = rdd
.map(_.split(","))
.map(line => Row(line(0).toInt, line(1), line(2).toInt))
val structType = StructType(
Array(StructField("id", IntegerType, true),
StructField("name", StringType, true),
StructField("age", IntegerType, true)))
val infoDF = spark.createDataFrame(infoRDD, structType)
infoDF.printSchema()
infoDF.show()
// Operate through the DataFrame API
infoDF.filter(infoDF.col("age") > 30).show
// Operate through SQL
infoDF.createOrReplaceTempView("infos")
spark.sql("select * from infos where age > 30").show()
}
}
package spark_basic
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf
/**
* Using HiveContext
* When submitting the job, the MySQL JDBC driver must be passed onto the classpath with --jars
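* For example (illustrative command; the jar paths are placeholders):
*   spark-submit --class spark_basic.HiveContextApp \
*     --jars /path/to/mysql-connector-java.jar app.jar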
*/
object HiveContextApp {
def main(args: Array[String]): Unit = {
val warehouseLocation = "spark-warehouse"
val sparkConf =
new SparkConf().set("spark.sql.warehouse.dir", warehouseLocation)
val spark =
SparkSession.builder.enableHiveSupport.config(sparkConf).getOrCreate()
// 2) Processing:
spark.table("emp").show
// 3) Release resources
spark.stop()
}
}
package spark_basic
import org.apache.spark.sql.SparkSession
/**
* Querying Hive and MySQL table data together via external data sources
*/
object HiveMySQLApp {
def main(args: Array[String]) {
val spark = SparkSession
.builder()
.appName("HiveMySQLApp")
.master("local[2]")
.getOrCreate()
// Load the Hive table
val hiveDF = spark.table("emp")
// Load the MySQL table
val mysqlDF = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306")
.option("dbtable", "spark.DEPT")
.option("user", "root")
.option("password", "root")
.option("driver", "com.mysql.jdbc.Driver")
.load()
// JOIN
val resultDF =
hiveDF.join(mysqlDF, hiveDF.col("deptno") === mysqlDF.col("DEPTNO"))
resultDF.show
resultDF
.select(hiveDF.col("empno"),
hiveDF.col("ename"),
mysqlDF.col("deptno"),
mysqlDF.col("dname"))
.show
spark.stop()
}
}
package spark_basic
import org.apache.spark.sql.SparkSession
/**
* Working with Parquet files
*/
object ParquetApp {
def main(args: Array[String]) {
val spark = SparkSession
.builder()
.appName("SparkSessionApp")
.master("local[2]")
.getOrCreate()
/**
* spark.read.format("parquet").load is the standard form
*/
val userDF = spark.read
.format("parquet")
.load("users.parquet")
userDF.printSchema()
userDF.show()
userDF.select("name", "favorite_color").show
userDF
.select("name", "favorite_color")
.write
.format("json")
.save("file:///home/hadoop/tmp/jsonout")
spark.read
.load("users.parquet")
.show
// This will fail: Spark SQL's default format is parquet, so a JSON file cannot be loaded without specifying the format
spark.read
.load("people.json")
.show
spark.read
.format("parquet")
.option("path", "users.parquet")
.load()
.show
spark.stop()
}
}
package spark_basic
import org.apache.spark.sql.SparkSession
/**
* Schema inference
*
* "spark.sql.sources.partitionColumnTypeInference.enabled" defaults to true
*/
object SchemaInferApp {
def main(args: Array[String]) {
val spark = SparkSession
.builder()
.appName("SchemaInferApp")
.master("local[2]")
.config("spark.sql.sources.partitionColumnTypeInference.enabled", false)
.getOrCreate()
val df = spark.read
.format("json")
.load("file:///Users/rocky/data/json_schema_infer.json")
df.printSchema()
df.show()
spark.stop()
}
}
package spark_basic
import org.apache.spark.sql.SparkSession
/**
* Using SparkSession
*
* SparkSession is the unified entry point for Spark programs since Spark 2.0.
*
* SparkSession unifies HiveContext and SQLContext.
* The SparkContext can be obtained via spark.sparkContext.
*/
object SparkSessionApp {
def main(args: Array[String]) {
val spark = SparkSession
.builder()
.enableHiveSupport()
.appName("SparkSessionApp")
.master("local[2]")
.getOrCreate()
val sc = spark.sparkContext
val people = spark.read.json("file:///Users/rocky/data/people.json")
people.show()
spark.stop()
}
}
package spark_basic
import java.sql.DriverManager
/**
* Access the Thrift server via JDBC
*/
object SparkSQLThriftServerApp {
def main(args: Array[String]) {
Class.forName("org.apache.hive.jdbc.HiveDriver")
val conn = DriverManager.getConnection("jdbc:hive2://hadoop001:14000","hadoop","")
val pstmt = conn.prepareStatement("select empno, ename, sal from emp")
val rs = pstmt.executeQuery()
while (rs.next()) {
println("empno:" + rs.getInt("empno") +
" , ename:" + rs.getString("ename") +
" , sal:" + rs.getDouble("sal"))
}
rs.close()
pstmt.close()
conn.close()
}
}
package spark_basic
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf
/**
* Using SQLContext:
* Note: IDEA runs locally while the test data is on the server; can we still develop and test locally?
*/
object SQLContextApp {
def main(args: Array[String]): Unit = {
val path = args(0)
// 1) Create the context
val sparkConf = new SparkConf()
// In test and production, the AppName and Master are specified via the submit script
//sparkConf.setAppName("SQLContextApp").setMaster("local[2]")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
val sc = spark.sparkContext
// 2) Processing: JSON
val people = spark.sqlContext.read.format("json").load(path)
people.printSchema()
people.show()
// 3) Release resources
sc.stop()
}
}