  • JMS - DB - External System - DB Transactions Propagation

    Consider the following scenario, assuming XA is enabled for both JMS and the DataSource. A message listener (a Spring bean) picks up a message from the queue. For each message, one or more services are invoked, which execute the following steps:

    1. read() from DB (DAO)
    2. invokeExtSystem() : external system
    3. insert() to DB (DAO)
    4. update() to DB (DAO)

    The goal: if any of steps 1-3 fails, the transaction should be rolled back (i.e. the message should be put back on the queue), and step 4 should always be executed (presumably in a separate transaction). If step 4 fails, all steps should be rolled back (the message should be put back on the queue).

    Below is the message listener declaration. If the listener implementation is not marked with the @Transactional annotation, will the container still create a new transaction for each message it picks up, because of its configuration? (A JTA transaction manager is available.)

    Code:
    <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="destination"/>
        <property name="messageListener" ref="messageListener"/>
        <property name="transactionManager" ref="transactionManager"/>
    </bean>

    @Transactional
    public class ServiceA {

        @Autowired
        private ServiceB serviceB;

        @Autowired
        private DAO dao;

        public void handleMsg(Message msg) {
            String status = "failed";
            try {
                status = serviceB.registerMsg(msg);
            } catch (Exception e) {
                LOG.error(e);
            }
            dao.update(status, msg); // step 4
        }
    }

    @Transactional
    public class ServiceB {

        @Autowired
        private DAO dao;

        public String registerMsg(Message msg) {
            Object o = dao.read(msg);         // step 1
            int id = invokeExtSystem(msg, o); // step 2: throws an unchecked exception
            dao.insert(id, msg);              // step 3
            return "success";
        }
    }
    So the message listener first invokes the injected ServiceA, and ServiceA then invokes ServiceB.

    Will the sample code achieve the requirements? Comments are welcome. The DAO's public methods are not marked as transactional.
    Last edited by boom123; Mar 9th, 2013, 10:18 AM.

  • #2
    Calling one @Transactional method from another within the same class won't start a new transaction, even if the second one has propagation REQUIRES_NEW. @Transactional uses Spring AOP, so internal method calls don't go through the proxy.

    To solve your problem you will need to wrap the second call in a new <bean/>, annotate it with @Transactional with propagation REQUIRES_NEW, and inject this bean into the listener.

    So, with this arrangement, you would need to wrap 1, 2, 3 in a try block and put #4 in a finally block. That way, #4 will always execute (in its own transaction) regardless of the success/failure of 1,2,3. If 4 throws an exception, both will be rolled back.
    Last edited by Gary Russell; Mar 9th, 2013, 11:08 AM.
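
The proxy point above can be seen without Spring at all. The following is a minimal sketch that uses a plain JDK dynamic proxy as a stand-in for the transactional proxy. `SelfInvocationDemo`, `Service`, `ServiceImpl` and `proxied` are made-up names for illustration, and the recording handler only marks the place where Spring's transaction interceptor would run.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Toy model of proxy-based interception: plain JDK proxies, no Spring involved.
class SelfInvocationDemo {

    interface Service {
        void outer();
        void inner();
    }

    static class ServiceImpl implements Service {
        public void outer() {
            inner(); // plain call on "this": the proxy never sees it
        }

        public void inner() { }
    }

    /** Wraps target in a JDK proxy that records the name of each method it intercepts. */
    static Service proxied(Service target, List<String> intercepted) {
        InvocationHandler handler = (proxy, method, args) -> {
            intercepted.add(method.getName()); // stands in for "begin or join a transaction"
            return method.invoke(target, args);
        };
        return (Service) Proxy.newProxyInstance(
                Service.class.getClassLoader(), new Class<?>[] {Service.class}, handler);
    }

    /** Calls outer() through the proxy and returns the intercepted method names. */
    static List<String> run() {
        List<String> intercepted = new ArrayList<>();
        proxied(new ServiceImpl(), intercepted).outer();
        return intercepted;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Running `main` prints `[outer]`: the call to `inner()` made from inside `outer()` never reaches the handler, which is why the REQUIRES_NEW method has to live in a separate bean that is called through its own proxy.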

    • #3
      I edited my post: the JMS listener invokes Service A first. So you are saying that I don't need two classes (Service A and Service B) to achieve this, just one class with try, catch and finally? But then I would have to rethrow the exception (if any) from steps 1-3 for the transaction to roll back, and perform the update in the finally block?
      Could you please provide a sample to make this clearer? I'm a bit confused. Thanks.

      • #4
        No; put the try/finally in your listener...

        Code:
        public class MyMessageListener implements MessageListener {

            @Autowired
            private Service service;

            @Transactional
            public void onMessage(Message m) {
                try {
                    service.methodA(); // steps 1-3
                }
                finally {
                    service.methodB(); // step 4, always runs
                }
            }
        }

        public class Service {

            public void methodA() {
                read();
                callExtSystem();
                insert();
            }

            @Transactional(propagation=Propagation.REQUIRES_NEW)
            public void methodB() {...}
        }
        If you catch the exception in the listener (e.g. if you want to log it, or you want to communicate the type of failure to methodB), then, yes, you'll have to re-throw it. But, regardless, the call to methodB will execute (in its own transaction), while an exception thrown anywhere (including methodB) will cause the "outer" transaction to roll back.

        Hope that helps.
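
For the catch, log and re-throw variant described above, the control flow might look like the sketch below. It is plain Java and models only the ordering of the calls, not the transactions. `ListenerFlowDemo`, `RegistrationService`, `StatusService` and the `"failed"`/`"success"` strings are assumptions for illustration (the strings are borrowed from the first post).

```java
// Plain-Java sketch of the control flow only; transactions are NOT modelled here.
class ListenerFlowDemo {

    /** Hypothetical: steps 1-3 (read, call the external system, insert). */
    interface RegistrationService {
        String register(String msg);
    }

    /** Hypothetical: step 4; in Spring this would be the REQUIRES_NEW method. */
    interface StatusService {
        void update(String status, String msg);
    }

    static void onMessage(String msg, RegistrationService registration, StatusService status) {
        String result = "failed"; // reported to step 4 if steps 1-3 throw
        try {
            result = registration.register(msg);
        } catch (RuntimeException e) {
            System.err.println("registration failed: " + e.getMessage());
            throw e; // re-throw so the caller (the listener container) still sees the failure
        } finally {
            status.update(result, msg); // runs on success and on failure
        }
    }

    /** Drives onMessage once and summarises what step 4 received and how the call ended. */
    static String run(boolean fail) {
        StringBuilder recorded = new StringBuilder();
        RegistrationService registration = m -> {
            if (fail) {
                throw new IllegalStateException("external system down");
            }
            return "success";
        };
        StatusService status = (s, m) -> recorded.append(s);
        try {
            onMessage("msg-1", registration, status);
            return recorded + "/ok";
        } catch (RuntimeException e) {
            return recorded + "/rethrown";
        }
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```

One caveat of this shape: if the update in the finally block itself throws, that exception replaces the original one, so the first failure should be logged before the finally block runs, as the catch block does here.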

        • #5
          Thanks for your example, but I have a few more questions. methodB() should take the result of methodA() as an argument (see my example in the first post), except when an exception is thrown. With your example I would have to implement some business logic inside the message listener, whereas I thought it was good practice to keep the listener clean and move all the business logic to the services. (In my case the message listener acts as a controller, right?)
          The exception from methodA() should always be logged, so catching and rethrowing it is unavoidable, which many consider bad practice.
          Comments are welcome.

          Code:
          public class MyMessageListener implements MessageListener {

              @Autowired
              private ServiceA serviceA;

              @Transactional
              public void onMessage(Message m) {
                  serviceA.methodA(m);
              }
          }

          public class ServiceA {

              @Autowired
              private ServiceB serviceB;

              @Transactional(propagation=Propagation.REQUIRES_NEW)
              public void methodA(Message m) {
                  String res = "failed";
                  try {
                      res = serviceB.methodB(m);
                  } catch (Exception e) {
                      log.error(e); // I guess I do not need to rethrow the exception, because the transaction was rolled back when methodB failed and the message is back on the queue
                  }
                  updateDB(res); // if this fails, will everything be rolled back?
              }
          }

          public class ServiceB {

              @Transactional
              public String methodB(Message m) {
                  read();
                  callExtSystem(); // this can throw an unchecked exception
                  insert();
                  return "success";
              }
          }
          Is my understanding correct? Or do you think it is overkill to implement this in two different services?
          Last edited by boom123; Mar 9th, 2013, 12:17 PM.

          • #6
            No, that won't work - all of the updates are done within the scope of the same transaction so you need to do what I suggested. If you don't want the logic in the listener, you will need another bean to wrap the logic that I put in the listener in my example. You must do updateDB(res) in a NEW transaction, and you must rethrow the exception.

            • #7
              Gary, thanks a lot for your time. Below is a complete example of how I managed to implement this, but I am not sure why it works, which is why I am still asking for your help: either to make it clear or to find a simpler solution, maybe the one you proposed.
              Bear in mind that the JMS and DataSource resources are full XA (2PC) and are managed by the WebLogic JTA transaction manager, which might handle transactions differently.

              1. The message listener is not annotated as transactional (but it seems to be able to participate in transactions, maybe because of the definition of the JMS container?).
              2. If any unchecked exception is thrown from ServiceB, the transaction is rolled back and the message is put back on the JMS queue.
              3. The exception is then caught in ServiceA and logged, and dao.update(res) happens in a new transaction.
              4. If dao.update(res) fails, everything is rolled back.

              I confirm that the above works, but I don't quite understand this behaviour. It seems that the JMS container listens for any transaction that is rolled back at any time?

              Comments and suggestions are welcome.

              Code:
              public class MyMessageListener implements MessageListener {

                  @Autowired
                  private ServiceA serviceA;

                  // I am not declaring this as transactional.
                  public void onMessage(Message m) {
                      serviceA.methodA(m);
                  }
              }

              public class ServiceA {

                  @Autowired
                  private ServiceB serviceB;

                  @Autowired
                  private DAO dao;

                  @Transactional(propagation=Propagation.REQUIRES_NEW)
                  public void methodA(Message m) {
                      String res = "failed";
                      try {
                          res = serviceB.methodB(m);
                      } catch (Exception e) {
                          log.error(e);
                      }
                      dao.update(res);
                  }
              }

              public class ServiceB {

                  @Autowired
                  private DAO dao;

                  @Transactional
                  public String methodB(Message m) {
                      dao.read();
                      callExtSystem(); // this can throw an unchecked exception
                      dao.insert();
                      return "success";
                  }
              }

              public class DAO {

                  @Transactional
                  public void read() {}

                  @Transactional
                  public void insert() {}

                  @Transactional(propagation=Propagation.REQUIRES_NEW)
                  public void update(String res) {}
              }
              Code:
              <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
                  <property name="connectionFactory" ref="connectionFactory"/>
                  <property name="destination" ref="destination"/>
                  <property name="messageListener" ref="messageListener"/>
                  <property name="transactionManager" ref="transactionManager"/> <!-- WebLogic JTA manager -->
                  <property name="concurrentConsumers" value="10"/>
              </bean>

              <tx:jta-transaction-manager/>
              P.S.
              I am pretty sure that I have overused the @Transactional annotations.
              I am not sure how this will behave if I remove the @Transactional annotations from the DAO layer.


              edit
              ----
              As per my understanding, when ServiceB throws an unchecked exception the transaction is marked for rollback, so the message goes back to the queue and everything inside methodB is rolled back. The exception is then caught at the outer level, where dao.update() is invoked; it is marked with Propagation.REQUIRES_NEW, so that update should always execute in a new transaction.
              Last edited by boom123; Mar 10th, 2013, 06:31 AM.

              • #8
                But that isn't what you asked for.

                By putting the REQUIRES_NEW on methodA, the database stuff will be performed in a different transaction to the JMS. The fact that it looks like it's working is misleading.

                Like I said dao.update(res); must be the ONLY thing in a REQUIRES_NEW transactional method if you want it to commit regardless of the success/failure of the "outer" transaction.

                If you turn on TRACE level logging for org.springframework.transaction, you'll see log messages that describe the transaction flow.

                • #9
                  I have tried to debug the following code; please see the logs below. It works as it is supposed to: the initial transaction, in which serviceB.methodA(m) is invoked, is rolled back in case of an exception, and serviceB.methodB(m) is executed in a new transaction and commits. After that, the resumed transaction is no longer active and nothing from that part is committed.
                  I'm not sure, though, whether I am misusing the JTA transaction manager with all this.
                  Nevertheless, as you can see in the logs, the initial transaction ends without the need to rethrow the exception(s). Is this expected with the implementation below?

                  Your comments are appreciated.

                  Code:
                  public class MyMessageListener implements MessageListener {

                      @Autowired
                      private ServiceA serviceA;

                      @Transactional
                      public void onMessage(Message m) {
                          serviceA.methodA(m);
                      }
                  }

                  public class ServiceA {

                      @Autowired
                      private ServiceB serviceB;

                      @Transactional
                      public void methodA(Message m) {
                          try {
                              serviceB.methodA(m);
                          } catch (Exception e) {
                              log.error(e);
                          }
                          serviceB.methodB(m);
                      }
                  }

                  public class ServiceB {

                      @Transactional
                      public void methodA(Message m) {
                          read();
                          callExtSystem(); // this can throw an unchecked exception
                          insert();
                      }

                      @Transactional(propagation=Propagation.REQUIRES_NEW)
                      public void methodB(Message m) {
                          update();
                      }
                  }
                  Last edited by boom123; Mar 11th, 2013, 01:44 PM.

                  • #10
                    LOGS

                    Code:
                    11 Mar 2013 16:53:24,119 DEBUG WebLogicJtaTransactionManager [jmsContainer-1]: Participating transaction failed - marking existing transaction as rollback-only
                    11 Mar 2013 16:53:24,119 DEBUG WebLogicJtaTransactionManager [jmsContainer-1]: Participating transaction failed - marking existing transaction as rollback-only
                    11 Mar 2013 16:53:24,119 DEBUG WebLogicJtaTransactionManager [jmsContainer-1]: Setting JTA transaction rollback-only
                    11 Mar 2013 16:53:24,119 DEBUG WebLogicJtaTransactionManager [jmsContainer-1]: Setting JTA transaction rollback-only
                    11 Mar 2013 16:53:24,119 ERROR SubmissionServiceImpl [jmsContainer-1]: HRS - Unable to register document: eu.europa.ec.agri.jcore.hrs.client.exception.HrsSoapFaultException: HRS returned a SoapFault indicating an error condition. Details for this error: ....		
                    11 Mar 2013 16:53:24,119 DEBUG AnnotationTransactionAttributeSource [jmsContainer-1]: Adding transactional method [updateSubmissionStatus] with attribute [PROPAGATION_REQUIRES_NEW,ISOLATION_DEFAULT]
                    11 Mar 2013 16:53:24,119 DEBUG AnnotationTransactionAttributeSource [jmsContainer-1]: Adding transactional method [updateSubmissionStatus] with attribute [PROPAGATION_REQUIRES_NEW,ISOLATION_DEFAULT]
                    11 Mar 2013 16:53:24,119 DEBUG WebLogicJtaTransactionManager [jmsContainer-1]: Suspending current transaction, creating new transaction with name [eu.europa.ec.eac.eforms.services.HermesService.updateSubmissionStatus]
                    11 Mar 2013 16:53:24,119 DEBUG WebLogicJtaTransactionManager [jmsContainer-1]: Suspending current transaction, creating new transaction with name [eu.europa.ec.eac.eforms.services.HermesService.updateSubmissionStatus]
                    11 Mar 2013 16:53:24,119 DEBUG DataSourceUtils [jmsContainer-1]: Returning JDBC Connection to DataSource
                    11 Mar 2013 16:53:24,119 DEBUG JdbcTemplate    [jmsContainer-1]: Executing prepared SQL update
                    11 Mar 2013 16:53:24,119 DEBUG JdbcTemplate    [jmsContainer-1]: Executing prepared SQL statement [UPDATE SUBMISSION_DATA_TEST SET STATUS = ? WHERE SUBMISSION_ID = ?]
                    11 Mar 2013 16:53:24,119 DEBUG DataSourceUtils [jmsContainer-1]: Fetching JDBC Connection from DataSource
                    11 Mar 2013 16:53:24,276 DEBUG DataSourceUtils [jmsContainer-1]: Registering transaction synchronization for JDBC Connection
                    11 Mar 2013 16:53:24,291 DEBUG JdbcTemplate    [jmsContainer-1]: SQL update affected 1 rows
                    11 Mar 2013 16:53:24,291 DEBUG DataSourceUtils [jmsContainer-1]: Returning JDBC Connection to DataSource
                    11 Mar 2013 16:53:24,291 DEBUG WebLogicJtaTransactionManager [jmsContainer-1]: Initiating transaction commit
                    11 Mar 2013 16:53:24,291 DEBUG WebLogicJtaTransactionManager [jmsContainer-1]: Initiating transaction commit
                    11 Mar 2013 16:53:24,307 DEBUG WebLogicJtaTransactionManager [jmsContainer-1]: Resuming suspended transaction
                    11 Mar 2013 16:53:24,307 DEBUG WebLogicJtaTransactionManager [jmsContainer-1]: Resuming suspended transaction
                    11 Mar 2013 16:53:24,307 DEBUG WebLogicJtaTransactionManager [jmsContainer-1]: Standard JTA resume threw InvalidTransactionException: Attempt to resume an inactive transaction: BEA1-79D4C717C3AE3057A4BD - trying WebLogic JTA forceResume
                    11 Mar 2013 16:53:24,307 DEBUG WebLogicJtaTransactionManager [jmsContainer-1]: Standard JTA resume threw InvalidTransactionException: Attempt to resume an inactive transaction: BEA1-79D4C717C3AE3057A4BD - trying WebLogic JTA forceResume
                    11 Mar 2013 16:53:24,307 DEBUG WebLogicJtaTransactionManager [jmsContainer-1]: Initiating transaction commit
                    11 Mar 2013 16:53:24,307 DEBUG WebLogicJtaTransactionManager [jmsContainer-1]: Initiating transaction commit
                    11 Mar 2013 16:53:24,322 INFO  DefaultMessageListenerContainer [jmsContainer-1]: Setup of JMS message listener invoker failed for destination 'JMSSystemModule!HermesQueue' - trying to recover. Cause: JTA transaction unexpectedly rolled back (maybe due to a timeout); nested exception is weblogic.transaction.RollbackException: setRollbackOnly called on transaction
                    org.springframework.transaction.UnexpectedRollbackException: JTA transaction unexpectedly rolled back (maybe due to a timeout); nested exception is weblogic.transaction.RollbackException: setRollbackOnly called on transaction
                    	at org.springframework.transaction.jta.JtaTransactionManager.doCommit(JtaTransactionManager.java:1031)
                    	at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:732)
                    	at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:701)
                    	at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:255)
                    	at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:982)
                    	at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:974)
                    	at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:876)
                    	at java.lang.Thread.run(Thread.java:662)
                    Caused by: weblogic.transaction.RollbackException: setRollbackOnly called on transaction
                    	at weblogic.transaction.internal.TransactionImpl.throwRollbackException(TransactionImpl.java:1884)
                    	at weblogic.transaction.internal.ServerTransactionImpl.internalCommit(ServerTransactionImpl.java:376)
                    	at weblogic.transaction.internal.ServerTransactionImpl.commit(ServerTransactionImpl.java:267)
                    	at weblogic.transaction.internal.TransactionManagerImpl.commit(TransactionManagerImpl.java:307)
                    	at org.springframework.transaction.jta.JtaTransactionManager.doCommit(JtaTransactionManager.java:1028)
                    	... 7 more
                    Caused by: weblogic.transaction.internal.AppSetRollbackOnlyException: setRollbackOnly called on transaction
                    	at weblogic.transaction.internal.TransactionImpl.setRollbackOnly(TransactionImpl.java:555)
                    	at weblogic.transaction.internal.TransactionManagerImpl.setRollbackOnly(TransactionManagerImpl.java:340)
                    	at org.springframework.transaction.jta.JtaTransactionManager.doSetRollbackOnly(JtaTransactionManager.java:1082)
                    	at org.springframework.transaction.support.AbstractPlatformTransactionManager.processRollback(AbstractPlatformTransactionManager.java:831)
                    	at org.springframework.transaction.support.AbstractPlatformTransactionManager.rollback(AbstractPlatformTransactionManager.java:800)
                    	at org.springframework.transaction.interceptor.TransactionAspectSupport.completeTransactionAfterThrowing(TransactionAspectSupport.java:339)
                    	at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:110)
                    	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:171)
                    	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
                    	at $Proxy112.registerDocument(Unknown Source)
                    	at eu.europa.ec.eac.eforms.services.impl.SubmissionServiceImpl.handleSubmission(SubmissionServiceImpl.java:27)
                    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
                    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
                    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
                    	at java.lang.reflect.Method.invoke(Method.java:597)
                    	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:307)
                    	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:182)
                    	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:149)
                    	at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:106)
                    	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:171)
                    	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
                    	at $Proxy113.handleSubmission(Unknown Source)
                    	at eu.europa.ec.eac.eforms.hrs.queue.SubmissionListener.onMessage(SubmissionListener.java:31)
                    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
                    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
                    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
                    	at java.lang.reflect.Method.invoke(Method.java:597)
                    	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:307)
                    	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:182)
                    	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:149)
                    	at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:106)
                    	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:171)
                    	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
                    	at $Proxy114.onMessage(Unknown Source)
                    	at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:543)
                    	at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:482)
                    	at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:451)
                    	at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:323)
                    	at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:241)
                    	... 4 more
                    11 Mar 2013 16:53:24,322 INFO  DefaultMessageListenerContainer [jmsContainer-1]: Successfully refreshed JMS Connection
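
One way to read the log above: the failure in the participating (REQUIRED) call marked the container's transaction rollback-only, the REQUIRES_NEW status update committed on its own, and the container's later commit attempt ended in UnexpectedRollbackException, so the outer work was rolled back even though nothing was rethrown. The sketch below is a toy model of that bookkeeping in plain Java. It is not Spring's or WebLogic's code, and `RollbackOnlyDemo`, `Tx`, `participate` and `complete` are made-up names.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of rollback-only bookkeeping. Not Spring's or WebLogic's code.
class RollbackOnlyDemo {

    /** A transaction reduced to the one flag that matters here. */
    static class Tx {
        boolean rollbackOnly;
    }

    /** REQUIRED inside an existing transaction: a failure marks that transaction rollback-only. */
    static void participate(Tx current, Runnable work) {
        try {
            work.run();
        } catch (RuntimeException e) {
            current.rollbackOnly = true;
            throw e;
        }
    }

    /** Outcome of a commit attempt; in the log the "rolled back" case shows up as UnexpectedRollbackException. */
    static String complete(Tx tx) {
        return tx.rollbackOnly ? "rolled back" : "committed";
    }

    static List<String> handleMessage() {
        List<String> events = new ArrayList<>();
        Tx outer = new Tx(); // started by the listener container around receive + onMessage

        try {
            participate(outer, () -> { throw new IllegalStateException("SOAP fault"); });
        } catch (RuntimeException e) {
            events.add("caught: " + e.getMessage()); // logged and swallowed, as in post #9
        }

        Tx fresh = new Tx(); // REQUIRES_NEW: outer is suspended while this one runs
        participate(fresh, () -> { /* status update succeeds */ });
        events.add("status update: " + complete(fresh));

        events.add("outer: " + complete(outer)); // container's commit attempt after onMessage returns
        return events;
    }

    public static void main(String[] args) {
        handleMessage().forEach(System.out::println);
    }
}
```

In this model swallowing the exception does not save the outer transaction, because the flag was already set before the catch block ran. Rethrowing mainly changes how the rollback is reported, not whether it happens.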
