
  • Struggling with XA-Transaction and JMS. (Rollback not working properly)

    Hi,
    I've been struggling for a few days with a transaction problem. Here's my rather simple use case:

    I have an int-jms:message-driven-channel-adapter which reads from a Request-Queue and sends the message to a channel. There I do a DB update recording that the message was accepted. The message is then sent to yet another channel, where I start a BatchJob. Depending on the return code of the BatchJob, I send the message back via the Reply-Queue (for correct BatchJob runs) or the Error-Queue (for faulty BatchJob runs).

    Reading from the Request-Queue and the DB update happen in one transaction. Before the BatchJob is started, the transaction is committed.

    If I now manually (just for testing purposes) do a rollback() call after the message was accepted but before the BatchJob is started, I would expect the message to be redelivered (put back into the Request-Queue). Instead, the rollback() call is ignored and the message propagates as usual.

    Here's what the ActiveMQ Config looks like:

    Code:
    <!-- ActiveMQ XA CF -->
    <bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
        <property name="brokerURL" value="${jms.broker.url}" />
        <property name="prefetchPolicy">
          <bean class="org.apache.activemq.ActiveMQPrefetchPolicy">
            <property name="queuePrefetch" value="0" />
          </bean>
        </property>
        <property name="redeliveryPolicy">
          <bean class="org.apache.activemq.RedeliveryPolicy">
            <property name="initialRedeliveryDelay" value="100" />
            <property name="maximumRedeliveries" value="1" />
            <property name="useExponentialBackOff" value="false" />
          </bean>
        </property>
    </bean>
    
    <!-- CF -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="amqConnectionFactory" />
        <property name="sessionCacheSize" value="10" />
        <property name="cacheProducers" value="false" />
    </bean>
    
    <!-- Broker & Queues -->
    <amq:broker brokerName="broker">
        <amq:transportConnectors>
          <amq:transportConnector name="openwire" uri="${jms.broker.url}" />
        </amq:transportConnectors>
    </amq:broker>
    
    <bean id="requestQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="${master.partition.jms.request.destinationname}" />
    </bean>
    <bean id="replyQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="${master.partition.jms.reply.destinationname}" />
    </bean>
    <bean id="errorQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="${master.partition.jms.error.destinationname}" />
    </bean>
    The Transaction Handling is done with Atomikos. Here's the Config:

    Code:
    <bean id="slaveUserTransactionService" class="com.atomikos.icatch.config.UserTransactionServiceImp" init-method="init"
        destroy-method="shutdownForce">
        <constructor-arg>
          <props>
            <prop key="com.atomikos.icatch.service">com.atomikos.icatch.standalone.UserTransactionServiceFactory</prop>
            <prop key="com.atomikos.icatch.tm_unique_name">slave_tmlog</prop>
            <prop key="com.atomikos.icatch.log_base_name">slave_tmlog</prop>
            <prop key="com.atomikos.icatch.log_base_dir">C:/ieu/MOBI_FIS/workspace/perfopt/perfopt_job</prop>
          </props>
        </constructor-arg>
    </bean>
    
    <bean id="jtaSlaveTxManager" class="org.springframework.transaction.jta.JtaTransactionManager" depends-on="slaveUserTransactionService">
        <property name="transactionManager">
          <bean class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close">
            <property name="startupTransactionService" value="false" />
            <property name="forceShutdown" value="true" />
            <property name="transactionTimeout" value="300" />
          </bean>
        </property>
        <property name="userTransaction">
          <bean class="com.atomikos.icatch.jta.UserTransactionImp" />
        </property>
    </bean>
    And finally the Spring Integration part with the message-driven-channel-adapter:

    Code:
    <int-jms:message-driven-channel-adapter id="slave.partition.request.channel.adapter"  
        channel="slave.partition.process.request.receive.channel" 
        container="slaveMessageListenerContainer" 
        acknowledge="transacted" />
    
    <bean id="slaveMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destinationName" value="${master.partition.jms.request.destinationname}" />
        <!-- change the transaction manager reference -->
        <property name="transactionManager" ref="jtaSlaveTxManager" />
        <property name="sessionTransacted" value="true" />
    </bean>
    Is everything set up correctly or am I missing something? If you need any other information, let me know.

    baeschti

  • #2
    Where are you doing the rollback?



    • #3
      Here's the class in which I do the rollback:

      Code:
      public class BasePartitionAcceptor<T extends MasterSlaveMessagePayload>  implements PartitionAcceptor<T>, ApplicationContextAware {
      
       ApplicationContext applicationContext;
      
       public MasterSlaveMessage<T> accept(MasterSlaveMessage<T> msg) throws Exception {
      
          String serverNamePid = ManagementFactory.getRuntimeMXBean().getName();
          String[] serverNamePidArr = serverNamePid.split("@");
          msg.setSlaveServerPid(serverNamePidArr[0]);
          msg.setSlaveServerName(serverNamePidArr[1]);
      
          // For testing purposes
          JtaTransactionManager tx = (JtaTransactionManager) applicationContext.getBean("jtaSlaveTxManager");
          tx.getUserTransaction().rollback();
      
          msg.setState(PartitionStateEnum.ACCEPTED);
      
          getBatchApplicationServiceAdapter().updatePartitionState(msg);
      
          return msg;
       }
      
       @Override
       public void setApplicationContext(ApplicationContext applicationContext) {
          this.applicationContext = applicationContext;
       }
      }
      As you can see, I retrieve the JtaTransactionManager via the ApplicationContext and then do a rollback.



      • #4
        Hi!
        JtaTransactionManager tx = (JtaTransactionManager) applicationContext.getBean("jtaSlaveTxManager");
        tx.getUserTransaction().rollback();
        Try this one:
        Code:
        TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
        -Artem



        • #5
          Hi Artem.

          Thanks for your input. Unfortunately, the setRollbackOnly() call didn't help.

          Edit:

          I did some debugging and looked a little bit into TransactionAspectSupport.currentTransactionStatus(). In the TransactionStatus object I saw the following:

          As transactionManager I had org.springframework.jdbc.datasource.DataSourceTransactionManager, not the expected JtaTransactionManager.
          The suspendedResources was org.springframework.transaction.support.AbstractPlatformTransactionManager with name=slaveMessageListenerContainer, which is where I reference my JtaTransactionManager as seen above in the config.

          Any ideas?
          Last edited by baeschti; Apr 25th, 2013, 03:57 AM.



          • #6
            the setRollbackOnly() call didn't help
            Do you mean that your message is acknowledged on the broker in any case?
            Please show your message flow: where you call BasePartitionAcceptor and how you start the BatchJob.



            • #7
              The message-flow looks like this:

              Code:
              <!-- Reading Messages from the Queue -->
              <int-jms:message-driven-channel-adapter id="slave.partition.request.channel.adapter"
                  channel="slave.partition.process.request.receive.channel" 
                  container="slaveMessageListenerContainer"
                  acknowledge="transacted" />
              
              <!-- Setting msg-status to ACCEPTED and updates DB -->
              <int:transformer id="slave.partition.incoming.acceptor" 
                  input-channel="slave.partition.process.request.receive.channel"
                  output-channel="slave.partition.process.request.accepted.channel" 
                  ref="slave.partition.acceptor" 
                  method="accept" />
              
              <bean id="slave.partition.acceptor" class="ch.mobi.batch.components.masterslave.slave.BasePartitionAcceptor" />
              
              <!-- ServiceActivator which commits the current Transaction and starts the BatchJob -->
              <int:service-activator 
                  input-channel="slave.partition.process.request.accepted.channel"
                  output-channel="slave.partition.process.request.processed.channel">
                  <bean class="ch.mobi.batch.components.masterslave.slave.SlaveJobLaunchActivator" />
              </int:service-activator>
              From here, the message would be passed on to other channels, depending on the return value of the BatchJob. Here is what the ServiceActivator in SlaveJobLaunchActivator looks like:

              Code:
              @ServiceActivator
              @Override
              public MasterSlaveMessage<T> launch(MasterSlaveMessage<T> msg) {
                  LOGGER.info("Job " + msg.getJobName() + " is starting ... PartitionNr: " + msg.getPartitionNumber());
              
                  Map<String, String> progArguments = new HashMap<String, String>(msg.getPayload().getProgramArguments());
                  Map<String, JobParameter> jobParameters = 
                        new HashMap<String, JobParameter>(msg.getPayload().getJobParameters());
              
                  progArguments.put(BatchProgramArgumentEnum.RUN_ID.getArgumentName(), msg.getRunId());
                  progArguments.put(BatchProgramArgumentEnum.CHUNK_ID.getArgumentName(), "" + msg.getPartitionNumber());
              
                  jobParameters.put(JobEnum.BATCH_PROCESS_ID_PARENT.toString(), 
                        new JobParameter(msg.getMasterProcessId()));
                  jobParameters.put(BatchProgramArgumentEnum.BATCH_SLAVE_PARTITION_ID.getArgumentName(), 
                        new JobParameter(msg.getId()));
              
                  putRangesIntoParameters(jobParameters, msg);
              
                  TransactionManager transactionManager = null;
                  JtaTransactionManager jtTxManager = null;
              
                  if (applicationContext != null) {
                    jtTxManager = (JtaTransactionManager) applicationContext.getBean("jtaSlaveTxManager");
                  }
              
                  if (jtTxManager != null) {
                    transactionManager = jtTxManager.getTransactionManager();
                  }
              
                  if (transactionManager != null) {
                    try {
                      transactionManager.commit();
                    }
                    catch (Exception e) {
                      LOGGER.error(e.getMessage(), e);
                    }
                  }
              
                  // BatchJob being started
                  int returnValue = launchJob(msg.getJobName(), progArguments, jobParameters);
                  LOGGER.info("Job " + msg.getJobName() + " has finished");
              
                  msg.setReturnCode(returnValue);
              
                  if (returnValue != 0) {
                    msg.setMessageType(MasterSlaveMessageTypeEnum.BATCHJOB_ERROR);
                    msg.setState(PartitionStateEnum.ERROR);
                  }
              
                  if (transactionManager != null) {
                    try {
                      transactionManager.begin();
                    }
                    catch (Exception e) {
                      LOGGER.error(e.getMessage(), e);
                    }
                  }
              
                  return msg;
              }



              • #8
                Originally posted by baeschti
                Hi Artem.

                Thanks for your input. Unfortunately, the setRollbackOnly() call didn't help.

                Edit:

                I did some debugging and looked a little bit into TransactionAspectSupport.currentTransactionStatus(). In the TransactionStatus object I saw the following:

                As transactionManager I had org.springframework.jdbc.datasource.DataSourceTransactionManager, not the expected JtaTransactionManager.
                The suspendedResources was org.springframework.transaction.support.AbstractPlatformTransactionManager with name=slaveMessageListenerContainer, which is where I reference my JtaTransactionManager as seen above in the config.

                Any ideas?
                So, I got the DB update to behave the way I expected. After the rollback() call, the changes from the DB update are rolled back.

                The problem was that this whole thing is running in a small framework from our company. There was already a simple org.springframework.jdbc.datasource.DataSourceTransactionManager defined in the context. So when I called the rollback() method, my JtaTransactionManager was suspended.

                The JMS Message and the "channel-flow" still ignore the rollback.



                • #9
                  The JMS Message and the "channel-flow" still ignore the rollback.
                  Do you mean that the DB transaction is rolled back, but not the JMS one?
                  How about excluding the DataSourceTransactionManager?
                  Alternatively, you can simply do this
                  Code:
                  throw new RuntimeException("intentional");
                  from your code, and all transactions should be rolled back.

                  If you want JMS+DB as a single unit of work, you have to use only one XA transaction manager.
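
                  For reference, here is a minimal sketch of what "only one XA transaction manager" usually implies on the JMS side when Atomikos is the transaction manager. It assumes the Atomikos JMS integration is on the classpath and wraps the ActiveMQXAConnectionFactory from the first post in an AtomikosConnectionFactoryBean (rather than Spring's CachingConnectionFactory), so that the JMS session can be enlisted in the JTA transaction. The bean id xaJmsConnectionFactory and the uniqueResourceName value are made up for illustration, and nothing in this thread confirms that this change resolves the problem.

                  Code:
                  <!-- Assumption: Atomikos wrapper around the XA connection factory defined in the first post -->
                  <bean id="xaJmsConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean"
                      init-method="init" destroy-method="close">
                      <!-- hypothetical name; it only has to be unique per resource -->
                      <property name="uniqueResourceName" value="amq-xa" />
                      <property name="xaConnectionFactory" ref="amqConnectionFactory" />
                  </bean>

                  <!-- Listener container pointing at the wrapper and at the existing JtaTransactionManager -->
                  <bean id="slaveMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
                      <property name="connectionFactory" ref="xaJmsConnectionFactory" />
                      <property name="destinationName" value="${master.partition.jms.request.destinationname}" />
                      <property name="transactionManager" ref="jtaSlaveTxManager" />
                      <property name="sessionTransacted" value="true" />
                  </bean>

                  The idea is that the receive and the DB update are then coordinated by the same jtaSlaveTxManager, so an exception thrown from the flow should roll back both.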



                  • #10
                    How about excluding the DataSourceTransactionManager?
                    That's what I did. Now only my JtaTransactionManager is in use.

                    Do you mean that the DB transaction is rolled back, but not the JMS one?
                    Yes. The DB update is rolled back / committed correctly. The JMS message, though, is not redelivered to the Queue when I do a rollback.

                    If you want JMS+DB as a single unit of work, you have to use only one XA transaction manager.
                    That's exactly what the desired design should look like. I thought I had accomplished this with the current config, but apparently it's not working. With the current configuration, shouldn't the XaDataSource and the JMS handling be in the same transaction?
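
                    For comparison, a sketch of how the JDBC side is typically registered with Atomikos so that it takes part in the same JTA transaction as JMS. The DataSource configuration was not posted in this thread, so everything here is an assumption: the bean id xaDataSource, the uniqueResourceName value, the pool size and the ${db...} placeholders are hypothetical, and the keys under xaProperties depend on the setters of the XADataSource class of the JDBC driver in use.

                    Code:
                    <!-- Assumption: XA DataSource managed by Atomikos; all names and placeholders are illustrative -->
                    <bean id="xaDataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean"
                        init-method="init" destroy-method="close">
                        <property name="uniqueResourceName" value="slave-db-xa" />
                        <!-- fully qualified XADataSource class of the JDBC driver -->
                        <property name="xaDataSourceClassName" value="${db.xa.datasource.class}" />
                        <property name="xaProperties">
                          <props>
                            <!-- driver-specific keys; these are examples only -->
                            <prop key="URL">${db.url}</prop>
                            <prop key="user">${db.user}</prop>
                            <prop key="password">${db.password}</prop>
                          </props>
                        </property>
                        <property name="poolSize" value="5" />
                    </bean>

                    Both this DataSource and the JMS connection factory would have to be enlisted with the same Atomikos transaction manager behind jtaSlaveTxManager for a rollback to cover both resources.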



                    • #11
                      is not redelivered to the Queue when I do a rollback
                      I recommend debugging through AbstractPollingMessageListenerContainer#receiveAndExecute...



                      • #12
                        I did some debugging as you suggested. It gave me a much appreciated insight, so thanks for that.

                        receiveAndExecute() gets called when the adapter polls a message from the queue. The JMS Session and everything else looked the way I would have expected. ActiveMQXAConnectionFactory and the DefaultMessageListenerContainer (= slaveMessageListenerContainer) were both correct. The transacted flag was also set.

                        Further down the road came the following part, and everything again looked like it should (at least as far as I could tell).

                        Code:
                        TransactionSynchronizationManager.registerSynchronization(new JmsResourceSynchronization(resourceHolderToUse,
                            connectionFactory, resourceFactory.isSynchedLocalTransactionAllowed()));
                        resourceHolderToUse.setSynchronizedWithTransaction(true);
                        TransactionSynchronizationManager.bindResource(connectionFactory, resourceHolderToUse);
                        But after that, the AbstractPollingMessageListenerContainer was never called again, even after rollback() or throwing a RuntimeException, until the next message was polled from the queue.

                        Edit: Do I have to configure the TransactionSynchronizationManager somehow, so that the transaction rolls back the JMS part?
                        Last edited by baeschti; Apr 25th, 2013, 08:49 AM.



                        • #13
                          I still couldn't solve my problem of rolling back the XA transaction containing JDBC and JMS. So far only the JDBC part rolls back as desired. Could someone look through the configuration I provided and see if I have missed anything? Do I have to configure my channels another way, or am I using the wrong ActiveMQ connections?

                          It seems to me that the whole ActiveMQ/JMS part is not in the current transaction. AbstractPollingMessageListenerContainer#receiveAndExecute only gets called once, when the JMS message is read from the queue. When a rollback occurs, shouldn't AbstractPollingMessageListenerContainer be called again?

                          Any help or a simple sample configuration (XA with JDBC and JMS) would be highly appreciated.
