设为首页 加入收藏

TOP

Spring中手动开启kafka监听.md
2019-05-02 02:30:18 】 浏览:54
Tags:Spring 手动 开启 kafka 监听 .md
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u014453515/article/details/86177716

0. 背景重现

最近搭建一个新项目,基于SpringBoot框架,使用Kafka做消息中间件。
使用@KafkaListener注解来创建一个消费者,实现对Kafka消息的消费。我计划的执行顺序是这样的:服务启动之后,创建Consumer实例,执行loadResourceConfig初始化方法,之后才开始消费Kafka的消息。
但是出现了一个问题:没有等loadResourceConfig方式执行完毕,@KafkaListener就开始消费消息了。
这显然不是我们期望的,下面是大概的代码:

@Component      
public  class Consumer{
    @PostConstruct
    private void loadResourceConfig () {//加载数据
        // 加载资源配置
     }
    /**
     * 接收数据处理
     * @param record
     */
    @KafkaListener(id = "device-data",topics = {"${DataTopic}"})
    public void listen(ConsumerRecord<String, > record) {
        Optional kafkaMessage = Optional.ofNullable(record.value());
        Optional<String> kafkaKey = Optional.ofNullable(record.key());
        if (kafkaKey.isPresent()) {
            Object value = kafkaMessage.get();
            String gatewayId = kafkaKey.get();
              //使用 加载的资源信息对数据进行处理
        }
    }
}

1.原因分析

@KafkaListener这个注解所标注的方法并没有在IOC容器中注册为Bean,而是会被注册在KafkaListenerEndpointRegistry中,KafkaListenerEndpointRegistry在SpringIOC中已经被注册为Bean,具体可以看一下该类的源码,当然不是使用注解方式注册。
KafkaListenerEndpointRegistry注册完Kafka中的topic之后,就会自动启动监听容器,如此KafkaListener注解的方法就开始消费消息了。这个过程可能在自定义Bean创建完成之前执行。

知道了问题,以及原因,解决方法就比较简单了,我们只需要完成2点:
1.禁止KafkaListener自启动(AutoStartup)
2.手动启动单个Kafka的topic的监听

2.解决方法

@Component      
public    class Consumer{
	@Autowired
     KafkaManager  kafkaManager;
     @PostConstruct
     private void loadResourceConfig () {//加载数据
        // 加载资源配置
        kafkaManager.startListener();//开启topic的监听
    }

}


@Component
public class KafkaManager {
    @Autowired
    private KafkaListenerEndpointRegistry registry;

    @Autowired
    private ConsumerFactory consumerFactory;

    @Bean
    public ConcurrentKafkaListenerContainerFactory delayContainerFactory() {
        ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();
        container.setConsumerFactory(consumerFactory);
        //禁止自动启动
        container.setAutoStartup(false);
        return container;
    }

    /**
     * 开启kafka监听
     */
    public void startListener() {
        if (!registry.getListenerContainer("device-data").isRunning()) {
            registry.getListenerContainer("device-data").start();
        }
        registry.getListenerContainer("device-data").resume();
   
}

上面的代码做了几件事:
1.使用ConsumerFactory 构建Kafka监听容器工厂ConcurrentKafkaListenerContainerFactory
2.Kafka监听容器工厂注册为Bean
3.禁止Kafka监听容器自动启动
4.在loadResourceConfig方法加载完成资源之后,调用startListener方法,手动启动Kafka容器监听。注意registry.getListenerContainer(“device-data”)的参数,就是 @KafkaListener注解中的id参数。
5.startListener中我们先判断容器是否运行(isRunning),如果没有则调用start方法启动。 resume方法是恢复运行。这样写的目的是,即便startListener多次执行,也没有问题。

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇kafka查看消费了多少条数据 下一篇如何从ActiveMQ平滑迁移到Kafka?

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目