org.springframework.jms.connection包里面放置了一些与Connection相关的工具类(ConnectionFactoryUtils),基础类(JmsResourceHolder等)。这里重点关注一下JmsTransactionManager(extendsAbstractPlatformTransactionManager,其中的doXXX方法非常有看点),这个类也是JMS本地事务处理的一个核心工作类,如下:

org.springframework.jms.core包里面主要是spring封装的一些回调接口,如BrowserCallback,MessageCreator,MessagePostProcessor,ProducerCallback,SessionCallback,当然我们之前分析过的JmsTemplate也在这个包里面。
org.springframework.jms.core.support包里面就一个抽象类JmsGatewaySupport,暂时没怎么用,就是在afterPropertiesSet方法里面内置了一个initGateway方法,用来做一些定制化操作(custominitialization behavior)。
org.springframework.jms.listener和org.springframework.jms.listener.adapter包,我们要重点关注一下,刚才编程式API主要介绍了消息的发送,消息的接受是怎么处理的呢,主要看这两个包里面的类。类图如下:
我们先来了解下SimpleMessageListenerContainer的核心方法:
/**
* Create a MessageConsumer for the given JMS Session,
* registering a MessageListener for the specified listener.
* @param session the JMS Session to work on
* @return the MessageConsumer
* @throws JMSException if thrown by JMS methods
* @see #executeListener
*/
protected MessageConsumer createListenerConsumer(final Session session) throws JMSException {
Destination destination = getDestination();
if (destination == null) {
destination = resolveDestinationName(session, getDestinationName());
}
MessageConsumer consumer = createConsumer(session, destination);
if (this.taskExecutor != null) {
consumer.setMessageListener(new MessageListener() {
public void onMessage(final Message message) {
taskExecutor.execute(new Runnable() {
public void run() {
processMessage(message, session);
}
});
}
});
}
else {
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
processMessage(message, session);
}
});
}
return consumer;
} 怎么样,很简单吧?非常简单的调度算法,也没有失败重连等高级功能。如果需要这些功能,怎么办?ok,是时候DefaultMessageListenerContainer出场了,一个功能相对比较丰富的Listener容器,和SimpleMessageListenerContainer不同,它使用AsyncMessageListenerInvoker执行一个looped的MessageConsumer.receive()调用来接收消息,注意这里的Executor,默认是SimpleAsyncTaskExecutor,文档里写的很清楚:
NOTE: This implementation does not reuse threads! Consider a thread-pooling TaskExecutor implementation instead, in particular for executing a large number of short-lived tasks.来看看这个类里面几个重要的成员变量,首先是concurrentConsumers和maxConcurrentConsumers。通过设置setConcurrency方法,可以scale up number of consumers between the minimum number ofconsumers(concurrentConsumers)and the maximum number of consumers(maxConcurrentConsumers)。那么单个消费任务如何消费消息呢,这里又有一个变量需要注意一下,即idleTaskExecutionLimit,官方的解释很清楚了:
Within each task execution, a number of message reception attempts (according to the maxMessagesPerTask setting) will each wait for an incoming message (according to the receiveTimeout setting). If all of those receive attempts in a given task return without a message, the task is considered idle with respect to received messages. Such a task may still be rescheduled; however, once it reached the specified idleTaskExecutionLimit, it will shut down (in case of dynamic scaling).接下来,我们来看这个类里面最最重要的调度方法,在其内部类AsyncMessageListenerInvoker里面,如下:
public void run() {
synchronized (lifecycleMonit