设为首页 加入收藏

TOP

Spark学习(捌)- Spark Streaming入门
2018-12-06 17:06:08 】 浏览:20
Tags:Spark 学习 Streaming 入门
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/bingdianone/article/details/84847884

spark概念

Spark流是核心Spark API的扩展,它支持对实时数据流进行可伸缩、高吞吐量、容错的流处理。数据可以从Kafka、Flume、Kinesis或TCP sockets等许多来源获取,也可以使用map、reduce、join和window等高级函数表示的复杂算法进行处理。最后,可以将处理后的数据推送到文件系统、数据库和实时仪表板。事实上,您可以将Spark的机器学习和图形处理算法应用于数据流。
在这里插入图片描述

Spark Streaming个人的定义
将不同的数据源的数据经过Spark Streaming处理之后将结果输出到外部文件系统

特点
低延时
能从错误中高效的恢复:fault-tolerant
能够运行在成百上千的节点
能够将批处理、机器学习、图计算等子框架和Spark Streaming综合起来使用

Spark Streaming是否需要独立安装?
不需要;因为spark是一栈式服务框架
One stack to rule them all : 一栈式

Spark Streaming应用场景

在这里插入图片描述
上半图是实时交易欺诈的应用
下半图是实时电子传感器监控

现实生产中应用更广

Spark Streaming集成Spark生态系统的使用

在这里插入图片描述
将批处理与流处理相结合
在这里插入图片描述
上图中;后续文章会有讲解实现

离线学习模型可以接入sparkstreaming,在线应用它们
在这里插入图片描述
使用SQL交互式地查询流数据
在这里插入图片描述
上图中;后续文章会有讲解实现

Spark Streaming发展史

在这里插入图片描述
Spark Streaming从0.9版本毕业;开始进入生产环境。

从词频统计功能着手入门Spark Streaming

spark源码地址 GitHub
https://github.com/apache/spark
在里面有很多examples供学习。

NetworkWordCount测试

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// scalastyle:off println
package org.apache.spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
 *
 * Usage: NetworkWordCount <hostname> <port>
 * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
 *
 * To run this on your local machine, you need to first run a Netcat server
 *    `$ nc -lk 9999`
 * and then run the example
 *    `$ bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999`
 */
object NetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create a socket stream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    // Note that no duplication in storage level only for running locally.
    // Replication necessary in distributed scenario for fault tolerance.
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
// scalastyle:on println

spark-submit提交

example中提示打开9999端口
在这里插入图片描述
使用spark-submit来提交我们的spark应用程序运行的脚本(生产)

./spark-submit --master local[2] \
--class org.apache.spark.examples.streaming.NetworkWordCount \
--name NetworkWordCount \
/home/hadoop/app/spark-2.2.0-bin-2.6.0-cdh5.7.0/examples/jars/spark-examples_2.11-2.2.0.jar hadoop000 9999

打开另一个client端
在这里插入图片描述
测试:输入
在这里插入图片描述
查看spark-submit提交的界面
在这里插入图片描述
输入
在这里插入图片描述
查看spark-submit提交的界面
在这里插入图片描述

spark-shell提交

如何使用spark-shell来提交(测试)

./spark-shell --master local[2]

只需要在spark-shell启动界面粘贴以下代码即可

import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(1))
val lines = ssc.socketTextStream("hadoop000", 9999)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()

测试步骤和spark-submit一样;都是在一个client输入测试数据;spark-shell界面查看结果。

Spark Streaming工作原理(粗粒度)

在这里插入图片描述
工作原理:粗粒度
Spark Streaming接收到实时数据流,把数据按照指定的时间段切成一片片小的数据块,然后把小的数据块传给Spark Engine处理。

Spark Streaming工作原理(细粒度)

在这里插入图片描述
1、在Driver端会构建context来准备处理Application;SparkContext是StreamingContext的底层
2、Dirver端启动一些Receiver来接受数据(处理数据的交互)
3、把receiver作为一个任务来运行
4、数据input进来;receiver把数据拆分为多个block放入内存中。如果设置副本就会拷贝到其他Executor上
5、receiver反馈给StreamingContext的blocks信息;StreamingContext提交jobs给SparkContext
6、SparkContext将jobs分发给各个Executor处理作业。


编程开发网
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Spark电商用户行为分析 下一篇Spark 多线程模型

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

array(4) { ["type"]=> int(8) ["message"]=> string(24) "Undefined variable: jobs" ["file"]=> string(32) "/mnt/wp/cppentry/do/bencandy.php" ["line"]=> int(214) }