设为首页 加入收藏

TOP

Spark系列之:Spark SQL(2)
2019-05-11 13:15:52 】 浏览:143
Tags:Spark 系列 SQL

Spark系列之:Spark SQL(2)



四、RDD转换为DataFrame

1. 利用反射机制解析RDD

  • 在利用反射机制推断RDD模式时,首先需要先定义一个样例类,因为只有样例类才能被spark隐式转换为DataFrame

  • A. Spark-shell操作

开发前要导入三个包:

import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import spark.implicits._   //启用隐式转换

(1)创建RDD

sc.textFile(path)
sc.paralle(集合)
sc.makeRDD(集合)

sc.textFile("/home/duck/software/spark/examples/src/main/resources/people.txt")

在这里插入图片描述

(2)创建样例类

case class People(name:String,age:Int)

在这里插入图片描述
//用“,”分割数据

res0.map(_.split(","))

在这里插入图片描述

(3)把RDD包装成样例类

res1.map(t=>People(t(0),t(1).trim.toInt))

在这里插入图片描述res2.foreach(println)
在这里插入图片描述

(4)将包装好的样例类转化为DF
res2.toDF
在这里插入图片描述res4.show()
在这里插入图片描述

  • B. IDEA中操作
    依赖配置:

在这里插入图片描述

代码:

import org.apache.spark.rdd.RDD
	import org.apache.spark.sql.{DataFrame, SparkSession}
	import org.apache.spark.{SparkConf, SparkContext}
	
	case class People(name: String, age: Int)
	object SQLlesson01 {
	  def main(args: Array[String]): Unit = {
	    //设置spark
	    val conf = new SparkConf().setMaster("local").setAppName("sparksql01")
	    val sc = new SparkContext(conf)
	    //创建spark sql对象
	    val sparkSql:SparkSession = SparkSession.builder().config(conf).getOrCreate()
	    //启用隐式转换
	    import sparkSql.implicits._
	    //创建RDD
	    val unit: RDD[People] = sc.textFile("./people.txt").map(_.split(",")).map(t=>People(t(0),t(1).trim.toInt))
	    unit.foreach(println)
	    //转换为DataFrame
	    val df:DataFrame = unit.toDF()
	    //创建临时表
	    df.createOrReplaceTempView("peopleTemp")
	    //通过临时表查询
	    sparkSql.sql("select * from peopleTemp").show()
	  }
	}

运行结果:
在这里插入图片描述

2. 使用编程方式定义RDD模式

在这里插入图片描述

  • A. Spark-shell操作

引包:

import org.apache.spark.sql.types._   //类型
import org.apache.spark.sql.Row      //行

(1)创建行对象
//获取数据

sc.textFile("/home/duck/software/spark/examples/src/main/resources/people.txt")

在这里插入图片描述
//创建行对象

res0.map(_.split(",")).map(t=>Row(t(0),t(1).trim.toInt))

在这里插入图片描述

(2)创建表头

Array(StructField("name",StringType,true),StructField("age",IntegerType,true))

在这里插入图片描述
//把字段封装进表头

StructType(res2) 

在这里插入图片描述

(3)将表头和行拼接

spark.createDataFrame(res1,res3)

在这里插入图片描述

res4.show()

在这里插入图片描述

  • B. IDEA中操作

代码:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}

object SQLlesson02 {
  def main(args: Array[String]): Unit = {
    //设置spark对象
    val conf = new SparkConf().setMaster("local").setAppName("sparkSQL02")
    val sc = new SparkContext(conf)
    //创建spark sql对象
    val spark = SparkSession.builder().config(conf).getOrCreate()
    //启用隐式转换
    import spark.implicits._
    
    //1. 创建表头
    val fields = Array(StructField("name",StringType,true),StructField("age",IntegerType,true))
    val structType:StructType = StructType(fields)
    //2. 创建行对象
    val value:RDD[Row] = sc.textFile("./people.txt").map(_.split(",")).map(t=>Row(t(0),t(1).trim.toInt))
    //3. 将表头和行对象拼接
    val df = spark.createDataFrame(value,structType)
    //创建临时表
    df.createOrReplaceTempView("people2")
    //通过临时表查询
    spark.sql("select * from people2").show()

  }
}

五、使用Spark SQL读写数据库(MySQL)

  • 启动Hadoop集群和spark集群

  • 启动MySQL并登录

  • 支持Parquet、Json、hive等数据源,通过JDBC连接数据库:

1. 启动MySQL数据库
在这里插入图片描述

2. 将mysql驱动包放入spark的jars文件夹中,再启动spark命令行

./spark-shell \
--jars ~/software/spark/jars/mysql-connector-java-5.1.32-bin.jar \
--driver-class-path ~/software/spark/jars/mysql-connector-java-5.1.32-bin.jar

在这里插入图片描述
3. 创建jdbc连接

val jdbcDF = spark.read.format("jdbc").
					     | option("url","jdbc:mysql://Cloud00:3306/test").
					     | option("driver","com.mysql.jdbc.Driver").
					     | option("dbtable","tb1").
					     | option("user","root").
					     | option("password","admin").
					     | load()

在这里插入图片描述在这里插入图片描述


六、使用spark sql写入MySQL数据库

  • 启动Hadoop集群和spark集群

  • 启动MySQL并登录
    1. 引包

    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row
    import java.util.Properties //配置属性

2. 创建RDD数据

 val RDD=sc.parallelize(Array("5 haha","6 xixi")).map(_.split(" ")).map(t=>Row(t(0).trim.toInt,t(1)))

在这里插入图片描述

RDD.foreach(println)

在这里插入图片描述

3. 创建表头

val schema = StructType(List(StructField("id",IntegerType,true),StructField("name",StringType,true)))

在这里插入图片描述
4. 转换为DataFrame

val df = spark.createDataFrame(RDD,schema)

在这里插入图片描述
5. 创建配置对象,并设置参数

val prop = new Properties()
prop.put("user","root")
prop.put("password","admin")
prop.put("driver","com.mysql.jdbc.Driver")

在这里插入图片描述
6. 写入操作

df.write.mode("append").jdbc("jdbc:mysql://Cloud00:3306/test","tb1",prop)

在这里插入图片描述在这里插入图片描述


七、通过spark sql将数据写入hive

  • 启动Hadoop集群、YARN和spark集群
  • 启动MySQL
  • 启动hive
    1. 将hive中的配置文件hive-site.xml拷贝到spark/conf目录下,并添加配置
    在这里插入图片描述
  1. 启动元数据监听

    hive --service metastore &
    在这里插入图片描述

3. 引包

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

4. 创建RDD对象 以及数据

val RowRDD = sc.parallelize(Array("7 zs","8 lisi")).map(_.split(" ")).map(t=>Row(t(0).trim.toInt,t(1)))

在这里插入图片描述
5. 设置表头信息

val schema = StructType(Array(StructField("id",IntegerType,true),StructField("name",StringType,true)))

在这里插入图片描述
6. 将RDD对象和表头拼接并转换为DataFrame

val DF=spark.createDataFrame(RowRDD,schema)

在这里插入图片描述
7. 注册临时表

DF.createOrReplaceTempView("stu")

在这里插入图片描述
8. 添加数据到hive并查看

sql("insert into pgdb.student select * from stu")

在hive中查看
在这里插入图片描述***

--->有问题请联系

在这里插入图片描述


】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Scala   WordCount 下一篇大数据学习23:Spark:大数据的计..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目