
  • Spring Integration + spring batch admin + Transaction Propagation issue

    Hi,
    I am working on a POC that uses Spring Integration (SI), Spring Batch Admin, and Atomikos for transaction management. I have configured a JMS inbound-channel-adapter to receive messages from the queue, using a handler I created. I then put the received message into an output channel and also launch a batch job based on the message type. The batch job has a tasklet, and I inject the corresponding channel into this tasklet. Once the job is launched, I read the data from the channel, process it, and insert it into the database.

    I want to use the Atomikos transaction manager here so that if the database insert fails, the message is not removed from the JMS queue.

    Below is my configuration. Please tell me if the approach I am following is wrong. Pointers to some examples would be great.

    Thanks

    Code:
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" >
    		<property name="connectionFactory" ref="sConnectionFactory" />
    		<property name="defaultDestination" ref="queue" />
    		<property name="messageConverter" ref="messageConverter"></property>
    		<property name="sessionTransacted" value="true" />
    		<property name="sessionAcknowledgeMode" value="0"/>
    	</bean>
    	
    	<bean id="sConnectionFactory"   class="org.springframework.jms.connection.SingleConnectionFactory">
            <property name="targetConnectionFactory" ref="amqConnectionFactory" />
        </bean>
    
    	<bean id="amqConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean"	init-method="init">
    		<property name="uniqueResourceName" value="XAemsMQ" />
    		<property name="xaConnectionFactory" ref="connectionFactory" />
    		<property name="poolSize" value="100" />
    	</bean>
    	<jee:jndi-lookup id="connectionFactory" jndi-name="emsConnectionFactory">
    		<jee:environment>
    			java.naming.factory.initial=com.tibco.tibjms.naming.TibjmsInitialContextFactory
    			java.naming.provider.url=tibjmsnaming://localhost:7222
    		</jee:environment>
    	</jee:jndi-lookup>
    
    	<jee:jndi-lookup id="queue" jndi-name="emsQueue">
    		<jee:environment>
    			java.naming.factory.initial=com.tibco.tibjms.naming.TibjmsInitialContextFactory
    			java.naming.provider.url=tibjmsnaming://localhost:7222
    		</jee:environment>
    	</jee:jndi-lookup>
    	
    	<int-jms:inbound-channel-adapter  id="jmsIn" jms-template="jmsTemplate" >
    		<integration:poller fixed-rate="100">
    			<integration:transactional transaction-manager="transactionManager" propagation="REQUIRED"/>
    		</integration:poller>
    	</int-jms:inbound-channel-adapter>
    	
    	<integration:channel id="securitiesChannel">
    		<integration:queue/>
    	</integration:channel>
    	<integration:channel id="etfChannel">
    		<integration:queue/>
    	</integration:channel>
    
    	<integration:channel id="outChannel">
    		<integration:queue capacity="100" />
    	</integration:channel>
    
     	<integration:service-activator 
    		input-channel="jmsIn" output-channel="outChannel" ref="handler"
    		method="process" />
    		
    	<bean id="handler" class="com.irebalpoc.integration.SecurityMessageProcessor"> <!-- returns a Message<JobLaunchRequest> and also adds the incoming JMS message to one of the channels (etfChannel, securitiesChannel, etc.) -->
    		<property name="jobHashTable" ref="jobsMapping" />
    	</bean>
    <bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"	>
    		<property name="transactionManager" ref="AtomikosTransactionManager" />
    		<property name="userTransaction" ref="AtomikosUserTransaction" />
    		<property name="allowCustomIsolationLevels" value="true" />
    	</bean> 
    <batch:job id="importSecuritiesJob" job-repository="jobRepository">
    		<batch:step id="importSecuritiesStep" >
    			<batch:tasklet 
    				transaction-manager="transactionManager"  start-limit="100" 
    				ref="securityEquityProcessor" >	
    			</batch:tasklet>
    		</batch:step>
    	</batch:job>
    	
    	<bean id="securityEquityProcessor" class="com.irebalpoc.batch.writer.SecurityEquityProcessor" scope="step">
    		<property name="channelName" value="#{jobParameters['channel-name']}" />		
    	</bean>
    Last edited by shashikanthb; May 3rd, 2012, 08:17 AM. Reason: code edited

  • #2
    Often, you can avoid the overhead and complexity of XA for situations like this. Take a look at http://www.javaworld.com/javaworld/j...nsactions.html; in particular, the Best Efforts 1PC Pattern.
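
    As a rough illustration of the Best Efforts 1PC idea (a sketch under assumptions, not a tested configuration): the JMS session is locally transacted instead of XA, and the database insert commits in an ordinary DataSourceTransactionManager transaction before the JMS session commits. If the insert throws, the exception propagates to the listener container, the session rolls back, and the message is redelivered. The bean names plainConnectionFactory, dataSource, dbTransactionManager and securityMessageListener are hypothetical; queue is the JNDI lookup from the first post.

    Code:
    <!-- Assumption: dataSource is a plain, non-XA DataSource defined elsewhere -->
    <bean id="dbTransactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    	<property name="dataSource" ref="dataSource" />
    </bean>

    <!-- Assumption: plainConnectionFactory is a non-XA JMS ConnectionFactory -->
    <bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    	<property name="connectionFactory" ref="plainConnectionFactory" />
    	<property name="destination" ref="queue" />
    	<!-- Local JMS transaction: receive is committed only if the listener returns normally -->
    	<property name="sessionTransacted" value="true" />
    	<!-- Hypothetical listener that performs the insert under dbTransactionManager -->
    	<property name="messageListener" ref="securityMessageListener" />
    </bean>

    The trade-off is that the two commits are not atomic: if the JMS commit fails after the database commit, the message is delivered again, so the insert should tolerate duplicates.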

    You also might want to take a look at using a message-driven-channel-adapter instead of a polled inbound-channel-adapter.
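
    A minimal sketch of what that could look like, reusing the int-jms and integration prefixes from the first post. The names plainConnectionFactory and jmsInChannel are assumptions for illustration; handler and queue are the beans from the original configuration.

    Code:
    <!-- Message-driven: the listener container pushes messages, no poller needed -->
    <int-jms:message-driven-channel-adapter id="jmsAdapter"
    	connection-factory="plainConnectionFactory"
    	destination="queue"
    	channel="jmsInChannel"
    	acknowledge="transacted" />

    <!-- Direct channel (no queue element): the handler runs on the listener thread -->
    <integration:channel id="jmsInChannel" />

    <integration:service-activator
    	input-channel="jmsInChannel" ref="handler"
    	method="process" />

    Transactions are bound to the thread, so anything that happens after a queue channel (such as outChannel in the first post) runs on a different thread and is no longer part of the JMS transaction. Keeping the flow on direct channels up to the database insert lets a failure roll the message back onto the queue.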
