Rollback message, place back on Q if something goes wrong

  • #16
    Hmm, I'm not totally sold on the approach. What happens if you have multiple consumers of the same JMS destination? Does the initial read of a message "lock" it somehow so no one else can read it? And what happens if the thread that locked it never does a second read to remove it? Also, what happens if placing the message on the error queue fails?

    In my scenario I have to do some database work after picking the message up too, so I would still need some element of rollback if it all went wrong, but I wouldn't need distributed transactions with the suggested solution. I've basically decided that this will happen so rarely (in my case) that it doesn't warrant much development time, so long as I have something in place for when it does happen. I think I'm going to rely on JTA, set the backout threshold to 1 on the queue, and let WAS and MQ work their magic, leaving me more time for coffee breaks :-)

    I'm definitely interested in what you decide, though, and how it works out for you, as we have an internal reference architecture that this could be added to, to save any pain next time round.



    • #17
      Hi,
      I am having trouble rolling back a transaction so that the message goes back on the queue. In your previous post you mentioned that the InboundChannelAdapter will send the message to the errorChannel and that we need to re-throw the exception from there. I tried that, but the message still doesn't go back on the queue.

      Here's the config I am using:
      Code:
          <bean id="jmsMQConnectionFactoryExternal" class="org.apache.activemq.ActiveMQConnectionFactory">
              <property name="brokerURL" value="failover:(tcp://localhost:61616)"/>
          </bean>
      
      
          <bean id="externalDestination1" class="org.apache.activemq.command.ActiveMQQueue">
              <constructor-arg value="MY_QUEUE"/>
          </bean>
      
         
          <bean id="is3TransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
              <property name="transactionManagerName">
                  <value>java:/TransactionManager</value>
              </property>
              <property name="userTransactionName">
                  <value>UserTransaction</value>
              </property>
          </bean>
      
          <jms:inbound-channel-adapter id="GenericInboundAdapter"
              connection-factory="jmsMQConnectionFactoryExternal"
              destination="externalDestination1"
              channel="inboundRouterChannel">
               <int:poller>
                   <int:interval-trigger interval="1000"/>
                   <int:transactional transaction-manager="transactionManager"/>
              </int:poller>
          </jms:inbound-channel-adapter>
      
          <int:channel id="inboundRouterChannel"/>
      
          <int:router input-channel="inboundRouterChannel" ref="EventRouter"/>
      
          <bean id="EventRouter" class="com.xyz.router.EventRouter">
              <property name="eventTypeChannelMap">
                  <map>
                      <entry key="A" value-ref="AChannel"/>
                      <entry key="B" value-ref="BChannel"/>
                  </map>
              </property>
          </bean>
      
      
          <int:channel id="AChannel"/>
          <int:channel id="BChannel"/>
      
          
          <int:service-activator ref="IS3EventHandler" method="handleAEvent" input-channel="AChannel"/>
          <int:service-activator ref="IS3EventHandler" method="handleError" input-channel="errorChannel"/>
      
          <bean id="IS3EventHandler" class="com.xyz.handler.MyMessageHandler"/>
      The handler is:
      Code:
      public class IS3MessageHandler {
      
          private final Log logger = LogFactory.getLog(getClass());
      
          public void handleAEvent(Message<?> message) {
              logger.info("handleAEventcalled");
             //do something
          }
          
          public void handleError(Message<?> message){
              throw new RuntimeException("errorchannel");
          }
      }
      Any help is appreciated.

      Thanks,
      Shantanu



      • #18
        What do you deploy your application to? In my example I use WebSphere Application Server and in order to get the JTA stuff working all the resources (JMS and JDBC) needed to be configured in the app server and use JNDI to look the resources up and use them in my Spring config.



        • #19
          I'm using JBoss application server for this. Can you please share the config file which you are using?
          It works for me (rollback) if I just use a "DefaultMessageListenerContainer" with a listener and throw an exception from the listener.

          Thanks.
          Last edited by shantanupm; Mar 23rd, 2009, 12:12 PM.



          • #20
            Hi rhart, how's the project going? You mentioned you were going to use the JTA/MQ magic to roll back; how's that working for you? We are rapidly approaching the deadline for our proof of concept but we are still fumbling a little. Any words of advice on Q/DB rollback would be most appreciated.



            • #21
              Hi guys,
              I'm really stumped. I know this is more a Data Access issue, but I'm not getting any responses from that forum, and I'm running out of time and options. And since I will also be using the WebSphereUowTransactionManager for all JTA operations, I am really hoping someone can point me in the right direction here. This should have worked, am I missing something silly? Please anyone help?

              I set up an Oracle XA datasource in WAS and can test the connection successfully from within WAS. As soon as I deploy my app and try to access the DB, though, it fails with a JDBC connection error.

              The connection details succeed from the 'TEST CONNECTION' inside WAS. Any ideas are most welcome and appreciated, please guys...thanks

              Here is my Spring config.

              <jee:jndi-lookup id="dataSource" jndi-name="eeBridgeXADS" />

              <bean id="transactionManager" class="org.springframework.transaction.jta.WebSphereUowTransactionManager"/>

              <aop:config>
                  <aop:advisor id="managerTx" advice-ref="txAdvice" pointcut="execution(* *..service.*Manager.*(..))"/>
              </aop:config>

              <tx:advice id="txAdvice" transaction-manager="transactionManager">
                  <tx:attributes>
                      <tx:method name="get*" read-only="true"/>
                      <tx:method name="*"/>
                  </tx:attributes>
              </tx:advice>

              <bean id="jdbcOperations"
                    class="org.springframework.jdbc.core.JdbcTemplate" >
                  <property name="dataSource" ref="dataSource"/>
              </bean>

              <bean id="messageDao" class="mine.dao.jdbc.JdbcMessageDao">
                  <property name="jdbcOperations" ref="jdbcOperations"/>
              </bean>

              Here is the error I get when trying to make a connection from within the deployed application.
              ------------------------------
              Caused by: org.springframework.jdbc.CannotGetJdbcConnectionException: Could not get JDBC Connection; nested exception is java.sql.SQLException: invalid arguments in callDSRA0010E: SQL State = null, Error Code = 17,433
              at org.springframework.jdbc.datasource.DataSourceUtils.getConnection(DataSourceUtils.java:82)
              at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:577)
              at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:641)
              at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:670)
              at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:678)
              at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:710)
              at org.appfuse.dao.jdbc.JdbcMessageDao.getNextMessage(JdbcMessageDao.java:64)
              at org.appfuse.service.impl.PersistentMessageAdapterImpl.removeNext(PersistentMessageAdapterImpl.java:55)
              at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
              at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
              at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
              at java.lang.reflect.Method.invoke(Method.java:585)
              at org.springframework.util.MethodInvoker.invoke(MethodInvoker.java:276)
              at org.springframework.integration.util.NameResolvingMethodInvoker.invokeMethod(NameResolvingMethodInvoker.java:58)
              at org.springframework.integration.message.MethodInvokingMessageSource.receive(MethodInvokingMessageSource.java:84)
              ... 18 more
              Caused by: java.sql.SQLException: invalid arguments in callDSRA0010E: SQL State = null, Error Code = 17,433
              at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:112)
              at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:146)
              at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:208)
              at oracle.jdbc.driver.T4CConnection.logon(T4CConnection.java:236)
              at oracle.jdbc.driver.PhysicalConnection.<init>(PhysicalConnection.java:414)
              at oracle.jdbc.driver.T4CConnection.<init>(T4CConnection.java:165)
              at oracle.jdbc.driver.T4CDriverExtension.getConnection(T4CDriverExtension.java:35)
              at oracle.jdbc.driver.OracleDriver.connect(OracleDriver.java:801)
              at oracle.jdbc.pool.OracleDataSource.getPhysicalConnection(OracleDataSource.java:297)
              at oracle.jdbc.xa.client.OracleXADataSource.getPooledConnection(OracleXADataSource.java:456)
              at oracle.jdbc.xa.client.OracleXADataSource.getXAConnection(OracleXADataSource.java:143)
              at oracle.jdbc.xa.client.OracleXADataSource.getXAConnection(OracleXADataSource.java:129)
              at oracle.jdbc.xa.client.OracleXADataSource.getXAConnection(OracleXADataSource.java:90)
              at com.ibm.ws.rsadapter.spi.InternalGenericDataStoreHelper$1.run(InternalGenericDataStoreHelper.java:920)
              at com.ibm.ws.security.util.AccessController.doPrivileged(AccessController.java:118)
              at com.ibm.ws.rsadapter.spi.InternalGenericDataStoreHelper.getPooledConnection(InternalGenericDataStoreHelper.java:957)
              at com.ibm.ws.rsadapter.spi.WSRdbDataSource.getPooledConnection(WSRdbDataSource.java:1602)
              at com.ibm.ws.rsadapter.spi.WSManagedConnectionFactoryImpl.createManagedConnection(WSManagedConnectionFactoryImpl.java:1217)
              at com.ibm.ejs.j2c.FreePool.createManagedConnectionWithMCWrapper(FreePool.java:1895)
              at com.ibm.ejs.j2c.FreePool.createOrWaitForConnection(FreePool.java:1570)
              at com.ibm.ejs.j2c.PoolManager.reserve(PoolManager.java:2338)
              at com.ibm.ejs.j2c.ConnectionManager.allocateMCWrapper(ConnectionManager.java:910)
              at com.ibm.ejs.j2c.ConnectionManager.allocateConnection(ConnectionManager.java:600)
              at com.ibm.ws.rsadapter.jdbc.WSJdbcDataSource.getConnection(WSJdbcDataSource.java:449)
              at com.ibm.ws.rsadapter.jdbc.WSJdbcDataSource.getConnection(WSJdbcDataSource.java:418)
              at org.springframework.jdbc.datasource.DataSourceUtils.doGetConnection(DataSourceUtils.java:113)
              at org.springframework.jdbc.datasource.DataSourceUtils.getConnection(DataSourceUtils.java:79)
              ... 32 more



              • #22
                I hate to be redundant, but this is the Spring Integration forum. You might have more luck in the forum for the relevant topic.



                • #23
                  Hi Iwein,

                  I have posted there numerous times, but got no response. I am looking into purchasing some support in my personal capacity. I was just hoping someone with a trained eye could quickly spot a mistake or provide a hint that could help me.
                  Thanks



                  • #24
                    Solved this issue and got my SI app deployed to WAS 6.1 using the WebSphereUowTransactionManager for XA JDBC and MQ JMS operations. On an error condition it performs a successful rollback and retries the message.

                    Earlier in this thread there is a link on a retry message strategy: basically, configure the MQ queue to back out after a certain number of retries and place the message on an error queue. Another option is to keep your own retry count and use SI to monitor the native SI error queue and put the message somewhere else.
                    If anyone needs assistance with the WAS configuration specifics that enable the topics covered in this thread, you can PM me and I will share. I will also put it on my wiki...details to follow.
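A rough sketch of the "keep your own retry count" option, for illustration only. It assumes failures are counted per message ID in memory and that a message is parked once it has failed a fixed number of times. `RetryTracker`, `recordFailure` and `parked` are hypothetical names; nothing here is JMS, MQ or Spring Integration API, and a real setup would need the count to survive restarts.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory stand-in for an application-managed retry count (hypothetical, not an SI class). */
class RetryTracker {
    private final int maxAttempts;
    private final Map<String, Integer> attempts = new HashMap<>();
    private final List<String> parked = new ArrayList<>();

    RetryTracker(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    /**
     * Call this from the error-handling path when processing a message failed.
     * Returns true if the message should be rolled back and retried,
     * false if it has used up its attempts and was parked instead.
     */
    boolean recordFailure(String messageId) {
        int count = attempts.merge(messageId, 1, Integer::sum);
        if (count >= maxAttempts) {
            attempts.remove(messageId);
            parked.add(messageId); // stand-in for "put the message somewhere else"
            return false;
        }
        return true;
    }

    /** Call this after a successful commit so stale counts do not pile up. */
    void recordSuccess(String messageId) {
        attempts.remove(messageId);
    }

    List<String> parked() {
        return parked;
    }
}

class RetryTrackerDemo {
    public static void main(String[] args) {
        RetryTracker tracker = new RetryTracker(3);
        for (int i = 1; i <= 3; i++) {
            boolean retry = tracker.recordFailure("msg-1");
            System.out.println("failure " + i + " -> retry? " + retry);
        }
        System.out.println("parked: " + tracker.parked());
    }
}
```

The design choice here is that the application, not the queue manager, decides when to give up, which makes the threshold easy to change but means the count lives outside the transaction.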

                    Configuration specifics include configuring WAS to use XA and the JTA WebSphereUowTransactionManager. Also, Q connection factories, XA datasources and queues need to be configured correctly in WAS, paying special attention to the scope of the resources relative to the application that's deployed. I spent most of the time actually figuring WAS out, and NOT Spring. Spring always just works straight out of the box!
                    Well done Spring!!!



                    • #25
                      Nice, thanks for sharing.



                      • #26
                        my pleasure....
                        Now that that's all working, I see there's something causing a further strange XA error, and I need to determine whether this is an XA/WAS/MQ config issue or whether my SI config is not 100%.
                        Here are my SI context files and the resulting error on WAS.
                        ================================================

                        Basically, I just want to know if the way I am doing this is correct or not. It seems to be treating the whole process as a single transaction, but is there a better way? Something just seems a little disjointed: how does the 'save' method know it's part of the initial transaction, which started when the message was read from the Q?

                        I am reading off a Q
                        -------------------

                        <jms:inbound-channel-adapter id="jmsin"
                            destination="responseQueue"
                            channel="saveMailChannel"
                            connection-factory="connectionFactoryMQ">
                            <int:poller id="poller" default="true" max-messages-per-poll="1">
                                <int:interval-trigger interval="10000" />
                                <int:transactional propagation="REQUIRES_NEW" transaction-manager="transactionManager" />
                            </int:poller>
                        </jms:inbound-channel-adapter>
                        ----------------------------------------------------
                        and then write to DB, the messageAdapter has a save method which calls the jdbc save method.
                        ----------------------------------------------------

                        <int:channel id="saveMailChannel" />


                        <int:outbound-channel-adapter
                            channel="saveMailChannel"
                            ref="messageAdapter"
                            method="save"/>


                        ==================================================

                        I am getting this error in WAS 6.1. I found out that the timeout is configured in my app server: it's the heuristic config that performs a ROLLBACK if a transaction is open for longer than 120s. A transaction timeout occurs and a rollback happens on the next XA operation. It's clear that I need to somehow wrap these steps into some sort of single operation. Can someone please give me a pointer on how my SI config can be improved? Another idea I had is to call commit on the save, but then I would need a handle to the txManager. Am I on the right track here? How can one stop a tx from timing out, or is this just normal, expected behaviour?

                        [5/12/09 18:29:38:367 SAST] 00000011 TimeoutManage I WTRN0006W: Transaction 0000012135A695F50000000500000112F5603E64FED47E7B8B57820397863671278FC3330000012135A695F50000000500000112F5603E64FED47E7B8B57820397863671278FC33300000001 has timed out after 120 seconds.


                        [5/12/09 19:07:44:364 SAST] 0000002f SystemOut O INFO - PersistentMessageAdapterImpl.save(43) | Saving message f834882e-0573-49a2-b72f-e628bf53046d
                        [5/12/09 19:07:44:370 SAST] 0000002f XATransaction E J2CA0026E: Method addSync caught javax.transaction.RollbackException: Transaction rolled back
                        at com.ibm.ws.Transaction.JTA.TransactionImpl.registerSynchronization(TransactionImpl.java:3546)
                        at com.ibm.ws.Transaction.JTA.TransactionImpl.registerSynchronization(TransactionImpl.java:3525)
                        at com.ibm.ws.Transaction.JTA.TranManagerSet.registerSynchronization(TranManagerSet.java:507)
                        at com.ibm.ejs.j2c.XATransactionWrapper.addSync(XATransactionWrapper.java:289)
                        at com.ibm.ejs.j2c.ConnectionManager.initializeForUOW(ConnectionManager.java:1290)
                        at com.ibm.ejs.j2c.ConnectionManager.involveMCInTran(ConnectionManager.java:1038)
                        at com.ibm.ejs.j2c.ConnectionManager.allocateConnection(ConnectionManager.java:602)
                        at com.ibm.ws.rsadapter.jdbc.WSJdbcDataSource.getConnection(WSJdbcDataSource.java:449)
                        at com.ibm.ws.rsadapter.jdbc.WSJdbcDataSource.getConnection(WSJdbcDataSource.java:418)
                        at org.springframework.jdbc.datasource.DataSourceUtils.doGetConnection(DataSourceUtils.java:113)
                        at org.springframework.jdbc.datasource.DataSourceUtils.getConnection(DataSourceUtils.java:79)
                        at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:577)
                        at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:619)
                        at org.appfuse.dao.jdbc.JdbcMessageDao.save(JdbcMessageDao.java:120)
                        at org.appfuse.service.impl.PersistentMessageAdapterImpl.save(PersistentMessageAdapterImpl.java:44)
                        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
                        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
                        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
                        at java.lang.reflect.Method.invoke(Method.java:585)
                        at org.springframework.integration.util.DefaultMethodInvoker.invokeMethod(DefaultMethodInvoker.java:89)
                        at org.springframework.integration.handler.MessageMappingMethodInvoker.doInvokeMethod(MessageMappingMethodInvoker.java:133)
                        at org.springframework.integration.handler.MessageMappingMethodInvoker.invokeMethod(MessageMappingMethodInvoker.java:106)
                        at org.springframework.integration.handler.MethodInvokingMessageHandler.handleMessage(MethodInvokingMessageHandler.java:45)
                        at org.springframework.integration.dispatcher.AbstractDispatcher.sendMessageToHandler(AbstractDispatcher.java:85)
                        at org.springframework.integration.dispatcher.AbstractUnicastDispatcher.dispatch(AbstractUnicastDispatcher.java:54)
                        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:56)
                        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:116)
                        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:94)
                        at org.springframework.integration.channel.MessageChannelTemplate.doSend(MessageChannelTemplate.java:223)
                        at org.springframework.integration.channel.MessageChannelTemplate.send(MessageChannelTemplate.java:180)
                        at org.springframework.integration.endpoint.SourcePollingChannelAdapter.doPoll(SourcePollingChannelAdapter.java:78)
                        at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$2.doInTransaction(AbstractPollingEndpoint.java:228)
                        at org.springframework.transaction.jta.WebSphereUowTransactionManager$UOWActionAdapter.run(WebSphereUowTransactionManager.java:306)
                        at com.ibm.ws.uow.UOWManagerImpl.runUnderNewUOW(UOWManagerImpl.java:948)
                        at com.ibm.ws.uow.UOWManagerImpl.runUnderUOW(UOWManagerImpl.java:509)
                        at org.springframework.transaction.jta.WebSphereUowTransactionManager.execute(WebSphereUowTransactionManager.java:252)
                        at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:122)
                        at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.innerPoll(AbstractPollingEndpoint.java:226)
                        at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.poll(AbstractPollingEndpoint.java:216)
                        at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:209)
                        at org.springframework.integration.scheduling.SimpleTaskScheduler$ErrorHandlingRunnableWrapper.run(SimpleTaskScheduler.java:307)
                        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:417)
                        at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:280)
                        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:135)
                        at org.springframework.integration.scheduling.SimpleTaskScheduler$TriggeredTask.run(SimpleTaskScheduler.java:256)
                        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
                        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
                        at java.lang.Thread.run(Thread.java:595)
                        while trying to register the Resource Adapter with the Synchronization Manager for the current transaction, and threw a ResourceException.



                        • #27
                          Originally posted by rhart
                          This is just WAS config http://www.ibm.com/developerworks/we...itheridge.html I know this article is not for WAS 6.1 but the principle is the same. I think this is actually part of the JMS spec. Can't find how to set the retry interval though

                          Hi rhart, I'm busy with the retry thing now. Basically, I believe MQ has a backout threshold, which I've set to 5, and also a backout requeue name, which I've also set on the queue. I force a rollback, but the messages just seem to be stuck. How's your retry and backout strategy going?



                          • #28
                            Hi dudleygb, I've been up to my eyes in it so haven't checked the forums for AGES!

                            Originally posted by dudleygb
                            Something just seems a little disjointed, like how does the 'save' method know its part of the initial transaction, when the message is read from the Q?
                            I think the save method knows it's part of the initial transaction because you're using direct channels, so the entire process runs in the same thread, and because all your resources are managed by WAS.
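To illustrate the same-thread point, here is a minimal sketch in plain Java. It assumes only that the active transaction is tracked per thread (Spring's transaction synchronization does use thread-bound state); the class `ThreadBoundTx` and its methods are hypothetical stand-ins, not Spring or WAS API. A handler called directly in the poller's thread sees the transaction, while a handler handed off to another thread does not.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical stand-in for a thread-bound transaction context. */
class ThreadBoundTx {
    private static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    static void begin(String txId) { CURRENT.set(txId); }
    static void end() { CURRENT.remove(); }

    /** Stand-in for the 'save' method: it reports which transaction it can see. */
    static String save() { return CURRENT.get(); }

    /** Direct hand-off: 'save' runs in the thread that began the transaction. */
    static String pollDirect(String txId) {
        begin(txId);
        try {
            return save();
        } finally {
            end();
        }
    }

    /** Hand-off through an executor: 'save' runs in a different thread. */
    static String pollViaExecutor(String txId) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        begin(txId);
        try {
            Callable<String> task = ThreadBoundTx::save;
            return pool.submit(task).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            end();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("direct:   " + pollDirect("tx-1"));      // tx-1
        System.out.println("executor: " + pollViaExecutor("tx-1")); // null
    }
}
```

So as long as every step between the poller and 'save' is a synchronous, same-thread hand-off, the work should join the transaction the poller started; introducing an executor or a queue-backed channel in between would break that.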

                            My backout strategy is as it was all that time ago in my POC. By coincidence I'm starting to productionize it tomorrow, so I'll let you know how it goes. I plan on doing as you seem to have done, which is to set the backout threshold and backout requeue name on the queue.

                            One thing to be careful of is to make sure the settings you set in WAS have actually been saved. Sometimes I get an error saying "The WMQQueueDefiner MBean has encountered an error". Are you setting them via the admin console or using a script? Also make sure your WAS instance has been upgraded to 6.1.23; I had some problems around this. Another problem I encountered was that I hadn't set my JMS and JDBC resources as resource references in my deployment descriptor http://www.ibm.com/developerworks/we...09_alcott.html. Hope that helps



                            • #29
                              hi rhart,
                              Thanks for the tips. Yup, I am finding the exact same issues with the resource thing, and I'm struggling with that at the moment.
                              I was also getting the WMQ... error you mentioned, because you need to add some MQ settings before you click on the MQ config link in the Queues window. But that config, at least, is done. The main thing I'm still having a problem with is the backout queue. It's doing the backout, but it's not actually moving the message to the backout queue that I specified in the WAS console. I'm currently trying to solve this mystery, but we are still on patch 21, so perhaps an upgrade first. Thanks again for the tips. Please let me know how you get on with the requeue thing, and whether you manage to get it working OK.
                              Thanks!



                              • #30
                                I just came across this article regarding the backout threshold and backout requeue... I think it was hoping for a bit much to expect MQ to do it automatically?

                                Moving the message to the backout queue is not done automatically by the
                                Qmgr. Those attributes are there for applications to use. As part of
                                your application's initialization routine you need to open the queue for
                                inquiry, issue an MQINQ call to obtain those 2 attributes, and open the
                                backout queue for output. Then open your queue for input and begin
                                processing the messages. After each MQGET check the backout count in the
                                MQMD against the threshold attribute you saved off. If it is at the
                                threshold then put the message on the backout queue followed by an
                                MQCMIT call . This is kind of abbreviated, but you will find it
                                discussed in the Application Programming Guide.

                                Did you find anything to the contrary?
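The check that article describes can be sketched roughly like this. It is an in-memory simulation only: `BackoutDemo`, `Msg` and `Handler` are hypothetical names, the two deques stand in for the input and backout queues, and the `backoutCount` field stands in for the backout count in the MQMD. No MQ calls are made; the comments mark where MQGET, the put to the backout queue, and the commit or rollback would sit.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class BackoutDemo {
    /** Hypothetical message with the backout count the queue manager would maintain. */
    static final class Msg {
        final String body;
        int backoutCount;
        Msg(String body) { this.body = body; }
    }

    interface Handler {
        void handle(Msg m) throws Exception;
    }

    final Deque<Msg> inputQueue = new ArrayDeque<>();
    final Deque<Msg> backoutQueue = new ArrayDeque<>();
    final int backoutThreshold; // would be read once at startup (the MQINQ step)

    BackoutDemo(int backoutThreshold) {
        this.backoutThreshold = backoutThreshold;
    }

    /** One get-and-process cycle; returns false when the input queue is empty. */
    boolean processNext(Handler handler) {
        Msg m = inputQueue.pollFirst();            // the "MQGET"
        if (m == null) {
            return false;
        }
        if (m.backoutCount >= backoutThreshold) {  // the application-side check
            backoutQueue.addLast(m);               // put to the backout queue, then "commit"
            return true;
        }
        try {
            handler.handle(m);                     // success: "commit", message is consumed
        } catch (Exception e) {
            m.backoutCount++;                      // "rollback": the count goes up
            inputQueue.addFirst(m);                // and the message is available again
        }
        return true;
    }

    public static void main(String[] args) {
        BackoutDemo demo = new BackoutDemo(3);
        demo.inputQueue.add(new Msg("poison"));
        while (demo.processNext(m -> { throw new IllegalStateException("cannot process " + m.body); })) {
            // keep cycling until the input queue is drained
        }
        System.out.println("input=" + demo.inputQueue.size() + " backout=" + demo.backoutQueue.size());
    }
}
```

With a threshold of 3, a message that always fails is attempted three times and moved on the fourth get, which matches the "check the backout count after each MQGET" description.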

