Spark join operations
2019-02-28 01:04:35 | Views: 71
Tags: spark join operations
Copyright notice: This is an original post by the author; reproduction without permission is prohibited. https://blog.csdn.net/u010800708/article/details/87905554

1. Create a SparkSession

val sparkSession: SparkSession = SparkSession.builder()
  .appName("SparkSqlJoin")
  .master("local[2]")
  .getOrCreate()

// needed for createDataset, toDF, and the $"col" syntax
import sparkSession.implicits._

2. Create the Datasets

val datas1:Dataset[String] = sparkSession.createDataset(List(
"1 tony 18",
"2 reba 22",
"3 mimi 20"
))

val datas2:Dataset[String] = sparkSession.createDataset(List(
"18 young",
"22 old"
))

3. Transform the data

val dataDS1: Dataset[(Int, String, Int)] = datas1.map(x => {
  val fields: Array[String] = x.split(" ")
  val id = fields(0).toInt
  val name = fields(1)
  val age = fields(2).toInt
  // emit as a tuple
  (id, name, age)
})
val dataDF1:DataFrame = dataDS1.toDF("id","name","age")

val dataDS2: Dataset[(Int, String)] = datas2.map(x => {
  val fields: Array[String] = x.split(" ")
  val age = fields(0).toInt
  val desc = fields(1)
  // emit as a tuple
  (age, desc)
})
val dataDF2 = dataDS2.toDF("age","desc")
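The split-and-convert steps above can be checked in plain Scala, without Spark; note that without the `.toInt` conversions the tuples would be all-`String` and would not match a declared `Dataset[(Int, String, Int)]` type:

```scala
// Parse a space-separated record like "1 tony 18" into a typed tuple.
def parsePerson(line: String): (Int, String, Int) = {
  val fields = line.split(" ")
  (fields(0).toInt, fields(1), fields(2).toInt)
}

val people = List("1 tony 18", "2 reba 22", "3 mimi 20").map(parsePerson)
println(people) // List((1,tony,18), (2,reba,22), (3,mimi,20))
```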

i. Join using SQL

4. Register temporary views

dataDF1.createTempView("d1_t")
dataDF2.createTempView("d2_t")

5. Write the SQL (join)

val r: DataFrame = sparkSession
  .sql("select name, desc from d1_t join d2_t on d1_t.age = d2_t.age")

6. Trigger the job

r.show()

Result:

+----+-----+
|name| desc|
+----+-----+
|tony|young|
|reba|  old|
+----+-----+
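Conceptually, the inner join above keeps only the rows whose age value appears in both tables. The same semantics can be sketched with plain Scala collections (no Spark needed):

```scala
val persons = List((1, "tony", 18), (2, "reba", 22), (3, "mimi", 20))
val descs   = Map(18 -> "young", 22 -> "old")

// Inner join: a person row survives only if its age has a matching desc;
// mimi (age 20) is dropped because 20 is not a key in descs.
val joined = persons.flatMap { case (_, name, age) =>
  descs.get(age).map(desc => (name, desc))
}
println(joined) // List((tony,young), (reba,old))
```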

ii. Join using the DSL

4. Join

Inner join

// dataDF2's age column must first be renamed (e.g. to "dage") so the
// join condition and result columns are unambiguous
val r: DataFrame = dataDF1.join(dataDF2.withColumnRenamed("age", "dage"), $"age" === $"dage")
r.show()

Result:
+---+----+---+----+-----+
| id|name|age|dage| desc|
+---+----+---+----+-----+
|  1|tony| 18|  18|young|
|  2|reba| 22|  22|  old|
+---+----+---+----+-----+

Left join

val r: DataFrame = dataDF1.join(dataDF2.withColumnRenamed("age", "dage"), $"age" === $"dage", "left")
r.show()

Result:
+---+----+---+----+-----+
| id|name|age|dage| desc|
+---+----+---+----+-----+
|  1|tony| 18|  18|young|
|  2|reba| 22|  22|  old|
|  3|mimi| 20|null| null|
+---+----+---+----+-----+

Right join

val r: DataFrame = dataDF1.join(dataDF2.withColumnRenamed("age", "dage"), $"age" === $"dage", "right")
r.show()

Result:
+---+----+---+----+-----+
| id|name|age|dage| desc|
+---+----+---+----+-----+
|  1|tony| 18|  18|young|
|  2|reba| 22|  22|  old|
+---+----+---+----+-----+

Besides these, "left_outer" and "right_outer" are also accepted (they are equivalent to "left" and "right"); Spark additionally supports "full_outer", "left_semi", "left_anti", and "cross".
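The semantics of these join types can be mirrored with plain Scala collections (a conceptual sketch, not the Spark API): a left join keeps every left-side row and fills missing matches with null (`None` here), while a right join keeps every right-side row.

```scala
val persons = List((1, "tony", 18), (2, "reba", 22), (3, "mimi", 20))
val descs   = List((18, "young"), (22, "old"))

// Left join: every person row survives; a missing desc becomes None.
val leftJoined = persons.map { case (_, name, age) =>
  (name, descs.collectFirst { case (a, d) if a == age => d })
}
println(leftJoined) // List((tony,Some(young)), (reba,Some(old)), (mimi,None))

// Right join: every desc row survives; here both ages have a match.
val rightJoined = descs.map { case (age, desc) =>
  (persons.collectFirst { case (_, n, a) if a == age => n }, desc)
}
println(rightJoined) // List((Some(tony),young), (Some(reba),old))
```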

Complete code:

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object JoinDemo {
  def main(args: Array[String]): Unit = {
    // 1. Create a SparkSession
    val sparkSession: SparkSession = SparkSession.builder().appName("JoinDemo")
      .master("local[2]").getOrCreate()

    import sparkSession.implicits._
    // 2. Create a Dataset directly
    val datas1: Dataset[String] = sparkSession.createDataset(List(
      "1 tony 18",
      "2 reba 22",
      "3 mimi 20"
    ))

    // 3. Transform the data
    val dataDS1: Dataset[(Int, String, Int)] = datas1.map(x => {
      val fields: Array[String] = x.split(" ")
      val id = fields(0).toInt
      val name = fields(1)
      val age = fields(2).toInt

      // emit as a tuple
      (id, name, age)
    })

    val dataDF1: DataFrame = dataDS1.toDF("id", "name", "age")


    // 4. Create the second Dataset
    val datas2: Dataset[String] = sparkSession.createDataset(List(
      "18 young",
      "22 old"
    ))

    // 5. Split the data
    val dataDS2: Dataset[(Int, String)] = datas2.map(x => {
      val fields: Array[String] = x.split(" ")
      val age = fields(0).toInt
      val desc: String = fields(1)

      // emit as a tuple
      (age, desc)
    })

    // 6. Convert to a DataFrame
    val dataDF2: DataFrame = dataDS2.toDF("age", "desc")
    /**
     * SQL approach
     */
    // 7. Register temporary views
    dataDF1.createTempView("d1_t")
    dataDF2.createTempView("d2_t")

    // 8. Write the SQL (join)
    val r: DataFrame = sparkSession.sql("select name, desc from d1_t join d2_t on d1_t.age = d2_t.age")

    // 9. Trigger the job
    r.show()

    /**
     * DSL approach (rename dataDF2's age column to "dage" first so the
     * join condition is unambiguous)
     */
    //val dataDF2R: DataFrame = dataDF2.withColumnRenamed("age", "dage")
    //val r: DataFrame = dataDF1.join(dataDF2R, $"age" === $"dage", "left")
    //val r: DataFrame = dataDF1.join(dataDF2R, $"age" === $"dage", "left_outer")
    //val r: DataFrame = dataDF1.join(dataDF2R, $"age" === $"dage", "right")
    //val r: DataFrame = dataDF1.join(dataDF2R, $"age" === $"dage", "right_outer")
    r.show()

  }
}
