Original source:
刘正阳
How the Kafka broker handles messages sent by producers
Kafka Server: handling producer requests
The entry point is in KafkaApis.scala. The handler determines the request type from request.header.apiKey:
```scala
def handle(request: RequestChannel.Request) {
  try {
    trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
      format(request.requestDesc(true), request.connectionId, request.securityProtocol, request.session.principal))
    ApiKeys.forId(request.header.apiKey) match {
      case ApiKeys.PRODUCE => handleProduceRequest(request)
      // ...
```
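The dispatch step can be illustrated with a small, self-contained sketch. `ApiDispatchSketch`, `ApiKey`, `SimpleRequest`, and the handler strings are hypothetical stand-ins, not Kafka's `ApiKeys` or `RequestChannel.Request`. Only the idea of mapping a numeric API key (0 for Produce and 1 for Fetch in the Kafka protocol) to a handler through pattern matching is taken from the code above.

```scala
// Hypothetical sketch: dispatch a request by its numeric API key, in the spirit of
// ApiKeys.forId(request.header.apiKey) match { ... } in KafkaApis.handle.
// None of these types are Kafka classes.
object ApiDispatchSketch {
  sealed abstract class ApiKey(val id: Short)
  case object Produce extends ApiKey(0)
  case object Fetch extends ApiKey(1)

  private val byId: Map[Short, ApiKey] = Seq[ApiKey](Produce, Fetch).map(k => k.id -> k).toMap

  // Analogous to ApiKeys.forId: look the key up, failing on unknown ids.
  def forId(id: Short): ApiKey =
    byId.getOrElse(id, throw new IllegalArgumentException(s"Unexpected api key: $id"))

  final case class SimpleRequest(apiKey: Short, payload: String)

  // Route the request to a (stubbed) handler chosen by its API key.
  def handle(request: SimpleRequest): String = forId(request.apiKey) match {
    case Produce => s"handleProduceRequest(${request.payload})"
    case Fetch   => s"handleFetchRequest(${request.payload})"
  }

  def main(args: Array[String]): Unit =
    println(handle(SimpleRequest(0, "records"))) // handleProduceRequest(records)
}
```

Because `ApiKey` is sealed, the compiler can warn when a newly added key has no case in the match.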
For a produce request, handleProduceRequest then calls replicaManager.appendRecords to write the messages:
```scala
// call the replica manager to append messages to the replicas
replicaManager.appendRecords(
  timeout = produceRequest.timeout.toLong,
  requiredAcks = produceRequest.acks,
  internalTopicsAllowed = internalTopicsAllowed,
  isFromClient = true,
  entriesPerPartition = authorizedRequestInfo,
  responseCallback = sendResponseCallback)

// if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected;
// hence we clear its data here in order to let GC re-claim its memory since it is already appended to log
produceRequest.clearPartitionRecords()
```
ReplicaManager.scala
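appendRecords first writes the messages to the partition's leader. If requiredAcks == -1, the response is returned only after every replica in the ISR has written the messages successfully. The ISR followers obtain the messages the same way a consumer does: by fetching them from the leader.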
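As a rough illustration of the requiredAcks == -1 rule, the sketch below models one partition only by the log end offsets (LEO) of its leader and ISR followers. It assumes, in simplified form, that the high watermark is the minimum LEO across the ISR and that an acks = -1 produce can be answered once the high watermark reaches `requiredOffset` (the `lastOffset + 1` stored in `ProducePartitionStatus` in the code below). min.insync.replicas, timeouts and ISR changes are left out, and all names are hypothetical.

```scala
// Hypothetical, simplified model of when a produce on one partition can be answered.
// Ignores min.insync.replicas, timeouts and ISR shrinking/expansion.
object AcksSketch {
  final case class PartitionState(leaderLeo: Long, followerLeos: Map[Int, Long]) {
    // Assumed: the high watermark is the smallest LEO among the leader and ISR followers.
    def highWatermark: Long = (leaderLeo +: followerLeos.values.toSeq).min
  }

  // requiredOffset corresponds to lastOffset + 1 of the appended batch.
  def canRespond(requiredAcks: Short, requiredOffset: Long, state: PartitionState): Boolean =
    requiredAcks.toInt match {
      case 0 | 1 => true                                  // no waiting for followers
      case -1    => state.highWatermark >= requiredOffset // every ISR replica must have the data
      case other => throw new IllegalArgumentException(s"Invalid requiredAcks: $other")
    }

  // A follower acts like a consumer: it fetches from the leader and catches up to the leader's LEO.
  def followerFetch(state: PartitionState, brokerId: Int): PartitionState =
    state.copy(followerLeos = state.followerLeos.updated(brokerId, state.leaderLeo))

  def main(args: Array[String]): Unit = {
    val s0 = PartitionState(leaderLeo = 10L, followerLeos = Map(2 -> 5L, 3 -> 5L))
    val s2 = followerFetch(followerFetch(s0, 2), 3)
    println(canRespond(-1, 10L, s0)) // false: both followers are still at offset 5
    println(canRespond(-1, 10L, s2)) // true: both followers have fetched up to offset 10
  }
}
```

With acks = 1 the same request could be answered right after the leader append, which is why appendRecords only needs a delayed operation for the acks = -1 case.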
```scala
/**
 * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
 * the callback function will be triggered either when timeout or the required acks are satisfied;
 * if the callback function itself is already synchronized on some object then pass this object to avoid deadlock.
 */
def appendRecords(timeout: Long,
                  requiredAcks: Short,
                  internalTopicsAllowed: Boolean,
                  isFromClient: Boolean,
                  entriesPerPartition: Map[TopicPartition, MemoryRecords],
                  responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                  delayedProduceLock: Option[Object] = None) {
  if (isValidRequiredAcks(requiredAcks)) {
    val sTime = time.milliseconds
    val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
      isFromClient = isFromClient, entriesPerPartition, requiredAcks)
    debug("Produce to local log in %d ms".format(time.milliseconds - sTime))

    val produceStatus = localProduceResults.map { case (topicPartition, result) =>
      topicPartition ->
        ProducePartitionStatus(
          result.info.lastOffset + 1, // required offset
          new PartitionResponse(result.error, result.info.firstOffset, result.info.logAppendTime)) // response status
    }

    // 1. required acks = -1
    // 2. there is data to append
    // 3. at least one partition append was successful (fewer errors than partitions)
    if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
      // create delayed produce operation
      val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
      val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)

      // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
      val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq

      // try to complete the request immediately, otherwise put it into the purgatory
      // this is because while the delayed produce operation is being created, new
      // requests may arrive and he
      // ...
```
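The comment about completing the request immediately or otherwise putting it into the purgatory describes a "try, watch, try again" pattern. A minimal single-threaded sketch of that pattern follows. `Operation`, `Purgatory`, and the string keys are hypothetical simplifications of `DelayedProduce`, the purgatory, and `TopicPartitionOperationKey`, and timeout handling is omitted entirely.

```scala
import scala.collection.mutable

// Hypothetical, single-threaded sketch of "try to complete now, otherwise watch".
// Not Kafka's DelayedOperation / DelayedOperationPurgatory; no timeouts.
object PurgatorySketch {
  abstract class Operation {
    private var completed = false
    def isCompleted: Boolean = completed
    protected def canComplete: Boolean // e.g. "high watermark >= requiredOffset on every partition"
    protected def onComplete(): Unit   // e.g. invoke the response callback
    // Completes at most once; returns true only for the call that completed it.
    def tryComplete(): Boolean =
      if (!completed && canComplete) { completed = true; onComplete(); true } else false
  }

  final class Purgatory {
    private val watchers = mutable.Map.empty[String, mutable.ListBuffer[Operation]]

    def tryCompleteElseWatch(op: Operation, keys: Seq[String]): Boolean = {
      if (op.tryComplete()) return true
      keys.foreach(k => watchers.getOrElseUpdate(k, mutable.ListBuffer.empty[Operation]) += op)
      // Re-check after registering: the condition may have changed in the meantime.
      op.tryComplete()
    }

    // Called when the state behind `key` changes, e.g. a follower fetch advances the high watermark.
    // Returns how many watched operations this call completed.
    def checkAndComplete(key: String): Int = {
      val ops = watchers.getOrElse(key, mutable.ListBuffer.empty[Operation])
      val done = ops.count(_.tryComplete())
      ops --= ops.filter(_.isCompleted) // drop finished operations from this watch list
      done
    }

    def watched(key: String): Int = watchers.get(key).map(_.size).getOrElse(0)
  }

  def main(args: Array[String]): Unit = {
    var hw = 5L
    val op = new Operation {
      protected def canComplete: Boolean = hw >= 10L
      protected def onComplete(): Unit = println("response sent")
    }
    val purgatory = new Purgatory
    println(purgatory.tryCompleteElseWatch(op, Seq("topic-0"))) // false: the operation is now watched
    hw = 10L // a follower fetch advanced the high watermark
    println(purgatory.checkAndComplete("topic-0"))             // prints "response sent", then 1
  }
}
```

Re-checking after the watchers are registered is a plausible reading of why the comment says new requests may arrive while the delayed operation is being created: in a multi-threaded broker the partition state can change between the first check and the registration, and without the second check such a request could sit in the purgatory until it times out.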