  • JMS & JTA & Spring Integration

    I have been trying to configure Spring Integration to use JMS as its "backing" messaging system. So far, sending works correctly: the message is sent within a JTA transaction and all is well. The receiving end is where things break down. I have not been able to receive a message within a JTA transaction, although receiving works if I disable all of the JTA-ness (swapping the ConnectionFactory and changing the configuration).

    My line of thinking is that the MessageListener would subscribe to the session transactionally and put the message on the DirectChannel while still in the transaction; I could then do my processing and everything would work. But I can't figure out how to get the underlying message listener to subscribe in a transactional way.

    My configuration:
    Code:
        <message-bus/>
        <annotation-driven/>
    
        <jee:jndi-lookup jndi-name="jms/XAConnectionFactory" id="connectionFactory" resource-ref="true"/>
        <jee:jndi-lookup jndi-name="jms/queues/queue" id="queue" resource-ref="true"/>
    
        <direct-channel id="toQueue" />
        <direct-channel id="fromQueue" source="fromQueueSource"/>
    
        <jms-target id="toQueueTarget" jms-template="toQueueJMSTemplate"  />
        <jms-source id="fromQueueSource" jms-template="fromQueueJMSTemplate"/>
    
        <target-endpoint  input-channel="toQueue" target="toQueueTarget" />
    
        <source-endpoint source="fromQueueSource" channel="fromQueue">
            <schedule period="50"/>
        </source-endpoint>
    
    
        <beans:bean id="toQueueJMSTemplate" class="org.springframework.jms.core.JmsTemplate">
            <beans:property name="connectionFactory" ref="connectionFactory"/>
            <beans:property name="deliveryPersistent" value="true"/>
            <beans:property name="sessionTransacted" value="true"/>
            <beans:property name="defaultDestination" ref="queue"/>
            <beans:property name="messageConverter" ref="simpleMessageConverter"/>
        </beans:bean>
    
         <beans:bean id="fromQueueJMSTemplate" class="org.springframework.jms.core.JmsTemplate">
            <beans:property name="connectionFactory" ref="connectionFactory"/>
            <beans:property name="deliveryPersistent" value="true"/>
            <beans:property name="sessionTransacted" value="true"/>
            <beans:property name="defaultDestination" ref="queue"/>
            <beans:property name="messageConverter" ref="simpleMessageConverter"/>
        </beans:bean>
    
        <beans:bean id="simpleMessageConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
    Thanks,

    Chris

  • #2
    I'm not sure how to do JTA transactions with just JmsTemplate, but you can look at DefaultMessageListenerContainer, one of Spring's message listener containers.


    • #3
      In Spring Integration, you can use the "jms-gateway" instead of "jms-source" in order to have a MessageListenerContainer-based approach (instead of the jmsTemplate.receive() calls). For example, you could try the following:
      Code:
      <jms-gateway id="jmsGateway"
                    destination="someQueue"
                    request-channel="someChannel"
                    expect-reply="false"/>


      • #4
        Problems and some fixes

        Regarding the suggestion to use the jms-gateway XML setup: it does not appear to call start or initialize (I tried setting breakpoints), so it never subscribes to the JMS queue.

        So I tried creating it directly as a bean, as follows.
        Code:
        <beans:bean id="jmsGateWay" class="org.springframework.integration.adapter.jms.JmsGateway" init-method="start" destroy-method="destroy">
            <beans:property name="connectionFactory" ref="connectionFactory"/>
            <beans:property name="requestChannel" ref="channel"/>
            <beans:property name="destination" ref="queue"/>
            <beans:property name="expectReply" value="false"/>
            <beans:property name="sessionTransacted" value="true"/>
        </beans:bean>
        This results in the following exception:
        Code:
        DefaultMessageListenerContainer-2 DefaultMessageListenerContainer INFO  - Setup of JMS message listener invoker failed - trying to recover
        javax.jms.JMSException: JTA transaction required for JtaMessageConsumer
        	at com.atomikos.jms.DefaultJtaMessageConsumer.enlist(Unknown Source)
        	at com.atomikos.jms.DefaultJtaMessageConsumer.receive(Unknown Source)
        	at com.atomikos.jms.DefaultJtaMessageConsumer.receive(Unknown Source)
        	at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:404)
        	at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:307)
        	at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:260)
        	at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:944)
        	at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:868)
        	at java.lang.Thread.run(Thread.java:613)
        This happens because the DefaultMessageListenerContainer (via AbstractPollingMessageListenerContainer) has no reference to the transactionManager, and there is no way to pass one through the jmsGateway: by default it creates its own DefaultMessageListenerContainer instance, and it has no transactionManager property.

        Therefore I finally changed the configuration to this:
        Code:
        <beans:bean id="jmsGateWay" class="org.springframework.integration.adapter.jms.JmsGateway" init-method="start" destroy-method="destroy">
            <beans:property name="connectionFactory" ref="connectionFactory"/>
            <beans:property name="requestChannel" ref="channel"/>
            <beans:property name="destination" ref="queue"/>
            <beans:property name="expectReply" value="false"/>
            <beans:property name="sessionTransacted" value="true"/>
            <beans:property name="container" ref="messageListenerContainer"/>
        </beans:bean>

        <beans:bean id="messageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <beans:property name="connectionFactory" ref="connectionFactory"/>
            <beans:property name="destination" ref="queue"/>
            <beans:property name="transactionManager" ref="transactionManager"/>
            <beans:property name="sessionTransacted" value="true"/>
        </beans:bean>
        and my test finally passes.
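
        The transactionManager bean referenced above is not shown. A minimal sketch of how it might be defined, assuming Atomikos provides JTA (as the stack trace suggests); the atomikos* bean ids and the timeout value are illustrative assumptions, not part of the original configuration:
        Code:
        <!-- Assumed: Atomikos supplies the JTA TransactionManager and UserTransaction -->
        <beans:bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close">
            <beans:property name="forceShutdown" value="false"/>
        </beans:bean>

        <beans:bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
            <!-- Illustrative timeout in seconds -->
            <beans:property name="transactionTimeout" value="300"/>
        </beans:bean>

        <!-- Spring's PlatformTransactionManager for JTA, wired to the two beans above -->
        <beans:bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
            <beans:property name="transactionManager" ref="atomikosTransactionManager"/>
            <beans:property name="userTransaction" ref="atomikosUserTransaction"/>
        </beans:bean>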

        So, two suggestions:
        1. Make sure jmsGateway calls its setup and destroy methods (as needed), or prove me wrong; I could easily have screwed up my configuration.
        2. Add the transactionManager property to jmsGateway (XML schema and object) and pass it through to the DefaultMessageListenerContainer.

        This has been posted to JIRA as INT-237.

        Thanks,

        Chris


        • #5
          Chris,

          (First, what version of Spring Integration are you using?)

          The JmsGateway should be calling start(). This is the expected behavior:
          1) A "jms-gateway" element is parsed
          2) The parser registers an instance of JmsGateway (which implements Lifecycle)
          3) When the ApplicationContext instantiation process completes, it fires an event
          4) MessageBus receives the event, and if 'auto-startup' is true (the default), it will start()
          5) Any endpoints that implement Lifecycle will be started from the MessageBus' start() method
          6) A JmsGateway will start() its MessageListener container
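
          Regarding step 4, auto-startup can also be set explicitly rather than relying on the default. A sketch, assuming the attribute is exposed on the message-bus element in the version in use (worth verifying against the schema):
          Code:
          <!-- Assumption: 'auto-startup' is available as an attribute of message-bus -->
          <message-bus auto-startup="true"/>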

          I would be interested if you could double-check the breakpoint knowing that order of events.

          The configuration of a transaction-manager reference for the 'jms-gateway' element is definitely needed - as well as some other configuration parameters.

          Thanks,
          Mark


          • #6
            My current versions, from the manifests, are:
            Integration - Bundle-Version: 1.0.0.M4
            Spring - Spring-Version: 2.5.4

            1) Happens
            2) Something funky appears to be happening here. Lifecycle start methods don't appear to be getting called. The MessageBus start is called from the onApplicationEvent method, which is why it starts up successfully. But I still never see JmsGateway.start() get called. I only see AbstractApplicationContext.getLifeCycleBeans() get called during shutdown, and I never see AbstractApplicationContext.start() get called, though I am not sure whether it is really supposed to be (is it only used for sub-contexts?)
            3) Happens
            4) Happens, though previously I didn't have auto-startup set to true
            5) I see that happen
            6) JmsGateway.start() never gets called

            So I don't know if this is a problem with Spring or with Spring Integration.


            • #7
              One of the things I did was search for calls to start; I didn't find many, and none in the test case framework.

              Could part of the problem be the usage of context.refresh() vs. context.start(), and how Spring Integration is using context.start()?


              Chris


              • #8
                Chris,

                Thanks for reporting this. The issues have now been resolved. See the brief description in the JIRA comments here: http://jira.springframework.org/browse/INT-237

                Since the source had been refactored to a "gateway" it was no longer being automatically recognized by the MessageBus. Now, the MessageBus also recognizes any gateway within the application context that also implements Lifecycle. So, when the MessageBus starts (after the application context refresh event), it will also start the JmsGateway (which in turn starts the MessageListener container).

                Also, the <jms-gateway/> element now supports a "transaction-manager" attribute to reference the bean name of your PlatformTransactionManager instance.
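
                A minimal sketch of how that could look, assuming a PlatformTransactionManager bean named "transactionManager" (for example a JtaTransactionManager) is defined elsewhere in the context; the other attributes and the "someQueue"/"someChannel" names are carried over from the earlier example in this thread:
                Code:
                <!-- Hypothetical names; transaction-manager takes the bean name of the PlatformTransactionManager -->
                <jms-gateway id="jmsGateway"
                             destination="someQueue"
                             request-channel="someChannel"
                             expect-reply="false"
                             transaction-manager="transactionManager"/>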

                Whenever you get a chance, please try out the SVN head version (r661 or later) and let us know how it goes.

                Thanks again,
                Mark
