设为首页 加入收藏

TOP

Spark对ES的读写
2019-02-23 13:23:23 】 浏览:244
Tags:Spark 读写

Apache Spark是一个快速且通用的集群计算系统。 它提供了Java,Scala和Python中的高级API以及支持通用执行图的优化引擎。

Spark通常通过将数据缓存到内存中,从而为大型数据集提供快速的迭代/功能类功能。 与本文档中提到的其他库相反,Apache Spark是一种计算框架,与Map / Reduce本身无关,但它与Hadoop集成,主要针对HDFS。 elasticsearch-hadoop允许Elasticsearch在Spark中以两种方式使用:通过2.1以来的专用支持或2.0以来的Map / Reduce桥接。 自5.0版起,elasticsearch-hadoop支持Spark 2.0

就像其他库一样,elasticsearch-hadoop需要在Spark的类路径中可用。 由于Spark具有多种部署模式,因此它可以转化为目标类路径,无论它是否只在一个节点上(如本地模式下的情况那样 - 将在整个文档中使用)或每个节点,具体取决于所需基础设施。

elasticsearch-hadoop以Elasticsearch和Apache Spark之间的本地集成形式提供RDD(弹性分布式数据集)(或精确配对RDD)的形式,可以从Elasticsearch读取数据。 RDD有两种风格:一种是Scala(它以Scala集合的Tuple2形式返回数据),另一种是Java(以Tuple2形式返回包含java.util集合的数据)。

要为Apache Spark配置elasticsearch-hadoop,可以设置SparkConf对象的“配置”一章中描述的各种属性:

import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName(appName).setMaster(master)
conf.set("es.index.auto.create", "true")
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
conf.set("es.index.auto.create", "true");

命令行。 对于那些想要通过命令行设置属性(直接或者通过从文件加载它们),请注意Spark只接受那些以“spark”开头的属性。 前缀,并会忽略其余部分(并根据可能引发的警告版本)。 要解决此限制,请通过追加spark来定义elasticsearch-hadoop属性。 前缀(因此它们变成spark.es),elasticsearch-hadoop会自动解决它们:

$ ./bin/spark-submit --conf spark.es.resource=index/type ...

将数据写入ES:
使用elasticsearch-hadoop,只要将其内容翻译成文档,任何RDD都可以保存到Elasticsearch中。 实际上,这意味着RDD类型需要是Map(无论是Scala还是Java),JavaBean或Scala案例类。 如果情况并非如此,可以轻松转换Spark中的数据或插入他们自己的定制ValueWriter。

import org.apache.spark.SparkContext    
import org.apache.spark.SparkContext._

import org.elasticsearch.spark._        

...

val conf = ...
val sc = new SparkContext(conf)         

val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")

sc.makeRDD(Seq(numbers, airports)).saveToEs("spark/docs")

将JSON写入ES:
对于RDD中的数据已经在JSON中的情况,elasticsearch-hadoop允许直接编制索引而不应用任何转换; 数据将按原样发送并直接发送到Elasticsearch。 因此,在这种情况下,假设每个条目都代表一个JSON文档,elasticsearch-hadoop期望包含字符串或字节数组的RDD(byte [] / Array [Byte])。 如果RDD没有正确的签名,则saveJsonToEs方法不能应用(在Scala中它们将不可用)。

val json1 = """{"reason" : "business", "airport" : "SFO"}"""      
val json2 = """{"participants" : 5, "airport" : "OTP"}"""

new SparkContext(conf).makeRDD(Seq(json1, json2))
                      .saveJsonToEs("spark/json-trips") 

将资源写入ES:

val game = Map("media_type"->"game","title" -> "FF VI","year" -> "1994")
val book = Map("media_type" -> "book","title" -> "Harry Potter","year" -> "2010")
val cd = Map("media_type" -> "music","title" -> "Surfing With The Alien")

sc.makeRDD(Seq(game, book, cd)).saveToEs("my-collection-{media_type}/doc") 

从ES读取数据:

import org.apache.spark.SparkContext    
import org.apache.spark.SparkContext._

import org.elasticsearch.spark._        

...

val conf = ...
val sc = new SparkContext(conf)         

val RDD = sc.esRDD("radio/artists") 

这里写图片描述

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Structured Streaming Tips 下一篇Spark Sql 源码剖析(一):sql ..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目