inator.poll(time.milliseconds(), timeout);
// fetch positions if we have partitions we're subscribed to that we
// don't know the offset for
if (!subscriptions.hasAllFetchPositions())
updateFetchPositions(this.subscriptions.missingFetchPositions());
// 如果已经有record数据了直接返回
// if data is available already, return it immediately
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
if (!records.isEmpty())
return records;
// 发送一次fetch请求
// send any new fetches (won't resend pending fetches)
fetcher.sendFetches();
long now = time.milliseconds();
long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);
// 等待fetch请求结果
client.poll(pollTimeout, now, new PollCondition() {
@Override
public boolean shouldBlock() {
// since a fetch might be completed by the background thread, we need this poll condition
// to ensure that we do not block unnecessarily in poll()
return !fetcher.hasCompletedFetches();
}
});
// after the long poll, we should check whether the group needs to rebalance
// prior to returning data so that the group can stabilize faster
if (coordinator.needRejoin())
return Collections.emptyMap();
// 返回fetch结果
return fetcher.fetchedRecords();
}
|