设为首页 加入收藏

TOP

spark与MogoDB不得不说的故事
2018-12-24 13:04:16 】 浏览:124
Tags:spark MogoDB 不得 不说 故事
版权声明:码字太累!一般是复制粘贴。如有侵权行为,请立即联系我 :wang_shubing@126.com https://blog.csdn.net/TXBSW/article/details/83377005

一.背景

spark2.x

Scala 2.11.x

截取pom.xml

<dependencies>
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.11</artifactId>
<version>2.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.3.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.2</version>
</dependency>
</dependencies>

二,sparkRDD操纵mongoDB

1,导入Mongodb Connector依赖

为了SparkContextRDD能使用Mongodb Connector特殊的函数和隐式转换,需要引入相关依赖。

importcom.mongodb.spark._

写入数据到mongodb

将RDD数据写入到mongodb的时候,数据必须转化为BSON document。可以写个简单的map函数来实现将数据转化为Document或者BSONDocument或者DBObject

一些scala的类型是不被支持的,应该转化为相等的java类型。为了转化Scala类型到原生的类型,需要导入下面的包,然后用.asJava方法:

import scala.collection.JavaConverters._
import org.bson.Document

val documents = sc.parallelize(
Seq(new Document("fruits", List("apples", "oranges", "pears").asJava)))MongoSpark.save(documents)

A),MongoSpark.save()

可以使用MongoSpark.save()方法将RDD数据写入到Mongodb,如下:

import org.bson.Document
val documents = sc.parallelize((1 to 10).map(i => Document.parse(s"{test: $i}")))
MongoSpark.save(documents) // Uses the SparkConf for configuration

也可以指定WriteConfig

import com.mongodb.spark.config._

val writeConfig = WriteConfig(Map("collection" -> "spark", "writeConcern.w" -> "majority"), Some(WriteConfig(sc)))
val sparkDocuments = sc.parallelize((1 to 10).map(i => Document.parse(s"{spark: $i}")))

MongoSpark.save(sparkDocuments, writeConfig)

B),RDD Save Helper Methods

RDD还有一个隐式的辅助函数,saveToMongoDB(),可以用来写数据到Mongodb,如下:

也可以指定WriteConfig

documents.saveToMongoDB(WriteConfig(Map("uri" ->"mongodb://example.com/database.collection")))
documents.saveToMongoDB() // Uses the SparkConf for configuration

4,从mongodb读取数据分析

A),MongoSpark.load()

该方法主要是从mongodb里面捞取数据做RDD,。

val rdd = MongoSpark.load(sc)

println(rdd.count)

println(rdd.first.toJson)

也可以指定ReadConfig

import com.mongodb.spark.config._

val readConfig = ReadConfig(Map("collection" -> "spark", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))

val customRdd = MongoSpark.load(sc, readConfig)

println(customRdd.count)

println(customRdd.first.toJson)

B),SparkContext Load Helper Methods

SparkContext有一个隐式的辅助方法loadFromMongoDB,用来从Mongodb捞取数据。

sc.loadFromMongoDB() // Uses the SparkConf for configuration

也可以为其,指定配置ReadConfig

sc.loadFromMongoDB(ReadConfig(Map("uri" -> "mongodb://example.com/database.collection"))) // Uses the ReadConfig

5Aggregation

在某些情况下,使用aggregation pipeline可能比直接使用RDDfilter性能更好。Filter过滤数据看似是一个简单的RDD操作,实际上性能很低。比如,通常我们会如下使用:

val rdd = MongoSpark.load(sc)

val filteredRdd = rdd.filter(doc => doc.getInteger("test") > 5)

println(filteredRdd.count)

println(filteredRdd.first.toJson)

MongodbRDD可以传入一个aggregation pipeline ,允许在mongodb中过滤数据,然后仅仅传入需要的数据给Spark

val rdd = MongoSpark.load(sc)

val aggregatedRdd = rdd.withPipeline(Seq(Document.parse("{ $match: { test : { $gt : 5 } } }")))

println(aggregatedRdd.count)

println(aggregatedRdd.first.toJson)

使用aggregation pipeline也提供了处理空值结果的好处,而过滤方法则没有。比如上面的例子中,假如filter没有任何数据,将会抛出异常如下:

ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 8) java.lang.NullPointerException.

6MongoSpark.builder()

如果需要对配置进行精细控制,那么MongoSpark配套提供了一个builder()方法,用于配置Mongo Spark Connector的所有方面。也提供了创建RDDDataFrameDatasetAPI

三,SparkSql操纵mongodb

1,引入依赖

RDD操纵mongodb不同的是,以SparkSql的形式操纵mongodb还需要引入SqlContext相关的特定的方法和隐式转换。

import com.mongodb.spark._

importcom.mongodb.spark.sql._

i

2,创建SparkSession

val sparkSession = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.characters")
.config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.characters")
.getOrCreate()

代码示例:


object SparkSQL {

def main(args: Array[String]): Unit = {

import org.apache.spark.sql.SparkSession

/* For Self-Contained Scala Apps: Create the SparkSession * CREATED AUTOMATICALLY IN spark-shell */
val sparkSession = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.characters")
.config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.characters")
.getOrCreate()



import com.mongodb.spark._
import com.mongodb.spark.config._
import org.bson.Document

val docs = """ {"name": "Bilbo Baggins", "age": 50} {"name": "Gandalf", "age": 1000} {"name": "Thorin", "age": 195} {"name": "Balin", "age": 178} {"name": "Kíli", "age": 77} {"name": "Dwalin", "age": 169} {"name": "in", "age": 167} {"name": "Glóin", "age": 158} {"name": "Fíli", "age": 82} {"name": "Bombur"}""".trim.stripMargin.split("[\\r\\n]+").toSeq

sparkSession.sparkContext.parallelize(docs.map(Document.parse)).saveToMongoDB()

// Additional operations go here...

}}

3DataFramesDatasets

在spark2.0,一个DataFrame由a表示DatasetRows,现在的别名Dataset[Row]

Mongo Spark Connector提供了com.mongodb.spark.sql.DefaultSource类,可以通过它来从mongodb里创建DataFrameDatasets。但是,为了方便创建一个DataFrame,该连接器提供了MongoSpark助手load(sqlContext)MongoSpark.load(sqlContext)是一个通过DataFrameReader读取数据的简单封装。

4,显式指定schema

默认情况下,从SQLContext中的MongoDB读取通过从数据库中抽样文档来推测schema信息的。显示的声明schema信息,如下操作

val df = MongoSpark.load(sparkSession) // Uses the SparkSessiondf.printSchema()

df.printSchema() // Prints DataFrame schema

case class Character(name: String, age: Int)

val explicitDF = MongoSpark.load[Character](sparkSession)

explicitDF.printSchema()

可以使用case classDataFrame转化为Dataset

val dataset = explicitDF.as[Character]

RDD也可以转化为DataFrameDataset

// Passing the SparkContext to load returns a RDD, not DF or DS

val rdd = MongoSpark.load(sc)

val dfInferredSchema = rdd.toDF()

val dfExplicitSchema = rdd.toDF[Character]()

val ds = rdd.toDS[Character]()

5,更多创建DataFrame的方法

使用 SparkSession的方法创建DataFrame

val df2 = sparkSession.loadFromMongoDB() // SparkSession used for configuration

val df3 = sparkSession.loadFromMongoDB(ReadConfig(

Map("uri" -> "mongodb://example.com/database.collection")

)) // ReadConfig used for configuration


val df4 = sparkSession.read.mongo() // SparkSession used for configuration

sqlContext.read.format("com.mongodb.spark.sql").load()


// Set custom options

import com.mongodb.spark.config._

val customReadConfig = ReadConfig(Map("readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))

val df5 = sparkSession.read.mongo(customReadConfig)

val df6 = sparkSession.read.format("com.mongodb.spark.sql").options(customReadConfig.asOptions).load()

6.SQL查询

在对数据集运行SQL查询之前,必须为数据集注册临时视图。

val characters = MongoSpark.load[Character](sparkSession)characters.createOrReplaceTempView("characters")

val centenarians = sparkSession.sql("SELECT name, age FROM characters WHERE age >= 100")centenarians.show()

7,Filter

DataFrame或者SParkSql使用filter的时候,MongoConnector会构建一个aggregation pipeline将数据过滤下推到mongodb

df.filter(df("age") < 100).show()

8Save DataFrames to MongoDB

Mongodb Spark Connector还提供了将DataFrame持久化到mongodb的操作。

MongoSpark.save(centenarians.write.option("collection", "hundredClub").mode("overwrite"))

println("Reading from the 'hundredClub' collection:")

MongoSpark.load[Character](sparkSession, ReadConfig(Map("collection"-> "hundredClub"), Some(ReadConfig(sparkSession)))).show()

也可以使用下面的方法来保存数据。

centenarians.write.option("collection", "hundredClub").mode("overwrite").mongo()

centenarians.write.option("collection","hundredClub").mode("overwrite").format("com.mongodb.spark.sql").save()

四,Spark Streaming

命令行启动Netcat:

$ nc -lk 9999

终端导入

import com.mongodb.spark.sql._
import org.apache.spark.streaming._ 


创建一个新StreamingContext

val ssc = new StreamingContext(sc, Seconds(1))

使用此socketTextStream方法在端口9999上创建与Netcat的连接:

val lines = ssc.socketTextStream("localhost", 9999) 

确定每行中每个单词出现的次数:

val words = lines.flatMap(_.split(" "))

val pairs = words.map(word => (word, 1))

val wordCounts = pairs.reduceByKey(_ + _)

创建数据结构以保存结果:

case class WordCount(word: String, count: Int)

使用foreachRDD循环收集结果并写入Spark Connector配置中指定的MongoDB集合

wordCounts.foreachRDD({ rdd =>import spark.implicits._
val wordCounts = rdd.map({ case (word: String, count: Int)=> WordCount(word, count) }).toDF()
wordCounts.write.mode("append").mongo()})
ssc.start()

五,数据类型

Spark支持数量有限的数据类型,以确保所有BSON类型于Spark DataFrames / Datasets中的类型都可以相互转化。

Bson类型

spark类型

Document

StructType

Array

ArrayType

32-bitinteger

Integer

64-bitinteger

Long

Binarydata

Array[Byte]或者StructType{subType:Byte,data:Array[Byte]}

Boolean

Boolean

Date

java.sql.Timestamp

DBPointer

StructType{ref:String,oid:String}

Double

Double

java script

StructType{code:String}

java scriptwithscope

StructType{code:String,scope:String}

Maxkey

StructType{maxKey:Integer}

Minkey

StructType{minKey:Integer}

Null

null

ObjectId

StructType{oid:String}

RegularExpression

StructType{regex:String,options:String}

String

String

Symbol

StructType{symbol:String}

Timestamp

StructType{time:Integer,inc:Integer}

Undefined

StructType{undefined:Boolean}

为了更好的支持Dataset,已经创建好了下面的Scalacase class(com.mongodb.spark.sql.fieldTypes)JavaBean class (com.mongodb.spark.sql.fieldTypes.api.java)

Bson类型

Scala案例类

JavaBean的

Binarydata

Binary

Binary

DBPointer

DBPointer

DBPointer

java script

java script

java script

java scriptwithscope

java scriptWithScope

java scriptWithScope

Maxkey

MaxKey

MaxKey

Minkey

MinKey

MinKey

ObjectId

ObjectId

ObjectId

RegularExpression

RegularExpression

RegularExpression

Symbol

Symbol

Symbol

Timestamp

Timestamp

Timestamp

Undefined

Undefined

Undefined

五,配置

1,配置的方法

A),使用Spark配置

三种方式可以实现

a),sparkconf:使用SparkConf配置的时候,需要在配置项前面带上特定前缀。

b),--conf

c),spark-default.conf

B),使用ReadConfigWriteConfig

该方式的配置会覆盖掉所有的SparkConf配置

C),使用Options Map

SparkAPI有部分支持Map[String,String],比如DataFrameReaderDataFrameWriter。可以使用asOptions()方法,将自定义的ReadConfig或者WriteConfig转化为一个map

D),使用System Property

Mongodb Spark ConnectorMongoClient提供了cache机制,只能通过SystemProperty配置。

2,输入配置

如果通过SparkConf设置Connector,配置必须加的前缀是:spark.mongodb.input

属性名称

描述

uri

Required。格式:mongodb://host:port/

database

Required。要读的数据库名称

collection

Required。要读的collection

localThreshold

从多个mongodbserver中选取一个Server的阈值,默认15ms

readPreference.name

要使用的Read Preference。默认:Primary。

readPreference.tagSets

要用的ReadPreference TagSets。

readConcern.level

要使用的Read Concern等级。

sampleSize

制作schema时的采样数据的条数:1000.

partitioner

分区的策略。MongoDefaultPartitioner,下面讲。

3Partitioner配置

Mongodb作为spark数据源,分区数据的策略有很多种。目前,提供以下几种分区策略。在通过sparkconf配置的时候需要使用spark.mongodb.input.partitionerOptions.做前缀

A),MongoDefaultPartitioner

默认的分区策略。实际上是封装了MongoSamplePartitioner

B),MongoSamplePartitioner

要求mongodb版本是3.2+。用于所有部署的通用分区器。使用平均文档大小和集合的随机抽样来确定集合的合适分区。

属性名

描述

partitionKey

分割收集数据的字段。该字段应该被索引并且包含唯一的值。默认_id

partitionSizeMB

每个分区的大小(以MB为单位).默认64 MB

samplesPerPartition

每个分区要采集的样本文档的数量。默认10

C),MongoShardedPartitioner

针对分片集群的分区器。根据chunk数据集对collection进行分片。需要读取配置数据库。

属性名

描述

shardkey

分割collection数据的字段,该字段应该被索引并且包含唯一的值。默认_id

D),MongoSplitVectorPartitioner

独立或复制集的分区器。在standaloneprimary上使用splitVector命令来确定数据库的分区。需要运行splitVector命令的权限。

属性名

描述

partitionKey

默认:_id.分割collection数据的字段。该字段会被索引,必须包含唯一的值

partitionSizeMB

默认:64MB.每2个分区的大小,以MB为单位。

E),MongoPaginateByCountPartitioner

用于所有部署模式的缓慢的通用分区器。创建特定数量的分区。需要查询每个分区。

属性名

描述

partitionKey

默认:_id.分割collection数据的字段。该字段会被索引,值唯一

numberOfPartitions

分区数,默认64.

F),MongoPaginateBySizePartitioner

用于所有部署模式的缓慢的通用分区器。根据数据大小创建分区。需要查询每个分区。

属性名

描述

partitionKey

默认:_id.分割collection数据的字段。该字段会被索引,必须包含唯一的值

partitionSizeMB

默认:64MB.每2个分区的大小,以MB为单位。

4uri配置设置

通过SparkConf配置的话,需要加上spark.mongodb.input.前缀。

spark.mongodb.input.uri=mongodb://127.0.0.1/databaseName.collectionNamereadPreference=primaryPreferred

单独每个属性进行配置

spark.mongodb.input.uri=mongodb://127.0.0.1/

spark.mongodb.input.database=databaseName

spark.mongodb.input.collection=collectionName

spark.mongodb.input.readPreference.name=primaryPreferred

如果你既在uri里面指定了配置,也单独设置了配置,那么uri里的会覆盖单独的配置。

spark.mongodb.input.uri=mongodb://127.0.0.1/foobar

spark.mongodb.input.database=bar

真正的链接数据库就是foobar

5,输出配置

spark.mongodb.output.前缀。

属性名

描述

uri

Required。mongodb://host:port/

database

Required。

collection

Required。

localThreshold

The threshold (milliseconds) for choosing a server from multiple MongoDB servers.Default: 15 ms

writeConcern.w

The write concern w value.Default w: 1

writeConcern.journal

The write concern journal value.

writeConcern.wTimeoutMS

The write concern wTimeout value.

uri配置,前缀spark.mongodb.output.

spark.mongodb.output.uri=mongodb://127.0.0.1/test.myCollection

也可以进行独立配置

spark.mongodb.output.uri=mongodb://127.0.0.1/

spark.mongodb.output.database=test

spark.mongodb.output.collection=myCollection

Uri和单独都进行了配置,以uri里配置为准。如,下面最终就是foobar

spark.mongodb.output.uri=mongodb://127.0.0.1/foobar

spark.mongodb.output.database=bar

6Cache Configuration

MongoConnector的每个MongoClient包含了一个cache,所以可以实现多线程共享MongoClient

由于cache的设置是在spark configuration配置生效之前,所以cache仅仅支持通过System Property设置。

属性名称

描述

spark.mongodb.keep_alive_ms

The length of time to keep a MongoClient available for sharing.Default: 5000

六.Spark RW MogoDB Code

向Mongo写入数据

import com.mongodb.spark.MongoSpark;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.SparkSession;
import org.bson.Document;
import static java.util.Arrays.asList;

public final class WriteToMongoDB {

public static void main(final String[] args) throws InterruptedException {

SparkSession spark = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
.config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
.getOrCreate();

JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

// Create a RDD of 10 documents
JavaRDD<Document> documents = jsc.parallelize(asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).map
(new Function<Integer, Document>() {
public Document call(final Integer i) throws Exception {
return Document.parse("{test: " + i + "}");

}

});


/*Start Example: Save data from RDD to MongoDB*****************/
MongoSpark.save(documents);
/*End Example**************************************************/

jsc.close();
}
}

用一个WriteConfig

MongoSpark.save()可以接受WriteConfig指定各种写配置设置对象,例如集合或写入问题。

例如:

package com.mongodb.spark_examples;

import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.config.WriteConfig;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.SparkSession;
import org.bson.Document;
import static java.util.Arrays.asList;
import java.util.HashMap;import java.util.Map;

public final class WriteToMongoDBWriteConfig {

public static void main(final String[] args) throws InterruptedException {

SparkSession spark = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
.config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
.getOrCreate();

JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
// Create a custom WriteConfig
Map<String, String> writeOverrides = new HashMap<String, String>();
writeOverrides.put("collection", "spark");
writeOverrides.put("writeConcern.w", "majority");
WriteConfig writeConfig = WriteConfig.create(jsc).withOptions(writeOverrides);
// Create a RDD of 10 documents
JavaRDD<Document> sparkDocuments = jsc.parallelize(asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).map
(new Function<Integer, Document>() {
public Document call(final Integer i) throws Exception {
return Document.parse("{spark: " + i + "}");
}
});

/*Start Example: Save data from RDD to MongoDB*****************/
MongoSpark.save(sparkDocuments, writeConfig);
/*End Example**************************************************/
jsc.close();

}
}

从Mongo读取数据

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.bson.Document;
import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.rdd.api.java.JavaMongoRDD;

public final class ReadFromMongoDB {

public static void main(final String[] args) throws InterruptedException {

SparkSession spark = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
.config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
.getOrCreate();

// Create a JavaSparkContext using the SparkSession's SparkContext object
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
/*Start Example: Read data from MongoDB************************/
JavaMongoRDD<Document> rdd = MongoSpark.load(jsc);
/*End Example**************************************************/
// Analyze data from MongoDB
System.out.println(rdd.count());
System.out.println(rdd.first().toJson());
jsc.close();

}}

指定一个ReadConfig

MongoSpark.load()可以接受ReadConfig指定各种读取配置设置对象,例如集合或读取首选项

以下示例spark使用secondaryPreferred读取首选项集合中读取:

package com.mongodb.spark_examples;

import java.util.HashMap;import java.util.Map;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.bson.Document;
import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.config.ReadConfig;
import com.mongodb.spark.rdd.api.java.JavaMongoRDD;

public final class ReadFromMongoDBReadConfig {

public static void main(final String[] args) throws InterruptedException {

SparkSession spark = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
.config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
.getOrCreate();

// Create a JavaSparkContext using the SparkSession's SparkContext object

JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

/*Start Example: Read data from MongoDB************************/
// Create a custom ReadConfig
Map<String, String> readOverrides = new HashMap<String, String>();
readOverrides.put("collection", "spark");
readOverrides.put("readPreference.name", "secondaryPreferred");
ReadConfig readConfig = ReadConfig.create(jsc).withOptions(readOverrides);
// Load data using the custom ReadConfig
JavaMongoRDD<Document> customRdd = MongoSpark.load(jsc, readConfig);
/*End Example**************************************************/

// Analyze data from MongoDB
System.out.println(customRdd.count());
System.out.println(customRdd.first().toJson());

jsc.close();

}}

详情参考:https://docs.mongodb.com/spark-connector/current/

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Spark与深度学习框架——H2O、dee.. 下一篇Spark Configuration

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目