Kafka source code analysis 4: how the broker handles produce requests (part 1)
2018-05-22 08:53:14

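How the Kafka broker handles messages sent by a producer

How the Kafka server handles producer requests

The entry point is the handle method in KafkaApis.scala, which uses request.header.apiKey to determine the request type and dispatch to the matching handler: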

def handle(request: RequestChannel.Request) {
  try {
    trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
      format(request.requestDesc(true), request.connectionId, request.securityProtocol, request.session.principal))
    ApiKeys.forId(request.header.apiKey) match {
      case ApiKeys.PRODUCE => handleProduceRequest(request)
      // ... (the remaining cases and the error handling are omitted in this excerpt)
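
The dispatch above is an ordinary Scala pattern match on the API key. Here is a minimal, self-contained sketch of the same idea. The ApiKey, Request, and handler definitions are hypothetical stand-ins written for this illustration, not Kafka's real classes.

```scala
// Simplified, hypothetical model of the dispatch in KafkaApis.handle.
// None of these types are Kafka's own; they only mirror the shape of the code.
object DispatchSketch {
  sealed trait ApiKey
  case object Produce extends ApiKey
  case object Fetch extends ApiKey

  final case class Request(apiKey: ApiKey, connectionId: String)

  def handleProduceRequest(request: Request): String =
    s"produce from ${request.connectionId}"

  def handleFetchRequest(request: Request): String =
    s"fetch from ${request.connectionId}"

  // A single pattern match routes each request type to its handler.
  def handle(request: Request): String = request.apiKey match {
    case Produce => handleProduceRequest(request)
    case Fetch   => handleFetchRequest(request)
  }
}
```

Because ApiKey is a sealed trait in this sketch, the compiler can warn when a request type has no matching case.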

For a produce request, handleProduceRequest calls replicaManager.appendRecords:

  // call the replica manager to append messages to the replicas
  replicaManager.appendRecords(
    timeout = produceRequest.timeout.toLong,
    requiredAcks = produceRequest.acks,
    internalTopicsAllowed = internalTopicsAllowed,
    isFromClient = true,
    entriesPerPartition = authorizedRequestInfo,
    responseCallback = sendResponseCallback)
  // if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected;
  // hence we clear its data here in order to let GC reclaim its memory since it is already appended to log
  produceRequest.clearPartitionRecords()

ReplicaManager.scala
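appendRecords first writes the messages to the leader replica of each partition. If requiredAcks == -1, the response is returned only after every replica in the ISR (in-sync replica set) has the messages. The ISR followers get the data by fetching from the leader, in the same way a consumer pulls messages.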
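
To make the acks = -1 rule concrete, here is a rough model of when such a request can be answered. It assumes the high watermark is the smallest log end offset among the leader and its in-sync followers, and that the request is satisfied once the high watermark reaches the required offset (last appended offset + 1). The object and parameter names are hypothetical, and the real broker tracks more state than this.

```scala
// Simplified model of acks = -1 completion; not Kafka's implementation.
object AckSketch {
  // followerEndOffsets: log end offset of each in-sync follower (hypothetical input).
  def highWatermark(leaderEndOffset: Long, followerEndOffsets: Seq[Long]): Long =
    (leaderEndOffset +: followerEndOffsets).min

  // The produce request is satisfied once every in-sync replica
  // has fetched up to the required offset.
  def isSatisfied(requiredOffset: Long,
                  leaderEndOffset: Long,
                  followerEndOffsets: Seq[Long]): Boolean =
    highWatermark(leaderEndOffset, followerEndOffsets) >= requiredOffset
}
```

For example, if the leader appends a batch whose last offset is 9, the required offset is 10. With followers at offsets 10 and 7 the request is still pending, and it becomes completable once the slower follower has fetched up to offset 10.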

/**
  * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
  * the callback function will be triggered either when timeout or the required acks are satisfied;
  * if the callback function itself is already synchronized on some object then pass this object to avoid deadlock.
  */
 def appendRecords(timeout: Long,
                   requiredAcks: Short,
                   internalTopicsAllowed: Boolean,
                   isFromClient: Boolean,
                   entriesPerPartition: Map[TopicPartition, MemoryRecords],
                   responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                   delayedProduceLock: Option[Object] = None) {
   if (isValidRequiredAcks(requiredAcks)) {
     val sTime = time.milliseconds
     val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
       isFromClient = isFromClient, entriesPerPartition, requiredAcks)
     debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
     val produceStatus = localProduceResults.map { case (topicPartition, result) =>
       topicPartition ->
         ProducePartitionStatus(
           result.info.lastOffset + 1, // required offset
           new PartitionResponse(result.error, result.info.firstOffset, result.info.logAppendTime)) // response status
     }
     // 1. required acks = -1
     // 2. there is data to append
     // 3. at least one partition append was successful (fewer errors than partitions)
     if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
       // create delayed produce operation
       val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
       val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)
       // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
       val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq
       // try to complete the request immediately, otherwise put it into the purgatory
       // this is because while the delayed produce operation is being created, new
       // requests may arrive and he
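
The three numbered conditions in the comments of appendRecords decide whether a delayed produce operation is needed. The sketch below models that decision and the "required offset" calculation. AppendResult is a hypothetical stand-in for the per-partition append result, the partition is identified by a plain string, and the sketch treats "there is data to append" as "the result map is non-empty". It also assumes the accepted acks values are -1, 0, and 1, matching the producer's acks settings.

```scala
// Simplified model of the checks around DelayedProduce; not Kafka's implementation.
object DelayedProduceSketch {
  // Hypothetical stand-in for the result of appending to one partition.
  final case class AppendResult(lastOffset: Long, failed: Boolean)

  // Assumption: only -1 (all ISR), 0 (no ack) and 1 (leader only) are valid.
  def isValidRequiredAcks(requiredAcks: Short): Boolean =
    requiredAcks == -1 || requiredAcks == 0 || requiredAcks == 1

  // 1. required acks = -1
  // 2. there is data to append
  // 3. at least one partition append was successful
  def delayedProduceRequired(requiredAcks: Short,
                             results: Map[String, AppendResult]): Boolean =
    requiredAcks == -1 &&
      results.nonEmpty &&
      results.values.count(_.failed) < results.size

  // Followers must reach the offset just past the last appended message.
  def requiredOffset(result: AppendResult): Long = result.lastOffset + 1
}
```

With acks = 0 or acks = 1, or when every partition append failed, nothing needs to wait on the followers, so the response can be sent right away.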