Simple Usage of Common Spark Operators
Copyright notice: this is an original post by the author and may not be reproduced without permission. https://blog.csdn.net/weixin_39627946/article/details/78305346

Scala version

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
  * Created by Administrator on 2017/10/21.
  */
object TransFormation {
  def main(args: Array[String]): Unit = {
    // Uncomment the operator you want to run
    //map()
    //filter()
    //flatMap()
    //groupByKey()
    //reduceByKey()
    //sortByKey()
    //join()
    //union()
    //intersection()
    //distinct()
    cartesian()

  }

  // Cartesian product of two RDDs
  def cartesian(): Unit = {
    val conf = new SparkConf().setAppName("cartesian").setMaster("local")
    val sc = new SparkContext(conf)
    val list = List(1, 2, 3, 4, 5, 6, 7)
    val list1 = List("a", "b", "c", "d")
    sc.parallelize(list).cartesian(sc.parallelize(list1)).foreach(t => println(t._1 + "\t" + t._2))
  }

  // Remove duplicate elements
  def distinct(): Unit = {
    val conf = new SparkConf().setAppName("distinct").setMaster("local")
    val sc = new SparkContext(conf)
    val list = List(1, 1, 1, 2, 2, 3, 4, 3, 5, 6, 4, 5, 7)
    sc.parallelize(list).distinct().foreach(println(_))
  }

  // Intersection of two RDDs
  def intersection(): Unit = {
    val conf = new SparkConf().setAppName("intersection").setMaster("local")
    val sc = new SparkContext(conf)
    val list = List(1, 2, 3, 4, 5, 6)
    val list1 = List(4, 5, 6, 7, 8, 9)
    sc.parallelize(list).intersection(sc.parallelize(list1)).foreach(println(_))
  }

  // Union of two RDDs
  def union(): Unit = {
    val conf = new SparkConf().setAppName("union").setMaster("local")
    val sc = new SparkContext(conf)
    val list = List(1, 2, 3, 4)
    val list1 = List(5, 6, 7, 8)
    sc.parallelize(list).union(sc.parallelize(list1)).foreach(println(_))
  }

  // Join two pair RDDs on their keys
  def join(): Unit = {
    val conf = new SparkConf().setAppName("join").setMaster("local")
    val sc = new SparkContext(conf)
    val list1 = List((1, "hadoop"), (2, "spark"), (3, "hbase"))
    val list2 = List((1, "had"), (2, "spa"), (3, "hba"))
    sc.parallelize(list1).join(sc.parallelize(list2)).foreach(t => println(t._1 + "\t" + t._2._1 + "\t" + t._2._2))
  }

  // Sort a pair RDD by key (ascending by default)
  def sortByKey(): Unit = {
    val conf = new SparkConf().setAppName("sortByKey").setMaster("local")
    val sc = new SparkContext(conf)
    val list = List((6, "hadoop"), (8, "spark"), (10, "hbase"))
    sc.parallelize(list).sortByKey().foreach(t => println(t._1 + "\t" + t._2))
  }

  // Sum the values for each key
  def reduceByKey(): Unit = {
    val conf = new SparkConf().setAppName("reduceByKey").setMaster("local")
    val sc = new SparkContext(conf)
    val list = List(("hadoop", 111), ("spark", 222), ("hadoop", 333), ("spark", 444))
    sc.parallelize(list).reduceByKey(_ + _).foreach(t => println(t._1 + "\t" + t._2))
  }

  // Group the values for each key
  def groupByKey(): Unit = {
    val conf = new SparkConf().setAppName("groupByKey").setMaster("local")
    val sc = new SparkContext(conf)
    val list = List((1, "docu"), (2, "idontknow"))
    sc.parallelize(list).groupByKey().foreach(t => println(t._1 + "\t" + t._2))
  }

  // Split each line on "," and flatten the pieces into one RDD
  def flatMap(): Unit = {
    val conf = new SparkConf().setAppName("flatMap").setMaster("local")
    val sc = new SparkContext(conf)
    val list = List("you,jump", "i,jump")
    sc.parallelize(list).flatMap(_.split(",")).foreach(println(_))
  }

  // Keep only the even numbers
  def filter(): Unit = {
    val conf = new SparkConf().setAppName("filter").setMaster("local")
    val sc = new SparkContext(conf)
    val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    sc.parallelize(list).filter(_ % 2 == 0).foreach(println(_))
  }

  // Prefix every element with "hello"
  def map(): Unit = {
    val conf = new SparkConf().setAppName("map").setMaster("local")
    val sc = new SparkContext(conf)
    val list = List("hadoop", "spark", "hive")
    sc.parallelize(list).map("hello" + _).foreach(x => println(x))
  }

}
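Each Scala method above creates its own local SparkContext and prints from inside foreach, which runs on the executors (the same JVM in local mode, but executor logs on a cluster). Below is a minimal sketch of an alternative layout, assuming the same local setup; the object name TransFormationDemo and the sample data are illustrative. It uses one shared context, collects results back to the driver before printing, and shows sortByKey's ascending flag for descending order.

import org.apache.spark.{SparkConf, SparkContext}

object TransFormationDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TransFormationDemo").setMaster("local")
    val sc = new SparkContext(conf)

    // reduceByKey: sum values per key, then bring the result to the driver with collect()
    sc.parallelize(List(("hadoop", 111), ("spark", 222), ("hadoop", 333)))
      .reduceByKey(_ + _)
      .collect()
      .foreach { case (k, v) => println(k + "\t" + v) }

    // sortByKey: the ascending flag switches the sort to descending order
    sc.parallelize(List((6, "hadoop"), (8, "spark"), (10, "hbase")))
      .sortByKey(ascending = false)
      .collect()
      .foreach { case (k, v) => println(k + "\t" + v) }

    sc.stop()
  }
}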

Java version

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/**
 * Created by Administrator on 2017/10/21.
 */
public class TransFormationOperator {

    public static void main(String[] args) {
        // Uncomment the operator you want to run
        //map();
        //filter();
        //flatMap();
        //groupByKey();
        //reduceByKey();
        //sortByKey();
        //join();
        //union();
        //intersection();
        //distinct();
        cartesian();
    }
    // Cartesian product of two RDDs
    public static void cartesian(){
        SparkConf conf = new SparkConf().setAppName("cartesian").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<Integer> list1 = Arrays.asList(1, 2, 3, 4, 5);
        List<String> list2 = Arrays.asList("hadoop", "spark", "hive", "hbase");
        JavaRDD<Integer> list1RDD = sc.parallelize(list1);
        JavaRDD<String> list2RDD = sc.parallelize(list2);
        list1RDD.cartesian(list2RDD).foreach(new VoidFunction<Tuple2<Integer, String>>() {
            @Override
            public void call(Tuple2<Integer, String> t) throws Exception {
                System.out.println(t._1+"<==>"+t._2);
            }
        });
    }
    // Remove duplicate elements
    public static void distinct(){
        SparkConf conf = new SparkConf().setAppName("distinct").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<String> list = Arrays.asList("hadoop", "hadoop", "hadoop", "spark", "spark","hbase");
        JavaRDD<String> listRDD = sc.parallelize(list);
        JavaRDD<String> distinct = listRDD.distinct();
        distinct.foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });
    }

    // Intersection of two RDDs
    public static void intersection(){
        SparkConf conf = new SparkConf().setAppName("intersection").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        final List<String> list1 = Arrays.asList("hadoop", "spark", "hbase","hive");

        final List<String> list2 = Arrays.asList("hbase", "hive", "zookeeper");

        JavaRDD<String> list1RDD = sc.parallelize(list1);
        JavaRDD<String> list2RDD = sc.parallelize(list2);

        JavaRDD<String> intersection = list1RDD.intersection(list2RDD);
        intersection.foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });
    }
    // Union of two RDDs
    public static void union(){
        SparkConf conf = new SparkConf().setAppName("union").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<String> list1 = Arrays.asList("hadoop", "spark", "hbase","hive");

        List<String> list2 = Arrays.asList("hive", "sqoop", "akka");

        JavaRDD<String> list1RDD = sc.parallelize(list1);
        JavaRDD<String> list2RDD = sc.parallelize(list2);

        JavaRDD<String> union = list1RDD.union(list2RDD);
        union.foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });


    }

    // Join two pair RDDs on their keys
    public static void join(){
        SparkConf conf = new SparkConf().setAppName("join").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<Tuple2<Integer, String>> list1 = Arrays.asList(
                new Tuple2<Integer, String>(11, "mayun"),
                new Tuple2<Integer, String>(22, "mahuateng"),
                new Tuple2<Integer, String>(33, "zll"));

        List<Tuple2<Integer, String>> list2 = Arrays.asList(
                new Tuple2<Integer, String>(11, "alibaba"),
                new Tuple2<Integer, String>(22, "tenxun"),
                new Tuple2<Integer, String>(33, "zsjituan")
        );
        JavaPairRDD<Integer, String> list1RDD = sc.parallelizePairs(list1);
        JavaPairRDD<Integer, String> list2RDD = sc.parallelizePairs(list2);

        JavaPairRDD<Integer, Tuple2<String, String>> joinRDD = list1RDD.join(list2RDD);
        joinRDD.foreach(new VoidFunction<Tuple2<Integer, Tuple2<String, String>>>() {
            @Override
            public void call(Tuple2<Integer, Tuple2<String, String>> t) throws Exception {
                System.out.println(t._1+"<==>"+t._2._1+"<==>"+t._2._2);
            }
        });


    }

    // Sort by key (ascending by default); mapToPair converts the JavaRDD of tuples into a JavaPairRDD
    public static void sortByKey(){
        final SparkConf conf = new SparkConf().setAppName("sortBykey").setMaster("local");

        final JavaSparkContext sc = new JavaSparkContext(conf);

        final List<Tuple2<Integer, String>> list = Arrays.asList(
                new Tuple2<Integer, String>(77, "hadoop"),
                new Tuple2<Integer, String>(44, "spark"),
                new Tuple2<Integer, String>(55, "hive"),
                new Tuple2<Integer, String>(66, "hbase")
        );

        JavaRDD<Tuple2<Integer, String>> listRDD = sc.parallelize(list);
        listRDD.mapToPair(new PairFunction<Tuple2<Integer,String>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<Integer, String> t) throws Exception {
                return new Tuple2<Integer,String>(t._1,t._2);
            }
        }).sortByKey().foreach(new VoidFunction<Tuple2<Integer, String>>() {
            @Override
            public void call(Tuple2<Integer, String> t) throws Exception {
                System.out.println(t._2+"<--->"+t._1);
            }
        });
        sc.stop();
    }

    // Sum the values for each key
    public static void reduceByKey(){
        final SparkConf conf = new SparkConf().setAppName("reduceByKey").setMaster("local");
        final JavaSparkContext sc = new JavaSparkContext(conf);

        final List<Tuple2<String, Integer>> list = Arrays.asList(
                new Tuple2<String, Integer>("hadoop", 90),
                new Tuple2<String, Integer>("spark", 80),
                new Tuple2<String, Integer>("hbase", 85),
                new Tuple2<String, Integer>("hive", 82)
        );

        final JavaRDD<Tuple2<String, Integer>> listRDD = sc.parallelize(list);
        listRDD.mapToPair(new PairFunction<Tuple2<String,Integer>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<String, Integer> t) throws Exception {
                return new Tuple2<String,Integer>(t._1,t._2);
            }
        }).reduceByKey(new org.apache.spark.api.java.function.Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) throws Exception {
                return i1+i2;
            }
        }).foreach(new VoidFunction<Tuple2<String, Integer>>() {
            @Override
            public void call(Tuple2<String, Integer> t) throws Exception {
                System.out.println(t._1+"==>"+t._2);
            }
        });
        sc.stop();
    }

    // Group the values for each key with groupByKey
    public static void groupByKey(){
        SparkConf conf = new SparkConf().setAppName("groupByKey").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<Tuple2<String, String>> list = Arrays.asList(
                new Tuple2<String, String>("hadoop", "had"),
                new Tuple2<String, String>("spark", "spa"),
                new Tuple2<String, String>("hbase", "hba"),
                new Tuple2<String, String>("hive", "hiv")
        );
        JavaPairRDD<String, String> listRDD = sc.parallelizePairs(list);
        listRDD.groupByKey().foreach(new VoidFunction<Tuple2<String, Iterable<String>>>() {
            @Override
            public void call(Tuple2<String, Iterable<String>> t) throws Exception {
                String key = t._1;
                Iterator<String> iterator = t._2.iterator();
                System.out.println(key);
                while (iterator.hasNext()) {
                    System.out.println(iterator.next());
                }
            }
        });
        sc.stop();
    }
    // Split each line on "," and flatten the pieces into one RDD
    public static void flatMap(){
        SparkConf conf = new SparkConf().setAppName("flatMap").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<String> list = Arrays.asList("you,jump", "i,jump");
        JavaRDD<String> listRDD = sc.parallelize(list);
        listRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(",")).iterator();
            }
        }).foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });
        sc.stop();
    }
    // Keep only the even numbers
    public static void filter(){
        final SparkConf conf = new SparkConf().setAppName("filter").setMaster("local");
        final JavaSparkContext sc = new JavaSparkContext(conf);
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        JavaRDD<Integer> listRDD = sc.parallelize(list);
        listRDD.filter(new org.apache.spark.api.java.function.Function<Integer, Boolean>() {
            @Override
            public Boolean call(Integer i) throws Exception {
                return i % 2==0;
            }
        }).foreach(new VoidFunction<Integer>() {
            @Override
            public void call(Integer t) throws Exception {
                System.out.println(t+"\t");
            }
        });
        sc.stop();
    }
    // Map each string to a (name, 1993) tuple
    public static void map() {
        final SparkConf conf = new SparkConf().setAppName("map").setMaster("local");
        final JavaSparkContext sc = new JavaSparkContext(conf);
        List<String> list = new ArrayList<String>();
        list.add("hadoop");
        list.add("spark");
        list.add("hbase");
        JavaRDD<String> listRDD = sc.parallelize(list);
        listRDD.map(new org.apache.spark.api.java.function.Function<String, Tuple2<String,Integer>>() {

            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String,Integer>(s,1993);
            }
        }).foreach(new VoidFunction<Tuple2<String, Integer>>() {
            @Override
            public void call(Tuple2<String, Integer> t) throws Exception {
                System.out.println(t._1+"\t"+t._2);
            }
        });
        sc.stop();

    }
}
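Both versions include groupByKey and reduceByKey. For per-key aggregation the two differ in how much data crosses the shuffle: reduceByKey computes partial results within each partition before shuffling, while groupByKey ships every (key, value) pair and aggregates afterwards. Below is a minimal Scala sketch of that contrast; the object name AggregationComparison and the sample data are illustrative.

import org.apache.spark.{SparkConf, SparkContext}

object AggregationComparison {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("aggregation-comparison").setMaster("local"))
    val scores = sc.parallelize(List(("hadoop", 90), ("spark", 80), ("hadoop", 85), ("spark", 82)))

    // reduceByKey: partial sums are computed per partition before the shuffle
    val viaReduce = scores.reduceByKey(_ + _)

    // groupByKey: every value crosses the shuffle, then the sums are computed
    val viaGroup = scores.groupByKey().mapValues(_.sum)

    viaReduce.collect().foreach(println) // (hadoop,175), (spark,162)
    viaGroup.collect().foreach(println)  // same totals, more data shuffled
    sc.stop()
  }
}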