设为首页 加入收藏

TOP

Java SpringBoot集成RabbitMQ实战和总结(六)
2018-10-10 04:11:04 】 浏览:1394
Tags:Java SpringBoot 集成 RabbitMQ 战和 总结
-heart-{0}", id.getAndIncrement()));
        }
    });


client线程池见:com.rabbitmq.client.impl.ConsumerWorkService构造方法。Executors.newFixedThreadPool(DEFAULT_NUM_THREADS, threadFactory)。


final ExecutorService executorService = Executors.newFixedThreadPool(5, new ThreadFactory() {
public final AtomicInteger id = new AtomicInteger();


        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, MessageFormat.format("amqp-client-{0}", id.getAndIncrement()));
        }
    });


listener的线程设置如下:


    simpleRabbitListenerContainerFactory.setTaskExecutor(new SimpleAsyncTaskExecutor"amqp-consumer-"));


注意:SimpleAsyncTaskExecutor每次执行一个任务都会新建一个线程,对于生命周期很短的任务不要使用这个线程池(如client线程池的任务), 这里的消费者线程生命周期直到SimpleMessageListenerContainer停止所以没有适合这个场景


修改过之后的线程如下:


消息投递过程如下:


在AMQConnection中开启连接线程,该线程用于处理和RabbitMq的通信:
public void startMainLoop() {
MainLoop loop = new MainLoop();
final String name = "AMQP Connection " + getHostAddress() + ":" + getPort();
mainLoopThread = Environment.newThread(threadFactory, loop, name);
mainLoopThread.start();
}
AMQConnection.heartbeatExecutor是心跳线程。
AMQConnection.consumerWorkServiceExecutor则是用来处理事件的线程池,AMQConnection线程收到消息投递到这里。
分发逻辑详见com.rabbitmq.client.impl.ChannelN#processAsync->com.rabbitmq.client.impl.ConsumerDispatcher#handleDelivery->投递到线程池.
线程池中继续将消息投递到org.springframework.amqp.rabbit.listener.BlockingQueueConsumer#queue中
consumer线程进行最终消息
上面的是默认的消费者监听器。SpringAMQP 2.0引入了一个新的监听器实现DirectMessageListenerContainer。这个实现最大的变化在于消费者的处理逻辑不是在自己的线程池中执行而是直接在client线程池中处理,这样最明显的是省去了线程的上下文切换的开销,而且设计上也变得更为直观。所以如果采用这个监听器需要覆盖默认的线程池加大Connection的线程池。采用这个监听器只需要设置@RabbitListener的containerFactory属性。声明方法如下:


@Bean
DirectRabbitListenerContainerFactory directRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
    final DirectRabbitListenerContainerFactory directRabbitListenerContainerFactory = new DirectRabbitListenerContainerFactory();
    directRabbitListenerContainerFactory.setConsumersPerQueue(Runtime.getRuntime().availableProcessors());
    directRabbitListenerContainerFactory.setConnectionFactory(connectionFactory);
    directRabbitListenerContainerFactory.setMessageConverter(new Jackson2JsonMessageConverter());
    directRabbitListenerContainerFactory.setConsumersPerQueue(10);
    return directRabbitListenerContainerFactory;
}


这时的消息流转图如下:


还有一些关于监听器的例子和Springboot配置我放在了源码里,这里不再讲述。


首页 上一页 3 4 5 6 下一页 尾页 6/6/6
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Java程序语言的后门-反射机制 下一篇Java Bean与Map之间相互转化的实现

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目