版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/wangpei1949/article/details/89277490
在流计算引擎如Apache Storm、Apache Kafka(Kafka Streams)、Apache Spark(Spark Streaming、Spark Structured Streaming)、Apache Flink中,经常提到Exactly-Once语义,那Exactly-Once究竟是啥意思?当流计算引擎声称Exactly-Once时,究竟意味着啥?Spark Streaming如何保证Exactly-Once?
关于此,自己有时也百思不得其解,查阅了众多资料,咨询了众多大佬,将自己理解整理成如下笔记。
让人误解的Exactly-Once
Exactly-Once不是指对输入的数据只处理一次,指的是, 在流计算引擎中, 算子给下游的结果是Exactly-Once的(即:给下游的结果有且仅有一个,且不重复、不少算)。
如在Spark Streaming处理过程中,从一个算子(Operator)到另一个算子(Operator),可能会因为各种不可抗力如机器挂掉等原因,导致某些Task处理失败,Spark内部会基于Lineage或Checkpoint启动重试Task去重新处理同样的数据。因不可抗力的存在,流处理引擎内部不可能做到一条数据仅被处理一次。所以,当流处理引擎声称提供Exactly-Once语义时,指的是从一个Operator到另一个Operator,同样的数据,无论重复处理多少次,最终的结果状态是Exactly-Once。
流计算引擎保证Exactly-Once时一般用到的算法
Micro-Batch
典型流处理引擎:Apache Spark(Spark Streaming)。
Spark Streaming将输入的流周期性的划分成一个一个的Batch,然后用Spark批处理的方式,处理每个Batch,一个Batch要么成功,要么失败,失败后重新Replay,Recompute。偶尔可用Checkpoint快照每个RDD状态,恢复时,找到最近的Checkpoint,确定依赖,然后Recompute。
Distributed Snapshot
Distributed Snapshot(分布式快照),简单来说,就是为了保存分布式系统的Global State,当系统Failure Recovery时,从最近一次成功保存的全局快照中恢复每个节点的状态。
典型流处理引擎:Apache Spark(Spark Structured Streaming)、Apache Flink。
Flnk分布式快照是通过Asynchronous Barrier Snapshots算法实现的,该算法借鉴了Chandy-Lamport算法的主要思想,同时也做了一些改进。
Spark Structured Streaming 的Continuous Processing Mode的容错处理使用了基于Chandy-Lamport的分布式快照(Distributed Snapshot)算法。
流处理应用如何保证Exactly-Once
-
Source支持Replay。
-
流计算引擎本身处理能保证Exactly-Once。
-
Sink支持幂等或事务更新。
Spark Streaming保证Exactly-Once语义
一个Spark Streaming流处理程序,从广义上讲,包含三个步骤。
-
接收数据:从Source中接收数据。
-
转换数据:用DStream和RDD算子转换。
-
储存数据:将结果保存至外部系统。
如果流处理程序需要实现Exactly-Once语义,那么每一个步骤都要保证Exactly-Once。
接收数据
不同的数据源提供不同的保证。
如HDFS中的数据源,直接支持Exactly-Once语义。如使用基于Kafka Direct API从Kafka获取数据,也能保证Exactly-Once。
转换数据
Spark Streaming内部是天然支持Exactly-once语义。任务失败,不论重试多少次,一个算子给另一个算子的结果有且仅有一个,不重不丢。
储存数据
Spark Streaming中的输出操作foreachRDD默认具有At-Least Once语义,因此当任务失败时会重试多次输出,这样就会重复多次写入外部存储。 如果储存数据想实现Exactly-once,有两种途径。
幂等输出
幂等输出,即同样的数据输出多次,结果一样。一般需要借助外部存储中的唯一键实现。具体步骤:
-
将kafka参数enable.auto.commit
设置为false。
-
幂等写入后手动提交offset。这里用checkpoint,不需要手动提交,生产中可用Kafka、Zookeeper、HBase等保存offset。
Spark Streaming 读取Kafka数据并将结果存储到Mysql—幂等
package com.bigdata.spark
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.slf4j.LoggerFactory
import scalikejdbc.{DB, _}
object SparkStreamingEOSKafkaMysqlIdempotent {
@transient lazy val logger = LoggerFactory.getLogger(this.getClass)
def main(args: Array[String]): Unit = {
val topic="topic1"
val group="spark_app1"
val kafkaParams= Map[String, Object](
"bootstrap.servers" -> "node1:6667,node2:6667,node3:6667",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean),
"group.id" -> group)
val checkpointDir ="/apps/bigdata/spark/app1/checkpoint"
val ssc=StreamingContext.getOrCreate(checkpointDir, createContext(topic,group,checkpointDir,kafkaParams))
ssc.checkpoint(checkpointDir)
ssc.start()
ssc.awaitTermination()
}
def createContext(topic:String,group:String,checkpointDir:String,kafkaParams:Map[String,Object])():StreamingContext={
val conf = new SparkConf().setAppName(this.getClass.getSimpleName.replace("$",""))
val ssc = new StreamingContext(conf,Seconds(5))
val initOffset=DB.readOnly(implicit session=>{
sql"select `partition`,offset from kafka_topic_offset where topic =${topic} and `group`=${group}"
.map(item=> new TopicPartition(topic, item.get[Int]("partition")) -> item.get[Long]("offset"))
.list().apply().toMap
})
val sourceDStream =KafkaUtils.createDirectStream[String,String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Assign[String,String](initOffset.keys,kafkaParams,initOffset)
)
sourceDStream.foreachRDD(rdd=>{
if (!rdd.isEmpty()){
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
offsetRanges.foreach(offsetRange=>{
println(s"Topic: ${offsetRange.topic},Group: ${group},Partition: ${offsetRange.partition},fromOffset: ${offsetRange.fromOffset},untilOffset: ${offsetRange.untilOffset}")
})
val sparkSession = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
import sparkSession.implicits._
val dataFrame = sparkSession.read.json(rdd.map(_.value()).toDS)
dataFrame.createOrReplaceTempView("tmpTable")
val result=sparkSession.sql(
"""
|select
| --每分钟
| eventTimeMinute,
| --每种语言
| language,
| -- 次数
| count(1) pv,
| -- 人数
| count(distinct(userID)) uv
|from(
| select *, substr(eventTime,0,16) eventTimeMinute from tmpTable
|) as tmp group by eventTimeMinute,language
""".stripMargin
)
result.rdd.foreachPartition(partition=>{
ConnectionPool.singleton("jdbc:mysql://node3:3306/bigdata", "", "")
partition.foreach(row=>{
DB.autoCommit{implicit session=> {
sql"""
insert into twitter_pv_uv (eventTimeMinute, language,pv,uv)
value (
${row.getAs[String]("eventTimeMinute")},
${row.getAs[String]("language")},
${row.getAs[Long]("pv")},
${row.getAs[Long]("uv")}
)
on duplicate key update pv=pv,uv=uv
""".update.apply()
}}})})}})
ssc
}}
事务输出
事务输出,即数据输出和Kafka Offset提交在同一原子性事务中。具体步骤:
-
将kafka参数enable.auto.commit
设置为false。
-
结果存储与Offset提交在同一事务中原子执行。
Spark Streaming 读取Kafka数据并将结果存储到Mysql—事务
package com.bigdata.spark
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.slf4j.LoggerFactory
import scalikejdbc.{ConnectionPool, DB, _}
object SparkStreamingEOSKafkaMysqlAtomic {
@transient lazy val logger = LoggerFactory.getLogger(this.getClass)
def main(args: Array[String]): Unit = {
val topic="topic1"
val group="spark_app1"
val kafkaParams= Map[String, Object](
"bootstrap.servers" -> "node1:6667,node2:6667,node3:6667",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean),
"group.id" -> group)
ConnectionPool.singleton("jdbc:mysql://node3:3306/bigdata", "", "")
val conf = new SparkConf().setAppName(this.getClass.getSimpleName.replace("$",""))
val ssc = new StreamingContext(conf,Seconds(5))
val initOffset=DB.readOnly(implicit session=>{
sql"select `partition`,offset from kafka_topic_offset where topic =${topic} and `group`=${group}"
.map(item=> new TopicPartition(topic, item.get[Int]("partition")) -> item.get[Long]("offset"))
.list().apply().toMap
})
val sourceDStream =KafkaUtils.createDirectStream[String,String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Assign[String,String](initOffset.keys,kafkaParams,initOffset)
)
sourceDStream.foreachRDD(rdd=>{
if (!rdd.isEmpty()){
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
offsetRanges.foreach(offsetRange=>{
logger.info(s"Topic: ${offsetRange.topic},Group: ${group},Partition: ${offsetRange.partition},fromOffset: ${offsetRange.fromOffset},untilOffset: ${offsetRange.untilOffset}")
})
val sparkSession = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
import sparkSession.implicits._
val dataFrame = sparkSession.read.json(rdd.map(_.value()).toDS)
dataFrame.createOrReplaceTempView("tmpTable")
val result=sparkSession.sql(
"""
|select
| --每分钟
| eventTimeMinute,
| --每种语言
| language,
| -- 次数
| count(1) pv,
| -- 人数
| count(distinct(userID)) uv
|from(
| select *, substr(eventTime,0,16) eventTimeMinute from tmpTable
|) as tmp group by eventTimeMinute,language
""".stripMargin
).collect()
DB.localTx(implicit session=>{
result.foreach(row=>{
sql"""
insert into twitter_pv_uv (eventTimeMinute, language,pv,uv)
value (
${row.getAs[String]("eventTimeMinute")},
${row.getAs[String]("language")},
${row.getAs[Long]("pv")},
${row.getAs[Long]("uv")}
)
on duplicate key update pv=pv,uv=uv
""".update.apply()
})
offsetRanges.foreach(offsetRange=>{
val affectedRows = sql"""
update kafka_topic_offset set offset = ${offsetRange.untilOffset}
where
topic = ${topic}
and `group` = ${group}
and `partition` = ${offsetRange.partition}
and offset = ${offsetRange.fromOffset}
""".update.apply()
if (affectedRows != 1) {
throw new Exception(s"""Commit Kafka Topic: ${topic} Offset Faild!""")
}
})
})
}
})
ssc.start()
ssc.awaitTermination()
}
}