Announcement Announcement Module
Collapse
No announcement yet.
BlockingQueueConsumer fails because of missing queue in RabbitMQ Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • BlockingQueueConsumer fails because of missing queue in RabbitMQ

    Hi,

    I have an application that consumes messages from RabbitMQ. Here is the basic XML configuration:

    <rabbit:admin connection-factory="connectionFactory" auto-startup="false" />

    <rabbit:queue name="myproduct.xmlProcessingReplyQueue"/>
    <rabbit:queue name="myproduct.xmlProcessingRequestQueue" />

    When my application starts (with a freshly installed RabbitMQ) I get these exceptions:

    Code:
    2012-05-31 14:48:21,867 [SimpleAsyncTaskExecutor-1] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Starting consumer Consumer: tag=[null], channel=null, acknowledgeMode=AUTO local queue size=0
    2012-05-31 14:48:21,867 [SimpleAsyncTaskExecutor-3] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Starting consumer Consumer: tag=[null], channel=null, acknowledgeMode=AUTO local queue size=0
    2012-05-31 14:48:21,867 [SimpleAsyncTaskExecutor-2] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Starting consumer Consumer: tag=[null], channel=null, acknowledgeMode=AUTO local queue size=0
    2012-05-31 14:48:22,348 [SimpleAsyncTaskExecutor-1] DEBUG org.springframework.amqp.rabbit.connection.CachingConnectionFactory Creating cached Rabbit Channel from AMQChannel(amqp://[email protected]:5672/,3)
    2012-05-31 14:48:22,358 [SimpleAsyncTaskExecutor-3] DEBUG org.springframework.amqp.rabbit.connection.CachingConnectionFactory Creating cached Rabbit Channel from AMQChannel(amqp://[email protected]:5672/,2)
    2012-05-31 14:48:22,358 [SimpleAsyncTaskExecutor-2] DEBUG org.springframework.amqp.rabbit.connection.CachingConnectionFactory Creating cached Rabbit Channel from AMQChannel(amqp://[email protected]:5672/,1)
    2012-05-31 14:48:22,498 [SimpleAsyncTaskExecutor-2] DEBUG org.springframework.amqp.rabbit.connection.CachingConnectionFactory Detected closed channel on exception.  Re-initializing: null
    2012-05-31 14:48:22,498 [SimpleAsyncTaskExecutor-3] DEBUG org.springframework.amqp.rabbit.connection.CachingConnectionFactory Detected closed channel on exception.  Re-initializing: null
    2012-05-31 14:48:22,508 [SimpleAsyncTaskExecutor-2] WARN  org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Reconnect failed; retries left=2
    java.io.IOException
    	at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    	at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    	at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    	at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:755)
    	at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:61)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)[:1.6.0_22]
    	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)[:1.6.0_22]
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)[:1.6.0_22]
    	at java.lang.reflect.Method.invoke(Unknown Source)[:1.6.0_22]
    	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:331)
    	at $Proxy52.queueDeclarePassive(Unknown Source)
    	at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:213)
    	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:516)
    	at java.lang.Thread.run(Unknown Source)[:1.6.0_22]
    And further down:


    Code:
    2012-05-31 14:48:37,760 [SimpleAsyncTaskExecutor-2] DEBUG org.springframework.amqp.rabbit.connection.CachingConnectionFactory Detected closed channel on exception.  Re-initializing: null
    2012-05-31 14:48:37,760 [SimpleAsyncTaskExecutor-2] ERROR org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Consumer received fatal exception on startup
    org.springframework.amqp.rabbit.listener.FatalListenerStartupException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
    	at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:228)
    	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:516)
    	at java.lang.Thread.run(Unknown Source)[:1.6.0_22]
    I have a feeling that the queues are not detected because the <rabbit:admin> tag has auto-startup=false and so it doesn't create the queues. I'm not sure what to do about it though...

    Anyone?

  • #2
    I believe you are right regarding the auto-startup flag. Why do you need that to be false?

    Comment


    • #3
      Because when the application starts up, RabbitMQ may not yet be available. The application is designed to start up and allow a user, through interaction, to start the messaging services.

      I just checked with auto-startup and it does in fact seem to be the problem (queueDeclarePassive() is invoked, which checks if the queue exists).

      Comment


      • #4
        Can you not simply declare the queues in the rabbitmq.config, rather than via RabbitAdmin?

        Comment


        • #5
          Well, this would require the customer/end user to do some manual configuration which we want to avoid if at all possible. But even if we decided to add the configuration to rabbitmq.config, it would not help with the server not being available when the application starts. Or that the location of the server may not be immediately known.

          Besides, the auto-startup attribute exists - isn't it supposed to work? :-)

          Comment


          • #6
            Yes, it does work; that's why you don't have your queues declared automatically

            I think what you need to do is also set auto-startup=false on your message-listener container and/or any channel-adapters/gateways.

            Comment


            • #7
              Just to clarify; if you set auto-startup to false on the rabbit admin it will not automatically declare queues etc found in the context. Any elements using those queues must also not be started.

              When you are ready to start, get a reference to the RabbitAdmin bean, and call initialize(); you can then start the other components, such as listener containers, Spring Integration adapters etc.

              Comment


              • #8
                If I have to get a reference to the RabbitAdmin instance in my code, in order to call initialize() on it, I'll be creating a fairly strong coupling to RabbitMQ inside my code. I would like to avoid this, if possible, as the idea is to be able to use a different message infrastructure at some later point.

                Do you have any suggestions for how I might call initialize() on the RabbitAdmin instance without actually referencing any rabbit specific classes in my code?

                Comment


                • #9
                  Okay, I fixed the above by introducing an Admin interface in my code, which is then initialized by a messaging-specific implementation. This way the coupling is minimal.

                  Comment


                  • #10
                    Another option is to send a message to a void return gateway proxy method where the <gateway/>'s request channel is connected to a <control-bus/>.

                    The message should contain a String payload "@rabbitAdmin.initialize()".

                    Comment

                    Working...
                    X