reply-listener may not recover from network outage

  • reply-listener may not recover from network outage

    I am using an outbound-gateway and reply-listener with Tibco EMS. If EMS is bounced, the outbound-gateway and reply-listener reconnect perfectly fine. However, if a request occurs while there is a network disconnect (e.g. I unplug the ethernet cable from my EMS server), the outbound-gateway seems to recover and resume sending messages, but the reply-listener does not consistently do so. When the reply-listener fails to realize it has been disconnected, the result is the outbound-gateway sending requests with a reply-to destination that no longer exists. Thus all requests time out. Interestingly, not only will bouncing the client correct this situation, but so will bouncing EMS.

    If I remove the reply-listener and allow the outbound-gateway to create a new reply-to destination with every request, then recovery is very consistent.

    Does anyone have any ideas about how this may be overcome, outside of removing the reply-listener?

    The offending outbound-gateway:

    Code:
    <int-jms:outbound-gateway request-channel="requestChannel" correlation-key="JMSCorrelationID"
            request-pub-sub-domain="true" request-destination-name="polo"
            connection-factory="poloConnectionFactory" explicit-qos-enabled="true"
            time-to-live="5000" receive-timeout="5000">
        <int-jms:reply-listener/>
    </int-jms:outbound-gateway>

  • #2
    This is likely a function of your Tibco client.

    Take a look at the connection factory configuration options - look for heartbeats, keepalives, socket timeouts, or something similar.

    Pulling an ethernet cable is the worst thing you can do because, in some situations, the tcp stack doesn't notify the application (Tibco client) that the socket no longer exists. The reply listener container is happily calling receive every second (by default) but since the connection is broken the client doesn't return any data. This is definitely a problem if there's a switch in the network and the cable is pulled on the "other" side of the switch; such disconnections are usually not propagated.

    For these reasons it's always a good idea to have some level of heartbeating (or low-level socket keep-alives) going on, or have the connection time out after some period of inactivity.
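
    To illustrate the point about inactivity timeouts, here is a minimal sketch using plain java.net sockets. It is not the Tibco client API, and the class and method names (SilentPeerDemo, readTimesOut) are made up for the example. It connects to a local peer that never sends anything and shows that a read timeout is what lets the application notice the silence; without it the read would simply block.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

public class SilentPeerDemo {

    /**
     * Connects to a local peer that accepts the connection at the TCP level
     * but never sends data, then reports whether the blocking read gave up
     * after readTimeoutMillis.
     */
    public static boolean readTimesOut(int readTimeoutMillis) throws IOException {
        // Port 0 asks the OS for any free port; the socket listens on loopback only.
        try (ServerSocket silentPeer = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(silentPeer.getInetAddress(), silentPeer.getLocalPort())) {
            client.setKeepAlive(true);              // ask the OS to probe an idle connection
            client.setSoTimeout(readTimeoutMillis); // bound how long read() may block
            InputStream in = client.getInputStream();
            try {
                in.read();       // with no timeout set, this would block indefinitely
                return false;
            } catch (SocketTimeoutException expected) {
                return true;     // the application now knows the peer has gone quiet
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("read timed out: " + readTimesOut(200));
    }
}
```

    The keep-alive probe interval is controlled by the operating system and is often very long by default, which is why an application-level heartbeat or an explicit timeout is usually the more practical option.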

    Equally, the broker is unaware of the broken connection. So, when you bounce the broker (presumably after reconnecting the cable), it will send a RST packet, which triggers the client to throw an exception to the listener container, which will then reconnect.

    I suspect that if you bounce the broker while the cable is still disconnected it won't recover; even if you subsequently re-bounce the broker after reconnecting.

    If you believe you are seeing something else, please attach a TRACE level log.


    • #3
      Yay fixed

      Thanks for the awesome and quick response. I knew you had nailed it the moment I read it, and I should have found it myself. But since we have come this far, I thought I would mention for other EMS users having this problem that, in retesting, I set the global client_timeout_server_connection=15 and server_heartbeat_client=5. (server_heartbeat_client=5 is the default according to the docs, but EMS got angry at me for setting client_timeout_server_connection without also setting server_heartbeat_client.)

      Once I set these, I could not make my client apps stay down via any combination of network disconnects or restarts. I have not yet experimented with client_heartbeat_server or server_timeout_client_connection, but the docs lead me to believe that they may help the server reclaim resources associated with lost network connections.

      One final note. The first message during the recovery sometimes throws a NullPointerException:

      Code:
      Exception in thread "org.springframework.integration.jms.JmsOutboundGateway#0.replyListener-55" java.lang.NullPointerException
           at org.springframework.jms.listener.AbstractMessageListenerContainer.getDestinationDescription(AbstractMessageListenerContainer.java:207)
           at org.springframework.integration.jms.JmsOutboundGateway$GatewayReplyListenerContainer.getDestinationDescription(JmsOutboundGateway.java:1034)
           at org.springframework.jms.listener.DefaultMessageListenerContainer.refreshConnectionUntilSuccessful(DefaultMessageListenerContainer.java:881)
           at org.springframework.jms.listener.DefaultMessageListenerContainer.recoverAfterListenerSetupFailure(DefaultMessageListenerContainer.java:851)
           at org.springframework.integration.jms.JmsOutboundGateway$GatewayReplyListenerContainer.recoverAfterListenerSetupFailure(JmsOutboundGateway.java:1041)
           at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:982)
           at java.lang.Thread.run(Thread.java:680)
      I have no issue with an exception being thrown at this point, and the fact that it is a NullPointerException is not causing me any issues either. Still, I thought consideration should be given to handling it a bit more deliberately, so that it results in some descendant of org.springframework.jms.JmsException.


      • #4
        Thanks for letting us know that you solved your problem.

        We (the Spring Integration Team) don't like NPEs at all; while it is benign from a functional perspective, it does prevent the logging of the "Could not refresh JMS Connection..." message.

        I have opened a JIRA Issue for this.
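
        For readers following along, here is a small standalone sketch of why the NPE swallows the log message. It assumes the failure comes from calling toString() on a destination that has not been set, which is what the stack trace suggests. It is not the actual Spring source and not the fix tracked in the JIRA issue; every name in it (DestinationDescriptionSketch, describeUnsafe, describeSafe, refreshFailureMessage) is hypothetical.

```java
public class DestinationDescriptionSketch {

    /** Mirrors the assumed failing pattern: dereferences the destination unconditionally. */
    public static String describeUnsafe(Object destination) {
        return destination.toString(); // throws NullPointerException when destination is null
    }

    /** Null-tolerant variant: always yields text that can be placed in a log message. */
    public static String describeSafe(Object destination) {
        return destination != null ? destination.toString() : "(no destination set)";
    }

    /** Builds a warning in the same shape as the "Could not refresh JMS Connection" log line. */
    public static String refreshFailureMessage(Object destination, long retryMillis, String cause) {
        return "Could not refresh JMS Connection for destination '" + describeSafe(destination)
                + "' - retrying in " + retryMillis + " ms. Cause: " + cause;
    }

    public static void main(String[] args) {
        // With the null-tolerant description the warning can still be produced.
        System.out.println(refreshFailureMessage(null, 5000, "connection lost"));
        try {
            describeUnsafe(null);
        } catch (NullPointerException e) {
            // In the unsafe variant the exception escapes before any message is built.
            System.out.println("unsafe variant threw NullPointerException");
        }
    }
}
```

        The point is only that building the description must tolerate a missing destination; otherwise the exception escapes before the warning is ever logged.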


        • #5
          I'm using the JMS outbound-gateway (with spring-integration 2.2.1) and I get the same NPE when the gateway tries to reconnect after the central JMS service is restarted.
          This time the uncaught exception is more critical because it seems it can prevent the outbound-gateway from reconnecting safely.

          The exception stack trace is below.

          Code:
          00:00:54,468 ERROR [stderr] (DefaultMessageListenerContainer-31) Exception in thread "DefaultMessageListenerContainer-31" java.lang.NullPointerException
          00:00:54,468 ERROR [stderr] (DefaultMessageListenerContainer-31)     at org.springframework.jms.listener.AbstractMessageListenerContainer.getDestinationDescription(AbstractMessageListenerContainer.java:207)
          00:00:54,496 ERROR [stderr] (DefaultMessageListenerContainer-31)     at org.springframework.integration.jms.JmsOutboundGateway$GatewayReplyListenerContainer.getDestinationDescription(JmsOutboundGateway.java:1021)
          00:00:54,497 ERROR [stderr] (DefaultMessageListenerContainer-31)     at org.springframework.jms.listener.DefaultMessageListenerContainer.handleListenerSetupFailure(DefaultMessageListenerContainer.java:828)
          00:00:54,497 ERROR [stderr] (DefaultMessageListenerContainer-31)     at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:981)
          00:00:54,498 ERROR [stderr] (DefaultMessageListenerContainer-31)     at java.lang.Thread.run(Thread.java:722)
          00:00:59,400 ERROR [stderr] (DefaultMessageListenerContainer-32) Exception in thread "DefaultMessageListenerContainer-32" java.lang.NullPointerException
          00:00:59,401 ERROR [stderr] (DefaultMessageListenerContainer-32)     at org.springframework.jms.listener.AbstractMessageListenerContainer.getDestinationDescription(AbstractMessageListenerContainer.java:207)
          00:00:59,402 ERROR [stderr] (DefaultMessageListenerContainer-32)     at org.springframework.integration.jms.JmsOutboundGateway$GatewayReplyListenerContainer.getDestinationDescription(JmsOutboundGateway.java:1021)
          00:00:59,402 ERROR [stderr] (DefaultMessageListenerContainer-32)     at org.springframework.jms.listener.DefaultMessageListenerContainer.handleListenerSetupFailure(DefaultMessageListenerContainer.java:828)
          00:00:59,403 ERROR [stderr] (DefaultMessageListenerContainer-32)     at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:981)
          00:00:59,404 ERROR [stderr] (DefaultMessageListenerContainer-32)     at java.lang.Thread.run(Thread.java:722)
          The exception is continuously thrown and the outbound-gateway stops working.

          Looking at the source (DefaultMessageListenerContainer.java, line 981), it seems that the uncaught NPE prevents execution of the next line of code, which calls the recoverAfterListenerSetupFailure() method (which is supposed to try to recover the gateway listener).


          • #6
            This is indeed the same problem, but at a different place. The previous one was benign (it was already in refreshConnectionUntilSuccessful) while your case prevents us from getting into that method.

            I have increased the priority of the JIRA issue accordingly.

            However, I'd like to understand exactly what happened in your case; that code (that threw the NPE) should not be executed if the exception is SharedConnectionNotInitializedException; which is what I would expect with a lost connection.

            Do you have any logs preceding the exception you posted that might provide more insight?

            Also, can you post your full configuration of the gateway?

            As a temporary work-around, you could stop() and start() the gateway after such a failure and it should reconnect ok. You can use JMX or a <control-bus/> to do that.
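
            A rough sketch of the stop-then-start sequence, for illustration only. To keep it self-contained it uses a stand-in interface (Restartable) with the same start()/stop()/isRunning() shape as Spring's Lifecycle contract, plus a fake component that records calls. None of these types are Spring classes; in a real application the calls would be issued against the gateway through JMX or a control bus, as described above.

```java
import java.util.ArrayList;
import java.util.List;

public class GatewayRestartSketch {

    /** Hypothetical stand-in for a start/stop contract; not a Spring type. */
    public interface Restartable {
        void start();
        void stop();
        boolean isRunning();
    }

    /** Stops the component if it is running, then starts it again. */
    public static void restart(Restartable component) {
        if (component.isRunning()) {
            component.stop();
        }
        component.start();
    }

    /** Fake component that records the calls it receives, for demonstration only. */
    public static class RecordingComponent implements Restartable {
        public final List<String> calls = new ArrayList<>();
        private boolean running = true;

        @Override
        public void start() {
            calls.add("start");
            running = true;
        }

        @Override
        public void stop() {
            calls.add("stop");
            running = false;
        }

        @Override
        public boolean isRunning() {
            return running;
        }
    }

    public static void main(String[] args) {
        RecordingComponent gateway = new RecordingComponent();
        restart(gateway);
        System.out.println(gateway.calls); // the recorded call order
    }
}
```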


            • #7
              Many thanks for the quick response and suggestions.
              Our central service has a Qpid message broker, accessed by a web application using Spring JMS/Integration over the Qpid Java client JMS library.
              Every day around midnight, the central service is shut down for maintenance and then restarted after a few minutes.
              The problem happens after reconnecting to the service, but not every day (it occurs occasionally: 2 times in the last two months).

              Some log excerpts are below.

              Code:
              00:00:41,252 WARN  [org.springframework.jms.listener.DefaultMessageListenerContainer] (org.springframework.jms.listener.DefaultMessageListenerContainer#0-1) Could not refresh JMS Connection for destination 'fastcache.internal' - retrying in 5000 ms. Cause: Error creating connection: connection-forced: Authentication failed
              00:00:41,273 INFO  [org.springframework.integration.jms.JmsOutboundGateway$GatewayReplyListenerContainer] (DefaultMessageListenerContainer-28) JMS message listener invoker needs to establish shared Connection
              00:00:41,297 INFO  [org.apache.qpid.client.AMQConnection] (DefaultMessageListenerContainer-28) Unable to connect to broker at tcp://192.168.7.18:17902: org.apache.qpid.AMQException: Cannot connect to broker: connection-forced: Authentication failed [error code 320: connection forced]
              	at org.apache.qpid.client.AMQConnectionDelegate_0_10.makeBrokerConnection(AMQConnectionDelegate_0_10.java:241) [qpid-client-0.20.jar:]
              	at org.apache.qpid.client.AMQConnection.makeBrokerConnection(AMQConnection.java:604) [qpid-client-0.20.jar:]
              	at org.apache.qpid.client.AMQConnection.<init>(AMQConnection.java:383) [qpid-client-0.20.jar:]
              	at org.apache.qpid.client.AMQConnectionFactory.createConnection(AMQConnectionFactory.java:121) [qpid-client-0.20.jar:]
              	at org.springframework.jms.connection.SingleConnectionFactory.doCreateConnection(SingleConnectionFactory.java:342) [org.springframework.jms-3.1.3.RELEASE.jar:3.1.3.RELEASE]
              	at org.springframework.jms.connection.SingleConnectionFactory.initConnection(SingleConnectionFactory.java:288) [org.springframework.jms-3.1.3.RELEASE.jar:3.1.3.RELEASE]
              	at org.springframework.jms.connection.SingleConnectionFactory.createConnection(SingleConnectionFactory.java:225) [org.springframework.jms-3.1.3.RELEASE.jar:3.1.3.RELEASE]
              	at org.springframework.jms.support.JmsAccessor.createConnection(JmsAccessor.java:184) [org.springframework.jms-3.1.3.RELEASE.jar:3.1.3.RELEASE]
              	at org.springframework.jms.listener.AbstractJmsListeningContainer.createSharedConnection(AbstractJmsListeningContainer.java:404) [org.springframework.jms-3.1.3.RELEASE.jar:3.1.3.RELEASE]
              	at org.springframework.jms.listener.AbstractJmsListeningContainer.refreshSharedConnection(AbstractJmsListeningContainer.java:389) [org.springframework.jms-3.1.3.RELEASE.jar:3.1.3.RELEASE]
              	at org.springframework.jms.listener.DefaultMessageListenerContainer.refreshConnectionUntilSuccessful(DefaultMessageListenerContainer.java:869) [org.springframework.jms-3.1.3.RELEASE.jar:3.1.3.RELEASE]
              	at org.springframework.jms.listener.DefaultMessageListenerContainer.recoverAfterListenerSetupFailure(DefaultMessageListenerContainer.java:851) [org.springframework.jms-3.1.3.RELEASE.jar:3.1.3.RELEASE]
              	at org.springframework.integration.jms.JmsOutboundGateway$GatewayReplyListenerContainer.recoverAfterListenerSetupFailure(JmsOutboundGateway.java:1028) [spring-integration-jms-2.2.1.RELEASE.jar:]
              	at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:982) [org.springframework.jms-3.1.3.RELEASE.jar:3.1.3.RELEASE]
              	at java.lang.Thread.run(Thread.java:722) [rt.jar:1.7.0_17]
              Caused by: org.apache.qpid.transport.ConnectionException: connection-forced: Authentication failed
              	at org.apache.qpid.transport.ConnectionException.rethrow(ConnectionException.java:67) [qpid-common-0.20.jar:]
              	at org.apache.qpid.transport.Connection.connect(Connection.java:267) [qpid-common-0.20.jar:]
              	at org.apache.qpid.client.AMQConnectionDelegate_0_10.makeBrokerConnection(AMQConnectionDelegate_0_10.java:221) [qpid-client-0.20.jar:]
              	... 14 more
              Caused by: org.apache.qpid.transport.ConnectionException: connection-forced: Authentication failed
              	at org.apache.qpid.transport.Connection.closeCode(Connection.java:539) [qpid-common-0.20.jar:]
              	at org.apache.qpid.transport.ConnectionDelegate.connectionClose(ConnectionDelegate.java:75) [qpid-common-0.20.jar:]
              	at org.apache.qpid.transport.ConnectionDelegate.connectionClose(ConnectionDelegate.java:40) [qpid-common-0.20.jar:]
              	at org.apache.qpid.transport.ConnectionClose.dispatch(ConnectionClose.java:91) [qpid-common-0.20.jar:]
              	... 16 more
              
              00:00:41,387 ERROR [stderr] (DefaultMessageListenerContainer-28) Exception in thread "DefaultMessageListenerContainer-28" java.lang.NullPointerException
              00:00:41,388 ERROR [stderr] (DefaultMessageListenerContainer-28) 	at org.springframework.jms.listener.AbstractMessageListenerContainer.getDestinationDescription(AbstractMessageListenerContainer.java:207)
              00:00:41,389 ERROR [stderr] (DefaultMessageListenerContainer-28) 	at org.springframework.integration.jms.JmsOutboundGateway$GatewayReplyListenerContainer.getDestinationDescription(JmsOutboundGateway.java:1021)
              00:00:41,399 ERROR [stderr] (DefaultMessageListenerContainer-28) 	at org.springframework.jms.listener.DefaultMessageListenerContainer.refreshConnectionUntilSuccessful(DefaultMessageListenerContainer.java:881)
              00:00:41,400 ERROR [stderr] (DefaultMessageListenerContainer-28) 	at org.springframework.jms.listener.DefaultMessageListenerContainer.recoverAfterListenerSetupFailure(DefaultMessageListenerContainer.java:851)
              00:00:41,408 ERROR [stderr] (DefaultMessageListenerContainer-28) 	at org.springframework.integration.jms.JmsOutboundGateway$GatewayReplyListenerContainer.recoverAfterListenerSetupFailure(JmsOutboundGateway.java:1028)
              00:00:41,408 ERROR [stderr] (DefaultMessageListenerContainer-28) 	at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:982)
              00:00:41,409 ERROR [stderr] (DefaultMessageListenerContainer-28) 	at java.lang.Thread.run(Thread.java:722)
              00:00:46,264 INFO  [org.apache.qpid.transport.ClientDelegate] (IoReceiver - /192.168.7.18:17902) The broker does not support the configured connection idle timeout of 120 sec, using the brokers max supported value of 0 sec instead.
              00:00:46,265 INFO  [org.apache.qpid.client.AMQConnection] (org.springframework.jms.listener.DefaultMessageListenerContainer#0-1) Connection 31 now connected from /192.168.7.14:60161 to /192.168.7.18:17902
              00:00:46,266 INFO  [org.springframework.jms.listener.DefaultMessageListenerContainer] (org.springframework.jms.listener.DefaultMessageListenerContainer#0-1) Successfully refreshed JMS Connection
              00:00:46,392 INFO  [org.springframework.integration.jms.JmsOutboundGateway$GatewayReplyListenerContainer] (DefaultMessageListenerContainer-29) JMS message listener invoker needs to establish shared Connection
              00:00:46,408 INFO  [org.apache.qpid.transport.ClientDelegate] (IoReceiver - /192.168.7.18:17902) The broker does not support the configured connection idle timeout of 120 sec, using the brokers max supported value of 0 sec instead.
              00:00:46,411 INFO  [org.apache.qpid.client.AMQConnection] (DefaultMessageListenerContainer-29) Connection 30 now connected from /192.168.7.14:60162 to /192.168.7.18:17902
              00:00:46,411 INFO  [org.springframework.jms.connection.CachingConnectionFactory] (DefaultMessageListenerContainer-29) Established shared JMS Connection: AMQConnection:
              Host: 192.168.7.18
              Port: 17902
              Virtual Host: amq.direct
              Client ID: 192.168.7.18
              Active session count: 0
              00:00:46,412 INFO  [org.springframework.integration.jms.JmsOutboundGateway$GatewayReplyListenerContainer] (DefaultMessageListenerContainer-29) Successfully refreshed JMS Connection
              00:00:49,393 ERROR [stderr] (DefaultMessageListenerContainer-30) Exception in thread "DefaultMessageListenerContainer-30" java.lang.NullPointerException
              00:00:49,394 ERROR [stderr] (DefaultMessageListenerContainer-30) 	at org.springframework.jms.listener.AbstractMessageListenerContainer.getDestinationDescription(AbstractMessageListenerContainer.java:207)
              00:00:49,395 ERROR [stderr] (DefaultMessageListenerContainer-30) 	at org.springframework.integration.jms.JmsOutboundGateway$GatewayReplyListenerContainer.getDestinationDescription(JmsOutboundGateway.java:1021)
              00:00:49,395 ERROR [stderr] (DefaultMessageListenerContainer-30) 	at org.springframework.jms.listener.DefaultMessageListenerContainer.handleListenerSetupFailure(DefaultMessageListenerContainer.java:828)
              00:00:49,396 ERROR [stderr] (DefaultMessageListenerContainer-30) 	at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:981)
              00:00:49,398 ERROR [stderr] (DefaultMessageListenerContainer-30) 	at java.lang.Thread.run(Thread.java:722)
              In the next post, the gateway configuration...


              • #8
                ...the gateway configuration:

                Code:
                	
                 	<bean id="jmsAmqpCachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
                	    <property name="targetConnectionFactory" ref="jmsAmqpConnectionFactory" />
                	    <property name="sessionCacheSize" value="${jms.sessionCacheSize}" />
                	    <property name="reconnectOnException" value="true" />
                	</bean>
                
                	<!-- spring integration for sendAndReceive over JMS, with reply on a TemporaryQueue -->
                	<int:channel id="jmsRequestChannel"/>
                  
                	<bean id="jmsMessageConverter" class="TestMessageConverter"/>
                  	
                	<int-jms:outbound-gateway id="jmsOutGateway"
                			connection-factory="jmsAmqpCachingConnectionFactory"
                            request-destination-name="${jms.webInQueueName}"
                            request-channel="jmsRequestChannel"
                            receive-timeout="${jms.responseTimeout}"
                            extract-reply-payload="false"
                            message-converter="jmsMessageConverter"
                            correlation-key="JMSCorrelationID">
                	    <int-jms:reply-listener />
                	</int-jms:outbound-gateway>
                	
                	<bean id="messagingTemplate" class="org.springframework.integration.core.MessagingTemplate">
                		<property name="defaultChannel" ref="jmsRequestChannel" />
                	</bean>
