设为首页 加入收藏

TOP

Spark Streaming 自适应上游 kafka topic partition 数量变化
2019-04-20 02:25:44 】 浏览:100
Tags:Spark Streaming 适应 上游 kafka topic partition 数量 变化
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/chen20111/article/details/80827226

背景

Spark Streaming 作业在运行过程中,上游 topic 增加 partition 数目从 A 增加到 B,会造成作业丢失数据,因为该作业只从 topic 中读取了原来的 A 个 partition 的数据,新增的 B-A 个 partition 的数据会被忽略掉。

思考过程

为了作业能够长时间的运行,一开始遇到这种情况的时候,想到两种方案:

  1. 感知上游 topic 的 partition 数目变化,然后发送报警,让用户重启
  2. 直接在作业内部自适应上游 topic partition 的变化,完全不影响作业

方案 1 是简单直接,第一反应的结果,但是效果不好,需要用户人工介入,而且可能需要删除 checkpoint 文件

方案 2 从根本上解决问题,用户不需要关心上游 partition 数目的变化,但是第一眼会觉得较难实现。

方案 1 很快被 pass 掉,因为人工介入的成本太高,而且实现起来很别扭。接下来考虑方案 2.

Spark Streaming 程序中使用 Kafka 的最原始方式为KafkaUtils.createDirectStream通过源码,我们找到调用链条大致是这样的

KafkaUtils.createDirectStream->new DirectKafkaInputDStream-> 最终由DirectKafkaInputDStream#compute(validTime : Time)函数来生成 KafkaRDD。

而 KafkaRDD 的 partition 数和作业开始运行时topic 的 partition 数一致,topic 的 partition 数保存在 currentOffsets 变量中,currentOffsets 是一个 Map[TopicAndPartition, Long]类型的变量,保存每个 partition 当前消费的 offset 值,但是作业运行过程中 currentOffsets 不会增加 key,就是是不会增加 partition,这样导致每次生成 KafkaRDD 的时候都使用开始运行作业时topic 的 partition 数作为 KafkaRDD 的 partition 数,从而会造成数据的丢失。

解决方案

我们只需要在每次生成 KafkaRDD 的时候,将 currentOffsets 修正为正常的值(往里面增加对应的 partition 数,总共 B-A 个,以及每个增加的 partition 的当前 offset 从零开始)。

  • 第一个问题出现了,我们不能修改 Spark 的源代码,重新进行编译,因为这不是我们自己维护的。想到的一种方案是继承 DirectKafkaInputDStream。我们发现不能继承 DirectKafkaInputDStream 该类,因为这个类是使用private[streaming]修饰的。
  • 第二个问题出现了,怎么才能够继承 DirectKafkaInputDStream,这时我们只需要将希望继承 DirectKafkaInputDStream 的类放到一个单独的文件 F 中,文件 F 使用package org.apache.spark.streaming进行修饰即可,这样可以绕过不能继承 DirectKafkaInputDStream 的问题。这个问题解决后,我们还需要修改Object KafkaUtils,让该 Object 内部调用我们修改后的 DirectKafkaInputDStream(我命名为 MTDirectKafkaInputDStream)
  • 第三个问题如何让 Spark 调用 HTDirectKafkaInputDStream,而不是 DirectKafkaInputDStream,这里我们使用简单粗暴的方式,将 KafkaUtils 的代码 copy 一份,然后将其中调用 DirectKafkaInputDStream 的部分都修改为 HTDirectKafkaInputDStream,这样就实现了我们的需要。当然该文件也需要使用package org.apache.spark.streaming进行修饰

总结下,我们需要做两件事

  1. 修改 DirectKafkaInputDStream#compute 使得能够自适应 topic 的 partition 变更
  2. 修改 KafkaUtils,使得我们能够调用修改过后的 DirectKafkaInputDStream

预置条件

compile (group: 'org.apache.spark', name: 'spark-core_2.10', version:'2.1.0')
compile (group: 'org.apache.spark', name: 'spark-streaming-kafka_2.10', version:'1.6.3'

代码

package org.apache.spark.streaming.kafka

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.{StreamingContext, Time}
import scala.collection.mutable.Set
import scala.reflect.ClassTag
class HTDirectKafkaInputDStream[
K: ClassTag,
V: ClassTag,
U <: Decoder[K]: ClassTag,
T <: Decoder[V]: ClassTag,
R: ClassTag](
              @transient ssc_ : StreamingContext,
              val HTkafkaParams: Map[String, String],
              val HTfromOffsets: Map[TopicAndPartition, Long],
              messageHandler: MessageAndMetadata[K, V] => R
) extends DirectKafkaInputDStream[K, V, U, T, R](ssc_, HTkafkaParams , HTfromOffsets, messageHandler) {
  @transient private val logger = Logger.getLogger("HTDirectKafkaInputDStream")
  logger.setLevel(Level.INFO)
  private val kafkaBrokerList:String = HTkafkaParams.get("metadata.broker.list").get
  override def compute(validTime: Time) : Option[KafkaRDD[K, V, U, T, R]] = {
    /**
      * 在这更新 currentOffsets 从而做到自适应上游 partition 数目变化
      */
    updateCurrentOffsetForKafkaPartitionChange()
    super.compute(validTime)
  }

  val topic:Set[String] = Set()
  HTfromOffsets.keys.foreach(x=>{
    topic += x.topic
  })

  private def updateCurrentOffsetForKafkaPartitionChange() : Unit = {
    val parts = kc.getPartitions(topic.toSet).right.get
    val newPartitions = parts.diff(currentOffsets.keySet)
    if(!newPartitions.isEmpty) {
      logger.info(s"Old partition number:${currentOffsets.keys.size}, and now is: ${parts.size} ,updating currentOffsets...")
      currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> 0l).toMap
    }
  }

}

在修改过后的 KafkaUtils 文件中,将所有的DirectKafkaInputDStream都替换为 HTDirectKafkaInputDStream即可


问题

项目Spark版本为2.0.0版本,然而在spark 1.5.2版本之后 org/apache/spark/Logging 已经被移除了(被移到org/apache/spark/internal/Logging)。由于spark-streaming-kafka 1.6.3版本中使用到了logging,所以会有找不到这个类的问题。

解决方法:copy spark-core包中org/apache/spark/internal/Logging修改类的修饰为package org.apache.spark

附:kafka Producer感知broker topic分区变化由topic.metadata.refresh.interval.ms参数决定的,默认10min

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇使用java代码连接不上kafka的解决.. 下一篇从Kafka日志拆分来看系统架构

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目