rm有三种方式:
普通confirm模式:每发送一条消息后,调用waitForConfirms()方法,等待服务器端confirm。实际上是一种串行confirm了。
批量confirm模式:每发送一批消息后,调用waitForConfirms()方法,等待服务器端confirm。
异步confirm模式:提供一个回调方法,服务端confirm了一条或者多条消息后Client端会回调这个方
注意:
批量模式极大提升confirm效率,但是问题在于一旦出现confirm返回false或者超时的情况时,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数量,并且,当消息经常丢失时,批量confirm性能应该是不升反降的。
消息消费失败处理方案
设置死信队列
当消息发送失败后,设置requeue=false消息进入死信队列,并获取死信队列的长度,设置重新发送到正常队列的重试时间和重试间隔,重新发送到正常队列。
监控死信队列长度,日志记录及时预警。
实现延迟队列
RabbitMQ本身没有延迟队列,需要靠TTL和DLX模拟出延迟的效果
TTL来设置一个消息的的过期时间,DLX设置一个死信队列,将过期的消息推送到死信队列中,消费端监听死信队列来消费数据,从而达到消息延迟的效果。
死信队列补偿机制
当消息消费失败后,进入死信队列,框架层实现逻辑,获取对应死信队列的消息长度,当大于0时并判断是否超过重试次数并达到重试间隔。当没有超过重试次数时,自动将消息从死信队列迁移到正常队列。
消息防堆积方案
- 加强对不合理使用MQ的审批。
- 监控消费能力(耗时<300ms),及时预警。
- 框架层实现发送方限流。(默认值:100条/s)
- 设置消息TTL。
- 使用惰性队列。
关键节点日志记录
MQ成功接受消息时。(info)
生产者消息发送失败时。(error)
生产者confrim确认失败时。(error)
生产消息量过大,限流时。(error)
生产者连接MQ超时时。 (error)
消息大小大于10KB时。(error)
消费者成功消费消息是。(info)
消费者连接MQ超时时。(error)
消费者消费失败时。(error)
消费者进入死信队列时。(error)
消费耗时低于300ms时。(error)
项目集成
maven引入
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
yam配置
spring:
#配置rabbitMq 服务器
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
#虚拟host 可以不设置,使用server默认host
virtual-host: itclub
# 开启publisher-confirm,
# 这里支持两种类型:simple:同步等待confirm结果,直到超时;# correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
publisher-confirm-type: correlated
# publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
publisher-returns: true
# 定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息
template:
mandatory: true
经典案例代码实现
1. 延迟队列实现
/**
* 延迟队列
* @Author : onePiece
*/
@Configuration
public class TtlQueueConfig {
/**
* 普通交换机名称
*/
public static final String X_EXCHANGE = "X";
/**
* 死信交换机名称
*/
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
/**
* 普通队列名称
*/
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
/**
* 死信队列名称
*/
public static final String DEAD_LETTER_QUEUE = "QD";
public static final String DEAD_LETTER_QUEUE_KEY = "YD";
/**
* 声明 XExchange
*/
@Bean
public DirectExchange xExchange(){
return new DirectExchange(X_EXCHANGE);
}
/**
* 声明 yExchange
*/
@Bean
public DirectExchange yExchange(){
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
/**
* 声明队列QA
*/
@Bean
public Queue queueA(){
Map<String, Object> arguments = new HashMap<>(3);
// 设置死信交换机
arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
// 设置死信路由键
arguments.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_KEY);
// 设置过期时间
arguments.put("x-message-ttl", 2000);
return new Queue(QUEUE_A, true, true, false, arguments);
}
/**
* 声明队列QB
*/
@Bean
public Queue queueB(){
Map<String, Object> arguments = new HashMap<>(3);
// 设置死信交换机
arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
// 设置死信路由键
arguments.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_KEY);
// 设置过期时间
arguments.put("x-message-ttl", 4000);
retur