版权声明:码字太累!一般是复制粘贴。如有侵权行为,请立即联系我 :wang_shubing@126.com https://blog.csdn.net/TXBSW/article/details/83377005
一.背景
spark2.x
Scala 2.11.x
截取pom.xml
<dependencies>
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.11</artifactId>
<version>2.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.3.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.2</version>
</dependency>
</dependencies>
二,sparkRDD操纵mongoDB
1,导入Mongodb Connector依赖
为了SparkContext和RDD能使用Mongodb Connector特殊的函数和隐式转换,需要引入相关依赖。
importcom.mongodb.spark._
写入数据到mongodb
将RDD数据写入到mongodb的时候,数据必须转化为BSON document。可以写个简单的map函数来实现将数据转化为Document或者BSONDocument或者DBObject
一些scala的类型是不被支持的,应该转化为相等的java类型。为了转化Scala类型到原生的类型,需要导入下面的包,然后用.asJava方法:
import scala.collection.JavaConverters._
import org.bson.Document
val documents = sc.parallelize(
Seq(new Document("fruits", List("apples", "oranges", "pears").asJava)))MongoSpark.save(documents)
A),MongoSpark.save()
可以使用MongoSpark.save()方法将RDD数据写入到Mongodb,如下:
import org.bson.Document
val documents = sc.parallelize((1 to 10).map(i => Document.parse(s"{test: $i}")))
MongoSpark.save(documents) // Uses the SparkConf for configuration
也可以指定WriteConfig。
import com.mongodb.spark.config._
val writeConfig = WriteConfig(Map("collection" -> "spark", "writeConcern.w" -> "majority"), Some(WriteConfig(sc)))
val sparkDocuments = sc.parallelize((1 to 10).map(i => Document.parse(s"{spark: $i}")))
MongoSpark.save(sparkDocuments, writeConfig)
B),RDD Save Helper Methods
RDD还有一个隐式的辅助函数,saveToMongoDB(),可以用来写数据到Mongodb,如下:
也可以指定WriteConfig。
documents.saveToMongoDB(WriteConfig(Map("uri" ->"mongodb://example.com/database.collection")))
documents.saveToMongoDB() // Uses the SparkConf for configuration
4,从mongodb读取数据分析
A),MongoSpark.load()
该方法主要是从mongodb里面捞取数据做RDD,。
val rdd = MongoSpark.load(sc)
println(rdd.count)
println(rdd.first.toJson)
也可以指定ReadConfig
import com.mongodb.spark.config._
val readConfig = ReadConfig(Map("collection" -> "spark", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))
val customRdd = MongoSpark.load(sc, readConfig)
println(customRdd.count)
println(customRdd.first.toJson)
B),SparkContext Load Helper Methods
SparkContext有一个隐式的辅助方法loadFromMongoDB,用来从Mongodb捞取数据。
sc.loadFromMongoDB() // Uses the SparkConf for configuration
也可以为其,指定配置ReadConfig
sc.loadFromMongoDB(ReadConfig(Map("uri" -> "mongodb://example.com/database.collection"))) // Uses the ReadConfig
5,Aggregation
在某些情况下,使用aggregation pipeline可能比直接使用RDD的filter性能更好。Filter过滤数据看似是一个简单的RDD操作,实际上性能很低。比如,通常我们会如下使用:
val rdd = MongoSpark.load(sc)
val filteredRdd = rdd.filter(doc => doc.getInteger("test") > 5)
println(filteredRdd.count)
println(filteredRdd.first.toJson)
MongodbRDD可以传入一个aggregation pipeline ,允许在mongodb中过滤数据,然后仅仅传入需要的数据给Spark。
val rdd = MongoSpark.load(sc)
val aggregatedRdd = rdd.withPipeline(Seq(Document.parse("{ $match: { test : { $gt : 5 } } }")))
println(aggregatedRdd.count)
println(aggregatedRdd.first.toJson)
使用aggregation pipeline也提供了处理空值结果的好处,而过滤方法则没有。比如上面的例子中,假如filter没有任何数据,将会抛出异常如下:
ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 8) java.lang.NullPointerException.
6,MongoSpark.builder()
如果需要对配置进行精细控制,那么MongoSpark配套提供了一个builder()方法,用于配置Mongo Spark Connector的所有方面。也提供了创建RDD,DataFrame,Dataset的API。
三,SparkSql操纵mongodb
1,引入依赖
与RDD操纵mongodb不同的是,以SparkSql的形式操纵mongodb还需要引入SqlContext相关的特定的方法和隐式转换。
import com.mongodb.spark._
importcom.mongodb.spark.sql._
i
2,创建SparkSession
val sparkSession = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.characters")
.config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.characters")
.getOrCreate()
代码示例:
object SparkSQL {
def main(args: Array[String]): Unit = {
import org.apache.spark.sql.SparkSession
/* For Self-Contained Scala Apps: Create the SparkSession * CREATED AUTOMATICALLY IN spark-shell */
val sparkSession = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.characters")
.config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.characters")
.getOrCreate()
import com.mongodb.spark._
import com.mongodb.spark.config._
import org.bson.Document
val docs = """ {"name": "Bilbo Baggins", "age": 50} {"name": "Gandalf", "age": 1000} {"name": "Thorin", "age": 195} {"name": "Balin", "age": 178} {"name": "Kíli", "age": 77} {"name": "Dwalin", "age": 169} {"name": "in", "age": 167} {"name": "Glóin", "age": 158} {"name": "Fíli", "age": 82} {"name": "Bombur"}""".trim.stripMargin.split("[\\r\\n]+").toSeq
sparkSession.sparkContext.parallelize(docs.map(Document.parse)).saveToMongoDB()
// Additional operations go here...
}}
3,DataFrames和Datasets
在spark2.0,一个DataFrame由a表示Dataset的Rows,现在的别名Dataset[Row]。
Mongo Spark Connector提供了com.mongodb.spark.sql.DefaultSource类,可以通过它来从mongodb里创建DataFrame和Datasets。但是,为了方便创建一个DataFrame,该连接器提供了MongoSpark助手load(sqlContext)。MongoSpark.load(sqlContext)是一个通过DataFrameReader读取数据的简单封装。
4,显式指定schema
默认情况下,从SQLContext中的MongoDB读取通过从数据库中抽样文档来推测schema信息的。显示的声明schema信息,如下操作
val df = MongoSpark.load(sparkSession) // Uses the SparkSessiondf.printSchema()
df.printSchema() // Prints DataFrame schema
case class Character(name: String, age: Int)
val explicitDF = MongoSpark.load[Character](sparkSession)
explicitDF.printSchema()
可以使用case class将DataFrame转化为Dataset
val dataset = explicitDF.as[Character]
RDD也可以转化为DataFrame和Dataset
// Passing the SparkContext to load returns a RDD, not DF or DS
val rdd = MongoSpark.load(sc)
val dfInferredSchema = rdd.toDF()
val dfExplicitSchema = rdd.toDF[Character]()
val ds = rdd.toDS[Character]()
5,更多创建DataFrame的方法
使用 SparkSession的方法创建DataFrame
val df2 = sparkSession.loadFromMongoDB() // SparkSession used for configuration
val df3 = sparkSession.loadFromMongoDB(ReadConfig(
Map("uri" -> "mongodb://example.com/database.collection")
)) // ReadConfig used for configuration
val df4 = sparkSession.read.mongo() // SparkSession used for configuration
sqlContext.read.format("com.mongodb.spark.sql").load()
// Set custom options
import com.mongodb.spark.config._
val customReadConfig = ReadConfig(Map("readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))
val df5 = sparkSession.read.mongo(customReadConfig)
val df6 = sparkSession.read.format("com.mongodb.spark.sql").options(customReadConfig.asOptions).load()
6.SQL查询
在对数据集运行SQL查询之前,必须为数据集注册临时视图。
val characters = MongoSpark.load[Character](sparkSession)characters.createOrReplaceTempView("characters")
val centenarians = sparkSession.sql("SELECT name, age FROM characters WHERE age >= 100")centenarians.show()
7,Filter
当DataFrame或者SParkSql使用filter的时候,MongoConnector会构建一个aggregation pipeline将数据过滤下推到mongodb。
df.filter(df("age") < 100).show()
8,Save DataFrames to MongoDB
Mongodb Spark Connector还提供了将DataFrame持久化到mongodb的操作。
MongoSpark.save(centenarians.write.option("collection", "hundredClub").mode("overwrite"))
println("Reading from the 'hundredClub' collection:")
MongoSpark.load[Character](sparkSession, ReadConfig(Map("collection"-> "hundredClub"), Some(ReadConfig(sparkSession)))).show()
也可以使用下面的方法来保存数据。
centenarians.write.option("collection", "hundredClub").mode("overwrite").mongo()
centenarians.write.option("collection","hundredClub").mode("overwrite").format("com.mongodb.spark.sql").save()
四,Spark Streaming
命令行启动Netcat:
$ nc -lk 9999
终端导入
import com.mongodb.spark.sql._
import org.apache.spark.streaming._
创建一个新StreamingContext
val ssc = new StreamingContext(sc, Seconds(1))
使用此socketTextStream方法在端口9999上创建与Netcat的连接:
val lines = ssc.socketTextStream("localhost", 9999)
确定每行中每个单词出现的次数:
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
创建数据结构以保存结果:
case class WordCount(word: String, count: Int)
使用foreachRDD循环收集结果并写入Spark Connector配置中指定的MongoDB集合
wordCounts.foreachRDD({ rdd =>import spark.implicits._
val wordCounts = rdd.map({ case (word: String, count: Int)=> WordCount(word, count) }).toDF()
wordCounts.write.mode("append").mongo()})
ssc.start()
五,数据类型
Spark支持数量有限的数据类型,以确保所有BSON类型于Spark DataFrames / Datasets中的类型都可以相互转化。
Bson类型
|
spark类型
|
Document
|
StructType
|
Array
|
ArrayType
|
32-bitinteger
|
Integer
|
64-bitinteger
|
Long
|
Binarydata
|
Array[Byte]或者StructType:{subType:Byte,data:Array[Byte]}
|
Boolean
|
Boolean
|
Date
|
java.sql.Timestamp
|
DBPointer
|
StructType:{ref:String,oid:String}
|
Double
|
Double
|
java script
|
StructType:{code:String}
|
java scriptwithscope
|
StructType:{code:String,scope:String}
|
Maxkey
|
StructType:{maxKey:Integer}
|
Minkey
|
StructType:{minKey:Integer}
|
Null
|
null
|
ObjectId
|
StructType:{oid:String}
|
RegularExpression
|
StructType:{regex:String,options:String}
|
String
|
String
|
Symbol
|
StructType:{symbol:String}
|
Timestamp
|
StructType:{time:Integer,inc:Integer}
|
Undefined
|
StructType:{undefined:Boolean}
|
为了更好的支持Dataset,已经创建好了下面的Scala的case class,(com.mongodb.spark.sql.fieldTypes)和JavaBean class (com.mongodb.spark.sql.fieldTypes.api.java)
Bson类型
|
Scala案例类
|
JavaBean的
|
Binarydata
|
Binary
|
Binary
|
DBPointer
|
DBPointer
|
DBPointer
|
java script
|
java script
|
java script
|
java scriptwithscope
|
java scriptWithScope
|
java scriptWithScope
|
Maxkey
|
MaxKey
|
MaxKey
|
Minkey
|
MinKey
|
MinKey
|
ObjectId
|
ObjectId
|
ObjectId
|
RegularExpression
|
RegularExpression
|
RegularExpression
|
Symbol
|
Symbol
|
Symbol
|
Timestamp
|
Timestamp
|
Timestamp
|
Undefined
|
Undefined
|
Undefined
|
五,配置
1,配置的方法
A),使用Spark配置
三种方式可以实现
a),sparkconf:使用SparkConf配置的时候,需要在配置项前面带上特定前缀。
b),--conf
c),spark-default.conf
B),使用ReadConfig和WriteConfig
该方式的配置会覆盖掉所有的SparkConf配置
C),使用Options Map
Spark的API有部分支持Map[String,String],比如DataFrameReader和DataFrameWriter。可以使用asOptions()方法,将自定义的ReadConfig或者WriteConfig转化为一个map。
D),使用System Property
Mongodb Spark Connector为MongoClient提供了cache机制,只能通过SystemProperty配置。
2,输入配置
如果通过SparkConf设置Connector,配置必须加的前缀是:spark.mongodb.input
属性名称
|
描述
|
uri
|
Required。格式:mongodb://host:port/
|
database
|
Required。要读的数据库名称
|
collection
|
Required。要读的collection
|
localThreshold
|
从多个mongodbserver中选取一个Server的阈值,默认15ms
|
readPreference.name
|
要使用的Read Preference。默认:Primary。
|
readPreference.tagSets
|
要用的ReadPreference TagSets。
|
readConcern.level
|
要使用的Read Concern等级。
|
sampleSize
|
制作schema时的采样数据的条数:1000.
|
partitioner
|
分区的策略。MongoDefaultPartitioner,下面讲。
|
3,Partitioner配置
Mongodb作为spark数据源,分区数据的策略有很多种。目前,提供以下几种分区策略。在通过sparkconf配置的时候需要使用spark.mongodb.input.partitionerOptions.做前缀
A),MongoDefaultPartitioner
默认的分区策略。实际上是封装了MongoSamplePartitioner。
B),MongoSamplePartitioner
要求mongodb版本是3.2+。用于所有部署的通用分区器。使用平均文档大小和集合的随机抽样来确定集合的合适分区。
属性名
|
描述
|
partitionKey
|
分割收集数据的字段。该字段应该被索引并且包含唯一的值。默认_id
|
partitionSizeMB
|
每个分区的大小(以MB为单位).默认64 MB
|
samplesPerPartition
|
每个分区要采集的样本文档的数量。默认10
|
C),MongoShardedPartitioner
针对分片集群的分区器。根据chunk数据集对collection进行分片。需要读取配置数据库。
属性名
|
描述
|
shardkey
|
分割collection数据的字段,该字段应该被索引并且包含唯一的值。默认_id
|
D),MongoSplitVectorPartitioner
独立或复制集的分区器。在standalone或primary上使用splitVector命令来确定数据库的分区。需要运行splitVector命令的权限。
属性名
|
描述
|
partitionKey
|
默认:_id.分割collection数据的字段。该字段会被索引,必须包含唯一的值
|
partitionSizeMB
|
默认:64MB.每2个分区的大小,以MB为单位。
|
E),MongoPaginateByCountPartitioner
用于所有部署模式的缓慢的通用分区器。创建特定数量的分区。需要查询每个分区。
属性名
|
描述
|
partitionKey
|
默认:_id.分割collection数据的字段。该字段会被索引,值唯一
|
numberOfPartitions
|
分区数,默认64.
|
F),MongoPaginateBySizePartitioner
用于所有部署模式的缓慢的通用分区器。根据数据大小创建分区。需要查询每个分区。
属性名
|
描述
|
partitionKey
|
默认:_id.分割collection数据的字段。该字段会被索引,必须包含唯一的值
|
partitionSizeMB
|
默认:64MB.每2个分区的大小,以MB为单位。
|
4,uri配置设置
通过SparkConf配置的话,需要加上spark.mongodb.input.前缀。
spark.mongodb.input.uri=mongodb://127.0.0.1/databaseName.collectionNamereadPreference=primaryPreferred
单独每个属性进行配置
spark.mongodb.input.uri=mongodb://127.0.0.1/
spark.mongodb.input.database=databaseName
spark.mongodb.input.collection=collectionName
spark.mongodb.input.readPreference.name=primaryPreferred
如果你既在uri里面指定了配置,也单独设置了配置,那么uri里的会覆盖单独的配置。
spark.mongodb.input.uri=mongodb://127.0.0.1/foobar
spark.mongodb.input.database=bar
真正的链接数据库就是foobar。
5,输出配置
spark.mongodb.output.前缀。
属性名
|
描述
|
uri
|
Required。mongodb://host:port/
|
database
|
Required。
|
collection
|
Required。
|
localThreshold
|
The threshold (milliseconds) for choosing a server from multiple MongoDB servers.Default: 15 ms
|
writeConcern.w
|
The write concern w value.Default w: 1
|
writeConcern.journal
|
The write concern journal value.
|
writeConcern.wTimeoutMS
|
The write concern wTimeout value.
|
uri配置,前缀spark.mongodb.output.
spark.mongodb.output.uri=mongodb://127.0.0.1/test.myCollection
也可以进行独立配置
spark.mongodb.output.uri=mongodb://127.0.0.1/
spark.mongodb.output.database=test
spark.mongodb.output.collection=myCollection
Uri和单独都进行了配置,以uri里配置为准。如,下面最终就是foobar。
spark.mongodb.output.uri=mongodb://127.0.0.1/foobar
spark.mongodb.output.database=bar
6,Cache Configuration
MongoConnector的每个MongoClient包含了一个cache,所以可以实现多线程共享MongoClient。
由于cache的设置是在spark configuration配置生效之前,所以cache仅仅支持通过System Property设置。
属性名称
|
描述
|
spark.mongodb.keep_alive_ms
|
The length of time to keep a MongoClient available for sharing.Default: 5000
|
六.Spark RW MogoDB Code
向Mongo写入数据
import com.mongodb.spark.MongoSpark;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.SparkSession;
import org.bson.Document;
import static java.util.Arrays.asList;
public final class WriteToMongoDB {
public static void main(final String[] args) throws InterruptedException {
SparkSession spark = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
.config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
.getOrCreate();
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
// Create a RDD of 10 documents
JavaRDD<Document> documents = jsc.parallelize(asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).map
(new Function<Integer, Document>() {
public Document call(final Integer i) throws Exception {
return Document.parse("{test: " + i + "}");
}
});
/*Start Example: Save data from RDD to MongoDB*****************/
MongoSpark.save(documents);
/*End Example**************************************************/
jsc.close();
}
}
用一个WriteConfig
MongoSpark.save()可以接受WriteConfig指定各种写配置设置的对象,例如集合或写入问题。
例如:
package com.mongodb.spark_examples;
import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.config.WriteConfig;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.SparkSession;
import org.bson.Document;
import static java.util.Arrays.asList;
import java.util.HashMap;import java.util.Map;
public final class WriteToMongoDBWriteConfig {
public static void main(final String[] args) throws InterruptedException {
SparkSession spark = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
.config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
.getOrCreate();
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
// Create a custom WriteConfig
Map<String, String> writeOverrides = new HashMap<String, String>();
writeOverrides.put("collection", "spark");
writeOverrides.put("writeConcern.w", "majority");
WriteConfig writeConfig = WriteConfig.create(jsc).withOptions(writeOverrides);
// Create a RDD of 10 documents
JavaRDD<Document> sparkDocuments = jsc.parallelize(asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).map
(new Function<Integer, Document>() {
public Document call(final Integer i) throws Exception {
return Document.parse("{spark: " + i + "}");
}
});
/*Start Example: Save data from RDD to MongoDB*****************/
MongoSpark.save(sparkDocuments, writeConfig);
/*End Example**************************************************/
jsc.close();
}
}
从Mongo读取数据
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.bson.Document;
import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.rdd.api.java.JavaMongoRDD;
public final class ReadFromMongoDB {
public static void main(final String[] args) throws InterruptedException {
SparkSession spark = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
.config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
.getOrCreate();
// Create a JavaSparkContext using the SparkSession's SparkContext object
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
/*Start Example: Read data from MongoDB************************/
JavaMongoRDD<Document> rdd = MongoSpark.load(jsc);
/*End Example**************************************************/
// Analyze data from MongoDB
System.out.println(rdd.count());
System.out.println(rdd.first().toJson());
jsc.close();
}}
指定一个ReadConfig
MongoSpark.load()可以接受ReadConfig指定各种读取配置设置的对象,例如集合或读取首选项。
以下示例spark使用secondaryPreferred读取首选项从集合中读取:
package com.mongodb.spark_examples;
import java.util.HashMap;import java.util.Map;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.bson.Document;
import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.config.ReadConfig;
import com.mongodb.spark.rdd.api.java.JavaMongoRDD;
public final class ReadFromMongoDBReadConfig {
public static void main(final String[] args) throws InterruptedException {
SparkSession spark = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
.config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
.getOrCreate();
// Create a JavaSparkContext using the SparkSession's SparkContext object
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
/*Start Example: Read data from MongoDB************************/
// Create a custom ReadConfig
Map<String, String> readOverrides = new HashMap<String, String>();
readOverrides.put("collection", "spark");
readOverrides.put("readPreference.name", "secondaryPreferred");
ReadConfig readConfig = ReadConfig.create(jsc).withOptions(readOverrides);
// Load data using the custom ReadConfig
JavaMongoRDD<Document> customRdd = MongoSpark.load(jsc, readConfig);
/*End Example**************************************************/
// Analyze data from MongoDB
System.out.println(customRdd.count());
System.out.println(customRdd.first().toJson());
jsc.close();
}}
详情参考:https://docs.mongodb.com/spark-connector/current/