设为首页 加入收藏

TOP

kafka消费者总结(三)
2019-01-20 14:36:59 】 浏览:142
Tags:kafka 消费者 总结

这次我们来总结一下kafka Consumer客户端从 brokers集群上拉取拉取消息的过程,首先看一下KafkaConsumer所以依赖的组件:


metadata 记录了集群的元数据信息

netClient表示consumer与kafka集群网络通信的组件

client 是对netClient的封装,引用了netClient实例

subscriptions 包含了两部分信息,SubscriptionType和TopicPartitionState,SubscriptionType是一个枚举类,表示订阅topic的方式,包括NONE, AUTO_TOPICS, AUTO_PATTERN, USER_ASSIGNED这4种值,TopicPartitionState类表示topic分区的消费状态,有两个比较重要的字段position和lastStableOffset,position表示上次消费到的位置,也就是下次开始消费的位置,lastStableOffset表示最后一次提交的偏移量offset

coordinator是一个ConsumerCoordinator类,表示与kafka集群协调交互的节点,consumer的Rebalance和提交偏移量的操作都是通过coordinator来完成的

fetcher就是本次要介绍的主角,主要负责从kafka集群拉取消息

首先来看一下consumer负责拉取消息的方法poll的处理代码:

public ConsumerRecords<K, V> poll(long timeout) {

acquireAndEnsureOpen();
try {
if (timeout < 0)
throw new IllegalArgumentException("Timeout must not be negative");
if (this.subscriptions.hasNoSubscriptionOrUserAssignment())
throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");

// poll for new data until the timeout expires
long start = time.milliseconds();
long remaining = timeout;
do {
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
if (!records.isEmpty()) {
// before returning the fetched records, we can send off the next round of fetches
// and avoid block waiting for their responses to enable pipelining while the user
// is handling the fetched records.
//
// NOTE: since the consumed position has already been updated, we must not allow
// wakeups or any other errors to be triggered prior to returning the fetched records.
if (fetcher.sendFetches() > 0 || client.hasPendingRequests())
client.pollNoWakeup();
return this.interceptors.onConsume(new ConsumerRecords<>(records));
}
long elapsed = time.milliseconds() - start;
remaining = timeout - elapsed;
} while (remaining > 0);
return ConsumerRecords.empty();
} finally {
release();
}
}

timeout参数表示拉取消息的最大阻塞时间,这个方法主要是调用pollOnce拉取消息

private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
client.maybeTriggerWakeup();
long startMs = time.milliseconds();
//触发协调器事件,保证 coordinator协调器存在并且消费者已经加入cunsumerGroup
coordinator.poll(startMs, timeout);
// 查找消费分区对应的potion信息,确保所有的分区都有对应的position信息,次方法是异步完成的
boolean hasAllFetchPositions = updateFetchPositions();
// 如果已经有拉取数据则立即返回数据
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
if (!records.isEmpty())
return records;
// 发送拉取数据请求
fetcher.sendFetches();
long nowMs = time.milliseconds();
long remainingTimeMs = Math.max(0, timeout - (nowMs - startMs));
long pollTimeout = Math.min(coordinator.timeToNextPoll(nowMs), remainingTimeMs);
//如果有分区的positon不存在,则需要通过网络请求获取
if (!hasAllFetchPositions && pollTimeout > retryBackoffMs)
pollTimeout = retryBackoffMs;
client.poll(pollTimeout, nowMs, new PollCondition() {
@Override
public boolean shouldBlock() {
//如果已经有完成的拉取到的数据,则次方法无需阻塞
return !fetcher.hasCompletedFetches();
}
});
//检查是否需要rebalance
if (coordinator.needRejoin())
return Collections.emptyMap();
return fetcher.fetchedRecords();
}

重点关注一下fetcher.sendFetches()这个方法,其处理流程如下:


上面流程只是对Fetcher对象拉取数据的一个大致流程做了介绍,里面涉及到很多异步网络通信,异步回调等方法

下一节我们将对consumer做一个整体的总结并讨论如何实现多线程消费



】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇kafka_Kafka的消息消费机制 下一篇kafka+zookeeper伪集群搭建

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目