MOM系列文章之 - Spring Jms Integration 解读(四)

2014-11-24 07:36:59 · 作者: · 浏览: 1
or) { activeInvokerCount++; lifecycleMonitor.notifyAll(); } boolean messageReceived = false; try { if (maxMessagesPerTask < 0) { messageReceived = executeOngoingLoop(); } else { int messageCount = 0; while (isRunning() && messageCount < maxMessagesPerTask) { messageReceived = (invokeListener() || messageReceived); messageCount++; } } } catch (Throwable ex) { clearResources(); if (!this.lastMessageSucceeded) { // We failed more than once in a row - sleep for recovery interval // even before first recovery attempt. sleepInbetweenRecoveryAttempts(); } this.lastMessageSucceeded = false; boolean alreadyRecovered = false; synchronized (recoveryMonitor) { if (this.lastRecoveryMarker == currentRecoveryMarker) { handleListenerSetupFailure(ex, false); recoverAfterListenerSetupFailure(); currentRecoveryMarker = new Object(); } else { alreadyRecovered = true; } } if (alreadyRecovered) { handleListenerSetupFailure(ex, true); } } finally { synchronized (lifecycleMonitor) { decreaseActiveInvokerCount(); lifecycleMonitor.notifyAll(); } if (!messageReceived) { this.idleTaskExecutionCount++; } else { this.idleTaskExecutionCount = 0; } synchronized (lifecycleMonitor) { if (!shouldRescheduleInvoker(this.idleTaskExecutionCount) || !rescheduleTaskIfNecessary(this)) { // We're shutting down completely. scheduledInvokers.remove(this); if (logger.isDebugEnabled()) { logger.debug(Lowered scheduled invoker count: + scheduledInvokers.size()); } lifecycleMonitor.notifyAll(); clearResources(); } else if (isRunning()) { int nonPausedConsumers = getScheduledConsumerCount() - getPausedTaskCount(); if (nonPausedConsumers < 1) { logger.error(All scheduled consumers have been paused, probably due to tasks having been rejected. + Check your thread pool configuration! Manual recovery necessary through a start() call.); } else if (nonPausedConsumers < getConcurrentConsumers()) { logger.warn(Number of scheduled consumers has dropped below concurrentConsumers limit, probably + due to tasks having been rejected. Check your thread pool configuration! Automatic recovery + to be triggered by remaining consumers.); } } } } } private boolean executeOngoingLoop() throws JMSException { boolean messageReceived = false; boolean active = true; while (active) { synchronized (lifecycleMonitor) { boolean interrupted = false; boolean wasWaiting = false; while ((active = isActive()) && !isRunning()) { if (interrupted) { throw new IllegalStateException(Thread was interrupted while waiting for + a restart of the listener container, but container is still stopped); } if (!wasWaiting) { decreaseActiveInvokerCount(); } wasWaiting = true; try { lifecycleMonitor.wait(); } catch (InterruptedException ex) { // Re-interrupt current thread, to allow other threads to react. Thread.currentThread().interrupt(); interrupted = true; } } if (wasWaiting) { activeInvokerCount++; } if (scheduledInvokers.size() > maxConcurrentConsumers) { active = false; } } if (active) { messageReceived = (invokeListener() || messageReceived); } } return messageReceived; }

差不多这个类就介绍到这里,继续往下看吧~

org.springframework.jms.listener.endpoint包里面提供了一些JavaEE特性 – 对JCA的支持,这里就不展开了。

org.springframework.jms.support,org.springframework.jms.support.converter,org.springframework.jms.support.destination则分别提供了Jms工具类JmsUtils(依我来看,JmsAccessor类可以考虑放到core包里面,而把一些工具类抽到这里来),针对消息转换器(主要包括三类转换,Object<->Message,XML<->Message,Json<->Message),Destination的支持,难度不大,这里也就不展开讨论了。

org.springframework.jms.remoting包则告诉我们底层可以通过JMS走远程服务,类似RMI的Remoting。

ok,差不多就这些内容。看了这么多,最后我们再总结一下Spring对JMS封装的不足之处吧:

(1) Spring对JMS的封装停留在JMS 1.1规范上(1.0.2中的支持Deprecated了),JMS 2的支持在最新的4.0 版本中未曾找见;

(2) 消息发送&接收的时候没有预留钩子方法。比方说我们有这样的需求 - 跟踪消息走向,在消息发送完后向本地的agent写一点数据,agent定时,定量推送数据去server端做统计运算,展示等。这个时候就没有out-of-box的方法可以去实现,当然变通的方法也有不少,但不适合和开源版本融合;

(3) 缺少一些容错策略,比方说消息发送失败,如何处理?

如果有不明白的地方,欢迎大家留言讨论!