Announcement Announcement Module
Collapse
No announcement yet.
Listening to auto-generated queues with <int-amqp:inbound-channel-adapter /> Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Listening to auto-generated queues with <int-amqp:inbound-channel-adapter />

    Hi all, and congratulations to every one working on Spring Integration Team
    and Spring AMQP, your work is great!

    I'm trying this kind of client-server (master-worker) communication using
    Spring-based applications and Rabbit MQ server:
    • Multiple clients send requests to one or more servers.
    • Each request is processed by only one server.

    I implemented this with AMQP outbound gateway in client, and AMQP inbound
    gateway in server. Client publishes to a topic exchange, 'client.requests',
    with a routing key like 'request.type.server.one'. Server listens to a
    exclusive, durable, non auto-delete queue which is binded to
    'client.request' topic with pattern '#.server.one'. Client listens to reply
    messages on an auto-generated queue.

    This is working OK, though a new queue is generated on each request and
    client is not sending ACK when receiving the server's reply (I'm not sure
    of this, but this is what I assume after looking at logging and Rabbit MQ
    http management console), but for the moment this is OK (though this is not
    the main question I ask in this post, any clue that helps me to resolve
    this is appreciated too).

    Now my main question is that I want the server to notify all clients the
    exit status of the operation done on each request. The queues on which
    clients listen to the notifications must be auto-generated too. I figured
    that this could be accomplished with an inbound channel adapter. I post my
    configuration.

    Code:
    <rabbit:queue id="anonymous.notification.listening.queue" />
    
    <rabbit:topic-exchange name="server.notification">
      <rabbit:bindings>
        <rabbit:binding queue="anonymous.notification.listening.queue"
          pattern="#.notification.server.one" />
        </rabbit:bindings>
    </rabbit:topic-exchange>
    
    <int-amqp:inbound-channel-adapter
      channel="from.rabbit" connection-factory="rabbit.connection.factory"
      queue-names="anonymous.notification.listening.queue" />
    I use id in 'queue' element cause I want the framework to auto generate the
    queue. And this is done, I guess, but I get an exception.

    Code:
    Exception in thread "main" org.springframework.context.ApplicationContextException: Failed to start bean 'fromRabbitAdapter'; nested exception is org.springframework.amqp.AmqpIllegalStateException: Fatal exception on listener startup
    	at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:169)
    	at org.springframework.context.support.DefaultLifecycleProcessor.access$1(DefaultLifecycleProcessor.java:154)
    	at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:335)
    	at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:143)
    	at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:108)
    	at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:908)
    	at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:428)
    	at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:139)
    	at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:83)
    	at es.umh.intelvia.Main.main(Main.java:32)
    Caused by: org.springframework.amqp.AmqpIllegalStateException: Fatal exception on listener startup
    	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doStart(SimpleMessageListenerContainer.java:309)
    	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:358)
    	at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.doStart(AmqpInboundChannelAdapter.java:82)
    	at org.springframework.integration.endpoint.AbstractEndpoint.start(AbstractEndpoint.java:84)
    	at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:166)
    	... 9 more
    Caused by: org.springframework.amqp.rabbit.listener.FatalListenerStartupException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
    	at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:192)
    	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:489)
    	at java.lang.Thread.run(Thread.java:679)
    Caused by: java.io.IOException
    	at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:107)
    	at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:131)
    	at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:643)
    	at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:59)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:616)
    	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:298)
    	at $Proxy0.queueDeclarePassive(Unknown Source)
    	at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:188)
    	... 2 more
    Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'anonymous.notification.listening.queue' in vhost '/intelvia', class-id=50, method-id=10),null,""}
    	at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    	at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    	at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:328)
    	at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:201)
    	at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:125)
    	... 11 more
    Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'anonymous.notification.listening.queue' in vhost '/intelvia', class-id=50, method-id=10),null,""}
    	at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:384)
    	at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:235)
    	at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:151)
    	at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:96)
    	at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:441)

    The exception is caused, as far as I can see, by the inbound adapter (by
    the message listener or the listener container framework instantiates, I
    think) who tries to get a queue declared with that name and not the
    auto-generated Queue object that is in Spring container.

    I don't know how to solve this, or if this is the correct way for achieving
    what I want.


    Any help will be much appreciated.

  • #2
    SOLVED: Listening to auto-generated queues with &lt;int-amqp:inbound-channel-adapter /&gt;

    Thanks to gnuphie, in his/her thread Messages not being auto-acked he solved my main question, maybe accidentally.

    Problem is solved using SpEL, changing this code:

    Code:
    <rabbit:queue id="anonymous.notification.listening.queue" />
    
    <rabbit:topic-exchange name="server.notification">
      <rabbit:bindings>
        <rabbit:binding queue="anonymous.notification.listening.queue"
          pattern="#.notification.server.one" />
        </rabbit:bindings>
    </rabbit:topic-exchange>
    
    <int-amqp:inbound-channel-adapter
      channel="from.rabbit" connection-factory="rabbit.connection.factory"
      queue-names="anonymous.notification.listening.queue" />
    to this one:

    Code:
    <rabbit:queue id="anonymousQueue" />
    
    <rabbit:topic-exchange name="server.notification">
      <rabbit:bindings>
        <rabbit:binding queue="anonymousQueue"
          pattern="#.notification.server.one" />
        </rabbit:bindings>
    </rabbit:topic-exchange>
    
    <int-amqp:inbound-channel-adapter
      channel="from.rabbit" connection-factory="rabbit.connection.factory"
      queue-names="#{anonymousQueue.getName()}" />

    And my secondary question, messages not being auto-acked is solved in the post I reference.

    Comment

    Working...
    X