设为首页 加入收藏

TOP

第91讲:sparkStreaming基于kafka的Direct详解
2019-02-12 02:29:21 】 浏览:36
Tags:sparkStreaming 基于 kafka Direct 详解

有兴趣想学习国内整套Spark+Spark Streaming+Machine learning最顶级课程的,可加我qq 471186150。共享视频,性价比超高!

1:Direct方式特点:

1)Direct的方式是会直接操作kafka底层的元数据信息,这样如果计算失败了,可以把数据重新读一下,重新处理。即数据一定会被处理。拉数据,是RDD在执行的时候直接去拉数据。

2)由于直接操作的是kafka,kafka就相当于你底层的文件系统。这个时候能保证严格的事务一致性,即一定会被处理,而且只会被处理一次。而Receiver的方式则不能保证,因为Receiver和ZK中的数据可能不同步,spark Streaming可能会重复消费数据,这个调优可以解决,但显然没有Direct方便。而Direct api直接是操作kafka的,spark streaming自己负责追踪消费这个数据的偏移量或者offset,并且自己保存到checkpoint,所以它的数据一定是同步的,一定不会被重复。即使重启也不会重复,因为checkpoint了,但是程序升级的时候,不能读取原先的checkpoint,面对升级checkpoint无效这个问题,怎么解决呢升级的时候读取我指定的备份就可以了,即手动的指定checkpoint也是可以的,这就再次完美的确保了事务性,有且仅有一次的事务机制。那么怎么手动checkpoint呢?构建SparkStreaming的时候,有getorCreate这个api,它就会获取checkpoint的内容,具体指定下这个checkpoint在哪就好了。或者如下图:


而如果从checkpoint恢复后,如果数据累积太多处理不过来,怎么办1)限速2)增强机器的处理能力3)放到数据缓冲池中。

3)由于底层是直接读数据,没有所谓的Receiver,直接是周期性(Batch Intervel)的查询kafka,处理数据的时候,我们会使用基于kafka原生的Consumer api来获取kafka中特定范围(offset范围)中的数据。这个时候,Direct Api访问kafka带来的一个显而易见的性能上的好处就是,如果你要读取多个partition,Spark也会创建RDD的partition,这个时候RDD的partition和kafka的partition是一致的。而Receiver的方式,这2个partition是没任何关系的。这个优势是你的RDD,其实本质上讲在底层读取kafka的时候,kafka的partition就相当于原先hdfs上的一个block。这就符合了数据本地性。RDD和kafka数据都在这边。所以读数据的地方,处理数据的地方和驱动数据处理的程序都在同样的机器上,这样就可以极大的提高性能。不足之处是由于RDD和kafka的patition是一对一的,想提高并行度就会比较麻烦。提高并行度还是repartition,即重新分区,因为产生shuffle,很耗时。这个问题,以后也许新版本可以自由配置比例,不是一对一。因为提高并行度,可以更好的利用集群的计算资源,这是很有意义的。

4)不需要开启wal机制,从数据零丢失的角度来看,极大的提升了效率,还至少能节省一倍的磁盘空间。从kafka获取数据,比从hdfs获取数据,因为zero copy的方式,速度肯定更快。

2:实战部分


kafka+sparkstreaming集群

前提:

spark安装成功,spark1.6.0

zookeeper安装成功

kafka安装成功

步骤:

1:先启动三台机器上的ZK,然后三台机器同样启动kafka,

2:在kafka上创建topictest

3:在worker1启动kafka生产者:

root@worker1:/usr/local/kafka_2.10-0.9.0.1#bin/kafka-console-producer.sh--broker-listlocalhost:9092--topictest

worker2中启动消费者:

root@worker2:/usr/local/kafka_2.10-0.9.0.1#bin/kafka-console-consumer.sh--zookeepermaster:2181--topictest

生产者生产的消息,消费者可以消费到。说明kafka集群没问题。进入下一步。

master中启动spark-shell

./spark-shell--masterlocal[2]--packagesorg.apache.spark:spark-streaming-kafka_2.10:1.6.0,org.apache.kafka:kafka_2.10:0.8.2.1

笔者用的spark1.6.0,读者根据自己版本调整。

shell中的逻辑代码(wordcount:

importorg.apache.spark.SparkConf

importkafka.serializer.StringDecoder

importorg.apache.spark.streaming.kafka.KafkaUtils

importorg.apache.spark.streaming.{Durations,StreamingContext}

valssc=newStreamingContext(sc,Durations.seconds(5))
KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc
,Map("bootstrap.servers"->"master:2181,worker1:2181,worker2:2181","metadata.broker.list"->"master:9092,worker1:9092,worker2:9092","group.id"->"StreamingWordCountSelfKafkaDirectStreamScala")
,Set("test")).map(t=>t._2).flatMap(_.toString.split("")).map((_,1)).reduceByKey(_+_).print()
ssc.start()

生产者再生产消息:

sparkstreaming的反应:

返回worker2查看消费者

可见,groupId不一样,相互之间没有互斥。

上述是使用createDirectStream方式链接kafka,实际使用中,其实就是和Receiver在api以及api中参数上有不同,其它基本一样

参考:

http://spark.apache.org/docs/latest/streaming-kafka-integration.html



编程开发网
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Storm之——Storm整合Kafka pom.x.. 下一篇kafka 生产者流程总结

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

array(4) { ["type"]=> int(8) ["message"]=> string(24) "Undefined variable: jobs" ["file"]=> string(32) "/mnt/wp/cppentry/do/bencandy.php" ["line"]=> int(214) }