TOP

-spark基础操作
2019-02-08 13:30:59 】 浏览:51
Tags:-spark 基础 操作

dataframe

spark dataframe派生于RDD类,但是提供了非常强大的数据操作功能。主要对类SQL的支持。

DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据块中的表,它与RDD最主要的区别在于:DataFrame有schema元数据,即DataFrame所表示的数据集的每一列都有名称和数据类型。正是因为有了这些schema元数据,Sparl SQL的查询优化器就可以进行针对性的优化。

spark dataframe 的几个关键点:

  • 分布式的数据集

  • 类似关系型数据库中的table,或者 excel 里的一张 sheet,或者 python/R 里的 dataframe

  • 拥有丰富的操作函数,类似于 rdd 中的算子

  • 一个 dataframe 可以被注册成一张数据表,然后用 sql 语言在上面操作

  • 丰富的创建方式

    • 已有的RDD

    • 结构化数据文件

    • JSON数据集

    • Hive表

    • 外部数据库

RDD和 DataFrame的比较

# 前者没有schema信息;后者有schema信息

# RDD无法得知所存的数据元素的具体内部结构,Spark Core只能在stage层面进行简单的优化;后者因为有schema信息,Sparl SQL的查询优化器就可以进行针对性的优化

# RDD通过函数式调用API,虽然简洁明了,但是需要创建新的对象,不容易重用旧的对象,给GC带来挑战;DataFrame是尽可能的重用对象

在实际工作中会遇到这样的情况,主要是会进行两个数据集的筛选、合并,重新入库。

首先加载数据集,然后在提取数据集的前几行过程中,才找到limit的函数。

而合并就用到union函数,重新入库,就是registerTemple注册成表,再进行写入到HIVE中。

不得不赞叹dataframe的强大。

DataFrame创建方式

跟关系数据库的表(Table)一样,DataFrame是Spark中对带模式(schema)行列数据的抽象。DateFrame广泛应用于使用SQL处理大数据的各种场景。创建DataFrame有很多种方法,比如从本地List创建、从RDD创建或者从源数据创建,下面简要介绍创建DataFrame的三种方法。

方法一,Spark中使用toDF函数创建DataFrame

通过导入(importing)Spark sql implicits, 就可以将本地序列(seq), 数组或者RDD转为DataFrame。只要这些数据的内容能指定数据类型即可。

本地seq + toDF创建DataFrame示例:

import sqlContext.implicits._
val df = Seq(
  (1, "First Value", java.sql.Date.valueOf("2010-01-01")),
  (2, "Second Value", java.sql.Date.valueOf("2010-02-01"))
).toDF("int_column", "string_column", "date_column")
 sqlContext.implicits._
val df = Seq(
  (1, "First Value", java.sql.Date.valueOf("2010-01-01")),
  (2, "Second Value", java.sql.Date.valueOf("2010-02-01"))
).toDF("int_column", "string_column", "date_column")

注意:如果直接用toDF()而不指定列名字,那么默认列名为"1", "2", ...

通过case class + toDF创建DataFrame的示例

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")

// 使用 sqlContext 执行 sql 语句.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// 注:sql()函数的执行结果也是DataFrame,支持各种常用的RDD操作.
// The columns of a row in the result can be accessed by ordinal.
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")

// 使用 sqlContext 执行 sql 语句.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// 注:sql()函数的执行结果也是DataFrame,支持各种常用的RDD操作.
// The columns of a row in the result can be accessed by ordinal.
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

方法二,Spark中使用createDataFrame函数创建DataFrame

SqlContext中使用createDataFrame也可以创建DataFrame。跟toDF一样,这里创建DataFrame的数据形态也可以是本地数组或者RDD。

通过row+schema创建示例

import org.apache.spark.sql.types._
val schema = StructType(List(
  StructField("integer_column", IntegerType, nullable = false),
  StructField("string_column", StringType, nullable = true),
  StructField("date_column", DateType, nullable = true)
))

val rdd = sc.parallelize(Seq(
 Row(1, "First Value", java.sql.Date.valueOf("2010-01-01")),
 Row(2, "Second Value", java.sql.Date.valueOf("2010-02-01"))
))
val df = sqlContext.createDataFrame(rdd, schema)
 org.apache.spark.sql.types._
val schema = StructType(List(
  StructField("integer_column", IntegerType, nullable = false),
  StructField("string_column", StringType, nullable = true),
  StructField("date_column", DateType, nullable = true)
))

val rdd = sc.parallelize(Seq(
 Row(1, "First Value", java.sql.Date.valueOf("2010-01-01")),
 Row(2, "Second Value", java.sql.Date.valueOf("2010-02-01"))
))
val df = sqlContext.createDataFrame(rdd, schema)

方法三,通过文件直接创建DataFrame

使用parquet文件创建


val df = sqlContext.read.parquet("hdfs:/path/to/file")val df = sqlContext.read.parquet("hdfs:/path/to/file")

使用json文件创建

val df = spark.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age|  name|
// +----+-------+
// |null|Michael|
// |  30|  Andy|
// |  19| Justin|
// +----+-------+ df = spark.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age|  name|
// +----+-------+
// |null|Michael|
// |  30|  Andy|
// |  19| Justin|
// +----+-------+

/**
  * DataFrame API基本操作
  */
object DataFrameApp {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .appName("DataFrameApp")
      .master("local[2]")
      .getOrCreate()

    // 将json文件加载成一个dataframe
    val peopleDF = spark.read
      .format("json")
      .load("people.json")

    // 输出dataframe对应的schema信息
    peopleDF.printSchema()

    // 输出数据集的前20条记录
    peopleDF.show()

    //查询某列所有的数据: select name from table
    peopleDF.select("name").show()

    // 查询某几列所有的数据,并对列进行计算: select name, age+10 as age2 from table
    peopleDF
      .select(peopleDF.col("name"), (peopleDF.col("age") + 10).as("age2"))
      .show()

    //根据某一列的值进行过滤: select * from table where age>19
    peopleDF.filter(peopleDF.col("age") > 19).show()

    //根据某一列进行分组,然后再进行聚合操作: select age,count(1) from table group by age
    peopleDF.groupBy("age").count().show()

    spark.stop()
  }
  
}

/**
  * RDD to DataFrame以及DataFrame操作
  */
object DataFrameCase {

  case class Student(id: Int, name: String, phone: String, email: String)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("DataFrameRDDApp")
      .master("local[2]")
      .getOrCreate()

    // RDD ==> DataFrame
    val rdd =
      spark.sparkContext.textFile("student.data")

    //注意:需要导入隐式转换
    import spark.implicits._
    val studentDF = rdd
      .map(_.split("""|"""))
      .map(line => Student(line(0).toInt, line(1), line(2), line(3)))
      .toDF()

    //show默认只显示前20条
    studentDF.show
    studentDF.show(30)
    studentDF.show(30, false)

    studentDF.take(10)
    studentDF.first()
    studentDF.head(3)

    studentDF.select("email").show(30, false)

    studentDF.filter("name=''").show
    studentDF.filter("name='' OR name='NULL'").show

    //name以M开头的人
    studentDF.filter("SUBSTR(name,0,1)='M'").show

    studentDF.sort(studentDF("name")).show
    studentDF.sort(studentDF("name").desc).show

    studentDF.sort("name", "id").show
    studentDF.sort(studentDF("name").asc, studentDF("id").desc).show

    studentDF.select(studentDF("name").as("student_name")).show

    val studentDF2 = rdd
      .map(_.split("\\|"))
      .map(line => Student(line(0).toInt, line(1), line(2), line(3)))
      .toDF()

    studentDF
      .join(studentDF2, studentDF.col("id") === studentDF2.col("id"))
      .show

    spark.stop()

  }

}

package spark_basic

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{
  StringType,
  IntegerType,
  StructField,
  StructType
}

/**
  * DataFrame和RDD的互操作
  */
object DataFrameRDDApp {

  case class Info(id: Int, name: String, age: Int)

  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .appName("DataFrameRDDApp")
      .master("local[2]")
      .getOrCreate()

    inferReflection(spark)

    program(spark)

    spark.stop()
  }

  def inferReflection(spark: SparkSession): Unit = {

    // RDD ==> DataFrame
    val rdd = spark.sparkContext.textFile("file:///Users/rocky/data/infos.txt")

    //注意:需要导入隐式转换
    import spark.implicits._
    val infoDF = rdd
      .map(_.split(","))
      .map(line => Info(line(0).toInt, line(1), line(2).toInt))
      .toDF()

    infoDF.show()

    infoDF.filter(infoDF.col("age") > 30).show

    infoDF.createOrReplaceTempView("infos")
    spark.sql("select * from infos where age > 30").show()
  }

  def program(spark: SparkSession): Unit = {
    // RDD ==> DataFrame
    val rdd = spark.sparkContext.textFile("file:///Users/rocky/data/infos.txt")

    val infoRDD = rdd
      .map(_.split(","))
      .map(line => Row(line(0).toInt, line(1), line(2).toInt))

    val structType = StructType(
      Array(StructField("id", IntegerType, true),
            StructField("name", StringType, true),
            StructField("age", IntegerType, true)))

    val infoDF = spark.createDataFrame(infoRDD, structType)
    infoDF.printSchema()
    infoDF.show()

    //通过df的api进行操作
    infoDF.filter(infoDF.col("age") > 30).show

    //通过sql的方式进行操作
    infoDF.createOrReplaceTempView("infos")
    spark.sql("select * from infos where age > 30").show()
  }

}

package spark_basic

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

/**
  * HiveContext的使用
  * 使用时需要通过--jars 把mysql的驱动传递到classpath
  */
object HiveContextApp {

  def main(args: Array[String]): Unit = {

    val warehouseLocation = "spark-warehouse"

    val sparkConf =
      new SparkConf().set("spark.sql.warehouse.dir", warehouseLocation)

    val spark =
      SparkSession.builder.enableHiveSupport.config(sparkConf).getOrCreate()

    //2)相关的处理:
    spark.table("emp").show

    //3)关闭资源
    spark.stop()
  }
}

package spark_basic

import org.apache.spark.sql.SparkSession

/**
  * 使用外部数据源综合查询Hive和MySQL的表数据
  */
object HiveMySQLApp {

  def main(args: Array[String]) {
    val spark = SparkSession
      .builder()
      .appName("HiveMySQLApp")
      .master("local[2]")
      .getOrCreate()

    // 加载Hive表数据
    val hiveDF = spark.table("emp")

    // 加载MySQL表数据
    val mysqlDF = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306")
      .option("dbtable", "spark.DEPT")
      .option("user", "root")
      .option("password", "root")
      .option("driver", "com.mysql.jdbc.Driver")
      .load()

    // JOIN
    val resultDF =
      hiveDF.join(mysqlDF, hiveDF.col("deptno") === mysqlDF.col("DEPTNO"))
    resultDF.show

    resultDF
      .select(hiveDF.col("empno"),
              hiveDF.col("ename"),
              mysqlDF.col("deptno"),
              mysqlDF.col("dname"))
      .show

    spark.stop()
  }
}

package spark_basic

import org.apache.spark.sql.SparkSession

/**
  * Parquet文件操作
  */
object ParquetApp {

  def main(args: Array[String]) {

    val spark = SparkSession
      .builder()
      .appName("SparkSessionApp")
      .master("local[2]")
      .getOrCreate()

    /**
      * spark.read.format("parquet").load 这是标准写法
      */
    val userDF = spark.read
      .format("parquet")
      .load("users.parquet")

    userDF.printSchema()
    userDF.show()

    userDF.select("name", "favorite_color").show

    userDF
      .select("name", "favorite_color")
      .write
      .format("json")
      .save("file:///home/hadoop/tmp/jsonout")

    spark.read
      .load("users.parquet")
      .show

    //会报错,因为sparksql默认处理的format就是parquet
    spark.read
      .load("people.json")
      .show

    spark.read
      .format("parquet")
      .option("path", "users.parquet")
      .load()
      .show

    spark.stop()
  }
}


package spark_basic

import org.apache.spark.sql.SparkSession

/**
  * Schema Infer
  *
  * "spark.sql.sources.partitionColumnTypeInference.enabled" 默认是 true
  */
object SchemaInferApp {

  def main(args: Array[String]) {

    val spark = SparkSession
      .builder()
      .appName("SchemaInferApp")
      .master("local[2]")
      .config("spark.sql.sources.partitionColumnTypeInference.enabled", false)
      .getOrCreate()

    val df = spark.read
      .format("json")
      .load("file:///Users/rocky/data/json_schema_infer.json")

    df.printSchema()

    df.show()

    spark.stop()
  }

}


package spark_basic

import org.apache.spark.sql.SparkSession

/**
  * SparkSession的使用
  *
  * SparkSession是spark2.0以后默认的的统一客户端程序入口。
  *
  * sparkSession是HiveContext和sqlContext的统一入口
  * sparkContext可以通过spark.sparkContext获得
  */
object SparkSessionApp {

  def main(args: Array[String]) {

    val spark = SparkSession
      .builder()
      .enableHiveSupport()
      .appName("SparkSessionApp")
      .master("local[2]")
      .getOrCreate()
    val sc = spark.sparkContext

    val people = spark.read.json("file:///Users/rocky/data/people.json")
    people.show()

    spark.stop()
  }
}

package spark_basic

import java.sql.DriverManager

/**
  *  通过JDBC的方式访问
  */
object SparkSQLThriftServerApp {

  def main(args: Array[String]) {

    Class.forName("org.apache.hive.jdbc.HiveDriver")

    val conn = DriverManager.getConnection("jdbc:hive2://hadoop001:14000","hadoop","")
    val pstmt = conn.prepareStatement("select empno, ename, sal from emp")
    val rs = pstmt.executeQuery()
    while (rs.next()) {
      println("empno:" + rs.getInt("empno") +
        " , ename:" + rs.getString("ename") +
        " , sal:" + rs.getDouble("sal"))

    }

    rs.close()
    pstmt.close()
    conn.close()
  }
}


package spark_basic

import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf

/**
  * SQLContext的使用:
  * 注意:IDEA是在本地,而测试数据是在服务器上 ,能不能在本地进行开发测试的?
  */
object SQLContextApp {

  def main(args: Array[String]): Unit = {

    val path = args(0)

    //1)创建相应的Context
    val sparkConf = new SparkConf()

    //在测试或者生产中,AppName和Master我们是通过脚本进行指定
    //sparkConf.setAppName("SQLContextApp").setMaster("local[2]")

    val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    val sc = spark.sparkContext

    //2)相关的处理: json
    val people = spark.sqlContext.read.format("json").load(path)
    people.printSchema()
    people.show()

    //3)关闭资源
    sc.stop()
  }
}


-spark基础操作 https://www.cppentry.com/bencandy.php?fid=116&id=207481

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Spark与深度学习框架——H2O、dee.. 下一篇spark配置详解