设为首页 加入收藏

TOP

Spring Boot教程十二:集成Kafka
2019-02-18 02:26:52 】 浏览:98
Tags:Spring Boot 教程 十二 集成 Kafka
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/wang_shuyu/article/details/80971090
Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目。

Kafka中发布订阅的对象是topic。我们可以为每类数据创建一个topic,把向topic发布消息的客户端称作producer,从topic订阅消息的客户端称作consumer。Producers和consumers可以同时从多个topic读写数据。一个kafka集群由一个或多个broker服务器组成,它负责持久化和备份具体的kafka消息。
  • Broker:Kafka节点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群。
  • Topic:一类消息,消息存放的目录即主题,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群能够同时负责多个topic的分发。
  • Partition:topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列
  • Segment:partition物理上由多个segment组成,每个Segment存着message信息
  • Producer: 生产message发送到topic
  • Consumer: 订阅topic消费message, consumer作为一个线程来消费
  • Consumer Group:一个Consumer Group包含多个consumer, 这个是预先在配置文件中配置好的。各个consumer(consumer 线程)可以组成一个组(Consumer group ),partition中的每个message只能被组(Consumer group ) 中的一个consumer(consumer 线程 )消费,如果一个message可以被多个consumer(consumer 线程 ) 消费的话,那么这些consumer必须在不同的组。Kafka不支持一个partition中的message由两个或两个以上的consumer thread来处理,即便是来自不同的consumer group的也不行。它不能像AMQ那样可以多个BET作为consumer去处理message,这是因为多个BET去消费一个Queue中的数据的时候,由于要保证不能多个线程拿同一条message,所以就需要行级别悲观所(for update),这就导致了consume的性能下降,吞吐量不够。而kafka为了保证吞吐量,只允许一个consumer线程去访问一个partition。如果觉得效率不高的时候,可以加partition的数量来横向扩展,那么再加新的consumer thread去消费。这样没有锁竞争,充分发挥了横向的扩展性,吞吐量极高。这也就形成了分布式消费的概念。

springboot项目中使用kafka的配置如下:

首先引入必须的依赖jar:

 <!-- springboot整合kafka -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
            <version>4.3.6.RELEASE</version>
            <classifier>sources</classifier>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.1.7.RELEASE</version>
        </dependency>

配置文件如下:

#============== kafka ===================
#kafka相关配置
spring.kafka.bootstrap-servers=39.105.104.132:9092
#设置一个默认组
spring.kafka.consumer.group-id=alarmTopic
#key-value序列化反序列化
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#每次批量发送消息的数量
spring.kafka.producer.batch-size=65536
spring.kafka.producer.buffer-memory=524288


消息生产者:

/**
 * @author Shuyu.Wang
 * @package:com.ganinfo.kafka
 * @className:
 * @description:生产者
 * @date 2018-07-05 16:30
 **/
@Component
public class KafkaSender {
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;
    /** * 发送消息到kafka */
    public void sendChannelMess(String channel, String message) {
        kafkaTemplate.send(channel,message);
    }
消息消费者:
/**
 * @author Shuyu.Wang
 * @package:com.ganinfo.kafka
 * @className:
 * @description:消费者
 * @date 2018-07-05 16:31
 **/
@Component
public class KafkaConsumer {
    @Autowired
    private SendMessageUtil sendMessageUtil;
    /**
     * 监听alarmTopic主题,有消息就读取 *
     * @param message
     */
    @KafkaListener(topics = {"alarmTopic"})
    public void receiveMessage(String message) {
        //收到通道的消息之后执行秒杀操作
        System.out.println("KafkaConsumer的订阅消息:"+message);
        sendMessageUtil.send("h1","1",message);
    }

}

测试类:

    @Autowired
    private KafkaSender kafkaSender;
    private static final String ALRAM_TOPIC = "alarmTopic";

    @Autowired
    private AlarmService alarmService;

    @ResponseBody
    @RequestMapping(value = "/kafka", method = RequestMethod.GET)
    public ApiResult getCarInout(@RequestParam(value = "refId") String refId, @RequestParam(value = "passType") Integer passType, @RequestParam(value = "type") Integer type) {
        ApiResult apiResult = new ApiResult();
        try {
            Map<String, Object> map = new HashMap<>();
            map.put("refId", refId);
            map.put("platformCode", "650106");
            map.put("passType", passType);
            map.put("type", type);
            map.put("level", 2);
            map.put("Gettime", 1000);
            kafkaSender.sendChannelMess(ALRAM_TOPIC, GsonUtil.GsonString(map));
            System.out.println("发送数据:" + GsonUtil.GsonString(map));
//            kafkaTemplate.send(ALRAM_TOPIC, "alarm", GsonUtil.GsonString(map));
        } catch (Exception e) {
            e.printStackTrace();
        }
        return apiResult;
    }
kafka的相关代码就完成了,请求测试方法,就会在消费者的类中打印相关的参数了,另外可以通过插件查看kafka的相关主题和消费者情况。


】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Kafka安装之一 Zookeeper 下一篇Kafka针对大容量消息的优化

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目