Kafka 源码分析 5 :KafkaConsumer 消费处理(二)
2018-06-04 08:50:46 】 浏览:674次 本网站的内容取自网络,仅供学习参考之用,绝无侵犯任何人知识产权之意。如有侵犯请您及时与本人取得联系,万分感谢。
Tags:Kafka 源码 分析 KafkaConsumer 消费 处理

inator.poll(time.milliseconds(), timeout); // fetch positions if we have partitions we're subscribed to that we // don't know the offset for if (!subscriptions.hasAllFetchPositions()) updateFetchPositions(this.subscriptions.missingFetchPositions()); // 如果已经有record数据了直接返回 // if data is available already, return it immediately Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords(); if (!records.isEmpty()) return records; // 发送一次fetch请求 // send any new fetches (won't resend pending fetches) fetcher.sendFetches(); long now = time.milliseconds(); long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout); // 等待fetch请求结果 client.poll(pollTimeout, now, new PollCondition() { @Override public boolean shouldBlock() { // since a fetch might be completed by the background thread, we need this poll condition // to ensure that we do not block unnecessarily in poll() return !fetcher.hasCompletedFetches(); } }); // after the long poll, we should check whether the group needs to rebalance // prior to returning data so that the group can stabilize faster if (coordinator.needRejoin()) return Collections.emptyMap(); // 返回fetch结果 return fetcher.fetchedRecords(); }


Kafka 源码分析 5 :KafkaConsumer 消费处理(二)

首页 上一页 1 2 下一页 尾页 2/2/2
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇使用 JITWatch 查看 JVM 的 JIT .. 下一篇RocketMQ 源码学习 2 : Namesrv


验 证 码:
表  情:
内  容: