设为首页 加入收藏

TOP

【Kafka】阿里云消息队列kafka 结合 spring cloud stream
2019-04-20 02:21:44 】 浏览:132
Tags:Kafka 阿里 消息 队列 kafka 结合 spring cloud stream
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/kisscatforever/article/details/86231039

一、前言

在以前的博客中,小编使用过spring cloud stream 结合rabbitmq,rabbitmq是自己搭建的,没有用阿里云的。这次结合前面的博客,小编要使用阿里云的kafka,所以就想通过spring cloud stream kafka,来完成调用。但是这样就有一些配置不太一样了。通过对比阿里云的kafka的github网站,找到了相关的demo。这里小编总结一些。

二、spring cloud stream kafka

本文借鉴与https://github.com/AliwareMQ/aliware-kafka-demos/tree/master/kafka-spring-stream-demo。

注意:本Demo仅适用于Spring Cloud Camden.SR5版本,其它版本的配置略有差异,请参考官方文档进行调整

Demo跑起来

1.将项目导入IDE(如MyEclipse, IntelliJ)中
2.添加自己的AccessKey,SecretKey到src/main/resources/kafka_client_jaas.conf中
3.请参考文档创建资源 创建Topic和ConsumerGroup
4.将Topic与ConsumerGroup添加到src/main/resources/application.properties
5.spring.cloud.stream.kafka.binder.brokers请参考获取接入点
6.修改src/main/resources/application.properties中的kafka.ssl.truststore.location为自己的路径
7.执行KafkaDemoApplication.main,以启动消息消费的监听器,将直接打印消息内容
8.执行MessageOutputTest.testSend测试发送,看Console中是否打印"Hello Aliyun Kafka"

2.1 maven依赖

<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-stream-kafka</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
			<version>1.1.3.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-kafka</artifactId>
			<version>2.1.0.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>1.0.0</version>
		</dependency>

		<dependency>
			<groupId>com.aliyun.openservices</groupId>
			<artifactId>ons-sasl-client</artifactId>
			<version>0.1</version>
		</dependency>
	    <dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-stream-kafka</artifactId>
			<version>1.1.1.RELEASE</version>
		</dependency>

2.2 配置文件

###下面三项请改成自己的值
kafka.consumer.group=XXXXXXXXXXXXXX
kafka.topic.name=XXXXXXXXXXXXXX
kafka.ssl.truststore.location=G:/kafka/kafka.client.truststore.jks

spring.cloud.stream.kafka.binder.autoCreateTopics=false

spring.cloud.stream.bindings.kafka_input.destination=${kafka.topic.name}
spring.cloud.stream.bindings.kafka_input.contentType=application/json
spring.cloud.stream.bindings.kafka_input.group=${kafka.consumer.group}
spring.cloud.stream.bindings.kafka_input.consumer.concurrency=5

spring.cloud.stream.bindings.kafka_output.destination=${kafka.topic.name}
spring.cloud.stream.bindings.kafka_output.contentType=application/json
spring.cloud.stream.bindings.kafka_output.group=${kafka.consumer.group}
spring.cloud.stream.bindings.kafka_output.producer.sync=true

spring.cloud.stream.kafka.binder.brokers=SASL_SSL:/XXXXXXXXXXX.aliyun.com:8080
spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL
spring.cloud.stream.kafka.binder.configuration.sasl.mechanism=ONS
spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location=${kafka.ssl.truststore.location}
spring.cloud.stream.kafka.binder.configuration.ssl.truststore.password=KafkaOnsClient

这里需要说明的是:阿里云配置kafka需要两个文件:kafka.client.truststore.jks 和 kafka.client.truststore.jks,其中kafka.client.truststore.jks是放到了classpath下了,kafka.client.truststore.jks小编是通过jvm参数来配置放的位置。

2.3 通道

public interface KafkaMessageSource {
    String KAFKA_INPUT = "kafka_input";
    String KAFKA_OUTPUT = "kafka_output";

    @Input(KafkaMessageSource.KAFKA_INPUT)
    SubscribableChannel inputNoticeMessage();

    @Output(KafkaMessageSource.KAFKA_OUTPUT)
    SubscribableChannel outputNoticeMessage();

}

2.4 发送与接收

发送:

   @Autowired
    private KafkaMessageSource kafkaMessageSource;

    flag = kafkaMessageSource.outputNoticeMessage().send(MessageBuilder.withPayload(appNotificationAddBO).build());
            if (flag) {
                DushuLogger.info("批量添加通知到kafka完成");
                resultBO.setStatus(CommonResponseCodeEnum.SUCCESS.getCode());
                resultBO.setData(true);
            } else {
                DushuLogger.info("批量添加通知到kafka失败");
                resultBO.setStatus(CommonResponseCodeEnum.ERROR.getCode());
                resultBO.setData(false);
            }

接收:

@EnableBinding(KafkaMessageSource.class)
public class NoticeListener {
    /**
     * 通知接口监听-王雷-2018年11月9日10:28:40
     * @param message 读取kafka的内容信息
     */
    @StreamListener(KafkaMessageSource.KAFKA_INPUT)
    public void receiveMethod(Message message) {
        DushuLogger.info("监听kafka消息");
        ....
    }
}

三、生产环境部署问题

1.SASL配置系统变量
Aliyun Kafka采用SASL机制对通道进行鉴权,在此之前,需要配置JVM属性java.security.auth.login.config 首先将src/main/resources/kafka_client_jaas.conf(注意配置自己的AccessKey,SecretKey)放置在某个路径下,如/home/admin; 然后可以采用以下方式配置(二选一,建议采用第二种)

1.1 jvm配置方式
jvm启动时加上 -Djava.security.auth.login.config=/home/admin/kafka_client_jaas.conf

1.2 设置Spring的启动监听器
编写自己类实现ApplicationListener,可以参考Demo工程中的KafkaConfigListener,只是记得要把路径改成自己的; 然后在src/main/resources/META-INF/spring.factories中配置类的全称,参考demo: org.springframework.context.ApplicationListener=com.alibaba.cloud.KafkaConfigListener

2.SSL配置Kafka属性
将src/main/resources/kafka.client.truststore.jks放在某个目录下,然后参考 application.properties进行配置

报错“Failed to send SSL close message”

该错误后面通常还会跟“connection reset by peer”或“broken pipe”。该错误可以忽略,不影响使用。服务端是VIP网络环境,会主动掐掉空闲连接。 你可以通过修改日志级别来避免该错误,以log4j为例,加上下面这行配置:

log4j.logger.org.apache.kafka.common.network.SslTransportLayer=ERROR

如果是logback,添加下面配置:

	<!--关闭kafka主动掐掉空闲连接日志-->
	<logger name="org.apache.kafka.common.network" level="OFF">
	</logger>

四、小结

通过这次配置阿里云kafka,更加熟悉了kafka,从kafka的配置也很好的用到了。可以在以后的学习中,更进一步。

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Kafka组件 下一篇【kafka】Centos7安装kafka

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目