Spring Boot: configuring Kafka for consumer-group message subscription

This article uses kafka_2.11-2.0.1 with JDK 1.8 and zookeeper-3.3.6. Kafka runs on the JVM and relies on ZooKeeper as its registry, so all three pieces of software need to be installed. For convenience, the Windows builds of Kafka and ZooKeeper are used for testing here.

Environment setup

1. Install the JDK and configure the environment variables (omitted here; any standard JDK installation guide covers this).

2. Configure ZooKeeper by editing its configuration file.

Rename zoo_sample.cfg to zoo.cfg and set the ZooKeeper client port (a minimal example follows below).

Start ZooKeeper by double-clicking zkServer.cmd in the bin directory.
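A minimal zoo.cfg for a standalone test instance might look like this (the dataDir path is an assumption; point it at any writable directory):

tickTime=2000
dataDir=E:/worksoft/zookeeper-3.3.6/data
clientPort=2181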

3. Install and start Kafka:
3.1 Download the release archive: http://kafka.apache.org/downloads.html
3.2 Unpack it (here to E:\worksoft\kafka_2.11-2.0.1)
3.3 Open E:\worksoft\kafka_2.11-2.0.1\config
3.4 Open server.properties in a text editor
3.5 Change log.dirs to "E:\worksoft\kafka_2.11-2.0.1\kafka-logs" (a sketch of the resulting entries follows this list)
3.6 Open a cmd window
3.7 Change into the Kafka directory: cd /d E:\worksoft\kafka_2.11-2.0.1\
3.8 Run the following to start Kafka:
.\bin\windows\kafka-server-start.bat .\config\server.properties
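For reference, after step 3.5 the relevant server.properties entries would look roughly like the sketch below (broker.id and the listener are the defaults and shown as assumptions; note that backslashes must be escaped in a .properties file, so forward slashes are the safer choice on Windows):

broker.id=0
listeners=PLAINTEXT://localhost:9092
log.dirs=E:/worksoft/kafka_2.11-2.0.1/kafka-logs
zookeeper.connect=localhost:2181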

4. Create a topic
4.1 Open cmd and change into E:\worksoft\kafka_2.11-2.0.1\bin\windows
4.2 Create a topic:
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
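To confirm the topic exists, list all topics registered in ZooKeeper:

kafka-topics.bat --list --zookeeper localhost:2181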

5. Start a producer:
cd /d E:\worksoft\kafka_2.11-2.0.1\bin\windows
kafka-console-producer.bat --broker-list localhost:9092 --topic test

6. Start a consumer (in Kafka 2.0 the console consumer no longer accepts --zookeeper; connect to the broker instead):
cd /d E:\worksoft\kafka_2.11-2.0.1\bin\windows
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test
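With both windows open, each line typed into the producer window should show up in the consumer window. To also replay messages sent before the consumer started, add the --from-beginning flag:

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning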

List all consumer groups in Kafka:

kafka-consumer-groups.bat --bootstrap-server 127.0.0.1:9092 --list

Check the consumption lag of a specific consumer group:

kafka-consumer-groups.bat --bootstrap-server 127.0.0.1:9092 --group lt_datacenter_processor_kafka_to_len_consumer --describe
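In the --describe output, the LAG column (LOG-END-OFFSET minus CURRENT-OFFSET) is the per-partition backlog the group has not yet consumed.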

Show partition information for a topic:

kafka-topics.bat --zookeeper 127.0.0.1:2181 --topic lenovo --describe

Spring Boot integration with Kafka, enabled through an annotation on the startup class.

Add the Kafka dependencies to the pom file:

<!-- Core Spring for Apache Kafka support -->
<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
   <version>1.3.5.RELEASE</version>
</dependency>

<!-- Provides the AdminClient / AdminClientConfig classes -->
<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>1.0.1</version>
</dependency>
First, define an enabling annotation that imports the consumer configuration:

import org.springframework.context.annotation.Import;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Enables the Kafka consumer configuration when placed on the application class.
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(KafkaConsumerConfiguration.class)
@Documented
public @interface EnableKafkaConsumer {
}
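With the annotation in place, the consumer configuration is switched on from the startup class. A minimal sketch (the Application class name is illustrative):

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableKafkaConsumer
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}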
The configuration class imported by the annotation:

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;

@Configuration
@EnableKafka
public class KafkaConsumerConfiguration {
    public static final Logger log = LoggerFactory.getLogger(KafkaConsumerConfiguration.class);

    /**
     * Kafka properties bound from application.yml.
     */
    @Resource
    private KafkaProperties kafkaProperties;


    /**
     * Custom thread pool for the consumer's polling threads.
     *
     * @return the consumer task executor
     */
    @Bean(name = "consumerTaskExecutor")
    public ThreadPoolTaskExecutor consumerTaskExecutor() {


        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //number of core threads kept in the pool
        executor.setCorePoolSize(20);
        //executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
        //maximum number of threads in the pool
        executor.setMaxPoolSize(200);
        //thread name prefix
        executor.setThreadNamePrefix("kafkaThread-C-");
        //idle time (in seconds) a thread may stay alive
        executor.setKeepAliveSeconds(30 * 60);
        //allow core threads to time out as well
        executor.setAllowCoreThreadTimeOut(true);
        //rejection policy: when saturated, run the task on the caller's thread
//        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //executor.setWaitForTasksToCompleteOnShutdown(true);
        //task queue capacity
        executor.setQueueCapacity(10000);
        executor.initialize();

        return executor;
    }


    /**
     * Factory subclass that installs the custom thread pool on every listener container.
     *
     * @param <K> key type
     * @param <V> value type
     */
    public class CustomConcurrentKafkaListenerContainerFactory<K, V> extends ConcurrentKafkaListenerContainerFactory<K, V> {

        /**
         * The executor for threads that poll the consumer.
         */
        private AsyncListenableTaskExecutor consumerTaskExecutor;


        public CustomConcurrentKafkaListenerContainerFactory(AsyncListenableTaskExecutor consumerTaskExecutor) {
            this.consumerTaskExecutor = consumerTaskExecutor;
        }

        @Override
        protected void initializeContainer(ConcurrentMessageListenerContainer<K, V> instance) {
            super.initializeContainer(instance);
            instance.getContainerProperties().setConsumerTaskExecutor(consumerTaskExecutor);
        }
    }

    /**
     * Listener container factory used by @KafkaListener methods.
     *
     * @return the container factory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new CustomConcurrentKafkaListenerContainerFactory<Object, Object>(consumerTaskExecutor());

        if (null != kafkaProperties.getListener().getConcurrency()) {
            factory.setConcurrency(kafkaProperties.getListener().getConcurrency());
        }
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true); // deliver records to listeners in batches
        return factory;
    }
 
    /**
     * Consumer factory built from the Spring Boot Kafka properties.
     *
     * @return the consumer factory
     */
    @Bean
    public ConsumerFactory<Object, Object> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
    }

    /**
     * Raw KafkaConsumer bean, used by the manual consumer further below.
     *
     * @return the Kafka consumer
     */
    @Bean
    public KafkaConsumer<Object, Object> consumer() {

        log.info("<<<<<<<<<<<<<<< Loading the KafkaConsumer service >>>>>>>>>>>>>>>>>>");

        return new KafkaConsumer<>(kafkaProperties.buildConsumerProperties());
    }


    /**
     * KafkaAdmin configuration.
     *
     * @return the Kafka admin client
     */
    @Bean
    public KafkaAdmin kafkaAdmin() {

        log.info("<<<<<<<<<<<<<<< Loading the KafkaAdmin service >>>>>>>>>>>>>>>>>>");

        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        // to point at an embedded Kafka broker instead:
        //configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, StringUtils.arrayToCommaDelimitedString(kafkaEmbedded().getBrokerAddresses()));
        return new KafkaAdmin(configs);
    }

}
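Since the factory sets setBatchListener(true), @KafkaListener methods wired to it receive a whole batch per poll. A minimal listener sketch, assuming the test topic from earlier (the DemoBatchListener class name is illustrative):

import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class DemoBatchListener {

    // Each invocation delivers up to max-poll-records messages from one poll
    @KafkaListener(topics = "test", containerFactory = "kafkaListenerContainerFactory")
    public void onMessages(List<ConsumerRecord<Object, Object>> records) {
        records.forEach(r -> System.out.println("offset=" + r.offset() + " value=" + r.value()));
    }
}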
A small holder component that exposes the injected consumer bean through a static reference:

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumerUtils {

    private static KafkaConsumer consumer;

    public KafkaConsumerUtils() {
    }

    @Autowired
    public KafkaConsumerUtils(KafkaConsumer kafkaConsumer) {
        KafkaConsumerUtils.consumer = kafkaConsumer;
        System.out.println("Kafka consumer initialized...");
    }

    private static volatile KafkaConsumerUtils instance;

    /**
     * Thread-safe singleton accessor using double-checked locking.
     *
     * @return the singleton instance
     */
    public static KafkaConsumerUtils getInstance() {
        // Fast path: once the instance exists, return it without synchronizing
        if (instance == null) {
            // Synchronized block: ensures the instance is created only once under concurrent access
            synchronized (KafkaConsumerUtils.class) {
                // Re-check inside the lock before initializing
                if (instance == null) {
                    instance = new KafkaConsumerUtils();
                    //instance.afterPropertySet(); // initialize configuration and the pool
                }
            }
        }
        return instance;
    }

}

The application.yml used for these examples:

spring:
  kafka:
      bootstrap-servers: 127.0.0.1:9092 # Kafka broker address (not the ZooKeeper address)
#      client-id: wi_datacenter_processor # custom client id; combined with concurrency this triggers "Error registering AppInfo mbean"
      listener:
        #ack-mode: batch # commit offsets in batches
        #ack-time:
#        poll-timeout: 5000 # poll timeout
      consumer:
        group-id: lt_datacenter_processor_lenovo
        auto-offset-reset: earliest
        auto-commit-interval: 1000
        enable-auto-commit: true # true = offsets committed automatically; use false for manual commits
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        max-poll-records: 5 # maximum number of records returned per poll
      producer:
        retries: 0
        batch-size: 16384
        buffer-memory: 33554432
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.apache.kafka.common.serialization.StringSerializer
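The manual consumer below is guarded by @ConditionalOnProperty, so it only runs when the following switch is added to the same configuration (the property name is taken from the annotation on KafaConsumerManual):

media:
  kafka:
    consumer:
      enabled: true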

import cn.litsoft.configuration.SpringContextUtil;
import cn.litsoft.configuration.kafka.KafkaConsumerConfiguration;
import cn.litsoft.iot.ailog.constant.ConstantGlobal;
import cn.litsoft.iot.common.utils.thread.ThreadUtil;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;

/**
 * Kafka consumer for TRS data, started manually on application startup.
 */
@Component
@ConditionalOnProperty(name = "media.kafka.consumer.enabled", havingValue = "true") // starts only when this property is enabled
public class KafaConsumerManual implements CommandLineRunner {
    public static final Logger log = LoggerFactory.getLogger(KafaConsumerManual.class);

    public KafaConsumerManual() {
        log.info("Initializing: LenovoKafKaContentConsumer");
    }


    @Autowired
    private KafkaProperties kafkaProperties;

    public KafkaConsumer<String, String> kafkaConsumer() {
        Map<String, Object> props = kafkaProperties.buildConsumerProperties();
        // override the group id configured in application.yml for this consumer
        props.put(ConsumerConfig.GROUP_ID_CONFIG, ConstantGlobal.KAFKA_TO_ES_CONSUMER_GROUP_ID);
        return new KafkaConsumer<>(props);
    }

    /**
     * Short-lived tasks are handled by the default thread pool.
     */
    @Resource(name = "manualTaskExecutor")
    private ThreadPoolTaskExecutor taskExecutor;

    @Override
    public void run(String... strings) {

        KafkaConsumer<String, String> consumer = kafkaConsumer();
        taskExecutor.execute(new ThreadRunnable(consumer)); // hand the polling loop off to the thread pool
    }


    public class ThreadRunnable implements Runnable {

        private final KafkaConsumer<String, String> consumer;

        public ThreadRunnable(KafkaConsumer<String, String> consumer) {
            this.consumer = consumer;
        }

        @Override
        public void run() {

            // subscribe to the topic
            consumer.subscribe(Arrays.asList("lenovo"));
            ThreadPoolTaskExecutor taskExecutor = SpringContextUtil.getBean("lkTaskExecutor"); // worker pool for record processing

            // poll the broker in an endless loop
            ConsumerRecords<String, String> list = null;
            while (true) {
                list = consumer.poll(100);

                if (null == list || list.count() == 0) {
                    log.info("[KafaConsumerManual] fetched no records; sleeping before the next poll");
                    ThreadUtil.sleep(60 * 1000);
                    continue;
                }

                log.info("[KafaConsumerManual] fetched [{}] records", list.count());

                final CountDownLatch countDownLatch = new CountDownLatch(list.count());

                list.forEach(record -> taskExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        Optional<String> kafkaMessage = Optional.ofNullable(record.value());
                        if (kafkaMessage.isPresent()) {
                            try {
                                System.out.println("Received Kafka message: " + kafkaMessage.get());
//                                HashMap hashMap = (HashMap) JsonHelper.jsonToMap(kafkaMessage.get().toString());

                            } catch (Exception ex) {
                                ThreadUtil.sleep(1000); // pause briefly so exceptions don't flood the log
                                log.error("[KafaConsumerManual] exception occurred", ex);
                            } finally {
                                countDownLatch.countDown();
                            }

                        } else {
                            countDownLatch.countDown();
                        }
                    }
                }));

                try {
                    countDownLatch.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    consumer.commitAsync();
                }
            }

        }
    }
}
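One caveat: this consumer calls commitAsync() itself, which is redundant while enable-auto-commit is true in the YAML above. Assuming the group is meant to control offsets manually, the consumer section would be adjusted as follows:

spring:
  kafka:
    consumer:
      enable-auto-commit: false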
