关于 springboot + kafka 单机部署的尝试
本文主要内容来自以下参考链接,表示感谢,内容仅供个人学习参考,留作笔记,方便日后学习:
一、安装环境:
1.安装 jdk
2.安装 Zookeeper
使用安装包安装,下载地址:http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.10/
进入目录下 /usr/local/wanger/,解压安装包
进入 conf 目录,将zoo_sample.cfg 重命名为 zoo.cfg
zoo.cfg 文件中这两个配置可以根据自己修改 clientPort=2181
dataDir=/tmp/zookeeper
启动 zookeeper:进入 bin 目录:执行命令 ./zkServer.sh start
停止:./zkServer.sh stop
重启:./zkServer.sh restart
查看状态:./zkServer.sh status
3.安装 kafka
使用安装包安装,下载地址:http://kafka.apache.org/downloads
进入目录下 /usr/local/wanger/,解压安装包
重命名: mv kafka_2.10-0.10.2.1 kafka
在 kafka 目录下创建: mkdir logs
进入 kafka 的 config 目录下:
vi server.properties
修改:
broker.id=0
#端口号 默认 9092
zookeeper.connect=localhost:2181 # zookeeper 地址和端口,单机配置部署,192.168.168.128:2181
log.dirs=/usr/local/wanger/kafka/logs
host.name=192.168.168.128
4.启动
(1)启动 zookeeper
(2)启动 kafka:
进入 bin 目录:./kafka-server-start.sh -daemon /usr/local/wanger/kafka/config/server.properties
【-daemon 后台启动(&)】
停止 kafka:./kafka-server-stop.sh
创建 topic:
./kafka-topics.sh --create --zookeeper 192.168.168.128:2181 --replication-factor 1 --partitions 2 --topic testMessage
topic 名称:testMessage 一个副本,2个分区
查看所有的 topic:
./kafka-topics.sh --list --zookeeper 192.168.168.128:2181
5.测试是否成功:
启动 producer 并发送消息:
./kafka-console-producer.sh --broker-list 192.168.168.128:9092 --topic testMessage
(新开一个终端,执行命令后,输入消息,按回车发送,换行即发送消息)
启动 consumer并接收消息:
./kafka-console-consumer.sh --zookeeper 192.168.168.128:2181 --topic testMessage --from-beginning
(新开一个终端,执行命令后,会看到相应的消息)
二、springboot + kafka:
1. 添加依赖文件
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.1.1.RELEASE</version>
</dependency>
2. 增加配置文件(.yml)
kafka:
consumer:
zookeeper:
# 指定 zookeeper 的连接的字符串, 可以多个 hostname1:port1,hostname2:port2
connect: 192.168.168.128:2181
servers: 192.168.168.128:9092
# 自动提交位移
enable:
auto:
commit: true
session:
timeout: 6000
auto:
commit:
interval: 100
offset:
reset: latest
topic: testMessage
# 用来唯一标识 consumer 进程所在组的字符串,如果设置同样的 group id,表示这些 processes 都是属于同一个 consumer group
# group id 不能多于 partitions 的值,不然会有一些消费者一直收不到消息。group id 名称自定义,一个组内的消费者只有一个可以收到 topic 的消息
group:
id: test
concurrency: 10
producer:
servers: 192.168.168.128:9092
# 如果请求失败,生产者会自动重试,设置为 0,防止消息重复
retries: 0
# 缓存每个分区未发送消息。缓存的大小是通过 batch.size 配置指定的
batch:
size: 4096
# 生产者发送请求之前等待的毫秒数(linger.ms),设置为大于 0 可在一个请求中批量加入更多消息;此处为 0,即不等待
linger: 0
# 控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间
buffer:
memory: 40960
server:
port: 8013
3. producer 配置类
package com.lover.wanger.provider.config;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
/**
* @author: wanger
* @Date: Created in 2018/5/28 15:11
* @description: 生产者配置文件
*/
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    /** Kafka broker list, e.g. {@code host:9092}. */
    @Value("${kafka.producer.servers}")
    private String servers;

    /** Number of automatic retries on failed sends (0 avoids duplicate messages). */
    @Value("${kafka.producer.retries}")
    private int retries;

    /** Upper bound, in bytes, of a per-partition send batch. */
    @Value("${kafka.producer.batch.size}")
    private int batchSize;

    /** Milliseconds the producer waits before sending, to allow batching. */
    @Value("${kafka.producer.linger}")
    private int linger;

    /** Total memory, in bytes, available to buffer records awaiting transmission. */
    @Value("${kafka.producer.buffer.memory}")
    private int bufferMemory;

    /**
     * Builds the raw producer configuration map from the injected properties.
     * Both keys and values are serialized as plain strings.
     *
     * @return the producer settings keyed by {@link ProducerConfig} constants
     */
    public Map<String, Object> producerConfigs() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        config.put(ProducerConfig.RETRIES_CONFIG, retries);
        config.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        config.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        config.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        return config;
    }

    /**
     * Creates the factory that produces {@code KafkaProducer} instances from
     * {@link #producerConfigs()}.
     *
     * @return a string/string producer factory
     */
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * Exposes the {@link KafkaTemplate} bean used by the application to send messages.
     *
     * @return a template bound to {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
4. consumer 配置类
package com.lover.wanger.provider.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
/**
* @author: wanger
* @Date: Created in 2018/5/28 15:33
* @description: kafka 消费者配置
*/
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    /** Kafka broker list, e.g. {@code host:9092}. */
    @Value("${kafka.consumer.servers}")
    private String servers;

    /** Whether offsets are committed automatically. */
    @Value("${kafka.consumer.enable.auto.commit}")
    private boolean enableAutoCommit;

    /** Consumer session timeout in milliseconds (passed through as a string). */
    @Value("${kafka.consumer.session.timeout}")
    private String sessionTimeout;

    /** Interval, in ms, between automatic offset commits (passed through as a string). */
    @Value("${kafka.consumer.auto.commit.interval}")
    private String autoCommitInterval;

    /** Consumer group id; members of the same group share the topic's partitions. */
    @Value("${kafka.consumer.group.id}")
    private String groupId;

    /** What to do when there is no committed offset ("latest"/"earliest"). */
    @Value("${kafka.consumer.auto.offset.reset}")
    private String autoOffsetReset;

    /** Number of concurrent listener containers (should not exceed partition count). */
    @Value("${kafka.consumer.concurrency}")
    private int concurrency;

    /**
     * Exposes the listener-container factory picked up by {@code @KafkaListener}
     * methods. Polling blocks for at most 1500 ms per cycle.
     *
     * @return a concurrent container factory for string/string records
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory());
        containerFactory.setConcurrency(concurrency);
        containerFactory.getContainerProperties().setPollTimeout(1500);
        return containerFactory;
    }

    /**
     * Creates the factory that produces {@code KafkaConsumer} instances from
     * {@link #consumerConfigs()}.
     *
     * @return a string/string consumer factory
     */
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Builds the raw consumer configuration map from the injected properties.
     * Both keys and values are deserialized as plain strings.
     *
     * @return the consumer settings keyed by {@link ConsumerConfig} constants
     */
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return props;
    }
}
5. 测试发送消息
package com.lover.wanger.provider.controller;
import com.lover.wanger.provider.constants.BaseConstants;
import com.lover.wanger.provider.entity.Result;
import com.lover.wanger.provider.utils.ResultUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
/**
* @author: wanger
* @Date: Created in 2018/5/28 15:17
* @description: 测试 kafka 生产消息
*/
@RestController
@RequestMapping("/kafka")
public class KafkaController {

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    // Parameterized as <String, String> (was a raw type) to match the
    // KafkaTemplate<String, String> bean and avoid unchecked send() calls.
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Test endpoint: publishes 200 messages with key "message" to the
     * "testMessage" topic.
     *
     * @param request  the incoming HTTP request (unused)
     * @param response the outgoing HTTP response (unused)
     * @return a success result, or an error result if publishing throws
     */
    @RequestMapping(value = "/send", method = RequestMethod.GET)
    public Result sendKafka(HttpServletRequest request, HttpServletResponse response) {
        try {
            for (int i = 0; i < 200; i++) {
                kafkaTemplate.send("testMessage", "message", "message ==================" + i);
            }
            logger.info("发送 kafka 成功.");
            return ResultUtil.success(BaseConstants.SUCCESS_CODE);
        } catch (Exception e) {
            logger.error("发送 kafka 失败", e);
            return ResultUtil.error(BaseConstants.ERROR_CODE, "发送 kafka 失败");
        }
    }
}
6.接收消息
package com.lover.wanger.provider.service;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
/**
* @author: wanger
* @Date: Created in 2018/5/29 14:11
* @description: 测试接受 kafka 消息
*/
@Service
public class TestService {

    /**
     * Listener invoked for each record arriving on the "testMessage" topic;
     * prints the payload to stdout.
     *
     * <p>Removed: commented-out sleep code and a try/catch that only called
     * {@code printStackTrace()} around a println that cannot throw.
     *
     * @param msg the record value, deserialized as a String
     */
    @KafkaListener(topics = {"testMessage"})
    public void getMessage(String msg) {
        System.out.println("service 层的 ------------------------ msg: " + msg);
    }
}