Transactional channel message stores and failed messages

  • Transactional channel message stores and failed messages

    Hi,
    When a message fails (transaction rolls back) to process on a queue backed by a transactional channel message store, like the JdbcChannelMessageStore, the message would remain in the store and be constantly retried (correct?). I would like to remove the message so that it is not retried. The only approach I could think of is to use the stateful retry advice, so that on the very first retry it is sent to the error channel. Is that the recommended approach or is there anything simpler (expiring the message?) provided by the message group store?
    Thanks,
    Hari.

  • #2
    Hi,
    As I tried to configure the retry, I just realized that there isn't any out-of-the-box retry advice for a polling consumer. It wasn't that obvious from the documentation, but this post (http://forum.springsource.org/showth...-error-handler) made it clear.
    I can see that the AbstractRequestHandlerAdvice intercepts either the 'handleRequestMessage' or the 'handleMessage' method. Looking through the PollingConsumer code, it's not clear to me what I should intercept. My guess is that I should intercept the 'handleMessage' method of the MessageHandler used by the PollingConsumer, but then that's what the AbstractRequestHandlerAdvice does anyway. Any pointers to help me get started would be appreciated.
    Thanks,
    Hari.
    Last edited by iyerh; Apr 11th, 2013, 06:19 PM.


    • #3
      The request handler advice chain is a special case; we had to take care to only advise the internal endpoint methods and not any downstream processing (on output channels).

      Advising pollers is simpler because we're advising the whole flow. As described in section "7.1.4 Namespace Support" subsection "AOP Advice chains", you simply create an advice by implementing the MethodInterceptor interface.

      See SourcePollingChannelAdapterFactoryBeanTests.testAdviceChain() for a very simple advice...

      Code:
      		adviceChain.add(new MethodInterceptor() {
      			public Object invoke(MethodInvocation invocation) throws Throwable {
      				adviceApplied.set(true);
      				return invocation.proceed();
      			}
      		});
      This is used simply to assert that the advice was called properly; a real advice would add code before and/or after the invocation.proceed().

      In effect, this advice advises all methods, but there is only one (Callable.call()).
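
To make the "code before and/or after invocation.proceed()" idea concrete, here is a minimal sketch that uses only the JDK's java.lang.reflect.Proxy as a stand-in for Spring AOP. It is not the framework's MethodInterceptor API; the class name PollAdviceSketch, the advise method, and the events list are hypothetical names chosen for illustration. The shape is the same, though: a proxy around a Callable whose only method is call().

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Plain-JDK stand-in (an assumption, not Spring's API) for an advice
// wrapped around the poller's Callable.call().
class PollAdviceSketch {

    // Records what the advice observed so the effect is visible to a caller.
    static final List<String> events = new ArrayList<>();

    // Wraps a Callable so that code runs before and after each method call.
    @SuppressWarnings("unchecked")
    static <T> Callable<T> advise(Callable<T> target) {
        InvocationHandler handler = (proxy, method, args) -> {
            events.add("before " + method.getName());
            try {
                // Counterpart of invocation.proceed(): run the real poll.
                Object result = method.invoke(target, args);
                events.add("after " + method.getName());
                return result;
            } catch (InvocationTargetException e) {
                events.add("failed " + method.getName());
                // Rethrow the original exception so that a surrounding
                // transaction would still see the failure.
                throw e.getCause();
            }
        };
        return (Callable<T>) Proxy.newProxyInstance(
                PollAdviceSketch.class.getClassLoader(),
                new Class<?>[] {Callable.class},
                handler);
    }

    public static void main(String[] args) throws Exception {
        Callable<Boolean> poll = advise(() -> true);
        System.out.println(poll.call());
        System.out.println(events);
    }
}
```

Note that the handler only sees the method and its arguments, and Callable.call() takes none, so an advice at this level has no direct access to the polled message.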


      • #4
        I want to do a stateful retry of the flow (actually, zero retry), so I need access to the message. The Callable.call() does not expose a message. I would get the failed message from the messaging exception but I need to intercept the polled message the next time around, before it is sent to the first message handler (a service activator) so I can successfully consume the failing message.
        We currently have the ability to advise the handleRequestMessage method in the AbstractReplyProducingMessageHandler which restricts it to just that handler. It appears that I need to advise the handleMessage method in the AbstractMessageHandler to advise the flow.

        I finally created a PollingConsumer bean, and passed it a proxied ServiceActivatingHandler bean. I proxied the MessageHandler interface with a stateful RequestHandlerRetryAdvice. It worked just fine.
        Last edited by iyerh; Apr 12th, 2013, 01:34 PM.
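
The "stateful retry with zero retries" idea described above can be sketched in plain Java, without Spring Retry or RequestHandlerRetryAdvice. Everything here is an assumption made for illustration: the ZeroRetrySketch and Msg classes, the in-memory set of failed ids, and the premise that a redelivered message keeps the same id. The first failure is rethrown so that the transaction rolls back; on redelivery the flow is skipped and the message is handed to a recoverer, so the poll returns normally and the message can be removed from the store.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Plain-Java sketch (not the Spring Retry API) of a handler wrapper that
// lets a message fail once and diverts it on the next delivery.
class ZeroRetrySketch {

    // Hypothetical minimal message: the id must stay stable across redeliveries.
    static final class Msg {
        final String id;
        final String payload;

        Msg(String id, String payload) {
            this.id = id;
            this.payload = payload;
        }
    }

    // In-memory retry state; it is lost on restart, which a real setup must consider.
    private final Set<String> failedIds = ConcurrentHashMap.newKeySet();
    private final Consumer<Msg> delegate;   // the real flow, e.g. the first handler
    private final Consumer<Msg> recoverer;  // where an already-failed message goes

    ZeroRetrySketch(Consumer<Msg> delegate, Consumer<Msg> recoverer) {
        this.delegate = delegate;
        this.recoverer = recoverer;
    }

    void handle(Msg message) {
        if (failedIds.remove(message.id)) {
            // Redelivery of a message that already failed: skip the flow.
            // Returning normally lets the caller's transaction commit.
            recoverer.accept(message);
            return;
        }
        try {
            delegate.accept(message);
        } catch (RuntimeException e) {
            // First failure: remember the id, then rethrow so the caller's
            // transaction rolls back and the message is delivered again.
            failedIds.add(message.id);
            throw e;
        }
    }
}
```

The design choice is that the failing delivery still rolls back any other database work done in the flow, while the second delivery does no work beyond consuming the message.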


        • #5
          If I understand your use case, there's another solution - you can add a <gateway/> with an error channel between the message source and the first <service-activator/>. Handle the error on the error-channel's flow and the transaction will commit.

          Code:
          <int:channel id="jdbc.backed" ... >
              <int:queue/>
          </int:channel>
          
          <int:service-activator ref="tryCatchGW" channel="jdbc.backed">
              <int:poller .../>
          </int:service-activator>
          
          <int:gateway id="tryCatchGW" default-request-channel="toMyFirstSA" error-channel="error-handling-flow" />
          The message on error-channel will be an ErrorMessage with payload MessagingException, with failedMessage and cause properties.

          A typical flow on that error channel might be a transformer (expression=payload.failedMessage) followed by some service to handle the error.
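
The behaviour of that gateway can be pictured as a try/catch around the downstream flow. The following is a plain-Java sketch under stated assumptions, not Spring Integration code: TryCatchGatewaySketch, FlowFailure, and send are hypothetical names, with FlowFailure standing in for a MessagingException that carries failedMessage and cause.

```java
import java.util.function.Consumer;

// Plain-Java picture of a gateway with an error channel: failures in the
// downstream flow are diverted instead of propagating to the poller.
class TryCatchGatewaySketch {

    // Hypothetical stand-in for MessagingException: keeps the failed message
    // and the original cause together.
    static final class FlowFailure extends RuntimeException {
        final Object failedMessage;

        FlowFailure(Object failedMessage, Throwable cause) {
            super(cause);
            this.failedMessage = failedMessage;
        }
    }

    // Sends the message downstream. On failure, the error flow receives the
    // failure and this method returns normally, so the caller's transaction
    // would commit rather than roll back.
    static void send(Object message,
                     Consumer<Object> downstream,
                     Consumer<FlowFailure> errorFlow) {
        try {
            downstream.accept(message);
        } catch (RuntimeException e) {
            errorFlow.accept(new FlowFailure(message, e));
        }
    }
}
```

An error flow that reads failure.failedMessage plays the role of the transformer with expression=payload.failedMessage.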


          If you really want to do it with an advice, it's a bit involved - you need a BeanPostProcessor; I posted one solution in this gist...

          https://gist.github.com/garyrussell/5374267


          • #6
            Hi Gary,
            Since I am performing other database operations within the flow, I do need the failed flow to roll back. This is why I need a retry of the flow in which the poller only consumes the failed message and the flow is not executed.

            I now have another issue with my approach (proxying the poller's message handler to mix in the retry advice) when I integrate monitoring (using Spring Batch Admin). I get this exception:
            Caused by: org.springframework.integration.MessagingException: Failed to invoke handler
            at org.springframework.integration.handler.advice.RequestHandlerRetryAdvice$2.doWithRetry(RequestHandlerRetryAdvice.java:96)
            at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:255)
            at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:188)
            at org.springframework.integration.handler.advice.RequestHandlerRetryAdvice.doInvoke(RequestHandlerRetryAdvice.java:84)
            at org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice.invoke(AbstractRequestHandlerAdvice.java:58)
            at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
            at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
            at $Proxy82.handleMessage(Unknown Source)
            at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.handleMessage(SimpleMessageHandlerMetrics.java:108)
            at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.invoke(SimpleMessageHandlerMetrics.java:88)
            at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
            at org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice$1.cloneAndExecute(AbstractRequestHandlerAdvice.java:76)
            at org.springframework.integration.handler.advice.RequestHandlerRetryAdvice$2.doWithRetry(RequestHandlerRetryAdvice.java:87)
            ... 26 more

            It appears that the advice created by the IntegrationMBeanExporter over the message handler causes the retry advice's handler method invocation to fail (not sure why). Perhaps I need to control the ordering of the advice, i.e. apply the monitoring advice around the retry advice? If so, does the BeanPostProcessor approach you provided allow for controlling the ordering?

            Thanks,
            Hari.


            • #7
              There's another variant of Advised.addAdvice that takes a position. See

              Code:
              void addAdvice(int pos, Advice advice) throws AopConfigException;


              • #8
                Sorry for the false alarm. It turned out to be due to this bug: https://jira.springsource.org/browse/INT-1820. So I didn't have to muck with the ordering to fix it.
