How to configure chunking from spring-batch-integration?

  • How to configure chunking from spring-batch-integration?

    Hi,

    I am trying to use the chunk package from Spring Batch Integration (i.e. the remote chunking example) in combination with partitioning from Spring Batch. My partitioning works: based on a property, several instances of a step run in parallel, each processing one part of the data. Every step is quite heavy on processing. However, as soon as a chunk of ids is finished, I would like to pass it on to a next stage for further processing. For this purpose I wanted to use remote chunking, but instead of having one master step, every slave step from the partitioning setup in fact becomes a master for the remote chunking.

    I am currently stuck on how to express this in my Spring configuration. How can I make sure that every thread working on a partition sends its data to a dedicated channel, so that each chunk of ids is processed asynchronously in the next stage?

    Hopefully this already gives an idea of what I am trying to achieve.
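
To make the intended hand-off concrete, here is a minimal plain-Java sketch of the idea described above: each partition gets its own dedicated queue, and a next-stage consumer drains that queue asynchronously while the partition keeps producing chunks of ids. This is not Spring Batch or Spring Integration API; `PartitionHandoffSketch`, `run`, and the `java.util.concurrent` queue standing in for a message channel are assumptions made purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

class PartitionHandoffSketch {

    // Sentinel chunk telling a next-stage consumer that its partition is finished.
    private static final List<Long> END = new ArrayList<>();

    /**
     * Simulates gridSize partitions, each handing chunks of ids to its own queue.
     * Returns, per partition, how many ids the next stage received.
     */
    static int[] run(int gridSize, int idsPerPartition, int commitInterval) {
        // Two threads per partition: one producer (stage 1), one consumer (next stage).
        ExecutorService pool = Executors.newFixedThreadPool(gridSize * 2);
        List<Future<Integer>> handled = new ArrayList<>();
        try {
            for (int p = 0; p < gridSize; p++) {
                final int partition = p;
                // Dedicated "channel" for this partition only (stand-in for a message channel).
                final BlockingQueue<List<Long>> channel = new LinkedBlockingQueue<>();

                // Stage 1: processes ids and forwards every full chunk immediately.
                pool.submit(() -> {
                    List<Long> chunk = new ArrayList<>();
                    for (long i = 0; i < idsPerPartition; i++) {
                        chunk.add(partition + i * gridSize); // ids with id % gridSize == partition
                        if (chunk.size() == commitInterval) {
                            channel.put(chunk);
                            chunk = new ArrayList<>();
                        }
                    }
                    if (!chunk.isEmpty()) {
                        channel.put(chunk); // last, partially filled chunk
                    }
                    channel.put(END);
                    return null;
                });

                // Next stage: consumes chunks asynchronously until the sentinel arrives.
                handled.add(pool.submit(() -> {
                    int count = 0;
                    for (List<Long> c = channel.take(); c != END; c = channel.take()) {
                        count += c.size();
                    }
                    return count;
                }));
            }
            int[] result = new int[gridSize];
            for (int p = 0; p < gridSize; p++) {
                result[p] = handled.get(p).get();
            }
            return result;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("hand-off sketch failed", e);
        } finally {
            pool.shutdown();
        }
    }
}
```

For example, `PartitionHandoffSketch.run(3, 10, 4)` simulates three partitions that each hand ten ids to their own next-stage consumer in chunks of at most four. The open question in this thread is how to get the equivalent per-partition isolation from the gateway, channels, and handler in the Spring configuration.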

  • #2
    Job configuration to clarify my question

    Most importantly: how can I configure the SimpleMessagingGateway, its channels, and the handler on those channels (see below) so that each partitioned slave step gets its own instances, i.e. each thread processing a partition works with its own set of channels and its own handler?

    Code:
    	<job id="mergeAREStagingJob">
    		<step id="myPartitionStep" parent="masterStep" />
    	</job>
    	<bean id="masterStep"
    		class="org.springframework.batch.core.partition.support.PartitionStep">
    		<property name="partitionHandler" ref="partitionHandler" />
    		<property name="stepExecutionSplitter" ref="stepExecutionSplitter" />
    		<property name="jobRepository" ref="jobRepository" />
    	</bean>
    	<step id="slaveStep">
    		<tasklet transaction-manager="transactionManager2"
    			allow-start-if-complete="true">
    			<chunk reader="customerItemReader" writer="mergeItemWriter"
    				commit-interval="${commit.interval}">
    			</chunk>
    		</tasklet>
    	</step>
    	<bean id="stepExecutionSplitter"
    		class="org.springframework.batch.core.partition.support.SimpleStepExecutionSplitter">
    		<constructor-arg index="0" ref="jobRepository" />
    		<constructor-arg index="1" ref="slaveStep" />
    		<constructor-arg index="2">
    			<bean class="com.virtualbusiness.CustomerPartitioner" />
    		</constructor-arg>
    	</bean>
    	<bean id="partitionHandler"
    		class="org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler">
    		<property name="taskExecutor" ref="taskExecutor" />
    		<property name="step" ref="slaveStep" />
    		<property name="gridSize" value="${grid.size}" />
    	</bean>
    	<bean id="transactionManager2" scope="step"
    		class="org.springframework.transaction.jta.WebSphereUowTransactionManager" />
    	<bean id="taskExecutor"
    		class="org.springframework.scheduling.commonj.WorkManagerTaskExecutor">
    		<property name="workManagerName" value="wm/default" />
    	</bean>
    	<bean id="customerItemReader"
    		class="org.springframework.batch.item.database.JdbcCursorItemReader"
    		scope="step">
    		<property name="dataSource" ref="non-XADataSource" />
    		<property name="rowMapper">
    			<bean class="com.virtualbusiness.CustomerRowMapper" />
    		</property>
    		<property name="sql"
    			value="SELECT CUSTOMER_ID FROM CUSTOMER_FSM WHERE CURRENT_STATE in ('VALID','MERGED') and MOD(ARE_SEQ_ID,${grid.size}) = #{stepExecutionContext[partitionNumber]}" />
    	</bean>
    
    	<bean id="mergeItemWriter" class="com.virtualbusiness.MergeItemWriter"
    		scope="step">
    		<!-- A transactional datasource -->
    		<constructor-arg ref="oracleDataSource" />
    		<!--
    			The next step in the process for customers that are successfully
    			merged!
    		-->
    		<property name="nextItemWriter">
    			<bean
    				class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter">
    				<property name="messagingGateway" ref="sendMergedCustomersToNextStage" />
    			</bean>
    		</property>
    	</bean>
    	<bean id="sendMergedCustomersToNextStage"
    		class="org.springframework.integration.gateway.SimpleMessagingGateway"
    		scope="step">
    		<property name="requestChannel" ref="nextStageRequests" />
    		<property name="replyChannel" ref="nextStageResponses" />
    	</bean>
    	<integration:channel id="nextStageRequests">
    		<integration:dispatcher task-executor="taskExecutor" />
    	</integration:channel>
    	<integration:channel id="nextStageResponses" />
    	<integration:chain input-channel="nextStageRequests" output-channel="nextStageResponses">
    		<integration:service-activator ref="chunkHandler" method="handleChunk" />
    	</integration:chain>
    
    	<bean id="chunkHandler"
    		class="org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler">
    		<property name="chunkProcessor">
    			<bean
    				class="com.virtualbusiness.ChunkProcessorNextStageCustomers" />
    		</property>
    	</bean>
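
The `customerItemReader` query above relies on each partition's step execution context carrying a `partitionNumber` between 0 and `grid.size - 1`. The `CustomerPartitioner` class is not shown in the post, so the following is only a guess at the kind of assignment it performs, written as plain Java without the Spring Batch `Partitioner` interface; `ModPartitionSketch`, `partition`, and `belongsTo` are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ModPartitionSketch {

    /** One entry per partition: hypothetical partition name -> partitionNumber (0 .. gridSize - 1). */
    static Map<String, Integer> partition(int gridSize) {
        Map<String, Integer> contexts = new LinkedHashMap<>();
        for (int i = 0; i < gridSize; i++) {
            contexts.put("partition" + i, i);
        }
        return contexts;
    }

    /** Mirrors the SQL predicate MOD(ARE_SEQ_ID, gridSize) = partitionNumber for non-negative ids. */
    static boolean belongsTo(long areSeqId, int gridSize, int partitionNumber) {
        return areSeqId % gridSize == partitionNumber;
    }
}
```

With a grid size of 3, an `ARE_SEQ_ID` of 7 would be read only by the partition whose `partitionNumber` is 1, so no two partitions process the same customer.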
