
  • Exception causes infinite loop

    Hello,

    I have a client talking to a server via Spring Integration/AMQP and RabbitMQ. The client uses an amqp:outbound-gateway and the server uses an amqp:inbound-gateway configuration.

    When the client creates a message which processes without errors, the response is sent back as expected.

    When the client creates a message which causes the server to throw an exception, the message appears to be sent right back into the server, causing yet another exception, and so on without end.

    The server code responsible for returning a reply is a @ServiceActivator with the following general appearance:

    Code:
    	@ServiceActivator(inputChannel="xmlProcessingRequestChannel")
    	public byte[] handleMessage(final Message<String> message) throws Exception {
    		LOG.debug("Handling AMQP message: " + message);

    		// ...process message, return byte[] or throw exception...
    	}

    Some excerpts from my log:

    Code:
    15:16:51,321 [SimpleAsyncTaskExecutor-1] WARN  org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Execution of Rabbit message listener failed, and no ErrorHandler has been set.
    org.springframework.amqp.rabbit.listener.ListenerExecutionFailedException: Listener threw exception
    	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:588)
    	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:527)
    	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:470)
    	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:57)
    	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:106)
    	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:587)
    	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:450)
    	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:463)
    	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:447)
    	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$200(SimpleMessageListenerContainer.java:57)
    	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:532)
    	at java.lang.Thread.run(Thread.java:722)[:1.7.0_03]
    2012-05-24 15:16:51,333 [main] DEBUG Service.class Marking service as idle
    2012-05-24 15:16:51,333 [main] DEBUG Service.class Processing 1 incoming messages
    2012-05-24 15:16:51,333 [main] DEBUG Service.class Service received taskRunner message: com.myproduct.xml.scheduler.Service$2$1@76dd8c85
    2012-05-24 15:16:51,333 [main] DEBUG Service.class Marking service as executing
    2012-05-24 15:16:51,333 [main] DEBUG Service.class Executing job processing tasks
    2012-05-24 15:16:51,333 [main] DEBUG Service.class Processing tasks
    2012-05-24 15:16:51,333 [main] DEBUG Service.class   Executing task: PrintJobExecutorTask
    2012-05-24 15:16:51,333 [SimpleAsyncTaskExecutor-1] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Rejecting messages
    2012-05-24 15:16:51,333 [SimpleAsyncTaskExecutor-1] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Retrieving delivery for Consumer: tag=[amq.ctag-AZGJvn5oMDpb7zB_wFoOrZ], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,3), acknowledgeMode=AUTO local queue size=0
    2012-05-24 15:16:51,334 [pool-1-thread-3] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Storing delivery for Consumer: tag=[amq.ctag-AZGJvn5oMDpb7zB_wFoOrZ], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,3), acknowledgeMode=AUTO local queue size=0
    2012-05-24 15:16:51,334 [SimpleAsyncTaskExecutor-1] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Received message: (Body:'<xml test="4"/'; ID:null; Content:text/plain; Headers:{spring_reply_correlation=3199198d-f273-49f6-900c-8e414ae2a566, spring_reply_to=null, x-myproduct-metadata-id=olsen}; Exchange:xml-request-delivery; RoutingKey:xml-request.number; Reply:myproduct.xmlProcessingReplyQueue; DeliveryMode:PERSISTENT; DeliveryTag:3781)
    2012-05-24 15:16:51,334 [SimpleAsyncTaskExecutor-1] DEBUG org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper headerName=[amqp_receivedRoutingKey] WILL NOT be mapped
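
    A minimal sketch of the loop the log above appears to show, in plain Java with no Spring or RabbitMQ classes. It assumes that a listener exception makes the container reject the delivery and that the rejected message is requeued, which is what "Rejecting messages" followed by a fresh delivery of the same body suggests. The class name, method names, payload and delivery cap are all hypothetical; the cap exists only so the demo terminates.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Plain-Java model of the redelivery loop; not Spring AMQP code.
final class RequeueLoopDemo {

    /** Hypothetical stand-in for the service activator: it always fails. */
    static byte[] handle(String body) {
        throw new IllegalStateException("cannot process: " + body);
    }

    /**
     * Delivers messages until the queue is empty or maxDeliveries is reached.
     * A failed message is put back at the head of the queue, which models
     * "reject and requeue". Returns the number of delivery attempts made.
     */
    static int consume(Deque<String> queue, int maxDeliveries) {
        int deliveries = 0;
        while (!queue.isEmpty() && deliveries < maxDeliveries) {
            String body = queue.pollFirst();
            deliveries++;
            try {
                handle(body);
            } catch (RuntimeException e) {
                // Rejected and requeued: the same message comes straight back.
                queue.addFirst(body);
            }
        }
        return deliveries;
    }

    public static void main(String[] args) {
        Deque<String> queue = new ArrayDeque<>();
        queue.add("payload"); // hypothetical message body
        int deliveries = consume(queue, 1000);
        System.out.println("deliveries=" + deliveries + ", still queued=" + queue.size());
    }
}
```

    In this model a single failing message uses up every allowed delivery attempt and is still in the queue at the end, so without the cap the loop would never stop.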

  • #2
    You can do a few things here:

    1. Add an error-channel on the inbound-gateway to deal with the error message (potentially still sending a reply to the client).

    2. Add an ErrorHandler to the message listener via the gateway to deal with the Exception directly.

    3. Configure a retry interceptor (see the AMQP stocks sample on github for spring-amqp as an example).
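
    The idea behind option 1 can be sketched in plain Java: if the failure is caught and turned into a reply, nothing escapes to the listener, so the message is consumed once and the client still gets an answer. This is only a model of the idea, not the Spring Integration error-channel API; the class, the wrapper, the "ERROR: " reply format and the sample payloads are all hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of "turn the exception into a reply"; not Spring code.
final class ErrorReplyDemo {

    /** Hypothetical handler: fails for any body containing "bad". */
    static String process(String body) {
        if (body.contains("bad")) {
            throw new IllegalArgumentException("cannot process " + body);
        }
        return "ok:" + body;
    }

    /** Wraps a handler so that a failure becomes a reply instead of escaping. */
    static Function<String, String> withErrorReply(Function<String, String> handler) {
        return body -> {
            try {
                return handler.apply(body);
            } catch (RuntimeException e) {
                return "ERROR: " + e.getMessage();
            }
        };
    }

    /** Consumes every queued message exactly once and collects the replies. */
    static List<String> consume(Deque<String> queue, Function<String, String> handler) {
        List<String> replies = new ArrayList<>();
        while (!queue.isEmpty()) {
            replies.add(handler.apply(queue.pollFirst()));
        }
        return replies;
    }

    public static void main(String[] args) {
        Deque<String> queue = new ArrayDeque<>();
        queue.add("good");
        queue.add("bad");
        List<String> replies = consume(queue, withErrorReply(ErrorReplyDemo::process));
        System.out.println(replies + ", still queued=" + queue.size());
    }
}
```

    Because the wrapper never lets the exception propagate, the failing message leaves the queue after a single attempt and produces an error reply alongside the normal one.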



    • #3
      I should also mention that I am using an async gateway, meaning the client service gateway is defined to return a Future<x>. While the client is waiting in the future.get() call, the server is running amok through some sort of feedback loop.



      • #4
        Hello Mark,

        Option 1 sounds like it might be the best way to go. But how do I get the messages back to the producer/client? In my scenario the client has a Future<xxx> that it obtained from the service gateway proxy, and is currently waiting for a reply using the .get(timeout) method.

        If the service activator method throws an exception, then how does the message get back to the client, and what exactly will the client receive?

        (Preferably, I want the client to know about the exception as quickly as possible.)
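
        For reference, here is a sketch of what the standard java.util.concurrent.Future contract gives a caller of get(timeout) in the three possible cases: a normal reply, a failure, and no reply at all. It uses CompletableFuture (Java 8+) purely as a stand-in for the Future returned by the gateway proxy. Whether the gateway actually completes its Future with the server's exception depends on configuration not shown in this thread, so treat that mapping as an assumption. The class name, messages and timeout are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Shows how Future.get(timeout) reports success, failure and silence.
final class FutureErrorDemo {

    /** Waits for the reply and describes which of the three outcomes occurred. */
    static String await(Future<String> reply, long timeoutMillis) {
        try {
            return "reply: " + reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            // The original failure is available as the cause.
            return "failed: " + e.getCause().getMessage();
        } catch (TimeoutException e) {
            return "timed out";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> ok = CompletableFuture.completedFuture("done");

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("server could not process message"));

        CompletableFuture<String> silent = new CompletableFuture<>(); // never completed

        System.out.println(await(ok, 100));
        System.out.println(await(failed, 100));
        System.out.println(await(silent, 100));
    }
}
```

        The failure case returns immediately with the cause attached, while the silent case costs the caller the full timeout. That is why getting some reply back for a failed message matters if the client should learn about the exception quickly.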
