Transaction demarcation on Gateway with JMS

  • Transaction demarcation on Gateway with JMS

    Hi

    I'm bringing Spring and Spring Integration into an existing Java program (runs standalone, not in a web environment) in order to replace a shared database integration with a messaging system.

    I'm looking for some guidance on publishing a JMS message to a queue within a transaction boundary.

    I have a service method that needs to carry out two functions:
    1. Use a Spring Integration Gateway to put a message onto the bus, publishing it to a JMS queue (direct channels, single thread).
    2. Assuming part 1 succeeds, send a TCP/IP packet to a legacy system.
    If part 2 fails (IOException), I want the JMS transaction to roll back.

    So I am thinking I need to annotate the service method something like this:
    Code:
    @Transactional(value="jmsTransactionManager",rollbackFor={IOException.class})

    Am I on the right lines here? Is it possible to create this kind of transaction boundary with Spring Integration?

    Jon

  • #2
    Assuming you add that annotation to the Gateway interface method that you are invoking, and that method declares IOException in its 'throws' clause, it should work. Are you planning on invoking two separate gateway methods, one for the JMS sending and one for the TCP sending? Or were you planning on having all of that behind the same gateway interface invocation?



    • #3
      Assuming you are doing all the work in a single SI message flow (behind a gateway), it will work. You don't need rollbackFor for an IOException, though: the IOException will be wrapped in a MessagingException, which is a RuntimeException, and the default rollback rules roll back on any RuntimeException.
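To make the rollback rule above concrete, here is a minimal plain-Java sketch. It does not use Spring at all: `WrappedMessagingException` is a hypothetical stand-in for `MessagingException`, and `rollsBackByDefault` only models the default rule (roll back on `RuntimeException` or `Error`), so treat it as an illustration rather than the framework's code.

```java
import java.io.IOException;

public class RollbackRuleSketch {

    // Hypothetical stand-in for Spring Integration's MessagingException,
    // which is an unchecked (runtime) exception.
    static class WrappedMessagingException extends RuntimeException {
        WrappedMessagingException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Models the default rollback rule: unchecked exceptions and errors
    // trigger a rollback, checked exceptions do not unless named in rollbackFor.
    static boolean rollsBackByDefault(Throwable t) {
        return t instanceof RuntimeException || t instanceof Error;
    }

    public static void main(String[] args) {
        Throwable raw = new IOException("connection refused");
        Throwable wrapped = new WrappedMessagingException("failed to send", raw);

        System.out.println("raw IOException rolls back by default: " + rollsBackByDefault(raw));
        System.out.println("wrapped exception rolls back by default: " + rollsBackByDefault(wrapped));
    }
}
```

This is why the gateway method in the test below carries no rollbackFor attribute, while a hand-written service method that throws a bare IOException would still need one.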

      I just ran a test (based on the tcp-client-server sample) and all worked as expected. I used a <recipient-list-router /> to send the message to jms and then tcp...

      Code:
      public interface SimpleGateway {
      
      	@Transactional(value="txManager")
      	public void send(String text);
      	
      }
      Code:
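      import static org.junit.Assert.assertEquals;
      
      import org.junit.Test;
      import org.junit.runner.RunWith;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.jms.core.JmsTemplate;
      import org.springframework.test.context.ContextConfiguration;
      import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
      
      @ContextConfiguration("/META-INF/spring/integration/jmsAndTcpClientTxDemo-context.xml")
      @RunWith(SpringJUnit4ClassRunner.class)
      public class JmsAndTcpClientTxDemoTest {
      
      	@Autowired
      	SimpleGateway gw;
      	
      	@Autowired
      	JmsTemplate jmsTemplate;
      	
      	@Test
      	public void testHappyDay() {
      		String text = "Hello world!";
      		try {
      			// sends to JMS and then TCP inside one transaction
      			gw.send(text);
      		} catch (Exception e) {
      			// swallowed on purpose so the assertion below reports a rollback
      			e.printStackTrace();
      		}
      		// the message is only on the queue if the transaction committed
      		jmsTemplate.setReceiveTimeout(100);
      		Object jmsMessage = jmsTemplate.receiveAndConvert();
      		assertEquals(text, jmsMessage);
      	}
      
      }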
      Code:
      <?xml version="1.0" encoding="UTF-8"?>
      <beans:beans xmlns:beans="http://www.springframework.org/schema/beans"
      	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      	xmlns="http://www.springframework.org/schema/integration"
      	xmlns:ip="http://www.springframework.org/schema/integration/ip"
      	xmlns:jms="http://www.springframework.org/schema/integration/jms"
      	xmlns:tx="http://www.springframework.org/schema/tx"
      	xsi:schemaLocation="http://www.springframework.org/schema/integration/ip http://www.springframework.org/schema/integration/ip/spring-integration-ip.xsd
      		http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
      		http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
      		http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
      		http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">
      
      	<!-- Client side -->
      	<tx:annotation-driven/>
      	
      	<gateway id="gw"
      		service-interface="org.springframework.integration.samples.tcpclientserver.SimpleGateway"
      		default-request-channel="toRouter"/>
      
      	<recipient-list-router input-channel="toRouter">
      		<recipient channel="toJms"/>
      		<recipient channel="toTcp"/>
      	</recipient-list-router>
      
      	<ip:tcp-connection-factory id="client"
      		type="client"
      		host="localhost"
      		port="11111"
      		single-use="true"
      		so-timeout="10000"
      		/>
      	
      	<channel id="toJms" />
      	
      	<jms:outbound-channel-adapter
      		channel="toJms"
      		connection-factory="connectionFactory"
      		destination="queue"/>
      
      	<!-- only for message retrieval in test case -->
      	<beans:bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
      		<beans:property name="connectionFactory" ref="connectionFactory" />
      		<beans:property name="defaultDestination" ref="queue" />
      	</beans:bean>
      
      	<channel id="toTcp" />
      	
      	<ip:tcp-outbound-channel-adapter
      		channel="toTcp"
      		connection-factory="client"
      		/>
      
      	<beans:bean id="txManager" class="org.springframework.jms.connection.JmsTransactionManager">
      		<beans:property name="connectionFactory" ref="connectionFactory"/>
      	</beans:bean>
      
      	<beans:bean id="queue" class="org.apache.activemq.command.ActiveMQQueue">
      		<beans:constructor-arg value="test.queue"/>
      	</beans:bean>
      
      	<beans:bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
      		<beans:property name="targetConnectionFactory">
      			<beans:bean class="org.apache.activemq.ActiveMQConnectionFactory">
      				<beans:property name="brokerURL" value="vm://localhost"/>		
      			</beans:bean>
      		</beans:property>
      		<beans:property name="sessionCacheSize" value="10"/>
      		<beans:property name="cacheProducers" value="false"/>
      	</beans:bean>
      	
      	<!-- Server side -->
      	
      	<ip:tcp-connection-factory id="crLfServer"
      		type="server"
      		port="11111"/>
      			
      	<ip:tcp-inbound-channel-adapter
      		connection-factory="crLfServer"
      		channel="serverBytes2StringChannel"/>
      		
      	<transformer id="serverBytes2String"
      		input-channel="serverBytes2StringChannel"
      		output-channel="toSA" 
      		expression="new String(payload)"/>
      
      	<channel id="toSA" />
      
      	<service-activator input-channel="toSA"
      					   ref="someService"
      					   method="test1"
      	/>
      
      	<beans:bean id="someService" 
      		  class="org.springframework.integration.samples.tcpclientserver.SomeService" />
      		  
      		  
      </beans:beans>
      If you change the port on the client connection factory to, say, 11112 and re-run the test, it fails to retrieve the message: the JMS send was rolled back after the TCP operation failed to create a connection.

      With debugging turned on, you can also see the commit or rollback in each case.

      Hope that helps.



      • #4
        Of course, if you are doing the TCP I/O yourself (*after* you send the message via a gateway), then a method annotated as you suggest will work too, provided the method is in a Spring bean and encapsulates both the gateway call *and* the socket I/O.



        • #5
          Thanks for the replies.

          Gary, it's the scenario in your second post that I am dealing with.

          I've not yet got the transactions working, so I guess it's something wrong with my context configuration. I'll work through it again now and then post back later with my XML config. In the meantime, here is an idea of the code:

          Code:
          import java.io.IOException;
          
          import org.springframework.beans.factory.annotation.Autowired;
          import org.springframework.stereotype.Service;
          import org.springframework.transaction.annotation.Transactional;
          
          // Session, Packet and DefaultGateway are not shown here.
          @Service
          public class MyService {
          
              @Autowired
              private DefaultGateway gateway;
          
              @Transactional(value="jmsTransactionManager",rollbackFor={IOException.class})
              public void respondAndRelay(Session session, Packet response, byte[] smppCommand) throws IOException {
          
                  // get the response onto the messaging bus (JMS Queue)
                  this.gateway.relaySmppCommand(smppCommand);
          
                  // legacy response over existing tcp/ip socket connection. Only sends if JMS publication succeeded 
                  if (!session.sendPDU(response)) {
                      // must rollback jms publication
                      throw new IOException("Failed to send smpp response pdu");
                  }
              }
          }



          • #6
            I think I may be misunderstanding JMS transactions here (it's been a while!). I've been trying to roll back just the publishing side of the JMS operation rather than the consumption. As the consumers run in a different process and connect to the broker over TCP/IP, I guess I'm going to have to introduce XA transactions here too.
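For readers following the publishing-side question, the sketch below models how a locally transacted send behaves: a message sent inside the transaction is only staged, and consumers see it after commit, never after rollback. This is a plain-Java model, not the JMS API. `StagedQueue` and `sendThenRun` are hypothetical names introduced only for illustration, and the follow-up action stands in for the legacy TCP/IP write.

```java
import java.util.ArrayList;
import java.util.List;

public class TransactedSendSketch {

    // Hypothetical in-memory model of a producer-side transacted session.
    static class StagedQueue {
        private final List<String> visible = new ArrayList<>();
        private final List<String> staged = new ArrayList<>();

        // A send inside the transaction is staged, not yet deliverable.
        void send(String message) { staged.add(message); }

        // Commit makes every staged message visible to consumers.
        void commit() { visible.addAll(staged); staged.clear(); }

        // Rollback discards staged messages; consumers never see them.
        void rollback() { staged.clear(); }

        List<String> visibleToConsumers() { return new ArrayList<>(visible); }
    }

    // Sends the message, runs the follow-up step, then commits on success
    // or rolls back if the follow-up throws an unchecked exception.
    static void sendThenRun(StagedQueue queue, String message, Runnable followUp) {
        queue.send(message);
        try {
            followUp.run();
            queue.commit();
        } catch (RuntimeException e) {
            queue.rollback();
        }
    }

    public static void main(String[] args) {
        StagedQueue queue = new StagedQueue();

        sendThenRun(queue, "first", () -> { /* follow-up succeeds */ });
        sendThenRun(queue, "second", () -> { throw new IllegalStateException("socket write failed"); });

        System.out.println(queue.visibleToConsumers());
    }
}
```

In this model only the first message reaches consumers, which matches the behaviour Gary's test in #3 shows when the TCP port is changed.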
