.setPublisherConfirms(isConfirm);
}
在配置类使用@EnableRabbit的情况下,也可以基于注解进行声明,在Bean的方法上加上@RabbitListener,如下:
/**
* 可以直接通过注解声明交换器、绑定、队列。但是如果声明的和rabbitMq中已经存在的不一致的话
* 会报错便于测试,我这里都是不使用持久化,没有消费者之后自动删除
* {@link RabbitListener}是可以重复的。并且声明队列绑定的key也可以有多个.
*
* @param headers
* @param msg
*/
@RabbitListener(
bindings = @QueueBinding(
exchange = @Exchange(value = RabbitMQConstant.DEFAULT_EXCHANGE, type = ExchangeTypes.TOPIC,
durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT),
value = @Queue(value = RabbitMQConstant.DEFAULT_QUEUE, durable = RabbitMQConstant.FALSE_CONSTANT,
autoDelete = RabbitMQConstant.true_CONSTANT),
key = DKEY
),
//手动指明消费者的监听容器,默认Spring为自动生成一个SimpleMessageListenerContainer
containerFactory = "container",
//指定消费者的线程数量,一个线程会打开一个Channel,一个队列上的消息只会被消费一次(不考虑消息重新入队列的情况),下面的表示至少开启5个线程,最多10个。线程的数目需要根据你的任务来决定,如果是计算密集型,线程的数目就应该少一些
concurrency = "5-10"
)
public void process(@Headers Map<String, Object> headers, @Payload ExampleEvent msg) {
log.info("basic consumer receive message:{headers = [" + headers + "], msg = [" + msg + "]}");
}
/**
* {@link Queue#ignoreDeclarationExceptions}声明队列会忽略错误不声明队列,这个消费者仍然是可用的
*
* @param headers
* @param msg
*/
@RabbitListener(queuesToDeclare = @Queue(value = RabbitMQConstant.DEFAULT_QUEUE, ignoreDeclarationExceptions = RabbitMQConstant.true_CONSTANT))
public void process2(@Headers Map<String, Object> headers, @Payload ExampleEvent msg) {
log.info("basic2 consumer receive message:{headers = [" + headers + "], msg = [" + msg + "]}");
}
关于消息序列化
这个比较简单,默认采用了Java序列化,我们一般使用的Json格式,所以配置了Jackson,根据自己的情况来,直接贴代码:
@Bean
MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
同一个队列多消费类型
如果是同一个队列多个消费类型那么就需要针对每种类型提供一个消费方法,否则找不到匹配的方法会报错,如下:
@Componentbr/>@Slf4j
@RabbitListener(
bindings = @QueueBinding(
exchange = @Exchange(value = RabbitMQConstant.MULTIPART_HANDLE_EXCHANGE, type = ExchangeTypes.TOPIC,
durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT),
value = @Queue(value = RabbitMQConstant.MULTIPART_HANDLE_QUEUE, durable = RabbitMQConstant.FALSE_CONSTANT,
autoDelete = RabbitMQConstant.true_CONSTANT),
key = RabbitMQConstant.MULTIPART_HANDLE_KEYbr/>)
)
@Profile(SpringConstant.MULTIPART_PROFILE)
public class MultipartConsumer {
/**
* RabbitHandler用于有多个方法时但是参数类型不能一样,否则会报错
*
* @param msg
*/
@RabbitHandler
public void process(ExampleEvent msg) {
log.info("param:{msg = [" + msg + "]} info:");
}
@RabbitHandler
public void processMessage2(ExampleEvent2 msg) {
log.info("param:{msg2 = [" + msg + "]} info:");
}
/**
* 下面的多个消费者,消费的类型不一样没事,不会被调用,但是如果缺了相应消息的处理Handler则会报错
*
* @param msg
*/
@RabbitHandler
public void processMessage3(ExampleEvent3 msg) {
l