设为首页 加入收藏

TOP

RabbitMQ个人实践(一)
2023-07-25 21:34:44 】 浏览:59
Tags:RabbitMQ

前言

MQ(Message Queue)就是消息队列,其有点有很多:解耦、异步、削峰等等,本文来聊一下RabbitMQ的一些概念以及使用。

RabbitMq

案例

Springboot整合RabbitMQ简单案例

基本概念

rabbitmq.png

  • Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
  • Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
  • Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
  • Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
  • Producer:消息生产者,就是投递消息的程序。
  • Consumer:消息消费者,就是接受消息的程序。

发布消息到RabbitMQ需要经过两步:

  1. producer → exchange
  2. exchange 根据 exchange 的类型和 routing key 确定将消息投递到哪个队列

工作流程

了解了RabbitMQ的一些概念,我们来捋捋使用RabbitMQ的流程:

  1. 创建Exchange
  2. 创建Queue
  3. 将Queue绑定进Exchange中(此处会设置routing key)
  4. 生产者发布消息
  5. 消费者订阅消息

交换机(Exchange)

交换机可以绑定队列,绑定时可以给队列指定路由(Routing key)和参数(Arguments)

所有的消息发送都是经过交换机转发到队列的,而不是直接到队列中

交换机类型:

  • direct

    根据确定的路由(routing key)转发消息到队列中(一条消息可以发到多个队列,只要路由相同)

  • fanout

    路由无效,只要和该交换机绑定的队列,都能接收到消息

  • topic

    允许路由使用*和#来进行模糊匹配

    *表示一个单词

    #表示任意数量(零个或多个)单词

    例如:如果队列的路由为com.# 那么往交换机发消息是,路由填com.ccc 队列就可以收到消息

  • headers

    忽略路由,由参数(Arguments)来确定转发的队列

消息过期时间TTL

有两种方式设置TTL,创建队列时设置整个队列的TTL或者在发送消息时单独设置每条消息的TTL,消息存活时间取两者的最小值。

  1. 创建队列时设置

    是消息的存活时间,不是队列的存活时间,别搞混了。

    @Bean
    public Queue queue(){
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", 5000); // 设置队列中的消息5秒过期
        return new Queue("queueName",true, false, false, args);
    }
    
  2. 发送消息时设置

    public void makeOrder(String userid,String productid,int num){
        String exchangeName = "ttl_exchange";
        String routingKey = "ttlmessage";
        //给消息设置过期时间
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor(){
            public Message postProcessMessage(Message message){
                // 设置消息5秒过期
                message.getMessageProperties().setExpiration("5000");
                return message;
            }
        }
        rabbitTemplate.convertAndSend(exchangeName,routingKey,"message",messagePostProcessor);
    }
    

死信队列

死信队列也是一个正常队列,只是当绑定了死信队列的队列满足相应条件,就会将满足条件的消息转移到死信队列中。

进入死信队列的条件:

  1. 消息被拒绝
  2. 消息过期(超时)
  3. 队列达到最大长度

死信队列的配置:

  1. 按照正常步骤定义一个队列(交换机、队列、绑定)

  2. 给需要绑定死信队列的队列添加x-dead-letter-exchange(死信队列的交换机)和x-dead-letter-routing-key(死信队列的路由)参数

    @Bean
    public Queue queue(){
        Map<String, Object> args = new HashMap<>();
    		args.put("x-dead-letter-exchange", "死信队列交换机名称"); 
    		args.put("x-dead-letter-routing-key", "死信队列路由"); 
        return new Queue("queueName",true, false, false, args);
    }
    

如何保证MQ消息正确送达与消费

可靠性生产和推送

步骤:

  1. 发送消息前数据库保存MQ消息发送日志
  2. MQ消息发送后使用回调更新日志状态

实现:

上面我们讲了,发布消息到RabbitMQ需要经过两步:

producer → exchange
exchange 根据 exchange 的类型和 routing key 确定将消息投递到哪个队列

所以,发布消息的确认也分两个部分,以下是确认步骤:

  1. 修改MQ应答机制(yml)

    spring:
      rabbitmq:
        username: rmq
        password: 123456
        virtual-host: /
        host: localhost
        port: 5672
        # 发送消息确认,producer -> exchange
        publisher-confirm-type: correlated
        # 发送消息确认,exchange -> queue
        publisher-returns: true
    
  2. 新增mq的回调方法

    /**
     * PostConstruct注解好多人以为是Spring提供的。其实是Java自己的注解。
     * Java中该注解的说明:@PostConstruct该注解被用来修饰一个非静态的void()方法。
     * 被@PostConstruct修饰的方法会在服务器加载Servlet的时候运行,并且只会被服务器执行一次。
     * PostConstruct在构造函数之后执行,init()方法之前执行。
     * Constructor(构造方法) -> @Autowired(依赖注入) -> @PostConstruct(注释的方法)
     */
    @PostConstruct
    private void regCallBack() {
        // producer -> exchange 成功或失败都会触发此回调
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack
首页 上一页 1 2 下一页 尾页 1/2/2
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Java锁的逻辑(结合对象头和Objec.. 下一篇第2-4-3章 规则引擎Drools基础语..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目