1   /*
2    * Copyright 2002-2015 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.jms.listener;
18  
19  import java.util.HashSet;
20  import java.util.Set;
21  import java.util.concurrent.Executor;
22  import javax.jms.Connection;
23  import javax.jms.JMSException;
24  import javax.jms.MessageConsumer;
25  import javax.jms.Session;
26  
27  import org.springframework.core.Constants;
28  import org.springframework.core.task.SimpleAsyncTaskExecutor;
29  import org.springframework.core.task.TaskExecutor;
30  import org.springframework.jms.JmsException;
31  import org.springframework.jms.support.JmsUtils;
32  import org.springframework.jms.support.destination.CachingDestinationResolver;
33  import org.springframework.jms.support.destination.DestinationResolver;
34  import org.springframework.scheduling.SchedulingAwareRunnable;
35  import org.springframework.scheduling.SchedulingTaskExecutor;
36  import org.springframework.util.Assert;
37  import org.springframework.util.ClassUtils;
38  import org.springframework.util.backoff.BackOff;
39  import org.springframework.util.backoff.BackOffExecution;
40  import org.springframework.util.backoff.FixedBackOff;
41  
42  /**
43   * Message listener container variant that uses plain JMS client APIs, specifically
44   * a loop of {@code MessageConsumer.receive()} calls that also allow for
45   * transactional reception of messages (registering them with XA transactions).
46   * Designed to work in a native JMS environment as well as in a Java EE environment,
47   * with only minimal differences in configuration.
48   *
49   * <p>This is a simple but nevertheless powerful form of message listener container.
50   * On startup, it obtains a fixed number of JMS Sessions to invoke the listener,
51   * and optionally allows for dynamic adaptation at runtime (up to a maximum number).
52   * Like {@link SimpleMessageListenerContainer}, its main advantage is its low level
53   * of runtime complexity, in particular the minimal requirements on the JMS provider:
54   * not even the JMS {@code ServerSessionPool} facility is required. Beyond that, it is
55   * fully self-recovering in case the broker is temporarily unavailable, and allows
56   * for stops/restarts as well as runtime changes to its configuration.
57   *
58   * <p>Actual {@code MessageListener} execution happens in asynchronous work units which are
59   * created through Spring's {@link org.springframework.core.task.TaskExecutor TaskExecutor}
60   * abstraction. By default, the specified number of invoker tasks will be created
61   * on startup, according to the {@link #setConcurrentConsumers "concurrentConsumers"}
62   * setting. Specify an alternative {@code TaskExecutor} to integrate with an existing
63   * thread pool facility (such as a Java EE server's), for example using a
64   * {@link org.springframework.scheduling.commonj.WorkManagerTaskExecutor CommonJ WorkManager}.
65   * With a native JMS setup, each of those listener threads is going to use a
66   * cached JMS {@code Session} and {@code MessageConsumer} (only refreshed in case
67   * of failure), using the JMS provider's resources as efficiently as possible.
68   *
69   * <p>Message reception and listener execution can automatically be wrapped
70   * in transactions by passing a Spring
71   * {@link org.springframework.transaction.PlatformTransactionManager} into the
72   * {@link #setTransactionManager "transactionManager"} property. This will usually
73   * be a {@link org.springframework.transaction.jta.JtaTransactionManager} in a
74   * Java EE environment, in combination with a JTA-aware JMS {@code ConnectionFactory}
75   * obtained from JNDI (check your Java EE server's documentation). Note that this
76   * listener container will automatically reobtain all JMS handles for each transaction
77   * in case an external transaction manager is specified, for compatibility with
78   * all Java EE servers (in particular JBoss). This non-caching behavior can be
79   * overridden through the {@link #setCacheLevel "cacheLevel"} /
80   * {@link #setCacheLevelName "cacheLevelName"} property, enforcing caching of
81   * the {@code Connection} (or also {@code Session} and {@code MessageConsumer})
82   * even if an external transaction manager is involved.
83   *
84   * <p>Dynamic scaling of the number of concurrent invokers can be activated
85   * by specifying a {@link #setMaxConcurrentConsumers "maxConcurrentConsumers"}
86   * value that is higher than the {@link #setConcurrentConsumers "concurrentConsumers"}
87   * value. Since the latter's default is 1, you can also simply specify a
88   * "maxConcurrentConsumers" of e.g. 5, which will lead to dynamic scaling up to
89   * 5 concurrent consumers in case of increasing message load, as well as dynamic
90   * shrinking back to the standard number of consumers once the load decreases.
91   * Consider adapting the {@link #setIdleTaskExecutionLimit "idleTaskExecutionLimit"}
92   * setting to control the lifespan of each new task, to avoid frequent scaling up
93   * and down, in particular if the {@code ConnectionFactory} does not pool JMS
94   * {@code Sessions} and/or the {@code TaskExecutor} does not pool threads (check
95   * your configuration!). Note that dynamic scaling only really makes sense for a
96   * queue in the first place; for a topic, you will typically stick with the default
97   * number of 1 consumer, otherwise you'd receive the same message multiple times on
98   * the same node.
99   *
100  * <p><b>Note: Don't use Spring's {@link org.springframework.jms.connection.CachingConnectionFactory}
101  * in combination with dynamic scaling.</b> Ideally, don't use it with a message
102  * listener container at all, since it is generally preferable to let the
103  * listener container itself handle appropriate caching within its lifecycle.
104  * Also, stopping and restarting a listener container will only work with an
105  * independent, locally cached Connection - not with an externally cached one.
106  *
107  * <p><b>It is strongly recommended to either set {@link #setSessionTransacted
108  * "sessionTransacted"} to "true" or specify an external {@link #setTransactionManager
109  * "transactionManager"}.</b> See the {@link AbstractMessageListenerContainer}
110  * javadoc for details on acknowledge modes and native transaction options, as
111  * well as the {@link AbstractPollingMessageListenerContainer} javadoc for details
112  * on configuring an external transaction manager. Note that for the default
113  * "AUTO_ACKNOWLEDGE" mode, this container applies automatic message acknowledgment
114  * before listener execution, with no redelivery in case of an exception.
115  *
116  * @author Juergen Hoeller
117  * @since 2.0
118  * @see #setTransactionManager
119  * @see #setCacheLevel
120  * @see javax.jms.MessageConsumer#receive(long)
121  * @see SimpleMessageListenerContainer
122  * @see org.springframework.jms.listener.endpoint.JmsMessageEndpointManager
123  */
124 public class DefaultMessageListenerContainer extends AbstractPollingMessageListenerContainer {
125 
126 	/**
127 	 * Default thread name prefix: "DefaultMessageListenerContainer-".
128 	 */
129 	public static final String DEFAULT_THREAD_NAME_PREFIX =
130 			ClassUtils.getShortName(DefaultMessageListenerContainer.class) + "-";
131 
132 	/**
133 	 * The default recovery interval: 5000 ms = 5 seconds.
134 	 */
135 	public static final long DEFAULT_RECOVERY_INTERVAL = 5000;
136 
137 
138 	/**
139 	 * Constant that indicates to cache no JMS resources at all.
140 	 * @see #setCacheLevel
141 	 */
142 	public static final int CACHE_NONE = 0;
143 
144 	/**
145 	 * Constant that indicates to cache a shared JMS {@code Connection} for each
146 	 * listener thread.
147 	 * @see #setCacheLevel
148 	 */
149 	public static final int CACHE_CONNECTION = 1;
150 
151 	/**
152 	 * Constant that indicates to cache a shared JMS {@code Connection} and a JMS
153 	 * {@code Session} for each listener thread.
154 	 * @see #setCacheLevel
155 	 */
156 	public static final int CACHE_SESSION = 2;
157 
158 	/**
159 	 * Constant that indicates to cache a shared JMS {@code Connection}, a JMS
160 	 * {@code Session}, and a JMS MessageConsumer for each listener thread.
161 	 * @see #setCacheLevel
162 	 */
163 	public static final int CACHE_CONSUMER = 3;
164 
165 	/**
166 	 * Constant that indicates automatic choice of an appropriate caching level
167 	 * (depending on the transaction management strategy).
168 	 * @see #setCacheLevel
169 	 */
170 	public static final int CACHE_AUTO = 4;
171 
172 
173 	private static final Constants constants = new Constants(DefaultMessageListenerContainer.class);
174 
175 
176 	private Executor taskExecutor;
177 
178 	private BackOff backOff = createDefaultBackOff(DEFAULT_RECOVERY_INTERVAL);
179 
180 	private int cacheLevel = CACHE_AUTO;
181 
182 	private int concurrentConsumers = 1;
183 
184 	private int maxConcurrentConsumers = 1;
185 
186 	private int maxMessagesPerTask = Integer.MIN_VALUE;
187 
188 	private int idleConsumerLimit = 1;
189 
190 	private int idleTaskExecutionLimit = 1;
191 
192 	private final Set<AsyncMessageListenerInvoker> scheduledInvokers = new HashSet<AsyncMessageListenerInvoker>();
193 
194 	private int activeInvokerCount = 0;
195 
196 	private int registeredWithDestination = 0;
197 
198 	private volatile boolean recovering = false;
199 
200 	private Runnable stopCallback;
201 
202 	private Object currentRecoveryMarker = new Object();
203 
204 	private final Object recoveryMonitor = new Object();
205 
206 
207 	/**
208 	 * Set the Spring {@code TaskExecutor} to use for running the listener threads.
209 	 * <p>Default is a {@link org.springframework.core.task.SimpleAsyncTaskExecutor},
210 	 * starting up a number of new threads, according to the specified number
211 	 * of concurrent consumers.
212 	 * <p>Specify an alternative {@code TaskExecutor} for integration with an existing
213 	 * thread pool. Note that this really only adds value if the threads are
214 	 * managed in a specific fashion, for example within a Java EE environment.
215 	 * A plain thread pool does not add much value, as this listener container
216 	 * will occupy a number of threads for its entire lifetime.
217 	 * @see #setConcurrentConsumers
218 	 * @see org.springframework.core.task.SimpleAsyncTaskExecutor
219 	 * @see org.springframework.scheduling.commonj.WorkManagerTaskExecutor
220 	 */
221 	public void setTaskExecutor(Executor taskExecutor) {
222 		this.taskExecutor = taskExecutor;
223 	}
224 
225 	/**
226 	 * Specify the {@link BackOff} instance to use to compute the interval
227 	 * between recovery attempts. If the {@link BackOffExecution} implementation
228 	 * returns {@link BackOffExecution#STOP}, this listener container will not further
229 	 * attempt to recover.
230 	 * <p>The {@link #setRecoveryInterval(long) recovery interval} is ignored
231 	 * when this property is set.
232 	 */
233 	public void setBackOff(BackOff backOff) {
234 		this.backOff = backOff;
235 	}
236 
237 	/**
238 	 * Specify the interval between recovery attempts, in <b>milliseconds</b>.
239 	 * The default is 5000 ms, that is, 5 seconds. This is a convenience method
240 	 * to create a {@link FixedBackOff} with the specified interval.
241 	 * <p>For more recovery options, consider specifying a {@link BackOff}
242 	 * instance instead.
243 	 * @see #setBackOff(BackOff)
244 	 * @see #handleListenerSetupFailure
245 	 */
246 	public void setRecoveryInterval(long recoveryInterval) {
247 		this.backOff = createDefaultBackOff(recoveryInterval);
248 	}
249 
250 	/**
251 	 * Specify the level of caching that this listener container is allowed to apply,
252 	 * in the form of the name of the corresponding constant: e.g. "CACHE_CONNECTION".
253 	 * @see #setCacheLevel
254 	 */
255 	public void setCacheLevelName(String constantName) throws IllegalArgumentException {
256 		if (constantName == null || !constantName.startsWith("CACHE_")) {
257 			throw new IllegalArgumentException("Only cache constants allowed");
258 		}
259 		setCacheLevel(constants.asNumber(constantName).intValue());
260 	}
261 
262 	/**
263 	 * Specify the level of caching that this listener container is allowed to apply.
264 	 * <p>Default is {@link #CACHE_NONE} if an external transaction manager has been specified
265 	 * (to reobtain all resources freshly within the scope of the external transaction),
266 	 * and {@link #CACHE_CONSUMER} otherwise (operating with local JMS resources).
267 	 * <p>Some Java EE servers only register their JMS resources with an ongoing XA
268 	 * transaction in case of a freshly obtained JMS {@code Connection} and {@code Session},
269 	 * which is why this listener container by default does not cache any of those.
270 	 * However, depending on the rules of your server with respect to the caching
271 	 * of transactional resources, consider switching this setting to at least
272 	 * {@link #CACHE_CONNECTION} or {@link #CACHE_SESSION} even in conjunction with an
273 	 * external transaction manager.
274 	 * @see #CACHE_NONE
275 	 * @see #CACHE_CONNECTION
276 	 * @see #CACHE_SESSION
277 	 * @see #CACHE_CONSUMER
278 	 * @see #setCacheLevelName
279 	 * @see #setTransactionManager
280 	 */
281 	public void setCacheLevel(int cacheLevel) {
282 		this.cacheLevel = cacheLevel;
283 	}
284 
285 	/**
286 	 * Return the level of caching that this listener container is allowed to apply.
287 	 */
288 	public int getCacheLevel() {
289 		return this.cacheLevel;
290 	}
291 
292 
293 	/**
294 	 * Specify concurrency limits via a "lower-upper" String, e.g. "5-10", or a simple
295 	 * upper limit String, e.g. "10" (the lower limit will be 1 in this case).
296 	 * <p>This listener container will always hold on to the minimum number of consumers
297 	 * ({@link #setConcurrentConsumers}) and will slowly scale up to the maximum number
298 	 * of consumers {@link #setMaxConcurrentConsumers} in case of increasing load.
299 	 */
300 	@Override
301 	public void setConcurrency(String concurrency) {
302 		try {
303 			int separatorIndex = concurrency.indexOf('-');
304 			if (separatorIndex != -1) {
305 				setConcurrentConsumers(Integer.parseInt(concurrency.substring(0, separatorIndex)));
306 				setMaxConcurrentConsumers(Integer.parseInt(concurrency.substring(separatorIndex + 1, concurrency.length())));
307 			}
308 			else {
309 				setConcurrentConsumers(1);
310 				setMaxConcurrentConsumers(Integer.parseInt(concurrency));
311 			}
312 		}
313 		catch (NumberFormatException ex) {
314 			throw new IllegalArgumentException("Invalid concurrency value [" + concurrency + "]: only " +
315 					"single maximum integer (e.g. \"5\") and minimum-maximum combo (e.g. \"3-5\") supported.");
316 		}
317 	}
318 
319 	/**
320 	 * Specify the number of concurrent consumers to create. Default is 1.
321 	 * <p>Specifying a higher value for this setting will increase the standard
322 	 * level of scheduled concurrent consumers at runtime: This is effectively
323 	 * the minimum number of concurrent consumers which will be scheduled
324 	 * at any given time. This is a static setting; for dynamic scaling,
325 	 * consider specifying the "maxConcurrentConsumers" setting instead.
326 	 * <p>Raising the number of concurrent consumers is recommendable in order
327 	 * to scale the consumption of messages coming in from a queue. However,
328 	 * note that any ordering guarantees are lost once multiple consumers are
329 	 * registered. In general, stick with 1 consumer for low-volume queues.
330 	 * <p><b>Do not raise the number of concurrent consumers for a topic,
331 	 * unless vendor-specific setup measures clearly allow for it.</b>
332 	 * With regular setup, this would lead to concurrent consumption
333 	 * of the same message, which is hardly ever desirable.
334 	 * <p><b>This setting can be modified at runtime, for example through JMX.</b>
335 	 * @see #setMaxConcurrentConsumers
336 	 */
337 	public void setConcurrentConsumers(int concurrentConsumers) {
338 		Assert.isTrue(concurrentConsumers > 0, "'concurrentConsumers' value must be at least 1 (one)");
339 		synchronized (this.lifecycleMonitor) {
340 			this.concurrentConsumers = concurrentConsumers;
341 			if (this.maxConcurrentConsumers < concurrentConsumers) {
342 				this.maxConcurrentConsumers = concurrentConsumers;
343 			}
344 		}
345 	}
346 
347 	/**
348 	 * Return the "concurrentConsumer" setting.
349 	 * <p>This returns the currently configured "concurrentConsumers" value;
350 	 * the number of currently scheduled/active consumers might differ.
351 	 * @see #getScheduledConsumerCount()
352 	 * @see #getActiveConsumerCount()
353 	 */
354 	public final int getConcurrentConsumers() {
355 		synchronized (this.lifecycleMonitor) {
356 			return this.concurrentConsumers;
357 		}
358 	}
359 
360 	/**
361 	 * Specify the maximum number of concurrent consumers to create. Default is 1.
362 	 * <p>If this setting is higher than "concurrentConsumers", the listener container
363 	 * will dynamically schedule new consumers at runtime, provided that enough
364 	 * incoming messages are encountered. Once the load goes down again, the number of
365 	 * consumers will be reduced to the standard level ("concurrentConsumers") again.
366 	 * <p>Raising the number of concurrent consumers is recommendable in order
367 	 * to scale the consumption of messages coming in from a queue. However,
368 	 * note that any ordering guarantees are lost once multiple consumers are
369 	 * registered. In general, stick with 1 consumer for low-volume queues.
370 	 * <p><b>Do not raise the number of concurrent consumers for a topic,
371 	 * unless vendor-specific setup measures clearly allow for it.</b>
372 	 * With regular setup, this would lead to concurrent consumption
373 	 * of the same message, which is hardly ever desirable.
374 	 * <p><b>This setting can be modified at runtime, for example through JMX.</b>
375 	 * @see #setConcurrentConsumers
376 	 */
377 	public void setMaxConcurrentConsumers(int maxConcurrentConsumers) {
378 		Assert.isTrue(maxConcurrentConsumers > 0, "'maxConcurrentConsumers' value must be at least 1 (one)");
379 		synchronized (this.lifecycleMonitor) {
380 			this.maxConcurrentConsumers =
381 					(maxConcurrentConsumers > this.concurrentConsumers ? maxConcurrentConsumers : this.concurrentConsumers);
382 		}
383 	}
384 
385 	/**
386 	 * Return the "maxConcurrentConsumer" setting.
387 	 * <p>This returns the currently configured "maxConcurrentConsumers" value;
388 	 * the number of currently scheduled/active consumers might differ.
389 	 * @see #getScheduledConsumerCount()
390 	 * @see #getActiveConsumerCount()
391 	 */
392 	public final int getMaxConcurrentConsumers() {
393 		synchronized (this.lifecycleMonitor) {
394 			return this.maxConcurrentConsumers;
395 		}
396 	}
397 
398 	/**
399 	 * Specify the maximum number of messages to process in one task.
400 	 * More concretely, this limits the number of message reception attempts
401 	 * per task, which includes receive iterations that did not actually
402 	 * pick up a message until they hit their timeout (see the
403 	 * {@link #setReceiveTimeout "receiveTimeout"} property).
404 	 * <p>Default is unlimited (-1) in case of a standard TaskExecutor,
405 	 * reusing the original invoker threads until shutdown (at the
406 	 * expense of limited dynamic scheduling).
407 	 * <p>In case of a SchedulingTaskExecutor indicating a preference for
408 	 * short-lived tasks, the default is 10 instead. Specify a number
409 	 * of 10 to 100 messages to balance between rather long-lived and
410 	 * rather short-lived tasks here.
411 	 * <p>Long-lived tasks avoid frequent thread context switches through
412 	 * sticking with the same thread all the way through, while short-lived
413 	 * tasks allow thread pools to control the scheduling. Hence, thread
414 	 * pools will usually prefer short-lived tasks.
415 	 * <p><b>This setting can be modified at runtime, for example through JMX.</b>
416 	 * @see #setTaskExecutor
417 	 * @see #setReceiveTimeout
418 	 * @see org.springframework.scheduling.SchedulingTaskExecutor#prefersShortLivedTasks()
419 	 */
420 	public void setMaxMessagesPerTask(int maxMessagesPerTask) {
421 		Assert.isTrue(maxMessagesPerTask != 0, "'maxMessagesPerTask' must not be 0");
422 		synchronized (this.lifecycleMonitor) {
423 			this.maxMessagesPerTask = maxMessagesPerTask;
424 		}
425 	}
426 
427 	/**
428 	 * Return the maximum number of messages to process in one task.
429 	 */
430 	public final int getMaxMessagesPerTask() {
431 		synchronized (this.lifecycleMonitor) {
432 			return this.maxMessagesPerTask;
433 		}
434 	}
435 
436 	/**
437 	 * Specify the limit for the number of consumers that are allowed to be idle
438 	 * at any given time.
439 	 * <p>This limit is used by the {@link #scheduleNewInvokerIfAppropriate} method
440 	 * to determine if a new invoker should be created. Increasing the limit causes
441 	 * invokers to be created more aggressively. This can be useful to ramp up the
442 	 * number of invokers faster.
443 	 * <p>The default is 1, only scheduling a new invoker (which is likely to
444 	 * be idle initially) if none of the existing invokers is currently idle.
445 	 */
446 	public void setIdleConsumerLimit(int idleConsumerLimit) {
447 		Assert.isTrue(idleConsumerLimit > 0, "'idleConsumerLimit' must be 1 or higher");
448 		synchronized (this.lifecycleMonitor) {
449 			this.idleConsumerLimit = idleConsumerLimit;
450 		}
451 	}
452 
453 	/**
454 	 * Return the limit for the number of idle consumers.
455 	 */
456 	public final int getIdleConsumerLimit() {
457 		synchronized (this.lifecycleMonitor) {
458 			return this.idleConsumerLimit;
459 		}
460 	}
461 
462 	/**
463 	 * Specify the limit for idle executions of a consumer task, not having
464 	 * received any message within its execution. If this limit is reached,
465 	 * the task will shut down and leave receiving to other executing tasks.
466 	 * <p>The default is 1, closing idle resources early once a task didn't
467 	 * receive a message. This applies to dynamic scheduling only; see the
468 	 * {@link #setMaxConcurrentConsumers "maxConcurrentConsumers"} setting.
469 	 * The minimum number of consumers
470 	 * (see {@link #setConcurrentConsumers "concurrentConsumers"})
471 	 * will be kept around until shutdown in any case.
472 	 * <p>Within each task execution, a number of message reception attempts
473 	 * (according to the "maxMessagesPerTask" setting) will each wait for an incoming
474 	 * message (according to the "receiveTimeout" setting). If all of those receive
475 	 * attempts in a given task return without a message, the task is considered
476 	 * idle with respect to received messages. Such a task may still be rescheduled;
477 	 * however, once it reached the specified "idleTaskExecutionLimit", it will
478 	 * shut down (in case of dynamic scaling).
479 	 * <p>Raise this limit if you encounter too frequent scaling up and down.
480 	 * With this limit being higher, an idle consumer will be kept around longer,
481 	 * avoiding the restart of a consumer once a new load of messages comes in.
482 	 * Alternatively, specify a higher "maxMessagesPerTask" and/or "receiveTimeout" value,
483 	 * which will also lead to idle consumers being kept around for a longer time
484 	 * (while also increasing the average execution time of each scheduled task).
485 	 * <p><b>This setting can be modified at runtime, for example through JMX.</b>
486 	 * @see #setMaxMessagesPerTask
487 	 * @see #setReceiveTimeout
488 	 */
489 	public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) {
490 		Assert.isTrue(idleTaskExecutionLimit > 0, "'idleTaskExecutionLimit' must be 1 or higher");
491 		synchronized (this.lifecycleMonitor) {
492 			this.idleTaskExecutionLimit = idleTaskExecutionLimit;
493 		}
494 	}
495 
496 	/**
497 	 * Return the limit for idle executions of a consumer task.
498 	 */
499 	public final int getIdleTaskExecutionLimit() {
500 		synchronized (this.lifecycleMonitor) {
501 			return this.idleTaskExecutionLimit;
502 		}
503 	}
504 
505 
506 	//-------------------------------------------------------------------------
507 	// Implementation of AbstractMessageListenerContainer's template methods
508 	//-------------------------------------------------------------------------
509 
510 	@Override
511 	public void initialize() {
512 		// Adapt default cache level.
513 		if (this.cacheLevel == CACHE_AUTO) {
514 			this.cacheLevel = (getTransactionManager() != null ? CACHE_NONE : CACHE_CONSUMER);
515 		}
516 
517 		// Prepare taskExecutor and maxMessagesPerTask.
518 		synchronized (this.lifecycleMonitor) {
519 			if (this.taskExecutor == null) {
520 				this.taskExecutor = createDefaultTaskExecutor();
521 			}
522 			else if (this.taskExecutor instanceof SchedulingTaskExecutor &&
523 					((SchedulingTaskExecutor) this.taskExecutor).prefersShortLivedTasks() &&
524 					this.maxMessagesPerTask == Integer.MIN_VALUE) {
525 				// TaskExecutor indicated a preference for short-lived tasks. According to
526 				// setMaxMessagesPerTask javadoc, we'll use 10 message per task in this case
527 				// unless the user specified a custom value.
528 				this.maxMessagesPerTask = 10;
529 			}
530 		}
531 
532 		// Proceed with actual listener initialization.
533 		super.initialize();
534 	}
535 
536 	/**
537 	 * Creates the specified number of concurrent consumers,
538 	 * in the form of a JMS Session plus associated MessageConsumer
539 	 * running in a separate thread.
540 	 * @see #scheduleNewInvoker
541 	 * @see #setTaskExecutor
542 	 */
543 	@Override
544 	protected void doInitialize() throws JMSException {
545 		synchronized (this.lifecycleMonitor) {
546 			for (int i = 0; i < this.concurrentConsumers; i++) {
547 				scheduleNewInvoker();
548 			}
549 		}
550 	}
551 
552 	/**
553 	 * Destroy the registered JMS Sessions and associated MessageConsumers.
554 	 */
555 	@Override
556 	protected void doShutdown() throws JMSException {
557 		logger.debug("Waiting for shutdown of message listener invokers");
558 		try {
559 			synchronized (this.lifecycleMonitor) {
560 				// Waiting for AsyncMessageListenerInvokers to deactivate themselves...
561 				while (this.activeInvokerCount > 0) {
562 					if (logger.isDebugEnabled()) {
563 						logger.debug("Still waiting for shutdown of " + this.activeInvokerCount +
564 								" message listener invokers");
565 					}
566 					this.lifecycleMonitor.wait();
567 				}
568 				// Clear remaining scheduled invokers, possibly left over as paused tasks...
569 				for (AsyncMessageListenerInvoker scheduledInvoker : this.scheduledInvokers) {
570 					scheduledInvoker.clearResources();
571 				}
572 				this.scheduledInvokers.clear();
573 			}
574 		}
575 		catch (InterruptedException ex) {
576 			// Re-interrupt current thread, to allow other threads to react.
577 			Thread.currentThread().interrupt();
578 		}
579 	}
580 
581 	/**
582 	 * Overridden to reset the stop callback, if any.
583 	 */
584 	@Override
585 	public void start() throws JmsException {
586 		synchronized (this.lifecycleMonitor) {
587 			this.stopCallback = null;
588 		}
589 		super.start();
590 	}
591 
592 	/**
593 	 * Stop this listener container, invoking the specific callback
594 	 * once all listener processing has actually stopped.
595 	 * <p>Note: Further {@code stop(runnable)} calls (before processing
596 	 * has actually stopped) will override the specified callback. Only the
597 	 * latest specified callback will be invoked.
598 	 * <p>If a subsequent {@link #start()} call restarts the listener container
599 	 * before it has fully stopped, the callback will not get invoked at all.
600 	 * @param callback the callback to invoke once listener processing
601 	 * has fully stopped
602 	 * @throws JmsException if stopping failed
603 	 * @see #stop()
604 	 */
605 	@Override
606 	public void stop(Runnable callback) throws JmsException {
607 		synchronized (this.lifecycleMonitor) {
608 			this.stopCallback = callback;
609 		}
610 		stop();
611 	}
612 
613 	/**
614 	 * Return the number of currently scheduled consumers.
615 	 * <p>This number will always be between "concurrentConsumers" and
616 	 * "maxConcurrentConsumers", but might be higher than "activeConsumerCount"
617 	 * (in case some consumers are scheduled but not executing at the moment).
618 	 * @see #getConcurrentConsumers()
619 	 * @see #getMaxConcurrentConsumers()
620 	 * @see #getActiveConsumerCount()
621 	 */
622 	public final int getScheduledConsumerCount() {
623 		synchronized (this.lifecycleMonitor) {
624 			return this.scheduledInvokers.size();
625 		}
626 	}
627 
628 	/**
629 	 * Return the number of currently active consumers.
630 	 * <p>This number will always be between "concurrentConsumers" and
631 	 * "maxConcurrentConsumers", but might be lower than "scheduledConsumerCount"
632 	 * (in case some consumers are scheduled but not executing at the moment).
633 	 * @see #getConcurrentConsumers()
634 	 * @see #getMaxConcurrentConsumers()
635 	 * @see #getActiveConsumerCount()
636 	 */
637 	public final int getActiveConsumerCount() {
638 		synchronized (this.lifecycleMonitor) {
639 			return this.activeInvokerCount;
640 		}
641 	}
642 
643 	/**
644 	 * Return whether at least one consumer has entered a fixed registration with the
645 	 * target destination. This is particularly interesting for the pub-sub case where
646 	 * it might be important to have an actual consumer registered that is guaranteed
647 	 * not to miss any messages that are just about to be published.
648 	 * <p>This method may be polled after a {@link #start()} call, until asynchronous
649 	 * registration of consumers has happened which is when the method will start returning
650 	 * {@code true} &ndash; provided that the listener container ever actually establishes
651 	 * a fixed registration. It will then keep returning {@code true} until shutdown,
652 	 * since the container will hold on to at least one consumer registration thereafter.
653 	 * <p>Note that a listener container is not bound to having a fixed registration in
654 	 * the first place. It may also keep recreating consumers for every invoker execution.
655 	 * This particularly depends on the {@link #setCacheLevel cache level} setting:
656 	 * only {@link #CACHE_CONSUMER} will lead to a fixed registration.
657 	 */
658 	public boolean isRegisteredWithDestination() {
659 		synchronized (this.lifecycleMonitor) {
660 			return (this.registeredWithDestination > 0);
661 		}
662 	}
663 
664 
665 	/**
666 	 * Create a default TaskExecutor. Called if no explicit TaskExecutor has been specified.
667 	 * <p>The default implementation builds a {@link org.springframework.core.task.SimpleAsyncTaskExecutor}
668 	 * with the specified bean name (or the class name, if no bean name specified) as thread name prefix.
669 	 * @see org.springframework.core.task.SimpleAsyncTaskExecutor#SimpleAsyncTaskExecutor(String)
670 	 */
671 	protected TaskExecutor createDefaultTaskExecutor() {
672 		String beanName = getBeanName();
673 		String threadNamePrefix = (beanName != null ? beanName + "-" : DEFAULT_THREAD_NAME_PREFIX);
674 		return new SimpleAsyncTaskExecutor(threadNamePrefix);
675 	}
676 
677 	/**
678 	 * Schedule a new invoker, increasing the total number of scheduled
679 	 * invokers for this listener container.
680 	 */
681 	private void scheduleNewInvoker() {
682 		AsyncMessageListenerInvoker invoker = new AsyncMessageListenerInvoker();
683 		if (rescheduleTaskIfNecessary(invoker)) {
684 			// This should always be true, since we're only calling this when active.
685 			this.scheduledInvokers.add(invoker);
686 		}
687 	}
688 
689 	/**
690 	 * Use a shared JMS Connection depending on the "cacheLevel" setting.
691 	 * @see #setCacheLevel
692 	 * @see #CACHE_CONNECTION
693 	 */
694 	@Override
695 	protected final boolean sharedConnectionEnabled() {
696 		return (getCacheLevel() >= CACHE_CONNECTION);
697 	}
698 
699 	/**
700 	 * Re-executes the given task via this listener container's TaskExecutor.
701 	 * @see #setTaskExecutor
702 	 */
703 	@Override
704 	protected void doRescheduleTask(Object task) {
705 		this.taskExecutor.execute((Runnable) task);
706 	}
707 
708 	/**
709 	 * Tries scheduling a new invoker, since we know messages are coming in...
710 	 * @see #scheduleNewInvokerIfAppropriate()
711 	 */
712 	@Override
713 	protected void messageReceived(Object invoker, Session session) {
714 		((AsyncMessageListenerInvoker) invoker).setIdle(false);
715 		scheduleNewInvokerIfAppropriate();
716 	}
717 
718 	/**
719 	 * Marks the affected invoker as idle.
720 	 */
721 	@Override
722 	protected void noMessageReceived(Object invoker, Session session) {
723 		((AsyncMessageListenerInvoker) invoker).setIdle(true);
724 	}
725 
726 	/**
727 	 * Schedule a new invoker, increasing the total number of scheduled
728 	 * invokers for this listener container, but only if the specified
729 	 * "maxConcurrentConsumers" limit has not been reached yet, and only
730 	 * if the specified "idleConsumerLimit" has not been reached either.
731 	 * <p>Called once a message has been received, in order to scale up while
732 	 * processing the message in the invoker that originally received it.
733 	 * @see #setTaskExecutor
734 	 * @see #getMaxConcurrentConsumers()
735 	 * @see #getIdleConsumerLimit()
736 	 */
737 	protected void scheduleNewInvokerIfAppropriate() {
738 		if (isRunning()) {
739 			resumePausedTasks();
740 			synchronized (this.lifecycleMonitor) {
741 				if (this.scheduledInvokers.size() < this.maxConcurrentConsumers &&
742 						getIdleInvokerCount() < this.idleConsumerLimit) {
743 					scheduleNewInvoker();
744 					if (logger.isDebugEnabled()) {
745 						logger.debug("Raised scheduled invoker count: " + this.scheduledInvokers.size());
746 					}
747 				}
748 			}
749 		}
750 	}
751 
752 	/**
753 	 * Determine whether the current invoker should be rescheduled,
754 	 * given that it might not have received a message in a while.
755 	 * @param idleTaskExecutionCount the number of idle executions
756 	 * that this invoker task has already accumulated (in a row)
757 	 */
758 	private boolean shouldRescheduleInvoker(int idleTaskExecutionCount) {
759 		boolean superfluous =
760 				(idleTaskExecutionCount >= this.idleTaskExecutionLimit && getIdleInvokerCount() > 1);
761 		return (this.scheduledInvokers.size() <=
762 				(superfluous ? this.concurrentConsumers : this.maxConcurrentConsumers));
763 	}
764 
765 	/**
766 	 * Determine whether this listener container currently has more
767 	 * than one idle instance among its scheduled invokers.
768 	 */
769 	private int getIdleInvokerCount() {
770 		int count = 0;
771 		for (AsyncMessageListenerInvoker invoker : this.scheduledInvokers) {
772 			if (invoker.isIdle()) {
773 				count++;
774 			}
775 		}
776 		return count;
777 	}
778 
779 
780 	/**
781 	 * Overridden to accept a failure in the initial setup - leaving it up to the
782 	 * asynchronous invokers to establish the shared Connection on first access.
783 	 * @see #refreshConnectionUntilSuccessful()
784 	 */
785 	@Override
786 	protected void establishSharedConnection() {
787 		try {
788 			super.establishSharedConnection();
789 		}
790 		catch (Exception ex) {
791 			if (ex instanceof JMSException) {
792 				invokeExceptionListener((JMSException) ex);
793 			}
794 			logger.debug("Could not establish shared JMS Connection - " +
795 					"leaving it up to asynchronous invokers to establish a Connection as soon as possible", ex);
796 		}
797 	}
798 
799 	/**
800 	 * This implementations proceeds even after an exception thrown from
801 	 * {@code Connection.start()}, relying on listeners to perform
802 	 * appropriate recovery.
803 	 */
804 	@Override
805 	protected void startSharedConnection() {
806 		try {
807 			super.startSharedConnection();
808 		}
809 		catch (Exception ex) {
810 			logger.debug("Connection start failed - relying on listeners to perform recovery", ex);
811 		}
812 	}
813 
814 	/**
815 	 * This implementations proceeds even after an exception thrown from
816 	 * {@code Connection.stop()}, relying on listeners to perform
817 	 * appropriate recovery after a restart.
818 	 */
819 	@Override
820 	protected void stopSharedConnection() {
821 		try {
822 			super.stopSharedConnection();
823 		}
824 		catch (Exception ex) {
825 			logger.debug("Connection stop failed - relying on listeners to perform recovery after restart", ex);
826 		}
827 	}
828 
829 	/**
830 	 * Handle the given exception that arose during setup of a listener.
831 	 * Called for every such exception in every concurrent listener.
832 	 * <p>The default implementation logs the exception at warn level
833 	 * if not recovered yet, and at debug level if already recovered.
834 	 * Can be overridden in subclasses.
835 	 * @param ex the exception to handle
836 	 * @param alreadyRecovered whether a previously executing listener
837 	 * already recovered from the present listener setup failure
838 	 * (this usually indicates a follow-up failure than can be ignored
839 	 * other than for debug log purposes)
840 	 * @see #recoverAfterListenerSetupFailure()
841 	 */
842 	protected void handleListenerSetupFailure(Throwable ex, boolean alreadyRecovered) {
843 		if (ex instanceof JMSException) {
844 			invokeExceptionListener((JMSException) ex);
845 		}
846 		if (ex instanceof SharedConnectionNotInitializedException) {
847 			if (!alreadyRecovered) {
848 				logger.info("JMS message listener invoker needs to establish shared Connection");
849 			}
850 		}
851 		else {
852 			// Recovery during active operation..
853 			if (alreadyRecovered) {
854 				logger.debug("Setup of JMS message listener invoker failed - already recovered by other invoker", ex);
855 			}
856 			else {
857 				StringBuilder msg = new StringBuilder();
858 				msg.append("Setup of JMS message listener invoker failed for destination '");
859 				msg.append(getDestinationDescription()).append("' - trying to recover. Cause: ");
860 				msg.append(ex instanceof JMSException ? JmsUtils.buildExceptionMessage((JMSException) ex) : ex.getMessage());
861 				if (logger.isDebugEnabled()) {
862 					logger.warn(msg, ex);
863 				}
864 				else {
865 					logger.warn(msg);
866 				}
867 			}
868 		}
869 	}
870 
871 	/**
872 	 * Recover this listener container after a listener failed to set itself up,
873 	 * for example re-establishing the underlying Connection.
874 	 * <p>The default implementation delegates to DefaultMessageListenerContainer's
875 	 * recovery-capable {@link #refreshConnectionUntilSuccessful()} method, which will
876 	 * try to re-establish a Connection to the JMS provider both for the shared
877 	 * and the non-shared Connection case.
878 	 * @see #refreshConnectionUntilSuccessful()
879 	 * @see #refreshDestination()
880 	 */
881 	protected void recoverAfterListenerSetupFailure() {
882 		this.recovering = true;
883 		try {
884 			refreshConnectionUntilSuccessful();
885 			refreshDestination();
886 		}
887 		finally {
888 			this.recovering = false;
889 		}
890 	}
891 
892 	/**
893 	 * Refresh the underlying Connection, not returning before an attempt has been
894 	 * successful. Called in case of a shared Connection as well as without shared
895 	 * Connection, so either needs to operate on the shared Connection or on a
896 	 * temporary Connection that just gets established for validation purposes.
897 	 * <p>The default implementation retries until it successfully established a
898 	 * Connection, for as long as this message listener container is running.
899 	 * Applies the specified recovery interval between retries.
900 	 * @see #setRecoveryInterval
901 	 * @see #start()
902 	 * @see #stop()
903 	 */
904 	protected void refreshConnectionUntilSuccessful() {
905 		BackOffExecution execution = this.backOff.start();
906 		while (isRunning()) {
907 			try {
908 				if (sharedConnectionEnabled()) {
909 					refreshSharedConnection();
910 				}
911 				else {
912 					Connection con = createConnection();
913 					JmsUtils.closeConnection(con);
914 				}
915 				logger.info("Successfully refreshed JMS Connection");
916 				break;
917 			}
918 			catch (Exception ex) {
919 				if (ex instanceof JMSException) {
920 					invokeExceptionListener((JMSException) ex);
921 				}
922 				StringBuilder msg = new StringBuilder();
923 				msg.append("Could not refresh JMS Connection for destination '");
924 				msg.append(getDestinationDescription()).append("' - retrying using ");
925 				msg.append(execution).append(". Cause: ");
926 				msg.append(ex instanceof JMSException ? JmsUtils.buildExceptionMessage((JMSException) ex) : ex.getMessage());
927 				if (logger.isDebugEnabled()) {
928 					logger.error(msg, ex);
929 				}
930 				else {
931 					logger.error(msg);
932 				}
933 			}
934 			if (!applyBackOffTime(execution)) {
935 				StringBuilder msg = new StringBuilder();
936 				msg.append("Stopping container for destination '")
937 						.append(getDestinationDescription())
938 						.append("' - back off policy does not allow ").append("for further attempts.");
939 				logger.error(msg.toString());
940 				stop();
941 			}
942 		}
943 	}
944 
945 	/**
946 	 * Refresh the JMS destination that this listener container operates on.
947 	 * <p>Called after listener setup failure, assuming that a cached Destination
948 	 * object might have become invalid (a typical case on WebLogic JMS).
949 	 * <p>The default implementation removes the destination from a
950 	 * DestinationResolver's cache, in case of a CachingDestinationResolver.
951 	 * @see #setDestinationName
952 	 * @see org.springframework.jms.support.destination.CachingDestinationResolver
953 	 */
954 	protected void refreshDestination() {
955 		String destName = getDestinationName();
956 		if (destName != null) {
957 			DestinationResolver destResolver = getDestinationResolver();
958 			if (destResolver instanceof CachingDestinationResolver) {
959 				((CachingDestinationResolver) destResolver).removeFromCache(destName);
960 			}
961 		}
962 	}
963 
964 	/**
965 	 * Apply the next back off time using the specified {@link BackOffExecution}.
966 	 * <p>Return {@code true} if the back off period has been applied and a new
967 	 * attempt to recover should be made, {@code false} if no further attempt
968 	 * should be made.
969 	 */
970 	protected boolean applyBackOffTime(BackOffExecution execution) {
971 		long interval = execution.nextBackOff();
972 		if (interval == BackOffExecution.STOP) {
973 			return false;
974 		}
975 		else {
976 			try {
977 				Thread.sleep(interval);
978 			}
979 			catch (InterruptedException interEx) {
980 				// Re-interrupt current thread, to allow other threads to react.
981 				Thread.currentThread().interrupt();
982 			}
983 		}
984 		return true;
985 	}
986 
987 	private FixedBackOff createDefaultBackOff(long interval) {
988 		return new FixedBackOff(interval, Long.MAX_VALUE);
989 	}
990 
991 	/**
992 	 * Return whether this listener container is currently in a recovery attempt.
993 	 * <p>May be used to detect recovery phases but also the end of a recovery phase,
994 	 * with {@code isRecovering()} switching to {@code false} after having been found
995 	 * to return {@code true} before.
996 	 * @see #recoverAfterListenerSetupFailure()
997 	 */
998 	public final boolean isRecovering() {
999 		return this.recovering;
1000 	}
1001 
1002 
1003 	//-------------------------------------------------------------------------
1004 	// Inner classes used as internal adapters
1005 	//-------------------------------------------------------------------------
1006 
1007 	/**
1008 	 * Runnable that performs looped {@code MessageConsumer.receive()} calls.
1009 	 */
1010 	private class AsyncMessageListenerInvoker implements SchedulingAwareRunnable {
1011 
1012 		private Session session;
1013 
1014 		private MessageConsumer consumer;
1015 
1016 		private Object lastRecoveryMarker;
1017 
1018 		private boolean lastMessageSucceeded;
1019 
1020 		private int idleTaskExecutionCount = 0;
1021 
1022 		private volatile boolean idle = true;
1023 
1024 		@Override
1025 		public void run() {
1026 			synchronized (lifecycleMonitor) {
1027 				activeInvokerCount++;
1028 				lifecycleMonitor.notifyAll();
1029 			}
1030 			boolean messageReceived = false;
1031 			try {
1032 				if (maxMessagesPerTask < 0) {
1033 					messageReceived = executeOngoingLoop();
1034 				}
1035 				else {
1036 					int messageCount = 0;
1037 					while (isRunning() && messageCount < maxMessagesPerTask) {
1038 						messageReceived = (invokeListener() || messageReceived);
1039 						messageCount++;
1040 					}
1041 				}
1042 			}
1043 			catch (Throwable ex) {
1044 				clearResources();
1045 				if (!this.lastMessageSucceeded) {
1046 					// We failed more than once in a row or on startup - sleep before
1047 					// first recovery attempt.
1048 					sleepBeforeRecoveryAttempt();
1049 				}
1050 				this.lastMessageSucceeded = false;
1051 				boolean alreadyRecovered = false;
1052 				synchronized (recoveryMonitor) {
1053 					if (this.lastRecoveryMarker == currentRecoveryMarker) {
1054 						handleListenerSetupFailure(ex, false);
1055 						recoverAfterListenerSetupFailure();
1056 						currentRecoveryMarker = new Object();
1057 					}
1058 					else {
1059 						alreadyRecovered = true;
1060 					}
1061 				}
1062 				if (alreadyRecovered) {
1063 					handleListenerSetupFailure(ex, true);
1064 				}
1065 			}
1066 			finally {
1067 				synchronized (lifecycleMonitor) {
1068 					decreaseActiveInvokerCount();
1069 					lifecycleMonitor.notifyAll();
1070 				}
1071 				if (!messageReceived) {
1072 					this.idleTaskExecutionCount++;
1073 				}
1074 				else {
1075 					this.idleTaskExecutionCount = 0;
1076 				}
1077 				synchronized (lifecycleMonitor) {
1078 					if (!shouldRescheduleInvoker(this.idleTaskExecutionCount) || !rescheduleTaskIfNecessary(this)) {
1079 						// We're shutting down completely.
1080 						scheduledInvokers.remove(this);
1081 						if (logger.isDebugEnabled()) {
1082 							logger.debug("Lowered scheduled invoker count: " + scheduledInvokers.size());
1083 						}
1084 						lifecycleMonitor.notifyAll();
1085 						clearResources();
1086 					}
1087 					else if (isRunning()) {
1088 						int nonPausedConsumers = getScheduledConsumerCount() - getPausedTaskCount();
1089 						if (nonPausedConsumers < 1) {
1090 							logger.error("All scheduled consumers have been paused, probably due to tasks having been rejected. " +
1091 									"Check your thread pool configuration! Manual recovery necessary through a start() call.");
1092 						}
1093 						else if (nonPausedConsumers < getConcurrentConsumers()) {
1094 							logger.warn("Number of scheduled consumers has dropped below concurrentConsumers limit, probably " +
1095 									"due to tasks having been rejected. Check your thread pool configuration! Automatic recovery " +
1096 									"to be triggered by remaining consumers.");
1097 						}
1098 					}
1099 				}
1100 			}
1101 		}
1102 
1103 		private boolean executeOngoingLoop() throws JMSException {
1104 			boolean messageReceived = false;
1105 			boolean active = true;
1106 			while (active) {
1107 				synchronized (lifecycleMonitor) {
1108 					boolean interrupted = false;
1109 					boolean wasWaiting = false;
1110 					while ((active = isActive()) && !isRunning()) {
1111 						if (interrupted) {
1112 							throw new IllegalStateException("Thread was interrupted while waiting for " +
1113 									"a restart of the listener container, but container is still stopped");
1114 						}
1115 						if (!wasWaiting) {
1116 							decreaseActiveInvokerCount();
1117 						}
1118 						wasWaiting = true;
1119 						try {
1120 							lifecycleMonitor.wait();
1121 						}
1122 						catch (InterruptedException ex) {
1123 							// Re-interrupt current thread, to allow other threads to react.
1124 							Thread.currentThread().interrupt();
1125 							interrupted = true;
1126 						}
1127 					}
1128 					if (wasWaiting) {
1129 						activeInvokerCount++;
1130 					}
1131 					if (scheduledInvokers.size() > maxConcurrentConsumers) {
1132 						active = false;
1133 					}
1134 				}
1135 				if (active) {
1136 					messageReceived = (invokeListener() || messageReceived);
1137 				}
1138 			}
1139 			return messageReceived;
1140 		}
1141 
1142 		private boolean invokeListener() throws JMSException {
1143 			initResourcesIfNecessary();
1144 			boolean messageReceived = receiveAndExecute(this, this.session, this.consumer);
1145 			this.lastMessageSucceeded = true;
1146 			return messageReceived;
1147 		}
1148 
1149 		private void decreaseActiveInvokerCount() {
1150 			activeInvokerCount--;
1151 			if (stopCallback != null && activeInvokerCount == 0) {
1152 				stopCallback.run();
1153 				stopCallback = null;
1154 			}
1155 		}
1156 
1157 		private void initResourcesIfNecessary() throws JMSException {
1158 			if (getCacheLevel() <= CACHE_CONNECTION) {
1159 				updateRecoveryMarker();
1160 			}
1161 			else {
1162 				if (this.session == null && getCacheLevel() >= CACHE_SESSION) {
1163 					updateRecoveryMarker();
1164 					this.session = createSession(getSharedConnection());
1165 				}
1166 				if (this.consumer == null && getCacheLevel() >= CACHE_CONSUMER) {
1167 					this.consumer = createListenerConsumer(this.session);
1168 					synchronized (lifecycleMonitor) {
1169 						registeredWithDestination++;
1170 					}
1171 				}
1172 			}
1173 		}
1174 
1175 		private void updateRecoveryMarker() {
1176 			synchronized (recoveryMonitor) {
1177 				this.lastRecoveryMarker = currentRecoveryMarker;
1178 			}
1179 		}
1180 
1181 		private void clearResources() {
1182 			if (sharedConnectionEnabled()) {
1183 				synchronized (sharedConnectionMonitor) {
1184 					JmsUtils.closeMessageConsumer(this.consumer);
1185 					JmsUtils.closeSession(this.session);
1186 				}
1187 			}
1188 			else {
1189 				JmsUtils.closeMessageConsumer(this.consumer);
1190 				JmsUtils.closeSession(this.session);
1191 			}
1192 			if (this.consumer != null) {
1193 				synchronized (lifecycleMonitor) {
1194 					registeredWithDestination--;
1195 				}
1196 			}
1197 			this.consumer = null;
1198 			this.session = null;
1199 		}
1200 
1201 		/**
1202 		 * Apply the back off time once. In a regular scenario, the back off is only applied if we
1203 		 * failed to recover with the broker. This additional sleep period avoids a burst retry
1204 		 * scenario when the broker is actually up but something else if failing (i.e. listener
1205 		 * specific).
1206 		 */
1207 		private void sleepBeforeRecoveryAttempt() {
1208 			BackOffExecution execution = DefaultMessageListenerContainer.this.backOff.start();
1209 			applyBackOffTime(execution);
1210 		}
1211 
1212 		@Override
1213 		public boolean isLongLived() {
1214 			return (maxMessagesPerTask < 0);
1215 		}
1216 
1217 		public void setIdle(boolean idle) {
1218 			this.idle = idle;
1219 		}
1220 
1221 		public boolean isIdle() {
1222 			return this.idle;
1223 		}
1224 	}
1225 
1226 }