设为首页 加入收藏

TOP

Spring Cloud系列教程 | 第十一篇:Spring Boot Spring Cloud Stream 和 Kafka案例教程
2019-04-23 14:31:16 】 浏览:58
Tags:Spring Cloud 系列 教程 十一篇 Boot Stream Kafka 案例

推荐 Spring Cloud 视频:

Spring Boot Spring Cloud Stream 和 Kafka案例教程

在这篇文章中,我们将介绍如何使用Spring Cloud Stream和Kafka构建实时流式微服务应用程序。本示例项目演示了如何使用事件驱动的体系结构,Spring Boot,Spring Cloud Stream,Apache Kafka和Lombok构建实时流应用程序。
在本教程中,我们开发一个简单的基于Spring Boot的Greetings微服务,功能包括:

  • 从REST API获取消息,
  • 把消息写入Kafka主题,
  • 从主题中读取消息
  • 将消息 输出到控制台。

什么是Spring Cloud Streaming?

Spring Cloud Stream是一个基于Spring Boot用于构建消息驱动的微服务的框架。

什么是卡夫卡?

Kafka是一个最初由LinkedIn开发的、流行的高性能和水平可扩展的消息传递平台。
安装Kafka
从这里下载Kafka 并解开它:

tar -xzf kafka_2.11-1.0.0.tgz
cd kafka_2.11-1.0.0
启动Zookeeper和Kafka

在Windows上:

bin\windows\zookeeper-server-start.bat config\zookeeper.properties
bin\windows\kafka-server-start.bat config\server.properties
在Linux或Mac上:

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

如果Kafka未运行且计算机从休眠状态唤醒后无法启动,请删除该<TMP_DIR>/kafka-logs文件夹,然后再次启动Kafka。

什么是Lombok

Lombok是一个Java框架,可以在代码中自动生成getter,setter, toString(), builders, loggers, 等方法代码。

Maven依赖

转到https://start.spring.io以创建Maven项目:

  • 添加必要的依赖关系:Spring Cloud Stream,Kafka,Devtools(在开发过程中的热重新部署,可选), Actuator(用于监测应用,可选配), Lombok(确保也有安装在你的IDE龙目插件)。
  • 单击Generate Project按钮将项目下载为zip文件。
  • 解压缩zip文件并将maven项目导入你喜欢的IDE。

注意pom.xml文件中的maven依赖项:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<!-- 还要在IDE中安装Lombok插件 -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
<!-- 热重新加载 - 在应用程序运行时更改代码后,在IntelliJ中按Ctrl + F9  -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-devtools</artifactId>
    <optional>true</optional>
</dependency>

定义Kafka Streams

public interface GreetingsStreams {
    String INPUT = "greetings-in";
    String OUTPUT = "greetings-out";
    @Input(INPUT)
    SubscribableChannel inboundGreetings();
    @Output(OUTPUT)
    MessageChannel outboundGreetings();
}

为了使我们的应用程序能够与Kafka通信,我们需要定义一个出站流来将消息写入Kafka主题,并使用入站流来读取来自Kafka主题的消息。

Spring Cloud提供了一种方便的方法:可以简单地创建为每个流定义单独的方法接口。

inboundGreetings()方法定义要从Kafka读取的入站流,并且outboundGreetings()方法定义要写入Kafka的出站流。

在运行时,Spring将创建一个基于Java代理的GreetingsStreams接口实现,可以在代码中的任何位置注入Spring Bean来访问我们的两个流。

配置Spring Cloud Stream

我们的下一步是将Spring Cloud Stream绑定到GreetingsStreams接口中的流。这可以通过使用以下代码创建一个StreamsConfig类来完成:

@EnableBinding(GreetingsStreams.class)
public class StreamsConfig {
}

使用传递接口的@EnableBinding注释来完成绑定到流GreetingsService(见下文)。

Kafka的配置属性

默认情况下,配置属性存储在src/main/resources/application.properties文件中。

但是,我更喜欢使用YAML格式,因为它不那么详细,并且允许在同一文件中保留公共和特定于环境的属性。

现在,让我们重命名application.properties到application.yaml和粘贴下面的配置片断到文件:

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
      bindings:
        greetings-in:
          destination: greetings
          contentType: application/json
        greetings-out:
          destination: greetings
          contentType: application/json
        

上面的配置属性配置了要连接的Kafka服务器的地址,以及我们在代码中用于入站和出站流的Kafka主题。他们都必须使用相同的Kafka主题greetings!contentType属性告诉Spring Cloud Stream在流中发送/接收我们的消息对象String。

创建消息对象

Greetings使用下面的代码创建一个简单的类,代表我们读取的消息对象并写入greetingsKafka主题:

ort lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

@Getter
@Setter
@ToString
@Builder
public class Greetings {
    private long timestamp;
    private String message;

    public Greetings(long timestamp, String message) {
        this.timestamp = timestamp;
        this.message = message;
    }

    public Greetings() {

    }
}

请注意,由于使用了Lombok注释,该类没有任何getter和setter方法。该@ToString会生成一个toString()使用类的字段和方法,@Builder的注释将允许我们创建Greetings使用流利的建设者builder (见下文)的对象。

创建一个服务层来写入Kafka

让我们使用下面的代码创建GreetingsService类,该代码将Greetings对象写入greetingsKafka主题:

@Service
@Slf4j
public class GreetingsService {
    private final GreetingsStreams greetingsStreams;

    public GreetingsService(GreetingsStreams greetingsStreams) {
        this.greetingsStreams = greetingsStreams;
    }

    public void sendGreeting(final Greetings greetings) {
        log.info("Sending greetings {}", greetings);
        MessageChannel messageChannel = greetingsStreams.outboundGreetings();
        messageChannel.send(MessageBuilder
                .withPayload(greetings)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .build());
    }
}

@Service注释将配置该类作为一个Spring bean,并通过构造依赖将GreetingsStreams注入GreetingsService。

@Slf4j注释表示我们可以使用SLF4J日志。

在sendGreeting()方法中,我们使用注入的GreetingsStream对象来发送由Greetings对象表示的消息。

创建REST API

现在我们将创建一个REST API端点,它将触发使用GreetingsServiceSpring Bean 向Kafka发送消息:

@RestController
public class GreetingsController {
    private final GreetingsService greetingsService;
    public GreetingsController(GreetingsService greetingsService) {
        this.greetingsService = greetingsService;
    }
    @GetMapping("/greetings")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void greetings(@RequestParam("message") String message) {
        Greetings greetings = Greetings.builder()
                .message(message)
                .timestamp(System.currentTimeMillis())
                .build();
        greetingsService.sendGreeting(greetings);
    }
}

greetings()方法定义了一个HTTP GET /greetings端点,该端点接受请求参数message并将其传递给GreetingsService的sendGreeting()方法

监听卡夫卡主题

让我们创建一个com.kaviddiss.streamkafka.service.GreetingsListener类来监听Kafka主题greetings上的消息并将它们记录在控制台上:

@Component
@Slf4j
public class GreetingsListener {
    @StreamListener(GreetingsStreams.INPUT)
    public void handleGreetings(@Payload Greetings greetings) {
        log.info("Received greetings: {}", greetings);
    }
}

GreetingsListener有一个方法handleGreetings(),Spring Cloud Stream将监听Kafka主题Greetings上的每个新消息对象greetings。这要归功于为handleGreetings()方法配置的注释@StreamListener。

运行应用程序

拼图的最后一部分是由Spring Initializer自动生成的StreamKafkaApplication类:

@SpringBootApplication
public class CloudbusstreamkafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(CloudbusstreamkafkaApplication.class, args);
    }
}

无需在此处进行任何更改。您可以从IDE运行此类作为Java应用程序,也可以使用Spring Boot Maven插件从命令行运行该应用程序:

mvn spring - boot:run

应用程序运行后,在浏览器中转到 http://localhost:8080/greetingsmessage=hello并检查控制台:

(timestamp=1531643278270, message=hello)

如果需要输入用户名和密码,用户名是user,密码在控制台using generated security password可以找到,或者使用元注解排除SecurityAutoConfiguration.class

@SpringBootApplication(exclude = {SecurityAutoConfiguration.class })

专家推荐

“随着微服务架构的发展,Spring Cloud 使用得越来越广泛。驰狼课堂http://www.chilangedu.com (QQ群:348039381) Spring Boot 快速入门,Spring Boot 与Spring Cloud 整合,docker+k8s,大型电商商城等多套免费实战教程可以帮您真正做到快速上手,将技术点切实运用到微服务项目中。”

在这里找到源代码

关注公众号,每天精彩内容,第一时间送达!
在这里插入图片描述

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇ELK+kafka集成 下一篇业务系统日志采集:logstash+kafka

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目