TOP

kafka设置监听器的启动和禁用
2019-03-23 14:32:51 】 浏览:634
Tags:kafka 设置 监听 启动 禁用

有个需求:要让kafka监听支持动态配置...

开关定义kafka是否启用;

但是在使用kafka的时候,springcloud集成的kafka只需要添加@KafkaListener注解就会启用监听...而要通过配置来控制是否监听,就有点儿难了...

    @KafkaListener(topics = {Constants.KAFKA_TOPIC_CONSUMER_4G_ZM}, containerFactory = "kafkaListenerContainerFactory")
    public void listen4GZM(List<ConsumerRecord> records, Acknowledgment ack) {
        try {
            List<TDatadetectInfoEntity> datadetectInfoEntityList4G = new ArrayList<>();
            List<TDatadetectInfoErrEntity> datadetectInfoErrEntityList4G = new ArrayList<>();
            //一个循环就提交了;list大于500提交一次;循环最后一次提交一次;
            int size = records.size();
            logger.info("收到的kafka4G帧码信息的数量为 :" + size);
            for (ConsumerRecord record : records) {
                size--;
                Optional<> kafkaMessage = Optional.ofNullable(record.value());
                if (kafkaMessage.isPresent()) {
                    Object message = kafkaMessage.get();
                    String messagestr = message.toString();

就到这里吧,下面的估计就涉及到公司的机密了..........

KafkaListener 监听的topics是可配的,常量类中读取,然后就是containerFactory中做文章了....

kafka消费的配置信息类

package com.trs.idap.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

/**
 * Spring configuration that builds the batch, manually-acknowledged Kafka listener
 * container factory referenced by {@code @KafkaListener(containerFactory =
 * "kafkaListenerContainerFactory")}.
 *
 * <p>All consumer settings are read from {@code spring.data.kafka.consumer.*} properties.
 * Whether the listener containers start automatically is controlled by the optional
 * {@code spring.data.kafka.consumer.auto-startup} property; when the property is absent
 * it falls back to {@code true}, so listeners start as before.
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfiguration {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerConfiguration.class);

    // Still injected (the property must exist) but not applied to the consumer; see consumerConfigs().
    @Value("${spring.data.kafka.consumer.zookeeper-connect}")
    private String zookeeper;
    @Value("${spring.data.kafka.consumer.servers}")
    private String servers;
    @Value("${spring.data.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;
    @Value("${spring.data.kafka.consumer.session-timeout}")
    private String sessionTimeout;
    @Value("${spring.data.kafka.consumer.auto-commit-interval}")
    private String autoCommitInterval;
    @Value("${spring.data.kafka.consumer.group-id}")
    private String groupId;
    @Value("${spring.data.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${spring.data.kafka.consumer.concurrency}")
    private int concurrency;
    @Value("${spring.data.kafka.consumer.maxpollrecordsconfig}")
    private int maxPollRecordsConfig;
    // Switch for enabling/disabling the listeners from configuration; defaults to true when unset.
    @Value("${spring.data.kafka.consumer.auto-startup:true}")
    private boolean autoStartup;

    /**
     * Creates the container factory used by the {@code @KafkaListener} methods.
     *
     * <p>The factory is configured for batch listeners, a 1500 ms poll timeout,
     * {@code MANUAL_IMMEDIATE} acknowledgment, the configured concurrency, and the
     * configured auto-startup flag.
     *
     * @return the configured listener container factory
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(1500);
        factory.setBatchListener(true);
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        // Lets the property file decide whether containers created by this factory start on their own.
        factory.setAutoStartup(autoStartup);
        logger.info("Kafka listener auto-startup enabled: {}", autoStartup);

        return factory;
    }

    /**
     * Builds a consumer factory from {@link #consumerConfigs()}.
     *
     * @return a new {@link DefaultKafkaConsumerFactory} for String keys and values
     */
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Assembles the Kafka consumer properties from the injected configuration values.
     *
     * @return a new mutable map of consumer configuration entries
     */
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>(9);
        // The zookeeper.connect setting is intentionally not passed to the consumer.

        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        // Maximum number of records fetched per poll (i.e. per batch handed to the listener).
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecordsConfig);
        return propsMap;
    }
}

对kafka set了一堆属性

factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(concurrency);
factory.getContainerProperties().setPollTimeout(1500);
factory.setBatchListener(true);
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);

当然再仔细翻api会发现,还有一个

setAutoStartup

该属性控制kafka监听容器是否自动启动,默认值为true,即默认开启监听;

所以我们可以在这个属性上做文章;

从属性文件中获取值,然后传给setAutoStartup(Boolean),例如 factory.setAutoStartup(autoStartup);这样是否启用监听就由属性文件的配置决定;

一行代码,一下午...............

好用的blog:https://blog.csdn.net/lp284558195/article/details/80297208


kafka设置监听器的启动和禁用 https://www.cppentry.com/bencandy.php?fid=120&id=215036

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇kafka-topics.sh --describe &nbs.. 下一篇0.10版本后的kafka配置producer和..