  • Late-binding and Partitioning

    I've set up a configuration that uses a PartitionStep so that a single step can process multiple input files the same way.

    In fact, this thread is a follow-up to a previous post suggesting this approach.

    That being said, below is the execution log. It looks good, except for one strange behavior: the "writer" late-binding is done only once, so all the output ends up in a single file.


    Code:
    INFO  [SimpleJobLauncher]#[main] No TaskExecutor has been set, defaulting to synchronous executor.         
    INFO  [SimpleJobLauncher]#[main] Job: [SimpleJob: [name=myJob]] launched with the following parameters: [{param=gino-61}]
    INFO  [AbstractJob]#[main] Executing step: [email protected]794cc
    INFO  [MultiInputFilesPartitioner]#[main] Files to be processed: 2
    DEBUG [PlaceholderTargetSource]#[SimpleAsyncTaskExecutor-1] Rehydrating scoped target: [lazyBindingProxy.myItemReader]         
    DEBUG [PlaceholderTargetSource]#[SimpleAsyncTaskExecutor-1] Replaced [file:/data/input/#{stepExecutionContext[filename]}] with [file:/data/input/file2.csv]
    DEBUG [StepScope]#[SimpleAsyncTaskExecutor-1] Creating object in scope=step, name=lazyBindingProxy.myItemReader#execution#678
    INFO  [MyItemReader]#[SimpleAsyncTaskExecutor-1] Opening file [file2.csv] for reading.                      
    DEBUG [PlaceholderTargetSource]#[SimpleAsyncTaskExecutor-1] Rehydrating scoped target: [lazyBindingProxy.myCompositeItemWriter]             
    DEBUG [PlaceholderTargetSource]#[SimpleAsyncTaskExecutor-1] Replaced [file:/data/normalized/#{stepExecutionContext[filename]}.normalized] with [file:/data/normalized/file2.csv.normalized]
    DEBUG [PlaceholderTargetSource]#[SimpleAsyncTaskExecutor-1] Replaced [file:/data/rejected/#{stepExecutionContext[filename]}.rejected] with [file:/data/rejected/file2.csv.rejected]
    DEBUG [StepScope]#[SimpleAsyncTaskExecutor-1] Creating object in scope=step, name=lazyBindingProxy.myCompositeItemWriter#execution#678
    DEBUG [PlaceholderTargetSource]#[SimpleAsyncTaskExecutor-2] Rehydrating scoped target: [lazyBindingProxy.myItemReader]          
    DEBUG [PlaceholderTargetSource]#[SimpleAsyncTaskExecutor-2] Replaced [file:/data/input/#{stepExecutionContext[filename]}] with [file:/data/input/file1.csv]                                
    DEBUG [StepScope]#[SimpleAsyncTaskExecutor-2] Creating object in scope=step, name=lazyBindingProxy.myItemReader#execution#677
    INFO  [MyItemReader]#[SimpleAsyncTaskExecutor-2] Opening file [file1.csv] for reading.
    DEBUG [PlaceholderTargetSource]#[SimpleAsyncTaskExecutor-2] Rehydrating scoped target: [lazyBindingProxy.myCompositeItemWriter]            
    DEBUG [StepScope]#[SimpleAsyncTaskExecutor-2] Creating object in scope=step, name=lazyBindingProxy.myCompositeItemWriter#execution#677
    DEBUG [PlaceholderTargetSource]#[SimpleAsyncTaskExecutor-1] Rehydrating scoped target: [lazyBindingProxy.myItemReader]            
    DEBUG [PlaceholderTargetSource]#[SimpleAsyncTaskExecutor-1] Replaced [file:/data/input/#{stepExecutionContext[filename]}] with [file:/data/input/file2.csv]
    DEBUG [PlaceholderTargetSource]#[SimpleAsyncTaskExecutor-1] Rehydrating scoped target: [lazyBindingProxy.myCompositeItemWriter]            
    DEBUG [PlaceholderTargetSource]#[SimpleAsyncTaskExecutor-2] Rehydrating scoped target: [lazyBindingProxy.myItemReader]                                   
    DEBUG [PlaceholderTargetSource]#[SimpleAsyncTaskExecutor-2] Replaced [file:/data/input/#{stepExecutionContext[filename]}] with [file:/data/input/file1.csv]
    DEBUG [PlaceholderTargetSource]#[SimpleAsyncTaskExecutor-2] Rehydrating scoped target: [lazyBindingProxy.myCompositeItemWriter]            
    DEBUG [PlaceholderTargetSource]#[SimpleAsyncTaskExecutor-2] Rehydrating scoped target: [lazyBindingProxy.myItemReader]                                   
    DEBUG [PlaceholderTargetSource]#[SimpleAsyncTaskExecutor-2] Replaced [file:/data/input/#{stepExecutionContext[filename]}] with [file:/data/input/file1.csv] 
    DEBUG [PlaceholderTargetSource]#[SimpleAsyncTaskExecutor-1] Rehydrating scoped target: [lazyBindingProxy.myItemReader]          
    DEBUG [PlaceholderTargetSource]#[SimpleAsyncTaskExecutor-1] Replaced [file:/data/input/#{stepExecutionContext[filename]}] with [file:/data/input/file2.csv] 
    DEBUG [PlaceholderTargetSource]#[SimpleAsyncTaskExecutor-1] Rehydrating scoped target: [lazyBindingProxy.myCompositeItemWriter]              
    DEBUG [PlaceholderTargetSource]#[SimpleAsyncTaskExecutor-2] Rehydrating scoped target: [lazyBindingProxy.myCompositeItemWriter]
    .. several lines repeated...
    Code:
    DEBUG [PlaceholderTargetSource]#[SimpleAsyncTaskExecutor-1] Rehydrating scoped target: [lazyBindingProxy.myCompositeItemWriter]             
    DEBUG [PlaceholderTargetSource]#[SimpleAsyncTaskExecutor-1] Rehydrating scoped target: [lazyBindingProxy.myItemReader]                                   
    DEBUG [PlaceholderTargetSource]#[SimpleAsyncTaskExecutor-1] Replaced [file:/data/input/#{stepExecutionContext[filename]}] with [file:/data/input/file2.csv]
    DEBUG [PlaceholderTargetSource]#[SimpleAsyncTaskExecutor-1] Rehydrating scoped target: [lazyBindingProxy.myCompositeItemWriter] 
    INFO  [SimpleJobLauncher]#[main] Job: [SimpleJob: [name=myJob]] completed with the following parameters: [{param=gino-61}] and the following status: [COMPLETED]
    The "stepScope" bean is defined and scope="step" is declared on both the Reader and Writer properties of the "slave" step.

    Does it have to be specified elsewhere?
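
    For reference, here is a minimal sketch of the step-scoped declarations implied by the log above (bean ids, class names and paths are placeholders, not my actual config):
    Code:
    <!-- the step scope itself has to be registered once in the context -->
    <bean class="org.springframework.batch.core.scope.StepScope" />

    <!-- scope="step" on the reader/writer beans makes the placeholder
         re-bind for every step (i.e. partition) execution -->
    <bean id="myItemReader" class="com.mycompany.MyItemReader" scope="step">
        <property name="resource"
                  value="file:/data/input/#{stepExecutionContext[filename]}" />
    </bean>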

    PartitionStep config:
    Code:
    <bean id="myJob" parent="simpleJob">
        <property name="steps">
    	<bean name="myStep:master" class="org.springframework.batch.core.partition.support.PartitionStep">
    	    <property name="partitionHandler">
    	      <bean class="org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler">
    		<property name="taskExecutor">
    		    <bean class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
    		</property>
    		<property name="step" ref="myStep" />
    		<property name="gridSize" value="10" />
    	      </bean>
    	    </property>
    	    <property name="stepExecutionSplitter">
    	        <bean class="org.springframework.batch.core.partition.support.SimpleStepExecutionSplitter">
    		<constructor-arg ref="jobRepository" />
    		<constructor-arg ref="myStep" />
    		<constructor-arg ref="filePartitioner" />
    	        </bean>
    	    </property>
    	    <property name="jobRepository" ref="jobRepository" />
                </bean>
            </property>
    </bean>
    And finally the Partitioner config:
    Code:
    <bean id="filePartitioner" class="com.mycompany.MultiInputFilesPartitioner">
        <property name="resources" value="file:${data.root}/${batch.name}/input/*.csv"/>
        <property name="monitor" ref="monitor"/>
    </bean>
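
    For context, MultiInputFilesPartitioner essentially maps each matching file to its own ExecutionContext, along these lines (a simplified sketch, not the full implementation; the monitor wiring is omitted):
    Code:
    import java.util.HashMap;
    import java.util.Map;

    import org.springframework.batch.core.partition.support.Partitioner;
    import org.springframework.batch.item.ExecutionContext;
    import org.springframework.core.io.Resource;

    public class MultiInputFilesPartitioner implements Partitioner {

        private Resource[] resources;

        public void setResources(Resource[] resources) {
            this.resources = resources;
        }

        public Map<String, ExecutionContext> partition(int gridSize) {
            Map<String, ExecutionContext> partitions = new HashMap<String, ExecutionContext>();
            int i = 0;
            for (Resource resource : resources) {
                ExecutionContext context = new ExecutionContext();
                // "filename" is the key referenced by #{stepExecutionContext[filename]}
                context.putString("filename", resource.getFilename());
                partitions.put("partition" + i++, context);
            }
            return partitions;
        }
    }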

    I'm wondering why the late-binding does not work the same way on the Reader and Writer.

  • #2
    Curious?

    Just out of curiosity... did you use the MultiResourceItemWriter for your outputs? I believe that is how you map them to multiple output files.
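
    Roughly, it wraps a single delegate writer and rolls over to a new output file after a configurable item count; a quick sketch (bean names and paths are only illustrative):
    Code:
    <bean id="multiWriter" class="org.springframework.batch.item.file.MultiResourceItemWriter">
        <!-- base output resource; an index suffix is appended for each new file -->
        <property name="resource" value="file:/data/output/items" />
        <!-- the delegate does the actual writing to whichever file is current -->
        <property name="delegate" ref="flatFileItemWriter" />
        <!-- start a new file after this many items -->
        <property name="itemCountLimitPerResource" value="1000" />
    </bean>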

    Keith

    • #3
      It still works for me. I just added a partitioning sample to SVN if you want to play spot the difference. Can you isolate the problem in a test case somewhere? What about making the step a bit simpler as a temporary measure to work out why it isn't working, e.g. using a non-composite item writer?

      • #4
        Originally posted by kmbarlow
        Just out of curiosity... did you use the MultiResourceItemWriter for your outputs? I believe that is how you map them to multiple output files.

        Keith
        MultiResourceItemWriter is a slightly different use case: it creates multiple files from a single stream of items. I wonder if it could be useful in this case?

        • #5
          Originally posted by kmbarlow
          Just out of curiosity... did you use the MultiResourceItemWriter for your outputs? I believe that is how you map them to multiple output files.

          Keith
          In fact, based on a suggestion, I created a "CompositeItemWriter" by extending FlatFileItemWriter. I added an itemWriter property for each output file, and in the write method I select, based on some business conditions, which file each item is written to (sketched below).
          Last edited by cerrog; Feb 25th, 2009, 08:24 AM. Reason: Precision
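
          The idea, sketched here as a delegating writer rather than my actual FlatFileItemWriter subclass (class, field and condition names are made up; MyItem stands for the domain type):
          Code:
          import java.util.Collections;
          import java.util.List;

          import org.springframework.batch.item.ExecutionContext;
          import org.springframework.batch.item.ItemStream;
          import org.springframework.batch.item.ItemWriter;
          import org.springframework.batch.item.file.FlatFileItemWriter;

          public class MyCompositeItemWriter implements ItemWriter<MyItem>, ItemStream {

              private FlatFileItemWriter<MyItem> normalizedWriter;
              private FlatFileItemWriter<MyItem> rejectedWriter;

              public void setNormalizedWriter(FlatFileItemWriter<MyItem> writer) {
                  this.normalizedWriter = writer;
              }

              public void setRejectedWriter(FlatFileItemWriter<MyItem> writer) {
                  this.rejectedWriter = writer;
              }

              // route each item to one of the delegates based on a business condition
              public void write(List<? extends MyItem> items) throws Exception {
                  for (MyItem item : items) {
                      if (item.isValid()) {
                          normalizedWriter.write(Collections.singletonList(item));
                      } else {
                          rejectedWriter.write(Collections.singletonList(item));
                      }
                  }
              }

              // propagate the ItemStream lifecycle so each delegate opens, updates
              // and closes its own file per step execution
              public void open(ExecutionContext ctx) {
                  normalizedWriter.open(ctx);
                  rejectedWriter.open(ctx);
              }

              public void update(ExecutionContext ctx) {
                  normalizedWriter.update(ctx);
                  rejectedWriter.update(ctx);
              }

              public void close() {
                  normalizedWriter.close();
                  rejectedWriter.close();
              }
          }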

          • #6
            It works now!

            After re-reading the docs, I stumbled on page 39 (5.1.9), where it says that when you're using a composite writer, you need to declare all the "delegate" writers as streams.
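
            In bean-style config, that registration looks roughly like this (bean names are assumed, and the exact property depends on the step factory bean in use):
            Code:
            <bean id="myStep" parent="simpleStep">
                <property name="itemReader" ref="myItemReader" />
                <property name="itemWriter" ref="myCompositeItemWriter" />
                <!-- register each delegate writer so its ItemStream callbacks
                     (open/update/close) are invoked for every step execution -->
                <property name="streams">
                    <list>
                        <ref bean="normalizedWriter" />
                        <ref bean="rejectedWriter" />
                    </list>
                </property>
            </bean>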

            I've also fixed some minor issues in my composite writer implementation.

            Finally, I replaced the SimpleAsyncTaskExecutor with a SyncTaskExecutor in order to have the files processed sequentially.
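
            That is just a swap of the executor bean inside the partition handler shown earlier:
            Code:
            <property name="taskExecutor">
                <bean class="org.springframework.core.task.SyncTaskExecutor" />
            </property>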

            Thanks!
