Kafka producer/consumer API integration with Spark Streaming (Scala version)

Maven configuration file

       <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>1.0.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.0.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>2.1.1</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.11 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.1.1</version>
            <scope>provided</scope>
        </dependency>
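Note that all three artifacts must use the same Scala binary version, here 2.11. The programs below also assume the topic "kafka" already exists on the brokers slave6:9092 and slave7:9092. If it does not, it can be created with the AdminClient that ships with kafka-clients 1.0.0; a minimal sketch, where the partition count and replication factor are illustrative values to adjust to the cluster:

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

object CreateTopic {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "slave6:9092,slave7:9092")
    val admin = AdminClient.create(props)
    // 2 partitions, replication factor 2 -- illustrative values, not from the original post
    val topic = new NewTopic("kafka", 2, 2.toShort)
    admin.createTopics(Collections.singleton(topic)).all().get()
    admin.close()
  }
}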
1. Kafka producer
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import scala.io.Source
import scala.reflect.io.Path

class KafkaProduceMsg extends Runnable {

  private val BROKER_LIST = "slave6:9092,slave7:9092"
  private val TOPIC = "kafka"
  private val DIR = "C:\\Users\\admin\\Desktop\\kafka-data.txt"

  /**
    * 1. Configure producer properties:
    * bootstrap.servers : the brokers of the Kafka cluster
    * key.serializer / value.serializer : how message keys and values are serialized
    * acks : "1" means the leader broker must acknowledge each record
    *        (the new client's equivalent of the old request.required.acks;
    *        the old producer.type setting no longer exists -- the new
    *        KafkaProducer always sends asynchronously)
    */
  private val props = new Properties()
  props.put("bootstrap.servers", BROKER_LIST)
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("acks", "1")

  private val producer = new KafkaProducer[String,String](props)

  def run(): Unit = {
    println("开始生产消息!!!!!!!!!!")
    while(true){
      val files = Path(this.DIR).walkFilter(p => p.isFile)
      try {
        for(file <- files){
          val reader = Source.fromFile(file.toString(),"UTF-8")
          for(line <- reader.getLines()){
            var m = 0
            while(m < 10){
              val record = new ProducerRecord[String,String](this.TOPIC,"key",line)
              m = m + 1
              println(m + "" + record)
              producer.send(record)
            }
            try{
              Thread.sleep(3000)
            }catch {
              case e : Exception => println(e)
            }
          }
          reader.close()  // release the file handle before moving to the next file
        }
      }catch{
        case e : Exception => println(e)
      }
    }
  }
}
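Note that producer.send above is fire-and-forget: the returned Future is discarded, so a failed delivery goes unnoticed. A minimal sketch of sending with a delivery callback instead, reusing the producer and record already defined in KafkaProduceMsg (the log messages are illustrative):

import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

// Same send, but the Kafka client invokes the callback once the broker
// acknowledges the record (exception == null) or the send fails.
producer.send(record, new Callback {
  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
    if (exception != null) println("send failed: " + exception)
    else println("sent to " + metadata.topic + "-" + metadata.partition + "@" + metadata.offset)
  }
})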

Producer driver program:

object Msg {
  def main(args: Array[String]): Unit = {
    new Thread(new KafkaProduceMsg()).start()
  }

}
2. Spark Streaming consumer
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * 2. Spark Streaming consumes the data and checks whether each record
  *    contains the specified keyword; matching records are kept and the
  *    rest are discarded.
  */
object KafkaConsumer {
  def main(args: Array[String]): Unit = {
    //    create the SparkConf and StreamingContext (5-second batch interval)
    val conf = new SparkConf().setAppName("Consumer")
    val ssc = new StreamingContext(conf,Seconds(5))
    //    set a checkpoint directory so stateful (cumulative) computations are possible
//    ssc.checkpoint("hdfs://master:9000/xxx")
    //    read data from Kafka
    val kafkaParam = Map("metadata.broker.list" -> "slave6:9092,slave7:9092")
    val topic = "kafka".split(",").toSet
    //    get the log data (take the message value from each (key, value) pair)
    val logDStream: DStream[String] = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParam,topic).map(_._2)
    logDStream.print()
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}
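As written, the stream is only printed. To realize the keyword filtering described in the comment above, a minimal sketch is shown below; it would go before ssc.start(), and the keyword "error" and the HDFS output prefix are placeholders, not part of the original program:

    // keep only the lines containing the keyword, then persist each matching batch
    val keyword = "error"  // placeholder keyword
    val matched: DStream[String] = logDStream.filter(line => line.contains(keyword))
    matched.saveAsTextFiles("hdfs://master:9000/kafka-matched/part")  // placeholder output prefix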