设为首页 加入收藏

TOP

Springboot + kafka 集群 消费者 指定服务器监听
2019-04-23 14:24:43 】 浏览:53
Tags:Springboot kafka 集群 消费者 指定 服务器 监听

1.kafkaConfig

@Configuration
@EnableKafka
public class KafkaConfig {
    
    
    @Bean
    @Primary//默认
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }


    /**
     * 第二个服务器
     */

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactoryTwoSchedule() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactoryTwoSchedule());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactoryTwoSchedule() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigsTwoSchedule());
    }


    /**
     *
     *第二个消费者配置
     */
    @Bean
    public Map<String, Object> consumerConfigsTwoSchedule() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapDriverServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
    
}

2、消费者

@Component
public class KafkaConsumer {
    //第一个
    @KafkaListener(topics = "${one-topic}")
    public void odConsumer(ConsumerRecord<Integer, String> msg) {
        //消费了
         String payload = (String) record.value();
         log.info("收到原始数据{}", payload);
        // 解析数据
         Book book = JsonUtils.parseJsonToObj(payload, Book.class);
    }
    

    //第二个服务器
    @KafkaListener(topics = "${twoTopic}", containerFactory = "kafkaListenerContainerFactoryTwoSchedule")
    public void driverSheduleConsumer(ConsumerRecord<Integer, String> msg) {
         //消费了
       String payload = (String) record.value();
         log.info("收到原始数据{}", payload);
        // 解析数据
         Book book = JsonUtils.parseJsonToObj(payload, Book.class);
    }
}

3、基本实现了。。。

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇kafka单机伪分布式搭建 下一篇kafka文档(3)----0.8.2-kaf..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目