Kafka Source Code Analysis 4: How the Broker Handles Produce Requests (Part 5)
2018-05-22 08:53:15
Tags: kafka, source code analysis, broker, produce request
        } catch {
          case e: IOException =>
            throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
        }
        validRecords = validateAndOffsetAssignResult.validatedRecords
        appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
        appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
        appendInfo.lastOffset = offset.value - 1
        if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
          appendInfo.logAppendTime = now

        // re-validate message sizes if there's a possibility that they have changed (due to re-compression or message
        // format conversion)
        if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
          for (batch <- validRecords.batches.asScala) {
            if (batch.sizeInBytes > config.maxMessageSize) {
              // we record the original message set size instead of the trimmed size
              // to be consistent with pre-compression bytesRejectedRate recording
              brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
              brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
              throw new RecordTooLargeException("Message batch size is %d bytes which exceeds the maximum configured size of %d."
                .format(batch.sizeInBytes, config.maxMessageSize))
            }
          }
        }
      } else {
        // we are taking the offsets we are given
        if (!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
          throw new IllegalArgumentException("Out of order offsets found in " + records.records.asScala.map(_.offset))
      }

      // update the epoch cache with the epoch stamped onto the message by the leader
      validRecords.batches.asScala.foreach { batch =>
        if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
          leaderEpochCache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
      }

      // check messages set size may be exceed config.segmentSize
      if (validRecords.sizeInBytes > config.segmentSize) {
        throw new RecordBatchTooLargeException("Message batch size is %d bytes which exceeds the maximum configured segment size of %d."
          .format(validRecords.sizeInBytes, config.segmentSize))
      }

      // now that we have valid records, offsets assigned, and timestamps updated, we need to
      // validate the idempotent/transactional state of the producers and collect some metadata
      val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(validRecords, isFromClient)
      maybeDuplicate.foreach { duplicate =>
        appendInfo.firstOffset = duplicate.firstOffset
        appendInfo.lastOffset = duplicate.lastOffset
        appendInfo.logAppendTime = duplicate.timestamp
        return appendInfo
      }

      // if the current segment is full, roll over to a new one
      // maybe roll the log if this segment is full
      val segment = maybeRoll(messagesSize = validRecords.sizeInBytes,
        maxTimestampInMessages = appendInfo.maxTimestamp,
        maxOffsetInMessages = appendInfo.lastOffset)

      val logOffsetMetadata = LogOffsetMetadata(
        messageOffset = appendInfo.firstOffset,
        segmentBaseOffset = segment.baseOffset,
        relativePositionInSegment = segment.size)

      // the actual write is delegated to the segment
      segment.append(firstOffset = appendInfo.firstOffset,
        largestOffset = appendInfo.lastOffset,
        largestTimestamp = appendInfo.maxTimestamp,
        shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
        records = validRecords)

      // update the
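The size checks in the listing read per-topic and broker-level limits: config.maxMessageSize corresponds to the topic setting max.message.bytes (defaulting to the broker's message.max.bytes), and config.segmentSize to segment.bytes. As a rough illustration of what such a rejection looks like from the client side, here is a minimal standalone sketch; the broker address, topic name, and payload sizes are assumptions, not part of the original article. A record larger than the broker's configured limit is rejected in the append path and surfaces on the producer as a RecordTooLargeException.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object OversizedRecordDemo {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // assumed broker address
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
    // raise the client-side limit so the broker-side size check is the one that rejects the batch
    props.put("max.request.size", (10 * 1024 * 1024).toString)

    val producer = new KafkaProducer[String, Array[Byte]](props)
    // hypothetical topic; a 2 MB payload exceeds the broker default message.max.bytes (~1 MB)
    val payload = new Array[Byte](2 * 1024 * 1024)
    val future = producer.send(new ProducerRecord[String, Array[Byte]]("demo-topic", "key", payload))
    try {
      future.get() // fails with an ExecutionException wrapping RecordTooLargeException
    } catch {
      case e: Exception => println(s"send failed: ${e.getCause}")
    }
    producer.close()
  }
}

With the broker left at its default message.max.bytes, the 2 MB payload fails on the broker side rather than in the client, because max.request.size was raised so the producer does not reject the record first.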