设为首页 加入收藏

TOP

RabbitMQ - 稳定性探讨 - SpringBoot集成(二)
2023-07-25 21:40:53 】 浏览:93
Tags:RabbitMQ SpringBoot 集成
rm有三种方式:

普通confirm模式:每发送一条消息后,调用waitForConfirms()方法,等待服务器端confirm。实际上是一种串行confirm了。

批量confirm模式:每发送一批消息后,调用waitForConfirms()方法,等待服务器端confirm。

异步confirm模式:提供一个回调方法,服务端confirm了一条或者多条消息后Client端会回调这个方

注意:

批量模式极大提升confirm效率,但是问题在于一旦出现confirm返回false或者超时的情况时,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数量,并且,当消息经常丢失时,批量confirm性能应该是不升反降的。

消息消费失败处理方案

设置死信队列

当消息发送失败后,设置requeue=false消息进入死信队列,并获取死信队列的长度,设置重新发送到正常队列的重试时间和重试间隔,重新发送到正常队列。

监控死信队列长度,日志记录及时预警。

实现延迟队列

RabbitMQ本身没有延迟队列,需要靠TTL和DLX模拟出延迟的效果

TTL来设置一个消息的的过期时间,DLX设置一个死信队列,将过期的消息推送到死信队列中,消费端监听死信队列来消费数据,从而达到消息延迟的效果。

死信队列补偿机制

当消息消费失败后,进入死信队列,框架层实现逻辑,获取对应死信队列的消息长度,当大于0时并判断是否超过重试次数并达到重试间隔。当没有超过重试次数时,自动将消息从死信队列迁移到正常队列。

消息防堆积方案

  • 加强对不合理使用MQ的审批。
  • 监控消费能力(耗时<300ms),及时预警。
  • 框架层实现发送方限流。(默认值:100条/s)
  • 设置消息TTL。
  • 使用惰性队列。

关键节点日志记录

MQ成功接受消息时。(info)

生产者消息发送失败时。(error)

生产者confrim确认失败时。(error)

生产消息量过大,限流时。(error)

生产者连接MQ超时时。 (error)

消息大小大于10KB时。(error)

消费者成功消费消息是。(info)

消费者连接MQ超时时。(error)

消费者消费失败时。(error)

消费者进入死信队列时。(error)

消费耗时低于300ms时。(error)

项目集成

maven引入

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

yam配置

spring:
  #配置rabbitMq 服务器
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    #虚拟host 可以不设置,使用server默认host
    virtual-host: itclub
    # 开启publisher-confirm,
    # 这里支持两种类型:simple:同步等待confirm结果,直到超时;# correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
    publisher-confirm-type: correlated
    # publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
    publisher-returns: true
    # 定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息
    template:
      mandatory: true

经典案例代码实现

1. 延迟队列实现

/**
 * 延迟队列
 * @Author : onePiece
 */
@Configuration
public class TtlQueueConfig {

    /**
     * 普通交换机名称
     */
    public static final String X_EXCHANGE = "X";

    /**
     * 死信交换机名称
     */
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    /**
     * 普通队列名称
     */
    public static final String QUEUE_A = "QA";
    public static final String QUEUE_B = "QB";

    /**
     * 死信队列名称
     */
    public static final String DEAD_LETTER_QUEUE = "QD";

    public static final String DEAD_LETTER_QUEUE_KEY = "YD";

    /**
     * 声明 XExchange
     */
    @Bean
    public DirectExchange xExchange(){
        return new DirectExchange(X_EXCHANGE);
    }

    /**
     * 声明 yExchange
     */
    @Bean
    public DirectExchange yExchange(){
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }

    /**
     * 声明队列QA
     */
    @Bean
    public Queue queueA(){
        Map<String, Object> arguments = new HashMap<>(3);
        // 设置死信交换机
        arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        // 设置死信路由键
        arguments.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_KEY);
        // 设置过期时间
        arguments.put("x-message-ttl", 2000);
        return new Queue(QUEUE_A, true, true, false, arguments);
    }

    /**
     * 声明队列QB
     */
    @Bean
    public Queue queueB(){
        Map<String, Object> arguments = new HashMap<>(3);
        // 设置死信交换机
        arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        // 设置死信路由键
        arguments.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_KEY);
        // 设置过期时间
        arguments.put("x-message-ttl", 4000);
        retur
首页 上一页 1 2 3 4 5 下一页 尾页 2/5/5
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇redis实现用户查询次数限制 下一篇Spring Boot常见问题

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目