Unable to restart DefaultMessageListener after stopping via stop callback

  • Unable to restart DefaultMessageListener after stopping via stop callback

    Hi All,

    I am developing an application which uses Spring's DefaultMessageListenerContainer to implement an MDP (message-driven POJO). As part of this application I have to implement a task that runs on a schedule (every x minutes), which I am implementing via Spring's scheduled-tasks support:

    Code:
    <task:scheduled-tasks scheduler="heartbeatScheduler">
        <task:scheduled ref="heartbeatService" method="processHeartbeat" cron="${heartbeat.cron}"/>
    </task:scheduled-tasks>
    
     <task:scheduler id="heartbeatScheduler" pool-size="1"/>
    Before this task executes, it needs to stop/pause the JMS listener container to avoid a potential race condition. I have therefore attempted to implement this via the SmartLifecycle stop callback, e.g.:

    Code:
    messageListenerContainer.stop(new Runnable() {
    
                @Override
                public void run() {
                    LOGGER.info("JMS Container stopped executing task");
                    performTask();
                    LOGGER.info("Task complete starting JMS Container");
                    messageListenerContainer.start();
                }
    });
    The issue, however, is that although messageListenerContainer.start() is called and the container reports that it is both active and running, the ActiveMQ console shows that the JMS connection is never reinstated.

    If, however, I stop the container via the stop() callback and poll the container until activeConsumerCount == 0 before restarting it, the JMS connection is reinstated correctly.
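The polling workaround described above can be sketched roughly as follows. This is only a minimal illustration, not the code used in the application: `PausableContainer`, `PauseAndResume`, `runWhilePaused`, and the timeout and poll parameters are hypothetical names introduced here. The interface simply mirrors the `stop()`, `start()`, and `getActiveConsumerCount()` methods of `DefaultMessageListenerContainer` so that the sketch compiles without Spring on the classpath.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical view of the three container calls used below; not a Spring type. */
interface PausableContainer {
    void stop();
    void start();
    int getActiveConsumerCount();
}

final class PauseAndResume {

    private PauseAndResume() {}

    /**
     * Stops the container, polls until no consumer is active, runs the task,
     * and restarts the container even if the task throws or the wait times out.
     */
    static void runWhilePaused(PausableContainer container, Runnable task,
                               long timeoutMillis, long pollMillis)
            throws InterruptedException, TimeoutException {
        container.stop();
        try {
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
            // Poll until the container reports that all consumers have gone idle.
            while (container.getActiveConsumerCount() > 0) {
                if (System.nanoTime() - deadline >= 0) {
                    throw new TimeoutException(
                            "consumers still active after " + timeoutMillis + " ms");
                }
                Thread.sleep(pollMillis);
            }
            task.run();
        } finally {
            // Always resume message consumption, whatever happened above.
            container.start();
        }
    }
}
```

To use this against a real container, a small adapter that delegates the three methods to the `adapterListener` bean would be needed. Restarting in a `finally` block is a design choice made for this sketch, so that a failed task does not leave the listener stopped.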

    My message listener container config is as follows:

    Code:
    <bean id="adapterListener" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="jmsConnectionFactory"/>
            <property name="destinationName" value="${inbound.destination}" />
            <property name="messageListener" ref="messageListener" />
            <property name="transactionManager" ref="jmsTransactionManager" />
            <property name="maxConcurrentConsumers" value="${max.concurrent.consumers}" />
    </bean>
    Any help you could provide would be much appreciated.
    Last edited by pbray; Jun 21st, 2012, 11:51 PM.

  • #2
    I suggest you run with TRACE level logging and take a look at the log. Attach it here if you can't figure out what's going on.


    • #3
      Hi Gary,

      Thanks for your quick response. As suggested, I have enabled TRACE-level logging for Spring as follows:

      Code:
      log4j.category.org.springframework=TRACE
      Unfortunately this has not provided me with any new insights as to why the container is not re-establishing the connection.

      Log output is included below:

      Code:
      2012-07-02 14:00:38,772 [main] INFO  org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService  'heartbeatScheduler'
      2012-07-02 14:00:38,772 [main] DEBUG org.springframework.beans.factory.support.DefaultListableBeanFactory - Finished creating instance of bean 'heartbeatScheduler'
      2012-07-02 14:00:38,772 [main] DEBUG org.springframework.beans.factory.support.DefaultListableBeanFactory - Returning cached instance of singleton bean 'org.springframework.scheduling.support.ScheduledMethodRunnable#0'
      2012-07-02 14:00:38,788 [main] INFO  org.springframework.context.support.DefaultLifecycleProcessor - Starting beans in phase 2147483647
      2012-07-02 14:00:38,788 [main] DEBUG org.springframework.context.support.DefaultLifecycleProcessor - Starting bean 'adapterListener' of type [class org.springframework.jms.listener.DefaultMessageListenerContainer]
      2012-07-02 14:00:38,960 [main] DEBUG org.springframework.jms.listener.DefaultMessageListenerContainer - Established shared JMS Connection
      2012-07-02 14:00:38,960 [main] DEBUG org.springframework.jms.listener.DefaultMessageListenerContainer - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker@16fcc4
      2012-07-02 14:00:39,006 [main] DEBUG org.springframework.context.support.DefaultLifecycleProcessor - Successfully started bean 'adapterListener'
      2012-07-02 14:00:58,477 [adapterListener-1] DEBUG org.springframework.jms.connection.JmsTransactionManager - Creating new transaction with name [adapterListener]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
      2012-07-02 14:00:58,477 [adapterListener-1] DEBUG org.springframework.jms.connection.JmsTransactionManager - Created JMS transaction on Session [ActiveMQSession {id=ID:w5c260a886515-2436-1341201638850-1:21:1,started=false}] from Connection [ActiveMQConnection {id=ID:w5c260a886515-2436-1341201638850-1:21,clientId=ID:w5c260a886515-2436-1341201638850-0:21,started=false}]
      2012-07-02 14:00:58,477 [adapterListener-1] TRACE org.springframework.transaction.support.TransactionSynchronizationManager - Bound value [org.springframework.jms.connection.JmsResourceHolder@17ccb2f] for key [org.apache.activemq.ActiveMQConnectionFactory@f49e8f] to thread [adapterListener-1]
      2012-07-02 14:00:58,477 [adapterListener-1] TRACE org.springframework.transaction.support.TransactionSynchronizationManager - Retrieved value [org.springframework.jms.connection.JmsResourceHolder@17ccb2f] for key [org.apache.activemq.ActiveMQConnectionFactory@f49e8f] bound to thread [adapterListener-1]
      2012-07-02 14:00:59,492 [adapterListener-1] TRACE org.springframework.jms.listener.DefaultMessageListenerContainer - Consumer [ActiveMQMessageConsumer { value=ID:w5c260a886515-2436-1341201638850-1:21:1:1, started=true }] of transactional session [ActiveMQSession {id=ID:w5c260a886515-2436-1341201638850-1:21:1,started=true}] did not receive a message
      2012-07-02 14:00:59,492 [adapterListener-1] DEBUG org.springframework.jms.connection.JmsTransactionManager - Initiating transaction commit
      2012-07-02 14:00:59,492 [adapterListener-1] DEBUG org.springframework.jms.connection.JmsTransactionManager - Committing JMS transaction on Session [ActiveMQSession {id=ID:w5c260a886515-2436-1341201638850-1:21:1,started=true}]
      2012-07-02 14:00:59,492 [adapterListener-1] TRACE org.springframework.transaction.support.TransactionSynchronizationManager - Removed value [org.springframework.jms.connection.JmsResourceHolder@17ccb2f] for key [org.apache.activemq.ActiveMQConnectionFactory@f49e8f] from thread [adapterListener-1]
      2012-07-02 14:00:59,523 [adapterListener-1] DEBUG org.springframework.jms.connection.JmsTransactionManager - Creating new transaction with name [adapterListener]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
      2012-07-02 14:00:59,523 [adapterListener-1] DEBUG org.springframework.jms.connection.JmsTransactionManager - Created JMS transaction on Session [ActiveMQSession {id=ID:w5c260a886515-2436-1341201638850-1:22:1,started=false}] from Connection [ActiveMQConnection {id=ID:w5c260a886515-2436-1341201638850-1:22,clientId=ID:w5c260a886515-2436-1341201638850-0:22,started=false}]
      2012-07-02 14:00:59,523 [adapterListener-1] TRACE org.springframework.transaction.support.TransactionSynchronizationManager - Bound value [org.springframework.jms.connection.JmsResourceHolder@1768b0a] for key [org.apache.activemq.ActiveMQConnectionFactory@f49e8f] to thread [adapterListener-1]
      2012-07-02 14:00:59,523 [adapterListener-1] TRACE org.springframework.transaction.support.TransactionSynchronizationManager - Retrieved value [org.springframework.jms.connection.JmsResourceHolder@1768b0a] for key [org.apache.activemq.ActiveMQConnectionFactory@f49e8f] bound to thread [adapterListener-1]
      2012-07-02 14:01:00,538 [adapterListener-1] TRACE org.springframework.jms.listener.DefaultMessageListenerContainer - Consumer [ActiveMQMessageConsumer { value=ID:w5c260a886515-2436-1341201638850-1:22:1:1, started=true }] of transactional session [ActiveMQSession {id=ID:w5c260a886515-2436-1341201638850-1:22:1,started=true}] did not receive a message
      2012-07-02 14:01:00,538 [adapterListener-1] DEBUG org.springframework.jms.connection.JmsTransactionManager - Initiating transaction commit
      2012-07-02 14:01:00,538 [adapterListener-1] DEBUG org.springframework.jms.connection.JmsTransactionManager - Committing JMS transaction on Session [ActiveMQSession {id=ID:w5c260a886515-2436-1341201638850-1:22:1,started=true}]
      2012-07-02 14:01:00,538 [adapterListener-1] TRACE org.springframework.transaction.support.TransactionSynchronizationManager - Removed value [org.springframework.jms.connection.JmsResourceHolder@1768b0a] for key [org.apache.activemq.ActiveMQConnectionFactory@f49e8f] from thread [adapterListener-1]
      2012-07-02 14:01:00,569 [adapterListener-1] INFO  HeartbeatService - JMS Container stopped executing task
      2012-07-02 14:01:01,115 [adapterListener-1] INFO  HeartbeatService - Task complete starting JMS Container
      As you can see, there is no further output after the HeartbeatService Runnable completes execution.

      Any further assistance you could provide would be much appreciated.

      Thanks,
      Patrick Bray



      • #4
        I am confused; you seem to be trying to stop/start the container on one of its own threads...

        2012-07-02 14:01:00,569 [adapterListener-1] INFO HeartbeatService - JMS Container stopped executing task
        2012-07-02 14:01:01,115 [adapterListener-1] INFO HeartbeatService - Task complete starting JMS Container

        ...yet I didn't see an inbound message; shouldn't we see the stop() running on a heartbeatScheduler thread?



        • #5
          Hi Gary,

          Once again, thanks for your quick response. You are absolutely right: the issue was caused by the stop callback running on the JMS listener thread, which left it unable to restart the container. Modifying the stop callback as follows has resolved the issue, and the connection is now re-established successfully:

          Code:
          messageListenerContainer.stop(new Runnable() {
          
                      @Override
                      public void run() {
                          taskExecutor.execute(new HeartbeatWorker());
                      }
          });
          Code:
              private final class HeartbeatWorker implements Runnable {
          
                  @Override
                  public void run() {
                      try {
                          LOGGER.info("JMS Container stopped executing task");
                          performTask();
                      }
                      finally {
                          LOGGER.info("Task complete starting JMS Container");
                          messageListenerContainer.start();
                      }
                  }
              }
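The thread hand-off used in this fix can be illustrated without Spring. The sketch below is a plain-JDK model, not the container's actual implementation: the class name `StopCallbackHandOff`, the method `demo`, and the thread name `heartbeatWorker-1` are hypothetical. It only shows that a callback invoked by a listener-style thread runs on that thread, while work passed to a separate executor runs elsewhere.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class StopCallbackHandOff {

    private StopCallbackHandOff() {}

    /**
     * Runs a callback on a thread named like a listener thread and lets the
     * callback hand its work to a separate executor. Returns two thread names:
     * index 0 is the thread that ran the callback, index 1 the thread that ran the work.
     */
    static String[] demo() throws InterruptedException {
        // Stand-in for the container's own listener thread (name is illustrative).
        ExecutorService listener =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "adapterListener-1"));
        // Stand-in for the separate task executor used by the fix (hypothetical name).
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "heartbeatWorker-1"));
        AtomicReference<String> callbackThread = new AtomicReference<>();
        AtomicReference<String> workThread = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            Runnable stopCallback = () -> {
                callbackThread.set(Thread.currentThread().getName());
                // Hand off instead of doing the work (and the restart) right here.
                worker.execute(() -> {
                    workThread.set(Thread.currentThread().getName());
                    done.countDown();
                });
            };
            listener.execute(stopCallback);
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("handed-off work did not run");
            }
            return new String[] {callbackThread.get(), workThread.get()};
        } finally {
            listener.shutdown();
            worker.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        String[] names = demo();
        System.out.println("callback ran on " + names[0] + ", work ran on " + names[1]);
    }
}
```

In this model the callback itself returns almost immediately, so the listener-style thread is free again before the handed-off work starts. That mirrors the intent of the fix: start() is no longer called from a thread owned by the container.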
          Code:
          2012-07-03 10:32:00,129 [heartbeatScheduler-1] INFO  WSHeartbeatServiceNonPolling - Processing heartbeat
          2012-07-03 10:32:01,113 [adapterListener-1] TRACE org.springframework.jms.listener.DefaultMessageListenerContainer - Consumer [ActiveMQMessageConsumer { value=ID:w5c260a886515-1913-1341274679580-1:814:1:1, started=true }] of transactional session [ActiveMQSession {id=ID:w5c260a886515-1913-1341274679580-1:814:1,started=true}] did not receive a message
          2012-07-03 10:32:01,113 [adapterListener-1] DEBUG org.springframework.jms.connection.JmsTransactionManager - Initiating transaction commit
          2012-07-03 10:32:01,113 [adapterListener-1] DEBUG org.springframework.jms.connection.JmsTransactionManager - Committing JMS transaction on Session [ActiveMQSession {id=ID:w5c260a886515-1913-1341274679580-1:814:1,started=true}]
          2012-07-03 10:32:01,113 [heartbeatScheduler-1] INFO  WSHeartbeatServiceNonPolling - JMS Container stopped executing task
          2012-07-03 10:32:01,113 [heartbeatScheduler-1] INFO  WSHeartbeatServiceNonPolling - Sending heartbeat request
          2012-07-03 10:32:01,144 [heartbeatScheduler-1] INFO  WSHeartbeatServiceNonPolling - Heartbeat processing complete 0 results returned
          2012-07-03 10:32:01,144 [heartbeatScheduler-1] INFO  WSHeartbeatServiceNonPolling - Task complete starting JMS Container
          2012-07-03 10:32:01,144 [adapterListener-1] DEBUG org.springframework.jms.connection.JmsTransactionManager - Creating new transaction with name [adapterListener]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
          2012-07-03 10:32:01,144 [adapterListener-1] DEBUG org.springframework.jms.connection.JmsTransactionManager - Created JMS transaction on Session [ActiveMQSession {id=ID:w5c260a886515-1913-1341274679580-1:815:1,started=false}] from Connection [ActiveMQConnection {id=ID:w5c260a886515-1913-1341274679580-1:815,clientId=ID:w5c260a886515-1913-1341274679580-0:815,started=false}]
          2012-07-03 10:32:02,144 [adapterListener-1] TRACE org.springframework.jms.listener.DefaultMessageListenerContainer - Consumer [ActiveMQMessageConsumer { value=ID:w5c260a886515-1913-1341274679580-1:815:1:1, started=true }] of transactional session [ActiveMQSession {id=ID:w5c260a886515-1913-1341274679580-1:815:1,started=true}] did not receive a message
          2012-07-03 10:32:02,144 [adapterListener-1] DEBUG org.springframework.jms.connection.JmsTransactionManager - Initiating transaction commit
          Thanks for all your help; it is much appreciated.

          Thanks,
          Patrick Bray
