Announcement Announcement Module
Collapse
No announcement yet.
Durable RPC with Spring Integration and RabbitMQ Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Durable RPC with Spring Integration and RabbitMQ

    I have created a simple message sender and consumer and linked them together using spring-integration and RabbitMq.

    I have used the outbound-gateway and inbound-gateway in order to create RPC (request / response) style messaging. I have not specified a reply-channel as I am leaving this up to spring to create (I am assuming it will create an anonymous queue for the reply).

    This works just fine when the consumer is started before the sender, but I am not receiving a response if the consumer is started after the sender. I can see that the consumer receives the message but no response is returned.

    On further inspection, I can see that a temporary anonymous queue is created in rabbit when the sender sends the message, and the message contains this queue name in the reply-to header. This queue, however, disappears shortly after it is created, before I start up the consumer. I'm guessing that as the queue no longer exists, the consumer cannot publish a response to it.

    I can see from the rabbit admin tool that the anonymous queue is set as exclusive, and auto-delete is set to true. I have no control over these properties though as the queue is being created by spring-integration.

    Does anyone have any ideas of how to solve this? My config is as follows:

    Sender:

    Code:
    <import resource="classpath:rabbit.xml" />
    
    <int:channel id="output" />
    
    <int:gateway id="senderGateway" service-interface="gordon.outbound.SenderGateway" default-request-channel="output"/>
    
    <int-amqp:outbound-gateway request-channel="output"
                                       amqp-template="amqpTemplate" exchange-name="silly-wabbit-exchange"
                                       routing-key="silly-wabbit-key"/>
    Consumer:

    Code:
    <import resource="classpath:rabbit.xml" />
    
    <int:channel id="input"/>
    
    <int-amqp:inbound-gateway request-channel="input" queue-names="silly-wabbit-queue" connection-factory="connectionFactory"/>
    
    <bean id="listenerService" class="gordon.inbound.ListenerService"/>
    
    <int:service-activator input-channel="input" ref="listenerService" method="receiveMessage"/>

  • #2
    The default replyTimeout is 5 seconds. If the consumer doesn't reply in 5 seconds, the outbound gatgeway's consumer is canceled, removing the temporary queue.

    You can increase the timeout by configuring a reply-timeout on the <rabbit:template/> (milliseconds).

    Comment


    • #3
      Thanks Gary... Would I expect to see an exception or something similar upon timeout so that I can react to it in my code?

      Currently it just seems to continue to block.

      Also, is there a suggested approach to what the consumer should then do with a message that the producer is no longer waiting for?

      Comment


      • #4
        It shouldn't be blocked - under DEBUG logging, you should see

        logger.debug("handler '" + this + "' produced no reply for request Message: " + message);

        In a future release, we could expose an (existing) property "requiresReply", which would cause the gateway to throw an exception rather than just emitting a DEBUG log.

        However, if you believe it's blocking, can you submit a thread dump? (use jstack, or VisualVM)

        Comment


        • #5
          I see you are using an <int:gateway/> to send to the amqp gateway; it WILL block (because it's default timeout is infinity). You can set a default-reply-timeout on the <int:gateway/> and you'll get an exception.

          Comment

          Working...
          X