设为首页 加入收藏

TOP

Spark-SQL学习笔记(1) - Datasets and DataFrames
2019-03-19 13:18:34 】 浏览:51
Tags:Spark-SQL 学习 笔记 Datasets and DataFrames
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq_26369213/article/details/80206919

概述

Spark SQL是Spark中的一个模块,负责结构化数据的处理。它跟Spark RDD API不一样,Spark SQL提供的接口会提供更多关于数据和执行计算的信息。在内部,Spark SQL使用这些额外的信息去执行额外的优化。可以通过SQL 和 Dataset API与Spark SQL进行交互。当使用同一个执行引擎得到的计算结果,它是不会依赖于使用的API/编程语言。这意味着开发人员能更容易的在不同API进行切换。

SQL

Spark SQL可以执行SQL的查询,也可以从现有的hive中读取数据。当从另一种编程语言中运行SQL时,结果将作为一个dataset/dataframe返回。可以通过命令行或者JDBC/ODBC与SQL进行交互。

其实Spark SQL的功能不仅仅只是SQL的功能,它比SQL拥有更多的功能。

Datasets and DataFrames

一个Dataset是一个分部的数据集。Dataset是Spark 1.6中新增的一个新接口它有利于RDDs和Spark SQL中的优化引擎。一个Dataset可以JVM的对象中构建还可以被transformations函数(map,flatMap,filter,etc)操作。Dataset的API可以用于Scala和java。不适用于Python和R语言。

一个DataFrame是一个被加入列名的Dataset。从概念上理解可以等同于关系型数据库里的一张表。DataFrame的构建数据源有很多,例如:结构化文件,Hive中的表,外部数据库,或者已存在的RDDs。DataFrame API 可以用于Scala,java,Python 和 R语言。Scala API中,DataFrame进进是Dataset[Row]的别名。Java API中为Dataset<Row>


创建一个DataFrames

package com.hihi.learn.sparkSql

import com.hihi.learn.sparkSql.DatasetsDemo.Person
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

import scala.collection.mutable.ArrayBuffer

object DataFrameDemo {
  case class Person(name: String, age: Long)
  def main(args: Array[String]): Unit = {
    // 创建SparkSession,此为spark SQL的入口
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .master("local[2]")
      .getOrCreate()
    import spark.implicits._
    //Creating DataFrames
    val df = spark.read.format("json").load("E:\\spark-branch-2.2\\examples\\src\\main\\resources\\people.json")
    //df.show()
    /*
    +----+-------+
    | age|   name|
    +----+-------+
    |null|Michael|
    |  30|   Andy|
    |  19| Justin|
    +----+-------+
    */

    // 使用算子操作DataFrame
    //UntypedDatasetOperations(df)

    // 使用SQL编程方式
    RunningSqlQueriesProgrammatically(spark, df)
    // Interoperating with RDDs
    val peopleDF = spark.sparkContext.textFile("E:\\spark-branch-2.2\\examples\\src\\main\\resources\\people.txt")
      .mapPartitions(its => {
        var arrPerson = ArrayBuffer[Person]()
        for (it <- its) {
          val arr = it.split(",")
          arrPerson += Person(arr(0), arr(1).trim.toLong)
        }
        arrPerson.toIterator
      })
      .toDF().show

    // Programmatically Specifying the Schema
    val personDS4 = spark.sparkContext.textFile("E:\\spark-branch-2.2\\examples\\src\\main\\resources\\people.txt")
    val schemaString = "name age"
    val fields = schemaString.split(" ").map(StructField(_, StringType, nullable = true))
    val schema = StructType(fields)

    val rowRDD = personDS4
      .map(_.split(","))
      .map(attributes => Row(attributes(0), attributes(1).trim))

    val peopleDF2 = spark.createDataFrame(rowRDD, schema)
    peopleDF2.show()
    spark.stop()

  }

  def UntypedDatasetOperations(df:DataFrame): Unit = {

    // 打印Schema
    df.printSchema()
    // root
    // |-- age: long (nullable = true)
    // |-- name: string (nullable = true)

    // 查询 name
    df.select("name").show
    // +-------+
    // |   name|
    // +-------+
    // |Michael|
    // |   Andy|
    // | Justin|
    // +-------+

    // 查询age > 21的数据
    df.filter("age > 21").show
    // +---+----+
    // |age|name|
    // +---+----+
    // | 30|Andy|
    // +---+----+

    // 使用分组
    df.groupBy("age").count.show()
    // +----+-----+
    // | age|count|
    // +----+-----+
    // |  19|    1|
    // |null|    1|
    // |  30|    1|
    // +----+-----+
  }

  def RunningSqlQueriesProgrammatically(spark: SparkSession, df: DataFrame) : Unit = {
    // 注册一张临时表
    df.createOrReplaceTempView("people")

    spark.sql("select name, age from people").show()
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|Michael|
    // |  30|   Andy|
    // |  19| Justin|
    // +----+-------+

    spark.sql("select name, age from people where name = 'Andy'").show
    // +----+---+
    // |name|age|
    // +----+---+
    // |Andy| 30|
    // +----+---+
  }

}

创建一个Datasets

package com.hihi.learn.sparkSql

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

import scala.collection.mutable.ArrayBuffer


object DatasetsDemo {

  case class Person(name: String, age: Long)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("DatasetsDemo")
      .master("local")
      .getOrCreate()

    // Creating Datasets
    import spark.implicits._
    val personDS = Seq(Person("Hihi", 22), Person("Tom", 11)).toDS
    personDS.show

    val personDS2 = spark.read.format("json").load("E:\\spark-branch-2.2\\examples\\src\\main\\resources\\people.json").as[Person]
    personDS2.show

    spark.stop()
  }
}



】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Scala   WordCount 下一篇深入理解Spark Streaming执行模型

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目