设为首页 加入收藏

TOP

SparkStreaming应用解析(一)
2019-05-12 01:08:33 】 浏览:216
Tags:SparkStreaming 应用 解析

一、Spark Streaming初识

(1)、Spark Streaming是什么

实时流计算,流,可以理解为像水流那样,源源不断的流动,而计算,也都是不停歇的一直在伴随着计算,所以SparkStreaming不是一种全量计算,是相当于水的过滤器一样,过滤掉水中的杂质。流式计算,目前流行的就是storm,spark,flink,parkStreaming处理方式就是,先用接收器接收数据,然后以时间n秒分割接收到的数据,每n秒接收到的数据当做一个RDD,然后再交给spark-core去对rdd进行计算。
在这里插入图片描述
Spark Streaming类似于Apache Storm,用于流式数据的处理。根据其官方文档介绍,Spark Streaming有高吞吐量和容错能力强等特点。Spark Streaming支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等。数据输入后可以用Spark的高度抽象原语如:map、reduce、join、window等进行运算。而结果也能保存在很多地方,如HDFS,数据库等。另外Spark Streaming也能和MLlib(机器学习)以及Graphx完美融合。
在这里插入图片描述
和Spark基于RDD的概念很相似,Spark Streaming使用离散化流(discretized stream)作为抽象表示,叫作DStream。DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而 DStream 是由这些 RDD 所组成的序列(因此 得名“离散化”)。
在这里插入图片描述
DStream 可以从各种输入源创建,比如 Flume、Kafka 或者 HDFS。创建出来的DStream 支持两种操作,一种是转化操作(transformation),会生成一个新的DStream,另一种是输出操作(output operation),可以把数据写入外部系统中。DStream 提供了许多与 RDD 所支持的操作相类似的操作支持,还增加了与时间相关的新操作,比如滑动窗口。

(2)、Spark Streaming关键抽象

Discretized Stream是Spark Streaming的基础抽象,代表持续性的数据流和经过各种Spark原语操作后的结果数据流。在内部实现上,DStream是一系列连续的RDD来表示。每个RDD含有一段时间间隔内的数据,如下图:
在这里插入图片描述
这几个不是共存的,每进来一个time分段好的RDD,之前处理好RDD的就已经把段落分好,进入转换操作。
接下来,就是流的转换操作,针对wordcount解释一下,之前分区好的数据变成一行一行的,然后按时间分区好的行,去进行flatMap操作,转换完成之后,形成words DStream,其实实质上还是对rdd进行操作,和之前的WordCount相比,这就是一个多次转换的行为,如下图:
在这里插入图片描述

(3)、Spark Streaming整体架构

在这里插入图片描述
接收数据开始,不手动强制停止程序,会一直进行流计算。

(4)、Spark Streaming背压机制

在这里插入图片描述
当Receiver开始接收数据时,会通过supervisor.pushSingle()方法将接收的数据存入currentBuffer等待BlockGenerator定时将数据取走,包装成block. 在将数据存放入currentBuffer之时,要获取许可(令牌)。如果获取到许可就可以将数据存入buffer, 否则将被阻塞,进而阻塞Receiver从数据源拉取数据。
令牌桶机制:大小固定的令牌桶自行以恒定的速率源源不断地产生令牌。如果令牌不被消耗,或者被消耗的速度小于产生的速度,令牌就会不断地增多,直至将桶填满。后面再产生的令牌就会从桶中溢出。最后桶中可以保存的最大令牌数永远不会超过桶的大小。当进行某操作时需要令牌时会从令牌桶中取出相应的令牌数,如果获取到则继续操作,否则阻塞。用完之后不用放回。

(5)、Spark Streaming入口

val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
// 可以通过ssc.sparkContext 来访问SparkContext
// 或者通过已经存在的SparkContext来创建StreamingContext
val sc = ...      
val ssc = new StreamingContext(sc, Seconds(1))

初始化完Context之后:
1、定义消息输入源来创建DStreams.
2、定义DStreams的转化操作和输出操作。
3、通过 streamingContext.start()来启动消息采集和处理.
4、等待程序终止,可以通过streamingContext.awaitTermination()来设置
5、通过streamingContext.stop()来手动终止处理程序。

(6)、Spark Streaming牛刀小试

<1>、在Linux上安装Netcat

切换到root用户然后使用yum install -y nc命令,等待安装即可,netcat可以对指定端口进行字面量的注入,方法是nc -lk port
运行,即可开始输入数据

<2>、WordCount小程序
package streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingWordCount extends App {

  val sparkConf = new SparkConf().setAppName("streaming").setMaster("local[*]")
  val ssc = new StreamingContext(sparkConf,Seconds(5))

  //从socket接收数据
  ssc.socketTextStream("hadoop1",9999).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()

  ssc.start()
  ssc.awaitTermination()
}
# TERMINAL 1:
# Running Netcat
$ nc -lk 9999
hello world
-------------------------------------------
Time: 1504665716000 ms
-------------------------------------------
-------------------------------------------
Time: 1504665717000 ms
-------------------------------------------
-------------------------------------------
Time: 1504665718000 ms
-------------------------------------------
(227.7202-1,2)
(created,2)
(offer,8)
(BUSINESS,11)
(agree,10)
(hereunder,,1)
(“control”,1)
(Grant,2)
(2.2.,2)
(include,11)
...
-------------------------------------------
Time: 1504665719000 ms
-------------------------------------------
Time: 1504665739000 ms
-------------------------------------------
-------------------------------------------
Time: 1504665740000 ms
-------------------------------------------
(under,1)
(Technology,1)
(distribution,2)
(http://hadoop.apache.org/core/,1)
(Unrestricted,1)
(740.13),1)
(check,1)
(have,1)
(policies,1)
(uses,1)
...
-------------------------------------------
Time: 1504665741000 ms
-------------------------------------------

成功实现对数据的处理,netcat采用的是tcp协议。

二、DStreams输入

(1)、基本数据源

  • 文件数据源
    监控文件目录,如果监控的文件目录,把数据放入了目录中,他就会把文件当成一个rdd,针对文件进行转换,下面就用sparkShell来做一个基于hdfs的基本数据源输入的实时计算
scala> import org.apache.spark.streaming._
import org.apache.spark.streaming._

scala> val ssc = new StreamingContext(sc, Seconds(1))
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@4027edeb

scala> val lines = ssc.textFileStream("hdfs://master01:9000/data/")
lines: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@61d9dd15

scala> val words = lines.flatMap(_.split(" "))
words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@1e084a26

scala> val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@8947a4b

scala> wordCounts.print()

scala> ssc.start()

将文件上传:

[hadoop1@master01 hadoop-2.7.2]$ bin/hdfs dfs -put ./README.txt /data/

运行结果

-------------------------------------------
Time: 1504665716000 ms
-------------------------------------------
-------------------------------------------
Time: 1504665717000 ms
-------------------------------------------
-------------------------------------------
Time: 1504665718000 ms
-------------------------------------------
(227.7202-1,2)
(created,2)
(offer,8)
(BUSINESS,11)
(agree,10)
(hereunder,,1)
(“control”,1)
(Grant,2)
(2.2.,2)
(include,11)
...
-------------------------------------------
Time: 1504665719000 ms
-------------------------------------------
Time: 1504665739000 ms
-------------------------------------------
-------------------------------------------
Time: 1504665740000 ms
-------------------------------------------
(under,1)
(Technology,1)
(distribution,2)
(http://hadoop.apache.org/core/,1)
(Unrestricted,1)
(740.13),1)
(check,1)
(have,1)
(policies,1)
(uses,1)
...
-------------------------------------------
Time: 1504665741000 ms
-------------------------------------------
  • RDD队列
package streaming

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

object QueueRdd {

  def main(args: Array[String]) {

    val conf = new SparkConf().setMaster("local[2]").setAppName("QueueRdd")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Create the queue through which RDDs can be pushed to
    // a QueueInputDStream
    //创建RDD队列
    val rddQueue = new mutable.SynchronizedQueue[RDD[Int]]()

    // Create the QueueInputDStream and use it do some processing
    // 创建QueueInputDStream
    val inputStream = ssc.queueStream(rddQueue)

    //处理队列中的RDD数据
    val mappedStream = inputStream.map(x => (x % 10, 1))
    val reducedStream = mappedStream.reduceByKey(_ + _)

    //打印结果
    reducedStream.print()

    //启动计算
    ssc.start()

    // Create and push some RDDs into
    for (i <- 1 to 30) {
      rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
      Thread.sleep(2000)

      //通过程序停止StreamingContext的运行
      //ssc.stop()
    }
  }
}

(2)、高级数据源

自定义一个Receiver采集器

package streaming

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

import scala.collection.mutable

class customReceiver(host:String,port:Int) extends Receiver[String](StorageLevel.MEMORY_ONLY){

  //程序启动的时候调用
  override def onStart(): Unit = {
    val socket = new Socket(host,port)

    var inputText= ""

    var reader = new BufferedReader(new InputStreamReader(socket.getInputStream(),StandardCharsets.UTF_8))

    inputText = reader.readLine()

    while(!isStopped() && inputText!=null) {
      //接受到数据,保存
      store(inputText)
      inputText = reader.readLine()
    }
    restart("")
  }

  //程序停止调用
  override def onStop(): Unit = {

  }
}

object customReceiver{
  def main(args: Array[String]): Unit = {
    //创建配置
    val sc = new SparkConf().setAppName("sparkStreaming").setMaster("local[*]")

    //创建streamingContext
    val ssc = new StreamingContext(sc,Seconds(5))

    //从socket接收数据
    val lineDStream = ssc.receiverStream(new customReceiver("linux01",5456))

    val wordDStream = lineDStream.flatMap(_.split(" "))

    val word2CountDStream = wordDStream.map((_,1))

    val resultDStream = word2CountDStream.reduceByKey(_ + _)

    resultDStream.print()

    //启动ssc
    ssc.start()
    ssc.awaitTermination()
  }
}

Kafka
在这里插入图片描述
从Web、App,或者通过一些日志框架,像SLF4j那种的将日志实时注入到KafKa中,根据业务不同会产生很多的主题,每个主题都有自己不同的Partition,它的作用就是提升存储量,然而KafKa的缺点在于无法保证全体的日志采集顺序,只能保证分区内的存储顺序,但是我们每天采集到的日志是非常多的,到一定情况下本地是打不开的,而在消息队列中又存在发布订阅模式(将信息发送到主题所有订阅它的都可以收到,我们就需要多个ConsumerGroupe来监听实现发布功能)和消息队列模式(在ConsumerGroup里面的所有的Consumer拿到的信息都是不同的,而最好情况下就是每一个Consumer监听多余一个Partition最节省资源,但要是少于一个就代表会有Consumer停止工作)
在这里插入图片描述
SparkStreaming和KafKa连接是有数据丢失风险的,如果SparkStreaming用了高级API去连接KafKa,等于是自动维护了OffSet,如此时SparkStreaming挂了,数据本身没有做过任何处理,重启之后因为经过了Offset的维护,因此就不会消费上次没处理的数据,在这种情况下有两种解决方法:
(左图)1.使用High Level API,拿到数据之后就写到WAL预写日志中,相当于数据保存,如果SparkStreaming挂了,就可以从预写日志中拿出数据,其实这个WAL在关系型数据库非常常见,比如oracle崩了,但是数据还在,因为日志在进行之前就已经进行预写保存,但是这种方式是老方式,第二种方式现在使用比较多
(右图)2.新的方式就是SparkStreaming直接使用KafKa的低级API,所有OffSet由Spark自动进行维护,比如说我通过api获取到了某些数据,因为我使用的是低级api所以不会更新zookeeper里面的数据,把业务处理完在后台更新OffSet,如在中间宕机了,重新启动的时候会从上次的程序继续读取操作,这个情况的业务代码要单独编写。
在这里插入图片描述

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Scala   WordCount 下一篇大数据学习之路(六)spark

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目