Kafka source code analysis 4: broker handling of produce requests (Part 6)
Log.scala
The tail of Log.append: update the producer state and the transaction index, advance the log end offset and the first unstable offset, then fsync if the unflushed message count has reached the configured interval.

      // update the producer state
      for ((producerId, producerAppendInfo) <- updatedProducers) {
        producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata)
        producerStateManager.update(producerAppendInfo)
      }

      // update the transaction index with the true last stable offset. The last offset visible
      // to consumers using READ_COMMITTED will be limited by this value and the high watermark.
      for (completedTxn <- completedTxns) {
        val lastStableOffset = producerStateManager.completeTxn(completedTxn)
        segment.updateTxnIndex(completedTxn, lastStableOffset)
      }

      // always update the last producer id map offset so that the snapshot reflects the current offset
      // even if there isn't any idempotent data being written
      producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)

      // increment the log end offset
      updateLogEndOffset(appendInfo.lastOffset + 1)

      // update the first unstable offset (which is used to compute LSO)
      updateFirstUnstableOffset()

      trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s"
        .format(this.name, appendInfo.firstOffset, nextOffsetMetadata.messageOffset, validRecords))

      // if the flush interval has been exceeded, call fsync once
      if (unflushedMessages >= config.flushInterval)
        flush()

      appendInfo
    }
  }
}
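The last step is the flush policy: once unflushedMessages reaches config.flushInterval (backed by the flush.messages / log.flush.interval.messages setting), the log is fsynced so the appended data actually reaches disk rather than sitting only in the OS page cache. Below is a minimal, self-contained sketch of that bookkeeping in plain Java; the class and field names are invented for illustration, and it is not the Kafka implementation.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Illustrative only, not Kafka code: append records to a file and force the
// channel to disk once `flushInterval` messages have accumulated, mirroring
// the unflushedMessages >= config.flushInterval check above.
public class FlushIntervalSketch {
    private final FileChannel channel;
    private final long flushInterval;      // plays the role of flush.messages
    private long appendedSinceFlush = 0;

    public FlushIntervalSketch(Path file, long flushInterval) throws IOException {
        this.channel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        this.flushInterval = flushInterval;
    }

    public void append(ByteBuffer record) throws IOException {
        while (record.hasRemaining())
            channel.write(record);         // lands in the OS page cache first
        appendedSinceFlush++;
        if (appendedSinceFlush >= flushInterval) {
            channel.force(true);           // fsync: make the appended data durable
            appendedSinceFlush = 0;
        }
    }
}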

LogSegment.scala
Append records; when the number of bytes appended since the last index entry exceeds the configured threshold (index.interval.bytes), an entry is also written to the offset index and the time index. A simplified standalone sketch of this index-interval bookkeeping follows the method below.

@nonthreadsafe
def append(firstOffset: Long,
           largestOffset: Long,
           largestTimestamp: Long,
           shallowOffsetOfMaxTimestamp: Long,
           records: MemoryRecords): Unit = {
  if (records.sizeInBytes > 0) {
    trace("Inserting %d bytes at offset %d at position %d with largest timestamp %d at shallow offset %d"
      .format(records.sizeInBytes, firstOffset, log.sizeInBytes(), largestTimestamp, shallowOffsetOfMaxTimestamp))
    val physicalPosition = log.sizeInBytes()
    if (physicalPosition == 0)
      rollingBasedTimestamp = Some(largestTimestamp)
    // append the messages
    require(canConvertToRelativeOffset(largestOffset), "largest offset in message set can not be safely converted to relative offset.")
    val appendedBytes = log.append(records)
    trace(s"Appended $appendedBytes to ${log.file()} at offset $firstOffset")
    // Update the in memory max timestamp and corresponding offset.
    if (largestTimestamp > maxTimestampSoFar) {
      maxTimestampSoFar = largestTimestamp
      offsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp
    }
    // append an entry to the index (if needed)
    if (bytesSinceLastIndexEntry > indexIntervalBytes) {
      index.append(firstOffset, physicalPosition)
      timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
      bytesSinceLastIndexEntry = 0
    }
    bytesSinceLastIndexEntry += records.sizeInBytes
  }
}
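The bytesSinceLastIndexEntry counter is what makes the offset index sparse: a new (offset, file position) entry is recorded only after more than indexIntervalBytes of record data has been appended since the previous entry. The sketch below is a simplified, standalone illustration of that bookkeeping; the class and its fields are invented for this example and omit the time index and all file I/O.

import java.util.ArrayList;
import java.util.List;

// Illustrative only, not Kafka code: a sparse offset index where an
// (offset, position) entry is recorded only after more than
// `indexIntervalBytes` of data has accumulated since the last entry,
// mirroring the check in LogSegment.append.
public class SparseIndexSketch {
    record Entry(long offset, long physicalPosition) {}

    private final int indexIntervalBytes;
    private final List<Entry> entries = new ArrayList<>();
    private int bytesSinceLastEntry = 0;
    private long logSizeBytes = 0;

    public SparseIndexSketch(int indexIntervalBytes) {
        this.indexIntervalBytes = indexIntervalBytes;
    }

    public void append(long firstOffset, int batchSizeBytes) {
        long physicalPosition = logSizeBytes;        // where this batch starts in the file
        logSizeBytes += batchSizeBytes;              // "write" the batch
        if (bytesSinceLastEntry > indexIntervalBytes) {
            entries.add(new Entry(firstOffset, physicalPosition));
            bytesSinceLastEntry = 0;
        }
        bytesSinceLastEntry += batchSizeBytes;
    }

    public List<Entry> entries() {
        return entries;
    }
}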

FileRecords.java

public int append(MemoryRecords records) throws IOException {
    int written = records.writeFullyTo(channel);
    size.getAndAdd(written);
    return written;
}

FileRecords.append writes the records to the segment file through its FileChannel. Note that write() only hands the bytes to the OS page cache; they are not durable until flush() forces the channel (the fsync check in Log.append above).

MemoryRecords.java

/**
 * Write all records to the given channel (including partial records).
 * @param channel The channel to write to
 * @return The number of bytes written
 * @throws IOException For any IO errors writing to the channel
 */
public int writeFullyTo(GatheringByteChannel channel) throws IOException {
    buffer.mark();
    int written = 0;
    while (written < sizeInBytes())
        written += channel.write(buffer);
    buffer.reset();
    return written;
}
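The while loop is needed because a single GatheringByteChannel.write call may write fewer bytes than remain in the buffer, so the caller keeps writing until the buffer is fully consumed. Below is a small standalone usage example of the same pattern against a plain FileChannel; the file name is made up for illustration.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class WriteFullyExample {
    public static void main(String[] args) throws IOException {
        ByteBuffer buffer = ByteBuffer.wrap("some record bytes".getBytes(StandardCharsets.UTF_8));
        try (FileChannel channel = FileChannel.open(Path.of("example.log"),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            int written = 0;
            // write() may be partial, so loop until the buffer is fully drained
            while (buffer.hasRemaining())
                written += channel.write(buffer);
            System.out.println("wrote " + written + " bytes");
        }
    }
}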