How to Use Kafka (v1.1.0) with Spring Boot 2.x
2019-01-20 14:30:59
Copyright notice: this is an original post by the author and may not be reposted without permission. https://blog.csdn.net/bigtree_3721/article/details/81045854

Overview

This article follows the integration approach described in the official Spring Boot documentation; see the Apache Kafka Support section there for details.

The idea: configure the basic producer and consumer settings in Spring Boot's application.properties. On startup, Spring Boot creates a KafkaTemplate bean, which is used to send messages to Kafka; messages are then consumed with the @KafkaListener annotation.

Versions:

spring boot: 2.0.3.RELEASE
spring-kafka: 2.1.7.RELEASE
kafka: 1.1.0

Note: ZooKeeper and Kafka must be installed, configured, and running correctly before you start.
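Before starting the application, it can help to confirm that the broker port is actually reachable. A minimal JDK-only sketch (the BrokerCheck class and its isReachable helper are illustrative additions, not code from the original post; the host and port come from this article's configuration):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerCheck {
    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    public static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Broker address used throughout this article
        System.out.println(isReachable("192.168.10.48", 9092, 2000));
    }
}
```

This only checks TCP connectivity, not that the broker is healthy, but it catches the most common setup mistakes (wrong address, firewall, Kafka not started).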

Integration Steps for Spring Boot and Spring for Apache Kafka

  1. Add the Spring Kafka dependency to pom.xml:

<!-- kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.1.7.RELEASE</version>
</dependency>

Then add the following to the application.properties configuration file:

For an explanation of each property, see the Kafka entries in the Spring Boot common application properties appendix (search for the keyword "kafka").

server.port=8090

####### kafka

### producer config
spring.kafka.producer.bootstrap-servers=192.168.10.48:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

### consumer config
spring.kafka.consumer.bootstrap-servers=192.168.10.48:9092
spring.kafka.consumer.group-id=anuoapp
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.max-poll-records=1
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.listener.concurrency=5
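For reference, the Spring properties above map directly onto the native Kafka client configuration keys. A sketch using plain java.util.Properties (the NativeKafkaConfig class is an illustrative addition, not code from the original post; the key names are the standard Kafka client configuration names):

```java
import java.util.Properties;

public class NativeKafkaConfig {
    // Producer config equivalent to the spring.kafka.producer.* properties above.
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.10.48:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    // Consumer config equivalent to the spring.kafka.consumer.* properties above.
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.10.48:9092");
        props.put("group.id", "anuoapp");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "100");
        props.put("max.poll.records", "1");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```

Spring Boot builds essentially these maps for you; knowing the native key names helps when you need a property that has no dedicated spring.kafka.* shortcut.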

Create the Kafka producer

package com.example.anuoapp.kafka;

import com.alibaba.fastjson.JSON;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

@Component
public class KafkaProducer {
    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;

    public void kafkaSend() throws Exception {
        UserAccount userAccount = new UserAccount();
        userAccount.setCard_name("jk");
        userAccount.setAddress("cd");
        // Send the account as a JSON string; the returned future can be used
        // to check (or block on) the send result.
        ListenableFuture<SendResult<String, String>> future =
                kafkaTemplate.send("mytopic", "key", JSON.toJSONString(userAccount));
    }
}
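The UserAccount class is not shown in the original post. A minimal sketch consistent with the JSON output at the end of the article (the field and accessor names are inferred from that output, so treat them as assumptions):

```java
// Hypothetical reconstruction of UserAccount; fields inferred from the
// article's JSON output: address, bind_qq, bind_weixin, card_name,
// passwordDirectCompare.
public class UserAccount {
    private String card_name;
    private String address;
    private boolean bind_qq;
    private boolean bind_weixin;
    private boolean passwordDirectCompare;

    public String getCard_name() { return card_name; }
    public void setCard_name(String card_name) { this.card_name = card_name; }

    public String getAddress() { return address; }
    public void setAddress(String address) { this.address = address; }

    public boolean isBind_qq() { return bind_qq; }
    public void setBind_qq(boolean bind_qq) { this.bind_qq = bind_qq; }

    public boolean isBind_weixin() { return bind_weixin; }
    public void setBind_weixin(boolean bind_weixin) { this.bind_weixin = bind_weixin; }

    public boolean isPasswordDirectCompare() { return passwordDirectCompare; }
    public void setPasswordDirectCompare(boolean v) { this.passwordDirectCompare = v; }
}
```

fastjson serializes a bean through its getters, so these accessor names would produce exactly the JSON keys shown in the sample output below.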

Create the Kafka consumer

package com.example.anuoapp.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {
    public static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    @KafkaListener(topics = {"mytopic"})
    public void jktopic(ConsumerRecord<String, String> consumerRecord) throws InterruptedException {
        System.out.println(consumerRecord.offset());
        System.out.println(consumerRecord.value());
        Thread.sleep(3000); // simulate slow message processing
    }
}

Create a REST API to invoke the Kafka message producer

package com.example.anuoapp.controller;

import com.example.anuoapp.kafka.KafkaProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/kafkaAPI")
public class SystemController {
    private Logger logger = LoggerFactory.getLogger(SystemController.class);

    @Autowired
    KafkaProducer kafkaProducer;

    @RequestMapping(value = "/produce", method = RequestMethod.GET)
    public void warnInfo() throws Exception {
        // Send a batch of 10 messages
        int count = 10;
        for (int i = 0; i < count; i++) {
            kafkaProducer.kafkaSend();
        }
    }
}

Calling GET http://localhost:8090/kafkaAPI/produce yields consumer output like:

{"address":"cd","bind_qq":false,"bind_weixin":false,"card_name":"jk","passwordDirectCompare":false}  
{"address":"cd","bind_qq":false,"bind_weixin":false,"card_name":"jk","passwordDirectCompare":false}  
{"address":"cd","bind_qq":false,"bind_weixin":false,"card_name":"jk","passwordDirectCompare":false}
