设为首页 加入收藏

TOP

Spark学习笔记3——RDD(下)(一)
2019-09-19 11:12:04 】 浏览:151
Tags:Spark 学习 笔记 RDD

Spark学习笔记3——RDD(下)

笔记摘抄自 [美] Holden Karau 等著的《Spark快速大数据分析》

向Spark传递函数

大部分 Spark 的转化操作和一部分行动操作,都需要传递函数后进行计算。如何传递函数下文将用 Java 展示。

Java 向 Spark 传递函数需要实现 Spark 的 org.apache.spark.api.java.function 包中的接口。一些基本的接口如下表:

函数名 实现的方法 用途
Function<T, R> R call(T) 接收一个输入值并返回一个输出值,用于类似map() 和
filter() 等操作中
Function2<T1, T2, R> R call(T1, T2) 接收两个输入值并返回一个输出值,用于类似aggregate()
和fold() 等操作中
FlatMapFunction<T, R> Iterable call(T) 接收一个输入值并返回任意个输出,用于类似flatMap()
这样的操作中

通过匿名内部类

见上篇笔记例程。

通过具名类传递

class ContainsError implements Function<String, Boolean>() {
public Boolean call(String x) { return x.contains("error"); }
}
...
RDD<String> errors = lines.filter(new ContainsError());
  • 使用具名类在程序组织比较庞大是显得比较清晰
  • 可以使用构造函数如“通过带参数的 Java 函数类传递”中所示

通过带参数的 Java 函数类传递

例程

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import java.util.List;

public class Contains implements Function<String, Boolean> {
    private String query;

    public Contains(String query) {
        this.query = query;
    }

    public Boolean call(String x) {
        return x.contains(query);
    }

    public static void main(String[] args) {
        SparkConf sc = new SparkConf().setAppName("Contains");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sc);
        JavaRDD<String> log = javaSparkContext.textFile(args[0]);
        
        JavaRDD<String> content = log.filter(new Contains(args[1]));
        
        List<String> contentList = content.collect();
        for (String output : contentList) {
            System.out.println(output);
        }
        javaSparkContext.stop();
    }
}

测试文本 test.txt

this is a test
this is a simple test
this is a simple test about RDD
let us check it out

测试结果

[root@server1 spark-2.4.4-bin-hadoop2.7]# bin/spark-submit --class Contains ~/RDDFuncNamedClass.jar ~/test.txt RDD
...
19/09/16 15:06:50 INFO DAGScheduler: Job 0 finished: collect at Contains.java:24, took 0.445049 s
this is a simple test about RDD
...

通过 lambda 表达式传递(仅限于 Java 8 及以上)

例程

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.List;

public class LambdaTest {
    public static void main(final String[] args) {
        SparkConf sc = new SparkConf().setAppName("Contains");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sc);
        JavaRDD<String> log = javaSparkContext.textFile(args[0]);

        JavaRDD<String> content = log.filter(s -> s.contains(args[1]));

        List<String> contentList = content.collect();
        for (String output : contentList) {
            System.out.println(output);
        }
        javaSparkContext.stop();
    }
}

测试文本

使用上文同一个文本

运行结果

[root@server1 spark-2.4.4-bin-hadoop2.7]# bin/spark-submit --class Contains ~/RDDFuncNamedClass.jar ~/test.txt check
...
19/09/16 15:27:10 INFO DAGScheduler: Job 0 finished: collect at Contains.java:24, took 0.440515 s
let us check it out
...

常见的转化操作和行动操作

Spark 中有不同类型的 RDD,不同的 RDD 可以支持不同的操作。

除了基本的RDD外,还有数字类型的 RDD 支持统计型函数操作、键值对形式的 RDD 支持聚合数据的键值对操作等等。

基本RDD

针对各个

首页 上一页 1 2 3 4 下一页 尾页 1/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇〈一〉ElasticSearch的介绍 下一篇centos7放行1521端口

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目