MOM系列文章之 - Spring Jms Integration 解读(三)

2014-11-24 07:36:59 · 作者: · 浏览: 2
de_Book.html#d0e574,深入了解下Webx是怎么利用Schema实现OCP原则的。

org.springframework.jms.connection包里面放置了一些与Connection相关的工具类(ConnectionFactoryUtils),基础类(JmsResourceHolder等)。这里重点关注一下JmsTransactionManager(extendsAbstractPlatformTransactionManager,其中的doXXX方法非常有看点),这个类也是JMS本地事务处理的一个核心工作类,如下:

\


org.springframework.jms.core包里面主要是spring封装的一些回调接口,如BrowserCallback,MessageCreator,MessagePostProcessor,ProducerCallback,SessionCallback,当然我们之前分析过的JmsTemplate也在这个包里面。

org.springframework.jms.core.support包里面就一个抽象类JmsGatewaySupport,暂时没怎么用,就是在afterPropertiesSet方法里面内置了一个initGateway方法,用来做一些定制化操作(custominitialization behavior)。

org.springframework.jms.listener和org.springframework.jms.listener.adapter包,我们要重点关注一下,刚才编程式API主要介绍了消息的发送,消息的接受是怎么处理的呢,主要看这两个包里面的类。类图如下:

\

我们先来了解下SimpleMessageListenerContainer的核心方法:

/**
	 * Create a MessageConsumer for the given JMS Session,
	 * registering a MessageListener for the specified listener.
	 * @param session the JMS Session to work on
	 * @return the MessageConsumer
	 * @throws JMSException if thrown by JMS methods
	 * @see #executeListener
	 */
	protected MessageConsumer createListenerConsumer(final Session session) throws JMSException {
		Destination destination = getDestination();
		if (destination == null) {
			destination = resolveDestinationName(session, getDestinationName());
		}
		MessageConsumer consumer = createConsumer(session, destination);

		if (this.taskExecutor != null) {
			consumer.setMessageListener(new MessageListener() {
				public void onMessage(final Message message) {
					taskExecutor.execute(new Runnable() {
						public void run() {
							processMessage(message, session);
						}
					});
				}
			});
		}
		else {
			consumer.setMessageListener(new MessageListener() {
				public void onMessage(Message message) {
					processMessage(message, session);
				}
			});
		}

		return consumer;
	}
怎么样,很简单吧?非常简单的调度算法,也没有失败重连等高级功能。如果需要这些功能,怎么办?ok,是时候DefaultMessageListenerContainer出场了,一个功能相对比较丰富的Listener容器,和SimpleMessageListenerContainer不同,它使用AsyncMessageListenerInvoker执行一个looped的MessageConsumer.receive()调用来接收消息,注意这里的Executor,默认是SimpleAsyncTaskExecutor,文档里写的很清楚:
NOTE: This implementation does not reuse threads! Consider a thread-pooling TaskExecutor implementation instead, in particular for executing a large number of short-lived tasks.
来看看这个类里面几个重要的成员变量,首先是concurrentConsumers和maxConcurrentConsumers。通过设置setConcurrency方法,可以scale up number of consumers between the minimum number ofconsumers(concurrentConsumers)and the maximum number of consumers(maxConcurrentConsumers)。那么单个消费任务如何消费消息呢,这里又有一个变量需要注意一下,即idleTaskExecutionLimit,官方的解释很清楚了:

Within each task execution, a number of message reception attempts (according to the maxMessagesPerTask setting) will each wait for an incoming message (according to the receiveTimeout setting). If all of those receive attempts in a given task return without a message, the task is considered idle with respect to received messages. Such a task may still be rescheduled; however, once it reached the specified idleTaskExecutionLimit, it will shut down (in case of dynamic scaling).
接下来,我们来看这个类里面最最重要的调度方法,在其内部类AsyncMessageListenerInvoker里面,如下:

public void run() {
			synchronized (lifecycleMonit