Wrapping message processing logic in a separate tx to work with JDBC Msg store

  • Wrapping message processing logic in a separate tx to work with JDBC Msg store

    Hello,

    I'm trying to use a JDBC-backed message store for a queue so that I do not lose any in-flight messages if the application server goes down. I also need to keep the message processing logic separate from the message store, so that a processing failure does not cause the message to be retried; the failed message should simply go to an error queue.

    This is what I tried:
    1. A Spring listener container listens on a JMS queue for a large XML document containing several orders, in the form <orders><order></order>....</orders>.
    2. I split that XML into individual orders and put them on a queue backed by a JDBC message store. The individual <order></order> fragments are stored here.
    3. A downstream poller polls this queue so that parallel threads can pick up the orders and process them concurrently. The poller is configured to start a transaction for each message it processes.
    4. Under normal circumstances this works fine.
    5. If processing of one order fails (the service throws an Exception), the corresponding message is not deleted from the queue and keeps coming back over and over again.
    In my implementation, when the service under the poller throws an Exception, the message store transaction is rolled back as well, because both run in a single transaction (the message store removes a message from the DB once it has been processed). As a result the failed message keeps coming back in an endless loop.
    I looked for a way to keep the service transaction within the poller from rolling back the message store transaction. I want processing errors to end up in a dead letter queue via an error channel.

    I also looked for a way to send exceptions from my service activators (or from any other kind of processor, including Spring's out-of-the-box ones such as the XPath splitter) to an error channel, but I'm not sure how to do it.

    Code:
        <!--Orders xml received by this listener container -->
        <int-jms:message-driven-channel-adapter id="xmlInputJms" channel="ordersMessage"
                                                container="messageListenerContainer"/>
    
        <int:channel id="ordersMessage"/>
    
        <!--Orders split into individual order fragments -->
        <int-xml:xpath-splitter input-channel="ordersMessage" output-channel="orders" create-documents="false">
            <int-xml:xpath-expression expression="/orders/order"/>
        </int-xml:xpath-splitter>
    
        <!--JDBC message store backed queue-->
        <int:channel id="orders">
            <int:queue message-store="jdbcMessageStore"/>
        </int:channel>
    
        <int-jdbc:message-store id="jdbcMessageStore" data-source="dataSource" region="ORDERS"/>
    
        <int:bridge input-channel="orders" output-channel="ordersForInvoicing">
            <int:poller fixed-delay="5000" max-messages-per-poll="1" task-executor="taskExecutor">
                <int:transactional propagation="REQUIRES_NEW" transaction-manager="transactionManager"/>
            </int:poller>
        </int:bridge>
    
        <int:channel id="ordersForInvoicing"/>
    
        <!--This throws an exception for one of the messages and those need to go to an error queue-->
        <int:service-activator input-channel="ordersForInvoicing"
                               output-channel="outgoingChannel" ref="orderInvoiceService" method="invoice"/>
        <!--There would be more service activator calls here any of which could also fail-->
    
        <task:executor id="taskExecutor" pool-size="2" rejection-policy="CALLER_RUNS"/>
    
        <bean id="outgoingChannel" class="si.LoggingMessageNullChannel"/>
        <bean id="ordersError" class="si.xml.LoggingErrorMessageNullChannel"/>
    1. I couldn't find a way to avoid rolling back the transaction for a particular exception (say, an abc Exception).
    2. Ideally I want some way of directing errors to an error channel whenever an exception occurs during processing.
    3. I also tried setting up an error-channel on the poller, but it did not help. The message that throws the exception keeps coming back from the queue for processing.

    Code:
            <int:poller fixed-delay="5000" max-messages-per-poll="1" task-executor="taskExecutor"
                        error-channel="ordersError">
    Does anybody have any ideas on how to achieve the above?

    Thanks,
    -Amit.
    Last edited by amit.kapps; Aug 22nd, 2011, 04:56 PM. Reason: Added info about error channel in poller

  • #2
    The message is not deleted because the exception propagates back to the caller (the poller in this case) and the transaction is rolled back. What you need to do is mask the error from the poller. You can do that simply by defining an error-channel on the poller. The error-channel gives you the equivalent of a 'catch' block. So all you need to do is add a subscriber to the error-channel that either puts the error message on some error queue or does nothing. As long as that subscriber does not throw an exception, the whole process looks like a complete success to the poller and the message is removed.


    • #3
      Thanks for the prompt reply, Oleg. I did try the error channel. However, the failing message keeps coming back and is not removed from the queue when the exception occurs.

      Here's the updated config file:

      Code:
          <!--Orders xml received by this listener container -->
          <int-jms:message-driven-channel-adapter id="xmlInputJms" channel="ordersMessage"
                                                  container="messageListenerContainer"/>
      
          <int:channel id="ordersMessage"/>
      
          <!--Orders split into individual order fragments -->
          <int-xml:xpath-splitter input-channel="ordersMessage" output-channel="orders" create-documents="false">
              <int-xml:xpath-expression expression="/orders/order"/>
          </int-xml:xpath-splitter>
      
          <!--JDBC message store backed queue-->
          <int:channel id="orders">
              <int:queue message-store="jdbcMessageStore"/>
          </int:channel>
      
          <int-jdbc:message-store id="jdbcMessageStore" data-source="dataSource" region="ORDERS"/>
      
          <int:bridge input-channel="orders" output-channel="ordersForInvoicing">
              <int:poller fixed-delay="5000" max-messages-per-poll="1" task-executor="taskExecutor"
                          error-channel="ordersError">
                  <int:transactional propagation="REQUIRES_NEW" transaction-manager="transactionManager"/>
              </int:poller>
          </int:bridge>
      
      
          <int:channel id="ordersForInvoicing"/>
      
          <!--This throws an exception for one of the messages and those need to go to an error queue-->
          <int:service-activator input-channel="ordersForInvoicing"
                                 output-channel="outgoingChannel" ref="orderInvoiceService" method="invoice"/>
          <!--There would be more service activator calls here any of which could also fail-->
      
          <task:executor id="taskExecutor" pool-size="2" rejection-policy="CALLER_RUNS"/>
      
          <bean id="outgoingChannel" class="si.LoggingMessageNullChannel"/>
      
          <int:channel id="ordersError"/>
          <int-stream:stderr-channel-adapter channel="ordersError" auto-startup="true"/>
      The exception keeps printing over and over again, and I can see that the problem message is still in the DB table INT_MESSAGE_GROUP.


      Code:
      org.springframework.integration.MessageHandlingException: java.lang.NullPointerException: We dont like order id 1
      	at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:76)
      	at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:64)
      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:98)
      	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
      	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:110)
      	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
      	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:61)
      	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
      	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:128)
      	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
      	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:176)
      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:160)
      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:125)
      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:119)
      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:101)
      	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
      	at org.springframework.integration.endpoint.PollingConsumer.doPoll(PollingConsumer.java:70)
      	at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:146)
      	at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:144)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
      	at java.lang.reflect.Method.invoke(Method.java:597)
      	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:309)
      	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
      	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
      	at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:110)
      	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
      	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:202)
      	at $Proxy13.call(Unknown Source)
      	at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:207)
      	at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
      	at java.lang.Thread.run(Thread.java:619)
      Caused by: java.lang.NullPointerException: We dont like order id 1
      	at si.xml.OrderInvoiceService.invoice(OrderInvoiceService.java:24)
      	at si.xml.OrderInvoiceService$$FastClassByCGLIB$$352bd5a7.invoke(<generated>)
      	at net.sf.cglib.proxy.MethodProxy.invoke(MethodProxy.java:191)
      	at org.springframework.aop.framework.Cglib2AopProxy$CglibMethodInvocation.invokeJoinpoint(Cglib2AopProxy.java:688)
      	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
      	at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:110)
      	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
      	at org.springframework.aop.framework.Cglib2AopProxy$DynamicAdvisedInterceptor.intercept(Cglib2AopProxy.java:621)
      	at si.xml.OrderInvoiceService$$EnhancerByCGLIB$$4b9bec95.invoice(<generated>)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
      	at java.lang.reflect.Method.invoke(Method.java:597)
      	at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:69)
      	at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:83)
      	at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:57)
      	at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:102)
      	at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:102)
      	at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:126)
      	at org.springframework.integration.util.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:225)
      	at org.springframework.integration.util.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:125)
      	at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:73)
      	... 35 more
      Thanks,
      -Amit.


      • #4
        Not sure whether you have resolved it, but the main point I was trying to make is that routing the error to the error-channel allows you to handle the error and return successfully (as if it never happened). The poller then believes that everything went fine and completes the transaction.


        • #5
          Oleg, I could not resolve it.
          I did set up the error channel, and the failing order XML message does go to the error channel (the stderr stream I configured), but it seems the poller is still rolling back the transaction. The message that throws the error remains in the int_message_group table, and when the poller runs again the message comes back for processing (acting as a poison message). Any other ideas?

          Thanks,


          • #6
            Sorry, I didn't notice that you had a transaction configuration there.
            The <transactional> element on the poller is a convenience element that lets you skip defining an advice chain manually, but in the end it becomes part of the advice chain, which is injected with Spring's standard TransactionInterceptor (basic Spring transaction handling) - http://static.springsource.org/sprin...decl-explained

            So essentially what's happening is that the ErrorHandlingTaskExecutor (the one that delegates to the error-channel) invokes a task that is itself a transactional proxy. This means that any exception downstream propagates through this proxy before it gets to the ErrorHandlingTaskExecutor. In other words, the TransactionInterceptor sees the error first and rolls back the transaction; only then does it propagate the error up the stack. So although you see the correct error-channel behavior, by that time it's too late, since your transaction has already been rolled back.

            The main point is that when using 'advice-chain' you are taking complete control of what happens downstream.
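
            To make that concrete, here is roughly what the <transactional> element on your poller amounts to when written out as an explicit advice chain. This is only a sketch, not the exact beans the framework creates: the txAdvice id is a name made up for illustration, and it assumes the Spring tx namespace is declared in your config.

            Code:
                <int:bridge input-channel="orders" output-channel="ordersForInvoicing">
                    <int:poller fixed-delay="5000" max-messages-per-poll="1" task-executor="taskExecutor"
                                error-channel="ordersError">
                        <!-- Explicit form of <int:transactional .../>: the poll runs inside this advice -->
                        <int:advice-chain>
                            <ref bean="txAdvice"/>
                        </int:advice-chain>
                    </int:poller>
                </int:bridge>

                <!-- Hypothetical bean id; any exception passing through this advice rolls the tx back -->
                <tx:advice id="txAdvice" transaction-manager="transactionManager">
                    <tx:attributes>
                        <tx:method name="*" propagation="REQUIRES_NEW"/>
                    </tx:attributes>
                </tx:advice>
            Written this way it is easier to see that the transaction advice wraps the polling task itself, so the exception reaches it before the error-channel of the poller comes into play.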

            So, I hope this explains the issue.
            Now, how can you solve it?

            There are many ways to do that. The simplest (it requires no additional Java code, only a small addition to the configuration) is to use a service-activator/gateway combination. You can look at it as a hybrid pattern that lets you create a new segment within your message flow:

            poller(TX) -> sa/gateway -> . . .

            The service-activator is bootstrapped with a messaging gateway, which essentially defines an entry point into that particular segment of the flow. Since the gateway also defines an error-channel, any errors that happen downstream are trapped by that error-channel *before* they propagate to the initial caller (the poller). So if those errors are handled at the level of the gateway, the poller never knows about them, and in your case the TransactionInterceptor assumes that everything completed successfully.
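
            In configuration terms the segment could look something like the sketch below. The invoicingGateway and invoicingFlow names are made up for illustration; the other ids come from your config. Two assumptions to check against your version: a gateway declared without a service-interface is expected to fall back to the generic RequestReplyExchanger (hence method="exchange"), and default-reply-timeout="0" is there because this flow never sends a reply back, so the gateway should not wait for one.

            Code:
                <!-- The bridge with the transactional poller stays as it is and feeds ordersForInvoicing -->
                <int:channel id="ordersForInvoicing"/>

                <!-- The service-activator now delegates to the gateway instead of calling the service directly -->
                <int:service-activator input-channel="ordersForInvoicing"
                                       ref="invoicingGateway" method="exchange"/>

                <!-- Entry point into the invoicing segment (hypothetical names). Errors thrown downstream
                     are sent to ordersError and are not rethrown to the poller unless the error flow itself fails -->
                <int:gateway id="invoicingGateway"
                             default-request-channel="invoicingFlow"
                             default-reply-timeout="0"
                             error-channel="ordersError"/>

                <int:channel id="invoicingFlow"/>

                <int:service-activator input-channel="invoicingFlow"
                                       output-channel="outgoingChannel" ref="orderInvoiceService" method="invoice"/>

                <int:channel id="ordersError"/>
                <int-stream:stderr-channel-adapter channel="ordersError" auto-startup="true"/>
            With this layout the error-channel on the poller is no longer what protects the transaction; the one on the gateway is, because it sits below the transactional proxy in the call stack.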

            I am attaching a small working sample based on your configuration so you can get a better understanding of what that means:

            Hope that helps


            • #7
              Thanks, Oleg. I believe the sample you attached is for the splitter/aggregator, not the one related to gateways. But I get the idea and will try it out.

              -Amit.


              • #8
                Sorry, I've uploaded the wrong archive.
                Here is the right one which is based on configuration identical to yours.
