  • Running multiple instances of the Spring Batch job concurrently

    We are trying to prove out an ETL service prototype application using Spring Batch. It is wrapped in a web service and deployed in a WebLogic container.
    The ETL data, obtained through the Spring Batch service, is then posted to a Solr index for querying.
    Essentially, the job involves two parallel, multi-threaded Spring Batch steps. The first step uses a JdbcCursorItemReader (wrapped in a synchronizing
    delegator) to read ID data from an Oracle DB in chunks of 50.
    The processor in the first step then uses these IDs to query detail data
    from the DB (through a custom DAO) and generates a POJO, which the custom item writer writes onto a queue (a Commons Collections UnboundedFifoBuffer).
    In the second Spring Batch step (running in parallel with the first through a split/flow), a custom item reader reads the queued POJOs and the item writer
    invokes the Solr server via the SolrJ API to write them into the index. Here is the Spring Batch XML (a sketch of the custom queue and reader-delegator classes follows the configuration):


    <!--****************************************************************-->
    <!--************************ Job Definition ************************-->
    <!--****************************************************************-->

    <batch:job id="ETLService">
      <batch:split id="etlService.split" task-executor="split.taskExecutor">
        <batch:flow>
          <batch:step id="etlService.getClaimBIDWList">
            <batch:tasklet task-executor="etlService.step.pooledTaskExecutor" throttle-limit="25">
              <batch:chunk reader="claimBIDWDBItemReader"
                           processor="claimBIDWDetailItemProcessor"
                           writer="claimBIDWQItemWriter"
                           commit-interval="50"/>
              <batch:listeners>
                <batch:listener ref="claimBIDWQItemWriter"/>
              </batch:listeners>
            </batch:tasklet>
          </batch:step>
        </batch:flow>
        <batch:flow>
          <batch:step id="etlService.indexClaimBIDWData">
            <batch:tasklet task-executor="etlService.step.pooledTaskExecutor" throttle-limit="25">
              <batch:chunk reader="claimBIDWQItemReader"
                           writer="claimBIDWSolrItemWriter"
                           commit-interval="50"/>
              <batch:listeners>
                <batch:listener ref="claimBIDWQItemReader"/>
              </batch:listeners>
            </batch:tasklet>
          </batch:step>
        </batch:flow>
      </batch:split>
      <batch:listeners>
        <batch:listener ref="qWrapper"/>
      </batch:listeners>
    </batch:job>

    <!-- Multithreaded support -->

    <bean id="split.taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor">
      <property name="concurrencyLimit" value="2"/>
    </bean>

    <bean id="etlService.step.pooledTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
      <property name="corePoolSize" value="200"/>
      <property name="maxPoolSize" value="200"/>
    </bean>

    <!--****************************************************************-->
    <!--************************ Item Readers **************************-->
    <!--****************************************************************-->

    <bean id="claimBIDWDBItemReader" class="com.esuite.batch.itemreader.SynchronizingItemReaderDelegator">
      <property name="itemReader">
        <bean class="org.springframework.batch.item.database.JdbcCursorItemReader" scope="step">
          <property name="dataSource" ref="ETL_SVC_BIDW.ClaimDataSource"/>
          <property name="sql">
            <value>
              SELECT clmdim.CLAIM_NUM FROM DATAMART.CLM_DIM clmdim WHERE clmdim.RECEIVED_DT = '#{jobParameters['etl.claims.for.date']}'
            </value>
          </property>
          <property name="rowMapper">
            <bean class="com.esuite.batch.itemreader.row.ClaimBIDWETLRowMapper"/>
          </property>
        </bean>
      </property>
    </bean>

    <bean id="claimBIDWQItemReader" class="com.esuite.batch.itemreader.etl.ClaimBIDWQItemReader" scope="step">
      <property name="qWrapper" ref="qWrapper"/>
    </bean>

    <!--****************************************************************-->
    <!--************************ Item Writers **************************-->
    <!--****************************************************************-->

    <bean id="claimBIDWQItemWriter" class="com.esuite.batch.itemwriter.etl.ClaimBIDWQItemWriter" scope="step">
      <property name="qWrapper" ref="qWrapper"/>
    </bean>

    <bean id="claimBIDWSolrItemWriter" class="com.esuite.batch.itemwriter.etl.ClaimBIDWSolrItemWriter" scope="step">
      <property name="indexerSvc" ref="BATCH_SVC_BIDW.indexerSvc"/>
      <property name="solrURL" value="#{jobParameters['etl.service.solr.url']}"/>
      <property name="jobId" value="#{stepExecution.jobExecution.jobId}"/>
    </bean>

    <!--****************************************************************-->
    <!--*********************** Item Processors ************************-->
    <!--****************************************************************-->

    <bean id="claimBIDWDetailItemProcessor" class="com.esuite.batch.itemprocessor.ClaimBIDWDetailItemProcessor">
      <property name="bidwDao" ref="dao1"/>
      <property name="viantDao" ref="dao2"/>
    </bean>

    <bean id="dao1" class="com.esuite.batch.pojo.ClaimBIDWDAO">
      <property name="datasource" ref="ETL_SVC_BIDW.ClaimDataSource"/> <!-- WebLogic data sources -->
    </bean>

    <bean id="dao2" class="com.esuite.batch.pojo.ClaimDAO">
      <property name="datasource" ref="ETL_SVC.ClaimDataSource"/>
    </bean>

    <!--****************************************************************-->
    <!--*********************** Queue Resources ************************-->
    <!--****************************************************************-->

    <bean id="qWrapper" class="com.esuite.batch.services.etl.QWrapper"/>

    </beans>
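
    The custom plumbing classes referenced above (SynchronizingItemReaderDelegator, QWrapper and the queue-backed item reader/writer) are not shown in the post.
    A minimal sketch of what the synchronizing delegator and the shared queue wrapper might look like is below (two separate source files, shown together);
    the method bodies are assumptions, and a java.util.concurrent queue stands in for the Commons Collections UnboundedFifoBuffer mentioned above:

    import org.springframework.batch.item.ItemReader;

    // Hypothetical sketch: serializes read() calls so a stateful, non-thread-safe
    // reader such as JdbcCursorItemReader can back a multi-threaded step.
    public class SynchronizingItemReaderDelegator<T> implements ItemReader<T> {

        private ItemReader<T> itemReader;

        public void setItemReader(ItemReader<T> itemReader) {
            this.itemReader = itemReader;
        }

        @Override
        public synchronized T read() throws Exception {
            return itemReader.read();
        }
    }

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;

    // Hypothetical sketch: shared hand-off buffer between the two flows of the split.
    // ClaimBIDWQItemWriter would call put(), ClaimBIDWQItemReader would call poll().
    public class QWrapper {

        private final Queue<Object> buffer = new ConcurrentLinkedQueue<Object>();

        public void put(Object item) {
            buffer.add(item);
        }

        public Object poll() {
            return buffer.poll();
        }
    }

    Note that qWrapper is declared as a plain singleton bean in the XML above, so concurrent job executions in the same application context would share one buffer.
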




    This job is then run from JUnit client code, invoking the web service and passing in the 'etl.claims.for.date' job parameter that the first step's JDBC cursor
    item reader uses to retrieve the IDs. I am also able to run multiple instances of this job at the same time, passing in different
    'etl.claims.for.date' values (two different dates with distinct rows in the DB, so no overlap). What I am seeing when I run multiple
    instances is that:

    (1) I have to set the concurrency limit to 2 on the split-level asynchronous task executor (if I run 2 instances of the same job with different bind variables).

    (2) The second instance of the job (kicked off concurrently with the first instance, with a different bind variable value) waits to
    execute its steps until the steps of the first job have completed (instead of running in parallel with the first job instance) - meaning
    the instances run sequentially.


    I am not able to figure out why the second instance of the job is not executing in parallel with the first instance. The idea is to have multiple instances
    of this job run concurrently for different dates (the bind variable), extracting data to the Solr index. If the jobs are going to run sequentially, we
    are not going to be able to utilize the full app server capabilities (such as connection pools and concurrency). Also, since we have to load a lot of
    data for the first Solr index bulk population, it makes sense to minimize the time the ETL process takes, and running the jobs sequentially
    doesn't help. Instead of using single dates as query criteria, we will be using selected date ranges in the actual implementation.

    Do we have our Spring XML correctly configured for this scenario? FYI - the two steps within the job do run in parallel through the split/flow configuration.
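
    For reference, a minimal sketch of how the client side might launch two instances of this job with different date parameters (assuming the web service
    simply delegates to a JobLauncher; the actual launching code is not shown here, so class and method names are assumptions):

    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.JobParameters;
    import org.springframework.batch.core.JobParametersBuilder;
    import org.springframework.batch.core.launch.JobLauncher;

    // Hypothetical client-side launcher; names and wiring are assumptions.
    public class EtlServiceClient {

        private final JobLauncher jobLauncher; // the launcher behind the web service
        private final Job etlServiceJob;       // the "ETLService" job defined above

        public EtlServiceClient(JobLauncher jobLauncher, Job etlServiceJob) {
            this.jobLauncher = jobLauncher;
            this.etlServiceJob = etlServiceJob;
        }

        public void launchFor(String claimsDate, String solrUrl) throws Exception {
            // Different parameter values create different JobInstances, so two
            // launches for two different dates are allowed to run at the same time.
            JobParameters params = new JobParametersBuilder()
                    .addString("etl.claims.for.date", claimsDate)
                    .addString("etl.service.solr.url", solrUrl)
                    .toJobParameters();

            jobLauncher.run(etlServiceJob, params);
        }
    }

    One general detail that matters for concurrent launches: the default SimpleJobLauncher uses a synchronous TaskExecutor, so run() executes the job on the
    calling thread and does not return until the job finishes unless the launcher is configured with an asynchronous TaskExecutor.
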

  • #2
    I am trying something very similar and am facing the same issue.

    The second job that I submit does not start if the first job has a JUnit ParallelTestSuite of 250 test cases.

    How did you resolve the issue?
