KafkaBindingConfig.java
/**
 * Registers the custom partitioning strategy beans that the Kafka binder looks
 * up by bean name via the {@code partitionKeyExtractorName} and
 * {@code partitionSelectorName} producer properties in application.properties.
 */
@Configuration
public class KafkaBindingConfig {

    /**
     * Selector bean: maps an extracted key plus the partition count to a
     * concrete partition index.
     */
    @Bean
    public CustomPartitionSelectorClass customPartitionSelector() {
        return new CustomPartitionSelectorClass();
    }

    /**
     * Extractor bean: pulls the partition key out of an outgoing message.
     */
    @Bean
    public CustomPartitionKeyExtractorClass customPartitionKeyExtractor() {
        return new CustomPartitionKeyExtractorClass();
    }
}
CustomPartitionKeyExtractorClass.java
/**
* <p>Description: 从Message中提取partition key的策略</p>
*/
public class CustomPartitionKeyExtractorClass implements PartitionKeyExtractorStrategy {
@Override
public Object extractKey(Message<> message) {
Object obj = message.getPayload();
System.out.println("消息载荷:" + obj);
if (obj instanceof SpringCloudStreamKafkaProducerApplication.Person) {
SpringCloudStreamKafkaProducerApplication.Person person = (SpringCloudStreamKafkaProducerApplication.Person) obj;
return person.getId();
}
return null;
}
}
CustomPartitionSelectorClass.java
/**
 * Strategy that decides which partition a message is published to, given the
 * extracted key and the configured partition count.
 *
 * <p>Fixes over the naive {@code id % partitionCount}:
 * <ul>
 *   <li>{@link Math#floorMod} keeps the result in {@code [0, partitionCount)}
 *       even for negative ids — a plain {@code %} can return a negative value,
 *       which Kafka rejects as a partition index.</li>
 *   <li>Any {@link Number} key is accepted via {@code longValue()} instead of a
 *       blind {@code (Long)} cast that would throw {@code ClassCastException}
 *       for e.g. an {@code Integer} id.</li>
 *   <li>The {@code instanceof} check also covers {@code null}, so the Spring
 *       {@code ObjectUtils.isEmpty} call is no longer needed.</li>
 * </ul>
 */
public class CustomPartitionSelectorClass implements PartitionSelectorStrategy {

    /**
     * @param key            the partition key extracted from the message (may be null)
     * @param partitionCount number of partitions of the target topic (&gt; 0)
     * @return a partition index in {@code [0, partitionCount)}; 0 when the key
     *         is absent or not numeric
     */
    @Override
    public int selectPartition(Object key, int partitionCount) {
        System.out.println("消息载荷的key:" + key + " partitionCount:" + partitionCount);
        if (key instanceof Number) {
            // long overload so the Java 8 (long, long) variant is picked unambiguously
            return (int) Math.floorMod(((Number) key).longValue(), (long) partitionCount);
        }
        return 0;
    }
}
配置文件:
application.properties
server.port=8881
spring.application.name=spring-cloud-stream-kafka-producer
eureka.client.serviceUrl.defaultZone=http://localhost:8761/eureka/
# Kafka Binder Properties
# A list of brokers to which the Kafka binder connects.
# Default: localhost.
spring.cloud.stream.kafka.binder.brokers=localhost:9092
# If set to true, the binder creates new topics automatically.
# If set to false, the binder relies on the topics being already configured.
# In the latter case, if the topics do not exist, the binder fails to start.
# This setting is independent of the auto.topic.create.enable setting of the broker and does not influence it.
# If the server is set to auto-create topics, they may be created as part of the metadata retrieval request, with default broker settings.
# Default: true.
spring.cloud.stream.kafka.binder.autoCreateTopics=true
# If set to true, the binder creates new partitions if required.
# If set to false, the binder relies on the partition size of the topic being already configured.
# If the partition count of the target topic is smaller than the expected value, the binder fails to start.
# Default: false.
spring.cloud.stream.kafka.binder.autoAddPartitions=true
management.endpoints.web.exposure.include=bindings
# 通过两个channel向同一个topic发送消息
spring.cloud.stream.bindings.output.destination=topic2
spring.cloud.stream.bindings.output.content-type=application/json
# 配置分区的输出绑定
# NOTE(review): partitionKeyExpression is mutually exclusive with the
# partitionKeyExtractorName property set below — configuring both makes the
# binding fail at startup. The custom extractor bean is used, so the
# expression is commented out here.
#spring.cloud.stream.bindings.output.producer.partitionKeyExpression=payload.id
# 此属性开始若报无订阅者错误,需开启autoAddPartitions=true
# 输出消息分布到3个分区
spring.cloud.stream.bindings.output.producer.partitionCount=3
# partition Key 提取器名称,负责从消息中提取分区key
spring.cloud.stream.bindings.output.producer.partitionKeyExtractorName=customPartitionKeyExtractor
# 自定义partition选择器,负责根据分区key和partitionCount计算出将消息发布到哪个分区
spring.cloud.stream.bindings.output.producer.partitionSelectorName=customPartitionSelector
# LOGGING
#logging.level.root=WARN
#logging.level.org.springframework.web=DEBUG
#logging.level.org.springframework=DEBUG
#logging.level.com.spring.cloud.stream.kafka.consumer.producer=DEBUG
logging.pattern.console=${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %4line %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}