设为首页 加入收藏

TOP

springboot 中使用 RabbitMQ 配置使用优先级队列(一)
2023-07-25 21:30:59 】 浏览:51
Tags:springboot RabbitMQ

RabbitMQ 支持优先级队列,当工作中有一些任务需要紧急优先处理,此时可以使用优先级队列
通过设置 MQ 的 x-max-priority 属性可以将对列设置为优先级队列

配置文件类


@Slf4j
@Getter
@Configuration
public class RabbitMQConfig {

    @Value("${spring.rabbitmq.addresses}")
    private String addresses;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    @Value("${spring.rabbitmq.cache.channel.size:100}")
    private int channelCacheSize;       // 单条链接channel 缓存大小

    @Value("${spring.rabbitmq.cache.channel.checkout-timeout:5000}")
    private long channelCheckTimeout;    //获取channel等待时间,毫秒

    @Value("${app.accept-text-queue-name}")
    private String acceptTextQueueName;

    @Bean
    public MessageConverter jsonConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public CachingConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses(addresses);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setChannelCheckoutTimeout(channelCheckTimeout);
        connectionFactory.setChannelCacheSize(channelCacheSize);
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }

    @Bean(name = "jsonRabbitTemplate")
    public RabbitTemplate jsonRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMandatory(true);
        template.setMessageConverter(jsonConverter());
        template.setConfirmCallback((correlationData, ack, cause) -> {
            if (!ack) {
                log.error("confirm message error! correlationData: {}, cause: {}", correlationData, cause);
            }
        });
        template.setReturnsCallback((msg) -> {
            String content = new String(msg.getMessage().getBody(), StandardCharsets.UTF_8);
            String message = MessageFormatter.format("send msg error, routing key:{}, msg:{}", msg.getRoutingKey(), content).getMessage();
            LogssetaPrinter.printError(message);
            log.error(JsonObjectUtil.toJsonQuietly(msg));
        });

        return template;
    }

    @Bean(name = "acceptSseConvertQueue")
    public Queue acceptSseConvertQueue() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-max-priority", 100);// 设置优先级最高为 100,数字越大优先级越高
        return new Queue(acceptTextQueueName, true, false, false, args);
    }

}

设置预取数

在消费者中默认 containerFactory 为 SimpleRabbitListenerContainerFactory ,其中默认预取数为 250,该配置在 AbstractMessageListenerContainer 中的 DEFAULT_PREFETCH_COUNT
如果需要每次消费一个消息,就需要在配置文件中指定预取数为 1, 配置项为

// 设置预取数为 1
spring.rabbitmq.listener.simple.prefetch = 1

设置并发消费数

在 @RabbitListener 中,配置项 concurrency 来配置并发消费数

在生产环境中一般需要设置高可用MQ集群,常见的是镜像队列模式

客户端配置镜像队列方式

CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses(addresses);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password)
首页 上一页 1 2 下一页 尾页 1/2/2
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇jar包转化成可执行exe 下一篇死磕面试系列,Java到底是值传递..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目