Announcement Announcement Module
Collapse
No announcement yet.
RPC style exception handling with Spring Integration & RabbitMQ Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • RPC style exception handling with Spring Integration & RabbitMQ

    I have a simple producer / consumer setup doing an RPC style call over RabbitMQ using Spring Integration...

    I would like to know what the best paten for handling exceptions thrown by the consumer is? In an ideal world I would have wanted the exception to be thrown directly out of the int:gateway on the producer so that it can be handled by the client.

    This is my producer config:

    Code:
        <rabbit:connection-factory id="connectionFactory"/>
    
        <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/>
    
        <rabbit:admin connection-factory="connectionFactory"/>
    
        <rabbit:queue name="Q.aggregator.Proceed"/>
    
        <rabbit:direct-exchange name="E.aggregator.direct.Proceed">
            <rabbit:bindings>
                <rabbit:binding queue="Q.aggregator.Proceed" key="aggregator.Proceed"/>
            </rabbit:bindings>
        </rabbit:direct-exchange>
    
        <rabbit:direct-exchange name="E.thomas.direct.Proceed"/>
    
        <int:channel id="aggregator.Proceed"/>
    
        <int:gateway id="uwEngine" service-interface="some.Interface" default-request-channel="aggregator.Proceed" />
    
        <int-amqp:outbound-gateway request-channel="aggregator.Proceed" amqp-template="amqpTemplate" exchange-name="E.aggregator.direct.Proceed" routing-key="aggregator.Proceed"/>
    This is the consumer:

    Code:
        <int:channel id="aggregator.Proceed"/>
    
        <int-amqp:inbound-gateway request-channel="aggregator.Proceed" queue-names="Q.aggregator.Proceed" connection-factory="connectionFactory" concurrent-consumers="1"/>
    
        <bean id="beany" class="some.Interface"/>
    
        <int:service-activator input-channel="aggregator.Proceed" ref="beany" method="proceed"/>
    I currently get the following error:

    Code:
    INFO: Execution of Rabbit message listener failed, and no ErrorHandler has been set: class org.springframework.amqp.rabbit.listener.ListenerExecutionFailedException: Listener threw exception
    Nov 20, 2012 2:16:00 PM org.springframework.integration.gateway.MessagingGatewaySupport doSendAndReceive
    WARNING: failure occurred in gateway sendAndReceive
    From reading other threads and documentation I can see a couple of options, but none would seem to get the exception back to the client...
    • Define the error-channel property on the int-amqp:inbound-gateway on the consumer and then define an int-amqp:outbound-gateway to send the error back to the client. I am not sure how I would send this back to the temporary queue that was created and the client is blocking on.
    • Define an ErrorHandler, not sure what I should be doing inside it, or where this is documented?
    • Catch all exceptions at the consumer level and wrap these in my response payload.
    • Timeout the response on the client, useless because I won't know why...

    Thanks in advance!

  • #2
    1. Add an error-channel to the inbound-gateway; add a service-activator to consume from that channel to invoke some custom service that examines the payload (MessagingException with failedMessage and cause properties) and creates some form of error message to return to the client (simply return that value from the service and the gateway will return it to the client.

    2. On the client, add a reply-channel to the outbound-gateway; add a consumer that examines the reply and, if it's the error message, throw an exception and it will be propagated to the gateway client. If the message is not an error, simply return it from this response analyzer service and it will be returned to the gateway normally.

    In effect, you are adding a post-processing service after the outbound gateway.

    HTH

    Comment


    • #3
      Thanks Gary, that makes sense.

      Where I get confused is how the reply-channel that is created gets wired back to the int:gateway to receive the response. In my naive mind this channel seems to have no endpoints...

      If I just wanted to handle all exceptions and return these as error codes within my payload, would the service-activator be the place to do this?

      Comment


      • #4
        When you call the <int:gateway/>, the gateway creates a temporary channel and puts it in the message replyChannel header. This channel is ultimately where any reply will be sent; if there is any asynch processing, the thread calling the gateway will block in receive() on this channel. If there is no asynch handoff, the thread will call receive() to immediately retrieve the result when the flow ends. It's not "wired into the gateway", it's wired into the message.

        If you don't want to propagate the error by throwing an exception, you don't need any post-processing on the producer, just assemble the response you want in the service wired into the error-channel on the consumer inbound gateway.

        Comment

        Working...
        X