
  • AMQP 1.0.1-SNAPSHOT, DLQ and Retry/Backoff policy

    Hello,

    I've been communicating with Gary Russell at VMware (who has been very helpful) concerning AMQP, RabbitMQ and the dead letter queue (DLQ).

    Support for the DLQ has been added in 1.0.1-SNAPSHOT. Because setting the defaultRequeueRejected value directly on the amqp:inbound-channel-adapter is not yet supported, the following Spring fragment creates the listener container explicitly and passes it to the adapter:

    Code:
        
    	<bean id="retryInterceptor" class="org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean">
    		<property name="retryOperations" ref="retryTemplate" />
    	</bean>		
    	<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
    		<property name="backOffPolicy">
    			<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
    				<property name="initialInterval" value="5000" />
    				<property name="maxInterval" value="120000" />
    				<property name="multiplier" value="2" /> <!-- default value here for clarity -->
    			</bean>		
    		</property>
    		<property name="retryPolicy">
    			<bean class="org.springframework.retry.policy.SimpleRetryPolicy">
    				<property name="maxAttempts" value="5" />
    			</bean>		
    		</property>
    	</bean>
    
            <bean id="lc" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    		<property name="queueNames" value="test.in"/>
    		<property name="connectionFactory" ref="connectionFactory"/>
    		<property name="defaultRequeueRejected" value="false"/>
    		<property name="concurrentConsumers" value="1"/>
    		<property name="adviceChain" ref="retryInterceptor"/>
    		<property name="txSize" value="1"/>
    		<property name="acknowledgeMode" value="AUTO" /> 
      		<property name="channelTransacted" value="true" />
            </bean>
    	
            <int-amqp:inbound-channel-adapter channel="channelStart"
    		listener-container="lc" />
    When defaultRequeueRejected is set to false, the message is moved to the DLQ before the retryInterceptor has completed its 5 attempts to process it.

    When defaultRequeueRejected is set to true, the retryInterceptor works as expected, but the DLQ is not used once it has tried to process the message 5 times.

    It therefore seems as if these two pieces of functionality cannot be used together, even though this looks like a fairly common use case: make X attempts to process the message with Y seconds between each attempt, and send the message to the DLQ if it remains unprocessed.
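
    For reference, the waits that the retryTemplate configuration above is meant to produce can be worked out with a small standalone sketch. It assumes the usual exponential rule (start at initialInterval, multiply after every wait, cap at maxInterval) and does not call spring-retry itself; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of spring-retry: lists the waits between attempts
// for an exponential back-off with a cap.
final class BackoffSchedule {

    static List<Long> delays(long initialInterval, double multiplier, long maxInterval, int maxAttempts) {
        List<Long> delays = new ArrayList<>();
        double current = initialInterval;
        // A wait only happens between attempts, so N attempts give N - 1 waits.
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            delays.add((long) Math.min(current, maxInterval));
            current = current * multiplier;
        }
        return delays;
    }

    public static void main(String[] args) {
        // Values taken from the XML above: 5000 ms initial, multiplier 2, 120000 ms cap, 5 attempts.
        System.out.println(delays(5000, 2.0, 120000, 5)); // prints [5000, 10000, 20000, 40000]
    }
}
```

    With these values the cap of 120000 ms is never reached, because only four waits occur before the fifth and final attempt.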


    John

  • #2
    Hi John,

    This is because the retry interceptor is using AMQP's requeue and redeliver feature for the retries. As you have seen, this is incompatible with setting defaultRequeueRejected to false.

    The solution is to inject a custom MessageRecoverer

    Code:
    public interface MessageRecoverer {
    
    	/**
    	 * Callback for message that was consumed but failed all retry attempts.
    	 * 
    	 * @param message the message to recover
    	 * @param cause the cause of the error
    	 */
    	void recover(Message message, Throwable cause);
    
    }
    into the retry interceptor. The default recoverer simply logs and drops the message...

    Code:
    if (messageRecoverer == null) {
    	logger.warn("Message dropped on recovery: " + message, cause);
    } else {
    	messageRecoverer.recover(message, cause);
    }
    If you want the message to go to the DLQ after exhausting the retries, a custom MessageRecoverer should throw an AmqpRejectAndDontRequeueException.

    This is why we provided two mechanisms for DLQ support (always DLQ, or optional by throwing that exception).
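
    The flow described above (retry up to the limit, then hand the message to the recoverer, which can ask for a reject without requeue) can be sketched without any Spring classes. This is only an illustration under simplifying assumptions: RejectAndDontRequeue and Recoverer are hypothetical stand-ins for AmqpRejectAndDontRequeueException and MessageRecoverer, and the loop ignores the fact that real stateful retries span several redeliveries from the broker.

```java
import java.util.function.Consumer;

// Simplified, hypothetical model of "retry N times, then recover or dead-letter".
final class RetryThenDeadLetterSketch {

    /** Stand-in for AmqpRejectAndDontRequeueException. */
    static final class RejectAndDontRequeue extends RuntimeException {
        RejectAndDontRequeue(Throwable cause) {
            super(cause);
        }
    }

    /** Stand-in for MessageRecoverer. */
    interface Recoverer {
        void recover(String message, Throwable cause);
    }

    enum Outcome { ACKED, DROPPED, RECOVERED, DEAD_LETTERED }

    static Outcome deliver(String message, Consumer<String> listener, int maxAttempts, Recoverer recoverer) {
        RuntimeException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                listener.accept(message);
                return Outcome.ACKED; // processed successfully
            } catch (RuntimeException e) {
                lastFailure = e; // remember the failure and try again
            }
        }
        if (recoverer == null) {
            return Outcome.DROPPED; // default behaviour: log and drop
        }
        try {
            recoverer.recover(message, lastFailure);
            return Outcome.RECOVERED; // the recoverer dealt with the message itself
        } catch (RejectAndDontRequeue e) {
            return Outcome.DEAD_LETTERED; // rejected with requeue=false, so the broker can dead-letter it
        }
    }
}
```

    In this model, a recoverer that simply rethrows the cause wrapped in RejectAndDontRequeue is what turns "dropped after N attempts" into "dead-lettered after N attempts".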

    Hope that helps.



    • #3
      Hello,

      So are you suggesting setting defaultRequeueRejected to true, implementing MessageRecoverer and throwing the exception?


      John



      • #4
        Correct - although the defaultRequeueRejected defaults to true, so no need to explicitly set it.
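
        The two DLQ mechanisms discussed in this thread come down to one decision per failed delivery: requeue the message or not. The sketch below models that decision as I understand it; it is not the container's actual code, and RejectAndDontRequeue is a hypothetical stand-in for AmqpRejectAndDontRequeueException.

```java
// Hypothetical model of the requeue decision made when a listener throws.
final class RequeueDecisionSketch {

    /** Stand-in for AmqpRejectAndDontRequeueException. */
    static final class RejectAndDontRequeue extends RuntimeException {
        RejectAndDontRequeue(String message) {
            super(message);
        }
    }

    static boolean shouldRequeue(boolean defaultRequeueRejected, Throwable failure) {
        // Assumption: the special exception anywhere in the cause chain forces "do not requeue".
        for (Throwable t = failure; t != null; t = t.getCause()) {
            if (t instanceof RejectAndDontRequeue) {
                return false;
            }
        }
        // Otherwise the container-wide default applies (true unless configured otherwise).
        return defaultRequeueRejected;
    }
}
```

        With the default left at true, ordinary failures are requeued (so retries keep working), and only a failure carrying the special exception is rejected without requeue and can reach the DLQ.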



        • #5
          Great, it works fine. Here's the solution:

          Code:
          	<bean id="retryInterceptor" class="org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean">
          		<property name="messageRecoverer">
          			<bean class="DLQMessageRecoverer" />
          		</property>
          		<property name="retryOperations" ref="retryTemplate" />
          	</bean>		
          	<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
          		<property name="backOffPolicy">
          			<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
          				<property name="initialInterval" value="5000" />
          				<property name="maxInterval" value="120000" />
          				 <!-- 
          					default value here for clarity, however currently broken.				 
          				 -->
          				<property name="multiplier" value="2" />
          			</bean>		
          		</property>
          		<property name="retryPolicy">
          			<bean class="org.springframework.retry.policy.SimpleRetryPolicy">
          				<property name="maxAttempts" value="5" />
          			</bean>		
          		</property>
          	</bean>
          		
          	<int-amqp:inbound-channel-adapter queue-names="test.in" channel="channelStart"
          		connection-factory="connectionFactory" channel-transacted="true" 
          		concurrent-consumers="1" tx-size="1"
          		acknowledge-mode="AUTO" advice-chain="retryInterceptor" />
          And this:

          Code:
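          import org.springframework.amqp.AmqpRejectAndDontRequeueException;
          import org.springframework.amqp.core.Message;
          import org.springframework.amqp.rabbit.retry.MessageRecoverer;

          public class DLQMessageRecoverer implements MessageRecoverer {
              @Override
              public void recover(Message message, Throwable cause) {
                  // Called once the retries are exhausted: reject without requeue so the broker dead-letters the message.
                  throw new AmqpRejectAndDontRequeueException(cause);
              }
          }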
          The only remaining problem is the multiplier in the back off policy, which is ignored.



          • #6
            Hi John,

            Given that spring-retry is not a top level project, please go ahead and open a JIRA against spring-amqp and I'll take a look at it. I realize there's an open issue against BATCH (https://jira.springsource.org/browse/BATCH-1795) but batch currently doesn't use spring-retry.
            Last edited by Gary Russell; Apr 24th, 2012, 07:41 AM.



            • #7
              I created this JIRA: https://jira.springsource.org/browse/AMQP-226

              I will have a pull request soon.



              • #8
                I have issued a pull request: https://github.com/SpringSource/spring-retry/pull/1

                It might take a while before it is merged into master and shows up in a BUILD-SNAPSHOT.

                In the meantime, if you want to test from source, you can

                It will put 1.0.1.BUILD-SNAPSHOT into your local repo.



                • #9
                  Hi Gary,

                  When I have set manual ACK, how can I do a basicReject on a message when the SimpleMessageListenerContainer receives an exception?
                  If we catch the RuntimeException and try to do a basicReject, the container somehow receives a cancel and closes the channel, and the queue gets deleted.

                  How can dead-lettering be made to work with manual ACK?

                  regards
                  raghav



                  • #10
                    I just ran this test and it all worked fine (with 1.1.1.RELEASE).

                    Code:
                    @Test
                    public void test() throws Exception {
                    	CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
                    	connectionFactory.setChannelCacheSize(3);
                    	connectionFactory.setPort(BrokerTestUtils.getPort());
                    	RabbitTemplate template = new RabbitTemplate(connectionFactory);
                    
                    	ChannelAwareMessageListener listener = new ChannelAwareMessageListener() {
                    
                    		@Override
                    		public void onMessage(Message message, Channel channel) throws Exception {
                    			try {
                    				if (new String(message.getBody()).equals("bar")) {
                    					throw new RuntimeException("bar - failed");
                    				}
                    				channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                    			}
                    			catch (Exception e) {
                    				System.out.println("Rejecting without requeue");
                    				channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
                    			}
                    		}
                    	};
                    	String queueName = "test.queueX";
                    	Queue queue = new Queue(queueName);
                    	RabbitAdmin admin = new RabbitAdmin(connectionFactory);
                    	admin.declareQueue(queue);
                    	template.setRoutingKey(queueName);
                    	SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
                    	container.setMessageListener(listener);
                    	container.setQueueNames(queue.getName());
                    	container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
                    	container.afterPropertiesSet();
                    	container.start();
                    
                    	template.convertAndSend("foo");
                    	template.convertAndSend("bar");
                    
                    	Thread.sleep(10000);
                    	admin.deleteQueue(queueName);
                    }
                    What are you doing differently?



                    • #11
                      Gary,

                      Thanks for the reply. I am using "<entry key="x-dead-letter-exchange" value="deadLetterQueue" />" as an additional queue argument. When this is added, I get the following exception and the queue gets deleted (and the listener won't work until the server is restarted). If I remove this argument, the listener carries on with the next message and the rejected one is discarded.

                      Exception:
                      ===========

                      [WARN] 2012-07-24 08:59:21,752 org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Cancel received
                      [WARN] 2012-07-24 08:59:22,671 org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - Consumer raised exception, processing can restart if the connection factory supports it
                      org.springframework.amqp.rabbit.listener.ConsumerCancelledException
                      at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:190)
                      at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:458)
                      at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:447)
                      at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$200(SimpleMessageListenerContainer.java:57)
                      at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:532)
                      at java.lang.Thread.run(Thread.java:636)
                      [INFO] 2012-07-24 08:59:22,676 org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - Restarting Consumer: tag=[amq.ctag-QYyv9kNSiKIHV3ZDzdv9qC], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=MANUAL local queue size=0
                      [DEBUG] 2012-07-24 08:59:22,676 org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Closing Rabbit Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1)
                      [DEBUG] 2012-07-24 08:59:22,677 org.springframework.amqp.rabbit.connection.CachingConnectionFactory - Closing cached Channel: AMQChannel(amqp://[email protected]:5672/,1)
                      [DEBUG] 2012-07-24 08:59:22,692 org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Starting consumer Consumer: tag=[null], channel=null, acknowledgeMode=MANUAL local queue size=0
                      [DEBUG] 2012-07-24 08:59:22,705 org.springframework.amqp.rabbit.connection.CachingConnectionFactory - Detected closed channel on exception. Re-initializing: null
                      [WARN] 2012-07-24 08:59:22,708 org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Reconnect failed; retries left=2
                      java.io.IOException
                      at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
                      at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
                      at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
                      at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:755)
                      at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:61)
                      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
                      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
                      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
                      at java.lang.reflect.Method.invoke(Method.java:616)
                      at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:331)
                      at $Proxy31.queueDeclarePassive(Unknown Source)
                      at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:213)
                      at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:516)
                      at java.lang.Thread.run(Thread.java:636)
                      Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND...


                      Extract from code:
                      ===================
                      ----------
                      <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                      xmlns:rabbit="http://www.springframework.org/schema/rabbit"
                      xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schem...rabbit-1.1.xsd
                      http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">

                      ......

                      <rabbit:queue name="myMsgQueue">
                          <rabbit:queue-arguments>
                              <!-- Mirror it across all nodes in a clustered mode -->
                              <entry key="x-ha-policy" value="all" />
                              <entry key="x-dead-letter-exchange" value="deadLetterQueue" />
                          </rabbit:queue-arguments>
                      </rabbit:queue>

                      <rabbit:queue name="deadMessageQueue">
                          <rabbit:queue-arguments>
                              <!-- Mirror it across all nodes in a clustered mode -->
                              <entry key="x-ha-policy" value="all" />
                          </rabbit:queue-arguments>
                      </rabbit:queue>

                      <rabbit:direct-exchange name="deadLetterQueue">
                          <rabbit:bindings>
                              <rabbit:binding queue="deadMessageQueue" key="myMsgQueue" />
                          </rabbit:bindings>
                      </rabbit:direct-exchange>
                      .....
                      ------------


                      Listener

                      @Override
                      public void onMessage(Message msg, final Channel channel) {
                          ....
                          try {
                              channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
                          } catch (Exception le) {
                              logger.error("RUNTIME EXCEPTION: " + le.getMessage());
                              channel.basicReject(msg.getMessageProperties().getDeliveryTag(), false);
                          }

                          ...
                      }

                      Environment:
                      =============
                      Spring 3.1
                      Spring amqp: 1.1.0
                      rabbitmq-java-client: 2.8.1



                      • #12
                        What version of RabbitMQ are you using?

                        There was a problem in 2.8.1 where rejecting a message in a queue configured with a DLE caused durable queues to be deleted. I think they fixed it in 2.8.2 but I haven't tested it.

                        https://groups.google.com/forum/?fro...DLQ$20problems

                        The "workaround" for 2.8.1 was to make the queue non-durable.



                        • #13
                          I tried with 2.8.4 just now. This time the message reached the DLQ: when a normal message was published using the RabbitMQ management plugin and processing failed, it ended up in the DLQ.

                          However, there is another problem. With Spring AMQP, ampqTemplate.convertAndSend("messageQueue", msg) does not push the message to the queue, and there is no exception either. The blocking consumer keeps listening but never receives the normal message.

                          When I reinstalled RabbitMQ 2.8.1, messages were pushed to the queue again.

                          To summarize:
                          2.8.4 -- DLQ works, but messages are not published to the normal queue, i.e. the blocking consumer does not receive normal messages.
                          2.8.1 -- DLQ fails.

                          This is very strange, and it is difficult to find the reason.

                          Thanks,
                          Raghav



                          • #14
                            Just to clarify the above post: I was not able to publish a message to the queue (the listener didn't receive it) using Spring AMQP 1.1/1.1.1, RabbitMQ 2.8.4 and Java client 2.8.4 (2.8.1), and there was no exception. Also, the RabbitMQ logs didn't show anything received.

                            However, when the message was published using the RabbitMQ management plugin, the AMQP listener received it.

                            Is there any compatibility issue here?

                            Thanks,
                            Raghav



                            • #15
                              FYI, I just upgraded to 2.8.4 and I am seeing no issues when sending messages to the queue, regardless of whether the queue is set up with a dead letter exchange.
