spark从kafka获取数据两种方式
1.kafkaUtils.createStream
利用 Kafka 消费者高级 API 在 Spark 的工作节点上创建消费者线程,订阅 Kafka 中的消息,数据会传输到 Spark 工作节点的执行器中,但是默认配置下这种方法在
Spark Job 出错时会导致数据丢失,如果要保证数据可靠性,需要在 Spark Streaming 中开启Write Ahead Logs(WAL),也就是上文提到的
Kafka 用来保证数据可靠性和一致性的数据保存方式。可以选择让 Spark 程序把 WAL 保存在分布式文件系统(比如 HDFS)中, 通过WAL 和checkPiont可以保证数据的安全性 但是效率很低 因为读取数据时需要往文件系统中存储一份,大量的磁盘Io和网络带宽会限制性能,如果数据不需要保证完全安全 可以考虑使用 另外一种
在 源码中可以看出
一共有五种创建方法。但是底层最后调用的 是
其中kafkaParams主要的参数为zk 参数 groupId 默认会添加链接kafka的超时时间为10000
可以看出 首先进行判断是否使用wal机制。返回值为KafkainputDstream
在KafkaInputDstream中 会根据是否使用wal 创建kafkaReceiver 和ReliableKafkaReceiver 两者的区别就是 后者会storeBlock
保证数据的安全
此种方式是走zookeeper的 会将offset存放在zookeeper中
1.kafkaUtils.createDirectStream
好处:
1、并行 高并发,不需要多个kafka输入流,该方法将会创建和kafka分区一样的rdd个数,而且会从kafka并行读取。
2、高效,这种方式并不需要WAL,如果需要保证数据安全可以通过checkPoint
3、恰好一次语义(Exactly-once-semantics),传统的读取kafka数据是通过kafka高层次api把偏移量写入zookeeper中,存在数据丢失的可能性是zookeeper中和ssc的偏移量不一致。这种通过实现kafka低层次api,偏移量仅仅被ssc保存在checkpoint中,消除了zk和ssc偏移量不一致的问题。缺点是无法使用基于zookeeper的kafka监控工具
* checkPoint 1.恢复Driver(元数据)2.恢复数据(offset)
在源码 : 返回一个DirectKafkaInputDstream
如果想要实现往zookeeper中手动添加offset http://www.tuicool.com/articles/vaUzquJ(别人的)
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParam, topics)
没有formOffsets和messageHandler 两个构造参数 默认会创建