Announcement Announcement Module
No announcement yet.
Manual acknowlegment in synchronous processing Page Title Module
Move Remove Collapse
Conversation Detail Module
  • Filter
  • Time
  • Show
Clear All
new posts

  • Manual acknowlegment in synchronous processing

    Hi, Im new to AMQP and RabbitMQ. Im going through documentation, articles, posts and still cant find a way to implement functionality I need.

    I have one rabbit server and one consuming java application. I need to control number of messages processed in particular period of time so I think synchronous processing (using RabbitTemplate.receive()) is better choice.

    What I also need is to stop processing messages (without stopping application) when exception shows up. Of course I dont want the message to be lost - I want it to be delivered when app is started again. When I set rabbitTemplate.setChannelTransacted(true) message is returned to queue on exception. Then, as expected, message is consumed again and same exception.. over and over again.

    I can set channelTransacted to false and when exception occurs, rabbit server is waiting for ack (which it obviously cannot get) and application is not processing other messages. But then, the problem is how application can send ack when message is processed properly (as far as I can see I cant do that from RabbitTemplate's interface).

    Maybe there is a standard, common way of dealing with these kind of problems? I cant let the message for neverending circulation between server and consumer but I also cant stop the application on exception.

  • #2
    I assume you mean you are using RabbitTemplate.receive() within an @Transactional method (with a RabbitTransactionManager). Otherwise, template.receive() will do the commit before it returns and throwing an exception won't force a rollback.

    If you use a listener container (you can control the delivery rate by having a single consumer and pause the thread if needed), there are a couple of ways to control the action on a failure, including the addition of a retry advice, which can try the message a few times then give up; or you can configure a Dead Letter Exchange/Queue to send the bad (poison) message to another queue. Or, a combination of the two.


    • #3
      Thanks for your suggestions.

      I decided to switch to asynchronous processing with single consumer just like you wrote.

      I also work on adapting approach with retry interceptor and dead letters queue. I want the problematic message to be send to dead letters queue after 3 failures. Unfortunately, when I throw AmqpRejectAndDontRequeueException in my message recoverer I got the following:

      12-10-12 17:01:55:500 - {WARN} listener.SimpleMessageListenerContainer Thread [SimpleAsyncTaskExecutor-1];  Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpRejectAndDontRequeueException: 
      12-10-12 17:01:55:505 - {INFO} listener.SimpleMessageListenerContainer Thread [SimpleAsyncTaskExecutor-1];  Restarting Consumer: tag=[amq.ctag-Qlp3--gG_COX49OrFzf25k], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,2), acknowledgeMode=AUTO local queue size=0
      Then the application doesnt consume messages anymore. If I dont use retry interceptor and throw AmqpRejectAndDontRequeueException in my delegate I dont get any warns and consumer still works. In both cases bad message doesnt appear in dead letter queue - frankly, Im not sure how to make consumer use my dead queue.


      • #4
        Sorry, I forgot to mention that when used with the adapter, the exception has to be wrapped.

        I also forgot to mention that we now (as of 1.1.2) provide a recoverer for you - see RejectAndDontRequeueRecoverer - that does just that.

        In order to route to the DLE/DLQ, the original queue must be configured with the target DLE to be used when the consumer rejects a message.


        • #5
          Thanks, everything works fine now