Announcement Announcement Module
Collapse
No announcement yet.
Retry Mechanism with delay using spring retry framework Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Retry Mechanism with delay using spring retry framework

    Good morning Dave.
    Dave I am trying to you spring's retry framework (org.springframework.retry).
    I have tested out retry mechanism with the attached configuration.
    I wanted to find if I could set delay (for example 1 minute) before each retry.
    I appreciate and thank you for your feedback.

    Thanks
    Venkat

    Parent Queue Configuration:
    ----------------------------
    @Configuration
    public abstract class AbstractMDBRabbitConfiguration {

    protected abstract void configureMDBTemplate(DMBTemplate template);

    @Bean
    public ConnectionFactory connectionFactory() {
    String rabbitMQUser = "guest";
    String rabbitMQPassword = "guest";
    String rabbitMQHost = "localhost";
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitMQHost);
    connectionFactory.setChannelCacheSize(10);
    connectionFactory.setUsername(rabbitMQUser);
    connectionFactory.setPassword(rabbitMQPassword);
    return connectionFactory;
    }

    @Bean
    public DMBTemplate rabbitTemplate() {
    DMBTemplate template = new DMBTemplate(connectionFactory());
    template.setMessageConverter(messageConverter());
    configureMDBTemplate(template);
    return template;
    }

    @Bean
    public MessageConverter messageConverter() {
    SimpleMessageConverter messageConverter = new SimpleMessageConverter();
    messageConverter.setCreateMessageIds(true);
    return messageConverter;
    }

    @Bean
    public Advice retryInterceptor(){
    StatefulRetryOperationsInterceptorFactoryBean retry = new StatefulRetryOperationsInterceptorFactoryBean();
    retry.setRetryOperations(retryTemplate());
    retry.setMessageRecoverer(messageRecoverer());
    return retry.getObject();
    }


    @Bean
    public RetryTemplate retryTemplate(){
    RetryTemplate retryTemplate = new RetryTemplate();
    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    retryTemplate.setRetryPolicy(retryPolicy);
    return retryTemplate;
    }


    @Bean
    public MessageRecoverer messageRecoverer() {
    return new DMBMessageRecoverer();
    }



    @Bean
    public AmqpAdmin amqpAdmin() {
    RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
    return rabbitAdmin ;
    }

    }
    ------------------------------------------------------------------------------------------
    Concrete Queue Configuration:
    ------------------------------
    @Configuration
    public class WriteToFileConfig extends AbstractMDBRabbitConfiguration {
    private static final String MDB_AUDIT_EXCHANGE_NAME = "write.tofile.exchange";

    /** The Constant MDB_QUEUE_NAME. */
    private static final String MDB_QUEUE_NAME = "write.tofile.queue";

    /** The Constant MDB_ROUTING_KEY. */
    private static final String MDB_ROUTING_KEY = MDB_QUEUE_NAME;


    @Override
    protected void configureMDBTemplate(DMBTemplate template) {
    template.setExchange(MDB_AUDIT_EXCHANGE_NAME);
    template.setRoutingKey(MDB_ROUTING_KEY);

    }

    @Bean
    public Queue writeQueue() {
    Map<String, Object> args = new HashMap<String, Object>();
    String haPolicyValue = CommonsConfigurationServiceImpl.getPropertyValue(C ommonsConfigurationConstants.RABBIT_MQ_CONFIG_NAME , "rabbitmq.ha.policy");
    args.put("x-ha-policy", haPolicyValue);
    Queue queue = new Queue(MDB_QUEUE_NAME,true,false,false,args);
    return queue;
    }

    @Bean
    public TopicExchange writeExchange() {
    TopicExchange exchange = new TopicExchange(MDB_AUDIT_EXCHANGE_NAME,true,false);
    return exchange;
    }

    @Bean
    public Binding writeBinding() {
    return BindingBuilder.bind(writeQueue()).to(writeExchange ()).with(MDB_ROUTING_KEY);
    }


    }
    -----------------------------------------------------------------------------------------------------------------------
    consumer configuration:
    =======================
    <context:component-scan base-package="com.pb.test.write.file">
    </context:component-scan>

    <!-- central log messaging config -->

    <bean id="messageListenerContainer" class="org.springframework.amqp.rabbit.listener.Si mpleMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="queueNames" value="write.tofile.queue"/>
    <property name="concurrentConsumers" value="5" />
    <property name="txSize" value="100" />
    <property name="prefetchCount" value="100" />
    <property name="adviceChain" >
    <list>
    <ref bean="retryInterceptor"/>
    </list>
    </property>

    <property name="messageListener" ref="messageListenerAdapter" />
    </bean>

    <bean id="messageListenerAdapter" class="org.springframework.amqp.rabbit.listener.ad apter.MessageListenerAdapter">
    <property name="delegate" ref="writeToFileHandler" />
    <property name="messageConverter" ref="messageConverter" />
    </bean>

    <!-- central log handlers config -->


    <bean id="writeToFileHandler" class="com.pb.test.write.file.WriteToFileProcessor ">
    </bean>

  • #2
    That all looks fine. For a backoff you need to add a BackOffPolicy to the RetryTemplate when you create it. 1 minute is a bit of a long wait to start with, but it should work.

    Comment


    • #3
      Good afternoon Dave. Thanks for your feedback.
      Dave I tried to use ExponentialBackOffPolicy configured as follows. I have set initial interval as 1000 milliseconds. When I executed test program that throws an exception every time it gets into Queue Consumer, I found that the all retries are using the same value as initialInterval (1000 ms). I placed a break-point in the following method:
      public synchronized long getSleepAndIncrement() {
      long sleep = this.interval;
      if (sleep > maxInterval) {
      sleep = (long) maxInterval;
      }
      else {
      this.interval *= this.multiplier;
      }
      return sleep;
      }
      }

      I noticed that interval get incremented by the multiplier. But the interval value is always 1000 and thus it is only retrying each retry with 1000 ms for all four retries. Could you please let me know if I am missing any setting.

      Dave one more thing I noticed is that if I set maxAttempts as five in SimpleTryPolicy it is retrying for ten times. If I keep this number less tha five it works as expected.

      Thanks
      Venkat


      @Bean
      public RetryTemplate retryTemplate(){
      RetryTemplate retryTemplate = new RetryTemplate();
      SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
      retryPolicy.setMaxAttempts(4);
      ExponentialBackOffPolicy exponentialBackOffPolicy = new ExponentialBackOffPolicy();
      exponentialBackOffPolicy.setInitialInterval(1000L) ;
      retryTemplate.setRetryPolicy(retryPolicy);
      retryTemplate.setBackOffPolicy(exponentialBackOffP olicy);

      return retryTemplate;
      }

      Comment


      • #4
        You can cut down some of the confusion by setting the txSize=1 while you debug what is going on. The state for the backoff (and retry) is stored in a retry context keyed on the failed message id. If you have txSize>1 it is possible that the failed message is different from attempt to attempt, so the context will be different. If you have a test listener that fails for every message this is all the more likely. You would hope that in a real system you have relatievly few failures.

        Comment


        • #5
          Good evening Dave.
          I have tried by setting txSize=1 and prefetchCount=1. No luck. I believe each retry is treated as a new message and thus the Backoff is not in tact.

          Thanks
          Venkat

          Comment


          • #6
            Actually, now you mention it, I think that is a known issue with spring-retry. It was reported here https://jira.springsource.org/browse/BATCH-1795 and there is a suggested patch there but I haven't seen a contributor's agreement or a pull request.

            Comment


            • #7
              Good evening Dave I have applied the patch as suggested on the link https://jira.springsource.org/browse/BATCH-1795. Exponential Backoff Policy worked as expected. Today I found that for every retry there is a new connection and channels equal to number of retries are created. I found that there are 2000 connections and 2071 channels created during the period of 2-3 hours as some queues were subjected to retries. Is there a known channel leak with spring retry.

              Thanks
              Venkat

              Comment

              Working...
              X