目录
Spark学习笔记3——RDD(下)
笔记摘抄自 [美] Holden Karau 等著的《Spark快速大数据分析》
向Spark传递函数
大部分 Spark 的转化操作和一部分行动操作,都需要传递函数后进行计算。如何传递函数下文将用 Java 展示。
Java 向 Spark 传递函数需要实现 Spark 的 org.apache.spark.api.java.function 包中的接口。一些基本的接口如下表:
函数名 | 实现的方法 | 用途 |
---|---|---|
Function<T, R> | R call(T) | 接收一个输入值并返回一个输出值,用于类似map() 和 filter() 等操作中 |
Function2<T1, T2, R> | R call(T1, T2) | 接收两个输入值并返回一个输出值,用于类似aggregate() 和fold() 等操作中 |
FlatMapFunction<T, R> | Iterable
|
接收一个输入值并返回任意个输出,用于类似flatMap() 这样的操作中 |
通过匿名内部类
见上篇笔记例程。
通过具名类传递
class ContainsError implements Function<String, Boolean>() {
public Boolean call(String x) { return x.contains("error"); }
}
...
RDD<String> errors = lines.filter(new ContainsError());
- 使用具名类在程序组织比较庞大是显得比较清晰
- 可以使用构造函数如“通过带参数的 Java 函数类传递”中所示
通过带参数的 Java 函数类传递
例程
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import java.util.List;
public class Contains implements Function<String, Boolean> {
private String query;
public Contains(String query) {
this.query = query;
}
public Boolean call(String x) {
return x.contains(query);
}
public static void main(String[] args) {
SparkConf sc = new SparkConf().setAppName("Contains");
JavaSparkContext javaSparkContext = new JavaSparkContext(sc);
JavaRDD<String> log = javaSparkContext.textFile(args[0]);
JavaRDD<String> content = log.filter(new Contains(args[1]));
List<String> contentList = content.collect();
for (String output : contentList) {
System.out.println(output);
}
javaSparkContext.stop();
}
}
测试文本 test.txt
this is a test
this is a simple test
this is a simple test about RDD
let us check it out
测试结果
[root@server1 spark-2.4.4-bin-hadoop2.7]# bin/spark-submit --class Contains ~/RDDFuncNamedClass.jar ~/test.txt RDD
...
19/09/16 15:06:50 INFO DAGScheduler: Job 0 finished: collect at Contains.java:24, took 0.445049 s
this is a simple test about RDD
...
通过 lambda 表达式传递(仅限于 Java 8 及以上)
例程
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.List;
public class LambdaTest {
public static void main(final String[] args) {
SparkConf sc = new SparkConf().setAppName("Contains");
JavaSparkContext javaSparkContext = new JavaSparkContext(sc);
JavaRDD<String> log = javaSparkContext.textFile(args[0]);
JavaRDD<String> content = log.filter(s -> s.contains(args[1]));
List<String> contentList = content.collect();
for (String output : contentList) {
System.out.println(output);
}
javaSparkContext.stop();
}
}
测试文本
使用上文同一个文本
运行结果
[root@server1 spark-2.4.4-bin-hadoop2.7]# bin/spark-submit --class Contains ~/RDDFuncNamedClass.jar ~/test.txt check
...
19/09/16 15:27:10 INFO DAGScheduler: Job 0 finished: collect at Contains.java:24, took 0.440515 s
let us check it out
...
常见的转化操作和行动操作
Spark 中有不同类型的 RDD,不同的 RDD 可以支持不同的操作。
除了基本的RDD外,还有数字类型的 RDD 支持统计型函数操作、键值对形式的 RDD 支持聚合数据的键值对操作等等。