
  • Consumer doesn't recover from com.rabbitmq.client.ShutdownSignalException

    For the past few months we have been running our system on Spring Integration with a RabbitMQ broker without any problems. That changed last night, when one process in the system stopped receiving messages from RabbitMQ.

    Tracing back through the logs, we arrive at the exception below (the messages are repeated multiple times; I am guessing once per listening thread in the process). It looks like the connection to RabbitMQ was briefly lost; at present we don't know why. My understanding was that the listener container would keep retrying the connection until it connects again.

    The problem is that the process didn't receive any more messages afterwards. I think this is because the queue is exclusive and non-durable, so the RabbitMQ broker deleted it when the connection was lost. When the listener container reconnected, it didn't try to re-declare the queue.

    Is there any way to re-declare the queues on a reconnect, or is it recommended to use durable queues for issues such as these?

    The following is an extract from the RabbitMQ broker log at the time the connection was lost.
    The broker is RabbitMQ 3.0.2 with Erlang R14B04, running on RedHat 5.

    Code:
    =WARNING REPORT==== 9-Mar-2013::00:01:13 ===
    closing AMQP connection <0.11895.0> (217.74.234.15:37592 -> 5.11.10.130:5672):
    connection_closed_abruptly
    
    =INFO REPORT==== 9-Mar-2013::00:01:13 ===
    accepting AMQP connection <0.15676.24> (217.74.234.15:36648 -> 5.11.10.130:5672)
    
    =ERROR REPORT==== 9-Mar-2013::00:01:18 ===
    connection <0.15676.24>, channel 1 - soft error:
    {amqp_error,not_found,
                "no queue 'amq.gen-IB-jfmjJ0LN0uRaNKk4eog' in vhost '/'",
                'queue.declare'}
    
    =ERROR REPORT==== 9-Mar-2013::00:01:18 ===
    connection <0.15676.24>, channel 7 - soft error:
    {amqp_error,not_found,
                "no queue 'amq.gen-IB-jfmjJ0LN0uRaNKk4eog' in vhost '/'",
                'queue.declare'}
    The exception:
    Code:
    2013-03-09 00:05:41 thread="SimpleAsyncTaskExecutor-48" class="org.springframework.amqp.rabbit.listener.BlockingQueueConsumer" level="WARN" application="myappname" Reconnect failed; retries left=0
    java.io.IOException: null
            at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106) ~[amqp-client-2.8.4.jar:na]
            at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102) ~[amqp-client-2.8.4.jar:na]
            at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124) ~[amqp-client-2.8.4.jar:na]
            at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:755) ~[amqp-client-2.8.4.jar:na]
            at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:61) ~[amqp-client-2.8.4.jar:na]
            at sun.reflect.GeneratedMethodAccessor85.invoke(Unknown Source) ~[na:na]
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.6.0_24]
            at java.lang.reflect.Method.invoke(Method.java:616) ~[na:1.6.0_24]
            at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:348) ~[spring-rabbit-1.1.2.RELEASE.jar:na]
            at $Proxy53.queueDeclarePassive(Unknown Source) ~[na:na]
            at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:213) ~[spring-rabbit-1.1.2.RELEASE.jar:na]
            at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:524) [spring-rabbit-1.1.2.RELEASE.jar:na]
            at java.lang.Thread.run(Thread.java:679) [na:1.6.0_24]
    Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'amq.gen-IB-jfmjJ0LN0uRaNKk4eog' in vhost '/', class-id=50, method-id=10), null, ""}
            at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67) ~[amqp-client-2.8.4.jar:na]
            at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33) ~[amqp-client-2.8.4.jar:na]
            at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343) ~[amqp-client-2.8.4.jar:na]
            at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216) ~[amqp-client-2.8.4.jar:na]
            at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118) ~[amqp-client-2.8.4.jar:na]
            ... 10 common frames omitted
    com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'amq.gen-IB-jfmjJ0LN0uRaNKk4eog' in vhost '/', class-id=50, method-id=10), null, ""}
            at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:454) ~[amqp-client-2.8.4.jar:na]
            at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:294) ~[amqp-client-2.8.4.jar:na]
            at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144) ~[amqp-client-2.8.4.jar:na]
            at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91) ~[amqp-client-2.8.4.jar:na]
            at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:526) ~[amqp-client-2.8.4.jar:na]
    2013-03-09 00:05:41 thread="SimpleAsyncTaskExecutor-50" class="org.springframework.amqp.rabbit.listener.BlockingQueueConsumer" level="WARN" application="myappname" Reconnect failed; retries left=0
    java.io.IOException: null
            at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106) ~[amqp-client-2.8.4.jar:na]
            at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102) ~[amqp-client-2.8.4.jar:na]
            at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124) ~[amqp-client-2.8.4.jar:na]
            at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:755) ~[amqp-client-2.8.4.jar:na]
            at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:61) ~[amqp-client-2.8.4.jar:na]
            at sun.reflect.GeneratedMethodAccessor85.invoke(Unknown Source) ~[na:na]
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.6.0_24]
            at java.lang.reflect.Method.invoke(Method.java:616) ~[na:1.6.0_24]
            at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:348) ~[spring-rabbit-1.1.2.RELEASE.jar:na]
            at $Proxy53.queueDeclarePassive(Unknown Source) ~[na:na]
            at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:213) ~[spring-rabbit-1.1.2.RELEASE.jar:na]
            at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:524) [spring-rabbit-1.1.2.RELEASE.jar:na]
            at java.lang.Thread.run(Thread.java:679) [na:1.6.0_24]
    Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'amq.gen-IB-jfmjJ0LN0uRaNKk4eog' in vhost '/', class-id=50, method-id=10), null, ""}
            at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67) ~[amqp-client-2.8.4.jar:na]
            at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33) ~[amqp-client-2.8.4.jar:na]
            at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343) ~[amqp-client-2.8.4.jar:na]
            at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216) ~[amqp-client-2.8.4.jar:na]
            at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118) ~[amqp-client-2.8.4.jar:na]
    ...
    ... lots more exceptions like above
    ...
    2013-03-09 00:05:42 thread="SimpleAsyncTaskExecutor-49" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer" level="INFO" application="myappname" Stopping container from aborted consumer
    2013-03-09 00:05:42 thread="SimpleAsyncTaskExecutor-49" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer" level="INFO" application="myappname" Waiting for workers to finish.
    2013-03-09 00:05:42 thread="SimpleAsyncTaskExecutor-47" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer" level="ERROR" application="myappname" Consumer received fatal exception on startup

  • #2
    You can automatically declare your queues by including them in your application context, and including a Rabbit Admin. The admin registers itself as a connection listener and will redeclare when the connection is reestablished.

    However, you have to give queues a name attribute - it looks like you are using auto-generated queue names (amq.gen-IB-jfmjJ0LN0uRaNKk4eog).

    I just ran a test and everything worked fine, as expected, for me...

    Code:
    <rabbit:admin connection-factory="connectionFactory" />
    
    <rabbit:queue name="si.test.queue" durable="false" exclusive="true" />
    09:24:44.110 WARN [SimpleAsyncTaskExecutor-1][org.springframework.amqp.rabbit.core.RabbitAdmin] Auto-declaring a non-durable Queue (si.test.queue). It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.

    09:24:44.111 WARN [SimpleAsyncTaskExecutor-1][org.springframework.amqp.rabbit.core.RabbitAdmin] Auto-declaring an exclusive Queue (si.test.queue). It cannot be accessed by consumers on another connection, and will be redeclared if the connection is reopened.
    ...
    09:24:52.179 WARN [SimpleAsyncTaskExecutor-1][org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer] Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: com.rabbitmq.client.ShutdownSignalException: connection error; reason: {#method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - broker forced connection closure with reason 'shutdown', class-id=0, method-id=0), null, ""}
    ...
    09:25:02.188 WARN [SimpleAsyncTaskExecutor-3][org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer] Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused
    ...
    09:25:02.193 WARN [SimpleAsyncTaskExecutor-4][org.springframework.amqp.rabbit.core.RabbitAdmin] Auto-declaring a non-durable Queue (si.test.queue). It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.

    09:25:02.193 WARN [SimpleAsyncTaskExecutor-4][org.springframework.amqp.rabbit.core.RabbitAdmin] Auto-declaring an exclusive Queue (si.test.queue). It cannot be accessed by consumers on another connection, and will be redeclared if the connection is reopened.



    • #3
      Thanks Gary for your example.

      It highlighted that the issue was in our code, specifically in how the queue object was created in our Java code. The queue wasn't declared as a bean in the application context, so it wasn't picked up by the RabbitAdmin class when the connection was re-established.

      We are now changing the code to declare the queue as a bean.
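
      For readers hitting the same problem, here is a minimal sketch of what declaring the queue as a bean might look like with Java-based configuration. It is not the code from this thread: the class name RabbitQueueConfig, the bean method names, and the "localhost" broker host are assumptions for illustration, and the queue name and flags are simply copied from the XML example in #2. It assumes spring-rabbit is on the classpath.

```java
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; names are illustrative only.
@Configuration
public class RabbitQueueConfig {

    // Assumed broker location; replace with the real host.
    @Bean
    public ConnectionFactory connectionFactory() {
        return new CachingConnectionFactory("localhost");
    }

    // The admin is the component that declares Queue beans found in the
    // application context when a connection is (re)established, as
    // described in #2.
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    // Explicitly named queue exposed as a bean so the admin can see it.
    // Arguments: name, durable=false, exclusive=true, autoDelete=false,
    // mirroring the <rabbit:queue> element shown in #2.
    @Bean
    public Queue testQueue() {
        return new Queue("si.test.queue", false, true, false);
    }
}
```

      The point of the sketch is only that the Queue is a bean with an explicit name; a Queue object constructed elsewhere in application code is not visible to the admin and so is not redeclared after a reconnect.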



      • #4
        Glad to see that you're on the right track now. I've added a JIRA issue so that we'll be sure to highlight this requirement for explicitly named queues (if redeclaration is expected) within the reference manual:
        https://jira.springsource.org/browse/AMQP-299

        Thanks,
        Mark
