Handling Failures With StatefulRetryOperationsInterceptorFactoryBean

  • Handling Failures With StatefulRetryOperationsInterceptorFactoryBean

    Hi all,

    I have been using spring-amqp for quite a while in many production projects, and for the most part I handle errors by injecting an amqpTemplate into my message listeners and simply republishing to an exchange on failure (exception, etc.).

    I've been reading the docs again and discovered StatefulRetryOperationsInterceptor but I cannot find any information on how to use it. Ideally, I'd like to retry messages 3 times and if they still fail publish them to my error exchange for inspection and re-attempts.

    Thanks,
    James

  • #2
    So I looked through the code and discovered why it just wouldn't work for me... it expects a messageId property, which my messages don't have!

    So I keep getting "Illegal null id in message. Failed to manage retry for message".

    Aside from putting messageIds in my code, any ideas?



    • #3
      Can you provide the full stack trace here? (or at least the relevant part).

      Thanks,
      Mark



      • #4
        Sure... here's the relevant portion:


        Code:
        INFO  ContextLoader - Root WebApplicationContext: initialization started
        INFO  AnnotationConfigWebApplicationContext - Refreshing Root WebApplicationContext: startup date [Fri Nov 11 16:13:42 CST 2011]; root of context hierarchy
        INFO  AnnotationConfigWebApplicationContext - Successfully resolved class for [com.carfax.blueprint.amqp.ApplicationConfig]
        INFO  XmlBeanDefinitionReader - Loading XML bean definitions from class path resource [com/carfax/blueprint/amqp/jndi-context.xml]
        INFO  DefaultListableBeanFactory - Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@10ace8d: defining beans [org.springframework.context.annotation.internalConfigurationAnnotationProcessor,org.springframework.context.annotation.internalAutowiredAnnotationProcessor,org.springframework.context.annotation.internalRequiredAnnotationProcessor,org.springframework.context.annotation.internalCommonAnnotationProcessor,applicationConfig,stolenQueue,serviceQueue,stolenBinding,serviceBinding,rabbitAdmin,serviceListener,stolenListener,retryInterceptor,amqpConnectionFactory]; root of factory hierarchy
        WARN  SimpleMessageListenerContainer - CachingConnectionFactory's channelCacheSize can not be less than the number of concurrentConsumers so it was reset to match: 2
        INFO  DefaultLifecycleProcessor - Starting beans in phase 2147483647
        INFO  ContextLoader - Root WebApplicationContext: initialization completed in 625 ms
        INFO  SimpleMessageListenerContainer - Execution of Rabbit message listener failed, and no ErrorHandler has been set: class org.springframework.amqp.rabbit.listener.FatalListenerExecutionException: Illegal null id in message. Failed to manage retry for message: (Body:'{"make":"Honda","model":"Prelude","year":"1985"}'; ID:null; Content:application/json; Headers:{__TypeId__=com.carfax.blueprint.amqp.Vehicle}; Exchange:vehicle_history_changes; RoutingKey:vehicle.history.stolen; Reply:null; DeliveryMode:PERSISTENT; DeliveryTag:1)
        ERROR SimpleMessageListenerContainer - Consumer received fatal exception during processing
        org.springframework.amqp.rabbit.listener.FatalListenerExecutionException: Illegal null id in message. Failed to manage retry for message: (Body:'{"make":"Honda","model":"Prelude","year":"1985"}'; ID:null; Content:application/json; Headers:{__TypeId__=com.carfax.blueprint.amqp.Vehicle}; Exchange:vehicle_history_changes; RoutingKey:vehicle.history.stolen; Reply:null; DeliveryMode:PERSISTENT; DeliveryTag:1)
                at org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean$3.getKey(StatefulRetryOperationsInterceptorFactoryBean.java:105)
                at org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor.invoke(StatefulRetryOperationsInterceptor.java:142)
                at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
                at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:202)
                at $Proxy38.invokeListener(Unknown Source)
                at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:560)
                at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:452)
                at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:436)
                at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:420)
                at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$200(SimpleMessageListenerContainer.java:56)
                at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:505)
                at java.lang.Thread.run(Thread.java:662)



        • #5
          Also, here's the relevant code:

          Code:
          	@Bean
          	public SimpleMessageListenerContainer stolenListener(){
          		SimpleMessageListenerContainer container= new SimpleMessageListenerContainer(amqpConnectionFactory);
          		container.setQueueNames(stolenQueue().getName());
          		container.setAutoStartup(true);
          		container.setConcurrentConsumers(2);
          		container.setErrorHandler(errorHandler());
          		container.setAdviceChain(new Advice[]{retryInterceptor()});
          		container.setMessageListener(messageListener(new StolenRecordListener()));
          		return container;
          	}
          	@Bean
          	public Advice retryInterceptor(){
          		StatefulRetryOperationsInterceptorFactoryBean retry = new StatefulRetryOperationsInterceptorFactoryBean();
          		RetryTemplate retryTemplate = new RetryTemplate();
          		SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
          		retryPolicy.setMaxAttempts(10);
          		retryTemplate.setRetryPolicy(retryPolicy);
          		retry.setRetryOperations(retryTemplate);
          		return retry.getObject();
          	}



          • #6
            Just a quick note... for now, I'm just trying to define the behavior so that it retries the message 10 times (rather than the default, which is a rapid-fire retry forever, every time the consumer receives the message). I'm hoping to move on to more advanced stuff, like submitting the message to an error queue, in the long run.



            • #7
              Okay, I spent some time on this during the morning and figured it out... by default it looks for a message ID, but nothing mandates that a message carry one. Hence, the default setup fails for any message published without an ID.

              Anyhow, I created a setup that works and pushed it out to our example repository. Could you let me know if there is something I am doing wrong or I could do better? It works, just curious if there are better ways.

              https://github.com/CARFAX/blueprint-...ionConfig.java
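
Why a stateful retry needs a message ID at all can be shown with a small, library-free sketch. Between attempts the message goes back to the broker and is redelivered, so the attempt count has to be stored somewhere and looked up by some key. The class below is a simplified model for illustration only, not Spring Retry's actual implementation; `StatefulRetrySketch`, `onDelivery`, and the listener/recoverer callbacks are hypothetical names, and the real interceptor may differ in exactly when it hands a message to the recoverer.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Simplified, library-free model of stateful retry (hypothetical names throughout). */
public class StatefulRetrySketch {
    private final int maxAttempts;
    // Failure counts survive between deliveries, keyed by message ID.
    private final Map<String, Integer> failures = new HashMap<>();

    public StatefulRetrySketch(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    /**
     * Handles one delivery of a message.
     *
     * @return true if the message is finished (processed or recovered),
     *         false if it should be returned to the broker for redelivery
     */
    public boolean onDelivery(String messageId, String body,
                              Consumer<String> listener, Consumer<String> recoverer) {
        if (messageId == null) {
            // Without a key there is no way to find the count from earlier deliveries.
            throw new IllegalArgumentException("Illegal null id in message");
        }
        try {
            listener.accept(body);
            failures.remove(messageId); // success: forget any earlier failures
            return true;
        } catch (RuntimeException e) {
            int count = failures.merge(messageId, 1, Integer::sum);
            if (count >= maxAttempts) {
                failures.remove(messageId);
                recoverer.accept(body); // e.g. publish to an error exchange
                return true;
            }
            return false; // ask for redelivery and try again later
        }
    }
}
```

With `maxAttempts` set to 3, a listener that always throws sees the message three times before the recoverer receives it, which is the "retry 3 times, then publish to my error exchange" behavior asked about at the top of the thread.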



              • #8
                If you don't control the producer, there's nothing else you can do for a stateful retry than add a keyGenerator (as you have done). If you do control the producer you can add the message ID, right? In particular, if the producer is another Spring AMQP client, then the AbstractMessageConverter (base class for all our converters) has a createMessageIds flag you can set.

                We can certainly improve the documentation here, and I'd like to see more people using Spring Retry because, as you have hopefully found, it is quite a powerful abstraction. I was going to write a blog and so far only got a few paragraphs down. I'll have to think about bumping it up the priority list.
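
As a rough illustration of the keyGenerator option, one possible key is a digest of the message body. This is an assumption made for the sketch, not necessarily what the linked configuration does: `BodyHashKey` and `keyFor` are hypothetical names, and the method would be called from a `MessageKeyGenerator` implementation (returning `keyFor(message.getBody())`) registered on the factory bean with `setMessageKeyGenerator`. Note the trade-off: two distinct messages with identical bodies share a key, so their retry counts are shared as well.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Derives a retry key from the message body (hypothetical helper for illustration). */
public class BodyHashKey {

    /** Returns the SHA-256 digest of the body as a lowercase hex string. */
    public static String keyFor(byte[] body) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            StringBuilder hex = new StringBuilder();
            for (byte b : digest.digest(body)) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 must be supported by every Java platform implementation.
            throw new IllegalStateException("SHA-256 not available", e);
        }
    }
}
```

When the producer is under your control, setting createMessageIds on the converter, as described above, avoids the need for a derived key altogether.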



                • #9
                  +1 for a blog entry covering this.

                  I have done a quick retry implementation that works fine for an older Spring 2.5 project using the raw Java AMQP library, but my newer project based on spring-integration 2.1 / Spring 3.1 does not yet recover from a broker death and/or restart. It appears that this should work out of the box, but I am assuming that how recently the retry support was added, plus RabbitMQ being a moving target, might make this sensitive to the configuration (my setup is pre-release on almost everything). A blog post might be a real help for me.

                  I haven't spent much time trying to get the retry working yet because I hope it will just fall into place on its own as integration / RabbitMQ / Spring 3.1 get closer to release on the versions I am using. Almost everything else in these projects has so far for me. Keep up the good work!



                  • #10
                    A standard Spring AMQP consumer should survive broker death/restart, so if your Spring Integration adapter is not working that way it might be a good idea to raise a JIRA ticket (in INT) with some more detail of what you are doing, so it can be checked before 2.1 is final (soon).



                    • #11
                      FYI I created a new branch of our "blueprint rabbitmq project" at CARFAX that illustrates using the StatefulRetryOperationsInterceptorFactoryBean to implement generic error handling.

                      https://github.com/CARFAX/blueprint-...error-handling

                      I'll try to follow this up with a blog post soon.



                      • #12
                        xml config instead of annotation based

                        I'm trying to implement a retry/recover component using XML context configuration. As a guide, I started with the nice dead-letter CARFAX example by James Carr on GitHub. My goal is to retry a message twice if any exception is thrown during processing, then publish/send it to a different queue reserved for erroring messages. Maybe I should be using an inbound gateway instead of a channel adapter? I don't know. Any advice is greatly appreciated.

                        Code:
                        Exception in thread "main" org.springframework.beans.factory.BeanCreationException: 
                        Error creating bean with name 'sroi' defined in class path resource [spring_context.xml]: No matching factory method found: factory bean 'sroifb'; factory method 'getObject()'. Check that a method with the specified name exists and that it is non-static.
                        	at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:528)
                        Code:
                        <bean id="errQRecoverer" class="com.se.subscrip.consum.ErrorQueueMessageRecoverer"></bean>
                        <bean id="sroi" class="org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor"
                        	factory-bean="sroifb" factory-method="getObject" />
                        <bean id="rp" class="org.springframework.retry.policy.SimpleRetryPolicy">
                        	<property name="maxAttempts" value="2" />
                        </bean>
                        <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
                        	<property name="retryPolicy" ref="rp" />
                        </bean>
                        <bean id="sroifb" class="org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean">
                        	<property name="retryOperations" ref="retryTemplate" />
                        	<property name="messageRecoverer" ref="errQRecoverer" />
                        </bean>
                        <amqp:inbound-channel-adapter queue-names="some.q.name.here" channel="chan1"
                        		connection-factory="rabbitConnFactory" channel-transacted="true" concurrent-consumers="1" tx-size="1"
                        		acknowledge-mode="AUTO" advice-chain="sroi" error-handler="ErrorHandlerBean" />
