  • FlatFileItemWriter written lines are not sorted

    Hello,

    I'm facing a strange problem with FlatFileItemWriter. The lines written to the output file are not in the original order; in particular, the last invalid line is not at the bottom of my output file.

    This writer is called by a SkipListener to log invalid lines while reading from an input file using flatFileReader.

    This is my config:

    Code:
    	<batch:job id="myJob">
    		<batch:listeners>
    			<batch:listener ref="appJobExecutionListener" />
    		</batch:listeners>
    		<batch:step id="step1">
    			<batch:tasklet>
    				<batch:listeners>
    					<batch:listener ref="mySkipListener" />
    				</batch:listeners>
    				<batch:chunk reader="flatFileReader" writer="itemWriter"
    					processor="compositeItemProcessor" commit-interval="${my.commit.interval}"
    					skip-limit="${my.skip.limit}" retry-limit="${my.retry.limit}">
    					<batch:skippable-exception-classes>
    						<batch:include
    							class="org.springframework.batch.item.file.FlatFileParseException" />
    						<batch:include
    							class="org.springframework.batch.item.validator.ValidationException" />
    					</batch:skippable-exception-classes>
    					<batch:streams>
    						<batch:stream ref="errorWriter" />
    					</batch:streams>
    				</batch:chunk>
    			</batch:tasklet>
    		</batch:step>
    	</batch:job>
    
    	<bean id="myLineAggregator" class="com.xm.auth.batch.my.MyLineAggregator" />
    	<bean id="errorWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
    		<property name="resource" value="${my.error.file.location}" />
    		<property name="lineAggregator" ref="myLineAggregator" />
    	</bean>
    But if I set commit-interval to '1' the problem disappears! For performance reasons, however, I cannot keep that value.

    Am I missing some configuration?

    Thanks in advance.
    Last edited by Fadjer; Aug 19th, 2010, 05:47 AM.

  • #2
    Skipped items are not necessarily kept in order as they are passed to the listener. Why is it important?



    • #3
      In my case every line contains a number, and it's required that the skipped lines keep the same order as in the original file.



      • #4
        I checked the source code and it looks like the order *is* preserved. What version of Spring Batch are you using?



        • #5
          I'm using version 2.1.2.

          In my case, my LineAggregator is configured to handle multiple class types that extend the same base class:
          Code:
          public class MyLineAggregator implements LineAggregator<AbstractParsedLine> {
          
          	@Override
          	public String aggregate(AbstractParsedLine parsedLine) {
          		String aggregatedLine = null;
          		if (parsedLine instanceof HeaderParsedLine) {
          			aggregatedLine = this
          					.aggregateHeaderLine((HeaderParsedLine) parsedLine);
          		}
          
          		if (parsedLine instanceof DetailParsedLine) {
          			aggregatedLine = this
          					.aggregateDetailLine((DetailParsedLine) parsedLine);
          		}
          
          		if (parsedLine instanceof TrailerParsedLine) {
          			aggregatedLine = this
          					.aggregateTrailerParsedLine((TrailerParsedLine) parsedLine);
          		}
          
          		return aggregatedLine;
          	}
          }
          Is there any way to define an order between these class types? I think the problem is coming from here.



          • #6
            The aggregator should be called with the items in order as they are written. You said you were using a listener to do the writing, so you actually control that process. From what I can tell the items are passed to the listener in the order they failed, but that isn't necessarily the order they were read. Maybe you need to impose an order in your reader that you can re-impose in the listener by sorting the list of items?
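
            The suggestion above could be sketched as follows. This is a minimal stand-alone sketch, not Spring Batch API code; it assumes (hypothetically) that the reader records each item's original line number, exposed here via a getLineNumber() accessor you would add yourself. Skipped lines are buffered and then sorted by that number, so the error writer receives them in file order regardless of the order in which the skips were reported:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Sketch of re-imposing the original file order on skipped lines.
// Assumption (not in the original post): the reader records each item's
// original line number, exposed here via getLineNumber().
class SkippedLine {
    private final int lineNumber;
    private final String rawLine;

    SkippedLine(int lineNumber, String rawLine) {
        this.lineNumber = lineNumber;
        this.rawLine = rawLine;
    }

    public int getLineNumber() { return lineNumber; }
    public String getRawLine() { return rawLine; }
}

class SkippedLineBuffer {
    private final List<SkippedLine> buffer = new ArrayList<SkippedLine>();

    // Called from the skip listener each time an item is skipped.
    public void onSkip(SkippedLine line) {
        buffer.add(line);
    }

    // Sort by original line number and hand back the raw lines, so the
    // error writer receives them in file order regardless of skip order.
    public List<String> drainInOrder() {
        Collections.sort(buffer, new Comparator<SkippedLine>() {
            public int compare(SkippedLine a, SkippedLine b) {
                return a.getLineNumber() - b.getLineNumber();
            }
        });
        List<String> out = new ArrayList<String>();
        for (SkippedLine line : buffer) {
            out.add(line.getRawLine());
        }
        buffer.clear();
        return out;
    }
}
```

            The buffer is deliberately drained and cleared in one step so it can be reused per chunk; in a multi-threaded step it would need synchronization.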



            • #7
              So the problem cannot come from the Aggregator.

              Perhaps it's coming from the FieldSetMapper. Here is my config; as I mentioned in my previous post, I'm extracting three different object types from the same file: Header, Detail and Trailer.

              Code:
              	<bean id="headerFieldSetMapper"
              		class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
              		<property name="prototypeBeanName" value="headerParsedLine" />
              	</bean>
              	<bean id="headerParsedLine" class="com.xm.auth.batch.my.model.HeaderParsedLine"
              		scope="prototype" />
              
              	<bean id="detailFieldSetMapper"
              		class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
              		<property name="prototypeBeanName" value="detailParsedLine" />
              	</bean>
              	<bean id="detailParsedLine" class="com.xm.auth.batch.my.model.DetailParsedLine"
              		scope="prototype" />
              
              	<bean id="trailerFieldSetMapper"
              		class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
              		<property name="prototypeBeanName" value="trailerParsedLine" />
              	</bean>
              	<bean id="trailerParsedLine" class="com.xm.auth.batch.my.model.TrailerParsedLine"
              		scope="prototype" />
              
              
              	<bean id="fieldSetFactory"
              		class="com.xm.auth.batch.my.model.MyFieldSetFactory" />
              	<bean id="headerLineTokenizer"
              		class="org.springframework.batch.item.file.transform.FixedLengthTokenizer">
              		<property name="names" value="${my.header.fields.names}" />
              		<property name="columns" value="${my.header.fields.indexes}" />
              		<property name="fieldSetFactory" ref="fieldSetFactory" />
              	</bean>
              	<bean id="detailLineTokenizer"
              		class="org.springframework.batch.item.file.transform.FixedLengthTokenizer">
              		<property name="names" value="${my.detail.fields.names}" />
              		<property name="columns" value="${my.detail.fields.indexes}" />
              		<property name="fieldSetFactory" ref="fieldSetFactory" />
              	</bean>
              	<bean id="trailerLineTokenizer"
              		class="org.springframework.batch.item.file.transform.FixedLengthTokenizer">
              		<property name="names" value="${my.trailer.fields.names}" />
              		<property name="columns" value="${my.trailer.fields.indexes}" />
              		<property name="fieldSetFactory" ref="fieldSetFactory" />
              	</bean>
              How can I ensure an order between the different object classes when they are extracted by the FieldSetMapper?

              Thanks in advance.
              Last edited by Fadjer; Aug 19th, 2010, 05:50 AM.



              • #8
                I'm not sure what the problem is yet, but it can't be anything that a FieldSetMapper or an Aggregator can handle (they should always be stateless). If the items are not coming to the skip listener in the order that they were read, that suggests the failures are transient and could succeed if retried?

                Anyway, if you have an order that you can impose on the items as they are written, then you have to find a place to accumulate the state of a chunk before it is written. If SkipListener.onSkipInWrite() accepted the whole list of skipped items you wouldn't have the problem, so I guess the best place to deal with it might be there. If the step is single-threaded you can implement SkipListener and ChunkListener, accumulate state in your skip listener, and then sort it and write it out in the chunk listener (note that it will be outside the transaction in that case - if you care about that then you need a transaction synchronization instead of a chunk listener).
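
                The accumulate-then-flush idea above might look like the following. This is a self-contained sketch that compiles on its own; in a real job the two plain methods would be the callbacks of Spring Batch's SkipListener (onSkipInWrite) and ChunkListener (afterChunk), and the inner ParsedLine class with its line number is an assumed stand-in for your own parsed-line type:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Self-contained sketch of the accumulate-and-flush idea. In a real job this
// class would implement Spring Batch's SkipListener (onSkipInWrite) and
// ChunkListener (afterChunk); here the callbacks are plain methods so the
// sketch compiles on its own. Only safe for a single-threaded step.
class OrderedSkipLogger {

    // Stand-in for the parsed line; the recorded line number is an assumption.
    static class ParsedLine {
        final int lineNumber;
        final String text;
        ParsedLine(int lineNumber, String text) {
            this.lineNumber = lineNumber;
            this.text = text;
        }
    }

    private final List<ParsedLine> pending = new ArrayList<ParsedLine>();
    private final List<String> written = new ArrayList<String>();

    // SkipListener callback: just accumulate, do not write yet.
    public void onSkipInWrite(ParsedLine item, Throwable cause) {
        pending.add(item);
    }

    // ChunkListener callback: sort the chunk's skipped items by original
    // line number, then hand them to the error writer in one ordered batch.
    public void afterChunk() {
        Collections.sort(pending, new Comparator<ParsedLine>() {
            public int compare(ParsedLine a, ParsedLine b) {
                return a.lineNumber - b.lineNumber;
            }
        });
        for (ParsedLine line : pending) {
            written.add(line.text);  // stands in for errorWriter.write(...)
        }
        pending.clear();
    }

    public List<String> getWritten() { return written; }
}
```

                As noted above, flushing in afterChunk happens outside the chunk's transaction; if that matters, the flush would have to be hooked into a transaction synchronization instead.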

