集群的spark还没有用起来,自建一个单机spark,然后连接本机的kafka生产者消费消息。
idea+spark
scala代码
package ex
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.kafka.common.serialization.StringDeserializer
object KafkaCount {
def main(args: Array[String]): Unit = {
//创建streamingContext
var conf=new SparkConf().setAppName("SparkStreamKaflaWordCountDemo");
var ssc=new StreamingContext(conf,Seconds(4));
//创建topic
//var topic=Map{"test" -> 1}
var topic=Array("test");
//指定zookeeper
//创建消费者组
var group: String ="con-consumer-group"
//消费者配置
val kafkaParam = Map(
"bootstrap.servers" -> "localhost:9092",//用于初始化链接到集群的地址
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
//用于标识这个消费者属于哪个消费团体
"group.id" -> group,
//如果没有初始化偏移量或者当前的偏移量不存在任何服务器上,可以使用这个配置属性
//可以使用这个配置,latest自动重置偏移量为最新的偏移量
"auto.offset.reset" -> "latest",
//如果是true,则这个消费者的偏移量会在后台自动提交
"enable.auto.commit" -> (false: java.lang.Boolean)
);
//创建DStream,返回接收到的输入数据
var stream=KafkaUtils.createDirectStream[String,String](ssc, PreferConsistent,Subscribe[String,String](topic,kafkaParam))
//每一个stream都是一个ConsumerRecord
stream.map(s =>(s.key(),s.value())).print();
ssc.start();
ssc.awaitTermination();
}
}
sbt工程
name := "test2"
version := "0.1"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.2"// % "provided"
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.3.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.1.1" % "provided"
因为自己环境的spark为2.3.2,所以需要下载对应的spark-core_2.11为2.3.2版本,2.11代表spark基于scala版本2.11;
因为kafka为2.0版本,spark-streaming-kafka-0-10选0-10版本,因为自己环境的spark为2.3.2, 跟spark最为接近2.3.0版本,虽然我写的是2.3.2,但是下载的是2.3.0版本。spark-streaming-kafka里2.3.0版本有些方法跟2.1.0方法不一样,spark如果是2.3.0版本,必须选2.3.0以上的spark-streaming-kafka;
spark-streaming版本2.1.1也适用于2.3.2版本的spark,里面方法没有差异。
kafka生产者
[root@zhangchenglong-test kafka_2.11-2.0.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>asdf
>122222222222222
>1234'
>zcl123
>zcl124
>zcl125
>ssssssssssssssssssssssss
spark结果
-------------------------------------------
Time: 1541685796000 ms
-------------------------------------------
-------------------------------------------
Time: 1541685800000 ms
-------------------------------------------
(null,ssssssssssssssssssssssss)
-------------------------------------------
Time: 1541685804000 ms
-------------------------------------------