Multi-file input and output

  • Multi-file input and output

    Hello,

    I have a requirement that I don't know how to handle.

    I have to read from multiple input files. For each input file, I have to write to a different output file (with the same name but a different suffix). During the processing, if a record is identified to be skipped, it has to be written to a different output file too (with the same name as the input file but with a .rejected suffix).

    Example:

    File pattern to read: /input/file.csv.*
    Files in the input directory: file.csv.001, file.csv.002
    If during processing, at least 1 record has been skipped in each file, the result should be something like:

    In the output directory: file.csv.001.out, file.csv.002.out
    In the reject directory: file.csv.001.rejected, file.csv.002.rejected

    That being said, my first guess was to subclass 'FaultTolerantStepFactoryBean' and 'TaskletStep' in order to be able to iterate through the itemReader-itemProcessor-itemWriter-skipListener cycle for each input file.

    I'm about to start coding, but I keep telling myself that I'm probably not the first guy with this kind of requirement. So that's why I'm turning to the Spring Batch community for some insight.

    Regards,

    Gino.

  • #2
    We had exactly the same requirement of reading from and writing to multiple files. We achieved this by extending SimpleJobLauncher. We provided an extra attribute for this launcher, as below.

    Code:
    <bean id="jobLauncher" class=".....CustomJobLauncher">
        <property name="jobRepository" ref="jobRepository" />
        <property name="resources" value="D:/folder" />
    </bean>

    In the overridden run() method, we loop through each file matched by the resources attribute and process them accordingly.
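    For reference, here's a minimal sketch of that approach (the class name and the helper method are illustrative, not our exact code; run() itself is inherited from SimpleJobLauncher):

    Code:
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.JobParameters;
    import org.springframework.batch.core.JobParametersBuilder;
    import org.springframework.batch.core.launch.support.SimpleJobLauncher;
    import org.springframework.core.io.Resource;

    public class CustomJobLauncher extends SimpleJobLauncher {

        // Bound from the "resources" property; a location pattern such as
        // file:D:/folder/*.csv yields one Resource per matching file.
        private Resource[] resources;

        public void setResources(Resource[] resources) {
            this.resources = resources;
        }

        // Launches one job execution per input file; the file name is passed
        // as a job parameter so each file gets its own JobInstance.
        public void runForEachFile(Job job) throws Exception {
            for (Resource resource : resources) {
                JobParameters params = new JobParametersBuilder()
                        .addString("file.name", resource.getFilename())
                        .toJobParameters();
                run(job, params);
            }
        }
    }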



    • #3
      Gino,

      You should solve this problem by creating a composite ItemWriter that has two FlatFileItemWriters as properties. The write() method of the composite will determine to which file the record must be written.

      As for processing multiple input files: if they must map directly to output files (instead of treating all the input as one big file), then you should wrap the job launcher to loop through the files and pass each filename as a parameter to a job being launched.



      • #4
        Thanks Guys for the hints!

        I've implemented the "composite item writer" solution and it works fine! I only had to add a flag in the "item" class that indicates which file it has to be written to (valid or rejected record).
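        In case it helps anyone: the flag can be as simple as a boolean on the item class (illustrative names, not my actual class):

        Code:
        public class Record {

            // Set by the processor when validation fails; the composite
            // writer routes on this to the valid or rejected file.
            private boolean rejected;

            public boolean isRejected() {
                return rejected;
            }

            public void setRejected(boolean rejected) {
                this.rejected = rejected;
            }

            // ... the actual record fields go here ...
        }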

        Tomorrow, I'll try the "job launcher" solution to manage the one-to-one mapping between multiple input and output files.

        I'll let you know.

        Regards,

        Gino.



        • #5
          It is almost working now...

          The JobLauncher implementation is a great idea and is pretty simple to implement!

          But I still have an issue with passing each file to be processed as a JobParameter.

          Here's the code from the JobLauncher's run method:

          Code:
          // some code to manage state vs. restartable, etc...

          for (Resource resource : resources) {
              JobParameter param = new JobParameter(resource.getFilename());
              // Note: JobParameters is immutable and getParameters() returns a
              // copy of the underlying map, so this put() never reaches the job;
              // that is why the parameter doesn't show up at launch below.
              jobParameters.getParameters().put("file.name", param);
              taskExecutor.execute(new MyRunnable(job, jobParameters, jobExecution));
              numberOfFilesProcessed++;
          }

          return jobExecution;

          Here's the step's config:

          Code:
          <bean id="myStep" parent="simpleStep">
              <property name="itemReader">
                  <bean parent="myFileItemReader" scope="step"/>
              </property>
              <property name="itemWriter">
                  <bean parent="myCompositeItemWriter" scope="step"/>
             </property>
          </bean>
          The reader:

          Code:
          <bean id="myFileItemReader" parent="flatFileItemReader">
              <property name="resource" value="file:${data.root}/${batch.name}/input/#{jobParameters[file.name]}" />
          
           ... other bean properties...
          </bean>
          The composite writer:

          Code:
          <bean id="myCompositeItemWriter" class="MyCompositeItemWriter">
              <property name="validDelegateWriter" ref="validItemWriter"/>
              <property name="rejectedDelegateWriter" ref="rejectedItemWriter"/>
          </bean>
          The delegate writers:

          Code:
          <bean id="validRecordItemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
              <property name="resource" value="file:${data.root}/${batch.name}/output/#{jobParameters[file.name]}" />
          ... other properties...
          </bean>
          	
          <bean id="rejectedItemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
              <property name="resource" value="file:${data.root}/${batch.name}/rejected/#{jobParameters[file.name]}"/>
          </bean>
          Then the step scope:

          Code:
          <bean class="org.springframework.batch.core.scope.StepScope" />
          And now, the console output:

          Job: [SimpleJob: [name=myJob]] launched with the following parameters: [{param=Gino-45}]

          There's no "file.name" parameter...

          from my log: Opening file [input] for reading.


          And finally, the error:

          org.springframework.batch.item.ItemStreamException: Failed to initialize the reader


          Am I missing something? Obviously, I don't see it... but probably tomorrow's going to be a better day.


          Regards,

          Gino


          PS: I'm using 2.0.0 M4.



          • #6
            Well, things are progressing:

            I've successfully passed my "file name" parameter:

            [SimpleJob: [name=myJob]] launched with the following parameters: [{timeInMilliseconds=1235070782983, file.name=file1.csv}]

            By using the JobParameterBuilder:

            Code:
            for (Resource resource : resources) {
                builder.addLong("timeInMilliseconds", System.currentTimeMillis());
                jobParameters = builder.addString("file.name", resource.getFilename()).toJobParameters();
                logger.debug("Processing file: [" + resource.getFilename() + "]");
                taskExecutor.execute(new MyRunnable(job, jobParameters, jobExecution));
                numberOfFilesProcessed++;
            }
            BUT... I still have the same error:

            org.springframework.batch.item.ItemStreamException: Failed to initialize the reader

            And the same message in my log:

            Opening file [input] for reading.

            So it looks like the late binding does not work:

            Code:
            <property name="resource"
            value="file:${data.root}/${batch.name}/input/#{jobParameters[file.name]}" />
            To be continued...



            • #7
              Any updates on this? I am trying something similar, where the output needs to go to two files.

              Garette - I tried the CompositeItemWriter as below. How do we specify that, on exception or some other condition, a record goes to the bad file?

              thanks....

              Code:
              <job id="CSVFileLoadJobSkipErrors" restartable="true">
                  <step id="processCSVFileLoadJobSkipErrors">
                      <tasklet>
                          <chunk reader="itemReaderForTest" processor="itemProcessor" writer="compositeItemWriter" commit-interval="300" skip-limit="5">
                              <streams>
                                  <stream ref="itemWriter"/>
                                  <stream ref="badItemWriter"/>
                              </streams>
                              <skippable-exception-classes>org.springframework.batch.item.file.FlatFileParseException</skippable-exception-classes>
                          </chunk>
                      </tasklet>
                  </step>
              </job>

              <beans:bean id="compositeItemWriter" class="org.springframework.batch.item.support.CompositeItemWriter">
                  <beans:property name="delegates">
                      <beans:list>
                          <beans:ref bean="itemWriter"/>
                          <beans:ref bean="badItemWriter"/>
                      </beans:list>
                  </beans:property>
              </beans:bean>

              <beans:bean id="badItemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
                  <beans:property name="saveState" value="true"/>
                  <beans:property name="resource" ref="badOutputResource" />
                  <beans:property name="lineAggregator">
                      <beans:bean class="org.springframework.batch.item.file.transform.PassThroughLineAggregator" />
                  </beans:property>
              </beans:bean>

              <beans:bean id="itemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
                  <!--
                  <beans:property name="shouldDeleteIfExists" value="true"/>
                  -->
                  <beans:property name="saveState" value="true"/>
                  <beans:property name="resource" ref="outputResource" />
                  <beans:property name="lineAggregator">
                      <beans:bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
                          <beans:property name="delimiter" value="||"/>
                          <beans:property name="fieldExtractor">
                              <beans:bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
                                  <beans:property name="names" value="accountId,dataSourceTypeId,originalAccountNumber,accountNumber,primarySSN,updatedBy,updatedDate"/>
                              </beans:bean>
                          </beans:property>
                      </beans:bean>
                  </beans:property>
              </beans:bean>

              <beans:bean id="outputResource" class="org.springframework.core.io.FileSystemResource">
                  <beans:constructor-arg value="data/output/csv/pipedDelimitedOutput.txt" />
              </beans:bean>

              <beans:bean id="badOutputResource" class="org.springframework.core.io.FileSystemResource">
                  <beans:constructor-arg value="data/output/csv/pipedDelimitedOutput_bad.txt" />
              </beans:bean>

              Also, currently, since there is no condition, it writes the result twice to the same file instead of to two separate files.



              • #8
                You'll need to write a custom composite ItemWriter. Something like:
                Code:
                import java.util.ArrayList;
                import java.util.List;

                import org.springframework.batch.item.ItemWriter;

                public class CompositeItemWriter<T> implements ItemWriter<T> {

                    private ItemWriter<T> itemWriter;    // setter omitted
                    private ItemWriter<T> badItemWriter; // setter omitted

                    public void write(List<? extends T> items) throws Exception {
                        // Wildcard types can't be instantiated, so collect into List<T>
                        List<T> goodItems = new ArrayList<T>();
                        List<T> badItems = new ArrayList<T>();

                        for (T item : items) {
                            if (recordIsBad(item)) {
                                badItems.add(item);
                            }
                            else {
                                goodItems.add(item);
                            }
                        }

                        itemWriter.write(goodItems);
                        badItemWriter.write(badItems);
                    }

                    private boolean recordIsBad(T item) {
                        // your routing condition goes here,
                        // e.g. check a "rejected" flag on the item
                        return false;
                    }
                }
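                One thing to keep in mind with this approach: the composite itself doesn't open or close any files, so the two delegate writers still need to be registered as streams on the step (as in the <streams> element in your config) in order to be opened and closed with it.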



                • #9
                  Cerrog:

                  I am doing something similar. Any chance you could share the progress you have made? Specifically, I'm looking for the job XML, the job launcher, and any other relevant config files you may be using.



                  • #10
                    I am also looking out for the same use case.
                    Hoping that someone comes up with clear source code to explain it to me in detail, or else this dumb brain of mine can't understand anything.

                    Cheers
                    Sandeep



                    • #11
                      I got it working as it should in order to meet our requirements.

                      Here's some stuff that I hope will help!

                      The two problems that I needed to nail down were:
                      A- the job has to iterate through a set of files from the input directory
                      B- valid and rejected records must be written to separate output files in order to be handled by their respective business processes

                      Here are the pertinent parts of my main SB config file (some details removed for clarity).
                      FilePartitioner:
                      Code:
                      <beans:bean id="filePartitioner" class="com.MultiInputFilesPartitioner">
                        <beans:property name="resources" value="file:${data.root}/input/${collected.file.name}"/>
                      </beans:bean>
                      MultiInputFilesPartitioner implements Partitioner, customized to load the partitions based on the available files (e.g. sorted by date).
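                      In broad strokes it looks like this (a simplified sketch rather than my production code; if your version ships MultiResourcePartitioner, it does something very similar):

                      Code:
                      import java.util.HashMap;
                      import java.util.Map;

                      import org.springframework.batch.core.partition.support.Partitioner;
                      import org.springframework.batch.item.ExecutionContext;
                      import org.springframework.core.io.Resource;

                      public class MultiInputFilesPartitioner implements Partitioner {

                          private Resource[] resources; // bound from the "resources" pattern

                          public void setResources(Resource[] resources) {
                              this.resources = resources;
                          }

                          public Map<String, ExecutionContext> partition(int gridSize) {
                              // One partition (and hence one worker step execution) per
                              // input file; the file name goes into the step execution
                              // context, where step-scoped beans can pick it up.
                              Map<String, ExecutionContext> partitions = new HashMap<String, ExecutionContext>();
                              int i = 0;
                              for (Resource resource : resources) {
                                  ExecutionContext context = new ExecutionContext();
                                  context.putString("file.name", resource.getFilename());
                                  partitions.put("partition" + i++, context);
                              }
                              return partitions;
                          }
                      }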

                      NormalizerStepMaster:
                      Code:
                      <beans:bean id="normalizerStepMaster" name="normalizerStep:master" class="com.PartitionStep">
                        <beans:property name="partitionHandler">
                          <beans:bean class="org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler">
                            <beans:property name="taskExecutor">
                              <beans:bean class="org.springframework.core.task.SyncTaskExecutor" />
                            </beans:property>
                            <beans:property name="step" ref="normalizerStep" />
                            <beans:property name="gridSize" value="3" />
                          </beans:bean>
                        </beans:property>
                        <beans:property name="stepExecutionSplitter">
                          <beans:bean class="org.springframework.batch.core.partition.support.SimpleStepExecutionSplitter">
                            <beans:constructor-arg ref="jobRepository" />
                            <beans:constructor-arg ref="normalizerStep" />
                            <beans:constructor-arg ref="filePartitioner" />
                          </beans:bean>
                        </beans:property>
                        <beans:property name="jobRepository" ref="jobRepository" />
                      </beans:bean>
                      PartitionStep extends AbstractStep in order to customize doExecute().
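                      A bare-bones sketch of that customization (the framework also ships its own PartitionStep in org.springframework.batch.core.partition.support, which works along the same lines):

                      Code:
                      import java.util.Collection;

                      import org.springframework.batch.core.StepExecution;
                      import org.springframework.batch.core.partition.PartitionHandler;
                      import org.springframework.batch.core.partition.StepExecutionSplitter;
                      import org.springframework.batch.core.step.AbstractStep;

                      public class PartitionStep extends AbstractStep {

                          private StepExecutionSplitter stepExecutionSplitter; // setter omitted
                          private PartitionHandler partitionHandler;           // setter omitted

                          protected void doExecute(StepExecution stepExecution) throws Exception {
                              // Fan out: the splitter creates one StepExecution per partition
                              // and the handler runs the worker step for each of them.
                              Collection<StepExecution> executions =
                                      partitionHandler.handle(stepExecutionSplitter, stepExecution);
                              // Custom aggregation of the partition results goes here.
                          }
                      }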

                      And finally, the NormalizerStep, which is executed once for each input file:
                      Code:
                      <beans:bean id="normalizerStep" parent="simpleStep">
                        <beans:property name="itemReader" ref="normalizerItemReader"/>
                        <beans:property name="itemProcessor">
                          <beans:bean id="compositeItemProcessor" class="org.springframework.batch.item.support.CompositeItemProcessor">
                            <beans:property name="itemProcessors">
                              <beans:list>
                                <beans:ref bean="normalizerValidatingItemProcessor" />
                                <beans:ref bean="recordCounterItemProcessor" />
                              </beans:list>
                             </beans:property>
                           </beans:bean>
                        </beans:property>
                        <beans:property name="itemWriter" ref="normalizerCompositeItemWriter"/>
                        <beans:property name="streams">
                          <beans:list>
                            <beans:ref bean="validRecordItemWriter"/>
                            <beans:ref bean="rejectedRecordItemWriter"/>
                           </beans:list>
                         </beans:property>
                      </beans:bean>
                      The normalizerCompositeItemWriter is implemented as DHGarrette suggested.
                      The normalizerValidatingItemProcessor is the one that decides if a record is valid or rejected.
                      The recordCounterItemProcessor is a custom processor that checks whether the record count matches the input file footer.
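                      To give an idea, the validating processor boils down to setting the flag rather than filtering (illustrative names; Record is the item class carrying the rejected flag mentioned in #4):

                      Code:
                      import org.springframework.batch.item.ItemProcessor;

                      public class NormalizerValidatingItemProcessor implements ItemProcessor<Record, Record> {

                          public Record process(Record item) throws Exception {
                              if (!isValid(item)) {
                                  // Flag the record instead of returning null: a null
                                  // return would filter the item out entirely, and it
                                  // would never reach the rejected-records writer.
                                  item.setRejected(true);
                              }
                              return item;
                          }

                          private boolean isValid(Record item) {
                              // business validation rules go here
                              return true;
                          }
                      }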

                      I really hope it will help you get out of the dead end.

                      Good luck!

                      Gino



                      • #12
                        Thanks a lot for the ideas.
                        You people are brilliant, thank you.



                        • #13
                          Multi-file input and output

                          I have exactly the same requirement as in the original post above...

                          Can you please share the job configuration, which would help me to implement it?

                          Thanks
                          -Suri

