Configure two different datasources (one for reading and another for writing)

  • Configure two different datasources ( one for reading and another for writing )

    Hi,

    I am very new to Spring Batch. I am trying to set up an ETL process that reads data from one database, applies some transformations to it, and then writes it to another database.

    So basically I need to be able to configure two datasources along with their respective transaction management beans.

    Is this possible?


    Thanks in Advance,
    Vamsi Annem
    Last edited by vamsiannem; Jan 30th, 2012, 08:49 AM. Reason: Adding thanks note

  • #2
    Just to be clear, here is an example:

    I have two different datasources, one to read from and another to write results to. This is the job configuration:

    <job id="sampleJob" job-repository="jobRepository">
      <step id="step1">
        <tasklet transaction-manager="myTransactionManager">
          <chunk reader="itemReader" processor="itemProcessor" writer="itemWriter" commit-interval="10"/>
        </tasklet>
      </step>
    </job>

    The ItemReader should get data from dataSource_1.
    The ItemWriter should write data to dataSource_2.

    As per the documentation, only a single transaction manager can be configured on a tasklet.


    In this scenario, how do I set up the transaction manager?


    Thanks,
    Vamsi.



    • #3
      Error: resource 'mysql' cannot be used outside XA transaction scope for Global trx

      I have an issue with the transaction manager configuration for XA transactions. Please help me.

      Here is the Log:
      ----------------------------------------------------------------------------------------------------------
      DEBUG main org.springframework.batch.core.repository.dao.JdbcStepExecutionDao - Truncating long message before update of StepExecution, original message is: org.springframework.batch.item.ItemStreamException: Failed to initialize the reader
      at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:137)
      at org.springframework.batch.item.support.CompositeItemStream.open(CompositeItemStream.java:93)
      at org.springframework.batch.core.step.tasklet.TaskletStep.open(TaskletStep.java:301)
      at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:192)
      at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:135)
      at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:61)
      at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:60)
      at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:144)
      at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:124)
      at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:135)
      at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:281)
      at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:120)
      at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:49)
      at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:114)

      Caused by: org.springframework.jdbc.UncategorizedSQLException: Executing query; uncategorized SQLException for SQL [SELECT * FROM mmfdb.transaction limit 1, 10]; SQL state [null]; error code [0]; error enlisting a JdbcConnectionHandle of a JdbcPooledConnection from datasource mysql in state ACCESSIBLE with usage count 1 wrapping com.mysql.jdbc.jdbc2.optional.JDBC4MysqlXAConnection@78dc4c on com.mysql.jdbc.jdbc2.optional.JDBC4ConnectionWrapper@c70b0d; nested exception is java.sql.SQLException: error enlisting a JdbcConnectionHandle of a JdbcPooledConnection from datasource mysql in state ACCESSIBLE with usage count 1 wrapping com.mysql.jdbc.jdbc2.optional.JDBC4MysqlXAConnection@78dc4c on com.mysql.jdbc.jdbc2.optional.JDBC4ConnectionWrapper@c70b0d
      at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:83)
      at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:80)
      at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:80)
      at org.springframework.batch.item.database.JdbcCursorItemReader.openCursor(JdbcCursorItemReader.java:130)
      at org.springframework.batch.item.database.AbstractCursorItemReader.doOpen(AbstractCursorItemReader.java:401)
      at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:134)
      ... 24 more
      Caused by: bitronix.tm.internal.BitronixSystemException: resource 'mysql' cannot be used outside XA transaction scope. Set allowLocalTransactions to true if you want to allow this and you know your resource supports this.
      at bitronix.tm.resource.common.TransactionContextHelper.enlistInCurrentTransaction(TransactionContextHelper.java:79)
      at bitronix.tm.resource.jdbc.JdbcConnectionHandle.enlistResource(JdbcConnectionHandle.java:84)
      ... 34 more
      ----------------------------------------------------------------------------------------------------------

      I was able to configure the Bitronix JTA transaction manager for XA resources.
      1. I have configured one transaction manager for Spring Batch's internal use, and a JTA transaction manager at the tasklet level.
      2. Inside the tasklet I read the data from dataSource_1, process it, and then write it to dataSource_2.

      Code has been shared below:
      1. launch-context.xml [ Configuration to create job repository and job launcher ....]
      <bean id="jobLauncher"
      class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
      <property name="jobRepository" ref="jobRepository" />
      </bean>

      <bean id="jobRepository"
      class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean"
      p:dataSource-ref="dataSource" p:transactionManager-ref="transactionManager" p:databaseType="POSTGRES" />

      <bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource">
      <property name="driverClassName" value="${batch.jdbc.driver}" />
      <property name="url" value="${batch.jdbc.url}" />
      <property name="username" value="${batch.jdbc.user}" />
      <property name="password" value="${batch.jdbc.password}" />
      </bean>



      <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager" lazy-init="true">
      <property name="dataSource" ref="dataSource" />
      </bean>


      <bean id="placeholderProperties" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
      <property name="location" value="classpath:batch.properties" />
      <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE" />
      <property name="ignoreUnresolvablePlaceholders" value="true" />
      <property name="order" value="1" />
      </bean>
      ......
      ----------------------------------------------------------------------------------------------------
      2. readWriteJob-db.xml [two datasources and trx manager configuration using bitronix ]

      <bean id="dataSource_1" class="bitronix.tm.resource.jdbc.PoolingDataSource"
      init-method="init" destroy-method="close">
      <property name="className" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource" />
      <property name="uniqueName" value="mysql" />
      <property name="minPoolSize" value="0" />
      <property name="maxPoolSize" value="10" />
      <property name="driverProperties">
      <props>
      <prop key="url">jdbc:mysql://localhost:3306/testdb</prop>
      <prop key="user">root</prop>
      <prop key="password">root</prop>
      </props>
      </property>
      </bean>

      <bean id="dataSource_2" class="bitronix.tm.resource.jdbc.PoolingDataSource"
      init-method="init" destroy-method="close">
      <property name="className" value="org.postgresql.xa.PGXADataSource" />
      <property name="uniqueName" value="postgres" />
      <property name="minPoolSize" value="0" />
      <property name="maxPoolSize" value="10" />
      <property name="driverProperties">
      <props>
      <prop key="user">postgres</prop>
      <prop key="password">postgres</prop>
      <prop key="portNumber">5432</prop>
      <prop key="serverName">localhost</prop>
      <prop key="databaseName">testdb</prop>
      </props>
      </property>
      </bean>



      <bean id="btmConfig" factory-method="getConfiguration"
      class="bitronix.tm.TransactionManagerServices">
      <property name="serverId" value="spring-btm" />
      </bean>

      <bean id="bitronixTrxMgr" class="bitronix.tm.TransactionManagerServices"
      factory-method="getTransactionManager"
      depends-on="btmConfig,dataSource_1,dataSource_2" destroy-method="shutdown">
      </bean>

      <bean id="JtaTransactionManagers" class="org.springframework.transaction.jta.JtaTransactionManager">
      <property name="transactionManager" ref="bitronixTrxMgr"/>
      <property name="userTransaction" ref="bitronixTrxMgr"/>
      </bean>
      -----------------------------------------------------------------------------------------------------------
      3. readWriteJob.xml

      <job id="readWriteJob" incrementer="timeStampIncrement" >
      <step id="handleTrxsStep" >
      <tasklet transaction-manager="JtaTransactionManagers">
      <chunk commit-interval="1" reader="transactionsReader"
      processor="transactionProcessor" writer="transactionsWriter">
      </chunk>
      </tasklet>
      </step>
      </job>
      <!-- Item Reader -->
      <beans:bean id="transactionsReader"
      class="org.springframework.batch.item.database.JdbcCursorItemReader">
      <beans:property name="dataSource" ref="dataSource_1"/>
      <beans:property name="sql" value="${query.trx.read}"/>
      <beans:property name="rowMapper">
      <beans:bean
      class="com.mmf.batch.examples.dao.rowmapper.TransactionsReaderRowMapper"/>
      </beans:property>
      </beans:bean>

      <!-- Item Processor -->
      <beans:bean id="transactionProcessor" class="com.mmf.batch.examples.dao.TrxItemProcessor"/>

      <!-- Item Writer -->
      <beans:bean id="transactionsWriter" class="com.mmf.batch.examples.dao.TransactionItemWriterImpl">
      <beans:constructor-arg index="0" ref="dataSource_2" />
      <beans:property name="queries" ref="queries"/>
      </beans:bean>
      <util:properties id="queries">
      <beans:prop key="TRX_GROUP_INSERT">${query.trx.group.insert}</beans:prop>
      <beans:prop key="TRX_LOAN_INSERT">${query.trx.loan.insert}</beans:prop>
      <beans:prop key="TRX_READ">${query.trx.read}</beans:prop>
      </util:properties>
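
      A note on the exception in the log: the stack trace shows the failure coming from the reader's open() call (reached through TaskletStep.open), which appears to run before any chunk transaction has started, so the cursor query on dataSource_1 executes outside XA scope. The Bitronix message itself names one possible workaround, the allowLocalTransactions property. Here is a minimal sketch, assuming the same dataSource_1 definition as in readWriteJob-db.xml with only that one property added. Whether non-XA reads are acceptable for this job is an assumption that would need checking.

      ```xml
      <!-- Sketch only: same bean as dataSource_1 in readWriteJob-db.xml, plus allowLocalTransactions. -->
      <bean id="dataSource_1" class="bitronix.tm.resource.jdbc.PoolingDataSource"
            init-method="init" destroy-method="close">
        <property name="className" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource" />
        <property name="uniqueName" value="mysql" />
        <property name="minPoolSize" value="0" />
        <property name="maxPoolSize" value="10" />
        <!-- Assumption: lets the cursor reader use this pool when no JTA transaction is active. -->
        <property name="allowLocalTransactions" value="true" />
        <property name="driverProperties">
          <props>
            <prop key="url">jdbc:mysql://localhost:3306/testdb</prop>
            <prop key="user">root</prop>
            <prop key="password">root</prop>
          </props>
        </property>
      </bean>
      ```

      With this setting the cursor opened by the reader would presumably not take part in the global transaction. Writes to dataSource_2 made inside the chunk should still go through the JTA transaction manager configured on the tasklet, though that depends on how the writer obtains its connection.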
