本人所使用的kafka版本是kafka_2.11-2.0.1版本,jdk1.8、zookeeper-3.3.6,kafka运行于JVM环境,依赖zookeeper注册中心,所以需要准备这三个软件,本文测试使用的是windows版本的kafka和zookeeper便于测试
环境搭建 步骤
1、搭建jdk环境,配置环境变量(省略,请自行百度搜索)
2、zookeeper环境配置,修改zookeeper配置文件
将zoo_sample.cfg 修改为zoo.cfg文件,修改zookeeper的访问端口
启动zookeeper,在bin目录下zkServer.cmd双击即可
3.1 下载安装文件: http://kafka.apache.org/downloads.html
3.2 解压文件(本文解压到 E:\worksoft\kafka_2.11-2.0.1)
3.3 打开E:\worksoft\kafka_2.11-2.0.1\config
3.4 从文本编辑器里打开 server.properties
3.5 把 log.dirs的值改成 E:/worksoft/kafka_2.11-2.0.1/kafka-logs (注意:properties文件中反斜杠“\”是转义字符,Windows路径请使用正斜杠“/”或双反斜杠“\\”,并且不要加引号)
3.6 打开cmd
3.7 进入kafka文件目录: cd /d E:\worksoft\kafka_2.11-2.0.1\
3.8 输入并执行以打开kafka:
.\bin\windows\kafka-server-start.bat .\config\server.properties
4. 创建topics
4.1 打开cmd 并进入E:\worksoft\kafka_2.11-2.0.1\bin\windows
4.2 创建一个topic:
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
5. 打开一个Producer:
cd /d E:\worksoft\kafka_2.11-2.0.1\bin\windows
kafka-console-producer.bat --broker-list localhost:9092 --topic test
6. 打开一个Consumer:
cd /d E:\worksoft\kafka_2.11-2.0.1\bin\windows
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test
查看kafka中的所有组
kafka-consumer-groups.bat --bootstrap-server 127.0.0.1:9092 --list
查看某个consumer组的消费堆积量
kafka-consumer-groups.bat --bootstrap-server 127.0.0.1:9092 --group lt_datacenter_processor_kafka_to_len_consumer --describe
查看kafka某个topic下partition信息
kafka-topics.bat --zookeeper 127.0.0.1:2181 --topic lenovo --describe
springboot集成kafka。通过在启动类上开启注解方式启动
pom文件里面添加kafka依赖jar包
<!-- Spring for Apache Kafka: core Kafka integration for Spring applications -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.3.5.RELEASE</version>
</dependency>
<!-- Kafka client library: provides AdminClient / AdminClientConfig used for admin configuration -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.1</version>
</dependency>
/**
 * Opt-in switch for the Kafka consumer setup.
 *
 * <p>Annotating a type with {@code @EnableKafkaConsumer} imports
 * {@link KafkaConsumerConfiguration} into the application context.
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Import(KafkaConsumerConfiguration.class)
public @interface EnableKafkaConsumer {
}
@EnableKafka
public class KafkaConsumerConfiguration {
public static final Logger log = LoggerFactory.getLogger(KafkaConsumerConfiguration.class);
/**
* 引用kafkaProperties属性类
*/
@Resource
private KafkaProperties kafkaProperties;
/**
* 消费者线程使用自定义线程池
*
* @return
*/
@Bean(name = "consumerTaskExecutor")
public ThreadPoolTaskExecutor consumerTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//线程池活跃的线程数
executor.setCorePoolSize(20);
//executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
//线程池最大活跃的线程数
executor.setMaxPoolSize(200);
//线程名称前缀
executor.setThreadNamePrefix("kafkaThread-C-");
//线程池维护线程所允许的空闲时间
executor.setKeepAliveSeconds(30 * 60);
//核心线程池也会请0
executor.setAllowCoreThreadTimeOut(true);
//线程池拒绝机制,抛弃最久的线程
// executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//executor.setWaitForTasksToCompleteOnShutdown(true);
//线程池队列
executor.setQueueCapacity(10000);
executor.initialize();
return executor;
}
/**
* 重写设置自定义线程池
*
* @param <K>
* @param <V>
*/
public class CustomConcurrentKafkaListenerContainerFactory<K, V> extends ConcurrentKafkaListenerContainerFactory<K, V> {
/**
* The executor for threads that poll the consumer.
*/
private AsyncListenableTaskExecutor consumerTaskExecutor;
public CustomConcurrentKafkaListenerContainerFactory(AsyncListenableTaskExecutor consumerTaskExecutor) {
this.consumerTaskExecutor = consumerTaskExecutor;
}
@Override
protected void initializeContainer(ConcurrentMessageListenerContainer<K, V> instance) {
super.initializeContainer(instance);
instance.getContainerProperties().setConsumerTaskExecutor(consumerTaskExecutor);
}
}
/**
* 注入消费者监听器
*
* @return
*/
@Bean
public ConcurrentKafkaListenerContainerFactory<, > kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new CustomConcurrentKafkaListenerContainerFactory<Object, Object>(consumerTaskExecutor());
if (null != kafkaProperties.getListener().getConcurrency()) {
factory.setConcurrency(kafkaProperties.getListener().getConcurrency());
}
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
return factory;
}
/**
* 注入消费者工厂
*
* @return
*/
@Bean
public ConsumerFactory<Object, Object> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
}
/**
* 注入消费者
*
* @return
*/
@Bean
public KafkaConsumer<Object, Object> consumer() {
log.info("<<<<<<<<<<<<<<< 加载 KafkaConsumer 服务 >>>>>>>>>>>>>>>>>>");
return new KafkaConsumer<>(kafkaProperties.buildConsumerProperties());
}
/**
* Admin管理配置
*
* @return
*/
@Bean
public KafkaAdmin kafkaAdmin() {
log.info("<<<<<<<<<<<<<<< 加载 KafkaAdmin 服务 >>>>>>>>>>>>>>>>>>");
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
// 使用内置的Kafka
//configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, StringUtils.arrayToCommaDelimitedString(kafkaEmbedded().getBrokerAddresses()));
return new KafkaAdmin(configs);
}
}
/**
 * Holder for the shared {@link KafkaConsumer}, plus a lazily created static instance.
 *
 * <p>The consumer is stored in a static field by the {@code @Autowired} constructor. The object
 * returned by {@link #getIstance()} is created through the no-arg constructor and is therefore
 * a different object from the one built via the {@code @Autowired} constructor; both see the
 * same static consumer.
 */
@Component
public class KafkaConsumerUtils {

    /** Consumer shared by all instances; assigned by the {@code @Autowired} constructor. */
    private static KafkaConsumer<?, ?> consumer;

    /** Lazily created instance; volatile so a fully constructed object is published safely. */
    private static volatile KafkaConsumerUtils instance;

    public KafkaConsumerUtils() {
    }

    /**
     * Stores the injected consumer in the static field.
     *
     * @param kafkaConsumer consumer to share
     */
    @Autowired
    public KafkaConsumerUtils(KafkaConsumer<?, ?> kafkaConsumer) {
        KafkaConsumerUtils.consumer = kafkaConsumer;
        System.out.println("kafka消费者初始化完成...");
    }

    /**
     * Returns the lazily created instance, creating it at most once across threads.
     *
     * @return the shared instance, never {@code null}
     */
    public static KafkaConsumerUtils getIstance() {
        // Fast path without locking once the instance exists.
        KafkaConsumerUtils current = instance;
        if (current == null) {
            synchronized (KafkaConsumerUtils.class) {
                // Re-check under the lock: another thread may have created it meanwhile.
                current = instance;
                if (current == null) {
                    current = new KafkaConsumerUtils();
                    instance = current;
                }
            }
        }
        return current;
    }
}
spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092 # Kafka broker address (not the ZooKeeper address)
    # client-id: wi_datacenter_processor # custom client id; with concurrent consumers this causes "Error registering AppInfo mbean"
    listener:
      # ack-mode: batch # commit offsets per batch
      # ack-time:
      # poll-timeout: 5000 # poll timeout
    consumer:
      group-id: lt_datacenter_processor_lenovo
      auto-offset-reset: earliest
      auto-commit-interval: 1000
      enable-auto-commit: true # true = offsets are committed automatically; set to false for manual commits
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 5 # at most 5 records are returned per poll
    producer:
      retries: 0
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
import cn.litsoft.configuration.SpringContextUtil;
import cn.litsoft.configuration.kafka.KafkaConsumerConfiguration;
import cn.litsoft.iot.ailog.constant.ConstantGlobal;
import cn.litsoft.iot.common.utils.thread.ThreadUtil;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
/**
* trs数据消费kafka
*/
@Component
@ConditionalOnProperty(name = "media.kafka.consumer.enabled", havingValue = "true") // 开启注解才会启动
public class KafaConsumerManual implements CommandLineRunner {
public static final Logger log = LoggerFactory.getLogger(KafkaConsumerConfiguration.class);
public KafaConsumerManual() {
log.info("初始化:LenovoKafKaContentConsumer");
}
@Autowired
private KafkaProperties kafkaProperties;
public KafkaConsumer kafkaConsumer() {
Map<String, Object> props = kafkaProperties.buildConsumerProperties();
props.put(ConsumerConfig.GROUP_ID_CONFIG, ConstantGlobal.KAFKA_TO_ES_CONSUMER_GROUP_ID);
return new KafkaConsumer(props);
}
/**
* 简短任务使用默认线程池处理
*/
@Resource(name = "manualTaskExecutor")
private ThreadPoolTaskExecutor taskExecutor;
@Override
public void run(String... strings) {
KafkaConsumer consumer = kafkaConsumer();
taskExecutor.execute(new ThreadRunnable(consumer));// 丢 到线程池里面去运行
}
public class ThreadRunnable implements Runnable {
private KafkaConsumer consumer;
public ThreadRunnable(KafkaConsumer consumer) {
this.consumer = consumer;
}
@Override
public void run() {
// 订阅主题
consumer.subscribe(Arrays.asList("lenovo"));
ThreadPoolTaskExecutor taskExecutor = SpringContextUtil.getBean("lkTaskExecutor");//线程池子
//死循环不停的从broker中拿数据
ConsumerRecords<String, String> list = null;
while (true) {
list = consumer.poll(100);
if (null == list || list.count() == 0) {
log.info("业务【KafaConsumerManual】取出数据为空,休息一段时间进行等待");
ThreadUtil.sleep(60 * 1000);
continue;
}
log.info("业务【KafaConsumerManual】取出数据[{}]条", list.count());
final CountDownLatch countDownLatch = new CountDownLatch(list.count());
list.forEach(record -> taskExecutor.execute(new Runnable() {
@Override
public void run() {
Optional<> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
try {
System.out.println("获取到kafka消息 :"+kafkaMessage.get().toString());
// HashMap hashMap = (HashMap) JsonHelper.jsonToMap(kafkaMessage.get().toString());
} catch (Exception ex) {
ThreadUtil.sleep(1000);//暂停一会,防止异常信息刷屏
log.error("[KafaConsumerManual]发生异常:[{}]", ex);
} finally {
countDownLatch.countDown();
}
} else {
countDownLatch.countDown();
}
}
}));
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
consumer.commitAsync();
}
}
}
}
}