先从spark checkpointing说起
如果启用Spark检查点,则偏移量将存储在检查点中。这很容易实现,但也有缺点。您的输出操作必须是幂等的,因为您将获得重复输出;交易不是一种选择。此外,如果应用程序代码已更改,则无法从检查点恢复。对于计划的升级,您可以通过在旧代码的同时运行新代码来缓解这种情况(因为无论如何输出都必须是幂等的,它们不应该发生冲突)。但是对于需要更改代码的计划外故障,除非您有其他方法来识别已知良好的起始偏移,否则您将丢失数据。还有一个小弊端因为checkpointing是把代码序列化二进制放到hdfs;当你修改了代码、再去hdfs读取zk offset、序列化失败,造成了消费数据丢失。
关键点:注意看处理数据
Kafka有一个偏移提交API,用于在特殊的Kafka主题中存储偏移量。默认情况下,新消费者将定期自动提交偏移量。这几乎肯定不是您想要的,因为消费者成功轮询的消息可能尚未导致Spark输出操作,从而导致未定义的语义。这就是上面的流示例将“enable.auto.commit”设置为false的原因。但是,在使用commitAsync
API知道输出已存储后,您可以向Kafka提交偏移量。与检查点相比的好处是,无论您的应用程序代码如何变化,Kafka都是一个耐用的商店。但是,Kafka不是交易性的,因此您的输出必须仍然是幂等的。
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}
object SparkSteamingLogAnalysis {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[3]").setAppName("SparkSteamingLogAnalysis")
val tt = args(0).trim.toLong
val ssc = new StreamingContext(conf, Seconds(tt))
val sc = ssc.sparkContext
//获取kafka中的数据
val sheData = TotalUtil.getKafka(ssc, "topic", "groupId")
//处理数据
sheData.foreachRDD { rdds =>
val offsetRanges = rdds.asInstanceOf[HasOffsetRanges].offsetRanges
// 方法(rdds)
sheData.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
ssc.start()
ssc.awaitTermination()
}
}
与HasOffsetRanges一样,只有在createDirectStream的结果上调用而不是在转换后,才能成功转换为CanCommitOffsets。commitAsync调用是线程安全的,但如果您需要有意义的语义,则必须在输出后发生。
获取kafka相关配置
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
/**
* 获取kafka配置信息
*/
def getKafka(ssc: StreamingContext, topic: String, groupId: String) = {
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> groupId,
"auto.offset.reset" -> "latest",
"fetch.max.wait.ms" -> Integer.valueOf(500),
"enable.auto.commit" -> java.lang.Boolean.valueOf(false)
)
val topics = Array(topic)
val data = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent,
Subscribe[String, String](topics, kafkaParams))
data
}