设为首页 加入收藏

TOP

关于 springboot 集成 kafka 单机部署的尝试
2019-05-02 02:31:42 】 浏览:49
Tags:关于 springboot 集成 kafka 单机 部署 尝试

关于 springboot + kafka 单机部署的尝试

本文主要内容来自以下参考链接,表示感谢,内容仅供个人学习参考,留作笔记,方便日后学习:

一、安装环境:
1.安装 jdk
2.安装 Zookeeper
使用安装包安装,下载地址:http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.10/
进入目录下 /usr/local/wanger/,解压安装包
进入 conf 目录,将zoo_sample.cfg 重命名为 zoo.cfg
zoo.cfg 文件中这两个配置可以根据自己的需要修改:clientPort=2181
dataDir=/tmp/zookeeper
启动 zookeeper:进入 bin 目录:执行命令 ./zkServer.sh start
停止:./zkServer.sh stop
重启:./zkServer.sh restart
查看状态:./zkServer.sh status
3.安装 kafka
使用安装包安装,下载地址:http://kafka.apache.org/downloads
进入目录下 /usr/local/wanger/,解压安装包
重命名: mv kafka_2.10-0.10.2.1 kafka
在 kafka 目录下创建: mkdir logs
进入 kafka 的 config 目录下:
vi server.properties
修改:
broker.id=0
#端口号 默认 9092
# zookeeper 地址和端口,单机配置部署,例如 192.168.168.128:2181(注意:properties 文件不支持行尾注释,注释必须单独成行)
zookeeper.connect=localhost:2181
log.dirs=/usr/local/wanger/kafka/logs
host.name=192.168.168.128
4.启动
(1)启动 zookeeper
(2)启动 kafka:
进入 bin 目录:./kafka-server-start.sh -daemon /usr/local/wanger/kafka/config/server.properties
【-daemon 后台启动(&)】
停止 kafka:./kafka-server-stop.sh
创建 topic:
./kafka-topics.sh --create --zookeeper 192.168.168.128:2181 --replication-factor 1 --partitions 2 --topic testMessage
topic 名称:testMessage 一个副本,2个分区
查看所有的 topic:
./kafka-topics.sh --list --zookeeper 192.168.168.128:2181
5.测试是否成功:
启动 producer 并发送消息:
./kafka-console-producer.sh --broker-list 192.168.168.128:9092 --topic testMessage
(新开一个终端,执行命令后,输入消息,按回车发送,换行即发送消息)
启动 consumer并接收消息:
./kafka-console-consumer.sh --zookeeper 192.168.168.128:2181 --topic testMessage --from-beginning

(新开一个终端,执行命令后,会看到相应的消息)

二、springboot + kafka:

1. 添加依赖文件

<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
			<version>1.1.1.RELEASE</version>
		</dependency>

2. 增加配置文件(.yml)

kafka:
  consumer:
    zookeeper:
      # 指定 zookeeper 的连接的字符串, 可以多个 hostname1:port1,hostname2:port2
      connect: 192.168.168.128:2181
    servers: 192.168.168.128:9092
    #  自动提交位移
    enable:
      auto:
        commit: true
    session:
      timeout: 6000
    auto:
      commit:
        interval: 100
      offset:
        reset: latest
    topic: testMessage
    # 用来唯一标识 consumer 进程所在组的字符串,如果设置同样的 group id,表示这些 processes 都是属于同一个 consumer group
    # 同一个 group 内的消费者数量(concurrency)不能多于 partitions 的数量,不然会有一些消费者一直收不到消息。group id 名称自定义,同一条消息只会被组内的一个消费者收到
    group:
     id: test
    concurrency: 10

  producer:
    servers: 192.168.168.128:9092
    # 如果请求失败,生产者会自动重试,设置为 0,防止消息重复
    retries: 0
    # 缓存每个分区未发送消息。缓存的大小是通过 batch.size 配置指定的
    batch:
      size: 4096
    # 生产者发送请求之前等待的毫秒数(linger.ms),希望在一个请求中加入更多的消息;0 表示不等待,立即发送
    linger: 0
    # 控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间
    buffer:
      memory: 40960
server:
  port: 8013
3.producer 配置类
package com.lover.wanger.provider.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

/**
 * Kafka producer configuration.
 *
 * <p>Binds the {@code kafka.producer.*} properties and exposes a
 * {@link KafkaTemplate} bean for sending messages with String keys and values.
 *
 * @author wanger
 * @since 2018/5/28 15:11
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    /** Value of {@code kafka.producer.servers}; used as the bootstrap server list. */
    @Value("${kafka.producer.servers}")
    private String servers;

    /** Value of {@code kafka.producer.retries}. */
    @Value("${kafka.producer.retries}")
    private int retries;

    /** Value of {@code kafka.producer.batch.size}. */
    @Value("${kafka.producer.batch.size}")
    private int batchSize;

    /** Value of {@code kafka.producer.linger}; passed on as {@code linger.ms}. */
    @Value("${kafka.producer.linger}")
    private int linger;

    /** Value of {@code kafka.producer.buffer.memory}. */
    @Value("${kafka.producer.buffer.memory}")
    private int bufferMemory;

    /**
     * Assembles the raw producer settings from the injected properties.
     *
     * @return a new mutable map keyed by {@link ProducerConfig} constants
     */
    public Map<String, Object> producerConfigs() {
        final Map<String, Object> settings = new HashMap<>();

        // Both key and value are serialized as plain strings.
        settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // Connection and retry settings.
        settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        settings.put(ProducerConfig.RETRIES_CONFIG, retries);

        // Batching and buffering settings.
        settings.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        settings.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        settings.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);

        return settings;
    }

    /**
     * Creates a producer factory backed by {@link #producerConfigs()}.
     *
     * @return a new {@link DefaultKafkaProducerFactory}
     */
    public ProducerFactory<String, String> producerFactory() {
        final Map<String, Object> settings = producerConfigs();
        return new DefaultKafkaProducerFactory<>(settings);
    }

    /**
     * Exposes the {@link KafkaTemplate} bean used to publish messages.
     *
     * @return a template built on {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        final ProducerFactory<String, String> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }
}
4. consumer 配置类
package com.lover.wanger.provider.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

/**
 * Kafka consumer configuration.
 *
 * <p>Binds the {@code kafka.consumer.*} properties and exposes the listener
 * container factory bean for consuming messages with String keys and values.
 *
 * @author wanger
 * @since 2018/5/28 15:33
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    /** Value handed to {@code ContainerProperties#setPollTimeout}. */
    private static final long POLL_TIMEOUT = 1500;

    /** Value of {@code kafka.consumer.servers}; used as the bootstrap server list. */
    @Value("${kafka.consumer.servers}")
    private String servers;

    /** Value of {@code kafka.consumer.enable.auto.commit}. */
    @Value("${kafka.consumer.enable.auto.commit}")
    private boolean enableAutoCommit;

    /** Value of {@code kafka.consumer.session.timeout}, kept as the raw string. */
    @Value("${kafka.consumer.session.timeout}")
    private String sessionTimeout;

    /** Value of {@code kafka.consumer.auto.commit.interval}, kept as the raw string. */
    @Value("${kafka.consumer.auto.commit.interval}")
    private String autoCommitInterval;

    /** Value of {@code kafka.consumer.group.id}. */
    @Value("${kafka.consumer.group.id}")
    private String groupId;

    /** Value of {@code kafka.consumer.auto.offset.reset}. */
    @Value("${kafka.consumer.auto.offset.reset}")
    private String autoOffsetReset;

    /** Value of {@code kafka.consumer.concurrency}; applied to the container factory. */
    @Value("${kafka.consumer.concurrency}")
    private int concurrency;

    /**
     * Exposes the listener container factory bean.
     *
     * <p>The factory is given the consumer factory, the configured concurrency
     * and a fixed poll timeout.
     *
     * @return the configured {@link ConcurrentKafkaListenerContainerFactory}
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        final ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory());
        containerFactory.setConcurrency(concurrency);
        containerFactory.getContainerProperties().setPollTimeout(POLL_TIMEOUT);
        return containerFactory;
    }

    /**
     * Creates a consumer factory backed by {@link #consumerConfigs()}.
     *
     * @return a new {@link DefaultKafkaConsumerFactory}
     */
    public ConsumerFactory<String, String> consumerFactory() {
        final Map<String, Object> settings = consumerConfigs();
        return new DefaultKafkaConsumerFactory<>(settings);
    }

    /**
     * Assembles the raw consumer settings from the injected properties.
     *
     * @return a new mutable map keyed by {@link ConsumerConfig} constants
     */
    public Map<String, Object> consumerConfigs() {
        final Map<String, Object> settings = new HashMap<>();

        // Both key and value are deserialized as plain strings.
        settings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        settings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // Connection and group membership.
        settings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        settings.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        settings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);

        // Offset handling.
        settings.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        settings.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);

        return settings;
    }
}

5. 测试发送消息
package com.lover.wanger.provider.controller;

import com.lover.wanger.provider.constants.BaseConstants;
import com.lover.wanger.provider.entity.Result;
import com.lover.wanger.provider.utils.ResultUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

/**
 * REST controller used to exercise Kafka message production.
 *
 * @author wanger
 * @since 2018/5/28 15:17
 */
@RestController
@RequestMapping("/kafka")
public class KafkaController {

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    // Parameterized (was a raw type) so that send(...) is type-checked against
    // the String key/value template declared in the producer configuration.
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Sends 200 test messages to the {@code testMessage} topic, all with the key
     * {@code "message"}.
     *
     * <p>NOTE(review): {@code KafkaTemplate.send} hands back a future, so the
     * catch block below presumably only sees failures raised synchronously by
     * the call itself; delivery errors reported through the future are not
     * inspected here. Confirm whether delivery confirmation is required.
     *
     * @param request  the current HTTP request (unused)
     * @param response the current HTTP response (unused)
     * @return a success result when every send call returned normally, otherwise
     *         an error result
     */
    @RequestMapping(value = "/send", method = RequestMethod.GET)
    public Result sendKafka(HttpServletRequest request, HttpServletResponse response) {
        try {
            for (int i = 0; i < 200; i++) {
                kafkaTemplate.send("testMessage", "message", "message ==================" + i);
            }
            logger.info("发送 kafka 成功.");
            return ResultUtil.success(BaseConstants.SUCCESS_CODE);
        } catch (Exception e) {
            logger.error("发送 kafka 失败", e);
            return ResultUtil.error(BaseConstants.ERROR_CODE, "发送 kafka 失败");
        }
    }

}

6.接收消息
package com.lover.wanger.provider.service;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

/**
 * Test service that receives Kafka messages.
 *
 * @author wanger
 * @since 2018/5/29 14:11
 */
@Service
public class TestService {

    /**
     * Listener for the {@code testMessage} topic; prints each received payload
     * to standard output.
     *
     * <p>The former blanket {@code catch (Exception)} with
     * {@code printStackTrace()} was removed: the only statement is a
     * {@code println} of a string concatenation, so the handler was dead code.
     *
     * @param msg the received message payload
     */
    @KafkaListener(topics = {"testMessage"})
    public void getMessage(String msg) {
        System.out.println("service 层的 ------------------------ msg: " + msg);
    }
}





】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇kafka 配置文件 server.propetie.. 下一篇kafka java 生产消费程序示例

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目