1 /*
2 * Copyright 2002-2015 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package org.springframework.jms.listener;
18
19 import java.util.HashSet;
20 import java.util.Set;
21 import java.util.concurrent.Executor;
22 import javax.jms.Connection;
23 import javax.jms.JMSException;
24 import javax.jms.MessageConsumer;
25 import javax.jms.Session;
26
27 import org.springframework.core.Constants;
28 import org.springframework.core.task.SimpleAsyncTaskExecutor;
29 import org.springframework.core.task.TaskExecutor;
30 import org.springframework.jms.JmsException;
31 import org.springframework.jms.support.JmsUtils;
32 import org.springframework.jms.support.destination.CachingDestinationResolver;
33 import org.springframework.jms.support.destination.DestinationResolver;
34 import org.springframework.scheduling.SchedulingAwareRunnable;
35 import org.springframework.scheduling.SchedulingTaskExecutor;
36 import org.springframework.util.Assert;
37 import org.springframework.util.ClassUtils;
38 import org.springframework.util.backoff.BackOff;
39 import org.springframework.util.backoff.BackOffExecution;
40 import org.springframework.util.backoff.FixedBackOff;
41
42 /**
43 * Message listener container variant that uses plain JMS client APIs, specifically
44 * a loop of {@code MessageConsumer.receive()} calls that also allow for
45 * transactional reception of messages (registering them with XA transactions).
46 * Designed to work in a native JMS environment as well as in a Java EE environment,
47 * with only minimal differences in configuration.
48 *
49 * <p>This is a simple but nevertheless powerful form of message listener container.
50 * On startup, it obtains a fixed number of JMS Sessions to invoke the listener,
51 * and optionally allows for dynamic adaptation at runtime (up to a maximum number).
52 * Like {@link SimpleMessageListenerContainer}, its main advantage is its low level
53 * of runtime complexity, in particular the minimal requirements on the JMS provider:
54 * not even the JMS {@code ServerSessionPool} facility is required. Beyond that, it is
55 * fully self-recovering in case the broker is temporarily unavailable, and allows
56 * for stops/restarts as well as runtime changes to its configuration.
57 *
58 * <p>Actual {@code MessageListener} execution happens in asynchronous work units which are
59 * created through Spring's {@link org.springframework.core.task.TaskExecutor TaskExecutor}
60 * abstraction. By default, the specified number of invoker tasks will be created
61 * on startup, according to the {@link #setConcurrentConsumers "concurrentConsumers"}
62 * setting. Specify an alternative {@code TaskExecutor} to integrate with an existing
63 * thread pool facility (such as a Java EE server's), for example using a
64 * {@link org.springframework.scheduling.commonj.WorkManagerTaskExecutor CommonJ WorkManager}.
65 * With a native JMS setup, each of those listener threads is going to use a
66 * cached JMS {@code Session} and {@code MessageConsumer} (only refreshed in case
67 * of failure), using the JMS provider's resources as efficiently as possible.
68 *
69 * <p>Message reception and listener execution can automatically be wrapped
70 * in transactions by passing a Spring
71 * {@link org.springframework.transaction.PlatformTransactionManager} into the
72 * {@link #setTransactionManager "transactionManager"} property. This will usually
73 * be a {@link org.springframework.transaction.jta.JtaTransactionManager} in a
74 * Java EE environment, in combination with a JTA-aware JMS {@code ConnectionFactory}
75 * obtained from JNDI (check your Java EE server's documentation). Note that this
76 * listener container will automatically reobtain all JMS handles for each transaction
77 * in case an external transaction manager is specified, for compatibility with
78 * all Java EE servers (in particular JBoss). This non-caching behavior can be
79 * overridden through the {@link #setCacheLevel "cacheLevel"} /
80 * {@link #setCacheLevelName "cacheLevelName"} property, enforcing caching of
81 * the {@code Connection} (or also {@code Session} and {@code MessageConsumer})
82 * even if an external transaction manager is involved.
83 *
84 * <p>Dynamic scaling of the number of concurrent invokers can be activated
85 * by specifying a {@link #setMaxConcurrentConsumers "maxConcurrentConsumers"}
86 * value that is higher than the {@link #setConcurrentConsumers "concurrentConsumers"}
87 * value. Since the latter's default is 1, you can also simply specify a
88 * "maxConcurrentConsumers" of e.g. 5, which will lead to dynamic scaling up to
89 * 5 concurrent consumers in case of increasing message load, as well as dynamic
90 * shrinking back to the standard number of consumers once the load decreases.
91 * Consider adapting the {@link #setIdleTaskExecutionLimit "idleTaskExecutionLimit"}
92 * setting to control the lifespan of each new task, to avoid frequent scaling up
93 * and down, in particular if the {@code ConnectionFactory} does not pool JMS
94 * {@code Sessions} and/or the {@code TaskExecutor} does not pool threads (check
95 * your configuration!). Note that dynamic scaling only really makes sense for a
96 * queue in the first place; for a topic, you will typically stick with the default
97 * number of 1 consumer, otherwise you'd receive the same message multiple times on
98 * the same node.
99 *
100 * <p><b>Note: Don't use Spring's {@link org.springframework.jms.connection.CachingConnectionFactory}
101 * in combination with dynamic scaling.</b> Ideally, don't use it with a message
102 * listener container at all, since it is generally preferable to let the
103 * listener container itself handle appropriate caching within its lifecycle.
104 * Also, stopping and restarting a listener container will only work with an
105 * independent, locally cached Connection - not with an externally cached one.
106 *
107 * <p><b>It is strongly recommended to either set {@link #setSessionTransacted
108 * "sessionTransacted"} to "true" or specify an external {@link #setTransactionManager
109 * "transactionManager"}.</b> See the {@link AbstractMessageListenerContainer}
110 * javadoc for details on acknowledge modes and native transaction options, as
111 * well as the {@link AbstractPollingMessageListenerContainer} javadoc for details
112 * on configuring an external transaction manager. Note that for the default
113 * "AUTO_ACKNOWLEDGE" mode, this container applies automatic message acknowledgment
114 * before listener execution, with no redelivery in case of an exception.
115 *
116 * @author Juergen Hoeller
117 * @since 2.0
118 * @see #setTransactionManager
119 * @see #setCacheLevel
120 * @see javax.jms.MessageConsumer#receive(long)
121 * @see SimpleMessageListenerContainer
122 * @see org.springframework.jms.listener.endpoint.JmsMessageEndpointManager
123 */
124 public class DefaultMessageListenerContainer extends AbstractPollingMessageListenerContainer {
125
126 /**
127 * Default thread name prefix: "DefaultMessageListenerContainer-".
128 */
129 public static final String DEFAULT_THREAD_NAME_PREFIX =
130 ClassUtils.getShortName(DefaultMessageListenerContainer.class) + "-";
131
132 /**
133 * The default recovery interval: 5000 ms = 5 seconds.
134 */
135 public static final long DEFAULT_RECOVERY_INTERVAL = 5000;
136
137
138 /**
139 * Constant that indicates to cache no JMS resources at all.
140 * @see #setCacheLevel
141 */
142 public static final int CACHE_NONE = 0;
143
144 /**
145 * Constant that indicates to cache a shared JMS {@code Connection} for each
146 * listener thread.
147 * @see #setCacheLevel
148 */
149 public static final int CACHE_CONNECTION = 1;
150
151 /**
152 * Constant that indicates to cache a shared JMS {@code Connection} and a JMS
153 * {@code Session} for each listener thread.
154 * @see #setCacheLevel
155 */
156 public static final int CACHE_SESSION = 2;
157
158 /**
159 * Constant that indicates to cache a shared JMS {@code Connection}, a JMS
160 * {@code Session}, and a JMS MessageConsumer for each listener thread.
161 * @see #setCacheLevel
162 */
163 public static final int CACHE_CONSUMER = 3;
164
165 /**
166 * Constant that indicates automatic choice of an appropriate caching level
167 * (depending on the transaction management strategy).
168 * @see #setCacheLevel
169 */
170 public static final int CACHE_AUTO = 4;
171
172
173 private static final Constants constants = new Constants(DefaultMessageListenerContainer.class);
174
175
176 private Executor taskExecutor;
177
178 private BackOff backOff = createDefaultBackOff(DEFAULT_RECOVERY_INTERVAL);
179
180 private int cacheLevel = CACHE_AUTO;
181
182 private int concurrentConsumers = 1;
183
184 private int maxConcurrentConsumers = 1;
185
186 private int maxMessagesPerTask = Integer.MIN_VALUE;
187
188 private int idleConsumerLimit = 1;
189
190 private int idleTaskExecutionLimit = 1;
191
192 private final Set<AsyncMessageListenerInvoker> scheduledInvokers = new HashSet<AsyncMessageListenerInvoker>();
193
194 private int activeInvokerCount = 0;
195
196 private int registeredWithDestination = 0;
197
198 private volatile boolean recovering = false;
199
200 private Runnable stopCallback;
201
202 private Object currentRecoveryMarker = new Object();
203
204 private final Object recoveryMonitor = new Object();
205
206
207 /**
208 * Set the Spring {@code TaskExecutor} to use for running the listener threads.
209 * <p>Default is a {@link org.springframework.core.task.SimpleAsyncTaskExecutor},
210 * starting up a number of new threads, according to the specified number
211 * of concurrent consumers.
212 * <p>Specify an alternative {@code TaskExecutor} for integration with an existing
213 * thread pool. Note that this really only adds value if the threads are
214 * managed in a specific fashion, for example within a Java EE environment.
215 * A plain thread pool does not add much value, as this listener container
216 * will occupy a number of threads for its entire lifetime.
217 * @see #setConcurrentConsumers
218 * @see org.springframework.core.task.SimpleAsyncTaskExecutor
219 * @see org.springframework.scheduling.commonj.WorkManagerTaskExecutor
220 */
221 public void setTaskExecutor(Executor taskExecutor) {
222 this.taskExecutor = taskExecutor;
223 }
224
225 /**
226 * Specify the {@link BackOff} instance to use to compute the interval
227 * between recovery attempts. If the {@link BackOffExecution} implementation
228 * returns {@link BackOffExecution#STOP}, this listener container will not further
229 * attempt to recover.
230 * <p>The {@link #setRecoveryInterval(long) recovery interval} is ignored
231 * when this property is set.
232 */
233 public void setBackOff(BackOff backOff) {
234 this.backOff = backOff;
235 }
236
237 /**
238 * Specify the interval between recovery attempts, in <b>milliseconds</b>.
239 * The default is 5000 ms, that is, 5 seconds. This is a convenience method
240 * to create a {@link FixedBackOff} with the specified interval.
241 * <p>For more recovery options, consider specifying a {@link BackOff}
242 * instance instead.
243 * @see #setBackOff(BackOff)
244 * @see #handleListenerSetupFailure
245 */
246 public void setRecoveryInterval(long recoveryInterval) {
247 this.backOff = createDefaultBackOff(recoveryInterval);
248 }
249
250 /**
251 * Specify the level of caching that this listener container is allowed to apply,
252 * in the form of the name of the corresponding constant: e.g. "CACHE_CONNECTION".
253 * @see #setCacheLevel
254 */
255 public void setCacheLevelName(String constantName) throws IllegalArgumentException {
256 if (constantName == null || !constantName.startsWith("CACHE_")) {
257 throw new IllegalArgumentException("Only cache constants allowed");
258 }
259 setCacheLevel(constants.asNumber(constantName).intValue());
260 }
261
262 /**
263 * Specify the level of caching that this listener container is allowed to apply.
264 * <p>Default is {@link #CACHE_NONE} if an external transaction manager has been specified
265 * (to reobtain all resources freshly within the scope of the external transaction),
266 * and {@link #CACHE_CONSUMER} otherwise (operating with local JMS resources).
267 * <p>Some Java EE servers only register their JMS resources with an ongoing XA
268 * transaction in case of a freshly obtained JMS {@code Connection} and {@code Session},
269 * which is why this listener container by default does not cache any of those.
270 * However, depending on the rules of your server with respect to the caching
271 * of transactional resources, consider switching this setting to at least
272 * {@link #CACHE_CONNECTION} or {@link #CACHE_SESSION} even in conjunction with an
273 * external transaction manager.
274 * @see #CACHE_NONE
275 * @see #CACHE_CONNECTION
276 * @see #CACHE_SESSION
277 * @see #CACHE_CONSUMER
278 * @see #setCacheLevelName
279 * @see #setTransactionManager
280 */
281 public void setCacheLevel(int cacheLevel) {
282 this.cacheLevel = cacheLevel;
283 }
284
285 /**
286 * Return the level of caching that this listener container is allowed to apply.
287 */
288 public int getCacheLevel() {
289 return this.cacheLevel;
290 }
291
292
293 /**
294 * Specify concurrency limits via a "lower-upper" String, e.g. "5-10", or a simple
295 * upper limit String, e.g. "10" (the lower limit will be 1 in this case).
296 * <p>This listener container will always hold on to the minimum number of consumers
297 * ({@link #setConcurrentConsumers}) and will slowly scale up to the maximum number
298 * of consumers {@link #setMaxConcurrentConsumers} in case of increasing load.
299 */
300 @Override
301 public void setConcurrency(String concurrency) {
302 try {
303 int separatorIndex = concurrency.indexOf('-');
304 if (separatorIndex != -1) {
305 setConcurrentConsumers(Integer.parseInt(concurrency.substring(0, separatorIndex)));
306 setMaxConcurrentConsumers(Integer.parseInt(concurrency.substring(separatorIndex + 1, concurrency.length())));
307 }
308 else {
309 setConcurrentConsumers(1);
310 setMaxConcurrentConsumers(Integer.parseInt(concurrency));
311 }
312 }
313 catch (NumberFormatException ex) {
314 throw new IllegalArgumentException("Invalid concurrency value [" + concurrency + "]: only " +
315 "single maximum integer (e.g. \"5\") and minimum-maximum combo (e.g. \"3-5\") supported.");
316 }
317 }
318
319 /**
320 * Specify the number of concurrent consumers to create. Default is 1.
321 * <p>Specifying a higher value for this setting will increase the standard
322 * level of scheduled concurrent consumers at runtime: This is effectively
323 * the minimum number of concurrent consumers which will be scheduled
324 * at any given time. This is a static setting; for dynamic scaling,
325 * consider specifying the "maxConcurrentConsumers" setting instead.
326 * <p>Raising the number of concurrent consumers is recommendable in order
327 * to scale the consumption of messages coming in from a queue. However,
328 * note that any ordering guarantees are lost once multiple consumers are
329 * registered. In general, stick with 1 consumer for low-volume queues.
330 * <p><b>Do not raise the number of concurrent consumers for a topic,
331 * unless vendor-specific setup measures clearly allow for it.</b>
332 * With regular setup, this would lead to concurrent consumption
333 * of the same message, which is hardly ever desirable.
334 * <p><b>This setting can be modified at runtime, for example through JMX.</b>
335 * @see #setMaxConcurrentConsumers
336 */
337 public void setConcurrentConsumers(int concurrentConsumers) {
338 Assert.isTrue(concurrentConsumers > 0, "'concurrentConsumers' value must be at least 1 (one)");
339 synchronized (this.lifecycleMonitor) {
340 this.concurrentConsumers = concurrentConsumers;
341 if (this.maxConcurrentConsumers < concurrentConsumers) {
342 this.maxConcurrentConsumers = concurrentConsumers;
343 }
344 }
345 }
346
347 /**
348 * Return the "concurrentConsumer" setting.
349 * <p>This returns the currently configured "concurrentConsumers" value;
350 * the number of currently scheduled/active consumers might differ.
351 * @see #getScheduledConsumerCount()
352 * @see #getActiveConsumerCount()
353 */
354 public final int getConcurrentConsumers() {
355 synchronized (this.lifecycleMonitor) {
356 return this.concurrentConsumers;
357 }
358 }
359
360 /**
361 * Specify the maximum number of concurrent consumers to create. Default is 1.
362 * <p>If this setting is higher than "concurrentConsumers", the listener container
363 * will dynamically schedule new consumers at runtime, provided that enough
364 * incoming messages are encountered. Once the load goes down again, the number of
365 * consumers will be reduced to the standard level ("concurrentConsumers") again.
366 * <p>Raising the number of concurrent consumers is recommendable in order
367 * to scale the consumption of messages coming in from a queue. However,
368 * note that any ordering guarantees are lost once multiple consumers are
369 * registered. In general, stick with 1 consumer for low-volume queues.
370 * <p><b>Do not raise the number of concurrent consumers for a topic,
371 * unless vendor-specific setup measures clearly allow for it.</b>
372 * With regular setup, this would lead to concurrent consumption
373 * of the same message, which is hardly ever desirable.
374 * <p><b>This setting can be modified at runtime, for example through JMX.</b>
375 * @see #setConcurrentConsumers
376 */
377 public void setMaxConcurrentConsumers(int maxConcurrentConsumers) {
378 Assert.isTrue(maxConcurrentConsumers > 0, "'maxConcurrentConsumers' value must be at least 1 (one)");
379 synchronized (this.lifecycleMonitor) {
380 this.maxConcurrentConsumers =
381 (maxConcurrentConsumers > this.concurrentConsumers ? maxConcurrentConsumers : this.concurrentConsumers);
382 }
383 }
384
385 /**
386 * Return the "maxConcurrentConsumer" setting.
387 * <p>This returns the currently configured "maxConcurrentConsumers" value;
388 * the number of currently scheduled/active consumers might differ.
389 * @see #getScheduledConsumerCount()
390 * @see #getActiveConsumerCount()
391 */
392 public final int getMaxConcurrentConsumers() {
393 synchronized (this.lifecycleMonitor) {
394 return this.maxConcurrentConsumers;
395 }
396 }
397
398 /**
399 * Specify the maximum number of messages to process in one task.
400 * More concretely, this limits the number of message reception attempts
401 * per task, which includes receive iterations that did not actually
402 * pick up a message until they hit their timeout (see the
403 * {@link #setReceiveTimeout "receiveTimeout"} property).
404 * <p>Default is unlimited (-1) in case of a standard TaskExecutor,
405 * reusing the original invoker threads until shutdown (at the
406 * expense of limited dynamic scheduling).
407 * <p>In case of a SchedulingTaskExecutor indicating a preference for
408 * short-lived tasks, the default is 10 instead. Specify a number
409 * of 10 to 100 messages to balance between rather long-lived and
410 * rather short-lived tasks here.
411 * <p>Long-lived tasks avoid frequent thread context switches through
412 * sticking with the same thread all the way through, while short-lived
413 * tasks allow thread pools to control the scheduling. Hence, thread
414 * pools will usually prefer short-lived tasks.
415 * <p><b>This setting can be modified at runtime, for example through JMX.</b>
416 * @see #setTaskExecutor
417 * @see #setReceiveTimeout
418 * @see org.springframework.scheduling.SchedulingTaskExecutor#prefersShortLivedTasks()
419 */
420 public void setMaxMessagesPerTask(int maxMessagesPerTask) {
421 Assert.isTrue(maxMessagesPerTask != 0, "'maxMessagesPerTask' must not be 0");
422 synchronized (this.lifecycleMonitor) {
423 this.maxMessagesPerTask = maxMessagesPerTask;
424 }
425 }
426
427 /**
428 * Return the maximum number of messages to process in one task.
429 */
430 public final int getMaxMessagesPerTask() {
431 synchronized (this.lifecycleMonitor) {
432 return this.maxMessagesPerTask;
433 }
434 }
435
436 /**
437 * Specify the limit for the number of consumers that are allowed to be idle
438 * at any given time.
439 * <p>This limit is used by the {@link #scheduleNewInvokerIfAppropriate} method
440 * to determine if a new invoker should be created. Increasing the limit causes
441 * invokers to be created more aggressively. This can be useful to ramp up the
442 * number of invokers faster.
443 * <p>The default is 1, only scheduling a new invoker (which is likely to
444 * be idle initially) if none of the existing invokers is currently idle.
445 */
446 public void setIdleConsumerLimit(int idleConsumerLimit) {
447 Assert.isTrue(idleConsumerLimit > 0, "'idleConsumerLimit' must be 1 or higher");
448 synchronized (this.lifecycleMonitor) {
449 this.idleConsumerLimit = idleConsumerLimit;
450 }
451 }
452
453 /**
454 * Return the limit for the number of idle consumers.
455 */
456 public final int getIdleConsumerLimit() {
457 synchronized (this.lifecycleMonitor) {
458 return this.idleConsumerLimit;
459 }
460 }
461
462 /**
463 * Specify the limit for idle executions of a consumer task, not having
464 * received any message within its execution. If this limit is reached,
465 * the task will shut down and leave receiving to other executing tasks.
466 * <p>The default is 1, closing idle resources early once a task didn't
467 * receive a message. This applies to dynamic scheduling only; see the
468 * {@link #setMaxConcurrentConsumers "maxConcurrentConsumers"} setting.
469 * The minimum number of consumers
470 * (see {@link #setConcurrentConsumers "concurrentConsumers"})
471 * will be kept around until shutdown in any case.
472 * <p>Within each task execution, a number of message reception attempts
473 * (according to the "maxMessagesPerTask" setting) will each wait for an incoming
474 * message (according to the "receiveTimeout" setting). If all of those receive
475 * attempts in a given task return without a message, the task is considered
476 * idle with respect to received messages. Such a task may still be rescheduled;
477 * however, once it reached the specified "idleTaskExecutionLimit", it will
478 * shut down (in case of dynamic scaling).
479 * <p>Raise this limit if you encounter too frequent scaling up and down.
480 * With this limit being higher, an idle consumer will be kept around longer,
481 * avoiding the restart of a consumer once a new load of messages comes in.
482 * Alternatively, specify a higher "maxMessagesPerTask" and/or "receiveTimeout" value,
483 * which will also lead to idle consumers being kept around for a longer time
484 * (while also increasing the average execution time of each scheduled task).
485 * <p><b>This setting can be modified at runtime, for example through JMX.</b>
486 * @see #setMaxMessagesPerTask
487 * @see #setReceiveTimeout
488 */
489 public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) {
490 Assert.isTrue(idleTaskExecutionLimit > 0, "'idleTaskExecutionLimit' must be 1 or higher");
491 synchronized (this.lifecycleMonitor) {
492 this.idleTaskExecutionLimit = idleTaskExecutionLimit;
493 }
494 }
495
496 /**
497 * Return the limit for idle executions of a consumer task.
498 */
499 public final int getIdleTaskExecutionLimit() {
500 synchronized (this.lifecycleMonitor) {
501 return this.idleTaskExecutionLimit;
502 }
503 }
504
505
506 //-------------------------------------------------------------------------
507 // Implementation of AbstractMessageListenerContainer's template methods
508 //-------------------------------------------------------------------------
509
510 @Override
511 public void initialize() {
512 // Adapt default cache level.
513 if (this.cacheLevel == CACHE_AUTO) {
514 this.cacheLevel = (getTransactionManager() != null ? CACHE_NONE : CACHE_CONSUMER);
515 }
516
517 // Prepare taskExecutor and maxMessagesPerTask.
518 synchronized (this.lifecycleMonitor) {
519 if (this.taskExecutor == null) {
520 this.taskExecutor = createDefaultTaskExecutor();
521 }
522 else if (this.taskExecutor instanceof SchedulingTaskExecutor &&
523 ((SchedulingTaskExecutor) this.taskExecutor).prefersShortLivedTasks() &&
524 this.maxMessagesPerTask == Integer.MIN_VALUE) {
525 // TaskExecutor indicated a preference for short-lived tasks. According to
526 // setMaxMessagesPerTask javadoc, we'll use 10 message per task in this case
527 // unless the user specified a custom value.
528 this.maxMessagesPerTask = 10;
529 }
530 }
531
532 // Proceed with actual listener initialization.
533 super.initialize();
534 }
535
536 /**
537 * Creates the specified number of concurrent consumers,
538 * in the form of a JMS Session plus associated MessageConsumer
539 * running in a separate thread.
540 * @see #scheduleNewInvoker
541 * @see #setTaskExecutor
542 */
543 @Override
544 protected void doInitialize() throws JMSException {
545 synchronized (this.lifecycleMonitor) {
546 for (int i = 0; i < this.concurrentConsumers; i++) {
547 scheduleNewInvoker();
548 }
549 }
550 }
551
552 /**
553 * Destroy the registered JMS Sessions and associated MessageConsumers.
554 */
555 @Override
556 protected void doShutdown() throws JMSException {
557 logger.debug("Waiting for shutdown of message listener invokers");
558 try {
559 synchronized (this.lifecycleMonitor) {
560 // Waiting for AsyncMessageListenerInvokers to deactivate themselves...
561 while (this.activeInvokerCount > 0) {
562 if (logger.isDebugEnabled()) {
563 logger.debug("Still waiting for shutdown of " + this.activeInvokerCount +
564 " message listener invokers");
565 }
566 this.lifecycleMonitor.wait();
567 }
568 // Clear remaining scheduled invokers, possibly left over as paused tasks...
569 for (AsyncMessageListenerInvoker scheduledInvoker : this.scheduledInvokers) {
570 scheduledInvoker.clearResources();
571 }
572 this.scheduledInvokers.clear();
573 }
574 }
575 catch (InterruptedException ex) {
576 // Re-interrupt current thread, to allow other threads to react.
577 Thread.currentThread().interrupt();
578 }
579 }
580
581 /**
582 * Overridden to reset the stop callback, if any.
583 */
584 @Override
585 public void start() throws JmsException {
586 synchronized (this.lifecycleMonitor) {
587 this.stopCallback = null;
588 }
589 super.start();
590 }
591
592 /**
593 * Stop this listener container, invoking the specific callback
594 * once all listener processing has actually stopped.
595 * <p>Note: Further {@code stop(runnable)} calls (before processing
596 * has actually stopped) will override the specified callback. Only the
597 * latest specified callback will be invoked.
598 * <p>If a subsequent {@link #start()} call restarts the listener container
599 * before it has fully stopped, the callback will not get invoked at all.
600 * @param callback the callback to invoke once listener processing
601 * has fully stopped
602 * @throws JmsException if stopping failed
603 * @see #stop()
604 */
605 @Override
606 public void stop(Runnable callback) throws JmsException {
607 synchronized (this.lifecycleMonitor) {
608 this.stopCallback = callback;
609 }
610 stop();
611 }
612
613 /**
614 * Return the number of currently scheduled consumers.
615 * <p>This number will always be between "concurrentConsumers" and
616 * "maxConcurrentConsumers", but might be higher than "activeConsumerCount"
617 * (in case some consumers are scheduled but not executing at the moment).
618 * @see #getConcurrentConsumers()
619 * @see #getMaxConcurrentConsumers()
620 * @see #getActiveConsumerCount()
621 */
622 public final int getScheduledConsumerCount() {
623 synchronized (this.lifecycleMonitor) {
624 return this.scheduledInvokers.size();
625 }
626 }
627
628 /**
629 * Return the number of currently active consumers.
630 * <p>This number will always be between "concurrentConsumers" and
631 * "maxConcurrentConsumers", but might be lower than "scheduledConsumerCount"
632 * (in case some consumers are scheduled but not executing at the moment).
633 * @see #getConcurrentConsumers()
634 * @see #getMaxConcurrentConsumers()
635 * @see #getActiveConsumerCount()
636 */
637 public final int getActiveConsumerCount() {
638 synchronized (this.lifecycleMonitor) {
639 return this.activeInvokerCount;
640 }
641 }
642
643 /**
644 * Return whether at least one consumer has entered a fixed registration with the
645 * target destination. This is particularly interesting for the pub-sub case where
646 * it might be important to have an actual consumer registered that is guaranteed
647 * not to miss any messages that are just about to be published.
648 * <p>This method may be polled after a {@link #start()} call, until asynchronous
649 * registration of consumers has happened which is when the method will start returning
650 * {@code true} – provided that the listener container ever actually establishes
651 * a fixed registration. It will then keep returning {@code true} until shutdown,
652 * since the container will hold on to at least one consumer registration thereafter.
653 * <p>Note that a listener container is not bound to having a fixed registration in
654 * the first place. It may also keep recreating consumers for every invoker execution.
655 * This particularly depends on the {@link #setCacheLevel cache level} setting:
656 * only {@link #CACHE_CONSUMER} will lead to a fixed registration.
657 */
658 public boolean isRegisteredWithDestination() {
659 synchronized (this.lifecycleMonitor) {
660 return (this.registeredWithDestination > 0);
661 }
662 }
663
664
665 /**
666 * Create a default TaskExecutor. Called if no explicit TaskExecutor has been specified.
667 * <p>The default implementation builds a {@link org.springframework.core.task.SimpleAsyncTaskExecutor}
668 * with the specified bean name (or the class name, if no bean name specified) as thread name prefix.
669 * @see org.springframework.core.task.SimpleAsyncTaskExecutor#SimpleAsyncTaskExecutor(String)
670 */
671 protected TaskExecutor createDefaultTaskExecutor() {
672 String beanName = getBeanName();
673 String threadNamePrefix = (beanName != null ? beanName + "-" : DEFAULT_THREAD_NAME_PREFIX);
674 return new SimpleAsyncTaskExecutor(threadNamePrefix);
675 }
676
677 /**
678 * Schedule a new invoker, increasing the total number of scheduled
679 * invokers for this listener container.
680 */
681 private void scheduleNewInvoker() {
682 AsyncMessageListenerInvoker invoker = new AsyncMessageListenerInvoker();
683 if (rescheduleTaskIfNecessary(invoker)) {
684 // This should always be true, since we're only calling this when active.
685 this.scheduledInvokers.add(invoker);
686 }
687 }
688
689 /**
690 * Use a shared JMS Connection depending on the "cacheLevel" setting.
691 * @see #setCacheLevel
692 * @see #CACHE_CONNECTION
693 */
694 @Override
695 protected final boolean sharedConnectionEnabled() {
696 return (getCacheLevel() >= CACHE_CONNECTION);
697 }
698
699 /**
700 * Re-executes the given task via this listener container's TaskExecutor.
701 * @see #setTaskExecutor
702 */
703 @Override
704 protected void doRescheduleTask(Object task) {
705 this.taskExecutor.execute((Runnable) task);
706 }
707
708 /**
709 * Tries scheduling a new invoker, since we know messages are coming in...
710 * @see #scheduleNewInvokerIfAppropriate()
711 */
712 @Override
713 protected void messageReceived(Object invoker, Session session) {
714 ((AsyncMessageListenerInvoker) invoker).setIdle(false);
715 scheduleNewInvokerIfAppropriate();
716 }
717
718 /**
719 * Marks the affected invoker as idle.
720 */
721 @Override
722 protected void noMessageReceived(Object invoker, Session session) {
723 ((AsyncMessageListenerInvoker) invoker).setIdle(true);
724 }
725
726 /**
727 * Schedule a new invoker, increasing the total number of scheduled
728 * invokers for this listener container, but only if the specified
729 * "maxConcurrentConsumers" limit has not been reached yet, and only
730 * if the specified "idleConsumerLimit" has not been reached either.
731 * <p>Called once a message has been received, in order to scale up while
732 * processing the message in the invoker that originally received it.
733 * @see #setTaskExecutor
734 * @see #getMaxConcurrentConsumers()
735 * @see #getIdleConsumerLimit()
736 */
737 protected void scheduleNewInvokerIfAppropriate() {
738 if (isRunning()) {
739 resumePausedTasks();
740 synchronized (this.lifecycleMonitor) {
741 if (this.scheduledInvokers.size() < this.maxConcurrentConsumers &&
742 getIdleInvokerCount() < this.idleConsumerLimit) {
743 scheduleNewInvoker();
744 if (logger.isDebugEnabled()) {
745 logger.debug("Raised scheduled invoker count: " + this.scheduledInvokers.size());
746 }
747 }
748 }
749 }
750 }
751
752 /**
753 * Determine whether the current invoker should be rescheduled,
754 * given that it might not have received a message in a while.
755 * @param idleTaskExecutionCount the number of idle executions
756 * that this invoker task has already accumulated (in a row)
757 */
758 private boolean shouldRescheduleInvoker(int idleTaskExecutionCount) {
759 boolean superfluous =
760 (idleTaskExecutionCount >= this.idleTaskExecutionLimit && getIdleInvokerCount() > 1);
761 return (this.scheduledInvokers.size() <=
762 (superfluous ? this.concurrentConsumers : this.maxConcurrentConsumers));
763 }
764
765 /**
766 * Determine whether this listener container currently has more
767 * than one idle instance among its scheduled invokers.
768 */
769 private int getIdleInvokerCount() {
770 int count = 0;
771 for (AsyncMessageListenerInvoker invoker : this.scheduledInvokers) {
772 if (invoker.isIdle()) {
773 count++;
774 }
775 }
776 return count;
777 }
778
779
780 /**
781 * Overridden to accept a failure in the initial setup - leaving it up to the
782 * asynchronous invokers to establish the shared Connection on first access.
783 * @see #refreshConnectionUntilSuccessful()
784 */
785 @Override
786 protected void establishSharedConnection() {
787 try {
788 super.establishSharedConnection();
789 }
790 catch (Exception ex) {
791 if (ex instanceof JMSException) {
792 invokeExceptionListener((JMSException) ex);
793 }
794 logger.debug("Could not establish shared JMS Connection - " +
795 "leaving it up to asynchronous invokers to establish a Connection as soon as possible", ex);
796 }
797 }
798
799 /**
800 * This implementations proceeds even after an exception thrown from
801 * {@code Connection.start()}, relying on listeners to perform
802 * appropriate recovery.
803 */
804 @Override
805 protected void startSharedConnection() {
806 try {
807 super.startSharedConnection();
808 }
809 catch (Exception ex) {
810 logger.debug("Connection start failed - relying on listeners to perform recovery", ex);
811 }
812 }
813
814 /**
815 * This implementations proceeds even after an exception thrown from
816 * {@code Connection.stop()}, relying on listeners to perform
817 * appropriate recovery after a restart.
818 */
819 @Override
820 protected void stopSharedConnection() {
821 try {
822 super.stopSharedConnection();
823 }
824 catch (Exception ex) {
825 logger.debug("Connection stop failed - relying on listeners to perform recovery after restart", ex);
826 }
827 }
828
829 /**
830 * Handle the given exception that arose during setup of a listener.
831 * Called for every such exception in every concurrent listener.
832 * <p>The default implementation logs the exception at warn level
833 * if not recovered yet, and at debug level if already recovered.
834 * Can be overridden in subclasses.
835 * @param ex the exception to handle
836 * @param alreadyRecovered whether a previously executing listener
837 * already recovered from the present listener setup failure
838 * (this usually indicates a follow-up failure than can be ignored
839 * other than for debug log purposes)
840 * @see #recoverAfterListenerSetupFailure()
841 */
842 protected void handleListenerSetupFailure(Throwable ex, boolean alreadyRecovered) {
843 if (ex instanceof JMSException) {
844 invokeExceptionListener((JMSException) ex);
845 }
846 if (ex instanceof SharedConnectionNotInitializedException) {
847 if (!alreadyRecovered) {
848 logger.info("JMS message listener invoker needs to establish shared Connection");
849 }
850 }
851 else {
852 // Recovery during active operation..
853 if (alreadyRecovered) {
854 logger.debug("Setup of JMS message listener invoker failed - already recovered by other invoker", ex);
855 }
856 else {
857 StringBuilder msg = new StringBuilder();
858 msg.append("Setup of JMS message listener invoker failed for destination '");
859 msg.append(getDestinationDescription()).append("' - trying to recover. Cause: ");
860 msg.append(ex instanceof JMSException ? JmsUtils.buildExceptionMessage((JMSException) ex) : ex.getMessage());
861 if (logger.isDebugEnabled()) {
862 logger.warn(msg, ex);
863 }
864 else {
865 logger.warn(msg);
866 }
867 }
868 }
869 }
870
871 /**
872 * Recover this listener container after a listener failed to set itself up,
873 * for example re-establishing the underlying Connection.
874 * <p>The default implementation delegates to DefaultMessageListenerContainer's
875 * recovery-capable {@link #refreshConnectionUntilSuccessful()} method, which will
876 * try to re-establish a Connection to the JMS provider both for the shared
877 * and the non-shared Connection case.
878 * @see #refreshConnectionUntilSuccessful()
879 * @see #refreshDestination()
880 */
881 protected void recoverAfterListenerSetupFailure() {
882 this.recovering = true;
883 try {
884 refreshConnectionUntilSuccessful();
885 refreshDestination();
886 }
887 finally {
888 this.recovering = false;
889 }
890 }
891
892 /**
893 * Refresh the underlying Connection, not returning before an attempt has been
894 * successful. Called in case of a shared Connection as well as without shared
895 * Connection, so either needs to operate on the shared Connection or on a
896 * temporary Connection that just gets established for validation purposes.
897 * <p>The default implementation retries until it successfully established a
898 * Connection, for as long as this message listener container is running.
899 * Applies the specified recovery interval between retries.
900 * @see #setRecoveryInterval
901 * @see #start()
902 * @see #stop()
903 */
904 protected void refreshConnectionUntilSuccessful() {
905 BackOffExecution execution = this.backOff.start();
906 while (isRunning()) {
907 try {
908 if (sharedConnectionEnabled()) {
909 refreshSharedConnection();
910 }
911 else {
912 Connection con = createConnection();
913 JmsUtils.closeConnection(con);
914 }
915 logger.info("Successfully refreshed JMS Connection");
916 break;
917 }
918 catch (Exception ex) {
919 if (ex instanceof JMSException) {
920 invokeExceptionListener((JMSException) ex);
921 }
922 StringBuilder msg = new StringBuilder();
923 msg.append("Could not refresh JMS Connection for destination '");
924 msg.append(getDestinationDescription()).append("' - retrying using ");
925 msg.append(execution).append(". Cause: ");
926 msg.append(ex instanceof JMSException ? JmsUtils.buildExceptionMessage((JMSException) ex) : ex.getMessage());
927 if (logger.isDebugEnabled()) {
928 logger.error(msg, ex);
929 }
930 else {
931 logger.error(msg);
932 }
933 }
934 if (!applyBackOffTime(execution)) {
935 StringBuilder msg = new StringBuilder();
936 msg.append("Stopping container for destination '")
937 .append(getDestinationDescription())
938 .append("' - back off policy does not allow ").append("for further attempts.");
939 logger.error(msg.toString());
940 stop();
941 }
942 }
943 }
944
945 /**
946 * Refresh the JMS destination that this listener container operates on.
947 * <p>Called after listener setup failure, assuming that a cached Destination
948 * object might have become invalid (a typical case on WebLogic JMS).
949 * <p>The default implementation removes the destination from a
950 * DestinationResolver's cache, in case of a CachingDestinationResolver.
951 * @see #setDestinationName
952 * @see org.springframework.jms.support.destination.CachingDestinationResolver
953 */
954 protected void refreshDestination() {
955 String destName = getDestinationName();
956 if (destName != null) {
957 DestinationResolver destResolver = getDestinationResolver();
958 if (destResolver instanceof CachingDestinationResolver) {
959 ((CachingDestinationResolver) destResolver).removeFromCache(destName);
960 }
961 }
962 }
963
964 /**
965 * Apply the next back off time using the specified {@link BackOffExecution}.
966 * <p>Return {@code true} if the back off period has been applied and a new
967 * attempt to recover should be made, {@code false} if no further attempt
968 * should be made.
969 */
970 protected boolean applyBackOffTime(BackOffExecution execution) {
971 long interval = execution.nextBackOff();
972 if (interval == BackOffExecution.STOP) {
973 return false;
974 }
975 else {
976 try {
977 Thread.sleep(interval);
978 }
979 catch (InterruptedException interEx) {
980 // Re-interrupt current thread, to allow other threads to react.
981 Thread.currentThread().interrupt();
982 }
983 }
984 return true;
985 }
986
987 private FixedBackOff createDefaultBackOff(long interval) {
988 return new FixedBackOff(interval, Long.MAX_VALUE);
989 }
990
991 /**
992 * Return whether this listener container is currently in a recovery attempt.
993 * <p>May be used to detect recovery phases but also the end of a recovery phase,
994 * with {@code isRecovering()} switching to {@code false} after having been found
995 * to return {@code true} before.
996 * @see #recoverAfterListenerSetupFailure()
997 */
998 public final boolean isRecovering() {
999 return this.recovering;
1000 }
1001
1002
1003 //-------------------------------------------------------------------------
1004 // Inner classes used as internal adapters
1005 //-------------------------------------------------------------------------
1006
1007 /**
1008 * Runnable that performs looped {@code MessageConsumer.receive()} calls.
1009 */
1010 private class AsyncMessageListenerInvoker implements SchedulingAwareRunnable {
1011
1012 private Session session;
1013
1014 private MessageConsumer consumer;
1015
1016 private Object lastRecoveryMarker;
1017
1018 private boolean lastMessageSucceeded;
1019
1020 private int idleTaskExecutionCount = 0;
1021
1022 private volatile boolean idle = true;
1023
1024 @Override
1025 public void run() {
1026 synchronized (lifecycleMonitor) {
1027 activeInvokerCount++;
1028 lifecycleMonitor.notifyAll();
1029 }
1030 boolean messageReceived = false;
1031 try {
1032 if (maxMessagesPerTask < 0) {
1033 messageReceived = executeOngoingLoop();
1034 }
1035 else {
1036 int messageCount = 0;
1037 while (isRunning() && messageCount < maxMessagesPerTask) {
1038 messageReceived = (invokeListener() || messageReceived);
1039 messageCount++;
1040 }
1041 }
1042 }
1043 catch (Throwable ex) {
1044 clearResources();
1045 if (!this.lastMessageSucceeded) {
1046 // We failed more than once in a row or on startup - sleep before
1047 // first recovery attempt.
1048 sleepBeforeRecoveryAttempt();
1049 }
1050 this.lastMessageSucceeded = false;
1051 boolean alreadyRecovered = false;
1052 synchronized (recoveryMonitor) {
1053 if (this.lastRecoveryMarker == currentRecoveryMarker) {
1054 handleListenerSetupFailure(ex, false);
1055 recoverAfterListenerSetupFailure();
1056 currentRecoveryMarker = new Object();
1057 }
1058 else {
1059 alreadyRecovered = true;
1060 }
1061 }
1062 if (alreadyRecovered) {
1063 handleListenerSetupFailure(ex, true);
1064 }
1065 }
1066 finally {
1067 synchronized (lifecycleMonitor) {
1068 decreaseActiveInvokerCount();
1069 lifecycleMonitor.notifyAll();
1070 }
1071 if (!messageReceived) {
1072 this.idleTaskExecutionCount++;
1073 }
1074 else {
1075 this.idleTaskExecutionCount = 0;
1076 }
1077 synchronized (lifecycleMonitor) {
1078 if (!shouldRescheduleInvoker(this.idleTaskExecutionCount) || !rescheduleTaskIfNecessary(this)) {
1079 // We're shutting down completely.
1080 scheduledInvokers.remove(this);
1081 if (logger.isDebugEnabled()) {
1082 logger.debug("Lowered scheduled invoker count: " + scheduledInvokers.size());
1083 }
1084 lifecycleMonitor.notifyAll();
1085 clearResources();
1086 }
1087 else if (isRunning()) {
1088 int nonPausedConsumers = getScheduledConsumerCount() - getPausedTaskCount();
1089 if (nonPausedConsumers < 1) {
1090 logger.error("All scheduled consumers have been paused, probably due to tasks having been rejected. " +
1091 "Check your thread pool configuration! Manual recovery necessary through a start() call.");
1092 }
1093 else if (nonPausedConsumers < getConcurrentConsumers()) {
1094 logger.warn("Number of scheduled consumers has dropped below concurrentConsumers limit, probably " +
1095 "due to tasks having been rejected. Check your thread pool configuration! Automatic recovery " +
1096 "to be triggered by remaining consumers.");
1097 }
1098 }
1099 }
1100 }
1101 }
1102
1103 private boolean executeOngoingLoop() throws JMSException {
1104 boolean messageReceived = false;
1105 boolean active = true;
1106 while (active) {
1107 synchronized (lifecycleMonitor) {
1108 boolean interrupted = false;
1109 boolean wasWaiting = false;
1110 while ((active = isActive()) && !isRunning()) {
1111 if (interrupted) {
1112 throw new IllegalStateException("Thread was interrupted while waiting for " +
1113 "a restart of the listener container, but container is still stopped");
1114 }
1115 if (!wasWaiting) {
1116 decreaseActiveInvokerCount();
1117 }
1118 wasWaiting = true;
1119 try {
1120 lifecycleMonitor.wait();
1121 }
1122 catch (InterruptedException ex) {
1123 // Re-interrupt current thread, to allow other threads to react.
1124 Thread.currentThread().interrupt();
1125 interrupted = true;
1126 }
1127 }
1128 if (wasWaiting) {
1129 activeInvokerCount++;
1130 }
1131 if (scheduledInvokers.size() > maxConcurrentConsumers) {
1132 active = false;
1133 }
1134 }
1135 if (active) {
1136 messageReceived = (invokeListener() || messageReceived);
1137 }
1138 }
1139 return messageReceived;
1140 }
1141
1142 private boolean invokeListener() throws JMSException {
1143 initResourcesIfNecessary();
1144 boolean messageReceived = receiveAndExecute(this, this.session, this.consumer);
1145 this.lastMessageSucceeded = true;
1146 return messageReceived;
1147 }
1148
1149 private void decreaseActiveInvokerCount() {
1150 activeInvokerCount--;
1151 if (stopCallback != null && activeInvokerCount == 0) {
1152 stopCallback.run();
1153 stopCallback = null;
1154 }
1155 }
1156
1157 private void initResourcesIfNecessary() throws JMSException {
1158 if (getCacheLevel() <= CACHE_CONNECTION) {
1159 updateRecoveryMarker();
1160 }
1161 else {
1162 if (this.session == null && getCacheLevel() >= CACHE_SESSION) {
1163 updateRecoveryMarker();
1164 this.session = createSession(getSharedConnection());
1165 }
1166 if (this.consumer == null && getCacheLevel() >= CACHE_CONSUMER) {
1167 this.consumer = createListenerConsumer(this.session);
1168 synchronized (lifecycleMonitor) {
1169 registeredWithDestination++;
1170 }
1171 }
1172 }
1173 }
1174
1175 private void updateRecoveryMarker() {
1176 synchronized (recoveryMonitor) {
1177 this.lastRecoveryMarker = currentRecoveryMarker;
1178 }
1179 }
1180
1181 private void clearResources() {
1182 if (sharedConnectionEnabled()) {
1183 synchronized (sharedConnectionMonitor) {
1184 JmsUtils.closeMessageConsumer(this.consumer);
1185 JmsUtils.closeSession(this.session);
1186 }
1187 }
1188 else {
1189 JmsUtils.closeMessageConsumer(this.consumer);
1190 JmsUtils.closeSession(this.session);
1191 }
1192 if (this.consumer != null) {
1193 synchronized (lifecycleMonitor) {
1194 registeredWithDestination--;
1195 }
1196 }
1197 this.consumer = null;
1198 this.session = null;
1199 }
1200
1201 /**
1202 * Apply the back off time once. In a regular scenario, the back off is only applied if we
1203 * failed to recover with the broker. This additional sleep period avoids a burst retry
1204 * scenario when the broker is actually up but something else if failing (i.e. listener
1205 * specific).
1206 */
1207 private void sleepBeforeRecoveryAttempt() {
1208 BackOffExecution execution = DefaultMessageListenerContainer.this.backOff.start();
1209 applyBackOffTime(execution);
1210 }
1211
1212 @Override
1213 public boolean isLongLived() {
1214 return (maxMessagesPerTask < 0);
1215 }
1216
1217 public void setIdle(boolean idle) {
1218 this.idle = idle;
1219 }
1220
1221 public boolean isIdle() {
1222 return this.idle;
1223 }
1224 }
1225
1226 }