设为首页 加入收藏

TOP

Kafka 源码分析 5 :KafkaConsumer 消费处理(二)
2018-06-04 08:50:46 】 浏览:482
Tags:Kafka 源码 分析 KafkaConsumer 消费 处理
inator.poll(time.milliseconds(), timeout); // fetch positions if we have partitions we're subscribed to that we // don't know the offset for if (!subscriptions.hasAllFetchPositions()) updateFetchPositions(this.subscriptions.missingFetchPositions()); // 如果已经有record数据了直接返回 // if data is available already, return it immediately Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords(); if (!records.isEmpty()) return records; // 发送一次fetch请求 // send any new fetches (won't resend pending fetches) fetcher.sendFetches(); long now = time.milliseconds(); long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout); // 等待fetch请求结果 client.poll(pollTimeout, now, new PollCondition() { @Override public boolean shouldBlock() { // since a fetch might be completed by the background thread, we need this poll condition // to ensure that we do not block unnecessarily in poll() return !fetcher.hasCompletedFetches(); } }); // after the long poll, we should check whether the group needs to rebalance // prior to returning data so that the group can stabilize faster if (coordinator.needRejoin()) return Collections.emptyMap(); // 返回fetch结果 return fetcher.fetchedRecords(); }


编程开发网
首页 上一页 1 2 下一页 尾页 2/2/2
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇使用 JITWatch 查看 JVM 的 JIT .. 下一篇RocketMQ 源码学习 2 : Namesrv

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

array(4) { ["type"]=> int(8) ["message"]=> string(24) "Undefined variable: jobs" ["file"]=> string(32) "/mnt/wp/cppentry/do/bencandy.php" ["line"]=> int(217) }