Getting started with Spark Streaming: basic examples
2019-05-12 14:17:26
Tags: spark streaming, getting started, examples

1. Spark Streaming: TCP source

Maven dependencies:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.1.0</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.0</version>
</dependency>

Program:


import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object wc {
  def main(args: Array[String]): Unit = {
    // Spark configuration
    val conf = new SparkConf().setMaster("local[3]").setAppName("wc")
    // streaming context with a 2-second batch interval
    val ssc = new StreamingContext(conf, Seconds(2))
    // input source: a TCP socket on localhost:9999
    val dstream1 = ssc.socketTextStream("localhost", 9999)
    // split each line into words and count the words in each batch
    val dstream2 = dstream1.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

    // print each batch's counts, then start the streaming context
    dstream2.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
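
The counts printed above are per 2-second batch; nothing is carried over from one batch to the next. To keep a running total per word across batches (the same technique the Kafka example in section 2 uses), updateStateByKey together with a checkpoint directory can be used. The following is a minimal sketch, not part of the original post; the object name and checkpoint path are invented, and either version can be fed locally with a simple TCP server such as nc -lk 9999.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object wc_state {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[3]").setAppName("wc_state")
    val ssc = new StreamingContext(conf, Seconds(2))
    // updateStateByKey needs a checkpoint directory to store the state between batches
    ssc.checkpoint("check-point1")

    val lines = ssc.socketTextStream("localhost", 9999)
    val pairs = lines.flatMap(_.split(" ")).map((_, 1))

    // for every word, add this batch's counts to the previously accumulated total
    val totals = pairs.updateStateByKey[Int] { (newValues: Seq[Int], state: Option[Int]) =>
      Some(newValues.sum + state.getOrElse(0))
    }

    totals.print()
    ssc.start()
    ssc.awaitTermination()
  }
}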

2. Spark Streaming: Kafka source

Version 1.5.2

<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-core_2.10</artifactId>
   <version>1.5.2</version>
</dependency>
<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-streaming_2.10</artifactId>
   <version>1.5.2</version>
</dependency>
<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-streaming-kafka_2.10</artifactId>
   <version>1.5.2</version>
</dependency>

import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object wc_kafka {

  // update function for updateStateByKey: for every word, add this batch's
  // counts to the previously accumulated total
  def updateFunc(iterator: Iterator[(String, Seq[Int], Option[Int])]): Iterator[(String, Int)] = {
    iterator.flatMap {
      case (word, newCounts, previousTotal) =>
        Some(newCounts.sum + previousTotal.getOrElse(0)).map(total => (word, total))
    }
  }

  def main(args: Array[String]): Unit = {

    // settings: ZooKeeper quorum, consumer group, topics, threads per topic, checkpoint dir
    val Array(zkQuorum, groupId, topics, numThreads, checkpointDir) = Array(
      "localhost:2181", "g1", "t1,topic1", "1", "check-point2")

    // create the SparkConf and set the application name
    val conf = new SparkConf().setAppName("wc_kafka").setMaster("local[3]")

    // create the StreamingContext with a 2-second batch interval
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.checkpoint(checkpointDir) // checkpoint directory, required by updateStateByKey

    // pull data from Kafka (receiver-based API of spark-streaming-kafka 1.5.2)
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap // topic -> number of consumer threads
    val lines = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap, StorageLevel.MEMORY_AND_DISK).map(_._2)
    val wc = lines.flatMap(_.split(" ")).map((_, 1))
    val result = wc.updateStateByKey(updateFunc _, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)

    // print each batch's result to the console
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
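
The update function above receives, for each word, the counts from the current batch (Seq[Int]) and the previously accumulated total (Option[Int]), and folds them into the new total. The following standalone sketch runs the same logic on in-memory data; the object name and sample values are invented purely for illustration.

object UpdateFuncDemo extends App {
  // same fold as updateFunc above, applied to hand-made input
  val updateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
    iterator.flatMap { case (word, newCounts, previousTotal) =>
      Some(newCounts.sum + previousTotal.getOrElse(0)).map(total => (word, total))
    }
  }

  // "spark" appears 3 times in the current batch and had a running total of 5
  println(updateFunc(Iterator(("spark", Seq(1, 1, 1), Some(5)))).toList) // List((spark,8))
  // "flink" appears for the first time, so there is no previous state
  println(updateFunc(Iterator(("flink", Seq(1), None))).toList)          // List((flink,1))
}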

Version 2.1.0

<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-core_2.11</artifactId>
   <version>2.1.0</version>
</dependency>
<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-streaming_2.11</artifactId>
   <version>2.1.0</version>
</dependency>
<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
   <version>2.1.0</version>
</dependency>

Program:
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, LocationStrategies, KafkaUtils}

object wc_kafka {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("kafka")
    conf.setMaster("local[*]")

    val ssc = new StreamingContext(conf, Seconds(2))

    // Kafka consumer parameters
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "g1",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    // create the direct Kafka input stream
    val topics = Array("t1")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )

    // map each record to (key, value) and print the result
    val ds2 = stream.map(record => (record.key, record.value))
    ds2.print()

    // start the streaming context
    ssc.start()
    ssc.awaitTermination()
  }
}
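
Because enable.auto.commit is set to false here, the consumed offsets are never committed automatically. A common pattern with the 0-10 integration is to commit them back to Kafka once each batch has been processed. The sketch below is not part of the original post; it would continue the main method above, using the stream variable returned by createDirectStream, and the per-batch word count inside it is only illustrative.

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

// continues the example above, before ssc.start()
stream.foreachRDD { rdd =>
  // offset ranges of the Kafka partitions consumed in this batch
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // illustrative per-batch word count over the record values
  rdd.map(_.value())
    .flatMap(_.split(" "))
    .map((_, 1))
    .reduceByKey(_ + _)
    .collect()
    .foreach(println)

  // commit the offsets asynchronously once this batch's output has been produced
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}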