设为首页 加入收藏

TOP

spring kafka 整合
2019-04-24 02:35:36 】 浏览:64
Tags:spring kafka 整合
1.引用POM:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.1.3.RELEASE</version>
</dependency>

2,消费者配置-spring-kafka-consumer.xml
<xml version="1.0" encoding="UTF-8">
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="consumerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="192.168.10.240:9092" />
<entry key="group.id" value="test" />
<entry key="enable.auto.commit" value="false" />
<entry key="retries" value="10"/>
<entry key="auto.commit.interval.ms" value="1000" />
<entry key="session.timeout.ms" value="15000" />
<entry key="key.deserializer"
value="org.apache.kafka.common.serialization.IntegerDeserializer" />
<entry key="value.deserializer"
value="org.apache.kafka.common.serialization.StringDeserializer" />
</map>
</constructor-arg>
</bean>
<!-- 创建consumerFactory bean -->
<bean id="consumerFactory"
class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<ref bean="consumerProperties" />
</constructor-arg>
</bean>
<bean id="concurrentKafkaListenerContainerFactory"
class="org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory">
<property name="consumerFactory" ref="consumerFactory" />
<property name="concurrency" value="3" />
<!-- <property name="containerProperties.pollTimeout" value="3000"/> -->
</bean>

<!-- 消费者容器配置信息 -->
<bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
<!-- 重要!配置topic -->
<constructor-arg value="my-topic"/>
<!--<property name="ackOnError" value="false"/>-->
<property name="messageListener" ref="messageListernerConsumerService"/>
<property name="errorHandler" ref="messageListernerConsumerService" />
</bean>
<bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" init-method="doStart">
<constructor-arg ref="consumerFactory"/>
<constructor-arg ref="containerProperties"/>
</bean>

<!--如果需要失败重试就需要配置-->

<!-- 实际执行消息消费的类 -->
<bean id="messageListernerConsumerService" class="com.shenma.paulfrank.kafka.KafkaConsumerListener">
<property name="taskExecutorUtil" ref="taskExecutorUtil" />
</bean>
<!-- 异步线程池 -->
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<!-- 核心线程数 -->
<property name="corePoolSize" value="20" />
<!-- 最大线程数 -->
<property name="maxPoolSize" value="100" />
<!-- 队列最大长度 >=mainExecutor.maxSize -->
<property name="queueCapacity" value="1000" />
<!-- 线程池维护线程所允许的空闲时间 -->
<property name="keepAliveSeconds" value="300" />
<!-- 线程池对拒绝任务(无线程可用)的处理策略 -->
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
</property>
</bean>
<bean id="taskExecutorUtil" class="com.shenma.paulfrank.kafka.TaskExecutorUtil">
<!-- <constructor-arg ref="taskExecutor" /> -->
<property name="taskExecutor" ref="taskExecutor" />
</bean>
</beans>


3,生产者配置 - spring-kafka-producer.xml
<xml version="1.0" encoding="UTF-8">
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">

<!-- 定义producer的参数 -->
<bean id="producerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="${com.shenma.mq.address}" />
<entry key="group.id" value="0" />
<entry key="retries" value="10" />
<entry key="batch.size" value="16384" />
<entry key="linger.ms" value="1" />
<entry key="buffer.memory" value="33554432" />
<entry key="key.serializer"
value="org.apache.kafka.common.serialization.IntegerSerializer" />
<entry key="value.serializer"
value="org.apache.kafka.common.serialization.StringSerializer" />
</map>
</constructor-arg>
</bean>

<!-- 创建kafkatemplate需要使用的producerfactory bean -->
<bean id="producerFactory"
class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<ref bean="producerProperties" />
</constructor-arg>
</bean>

<!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 -->
<bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg ref="producerFactory" />
<constructor-arg name="autoFlush" value="true" />
<property name="defaultTopic" value="test1" />
</bean>
</beans>

4,发送案例:

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void send(){
kafkaTemplate.send("my-topic", "hello");
}

5,消费案例:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class MsgConsumer {
@KafkaListener(topics = { "my-topic" })
public void processMessage2(String content) {
System.out.println("test2:"+content);
}
}
地址:
开发环境:
kafka开发环境服务地址:192.168.10.240:9092
kafka监控:192.168.10.240:9000
sit环境
172.16.10.44:9092
生产环境:
10.10.212.27:9092,10.10.228.81:9092,10.10.234.137:9092,10.10.163.126:9092

http://www.cnblogs.com/luotianshuai/p/5206662.html kafka安装
kafka版本:0.11.0
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇如何确定Kafka的分区数,key和con.. 下一篇2.Kafka中topic的Partition,Kafk..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目