设为首页 加入收藏

TOP

spark+kafka+idea+sbt+scala踩坑
2019-05-06 14:29:04 】 浏览:75
Tags:spark kafka idea sbt scala 踩坑

集群的spark还没有用起来,自建一个单机spark,然后连接本机的kafka生产者消费消息。

idea+spark

scala代码

package ex

import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.kafka.common.serialization.StringDeserializer
object KafkaCount {
  def main(args: Array[String]): Unit = {
    //创建streamingContext
    var conf=new SparkConf().setAppName("SparkStreamKaflaWordCountDemo");
    var ssc=new StreamingContext(conf,Seconds(4));
    //创建topic
    //var topic=Map{"test" -> 1}
    var topic=Array("test");
    //指定zookeeper
    //创建消费者组
    var group: String ="con-consumer-group"
    //消费者配置
    val kafkaParam = Map(
      "bootstrap.servers" -> "localhost:9092",//用于初始化链接到集群的地址
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      //用于标识这个消费者属于哪个消费团体
      "group.id" -> group,
      //如果没有初始化偏移量或者当前的偏移量不存在任何服务器上,可以使用这个配置属性
      //可以使用这个配置,latest自动重置偏移量为最新的偏移量
      "auto.offset.reset" -> "latest",
      //如果是true,则这个消费者的偏移量会在后台自动提交
      "enable.auto.commit" -> (false: java.lang.Boolean)
    );
    //创建DStream,返回接收到的输入数据
    var stream=KafkaUtils.createDirectStream[String,String](ssc, PreferConsistent,Subscribe[String,String](topic,kafkaParam))
    //每一个stream都是一个ConsumerRecord
    stream.map(s =>(s.key(),s.value())).print();
    ssc.start();
    ssc.awaitTermination();
  }
}

sbt工程

name := "test2"

version := "0.1"

scalaVersion := "2.11.8"


libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.2"// % "provided"
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.3.2" % "provided"

libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.1.1" % "provided"

因为自己环境的spark为2.3.2,所以需要下载对应的spark-core_2.11为2.3.2版本,2.11代表spark基于scala版本2.11;

因为kafka为2.0版本,spark-streaming-kafka-0-10选0-10版本,因为自己环境的spark为2.3.2, 跟spark最为接近2.3.0版本,虽然我写的是2.3.2,但是下载的是2.3.0版本。spark-streaming-kafka里2.3.0版本有些方法跟2.1.0方法不一样,spark如果是2.3.0版本,必须选2.3.0以上的spark-streaming-kafka;

spark-streaming版本2.1.1也适用于2.3.2版本的spark,里面方法没有差异。

kafka生产者

[root@zhangchenglong-test kafka_2.11-2.0.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
>asdf
>122222222222222
>1234'
>zcl123
>zcl124
>zcl125
>ssssssssssssssssssssssss

spark结果

-------------------------------------------
Time: 1541685796000 ms
-------------------------------------------

-------------------------------------------
Time: 1541685800000 ms
-------------------------------------------
(null,ssssssssssssssssssssssss)

-------------------------------------------
Time: 1541685804000 ms
-------------------------------------------

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇kafka入门一:安装与使用 下一篇大数据实战之Logstash采集->Ka..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目