首先下载kafka:http://kafka.apache.org/downloads 注意下载的是Binary文件
进入config目录,打开server.properties文件,取消advertised.listeners一行的注解,配上自己的host;再确认一下zookeeper.connect一行的配置,根据实际情况调整。例如:
advertised.listeners=PLAINTEXT://localhost:9092
zookeeper.connect=localhost:2181
(同目录下的consumer.properties文件,可以设置一下group-id的值)
进入bin/windows目录,在该目录下打开命令提示符,使用命令启动自带的zookeeper:
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
接下来启动kafka:
.\bin\windows\kafka-server-start.bat .\config\server.properties
【注】可以不用启动kafka自带的zookeeper,自己另外下载一个zookeeper也没问题
创建一个springboot工程,引入相关maven依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.2.RELEASE</version>
</dependency>
【注】请确认这两个依赖的版本对应关系,参考springboot关于kafka的官方文档
配置application.yml:
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: test-consumer-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
定义springboot启动类:
@SpringBootApplication
public class Application implements CommandLineRunner {
public static void main(String[] agrs){
SpringApplication.run(Application.class, agrs);
}
@Autowired
private KafkaTemplate kafkaTemplate;
@Override
public void run(String... args) throws Exception {
kafkaTemplate.send("ooxx","this is a message");
}
@KafkaListener(topics = "ooxx", groupId = "test-consumer-group")
public void listener(String message) {
System.out.println(">>>>>" + message);
}
}
OK,执行后,控制台输出
>>>>>this is a message