or) {
activeInvokerCount++;
lifecycleMonitor.notifyAll();
}
boolean messageReceived = false;
try {
if (maxMessagesPerTask < 0) {
messageReceived = executeOngoingLoop();
}
else {
int messageCount = 0;
while (isRunning() && messageCount < maxMessagesPerTask) {
messageReceived = (invokeListener() || messageReceived);
messageCount++;
}
}
}
catch (Throwable ex) {
clearResources();
if (!this.lastMessageSucceeded) {
// We failed more than once in a row - sleep for recovery interval
// even before first recovery attempt.
sleepInbetweenRecoveryAttempts();
}
this.lastMessageSucceeded = false;
boolean alreadyRecovered = false;
synchronized (recoveryMonitor) {
if (this.lastRecoveryMarker == currentRecoveryMarker) {
handleListenerSetupFailure(ex, false);
recoverAfterListenerSetupFailure();
currentRecoveryMarker = new Object();
}
else {
alreadyRecovered = true;
}
}
if (alreadyRecovered) {
handleListenerSetupFailure(ex, true);
}
}
finally {
synchronized (lifecycleMonitor) {
decreaseActiveInvokerCount();
lifecycleMonitor.notifyAll();
}
if (!messageReceived) {
this.idleTaskExecutionCount++;
}
else {
this.idleTaskExecutionCount = 0;
}
synchronized (lifecycleMonitor) {
if (!shouldRescheduleInvoker(this.idleTaskExecutionCount) || !rescheduleTaskIfNecessary(this)) {
// We're shutting down completely.
scheduledInvokers.remove(this);
if (logger.isDebugEnabled()) {
logger.debug(Lowered scheduled invoker count: + scheduledInvokers.size());
}
lifecycleMonitor.notifyAll();
clearResources();
}
else if (isRunning()) {
int nonPausedConsumers = getScheduledConsumerCount() - getPausedTaskCount();
if (nonPausedConsumers < 1) {
logger.error(All scheduled consumers have been paused, probably due to tasks having been rejected. +
Check your thread pool configuration! Manual recovery necessary through a start() call.);
}
else if (nonPausedConsumers < getConcurrentConsumers()) {
logger.warn(Number of scheduled consumers has dropped below concurrentConsumers limit, probably +
due to tasks having been rejected. Check your thread pool configuration! Automatic recovery +
to be triggered by remaining consumers.);
}
}
}
}
}
private boolean executeOngoingLoop() throws JMSException {
boolean messageReceived = false;
boolean active = true;
while (active) {
synchronized (lifecycleMonitor) {
boolean interrupted = false;
boolean wasWaiting = false;
while ((active = isActive()) && !isRunning()) {
if (interrupted) {
throw new IllegalStateException(Thread was interrupted while waiting for +
a restart of the listener container, but container is still stopped);
}
if (!wasWaiting) {
decreaseActiveInvokerCount();
}
wasWaiting = true;
try {
lifecycleMonitor.wait();
}
catch (InterruptedException ex) {
// Re-interrupt current thread, to allow other threads to react.
Thread.currentThread().interrupt();
interrupted = true;
}
}
if (wasWaiting) {
activeInvokerCount++;
}
if (scheduledInvokers.size() > maxConcurrentConsumers) {
active = false;
}
}
if (active) {
messageReceived = (invokeListener() || messageReceived);
}
}
return messageReceived;
}
差不多这个类就介绍到这里,继续往下看吧~
org.springframework.jms.listener.endpoint包里面提供了一些JavaEE特性 – 对JCA的支持,这里就不展开了。
org.springframework.jms.support,org.springframework.jms.support.converter,org.springframework.jms.support.destination则分别提供了Jms工具类JmsUtils(依我来看,JmsAccessor类可以考虑放到core包里面,而把一些工具类抽到这里来),针对消息转换器(主要包括三类转换,Object<->Message,XML<->Message,Json<->Message),Destination的支持,难度不大,这里也就不展开讨论了。
org.springframework.jms.remoting包则告诉我们底层可以通过JMS走远程服务,类似RMI的Remoting。
ok,差不多就这些内容。看了这么多,最后我们再总结一下Spring对JMS封装的不足之处吧:
(1) Spring对JMS的封装停留在JMS 1.1规范上(1.0.2中的支持Deprecated了),JMS 2的支持在最新的4.0 版本中未曾找见;
(2) 消息发送&接收的时候没有预留钩子方法。比方说我们有这样的需求 - 跟踪消息走向,在消息发送完后向本地的agent写一点数据,agent定时,定量推送数据去server端做统计运算,展示等。这个时候就没有out-of-box的方法可以去实现,当然变通的方法也有不少,但不适合和开源版本融合;
(3) 缺少一些容错策略,比方说消息发送失败,如何处理?
如果有不明白的地方,欢迎大家留言讨论!