Spring Cloud Stream Kafka: Having a Specific Partition's Messages Always Consumed by a Specific Consumer Instance
Published 2019-05-06 14:29:26 · Views: 95

Goal: messages in a specific Kafka partition are always consumed by a specific instance of the consumer application. For example, partition 0 is consumed by the instance with index 0, partition 1 by the instance with index 1, and partition 2 by the instance with index 2.

Project overview: the setup consists of 1 producer instance and 3 consumer instances; both the producer and consumer applications are Spring Cloud Eureka clients. The producer sends messages to the 3 partitions of a Kafka topic, and each of the 3 consumer instances consumes one partition according to its instance index: instance index 0 consumes partition 0, instance index 1 consumes partition 1, and instance index 2 consumes partition 2.

Producer project structure:

[Producer project structure screenshot]

Add the Spring Cloud Stream dependencies:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

Application class:

SpringCloudStreamKafkaProducerApplication.java



import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

@EnableBinding(Source.class)
@EnableScheduling
@SpringBootApplication
public class SpringCloudStreamKafkaProducerApplication {

    @Autowired
    private Source source;

    public static void main(String[] args) {
        SpringApplication.run(SpringCloudStreamKafkaProducerApplication.class, args);
    }

    // Send a Person every 5 seconds. The id is the last decimal digit of the
    // current timestamp, so it cycles through 0-9 over time.
    @Scheduled(fixedRate = 5000)
    public void handle1() {
        Person person = new Person();

        Long currentTimeMillis = System.currentTimeMillis();
        person.setId(Long.parseLong(currentTimeMillis.toString().substring(currentTimeMillis.toString().length() - 1)));
        person.setName("rock ");

        System.out.println("send a person..." + person);

        source.output().send(MessageBuilder.withPayload(person).build());
    }

    public static class Person {
        private Long id;
        private String name;

        public Long getId() {
            return id;
        }

        public void setId(Long id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return "Person{" +
                    "id=" + id +
                    ", name='" + name + '\'' +
                    '}';
        }
    }
}
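To see why the producer's ids end up spread across all three partitions: handle1() uses the last decimal digit of System.currentTimeMillis() as the Person id, so ids range over 0-9 and id % 3 touches every partition. A standalone sketch of that derivation (the class and method names here are illustrative, not part of the project):

```java
public class IdDerivationDemo {

    // Mirrors handle1(): the Person id is the last decimal digit of the timestamp.
    static long lastDigit(long millis) {
        String s = Long.toString(millis);
        return Long.parseLong(s.substring(s.length() - 1));
    }

    public static void main(String[] args) {
        long millis = 1557124166123L; // a fixed example timestamp
        long id = lastDigit(millis);
        System.out.println("id=" + id + " -> partition " + (id % 3));
    }
}
```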

Channel interface:

CustomSource.java



import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface CustomSource {

    String OUTPUT1 = "output1";

    @Output(CustomSource.OUTPUT1)
    MessageChannel output1();

    String OUTPUT2 = "output2";

    @Output(CustomSource.OUTPUT2)
    MessageChannel output2();
}

Configuration class:

KafkaBindingConfig.java



import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaBindingConfig {

    // The bean names below must match the partitionKeyExtractorName and
    // partitionSelectorName values in application.properties.
    @Bean
    public CustomPartitionKeyExtractorClass customPartitionKeyExtractor() {
        return new CustomPartitionKeyExtractorClass();
    }

    @Bean
    public CustomPartitionSelectorClass customPartitionSelector() {
        return new CustomPartitionSelectorClass();
    }
}



CustomPartitionKeyExtractorClass.java



import org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy;
import org.springframework.messaging.Message;

/**
 * <p>Description: strategy that extracts the partition key from a Message</p>
 */
public class CustomPartitionKeyExtractorClass implements PartitionKeyExtractorStrategy {

    @Override
    public Object extractKey(Message<?> message) {
        Object obj = message.getPayload();
        System.out.println("message payload: " + obj);

        if (obj instanceof SpringCloudStreamKafkaProducerApplication.Person) {
            SpringCloudStreamKafkaProducerApplication.Person person = (SpringCloudStreamKafkaProducerApplication.Person) obj;
            return person.getId();
        }

        return null;
    }
}



CustomPartitionSelectorClass.java



import org.springframework.cloud.stream.binder.PartitionSelectorStrategy;
import org.springframework.util.ObjectUtils;

/**
 * <p>Description: strategy that decides which partition a message is sent to</p>
 */
public class CustomPartitionSelectorClass implements PartitionSelectorStrategy {

    @Override
    public int selectPartition(Object key, int partitionCount) {
        System.out.println("partition key: " + key + " partitionCount: " + partitionCount);

        if (!ObjectUtils.isEmpty(key)) {
            Long id = (Long) key;
            // Route by id modulo the partition count.
            return id.intValue() % partitionCount;
        }

        return 0;
    }
}
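The selector's routing rule is plain modulo arithmetic. A Spring-free sketch (the class name here is illustrative) makes the id-to-partition mapping easy to verify by hand:

```java
public class PartitionSelectorDemo {

    // Same routing rule as CustomPartitionSelectorClass, without the Spring types.
    static int selectPartition(long key, int partitionCount) {
        return (int) (key % partitionCount);
    }

    public static void main(String[] args) {
        // ids 0-5 against 3 partitions: 0,1,2,0,1,2
        for (long id = 0; id < 6; id++) {
            System.out.println("id=" + id + " -> partition " + selectPartition(id, 3));
        }
    }
}
```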

Configuration file:

application.properties



server.port=8881
spring.application.name=spring-cloud-stream-kafka-producer
eureka.client.serviceUrl.defaultZone=http://localhost:8761/eureka/

# Kafka Binder Properties

# A list of brokers to which the Kafka binder connects.
# Default: localhost.
spring.cloud.stream.kafka.binder.brokers=localhost:9092

# If set to true, the binder creates new topics automatically.
# If set to false, the binder relies on the topics being already configured.
# In the latter case, if the topics do not exist, the binder fails to start.
# This setting is independent of the auto.topic.create.enable setting of the broker and does not influence it.
# If the server is set to auto-create topics, they may be created as part of the metadata retrieval request, with default broker settings.
# Default: true.
spring.cloud.stream.kafka.binder.autoCreateTopics=true

# If set to true, the binder creates new partitions if required.
# If set to false, the binder relies on the partition size of the topic being already configured.
# If the partition count of the target topic is smaller than the expected value, the binder fails to start.
# Default: false.
spring.cloud.stream.kafka.binder.autoAddPartitions=true

management.endpoints.web.exposure.include=bindings

# Send messages to the same topic (multiple channels can share one destination)
spring.cloud.stream.bindings.output.destination=topic2
spring.cloud.stream.bindings.output.content-type=application/json

# Partitioned output binding
spring.cloud.stream.bindings.output.producer.partitionKeyExpression=payload.id
# If a "no subscribers" error appears from this property on, enable autoAddPartitions=true
# Distribute outgoing messages across 3 partitions
spring.cloud.stream.bindings.output.producer.partitionCount=3

# Name of the partition key extractor bean, which extracts the partition key from the message
spring.cloud.stream.bindings.output.producer.partitionKeyExtractorName=customPartitionKeyExtractor
# Name of the custom partition selector bean, which computes the target partition from the key and partitionCount
spring.cloud.stream.bindings.output.producer.partitionSelectorName=customPartitionSelector

# LOGGING
#logging.level.root=WARN
#logging.level.org.springframework.web=DEBUG
#logging.level.org.springframework=DEBUG
#logging.level.com.spring.cloud.stream.kafka.consumer.producer=DEBUG
logging.pattern.console=${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %4line %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}

Consumer project structure:

[Consumer project structure screenshot]

Add the Spring Cloud Stream dependencies:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

Application class:

KafkaConsumer1Application.java



import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

@EnableBinding(Sink.class)
@SpringBootApplication
public class KafkaConsumer1Application {

    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumer1Application.class, args);
    }

    // Print every Person received on the input channel.
    @StreamListener(Sink.INPUT)
    public void handle(Person person) {
        System.out.println("handle Received: " + person);
    }

    public static class Person {
        private Long id;
        private String name;

        public Long getId() {
            return id;
        }

        public void setId(Long id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return "Person{" +
                    "id=" + id +
                    ", name='" + name + '\'' +
                    '}';
        }
    }
}

Configuration files:

application-c1.properties



server.port=8871
spring.application.name=spring-cloud-stream-kafka-consumer
eureka.client.serviceUrl.defaultZone=http://localhost:8761/eureka/

# Settings for the input channel
spring.cloud.stream.bindings.input.destination=topic2
spring.cloud.stream.bindings.input.content-type=application/json
spring.cloud.stream.bindings.input.group=spring-cloud-stream-kafka-consumer

# Number of instances of this application, and this instance's index
spring.cloud.stream.instanceCount=3
spring.cloud.stream.instanceIndex=0

# Partitioned input binding
spring.cloud.stream.bindings.input.consumer.partitioned=true
spring.cloud.stream.bindings.input.consumer.concurrency=1

# When autoRebalanceEnabled is true (the default), Kafka distributes partitions across
# instances, and the instanceCount, instanceIndex, and partitioned properties are not needed.
# When autoRebalanceEnabled is false, the binder uses instanceCount and instanceIndex
# to decide which partition(s) this instance subscribes to.
# The topic must have at least as many partitions as there are instances.
# The binder computes the partition assignment instead of Kafka.
# This is what lets messages from a specific partition always reach the same instance.
spring.cloud.stream.kafka.bindings.input.consumer.autoRebalanceEnabled=false
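Assuming the binder hands partition p to the instance where p % instanceCount == instanceIndex (a common round-robin scheme; the binder's exact algorithm may differ), the static assignment produced by the properties above can be sketched as follows. The class and method names are illustrative only:

```java
import java.util.ArrayList;
import java.util.List;

public class StaticAssignmentDemo {

    // Hypothetical assignment rule: instance i owns every partition p with
    // p % instanceCount == i. With 3 partitions and 3 instances this yields
    // exactly one partition per instance, matching the behavior described above.
    static List<Integer> partitionsFor(int instanceIndex, int instanceCount, int partitionCount) {
        List<Integer> owned = new ArrayList<>();
        for (int p = 0; p < partitionCount; p++) {
            if (p % instanceCount == instanceIndex) {
                owned.add(p);
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            System.out.println("instanceIndex=" + i + " -> partitions " + partitionsFor(i, 3, 3));
        }
    }
}
```

Note that with more partitions than instances (say 4 partitions, 2 instances), an instance would own several partitions, which is why the topic only needs at least as many partitions as instances.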



application-c2.properties



server.port=8872
spring.application.name=spring-cloud-stream-kafka-consumer
eureka.client.serviceUrl.defaultZone=http://localhost:8761/eureka/

# Settings for the input channel
spring.cloud.stream.bindings.input.destination=topic2
spring.cloud.stream.bindings.input.content-type=application/json
spring.cloud.stream.bindings.input.group=spring-cloud-stream-kafka-consumer

# Number of instances of this application, and this instance's index
spring.cloud.stream.instanceCount=3
spring.cloud.stream.instanceIndex=1

# Partitioned input binding
spring.cloud.stream.bindings.input.consumer.partitioned=true
spring.cloud.stream.bindings.input.consumer.concurrency=1

# As above: autoRebalanceEnabled=false makes the binder use instanceCount and
# instanceIndex to decide which partition(s) this instance subscribes to.
spring.cloud.stream.kafka.bindings.input.consumer.autoRebalanceEnabled=false



application-c3.properties



server.port=8873
spring.application.name=spring-cloud-stream-kafka-consumer
eureka.client.serviceUrl.defaultZone=http://localhost:8761/eureka/

# Settings for the input channel
spring.cloud.stream.bindings.input.destination=topic2
spring.cloud.stream.bindings.input.content-type=application/json
spring.cloud.stream.bindings.input.group=spring-cloud-stream-kafka-consumer

# Number of instances of this application, and this instance's index
spring.cloud.stream.instanceCount=3
spring.cloud.stream.instanceIndex=2

# Partitioned input binding
spring.cloud.stream.bindings.input.consumer.partitioned=true
spring.cloud.stream.bindings.input.consumer.concurrency=1

# As above: autoRebalanceEnabled=false makes the binder use instanceCount and
# instanceIndex to decide which partition(s) this instance subscribes to.
spring.cloud.stream.kafka.bindings.input.consumer.autoRebalanceEnabled=false

Result: start the producer and the 3 consumer instances. The console output shows that the consumer with instance index 0 consumes partition 0, the consumer with instance index 1 consumes partition 1, and the consumer with instance index 2 consumes partition 2.
