设为首页 加入收藏

TOP

Spark与kafka和flume的集成
2019-03-24 02:09:24 】 浏览:146
Tags:Spark kafka flume 集成

这里写图片描述

kafka原理
这里写图片描述
这里写图片描述
Topic在逻辑上可以被认为是一个queue,每条消费都必须指定它的Topic,可以简单理解为必须指明把这条消息放进哪个queue里。为了使得Kafka的吞吐率可以线性提高,物理上把Topic分成一个或多个Partition,每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件。若创建topic1和topic2两个topic,且分别有13个和19个分区,则整个集群上会相应会生成共32个文件夹

发布订阅消息系统
消息系统
类似于qq:私聊和群聊
私聊:一对一。
群聊:一对多。
在类似于群聊,消息系统里面称为发布订阅(publish-subscribe) ps
在类似于私聊,消息系统里面称为点对点(point-to-point) p2p

以前大多数的消息系统是p2p的模式,这种模式的特点,就是数据被消费了
那么这个数据就被丢弃了,不能再被其它消费者消费。

Ps 的模式,指的是如果一个用户消费了数据,那么这个数据还在。另外一个用户可以重新消费。
在kafka里面数据是有保留周期的,默认就是一个星期。

Spark Streaming 与kafka集成
kafkaUtils工具包

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka_2.10</artifactId>
      <version>1.6.2</version>
    </dependency>

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder
import org.apache.spark.SparkContext

object KafkaOperation {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaOperation")
    val sc = new SparkContext(conf);
    val ssc = new StreamingContext(sc, Seconds(2));
    ssc.checkpoint(".")
    val kafkaParams = Map("metadata.broker.list" -> "hadoop1:9092");
    val topics = Set("xtwy");
    /**
     * k:其实就是偏移量 offset
     * V:就是我们消费的数据
     * InputDStream[(K, V)]
     *
     * k:数据类型
     * v:数据类型
     *
     * k的解码器
     * v的解码器
     * [K, V, KD <: Decoder[K], VD <: Decoder[V]]
     */
    val kafkaDS = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
      .map(_._2)

    val wordcountDS = kafkaDS.flatMap { line => line.split("\t") }
      .map { word => (word, 1) }
      .reduceByKey(_ + _) 
    ssc.start();
    ssc.awaitTermination();

  }
}

生产者:从这里发送数据
这里写图片描述
这里写图片描述

Flume
Flume 是 Cloudera 提供的日志收集系统,具有分布式、高可靠、高可用性等特点,对海量日志采集、聚合和传输,Flume 支持在日志系统中定制各类数据发送方,同时,Flume提供对数据进行简单处理,并写到各种数据接受方的能力。
Flume 使用 java 编写,其需要运行在 Java1.6 或更高版本之上。

Flume 的核心是把数据从数据源收集过来,再送到目的地。为了保证输送一定成功,在送到目的地之前,会先缓存数据,待数据真正到达目的地后,删除自己缓存的数据。
Flume 传输的数据的基本单位是 Event,如果是文本文件,通常是一行记录,这也是事务的基本单位。Event 从 Source,流向 Channel,再到 Sink,本身为一个 byte 数组,并可携带 headers 信息。Event 代表着一个数据流的最小完整单元,从外部数据源来,向外部的目的地去。
Flume 运行的核心是 Agent。它是一个完整的数据收集工具,含有三个核心组件,分别是 source、channel、sink。通过这些组件,Event 可以从一个地方流向另一个地方,如下图所示。
这里写图片描述

  • source 可以接收外部源发送过来的数据。不同的 source,可以接受不同的数据格式。比如有目录池(spooling directory)数据源,可以监控指定文件夹中的新文件变化,如果目录中有文件产生,就会立刻读取其内容。
  • channel是一个存储地,接收 source 的输出,直到有 sink 消费掉 channel 中的数据。channel中的数据直到进入到下一个channel中或者进入终端才会被删除。当 sink 写入失败后,可以自动重启,不会造成数据丢失,因此很可靠。
  • sink 会消费 channel 中的数据,然后送给外部源或者其他 source。如数据可以写入到 HDFS 或者 HBase 中。
  • -

核心组件

source:
Client端操作消费数据的来源,Flume 支持 Avro,log4j,syslog 和 http post(body为json格式)。可以让应用程序同已有的Source直接打交道,如AvroSource,SyslogTcpSource。也可以 写一个 Source,以 IPC 或 RPC 的方式接入自己的应用,Avro和 Thrift 都可以(分别有 NettyAvroRpcClient 和 ThriftRpcClient 实现了 RpcClient接口),其中 Avro 是默认的 RPC 协议。具体代码级别的 Client 端数据接入,可以参考官方手册。
对现有程序改动最小的使用方式是使用是直接读取程序原来记录的日志文件,基本可以实现无缝接入,不需要对现有程序进行任何改动。
对于直接读取文件 Source,有两种方式:
ExecSource: 以运行 Linux 命令的方式,持续的输出最新的数据,如 tail -F 文件名 指令,在这种方式下,取的文件名必须是指定的。 ExecSource 可以实现对日志的实时收集,但是存在Flume不运行或者指令执行出错时,将无法收集到日志数据,无法保证日志数据的完整性。
SpoolSource: 监测配置的目录下新增的文件,并将文件中的数据读取出来。需要注意两点:拷贝到 spool 目录下的文件不可以再打开编辑;spool 目录下不可包含相应的子目录。
SpoolSource 虽然无法实现实时的收集数据,但是可以使用以分钟的方式分割文件,趋近于实时。
如果应用无法实现以分钟切割日志文件的话, 可以两种收集方式结合使用。 在实际使用的过程中,可以结合 log4j 使用,使用 log4j的时候,将 log4j 的文件分割机制设为1分钟一次,将文件拷贝到spool的监控目录。
log4j 有一个 TimeRolling 的插件,可以把 log4j 分割文件到 spool 目录。基本实现了实时的监控。Flume 在传完文件之后,将会修改文件的后缀,变为 .COMPLETED(后缀也可以在配置文件中灵活指定)

Channel
当前有几个 channel 可供选择,分别是 Memory Channel, JDBC Channel , File Channel,Psuedo Transaction Channel。比较常见的是前三种 channel。
MemoryChannel 可以实现高速的吞吐,但是无法保证数据的完整性。
MemoryRecoverChannel 在官方文档的建议上已经建义使用FileChannel来替换。
FileChannel保证数据的完整性与一致性。在具体配置FileChannel时,建议FileChannel设置的目录和程序日志文件保存的目录设成不同的磁盘,以便提高效率。
File Channel 是一个持久化的隧道(channel),它持久化所有的事件,并将其存储到磁盘中。因此,即使 Java 虚拟机当掉,或者操作系统崩溃或重启,再或者事件没有在管道中成功地传递到下一个代理(agent),这一切都不会造成数据丢失。Memory Channel 是一个不稳定的隧道,其原因是由于它在内存中存储所有事件。如果 java 进程死掉,任何存储在内存的事件将会丢失。另外,内存的空间收到 RAM大小的限制,而 File Channel 这方面是它的优势,只要磁盘空间足够,它就可以将所有事件数据存储到磁盘上。

sink
Sink在设置存储数据时,可以向文件系统、数据库、hadoop存数据,在日志数据较少时,可以将数据存储在文件系中,并且设定一定的时间间隔保存数据。在日志数据较多时,可以将相应的日志数据存储到Hadoop中,便于日后进行相应的数据分析.
更多sink的内容可以参考官方手册。

项目:flume监控kafka的server.log日志。只要往此文件写数据,flume就监控到,并推送(也就是spark streaming往其中pull数据)
这里写图片描述
修改flume的配置文件:/conf/a1.properties
这里写图片描述
把这个jar包:spark-streaming-flume-sink_2.10-1.6.2.jar 放入flume/lib目录下

依赖:

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume_2.10 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.10</artifactId>
    <version>1.6.2</version>
</dependency>

<dependency>
    <groupId> org.apache.commons</groupId>
    <artifactId> commons-lang3</artifactId>
    <version> 3.3.2</version>
</dependency>
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder
import org.apache.spark.SparkContext

object KafkaOperation {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaOperation")
    val sc = new SparkContext(conf);
    val ssc = new StreamingContext(sc, Seconds(2));
    ssc.checkpoint(".")
    val kafkaParams = Map("metadata.broker.list" -> "hadoop1:9092");
    val topics = Set("xtwy");
    /**
     * k:其实就是偏移量 offset
     * V:就是我们消费的数据
     * InputDStream[(K, V)]
     *
     * k:数据类型
     * v:数据类型
     *
     * k的解码器
     * v的解码器
     * [K, V, KD <: Decoder[K], VD <: Decoder[V]]
     */
    val kafkaDS = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
      .map(_._2)

    val wordcountDS = kafkaDS.flatMap { line => line.split("\t") }
      .map { word => (word, 1) }
      .reduceByKey(_ + _) //window  mapwithstate updatewithstateByKey topK
    wordcountDS.print();
    ssc.start();
    ssc.awaitTermination();

  }
}
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇黑猴子的家:Flume 案例 双层 flu.. 下一篇Flume笔记(一) flume工作原理以及..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目