设为首页 加入收藏

TOP

RabbitMQ延迟队列,死信队列配置
2023-08-06 07:49:50 】 浏览:38
Tags:RabbitMQ
延迟和死信队列的配置
  • 延迟队列有效期一分钟,后进入死信队列,如果异常就进入异常队列
@Configuration
@Data
public class RabbitMQConfig {


    /**
     * 交换机
     */
    private String orderEventExchange="order.event.exchange";


    /**
     * 延迟队列, 不能被监听消费
     */
    private String orderCloseDelayQueue="order.close.delay.queue";

    /**
     * 关单队列, 延迟队列的消息过期后转发的队列,被消费者监听
     */
    private String orderCloseQueue="order.close.queue";


    /**
     * 进入延迟队列的路由key
     */
    private String orderCloseDelayRoutingKey="order.close.delay.routing.key";


    /**
     * 进入死信队列的路由key,消息过期进入死信队列的key
     */
    private String orderCloseRoutingKey="order.close.routing.key";

    /**
     * 过期时间 毫秒,临时改为1分钟定时关单
     */
    private Integer ttl=1000*60;

    /**
     * 消息转换器
     * @return
     */
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }


    /**
     * 创建交换机 Topic类型,也可以用dirct路由
     * 一般一个微服务一个交换机
     * @return
     */
    @Bean
    public Exchange orderEventExchange(){
        return new TopicExchange(orderEventExchange,true,false);
    }


    /**
     * 延迟队列
     */
    @Bean
    public Queue orderCloseDelayQueue(){

        Map<String,Object> args = new HashMap<>(3);
        args.put("x-dead-letter-exchange",orderEventExchange);
        args.put("x-dead-letter-routing-key",orderCloseRoutingKey);
        args.put("x-message-ttl",ttl);

        return new Queue(orderCloseDelayQueue,true,false,false,args);
    }


    /**
     * 死信队列,普通队列,用于被监听
     */
    @Bean
    public Queue orderCloseQueue(){

        return new Queue(orderCloseQueue,true,false,false);

    }

    /**
     * 第一个队列,即延迟队列的绑定关系建立
     * @return
     */
    @Bean
    public Binding orderCloseDelayBinding(){

        return new Binding(orderCloseDelayQueue,Binding.DestinationType.QUEUE,orderEventExchange,orderCloseDelayRoutingKey,null);
    }

    /**
     * 死信队列绑定关系建立
     * @return
     */
    @Bean
    public Binding orderCloseBinding(){

        return new Binding(orderCloseQueue,Binding.DestinationType.QUEUE,orderEventExchange,orderCloseRoutingKey,null);
    }



}
异常队列配置类
public class RabbitMQErrorConfig {
    @Autowired
    RabbitTemplate rabbitTemplate;
    /**
     * 异常交换机
     */
    private String orderErrorExchange = "order.error.exchange";
    /**
     * 异常队列
     */
    private String orderErrorQueue = "order.error.queue";
    /**
     * 异常routing.key
     */
    private String orderErrorRoutingKey = "order.error.routing.key";

    /**
     * 异常交换机
     * @return
     */
    @Bean
    public TopicExchange errorTopicExchange(){
        return new TopicExchange(orderErrorExchange,true,false);
    }

    /**
     * 异常队列
     * @return
     */
    @Bean
    public Queue errorQueue(){
        return new Queue(orderErrorQueue,true);
    }

    /**
     * 队列交换机进行绑定
     * @param errorQueue
     * @return
     */
    @Bean
    public Binding BindingErrorQueueAndExchange(Queue errorQueue,TopicExchange errorTopicExchange){
        return BindingBuilder.bind(errorQueue).to(errorTopicExchange).with(orderErrorRoutingKey);
    }

    /**
     * 配置 RepublishMessageRecoverer
     * 用途:消息重试一定次数后,用特定的routingKey转发到指定的交换机中,方便后续排查和告警
     *
     * 顶层是 MessageRecoverer接口,多个实现类
     *
     * @return
     */
    @Bean
    public MessageRecoverer messageRecoverer(){
        return new RepublishMessageRecoverer(rabbitTemplate,orderErrorExchange,orderErrorRoutingKey);
    }
}
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇spring-mvc系列:详解@RequestMap.. 下一篇SpringCloud-Hystrix服务熔断与降..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目