设为首页 加入收藏

TOP

KafkaController 分区Rebalance平衡机制
2018-12-12 02:20:19 】 浏览:668
Tags:KafkaController分区 Rebalance 平衡 机制
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/zhanglh046/article/details/72822013

private def checkAndTriggerPartitionRebalance(): Unit = {
if (isActive()) {
trace("checking need to trigger partition rebalance")
// 获取(存活的broker,AR副本集) => (2,Map([message,0]-> List(2, 0), [hadoop,0] -> List(2, 1)))
var preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] = null
inLock(controllerContext.controllerLock) {
preferredReplicasForTopicsByBrokers=
controllerContext.partitionReplicaAssignment.filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p._1.topic)).groupBy {
case(topicAndPartition, assignedReplicas) => assignedReplicas.head
}
}
debug("preferred replicas by broker " + preferredReplicasForTopicsByBrokers)
// 过滤每一个存活的broker,检查是否需要一个preferredreplica 选举被触发
preferredReplicasForTopicsByBrokers.foreach {
case(leaderBroker, topicAndPartitionsForBroker) => {
var imbalanceRatio: Double = 0
var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = null
inLock(controllerContext.controllerLock) {
// 我们知道,正常情况下,brokerAR副本集第一个副本(preferred replica )就是leader
//
如果leader不是preferred replica,比如 Leader : 0 ISR[2,0]
//
我们需要过滤出这种topicPartition,然后我们好进行在平衡
topicsNotInPreferredReplica=
topicAndPartitionsForBroker.filter {
case(topicPartition, replicas) => {
controllerContext.partitionLeadershipInfo.contains(topicPartition) &&
controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker
}
}
debug("topics not in preferred replica " + topicsNotInPreferredReplica)
// broker AR副本数量
val totalTopicPartitionsForBroker = topicAndPartitionsForBroker.size
// 过滤出的leader不是AR副本集的preferred replica的数量
val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size
// 计算(过滤出的leader不是AR副本集的preferred replica的数量)/(broker AR副本数量)不平衡比例
imbalanceRatio= totalTopicPartitionsNotLedByBroker.toDouble / totalTopicPartitionsForBroker
trace("leader imbalance ratio for broker %d is %f".format(leaderBroker, imbalanceRatio))
}
// 如果比例大于我们配置的leader.imbalance.per.broker.percentage参数,比如50%,就触发这个topic partitions的再平衡操作
if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
topicsNotInPreferredReplica.foreach {
case(topicPartition, replicas) => {
inLock(controllerContext.controllerLock) {
// 首先确保broker存活,而且没有分区正在重新分配或者没有进行preferredreplica 选举,且没有分区将被删除
if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
controllerContext.partitionsBeingReassigned.isEmpty &&
controllerContext.partitionsUndergoingPreferredReplicaElection.isEmpty &&
!deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
controllerContext.allTopics.contains(topicPartition.topic)) {
// 然后真正触发Prederred Replica选举操作
onPreferredReplicaElection(Set(topicPartition), true)
}
}
}
}
}
}
}
}
}


】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇kafka生产者消费者API 与sparkStr.. 下一篇kafka数据丢失的原因

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目