Announcement Announcement Module
Collapse
No announcement yet.
JMS Message Driven Channel Adapter + Transactional Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • JMS Message Driven Channel Adapter + Transactional

    The jms container sends the message in a transaction /sessionTransacted = true

    Code:
      <jms:message-driven-channel-adapter id="Reader"
                                            container="QueueContainer"
                                            message-converter="MessageConverter"
                                            channel="Process"/>
    In between there is a router that sends to different other multiple jms outbound channel adapters which uses different message-converters.

    If one of the adapters fails, the message starts from the beginning again. But what happens to the message that is already sent to the jms outbound channel adapters? Does it rollback? Looks like it is sending messages again and again. Is this expected behaviour?

  • #2
    Originally posted by srikanthradix View Post
    The jms container sends the message in a transaction /sessionTransacted = true

    Code:
      <jms:message-driven-channel-adapter id="Reader"
                                            container="QueueContainer"
                                            message-converter="MessageConverter"
                                            channel="Process"/>
    In between there is a router that sends to different other multiple jms outbound channel adapters which uses different message-converters.

    If one of the adapters fails, the message starts from the beginning again. But what happens to the message that is already sent to the jms outbound channel adapters? Does it rollback? Looks like it is sending messages again and again. Is this expected behaviour?
    This is the expected behavior...you eventually want the message to be delivered, right?

    Comment


    • #3
      Yes, but not duplicates.

      Yes, I eventually want the message to be delivered but only once to each destination.

      Transacted Message comes in
      1) goes through Router to first destination (sends the message to destination)
      2) goes through Router to second destination (fails before sending message again).

      Rollback transaction
      1) goes through Router to first destination "again". So, the first destination winds up having multiple messages sent(which results in a duplicate)
      I thought when the message is rolledback, the message is removed from the destination Queue (because I thought session.commit should occur).

      So, are you saying first destination will have duplicates if the message is rolledback? We are seeing duplicate messages in the destination Queue though.

      What if we don't want to send the message to the destination Queue(s) more than once but should send them once?

      Comment


      • #4
        I'd have to see exactly what you are doing but if you are sending all of the messages in the same transaction, if one fails, all previous sends should roll back.

        Based on what you are saying, this doesn't appear to be the case. This likely means you have something misconfigured.

        Note that if you are sending these messages to different providers you will need to use XA transactions for this to work correctly.

        Comment


        • #5
          Our current configuration is as follows

          Code:
              
              /** Configuration for Message-Driven-Channel-Adapter Inbound **/
              
              <bean id="common.topicListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"
                      abstract="true">
                  <property name="connectionFactory" ref="common.topicConnectionFactory"/>
                  <property name="subscriptionDurable" value="true"/>
                  <property name="pubSubDomain" value="true"/>
                  <property name="sessionTransacted" value="true"/>
              </bean>    
              
                  <bean id="lb.broadcastTopicContainer" parent="common.topicListenerContainer">
              		<property name="destination" ref="common.broadcastTopic"/>
              		<property name="durableSubscriptionName" value="${mt2.jms.lbdurablename}"/>		
              		<property name="clientId" value="${mt2.jms.lbdurableclientid}"/>
          	</bean>
              
              
                <jms:message-driven-channel-adapter id="lb.receiver" 
                  	container="lb.broadcastTopicContainer"
                  	channel="lb.fromBroadcast"
              	message-converter="lb.mt2MessageConverter"/>    
              	
              	
              	/** End of configuration of Inbound adapter **/
              	
              	/** Router Configuration **/
              	<si:router input-channel="lb.fromBroadcast" ref="lb.messageTypeRouter" method="route" />
          	
          	    <!-- The bean that will route messages based on type -->
          	    <bean id="lb.messageTypeRouter" class="com.xx.TypeListRouter">
          	        <property name="name" value="lb.messageTypeRouter" />
          	        <property name="destinations">
          	            <map>
          	                <entry key="NEW_ORDER">
          	                    <list>
          	                        <value>lb.eProcessing</value>
          	                        <value>lb.aProcessing</value>
          	                        <value>lb.bProcessing</value>
          	                    </list>
          	                </entry>
          		    </map>
          	    </bean>
                  </si:router>
                  
                  /** End of Router configuration **/
                  
                  
          
          	/** JMS Outbound for processing in-between **/
          	
          	....After some processing the channel lb.eProcessing will put the message into lb.toEProcess
          
          
              <jms:outbound-channel-adapter channel="lb.toEProcess"
                  jms-template="lb.toEJMSTemplate" />
                  
              <bean id="lb.toEJMSTemplate" parent="lb.eQueueJmsTemplate">
                  <property name="defaultDestination" ref="common.EProcessSendQueue" />
              </bean>
              
              <bean id="lb.eQueueJmsTemplate" abstract="true" parent="common.queueJmsTemplate">
                      <property name="messageConverter" ref="lb.mtPreserveUndeliveredMessageConverter" />
                      <property name="explicitQosEnabled" value="true" />
                      <property name="timeToLive" value="1000" />
              </bean>
              
            
              
              ....After some processing the channel lb.aProcessing and lb.bProcessing will put the message into lb.toSB
              
              
              <jms:outbound-channel-adapter channel="lb.toSB"
                  jms-template="legacy-backend.toSBJMSTemplate" />
              
              <bean id="lb.toSBJMSTemplate" parent="lb.mtQueueJmsTemplate">
                      <property name="defaultDestination" ref="common.stm" />
              </bean>
              
              <bean id="lb.mtQueueJmsTemplate" abstract="true" parent="common.queueJmsTemplate">
                      <property name="messageConverter" ref="lb.mtMessageConverter" />
              </bean>
              
               <bean id="lb.mtQueueJmsTemplate" abstract="true" parent="common.queueJmsTemplate">
                      <property name="messageConverter" ref="lb.mtMessageConverter" />
               </bean>
          
          
          	
          All of them use the same queueJmsTemplate
          
              	<bean id="common.queueJmsTemplate" abstract="true" parent="common.baseJmsTemplate">
              		<property name="connectionFactory" ref="common.queueConnectionFactory" />
          	</bean>
          	
          	<bean id="common.queueConnectionFactory" parent="common.userCredentialsConnectionFactoryAdapter" lazy-init="true">
          		<property name="targetConnectionFactory">
          			<bean class="com.tibco.tibjms.TibjmsQueueConnectionFactory">
          				<constructor-arg index="0"
          					value="${ems.queue.conn.factory.url}" />
          			</bean>
          		</property>
          	</bean>
          	
          	<bean id="common.userCredentialsConnectionFactoryAdapter"
          			class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter"
          			abstract="true">
          			<property name="username" value="${ems.username}" />
          			<property name="password" value="${ems.password}" />
          	</bean>
          Please let us know where and how should we use XA Transactional boundaries

          Comment


          • #6
            You have no transaction manager attached to your message listener container. That's your primary problem.

            I'm not sure why you have a custom user credentials adapter for your tibco connection factory. You can set your credentials directly on the tibco connection factory bean as you do the server url.

            This is how we had our wired up (without credentials):

            Code:
            <bean id="queueConnectionFactoryBean" class="org.springframework.jms.connection.SingleConnectionFactory" destroy-method="destroy">
                  <constructor-arg>
                    <bean class="com.atomikos.jms.QueueConnectionFactoryBean" init-method="init">
                      <property name="xaQueueConnectionFactory" ref="queueConnectionFactoryXa"/>
                    </bean>
                  </constructor-arg>
                  <property name="reconnectOnException"  value="true"></property>
              </bean>
              
              <bean id="queueConnectionFactoryXa"
                    class="com.tibco.tibjms.TibjmsXAQueueConnectionFactory">
                    <property name="serverUrl">
                        <value>${tibco.naming.provider.url}</value>
                    </property>
              </bean>
            
              <bean id="tibcoDestinationResolver" class="org.springframework.jms.support.destination.JndiDestinationResolver">
                <property name="jndiTemplate"><ref local="tibcoJndiTemplate" /></property>
              </bean>
              
              
              <bean id="tibcoJndiTemplate" class="org.springframework.jndi.JndiTemplate">
                <property name="environment">
                  <props>
                    <prop key="java.naming.factory.initial">com.tibco.tibjms.naming.TibjmsInitialContextFactory</prop>
                    <prop key="java.naming.provider.url">${tibco.naming.provider.url}</prop>
                    <prop key="java.naming.security.principal">admin</prop>
                  </props>
                </property>
              </bean>
              
              <bean id="jmsTemplateXa" class="org.springframework.jms.core.JmsTemplate">
                <property name="connectionFactory"><ref local="queueConnectionFactoryBean" /></property>
                <property name="destinationResolver"><ref local="tibcoDestinationResolver" /></property>
                <property name="receiveTimeout" value="-1"/>
                <!-- Atomikos needs this to be set to work correctly -->
                <property name="sessionTransacted" value="true"/>
              </bean>

            Comment


            • #7
              Please provide link or sample code for transaction manager for JMS.

              Can you Please provide link or sample code on how to use transaction manager for EMS Queues?
              I thought using XA ConnectionFactories and using sessionTransacted=true would be sufficient, but seems like it is not.
              Furthermore, a tx manager requires a datasource right? Since we are dealing only with JMS, what is a datasource in this case?
              Last edited by srikanthradix; Apr 15th, 2010, 06:04 AM.

              Comment


              • #8
                The 'session-transacted' attribute will only enable "local" transactions (at the Session level). You need to add the 'transaction-manager' reference for the MessageListener container. You also need to provide a bean definition for the transactionManager using Spring's JtaTransactionManager. No data source is needed for the JTA tx manager.

                Comment


                • #9
                  I'm using Atomikos as my TM so mine looks like this:

                  Code:
                  <bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" 
                            init-method="init" destroy-method="close">
                           <property name="forceShutdown" value="false"/>
                      </bean>
                      
                      <bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.J2eeUserTransaction">
                          <property name="transactionTimeout" value="300"/>
                      </bean>
                      
                      <bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"
                          depends-on="atomikosTransactionManager,atomikosUserTransaction">
                          <property name="transactionManager" ref="atomikosTransactionManager"/>
                          <property name="userTransaction" ref="atomikosUserTransaction"/>
                          <!-- This is required for Spring Batch set set a custom isolation level when it runs -->
                          <property name="allowCustomIsolationLevels" value="true"/>
                      </bean>
                  
                      <bean id="messageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"
                          depends-on="transactionManager">
                        <property name="connectionFactory" ref="queueConnectionFactoryBean"/>
                        <property name="transactionManager" ref="transactionManager"/>
                        <property name="destination" ref="messageDestination"/>
                        <property name="sessionTransacted" value="true"/>
                        <property name="maxConcurrentConsumers" value="5"/>
                        <property name="concurrentConsumers" value="1"/>
                        <property name="receiveTimeout" value="5000"/>
                        <property name="recoveryInterval" value="60000"/>
                        <property name="autoStartup" value="true"/>     
                    </bean>

                  Comment


                  • #10
                    Do we have to use any transaction boundaries?

                    I used the same configuration that you have given me. It seems like the previous sends are not getting rolledback.
                    When the router puts the message into the channel1, the message is then written to the outbound-queue1 and already been acknowledged.

                    When there is an error/exception in the second channel, the message from the first send is not being rolled back. Any suggestions?

                    Let me know if I am missing any other configuration

                    Queue and Topic Configurations:
                    Code:
                        <bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" 
                              init-method="init" destroy-method="close">
                             <property name="forceShutdown" value="false"/>
                        </bean>
                        
                        <bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.J2eeUserTransaction"/>
                        
                        <bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"
                            depends-on="atomikosTransactionManager,atomikosUserTransaction">
                            <property name="transactionManager" ref="atomikosTransactionManager"/>
                            <property name="userTransaction" ref="atomikosUserTransaction"/>
                            <!-- This is required for Spring Batch set set a custom isolation level when it runs -->
                            <property name="allowCustomIsolationLevels" value="false"/>
                        </bean>
                        <bean id="topicMainContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"
                                abstract="true">
                            <property name="connectionFactory" ref="topicConnectionFactory"/>
                            <property name="subscriptionDurable" value="true"/>
                            <property name="pubSubDomain" value="true"/>
                            <property name="sessionTransacted" value="true"/>
                            <property name="transactionManager" ref="transactionManager"></property>
                            <property name="destination" ref="broadcastTopic"/>
                    	<property name="durableSubscriptionName" value="${jms.durablename}"/>		
                    	<property name="clientId" value="${jms.durableclientid}"/>
                        </bean>
                    
                    
                    	<bean id="topicConnectionFactory" parent="userCredentialsConnectionFactoryAdapter" lazy-init="true">
                    		<property name="targetConnectionFactory">
                    			<bean class="com.tibco.tibjms.TibjmsTopicConnectionFactory">
                    				<constructor-arg index="0"
                    					value="${ems.topic.conn.factory.url}" />
                    			</bean>
                    		</property>
                    	</bean>
                    
                    	<bean id="queueConnectionFactory" parent="userCredentialsConnectionFactoryAdapter" lazy-init="true">
                    		<property name="targetConnectionFactory">
                    			<bean class="com.tibco.tibjms.TibjmsQueueConnectionFactory">
                    				<constructor-arg index="0"
                    					value="${ems.queue.conn.factory.url}" />
                    			</bean>
                    		</property>
                    	</bean>
                    Message-driven-channel-adapter gets the messages, sends it to router, When the Router sends to destinations(there are two destinations)
                    Code:
                             <jms:message-driven-channel-adapter id="receiver" 
                             	container="topicMainContainer"
                             	channel="fromBroadcast"
                        		message-converter="MessageConverter"/>    
                        	
                    	     <bean id="QueueJmsTemplate" abstract="true" class="org.springframework.jms.core.JmsTemplate"/>
                    	            <property name="messageConverter" ref="preserveUndeliveredMessageConverter" />
                    	            <property name="connectionFactory" ref="queueConnectionFactory" />
                    	            <property name="explicitQosEnabled" value="true" />
                    	            <property name="timeToLive" value="1000" />
                    	    </bean>
                    
                           <si:router input-channel="fromBroadcast" ref="testRouter" method="route" />
                    
                    	<bean id="testRouter" class="com.xx.TestRouter">
                    		<property name="destinations">
                    		    <map>
                    			<entry key="1" value="channel1" />
                    			<entry key="2" value="channel2" />
                    		    </map>
                    		</property>
                    	</bean>
                    1) Sends it to first destination channel.
                    This channel puts the message into the legacy-outbound-channel-adapter.
                    Code:
                        <jms:outbound-channel-adapter channel="channel1"
                            jms-template="toFirstDestinationQueueTemplate" />
                        <bean id="toFirstDestinationQueueTemplate" parent="QueueJmsTemplate">
                    	<property name="defaultDestination" ref="SendQueue1" />
                        </bean>
                    2) Sends it to second destination channel.
                    This channel puts the message into the legacy-channel-outbound-adapter.
                    Code:
                        <jms:outbound-channel-adapter channel="channel2"
                            			  jms-template="toSecondDestinationQueueTemplate" />
                        <bean id="toSecondDestinationQueueTemplate" parent="QueueJmsTemplate">
                    	  <property name="defaultDestination" ref="SendQueue2" />
                        </bean>
                    Last edited by srikanthradix; Apr 16th, 2010, 02:09 PM. Reason: added code.

                    Comment


                    • #11
                      I'm confused as to what you are doing...

                      You receive a message...it gets routed and sent to EITHER channel1 or channel2. It can't be sent to both so how can the same message get sent on channel1 and then throw an exception on channel2? Doesn't make any sense.

                      Comment


                      • #12
                        Requirement of the Transaction

                        I receive a message.
                        It gets ROUTED to both channel1 AND then channel2 (Router has multiple destinations).

                        If there is no exception it gets ROUTED to both channel1(which sends to Queue1) AND channel2(which sends to Queue2) and then acknowledgement is sent back. Message is removed from the Queue.

                        If there is an exception before ROUTING to channel2(to Queue2) and after it gets ROUTED to channel1(to Queue1), since the Queue1 has already consumed it, it is not getting removed from Queue1, Queue1 has already sent the message to its destination.

                        My question is how can I avoid the Queue1 consumption before transaction ends.

                        Transaction is like "If any exception occurs, everything should get rolledback right? Why not in this scenario?" How can I achieve this?
                        Last edited by srikanthradix; Apr 16th, 2010, 03:47 PM.

                        Comment


                        • #13
                          The problem is that typically transactions are bound to a thread. By putting the message on two separate channels, which are usually handled by separate threads, you are breaking the transaction semantics.

                          You'll have to review the documentation (or check the old posts) and see if someone has covered this concept of maintaining a single transaction across multiple concurrent channel executions.

                          It's possible that the recipient channel router handles this correctly..and that's actually closer to what you are actually doing.

                          Comment


                          • #14
                            I have the exact same problem. JMS to JMS in a transaction (MQ). I have my own router with redirects the message to a predefined channel based on the content of the message. This channel then redirects it to the correct queue. If something fails in the inbound listener it rolls back fine but if something fails after that (i.e. target queue doesn't exist, channel doesn't exist etc) then the message is lost. I imagined that a session would exist between the inbound jms message driven channel and the outbound jms channel and if something would go wrong in-between that all would be rolled back nicely. Doesn't seem to be the case (even though atomikos, or so the log says/attempts so). I suppose the question is when does the session start and end in Spring Integration and how can we determine/influence it.... Of course it could just be that I haven't configured something correctly....

                            Here is some of my (almost complete) configuration....maybe it can help...


                            Code:
                            <!-- Poller -->
                            	<integration:poller id="poller" default="true" >
                            		<integration:interval-trigger interval="1000" />
                            		<integration:transactional
                            			transaction-manager="JtaTransactionManager" 
                            			propagation="REQUIRED"
                            			isolation="REPEATABLE_READ" 
                            			timeout="300" 
                            			read-only="false" 			
                            			/>
                            	</integration:poller>
                            
                            	<!-- 1. Polls the input queue for incoming JMS messages -->
                            	<jms:message-driven-channel-adapter	id="jmsin" 
                            	destination="inputQueue" 
                            	channel="wltpContentRouterChannel" 
                            	transaction-manager="JtaTransactionManager"
                            	acknowledge="transacted" 
                            	 /> 
                            
                            	<!-- 2. Channel for holding the input of the JMS poller. Note: direct channels must be used with transactions. -->
                            	<channel id="wltpContentRouterChannel">		
                            		<queue capacity="1"/>
                            	</channel>
                            
                            	<!-- 3. Based on the header content of the xml text, the msg is redirected to the correct outbound handler -->
                            	<router ref="wltpContentRouter" input-channel="wltpContentRouterChannel" method="channelRerouter" ignore-channel-name-resolution-failures="false" />
                            
                            	<!-- 4. All the possible channels 	-->
                            	<jms:outbound-channel-adapter id="FIN_PORTAL.BATCH-CHANNEL" destination="FINANCIAL_PORTAL.BATCH-QUEUE"  />
                            	<jms:outbound-channel-adapter id="VORBATCH-CHANNEL" destination="VORBATCH-QUEUE"  />
                            
                            	<!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
                            	<!-- Transaction management settings -->
                            
                            
                            
                            	<!-- Construct Atomikos UserTransactionManager, needed to configure Spring -->
                            	<bean id="AtomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"
                            		init-method="init" destroy-method="close">
                            		<!-- when close is called, should we force transactions to terminate or 			not? -->
                            		<property name="forceShutdown" value="false" />
                            	</bean>
                            
                            	<!-- Also use Atomikos UserTransactionImp, needed to configure Spring -->
                            	<bean id="AtomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
                            		<property name="transactionTimeout" value="10000" />
                            	</bean>
                            
                            	<!-- Configure the Spring framework to use JTA transactions from Atomikos -->
                            	<bean id="JtaTransactionManager"
                            		class="org.springframework.transaction.jta.JtaTransactionManager">
                            		<property name="transactionManager" ref="AtomikosTransactionManager" />
                            		<property name="userTransaction" ref="AtomikosUserTransaction" />
                            		<!-- This is required for Spring Batch set set a custom isolation level	when it runs -->		
                            		<property name="allowCustomIsolationLevels" value="true"></property>
                            	</bean>
                            
                            	<!-- Configure the JMS connector; call init to register for recovery! -->
                            	<bean id="connectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean"
                            		init-method="init" destroy-method="close">
                            		<property name="uniqueResourceName" value="My_MQSeries_XA_RMI" />
                            		<property name="xaConnectionFactory" ref="xaConnectionFactory" />
                            		<!-- Adds it to a transaction I think... -->
                            		<property name="localTransactionMode" value="false"></property>
                            	</bean>
                            
                            
                            
                            	<!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->

                            Comment

                            Working...
                            X