Spark Streaming and Flume Integration
2019-04-29 02:08:44
Tags: SparkStreaming, Flume, integration

Maven dependency:

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-flume_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
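
The code below also depends on the core Spark Streaming artifact. If your POM does not already declare it, add something like the following (this assumes the spark.version property is defined as above):

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>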

There are two modes for integrating Spark Streaming with Flume.
1. Flume-style Push-based Approach

Flume agent configuration (netcat source → memory channel → avro sink):

avro-sink-agent.sources = netcat-source
avro-sink-agent.sinks = avro-sink
avro-sink-agent.channels = netcat-memory-channel

avro-sink-agent.sources.netcat-source.type = netcat
avro-sink-agent.sources.netcat-source.bind = localhost
avro-sink-agent.sources.netcat-source.port = 44444

avro-sink-agent.channels.netcat-memory-channel.type = memory

avro-sink-agent.sinks.avro-sink.type = avro
avro-sink-agent.sinks.avro-sink.hostname = localhost
avro-sink-agent.sinks.avro-sink.port = 41414

avro-sink-agent.sources.netcat-source.channels = netcat-memory-channel
avro-sink-agent.sinks.avro-sink.channel = netcat-memory-channel
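
The memory channel above runs entirely on defaults. In practice you may want to size it explicitly; the two property names below come from the Flume user guide, and the values are only illustrative:

# illustrative sizing; tune for your workload
avro-sink-agent.channels.netcat-memory-channel.capacity = 1000
avro-sink-agent.channels.netcat-memory-channel.transactionCapacity = 100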

Spark code:

package com.ruoze.spark.SparkStreaming_flume

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object ssf {
  def main(args: Array[String]): Unit = {
    // need at least 2 local threads: one for the receiver, one for processing
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("ssf")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    // push mode: this starts a receiver listening on the given host/port;
    // the Flume avro sink must point at this address
    val lines = FlumeUtils.createStream(ssc, "ruozehadoop000", 41414)
    // business logic goes here; this example is a word count
    // SparkFlumeEvent ==> String
    lines.map(x => new String(x.event.getBody.array()).trim)
      .flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _)
      .print()
    ssc.start()
    ssc.awaitTermination()
  }
}

Start Spark first, then Flume (in push mode the avro sink connects to the receiver that the Spark job starts, so Spark must be up and listening first):
spark-submit --master local[2] \
--packages org.apache.spark:spark-streaming-flume_2.11:2.3.0 \
--class com.ruoze.spark.SparkStreaming_flume.ssf \
/home/hadoop/lib/g3-spark-1.0.jar

flume-ng agent \
--name avro-sink-agent \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/flume_push_streaming.conf \
-Dflume.root.logger=INFO,console &
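
With both processes running, you can exercise the pipeline through the netcat source. A quick smoke test, assuming the nc utility is installed:

# send one line into the netcat source; words are comma-separated per the code above
echo "spark,flume,spark" | nc localhost 44444

The word counts should show up in the Spark console on the next 10-second batch.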

2. Pull-based Approach using a Custom Sink (start Flume first, then Spark; generally the better choice)

In this mode Flume pushes events into a transactional buffer held by the custom SparkSink, and Spark pulls them using a reliable receiver, which gives stronger fault-tolerance guarantees than the push approach.

Flume agent configuration (note the sink type is now the custom SparkSink):

avro-sink-agent.sources = netcat-source
avro-sink-agent.sinks = spark-sink
avro-sink-agent.channels = netcat-memory-channel

avro-sink-agent.sources.netcat-source.type = netcat
avro-sink-agent.sources.netcat-source.bind = localhost
avro-sink-agent.sources.netcat-source.port = 44444

avro-sink-agent.channels.netcat-memory-channel.type = memory

avro-sink-agent.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink
avro-sink-agent.sinks.spark-sink.hostname = localhost
avro-sink-agent.sinks.spark-sink.port = 41414

avro-sink-agent.sources.netcat-source.channels = netcat-memory-channel
avro-sink-agent.sinks.spark-sink.channel = netcat-memory-channel
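
The SparkSink class is not part of the stock Flume distribution. Per the Spark Streaming + Flume integration guide, the spark-streaming-flume-sink_2.11 jar, together with scala-library and commons-lang3, must be placed on Flume's classpath before starting the agent. A sketch, with versions that are examples only:

# versions are illustrative; match them to your Spark/Scala installation
cp spark-streaming-flume-sink_2.11-2.3.0.jar $FLUME_HOME/lib/
cp scala-library-2.11.8.jar $FLUME_HOME/lib/
cp commons-lang3-3.5.jar $FLUME_HOME/lib/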

Compared with mode 1, the Spark code only needs a different API call; everything else stays the same:
val lines = FlumeUtils.createPollingStream(ssc, "ruozehadoop000", 41414)
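
The startup order is reversed: bring up the Flume agent first so the SparkSink is listening, then submit the Spark job. A sketch mirroring the push-mode commands, where the config file name flume_pull_streaming.conf is an assumption and the class/jar are reused from above:

# conf-file name is assumed; point it at the pull-mode agent config above
flume-ng agent \
--name avro-sink-agent \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/flume_pull_streaming.conf \
-Dflume.root.logger=INFO,console &

spark-submit --master local[2] \
--packages org.apache.spark:spark-streaming-flume_2.11:2.3.0 \
--class com.ruoze.spark.SparkStreaming_flume.ssf \
/home/hadoop/lib/g3-spark-1.0.jar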
