import org.apache.spark.{SparkConf, SparkContext}
/**
 * Created by Administrator on 2017/10/21.
 */
object TransFormation {

  def main(args: Array[String]): Unit = {
    // Run one transformation at a time (each method creates its own SparkContext):
    // map() filter() flatMap() groupByKey() reduceByKey() sortByKey()
    // join() union() intersection() distinct()
    cartesian()
  }
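  // cartesian: pairs every element of the first RDD with every element of the second (Cartesian product).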
  def cartesian(): Unit = {
    val conf = new SparkConf().setAppName("cartesian").setMaster("local")
    val sc = new SparkContext(conf)
    val list = List(1, 2, 3, 4, 5, 6, 7)
    val list1 = List("a", "b", "c", "d")
    sc.parallelize(list).cartesian(sc.parallelize(list1)).foreach(t => println(t._1 + "\t" + t._2))
    sc.stop()
  }
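  // distinct: removes duplicate elements (requires a shuffle).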
  def distinct(): Unit = {
    val conf = new SparkConf().setAppName("distinct").setMaster("local")
    val sc = new SparkContext(conf)
    val list = List(1, 1, 1, 2, 2, 3, 4, 3, 5, 6, 4, 5, 7)
    sc.parallelize(list).distinct().foreach(println)
    sc.stop()
  }
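  // intersection: keeps only the elements present in both RDDs (duplicates removed).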
  def intersection(): Unit = {
    val conf = new SparkConf().setAppName("intersection").setMaster("local")
    val sc = new SparkContext(conf)
    val list = List(1, 2, 3, 4, 5, 6)
    val list1 = List(4, 5, 6, 7, 8, 9)
    sc.parallelize(list).intersection(sc.parallelize(list1)).foreach(println)
    sc.stop()
  }
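  // union: concatenates two RDDs without deduplicating.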
  def union(): Unit = {
    val conf = new SparkConf().setAppName("union").setMaster("local")
    val sc = new SparkContext(conf)
    val list = List(1, 2, 3, 4)
    val list1 = List(5, 6, 7, 8)
    sc.parallelize(list).union(sc.parallelize(list1)).foreach(println)
    sc.stop()
  }
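  // join: inner join on key; emits (key, (leftValue, rightValue)) for every matching key pair.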
  def join(): Unit = {
    val conf = new SparkConf().setAppName("join").setMaster("local")
    val sc = new SparkContext(conf)
    val list1 = List((1, "hadoop"), (2, "spark"), (3, "hbase"))
    val list2 = List((1, "had"), (2, "spa"), (3, "hba"))
    sc.parallelize(list1).join(sc.parallelize(list2))
      .foreach(t => println(t._1 + "\t" + t._2._1 + "\t" + t._2._2))
    sc.stop()
  }
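  // sortByKey: sorts a key-value RDD by key, ascending by default.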
  def sortByKey(): Unit = {
    val conf = new SparkConf().setAppName("sortByKey").setMaster("local")
    val sc = new SparkContext(conf)
    val list = List((6, "hadoop"), (8, "spark"), (10, "hbase"))
    sc.parallelize(list).sortByKey().foreach(t => println(t._1 + "\t" + t._2))
    sc.stop()
  }
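  // reduceByKey: merges all values for each key with the given function (here, summing per key).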
  def reduceByKey(): Unit = {
    val conf = new SparkConf().setAppName("reduceByKey").setMaster("local")
    val sc = new SparkContext(conf)
    val list = List(("hadoop", 111), ("spark", 222), ("hadoop", 333), ("spark", 444))
    sc.parallelize(list).reduceByKey(_ + _).foreach(t => println(t._1 + "\t" + t._2))
    sc.stop()
  }
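  // groupByKey: collects all values sharing a key into one Iterable.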
  def groupByKey(): Unit = {
    val conf = new SparkConf().setAppName("groupByKey").setMaster("local")
    val sc = new SparkContext(conf)
    val list = List((1, "docu"), (2, "idontknow"))
    sc.parallelize(list).groupByKey().foreach(t => println(t._1 + "\t" + t._2))
    sc.stop()
  }
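  // flatMap: maps each element to zero or more outputs and flattens the result (here, line -> words).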
  def flatMap(): Unit = {
    val conf = new SparkConf().setAppName("flatMap").setMaster("local")
    val sc = new SparkContext(conf)
    val list = List("you,jump", "i,jump")
    sc.parallelize(list).flatMap(_.split(",")).foreach(println)
    sc.stop()
  }
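  // filter: keeps only the elements satisfying the predicate (here, even numbers).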
  def filter(): Unit = {
    val conf = new SparkConf().setAppName("filter").setMaster("local")
    val sc = new SparkContext(conf)
    val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    sc.parallelize(list).filter(_ % 2 == 0).foreach(println)
    sc.stop()
  }
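  // map: applies a function to every element, one-to-one.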
  def map(): Unit = {
    val conf = new SparkConf().setAppName("map").setMaster("local")
    val sc = new SparkContext(conf)
    val list = List("hadoop", "spark", "hive")
    sc.parallelize(list).map("hello " + _).foreach(println)
    sc.stop()
  }
}