Kafka source code analysis 4: broker handling of produce requests (part 2)
2018-05-22
        // ... hence make this operation completable.
        delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
      } else {
        // we can respond immediately
        val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus)
        responseCallback(produceResponseStatus)
      }
    } else {
      // If required.acks is outside accepted range, something is wrong with the client
      // Just return an error and don't handle the request at all
      val responseStatus = entriesPerPartition.map { case (topicPartition, _) =>
        topicPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS,
          LogAppendInfo.UnknownLogAppendInfo.firstOffset, RecordBatch.NO_TIMESTAMP)
      }
      responseCallback(responseStatus)
    }
  }
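Whether the response is parked in the purgatory (the tryCompleteElseWatch call above) or returned immediately is decided by delayedProduceRequestRequired. For reference, the check boils down to three conditions (a sketch paraphrased from the same version of ReplicaManager; verify against the exact source you are reading):

 private def delayedProduceRequestRequired(requiredAcks: Short,
                                           entriesPerPartition: Map[TopicPartition, MemoryRecords],
                                           localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean = {
   // 1. the producer asked for acks = -1, i.e. wait for all in-sync replicas
   // 2. the request actually carries data
   // 3. at least one partition was appended to the local log without error,
   //    so there is something worth waiting on before responding
   requiredAcks == -1 &&
   entriesPerPartition.nonEmpty &&
   localProduceResults.values.count(_.exception.isDefined) < entriesPerPartition.size
 }

The outer else branch is only reached when required.acks is none of 0, 1 and -1, which appendRecords validates up front; in that case every partition is answered with INVALID_REQUIRED_ACKS and the log is never touched.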

Appending messages to the local log

/**
  * Append the messages to the local replica logs
  */
 private def appendToLocalLog(internalTopicsAllowed: Boolean,
                              isFromClient: Boolean,
                              entriesPerPartition: Map[TopicPartition, MemoryRecords],
                              requiredAcks: Short): Map[TopicPartition, LogAppendResult] = {
   trace("Append [%s] to local log ".format(entriesPerPartition))
   entriesPerPartition.map { case (topicPartition, records) =>
     brokerTopicStats.topicStats(topicPartition.topic).totalProduceRequestRate.mark()
     brokerTopicStats.allTopicsStats.totalProduceRequestRate.mark()
     // reject appending to internal topics if it is not allowed
     if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) {
       (topicPartition, LogAppendResult(
         LogAppendInfo.UnknownLogAppendInfo,
         Some(new InvalidTopicException(s"Cannot append to internal topic ${topicPartition.topic}"))))
     } else {
       try {
         val partitionOpt = getPartition(topicPartition)
         val info = partitionOpt match {
           case Some(partition) =>
             if (partition eq ReplicaManager.OfflinePartition)
               throw new KafkaStorageException(s"Partition $topicPartition is in an offline log directory on broker $localBrokerId")
             partition.appendRecordsToLeader(records, isFromClient, requiredAcks)
           case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
             .format(topicPartition, localBrokerId))
         }
         val numAppendedMessages =
           if (info.firstOffset == -1L || info.lastOffset == -1L)
             0
           else
             info.lastOffset - info.firstOffset + 1
         // update stats for successfully appended bytes and messages as bytesInRate and messageInRate
         brokerTopicStats.topicStats(topicPartition.topic).bytesInRate.mark(records.sizeInBytes)
         brokerTopicStats.allTopicsStats.bytesInRate.mark(records.sizeInBytes)
         brokerTopicStats.topicStats(topicPartition.topic).messagesInRate.mark(numAppendedMessages)
         brokerTopicStats.allTopicsStats.messagesInRate.mark(numAppendedMessages)
         trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d"
           .format(records.sizeInBytes, topicPartition.topic, topicPartition.partition, info.firstOffset, info.lastOffset))
         (topicPartition, LogAppendResult(info))
       } catch {
         // NOTE: Failed produce requests metric is not incremented for known exceptions
         // it is supposed to indicate un-expected failures of a broker in handling a produce request
         case e@ (_: UnknownTopicOrPartitionException |
                  _: NotLeaderForPartitionException |
                   _: RecordTooLargeException |
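A quick worked example of the bookkeeping in the middle of this method (illustrative only, not Kafka code): numAppendedMessages is plain offset arithmetic, so an append reporting firstOffset = 100 and lastOffset = 104 marks messagesInRate with 5, while an append reporting -1 for either offset counts as 0.

// Standalone sketch of the numAppendedMessages calculation above (not Kafka code).
object NumAppendedMessagesExample extends App {
  def numAppendedMessages(firstOffset: Long, lastOffset: Long): Long =
    if (firstOffset == -1L || lastOffset == -1L) 0L
    else lastOffset - firstOffset + 1

  println(numAppendedMessages(100L, 104L)) // 5 -> messagesInRate.mark(5)
  println(numAppendedMessages(-1L, -1L))   // 0 -> nothing was appended
}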