DefaultMessageListenerContainer stops receiving messages after MQ restart

  • DefaultMessageListenerContainer stops receiving messages after MQ restart

    Hi.

    I'm using a DefaultMessageListenerContainer (DMLC) for an MDP. Receiving messages works without any problems. When I stop the MQ server (ActiveMQ 5.1) and restart it, I would expect the DMLC to re-establish the connection, but it stops receiving messages. If the MQ server doesn't report the failure to the DMLC, is it possible to configure a connection check interval or something similar so that the connection loss is noticed?

    My configuration looks like this (Spring 2.5.5):

    Code:
    	<!--  ActiveMQ destinations to use  -->
    	<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue" autowire="constructor">
    		<constructor-arg value="NEW" />
    	</bean>
    
    	<bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    		<property name="brokerURL">
    			<value>tcp://localhost:61616</value>
    		</property>
    		<property name="redeliveryPolicy">
    			<ref local="redeliveryPolicy" />
    		</property>
    	</bean>
    	<bean id="redeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
    		<property name="initialRedeliveryDelay" value="10000" />
    		<property name="maximumRedeliveries" value="4" />
    		<property name="useExponentialBackOff" value="true" />
    		<property name="backOffMultiplier" value="2" />
    	</bean>
    		
    	<!-- Spring JMS Template -->
    	<bean id="myJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    		<property name="connectionFactory">
    			<!-- wrap in a SingleConnectionFactory (one shared connection, not a pool) to avoid creating a connection per send -->
    			<bean class="org.springframework.jms.connection.SingleConnectionFactory">
    				<property name="targetConnectionFactory">
    					<ref local="jmsFactory" />
    				</property>
    			</bean>
    		</property>
    	</bean>
    
    
    	<!-- and this is the message listener container -->
    	<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    		<property name="connectionFactory" ref="jmsFactory"/>
    		<property name="destination" ref="destination"/>
    		<property name="messageListener" ref="messageListener" />
    		<property name="concurrentConsumers" value="100" />
    		<property name="maxMessagesPerTask" value="1" />
    		<property name="sessionTransacted" value="true" />		
    	</bean>
    
    	<!-- this is the Message Driven POJO (MDP) -->
    	<bean id="messageListener" class="com.foo.jms.Consumer" />
    Any hints?
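
    On the connection-check question: DefaultMessageListenerContainer has a `recoveryInterval` property (in milliseconds, default 5000) that sets how long the container waits between attempts to re-establish the connection after a failed receive. A minimal sketch, reusing the bean names from the configuration above. The value 10000 is only an illustration, and recovery is only triggered if the broker outage actually surfaces as a JMSException on the client side:

    ```xml
    <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    	<property name="connectionFactory" ref="jmsFactory"/>
    	<property name="destination" ref="destination"/>
    	<property name="messageListener" ref="messageListener" />
    	<property name="concurrentConsumers" value="100" />
    	<property name="maxMessagesPerTask" value="1" />
    	<property name="sessionTransacted" value="true" />
    	<!-- wait 10 s between recovery attempts (illustrative value; the default is 5000 ms) -->
    	<property name="recoveryInterval" value="10000" />
    </bean>
    ```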

  • #2
    I had this issue too. It turns out you need to add a transaction manager to the DefaultMessageListenerContainer; that solved the issue for me.

    So in your case, add:
    Code:
    	<bean id="jmsTransActionManager" class="org.springframework.jms.connection.JmsTransactionManager">
    		<property name="connectionFactory">
    			<ref local="jmsFactory" />
    		</property>
    	</bean>
    
    	<!-- and this is the message listener container -->
    	<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    		<property name="connectionFactory" ref="jmsFactory"/>
    		<property name="destination" ref="destination"/>
    		<property name="messageListener" ref="messageListener" />
    		<property name="concurrentConsumers" value="100" />
    		<property name="maxMessagesPerTask" value="1" />
    		<property name="sessionTransacted" value="true" />		
    		<property name="transactionManager" ref="jmsTransActionManager" />	
    	</bean>
    Wouter

    • #3
      Thanks for the detailed answer!

      I set up everything as in your example. Now the MessageListener is very noisy and continuously prints things like:

      Code:
      [17.07. 08:58:17,DEBUG,WireFormatNegotiator,]: Received WireFormat: WireFormatInfo { version=3, properties={CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
      [17.07. 08:58:17,DEBUG,WireFormatNegotiator,]: tcp://localhost/127.0.0.1:34650 before negotiation: OpenWireFormat{version=3, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false}
      [17.07. 08:58:17,DEBUG,WireFormatNegotiator,]: tcp://localhost/127.0.0.1:34650 after negotiation: OpenWireFormat{version=3, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false}
      [17.07. 08:58:17,DEBUG,FailoverTransport,]: Connection established
      [17.07. 08:58:17, INFO,FailoverTransport,]: Successfully connected to tcp://foo.bar.local:34650
      [17.07. 08:58:18,DEBUG,ActiveMQSession,]: ID:foo.bar.local-56164-1216277883792-0:15:1 Transaction Commit
      [17.07. 08:58:18,DEBUG,FailoverTransport,]: Stopped.
      [17.07. 08:58:18,DEBUG,TcpTransport,]: Stopping transport tcp://localhost/127.0.0.1:34650
      [17.07. 08:58:18,DEBUG,FailoverTransport,]: Started.
      [17.07. 08:58:18,DEBUG,FailoverTransport,]: Waking up reconnect task
      [17.07. 08:58:18,DEBUG,FailoverTransport,]: Waiting 10 ms before attempting connection.
      [17.07. 08:58:18,DEBUG,FailoverTransport,]: Waiting 20 ms before attempting connection.
      [17.07. 08:58:18,DEBUG,FailoverTransport,]: Waiting 40 ms before attempting connection.
      [17.07. 08:58:18,DEBUG,FailoverTransport,]: Waiting 80 ms before attempting connection.
      [17.07. 08:58:18,DEBUG,FailoverTransport,]: Waiting 160 ms before attempting connection.
      [17.07. 08:58:18,DEBUG,ActiveMQSession,]: ID:foo.bar.local-56164-1216277883792-0:14:1 Transaction Commit
      [17.07. 08:58:18,DEBUG,FailoverTransport,]: Stopped.
      [17.07. 08:58:18,DEBUG,TcpTransport,]: Stopping transport tcp://localhost/127.0.0.1:34650
      [17.07. 08:58:18,DEBUG,FailoverTransport,]: Started.
      [17.07. 08:58:18,DEBUG,FailoverTransport,]: Waking up reconnect task
      [17.07. 08:58:18,DEBUG,FailoverTransport,]: Waiting 10 ms before attempting connection.
      [17.07. 08:58:18,DEBUG,FailoverTransport,]: Waiting 20 ms before attempting connection.
      [17.07. 08:58:18,DEBUG,FailoverTransport,]: Waiting 40 ms before attempting connection.
      [17.07. 08:58:18,DEBUG,FailoverTransport,]: Waiting 80 ms before attempting connection.
      [17.07. 08:58:18,DEBUG,FailoverTransport,]: Waiting 160 ms before attempting connection.
      [17.07. 08:58:18,DEBUG,FailoverTransport,]: Waiting 320 ms before attempting connection.
      [17.07. 08:58:19,DEBUG,FailoverTransport,]: Waiting 320 ms before attempting connection.
      [17.07. 08:58:19,DEBUG,FailoverTransport,]: Waiting 640 ms before attempting connection.
      [17.07. 08:58:19,DEBUG,FailoverTransport,]: Waiting 640 ms before attempting connection.
      [17.07. 08:58:19, INFO,DiscoveryTransport,]: Adding new broker connection URL: tcp://foo.bar.local:34650
      [17.07. 08:58:19, INFO,DiscoveryTransport,]: Adding new broker connection URL: tcp://foo.bar.local:34650
      [17.07. 08:58:19,DEBUG,FailoverTransport,]: Waking up reconnect task
      [17.07. 08:58:19,DEBUG,FailoverTransport,]: Waking up reconnect task
      [17.07. 08:58:19,DEBUG,FailoverTransport,]: Attempting connect to: tcp://foo.bar.local:34650
      [17.07. 08:58:19,DEBUG,WireFormatNegotiator,]: Sending: WireFormatInfo { version=3, properties={CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
      [17.07. 08:58:19,DEBUG,WireFormatNegotiator,]: Received WireFormat: WireFormatInfo { version=3, properties={CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
      [17.07. 08:58:19,DEBUG,WireFormatNegotiator,]: tcp://localhost/127.0.0.1:34650 before negotiation: OpenWireFormat{version=3, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false}
      [17.07. 08:58:19,DEBUG,WireFormatNegotiator,]: tcp://localhost/127.0.0.1:34650 after negotiation: OpenWireFormat{version=3, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false}
      [17.07. 08:58:19,DEBUG,FailoverTransport,]: Connection established
      [17.07. 08:58:19, INFO,FailoverTransport,]: Successfully connected to tcp://foo.bar.local:34650
      Is that expected behavior? Does that mean it has to actively check the connections?

      Another thing: I've configured a RedeliveryPolicy within ActiveMQ. That worked before (redelivery with backoff/delays), but now it tries to redeliver immediately, without any delay or backoff. I'm a bit confused about why that happens.
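
      A possible explanation, offered as a sketch rather than a confirmed diagnosis: the DefaultMessageListenerContainer Javadoc notes that the container re-obtains all JMS handles for each transaction when an external transaction manager is specified, and JmsTransactionManager obtains a connection from its factory for every transaction. With a plain ActiveMQConnectionFactory that would mean a new broker connection per receive attempt, which matches the connect/stop cycle in the log. Recreating the consumer each time may also be why the redelivery backoff is no longer applied, though that part is a guess. The sketch below only addresses the connection churn: it points both the container and the transaction manager at a shared SingleConnectionFactory. The bean id `sharedJmsFactory` is made up for this example.

      ```xml
      <!-- hypothetical bean id; wraps the existing jmsFactory so a single connection is reused -->
      <bean id="sharedJmsFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
      	<property name="targetConnectionFactory" ref="jmsFactory" />
      	<!-- reset the shared connection when the provider reports an exception -->
      	<property name="reconnectOnException" value="true" />
      </bean>

      <bean id="jmsTransActionManager" class="org.springframework.jms.connection.JmsTransactionManager">
      	<property name="connectionFactory" ref="sharedJmsFactory" />
      </bean>

      <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
      	<property name="connectionFactory" ref="sharedJmsFactory" />
      	<property name="destination" ref="destination" />
      	<property name="messageListener" ref="messageListener" />
      	<property name="concurrentConsumers" value="100" />
      	<property name="maxMessagesPerTask" value="1" />
      	<property name="sessionTransacted" value="true" />
      	<property name="transactionManager" ref="jmsTransActionManager" />
      </bean>
      ```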

      • #4
        Hibernate transaction

        I am having the same issue with ActiveMQ.
        I am using a Hibernate transaction for some of the DAO calls inside the listener.
        How can I use another transaction manager for these JMS messages?
        Please reply, this is really important.

        How do I use
        <bean id="jmsPostIntakeContainer"
        	class="org.springframework.jms.listener.DefaultMessageListenerContainer"
        	parent="template.jms.postintake">
        	<property name="messageListener" ref="pingPostProcessor" />
        	<property name="concurrentConsumers" value="2" />
        	<property name="transactionManager" ref="hibernateTransactionManager" />
        	<!-- <property name="recoveryInterval" value="300000" /> -->
        </bean>
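
        One arrangement that may be worth trying, sketched under assumptions: keep `hibernateTransactionManager` on the container so the DAO calls in the listener run inside a Hibernate transaction, and also set `sessionTransacted` to `true` so that the JMS session is locally transacted and synchronized with that transaction. This is not an atomic (XA) transaction across the database and the broker. The contents of the parent bean `template.jms.postintake` are not shown in this thread, so it may already set some of these properties.

        ```xml
        <bean id="jmsPostIntakeContainer"
        	class="org.springframework.jms.listener.DefaultMessageListenerContainer"
        	parent="template.jms.postintake">
        	<property name="messageListener" ref="pingPostProcessor" />
        	<property name="concurrentConsumers" value="2" />
        	<!-- Hibernate transaction wraps each receive-and-process cycle -->
        	<property name="transactionManager" ref="hibernateTransactionManager" />
        	<!-- locally transacted JMS session, synchronized with the Hibernate transaction (assumption: not already set by the parent bean) -->
        	<property name="sessionTransacted" value="true" />
        </bean>
        ```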
