TransactionAwareConnectionFactoryProxy under JTA (Atomikos) & ActiveMQ

  • TransactionAwareConnectionFactoryProxy under JTA (Atomikos) & ActiveMQ

    Hi,
    I set up a JUnit test case (standalone application) using:
    • Spring v. 3.1.0.RELEASE
    • Atomikos JTA implementation v. 3.8.0
    • ActiveMQ JMS provider v 5.6.0
    The test case runs a bean method under a container-managed transaction; the method just sends a JMS message to a destination.
    I am using org.springframework.jms.connection.TransactionAwareConnectionFactoryProxy to access the JMS connection.
    I am facing two problems:
    1. The JMS connection obtained through connectionFactory.createConnection() is not closed (and returned to the pool) when the transaction completes.
    2. Calling close() on the JMS connection in the bean method is delegated to the underlying connection even though the transaction is still active, and the subsequent JTA transaction commit then fails. (I looked at TransactionAwareDataSourceProxy and this does not happen there: the connection is released on transaction completion.)

    Does anyone have an idea what I am doing wrong (in the code or the configuration), or more generally where the problem could be? My Spring configuration file and code are below.

    Thank you

    Code:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
      xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context-3.0.xsd
    http://www.springframework.org/schema/tx
    http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
    http://activemq.apache.org/schema/core
    http://activemq.apache.org/schema/core/activemq-core.xsd">
    
      <context:component-scan base-package="cz.gpe.spring.preview.tx.jms.cfp">
        <context:include-filter type="annotation" expression="javax.ejb.Stateless" />
      </context:component-scan>
    
      <!-- enable the configuration of transactional behavior based on annotations -->
      <tx:annotation-driven transaction-manager="txManager" />
    
      <!-- ActiveMQ JMS Broker -->
      <amq:broker id="my-broker" useJmx="false" persistent="false">
        <amq:transportConnectors>
          <amq:transportConnector uri="tcp://localhost:61617" />
        </amq:transportConnectors>
      </amq:broker>
      <!-- ActiveMQ JMS XA CF -->
      <bean id="amq.connectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory" depends-on="my-broker">
        <property name="brokerURL" value="tcp://localhost:61617?jms.redeliveryPolicy.maximumRedeliveries=2" />
      </bean>
    
      <!-- ActiveMQ JMS CF -->
      <bean id="amq.test.connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" depends-on="my-broker">
        <property name="brokerURL" value="tcp://localhost:61617?jms.redeliveryPolicy.maximumRedeliveries=2" />
      </bean>
    
      <!-- The Atomikos JTA-enabled ConnectionFactory, configured with the vendor's XA factory. -->
      <bean id="atomikosConnectionFactoryBean" class="com.atomikos.jms.AtomikosConnectionFactoryBean" init-method="init" destroy-method="close" depends-on="my-broker">
        <!-- The unique resource name needed for recovery by the Atomikos core. -->
        <property name="uniqueResourceName">
          <value>JMS_BROKER_CFP</value>
        </property>
        <property name="xaConnectionFactory" ref="amq.connectionFactory" />
        <property name="localTransactionMode" value="false" />
      </bean>
    
      <!-- Spring transaction-aware CF proxy; this is the connection factory the application code actually uses -->
      <bean name="jms/connectionFactory" class="org.springframework.jms.connection.TransactionAwareConnectionFactoryProxy">
        <property name="targetConnectionFactory" ref="atomikosConnectionFactoryBean" />
        <property name="synchedLocalTransactionAllowed" value="true" />
      </bean>
    
      <bean name="jms/queue" class="org.apache.activemq.command.ActiveMQQueue">
        <property name="physicalName">
          <value>SPRING_PREVIEW_QUEUE</value>
        </property>
      </bean>
    
      <bean id="transactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close" depends-on="userTransaction">
        <!-- IMPORTANT: disable startup because the userTransactionService above does this -->
        <property name="startupTransactionService" value="false" />
    
        <!-- when close is called, should we force transactions to terminate or not? -->
        <property name="forceShutdown" value="false" />
      </bean>
    
      <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory">
          <ref bean="atomikosConnectionFactoryBean" />
        </property>
        <property name="defaultDestination">
          <ref bean="jms/queue" />
        </property>
        <property name="sessionTransacted" value="true" />
      </bean>
    
      <bean id="userTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
        <property name="transactionTimeout" value="30000" />
      </bean>
    
      <bean id="txManager" class="org.springframework.transaction.jta.JtaTransactionManager">
        <property name="userTransaction" ref="userTransaction" />
        <property name="transactionManager" ref="transactionManager" />
      </bean>
    </beans>
    Code:
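    import javax.annotation.Resource;
    import javax.ejb.Local;
    import javax.ejb.Stateless;
    import javax.ejb.TransactionAttribute;
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;

    import org.springframework.jms.core.JmsTemplate;

    @Stateless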
    @Local
    @TransactionAttribute
    public class ConnectionFactoryProxyBean {
    
    	@Resource(name = "jms/queue")
    	private Destination destination;
    
    	@Resource(name = "jms/connectionFactory")
    	private ConnectionFactory cf;
    	
    	@Resource
    	private JmsTemplate jmsTemplate;
    
    	public void sendMessage(final String message) {
    		Connection connection = null;
    		try {
    			connection = cf.createConnection();
    			Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
    			MessageProducer producer = session.createProducer(destination);
    			producer.send(session.createTextMessage(message));
    		}
    		catch (JMSException e) {
    			throw new RuntimeException(e);
    		}
    		finally {
    			if (connection != null) {
    				try {
    				// If I close the connection here, I get
    				// "org.springframework.jms.connection.SynchedLocalTransactionFailedException: Local JMS transaction failed to commit; nested exception is javax.jms.IllegalStateException: Session was closed already - calling commit is no longer allowed."
    				// when the JTA transaction is committed. See the stack trace below.
    				connection.close();
    				}
    				catch (Exception e) {
    					e.printStackTrace();
    				}
    			}
    		}
    	}
    }
    Stacktrace for problem #2
    Code:
    org.springframework.jms.connection.SynchedLocalTransactionFailedException: Local JMS transaction failed to commit; nested exception is javax.jms.IllegalStateException: Session was closed already - calling commit is no longer allowed.
    	at org.springframework.jms.connection.ConnectionFactoryUtils$JmsResourceSynchronization.processResourceAfterCommit(ConnectionFactoryUtils.java:407)
    	at org.springframework.jms.connection.ConnectionFactoryUtils$JmsResourceSynchronization.processResourceAfterCommit(ConnectionFactoryUtils.java:1)
    	at org.springframework.transaction.support.ResourceHolderSynchronization.afterCommit(ResourceHolderSynchronization.java:79)
    	at org.springframework.transaction.support.TransactionSynchronizationUtils.invokeAfterCommit(TransactionSynchronizationUtils.java:133)
    	at org.springframework.transaction.support.TransactionSynchronizationUtils.triggerAfterCommit(TransactionSynchronizationUtils.java:121)
    	at org.springframework.transaction.support.AbstractPlatformTransactionManager.triggerAfterCommit(AbstractPlatformTransactionManager.java:950)
    	at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:796)
    	at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:723)
    	at org.springframework.transaction.interceptor.TransactionAspectSupport.commitTransactionAfterReturning(TransactionAspectSupport.java:393)
    	at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:120)
    	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
    	at org.springframework.aop.framework.Cglib2AopProxy$DynamicAdvisedInterceptor.intercept(Cglib2AopProxy.java:622)
    	at cz.gpe.spring.preview.tx.jms.cfp.ConnectionFactoryProxyBean$$EnhancerByCGLIB$$50a8d85e.sendMessage(<generated>)
    ....
    Caused by: javax.jms.IllegalStateException: Session was closed already - calling commit is no longer allowed.
    	at com.atomikos.jms.AtomikosJmsXaSessionProxy.invoke(AtomikosJmsXaSessionProxy.java:117)
    	at $Proxy19.commit(Unknown Source)
    	at org.springframework.jms.connection.JmsResourceHolder.commitAll(JmsResourceHolder.java:181)
    	at org.springframework.jms.connection.ConnectionFactoryUtils$JmsResourceSynchronization.processResourceAfterCommit(ConnectionFactoryUtils.java:404)
    	... 43 more
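
    One direction that may be worth trying, offered as an untested assumption rather than a confirmed fix: the stack trace above passes through JmsResourceSynchronization.processResourceAfterCommit and JmsResourceHolder.commitAll, which is the path Spring takes when it synchronizes a local JMS transaction with the main transaction. According to the Spring Javadoc, that mode is what synchedLocalTransactionAllowed="true" switches on, and it defaults to false. Since AtomikosConnectionFactoryBean is already JTA-aware, a sketch of the proxy definition with the flag left at its default could look like this (bean names are copied from the configuration above; only the flag value differs):

    Code:
      <!-- Sketch only. Assumption: the Atomikos connection factory handles XA enlistment itself,
           so Spring does not need to synchronize a local JMS transaction with the JTA transaction. -->
      <bean name="jms/connectionFactory" class="org.springframework.jms.connection.TransactionAwareConnectionFactoryProxy">
        <property name="targetConnectionFactory" ref="atomikosConnectionFactoryBean" />
        <property name="synchedLocalTransactionAllowed" value="false" />
      </bean>

    Whether this also changes the behaviour described in problem #1 is not established anywhere in this thread.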

  • #2
    Same problem here, is there any solution to this?



    • #3
      Originally posted by borkert
      Same problem here, is there any solution to this?
      I don't have any...
