Announcement Announcement Module
No announcement yet.
how to handle MessageConversionException in amqp inbound gateway(consumer) Page Title Module
Move Remove Collapse
Conversation Detail Module
  • Filter
  • Time
  • Show
Clear All
new posts

  • how to handle MessageConversionException in amqp inbound gateway(consumer)

    Hi I have configured an amqp gate way with my own custom message converter.
    When ever a messageconversion exception is thrown, I am seeing that the gateway is not sending a NACK or ACK to the broker and
    the message is kept in the queue assuming that other consumers might pick this message up. Other consumers also does the same and these message which caused message conversion exception or neither processed nor removed from the Queue.

    What would be a good way of gracefully handling these message at the consumer side. I don want to handle this at broker by configuring x-message-ttl or dead-letter-exchange or an alternate-exchange.

    I would like to send a error message with a error code to the producer in such scenario.

    My initial thought was to configure a error handler as well through the message listener container and send an error message to the producer, but unfortunately the error handler will not have enough context to know where it should dispatch the message (this is because the error handler interface is void handleError(Throwable t)).
    I would not be able to pass context information like reply queue name, exchange name etc to my custom error handler which can produce a error message for the message producer.

    Since the messageListenerContainer in AmqpInboundGateway is private final, i cannot even extend AmqpInboundGateway and set my own implementation of MessageListener instead of the ananymous messageListener implementation.

    Would it be possible to make the messageListenerContainer protected in the future release of spring-integration-amqp?

    Or can you suggest me any other way of gracefully handled MessageconversionException in the consumer end?

    P.S configuring errorChannel will be of no help because the MessageconversionException happens at the listener container level and the message is not even handed over to SI completely.

  • #2
    The recent spring-amqp 1.1.0 release introduced a new attribute on the listener-container...

    		<xsd:attribute name="requeue-rejected" type="xsd:string">
    	Tells the container the default requeue behavior when rejecting messages. Default is 'true' meaning messages
    	will be requeued, unless the listener signals not to by throwing an AmqpRejectAndDontRequeueException. When
    	set to false, messages will never be requeued.
    		<xsd:attribute name="phase" type="xsd:string">
    As noted in the schema docs, if you want to do it selectively, you can leave it as the default (true) and throw an AmqpRejectAndDontRequeueException (or set it as the cause of the MessageConversionException) from your custom message converter.

    This was intended to facilitate delivery to a Dead Letter Exchange, but it will also work when no DLE is defined.

    You will have to generate and send the error message from within your converter, perhaps by delegating to some custom error handler there.


    • #3
      Also, if you can't move to 1.1.0; there is a discussion here about putting a stateful retry interceptor on the listener's advice-chain to catch and handle the exception.


      • #4
        Thanks Gary,
        I am now throwing AmqpRejectAndDontRequeueException as the cause of my message conversion exception and it worked well.
        If no DLE is configured, the rouge message are removed from queue, if a DLE is configured the messages are getting reddirected to the DLE.
        Works wonderfully out of box thanks a lot.


        • #5
          Throwing AmqpRejectAndDontRequeueException works great. Thanks for the post, saved my day. However, it would be great if it was documented somewhere. Originally, I thought that throwing any exception would result in message reject.


          • #6
            it would be great if it was documented somewhere...

            The Reference Manual in the section "Message Listeners and the Asynchronous Case" talks about this exception and the "rejectRequeued" property (and that they can be used with Dead Letter Exchanges).

            Feel free to open a Documentation JIRA Issue if you feel there is room for improvement.