
  • Steps repeating with same data

    Hi,

    I have a single Job containing a single Step and multiple ItemProcessors. For performance reasons (the job is processing ~50TB of data), I am using a partition handler. Processing is handled in multiple local threads, as disk IO has been identified as the bottleneck; there is no middleware in place.

    I have just discovered that the first ItemProcessor in my chain is occasionally retrieving the same record from the ItemReader multiple times.

    The read() method in my ItemReader is synchronized, and I can confirm that each record is only retrieved once from the ItemReader. Since this seems like odd behaviour, I'm wondering if there's something wonky in my partitioning setup. I'm pretty sure it's by the book, but I'd much appreciate any thoughts.
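
    To show what I mean by "synchronized", the reader is wrapped roughly like this (a trimmed-down sketch rather than the real class; the class and delegate names here are made up):

    Code:
    import org.springframework.batch.item.ItemReader;

    // Trimmed-down sketch of the synchronized reader described above; the class
    // and delegate names are illustrative, not the actual production code.
    public class SynchronizedReader<T> implements ItemReader<T> {

        private final ItemReader<T> delegate;

        public SynchronizedReader(ItemReader<T> delegate) {
            this.delegate = delegate;
        }

        // Only one thread at a time can pull the next record from the delegate,
        // so each record should be handed out exactly once by the reader.
        public synchronized T read() throws Exception {
            return delegate.read();
        }
    }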

    Hopefully the below snippet is sufficient:

    Code:
    <job id="migrateJob" xmlns="http://www.springframework.org/schema/batch">
        <step id="migrate.master">
            <partition step="migrate" partitioner="partitioner">
                <handler grid-size="#{applicationProperties['job.poolSize']}" task-executor="taskExecutor" />
            </partition>
        </step>
    </job>

    <step id="migrate" xmlns="http://www.springframework.org/schema/batch">
        <tasklet transaction-manager="transactionManager">
            <chunk reader="fedoraReader" processor="recordProcessor" writer="rosettaWriter" skip-limit="10" commit-interval="1">
                <skippable-exception-classes>
                    <include class="java.lang.Exception" />
                </skippable-exception-classes>
            </chunk>
            <listeners>
                <listener ref="skipListener" />
                <listener ref="processListener" />
            </listeners>
        </tasklet>
    </step>

    <bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
    For what it's worth, here's the test console output from my ItemReader read() method interspersed with output from the first ItemProcessor. You'll see that, for instance, identifier 'archives:1006' is being processed multiple times:

    Code:
    Reading record: 0 / archives:1 / SimpleAsyncTaskExecutor-6
    Reading record: 1 / archives:10 / SimpleAsyncTaskExecutor-3
    Reading record: 2 / archives:1000 / SimpleAsyncTaskExecutor-11
                    Processing record: archives:10 / SimpleAsyncTaskExecutor-3
                    Processing record: archives:1 / SimpleAsyncTaskExecutor-6
    Reading record: 3 / archives:1001 / SimpleAsyncTaskExecutor-8
                    Processing record: archives:1001 / SimpleAsyncTaskExecutor-8
                    Processing record: archives:1000 / SimpleAsyncTaskExecutor-11
    Reading record: 4 / archives:1002 / SimpleAsyncTaskExecutor-10
                    Processing record: archives:1002 / SimpleAsyncTaskExecutor-10
    Reading record: 5 / archives:1003 / SimpleAsyncTaskExecutor-7
                    Processing record: archives:1003 / SimpleAsyncTaskExecutor-7
    Reading record: 6 / archives:1004 / SimpleAsyncTaskExecutor-5
                    Processing record: archives:1004 / SimpleAsyncTaskExecutor-5
    Reading record: 7 / archives:1005 / SimpleAsyncTaskExecutor-2
                    Processing record: archives:1005 / SimpleAsyncTaskExecutor-2
    Reading record: 8 / archives:1006 / SimpleAsyncTaskExecutor-6
                    Processing record: archives:1006 / SimpleAsyncTaskExecutor-6
    Reading record: 9 / archives:1007 / SimpleAsyncTaskExecutor-4
                    Processing record: archives:1007 / SimpleAsyncTaskExecutor-4
    Reading record: 10 / archives:1008 / SimpleAsyncTaskExecutor-9
                    Processing record: archives:1008 / SimpleAsyncTaskExecutor-9
                    Processing record: archives:1000 / SimpleAsyncTaskExecutor-11
                    Processing record: archives:1006 / SimpleAsyncTaskExecutor-6
                    Processing record: archives:1003 / SimpleAsyncTaskExecutor-7
    Reading record: 11 / archives:1009 / SimpleAsyncTaskExecutor-11
                    Processing record: archives:1009 / SimpleAsyncTaskExecutor-11

  • #2
    Have worked it out; as suspected, my haziness regarding partitioning and my naive use of SimpleStepExecutionSplitter (only!) was to blame. For others who are confused, take a look at the Spring Batch code and examples; in particular, the MinMaxPartitioner and related code.
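
    To give a flavour of the min/max idea, the partitioner splits the overall id range into grid-size buckets, something like this rough sketch (not the actual test class; the table, column and class names here are invented):

    Code:
    import java.util.HashMap;
    import java.util.Map;

    import org.springframework.batch.core.partition.support.Partitioner;
    import org.springframework.batch.item.ExecutionContext;
    import org.springframework.jdbc.core.JdbcTemplate;

    // Rough sketch of a min/max style Partitioner: split the overall id range into
    // gridSize buckets and record each bucket's bounds in its ExecutionContext.
    // Table, column and class names are invented for illustration.
    public class RangePartitioner implements Partitioner {

        private JdbcTemplate jdbcTemplate;

        public Map<String, ExecutionContext> partition(int gridSize) {
            long min = jdbcTemplate.queryForLong("SELECT MIN(ID) FROM SOURCE_TABLE");
            long max = jdbcTemplate.queryForLong("SELECT MAX(ID) FROM SOURCE_TABLE");
            long targetSize = (max - min) / gridSize + 1;

            Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();
            long start = min;
            for (int i = 0; i < gridSize && start <= max; i++) {
                ExecutionContext context = new ExecutionContext();
                context.putLong("minId", start);
                context.putLong("maxId", Math.min(start + targetSize - 1, max));
                result.put("partition" + i, context);
                start += targetSize;
            }
            return result;
        }

        public void setJdbcTemplate(JdbcTemplate jdbcTemplate) {
            this.jdbcTemplate = jdbcTemplate;
        }
    }
    Each partition's minId/maxId then drives a step-scoped reader, which is what keeps the worker steps from seeing each other's records.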



    • #3
      Similar problem - no solution yet

      I want to start by stating that we are using Spring Batch 2.0.3 and upgrading right now is not an option, although fixing this issue is important. The batch also runs in a WebSphere 6.1 EJB container with the application context loaded using ContextSingletonBeanFactoryLocator.

      We have a batch run that runs very slowly because the tasks are executed sequentially, so I am trying to use the partitioning support to make it run concurrently. What I'm trying to achieve is to process 1700 records in 10 threads, one record at a time per thread. So 10 should execute simultaneously, and when they complete, the next 10, until all 1700 rows are processed.

      I do not understand the whole gridSize thing. I think just understanding it will solve my problem.

      In the examples, the partitioner (ColumnRangePartitioner; I do not see a MinMaxPartitioner, by the way) does not return a map of size gridSize - it is larger than gridSize. Looking at the source of MultiResourcePartitioner, it ignores the gridSize altogether. Looking at SimplePartitioner is not helpful either, because that creates a map of gridSize entries with empty contexts, so it does not really partition anything.

      I tried both approaches (I am currently testing on one machine, so there are no clustering issues): first having my partitioner create a map as big as the data set, and then having it return only the first 'gridSize' records in the map. In both cases the batch keeps processing the same items over and over.

      Here is my Partitioner. Note that the step's writer updates the processed field to 1 (this actually happens for the first 'gridSize' records), so the partitioner cannot pick the same records up if called again.
      Code:
      public Map<String, ExecutionContext> partition(int gridsize) {
          Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();

          Object[] args = new Object[] { jobId };

          // ->> change here to apply gridsize
          // jdbcTemplate.setMaxRows(gridsize);

          // note the WHERE keyword, which was missing in the original query
          List<Map<String, Object>> idList = jdbcTemplate.queryForList(
                  "SELECT ID FROM " + this.tableName + " WHERE PROCESSED = 0 AND JOBID = ? ORDER BY ID ASC", args);

          // one ExecutionContext (and therefore one StepExecution) per unprocessed row
          int count = 0;
          for (Map<String, Object> m : idList) {
              Long id = (Long) m.get("id"); // map key case depends on the JDBC driver
              ExecutionContext context = new ExecutionContext();
              context.putLong(keyName, id);
              result.put("partition" + count, context);
              count++;
          }

          System.out.println(" MAP SIZE: " + result.size());

          return result;
      }
      The step definition looks like this
      Code:
      <beans:bean name="extract:master" class="org.springframework.batch.core.partition.support.PartitionStep">
          <beans:property name="jobRepository" ref="jobRepository" />
          <beans:property name="stepExecutionSplitter">
              <beans:bean name="stepExecutionSplitter" class="org.springframework.batch.core.partition.support.SimpleStepExecutionSplitter">
                  <beans:constructor-arg ref="jobRepository" />
                  <beans:constructor-arg ref="extract" />
                  <beans:constructor-arg ref="extractIdPartitioner" />
              </beans:bean>
          </beans:property>
          <beans:property name="partitionHandler">
              <beans:bean class="org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler">
                  <beans:property name="taskExecutor" ref="taskExecutor" />
                  <beans:property name="step" ref="extract" />
                  <beans:property name="gridSize" value="10" />
              </beans:bean>
          </beans:property>
          <beans:property name="stepExecutionListeners">
              <beans:list>
                  <beans:ref bean="extractIdPartitioner" /> <!-- added here to pick up jobId from the step execution context and save it for later use -->
              </beans:list>
          </beans:property>
      </beans:bean>

      <beans:bean id="extractIdPartitioner" class="momentum.wealth.batch.jobs.statements.IDPartitioner">
          <beans:property name="dataSource" ref="dataSource" />
      </beans:bean>

      <step id="extract">
          <tasklet allow-start-if-complete="true">
              <chunk reader="extractReader" writer="extractWriter" commit-interval="1" skip-limit="10">
                  <skippable-exception-classes>
                      java.lang.Exception
                  </skippable-exception-classes>
              </chunk>
              <listeners>
                  <listener ref="extractStepListener" />
              </listeners>
          </tasklet>
      </step>

      <beans:bean name="taskExecutor" class="org.springframework.scheduling.commonj.WorkManagerTaskExecutor">
          <beans:property name="workManagerName" value="wm/worker" />
      </beans:bean>
      Last edited by divren; Jun 10th, 2011, 08:21 AM.



      • #4
        I changed my reader to return null if the row's processed column is 1, so it will only read something the first time it is called; on the second call it returns null.

        In the case of a small gridSize like 10, it completes the 10 and then the batch completes successfully.

        When ignoring the partition size in the partitioner, effectively returning a map entry for every record (1700), it seems to process all the rows, but it also seems to try to start a thread per record, all at once.
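
        For clarity, the changed read() does roughly this (a simplified sketch only; the class, table and column names are placeholders rather than the real code):

        Code:
        import org.springframework.batch.item.ItemReader;
        import org.springframework.jdbc.core.JdbcTemplate;

        // Simplified sketch of the change described above: the reader hands out its
        // single assigned row at most once and returns null afterwards (or if the row
        // is already marked processed), which ends that partition's step.
        // Class, table and column names are placeholders.
        public class SingleIdReader implements ItemReader<Long> {

            private JdbcTemplate jdbcTemplate;
            private Long id;          // populated from the partition's ExecutionContext
            private boolean consumed; // true once the row has been handed out

            public Long read() throws Exception {
                if (consumed) {
                    return null; // end of input for this partition
                }
                consumed = true;
                int processed = jdbcTemplate.queryForInt(
                        "SELECT PROCESSED FROM EXTRACT_TABLE WHERE ID = ?", new Object[] { id });
                return processed == 0 ? id : null; // skip rows already marked processed
            }

            public void setJdbcTemplate(JdbcTemplate jdbcTemplate) { this.jdbcTemplate = jdbcTemplate; }
            public void setId(Long id) { this.id = id; }
        }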



        • #5
          I think either you have just misunderstood the responsibility of a Partitioner, or I don't understand your requirement. Its role is to produce a set of ExecutionContext instances for the StepExecutions that are kicked off by the PartitionHandler. If you want 1700 StepExecutions then what you did is correct, and you can ignore the gridsize (it's only a hint). If you want 10 (=gridsize), then you need to split your 1700 records into 10 batches. It looks like you can use your code with the jdbcTemplate, but only create 10 ExecutionContext objects and put entries in them for minId and maxId (or whatever you want to call them) that you can then refer back to in your extractReader (which should therefore be step scoped).

          It's actually pretty similar to the JDBC partitioning sample in the Batch distro, just with a slightly different Partitioner strategy.
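
          To make the reader side concrete, something along these lines would pick the bounds up from the step execution context (just a sketch with placeholder names; register it as a listener on the step, or use a step-scoped bean with late binding instead):

          Code:
          import org.springframework.batch.core.ExitStatus;
          import org.springframework.batch.core.StepExecution;
          import org.springframework.batch.core.StepExecutionListener;
          import org.springframework.batch.item.ItemReader;

          // Sketch only: a reader that picks up the minId/maxId written by the
          // partitioner from its partition's step execution context and then hands
          // the ids in that range out one at a time. Names are placeholders.
          public class RangeAwareIdReader implements ItemReader<Long>, StepExecutionListener {

              private long nextId;
              private long maxId;

              public void beforeStep(StepExecution stepExecution) {
                  nextId = stepExecution.getExecutionContext().getLong("minId");
                  maxId = stepExecution.getExecutionContext().getLong("maxId");
              }

              public ExitStatus afterStep(StepExecution stepExecution) {
                  return null; // leave the step's exit status alone
              }

              public Long read() throws Exception {
                  if (nextId > maxId) {
                      return null; // null ends this partition
                  }
                  return nextId++;
              }
          }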



          • #6
            Thanks Dave, you are right that I did not understand the role of the partitioner properly. I had hoped that if I added more than 10 contexts, still only 10 would execute at the same time. My problem is that the work required by one step execution is significant. In production the batch to partition is more like 100k records, and there is no way that one thread will be able to handle more than 10 items; they call a reporting service to create PDF documents.

            I was thinking of using the batch-integration stuff instead?



            • #7
              As an aside - the MinMaxPartitioner is found in the test code for Spring Batch. It is a naive implementation (it doesn't handle situations where the grid size is larger than the total number of records, for instance, and there's a slight bug in the calculations), but it is a good starting point if your data can be partitioned this way.

              I'm not sure why a more robust version isn't provided as an option in the API, to be honest. I found it an invaluable starting point, particularly since there are many interrelated concepts in Spring Batch - and partitioning specifically - that took a while to fully understand.

              I have a working example of multi-threaded steps and an enhanced MinMaxPartitioner if you're interested, and if it is indeed still relevant.



              • #8
                Good. You can make contributions via Github (read the instructions there and follow the contributor agreement link first). A more robust MinMaxPartitioner would be more than welcome, especially if you are actually using it.



                • #9
                  Absolutely; I will do this in the morning.

                  M


                  Originally posted by Dave Syer View Post
                  Good. You can make contributions via Github (read the instructions there and follow the contributor agreement link first). A more robust MinMaxPartitioner would be more than welcome, especially if you are actually using it.



                  • #10
                    Thanks for your replies, but I still do not have a clear idea of how to solve my problem. It seems that partitioning is not the solution. I'll start a new thread for that, as the duplication was only a side effect.



                    • #11
                      I don't understand the problem yet. Partitioning seemed like a reasonable approach to processing your 100K rows, but your partitioner was implemented wrong to get the right number of partitions. What was the issue?



                      • #12
                        Basically I think it is not really practical to process so many records as part of one step - but it is likely that I am misunderstanding something.

                        Sorry for the inconvenience. I started a new thread here (I hope that wasn't the wrong approach), which explains the problem in more detail: http://forum.springsource.org/showth...706#post366706

                        Originally posted by Dave Syer View Post
                        I don't understand the problem yet. Partitioning seemed like a reasonable approach to processing your 100K rows, but your partitioner was implemented wrong to get the right number of partitions. What was the issue?
