设为首页 加入收藏

TOP

kafka和spring集成
2019-04-24 02:32:07 】 浏览:58
Tags:kafka spring 集成

一.配置文件

首先下载kafka,我的版本是kafka_2.11-1.0.0,下载完成后配置server.properties文件

# The id of the broker. This must be set to a unique integer for each broker.

broker.id=0 配置broker的id,也可以不配置,必须唯一.

# The address the socket server listens on. It will get the value returned from

# java.net.InetAddress.getCanonicalHostName() if not configured.

listeners=PLAINTEXT://127.0.0.1:9092 配置listeners,在以前的版本是host.name,在这个版本已经不推荐使用了,

如果不配置会使用java.net.InetAddress.getCanonicalHostName()获取值.

# A comma seperated list of directories under which to store log files

#log.dirs=/tmp/kafka-logs

log.dirs=E:/program/kafka_2.11-1.0.0/data 配置日志文件,我的是windows,linux类似

zookeeper.connect=127.0.0.1:2181 配置zookeeper地址,如果是集群用逗号分开,日志中有个javax.management.MalformedObjectNameException: Invalid character ‘:’ in value part of property错误,可以忽略。

二.启动kafka,创建topic

bin\windows\kafka-server-start.bat config\server.properties 启动命令

bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

创建了一个topic为test,复制因子为1(因为只有1个kafka) 1个分区,连接到本地zookeeper;

三.spring与kafka集成

命令行创建生产者和消费者就省略了,很简单.主要说spring和kafka集成过程.

pom文件

<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.2.1.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.2.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-kafka -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-kafka</artifactId>
            <version>2.1.0.RELEASE</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
          <!--  <scope>test</scope>-->
        </dependency>
        <!-- https://mvnrepository.com/artifact/log4j/log4j -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.44</version>
        </dependency>

具体类,首先是消息常量类

/**
 * Result-code and result-message constants shared by the Kafka producer and
 * consumer components of this demo.
 */
public final class KafkaMesConstant {

	/** Generic success code/message. */
	public static final String SUCCESS_CODE = "00000";
	public static final String SUCCESS_MES = "成功";

	/* kafka error codes */
	public static final String KAFKA_SEND_ERROR_CODE = "30001";
	public static final String KAFKA_NO_RESULT_CODE = "30002";
	public static final String KAFKA_NO_OFFSET_CODE = "30003";

	/* kafka error messages (paired with the codes above) */
	public static final String KAFKA_SEND_ERROR_MES = "发送消息超时,联系相关技术人员";
	public static final String KAFKA_NO_RESULT_MES = "未查询到返回结果,联系相关技术人员";
	public static final String KAFKA_NO_OFFSET_MES = "未查到返回数据的offset,联系相关技术人员";

	/** Constants holder — not instantiable. */
	private KafkaMesConstant() {
	}
}

kafka消费者服务类,当监听到消息时自动消费,我们的需求是接收到消息,通过日志保存到文件.核对数据格式

/**
 * Kafka message listener: the Spring listener container invokes
 * {@link #onMessage} automatically whenever a record arrives on a subscribed
 * topic. Offsets are auto-committed by the high-level consumer, so no manual
 * offset management is possible here.
 *
 * @author 
 *
 */
public class KafkaConsumerServer implements MessageListener<String, String> {
	protected final Logger LOG = LoggerFactory.getLogger(KafkaConsumerServer.class);

	// Dedicated "operation" logger: writes consumed payloads to their own
	// file (see the "operation" appender in log4j.properties).
	private static final Logger MSG = LoggerFactory.getLogger("operation");

	@Autowired
	private KafkaTemplate<String, String> kafkaTemplate;

	/** Whether consumed messages should be forwarded to another Kafka (init.properties: send.kafka). */
	@Value("#{configProperties['send.kafka']}")
	private boolean sendKafka = false;

	/** Comma-separated list of topics this consumer accepts (init.properties: send.topic). */
	@Value("#{configProperties['send.topic']}")
	private String sendTopics;

	// Lazily-parsed cache of sendTopics. volatile so the list, once built, is
	// safely published to concurrent listener threads (worst case two threads
	// both parse it, which is harmless since the result is identical).
	private volatile List<String> topicList;

	/**
	 * Parses {@code sendTopics} once and caches the result. Returns an empty
	 * list when the property is missing, so callers need no null check
	 * (previously a missing property caused a NullPointerException here).
	 */
	private List<String> getTopicList() {
		List<String> cached = topicList;
		if (cached == null) {
			cached = (sendTopics == null)
					? Collections.<String>emptyList()
					: Arrays.asList(sendTopics.split(","));
			topicList = cached;
		}
		return cached;
	}

	/**
	 * Consumes one record: logs its coordinates and payload, and records the
	 * payload through the "operation" logger. Records whose topic is not in
	 * the configured {@code send.topic} list are logged and then ignored.
	 * (High level API: offsets are committed automatically and cannot be
	 * chosen for consumption.)
	 */
	@Override
	public void onMessage(ConsumerRecord<String, String> record) {
		LOG.info("=============kafkaConsumer开始消费=============");
		String topic = record.topic();
		String key = record.key();
		String value = record.value();
		// Parameterized logging: placeholders avoid string concatenation
		// when the log level is disabled.
		LOG.info("-------------topic:{}", topic);
		LOG.info("-------------value:{}", value);
		LOG.info("-------------key:{}", key);
		LOG.info("-------------offset:{}", record.offset());
		LOG.info("-------------partition:{}", record.partition());
		LOG.info("~~~~~~~~~~~~~kafkaConsumer消费结束~~~~~~~~~~~~~");

		MSG.info("consumer topic:{},value:{}", topic, value);
		List<String> topics = getTopicList();
		if (!topics.contains(topic)) {
			LOG.info("-------------config sendTopics:{} is not contain the topic:{}", sendTopics, topic);
			return;
		}

		// Site-specific requirement (forward consumed messages back to
		// Kafka); kept for reference but intentionally disabled.
		/*if (sendKafka) {
			kafkaTemplate.send(topic, value);
		} else {
			LOG.info("-------------config sendKafka:false no send other kafka");
		}
		LOG.info("~~~~~~~~~~~~~发送结束~~~~~~~~~~~~~");*/
	}

}

kafka生产者监听类,当生产者发送消息时自动调用

/**
 * Producer-side callback, registered on the KafkaTemplate in
 * kafkaProducer.xml; invoked after each send attempt completes.
 *
 * @author 
 *
 */
public class KafkaProducerListener implements ProducerListener<Object, Object> {
    protected final Logger LOG = LoggerFactory.getLogger(KafkaProducerListener.class);

    // Dedicated "operation" logger: records sent payloads in their own file
    // (see the "operation" appender in log4j.properties).
    private static final Logger MSG = LoggerFactory.getLogger("operation");

    /**
     * Called after a message was sent successfully; logs the record's
     * coordinates and payload.
     */
    @Override
    public void onSuccess(String topic, Integer partition, Object key,
            Object value, RecordMetadata recordMetadata) {
        LOG.info("==========kafka发送数据成功(日志开始)==========");
        LOG.info("----------topic:{}", topic);
        LOG.info("----------partition:{}", partition);
        LOG.info("----------key:{}", key);
        LOG.info("----------value:{}", value);
        LOG.info("----------RecordMetadata:{}", recordMetadata);
        LOG.info("~~~~~~~~~~kafka发送数据成功(日志结束)~~~~~~~~~~");

        MSG.info("send success topic:" + topic + ",value:" + value);
    }

    /**
     * Called when a send failed; logs the record and the failure cause at
     * ERROR level so it stands out in the log file.
     */
    @Override
    public void onError(String topic, Integer partition, Object key,
            Object value, Exception exception) {
        LOG.error("==========kafka发送数据错误(日志开始)==========");
        LOG.error("----------topic:{}", topic);
        LOG.error("----------partition:{}", partition);
        LOG.error("----------key:{}", key);
        LOG.error("----------value:{}", value);
        LOG.error("----------Exception:", exception);
        LOG.error("~~~~~~~~~~kafka发送数据错误(日志结束)~~~~~~~~~~");

        MSG.info("send error topic:" + topic + ",value:" + value);
    }

    /**
     * Returning true makes the container invoke {@link #onSuccess} as well
     * as {@link #onError} (false would report failures only).
     */
    @Override
    public boolean isInterestedInSuccess() {
        LOG.info("///kafkaProducer监听器启动///");
        return true;
    }

}

启动服务

/**
 * Application entry point: boots the Spring context, which in turn starts
 * the Kafka listener container declared in applicationContext-main.xml.
 */
public class Launcher {
	private static final Logger LOG = LoggerFactory.getLogger("Launcher");

	public static void main(String[] args) {
		try {
			ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext(
					"applicationContext-main.xml");
			// Close beans (and stop the listener container) cleanly when the
			// JVM terminates.
			applicationContext.registerShutdownHook();
			LOG.info("service is start .....");
		} catch (Exception e) {
			// Non-empty message so the failure is searchable in the log.
			LOG.error("service start failed", e);
		}

	}
}

配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
	http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context.xsd">
    
    <context:component-scan base-package="test.kafka" /> <!--注意此处配置,改成自己的包名 -->
    
    <bean id="configProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name="locations">
            <list>
                <value>classpath:init.properties</value>
            </list>
        </property>
    </bean>
    <bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PreferencesPlaceholderConfigurer">
        <property name="properties" ref="configProperties" />
    </bean>

	<import resource="classpath:kafkaConsumer.xml" />
	<import resource="classpath:kafkaProducer.xml" />

</beans>

init.properties文件

send.kafka=false #对应consumerServer类中的sendKafka属性

send.topic=test   #发送的topic
consumer.servers=127.0.0.1:9092 #消费端地址


producer.servers=127.0.0.1:9092 #生产端地址

kafkaConsumer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	   xsi:schemaLocation="http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">


	<!-- 定义consumer的参数 -->
	<bean id="consumerProperties" class="java.util.HashMap">
		<constructor-arg>
			<map>
				<entry key="bootstrap.servers" value="${consumer.servers}" />
				<entry key="group.id" value="0" />
				<entry key="enable.auto.commit" value="true" />
				<entry key="auto.commit.interval.ms" value="1000" />
				<entry key="session.timeout.ms" value="30000" />
				<entry key="key.deserializer"
					value="org.apache.kafka.common.serialization.StringDeserializer" />
				<entry key="value.deserializer"
					value="org.apache.kafka.common.serialization.StringDeserializer" />
			</map>
		</constructor-arg>
	</bean>

	<!-- 创建consumerFactory bean -->
	<bean id="consumerFactory"
		class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
		<constructor-arg>
			<ref bean="consumerProperties" />
		</constructor-arg>
	</bean>

	<!-- 实际执行消息消费的类 -->
	<bean id="messageListernerConsumerService" class="mmc.kafka.consumer.KafkaConsumerServer" />

	<!-- 消费者容器配置信息 -->
	<bean id="containerProperties"
		class="org.springframework.kafka.listener.config.ContainerProperties">
		<constructor-arg>
			<array>
				<value>test</value> <!-- 配置topic -->
			</array>
		</constructor-arg>
		<property name="messageListener" ref="messageListernerConsumerService" />
	</bean>


	<!-- 创建messageListenerContainer bean,使用的时候,只需要注入这个bean -->
	<bean id="messageListenerContainer"
		class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
		init-method="doStart">
		<constructor-arg ref="consumerFactory" />
		<constructor-arg ref="containerProperties" />
	</bean>

</beans>

kafkaProducer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/context
         http://www.springframework.org/schema/context/spring-context.xsd">

	<!-- 定义producer的参数 -->
	<bean id="producerProperties" class="java.util.HashMap">
		<constructor-arg>
			<map>
				<entry key="bootstrap.servers" value="${producer.servers}" />
				<entry key="group.id" value="0" />
				<entry key="retries" value="1" />
				<entry key="batch.size" value="16384" />
				<entry key="linger.ms" value="1" />
				<entry key="buffer.memory" value="33554432" />
				<entry key="key.serializer"
					value="org.apache.kafka.common.serialization.StringSerializer" />
				<entry key="value.serializer"
					value="org.apache.kafka.common.serialization.StringSerializer" />
			</map>

		</constructor-arg>
	</bean>

	<!-- 创建kafkatemplate需要使用的producerfactory bean -->
	<bean id="producerFactory"
		class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
		<constructor-arg>
			<ref bean="producerProperties" />
		</constructor-arg>
	</bean>

	<!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 -->
	<bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
		<constructor-arg ref="producerFactory" />
		<constructor-arg name="autoFlush" value="true" />
		<property name="defaultTopic" value="test2" />
		<property name="producerListener" ref="producerListener" />
	</bean>

	<bean id="producerListener" class="mmc.kafka.producer.KafkaProducerListener" />
</beans>

log4j.properties

log4j.rootLogger = info,console,FILE

log4j.appender.console = org.apache.log4j.ConsoleAppender
log4j.appender.console.Target = System.out
log4j.appender.console.layout = org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern = %-d{yyyy-MM-dd HH\:mm\:ss} [%p]-[%c] %m%n

log4j.appender.FILE = org.apache.log4j.DailyRollingFileAppender
log4j.appender.FILE.File = ../logs/mmc-bo-kafka.log
log4j.appender.FILE.Append = true
log4j.appender.FILE.Threshold = info
log4j.appender.FILE.layout = org.apache.log4j.PatternLayout
log4j.appender.FILE.layout.ConversionPattern = %-d{yyyy-MM-dd HH\:mm\:ss} [%p]-[%c] %m%n

log4j.logger.operation=DEBUG,operation
log4j.appender.operation = org.apache.log4j.DailyRollingFileAppender
log4j.appender.operation.File = ../logs/mmc-msg.log
log4j.appender.operation.Append = true
log4j.appender.operation.Threshold = info
log4j.appender.operation.layout = org.apache.log4j.PatternLayout
log4j.appender.operation.layout.ConversionPattern = %-d{yyyy-MM-dd HH\:mm\:ss} [%p]-[%c] %m%n

建一个测试类SpringProducerMain,spring与kafka集成后,可以直接使用KafkaTemplate直接发送消息

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.kafka.core.KafkaTemplate;

/**
 * Manual test driver: boots the Spring context and sends five test messages
 * ("msg-1" .. "msg-5") to the "test" topic through the XML-configured
 * KafkaTemplate bean.
 */
public class SpringProducerMain {

    public static void main(String[] args) {
        ClassPathXmlApplicationContext ctx =
                new ClassPathXmlApplicationContext("classpath:applicationContext-main.xml");
        try {
            // The bean is declared without type parameters in kafkaProducer.xml,
            // hence the unchecked assignment.
            @SuppressWarnings("unchecked")
            KafkaTemplate<String, String> kafkaTemplate =
                    ctx.getBean("KafkaTemplate", KafkaTemplate.class);
            // Bound fixed: the original "i < 5" sent only four messages,
            // although the article states five are sent.
            for (int i = 1; i <= 5; i++) {
                String msg = "msg-" + i;
                //向topic发送消息
                kafkaTemplate.send("test", msg);
                System.out.println("send msg  : " + msg);
            }
        } finally {
            // Release producer resources instead of leaking them on exit.
            ctx.close();
        }
    }
}

首先启动Launch类


然后启动SpringProducerMain类,发送5条消息


只截取了部分截图,可以看到监听器已经启动,并且成功发送了消息,为了更清楚可以使用控制台启动一个消费者,可以同时看见消费的数据


消费者服务消费消息


】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇kafka-topics.sh --describe &nbs.. 下一篇【Kafka九】Kafka High Level API..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目