  • Chunking, batchUpdate sizes and a single commit

    Suppose you have a lookup/code database table of 100,000+ records:

    Database dataset:
    1 - Value1 - valid
    2 - Value2 - valid
    3 - Value3 - valid
    ...

    Every now and then (daily or hourly), a single file arrives containing the full dataset to be used to update this table. Any rows in the database table that the file dataset does not include should be marked as 'invalid':

    New dataset (file):

    1 - NewValue1
    2 - Value2
    4 - Value4

    Result:

    1 - NewValue1 - valid
    2 - Value2 - valid
    3 - Value3 - invalid
    4 - Value4 - valid

    At no point should the database table contain only a partially updated subset of the data - that is, the full table should be updated as a single unit of work. (Our current logic is: first mark all the data as invalid, then, for every insert/update, set the row back to valid.)
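
    In plain JdbcTemplate terms, that current logic looks roughly like this sketch (the table and column names are illustrative):

    Code:
    import java.util.List;

    import javax.sql.DataSource;

    import org.springframework.jdbc.core.JdbcTemplate;

    public class LookupRefreshSketch {

        // Step 1: mark everything invalid; step 2: upsert each file record as valid.
        // 'lookup', 'id', 'value' and 'status' are illustrative names.
        public void refresh(DataSource dataSource, List<Object[]> fileRecords) {
            JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
            jdbcTemplate.update("UPDATE lookup SET status = 'invalid'");
            for (Object[] record : fileRecords) { // record = { value, id }
                int updated = jdbcTemplate.update(
                        "UPDATE lookup SET value = ?, status = 'valid' WHERE id = ?", record);
                if (updated == 0) {
                    jdbcTemplate.update(
                            "INSERT INTO lookup (value, id, status) VALUES (?, ?, 'valid')", record);
                }
            }
        }
    }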

    We wish to use 'batches' so that we can use JDBC's batchUpdate functionality, but we want only one commit that updates the entire table at once. At first we considered setting commit-interval to Integer.MAX_VALUE, but we feel that job results should not depend on the commit-interval. Furthermore, we do not want all the records held in memory at the same time.

    So then, we wish to define a JDBC batchUpdate size, but not a commit size. Is the best solution to code everything in a Tasklet? Or can Spring Batch's chunking pattern (ItemReader/ItemProcessor/ItemWriter) be used in a special way such that only one commit is performed at the end of the job?

    We have considered writing an ItemReader<Iterable<T>> and an ItemWriter<Iterable<T>> so that a sort of 'stream' is sent between the reader and the writer: the Iterable created in the ItemReader gets its data from a delegate ItemReader, and the ItemWriter writes its data to a delegate ItemWriter. The ItemWriter could then keep count of how much is actually read, something like:

    Code:
    import java.util.ArrayList;
    import java.util.List;

    import org.springframework.batch.core.ExitStatus;
    import org.springframework.batch.core.StepExecution;
    import org.springframework.batch.core.StepExecutionListener;
    import org.springframework.batch.item.ItemWriter;

    public class IterableWriter<T> implements ItemWriter<Iterable<T>>, StepExecutionListener {

        private int readCount = 0;
        private int batchSize = 1000;
        private ItemWriter<T> delegate;

        @Override
        public void write(List<? extends Iterable<T>> items) throws Exception {
            // read the 'stream' from the Iterable and keep track of the actual read
            // count; send a batch to the delegate every time batchSize is reached
            List<T> buffer = new ArrayList<T>(batchSize);
            for (Iterable<T> iterable : items) {
                for (T item : iterable) {
                    buffer.add(item);
                    readCount++;
                    if (buffer.size() == batchSize) {
                        delegate.write(buffer);
                        buffer.clear();
                    }
                }
            }
            if (!buffer.isEmpty()) {
                delegate.write(buffer); // flush the remainder
            }
        }

        @Override
        public ExitStatus afterStep(StepExecution stepExecution) {
            stepExecution.setReadCount(readCount);
            return null;
        }
    }
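
    The reader side could look something like the following sketch (IterableItemReader is a made-up name, and restart and thread-safety concerns are ignored):

    Code:
    import java.util.Iterator;

    import org.springframework.batch.item.ItemReader;

    public class IterableItemReader<T> implements ItemReader<Iterable<T>> {

        private ItemReader<T> delegate;
        private boolean handedOut = false;

        public void setDelegate(ItemReader<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public Iterable<T> read() throws Exception {
            if (handedOut) {
                return null; // the single 'stream' was already handed to the writer
            }
            handedOut = true;
            // a lazy Iterable that pulls items from the delegate reader one by one
            return new Iterable<T>() {
                public Iterator<T> iterator() {
                    return new Iterator<T>() {
                        private T next = readNext();

                        private T readNext() {
                            try {
                                return delegate.read();
                            } catch (Exception e) {
                                throw new IllegalStateException(e);
                            }
                        }

                        public boolean hasNext() {
                            return next != null;
                        }

                        public T next() {
                            T current = next;
                            next = readNext();
                            return current;
                        }

                        public void remove() {
                            throw new UnsupportedOperationException();
                        }
                    };
                }
            };
        }
    }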
    We have also considered using a helper table (not an actual temporary table, because Oracle empties those upon commit or rollback, and with a connection pool, temporary tables relying on session retention would be unpredictable). The logic would be: first empty the helper table, populate it in chunks over several commits, then at the end run a step that uses Oracle's MERGE statement to move the data to the actual table in a single commit. This solution has parallel-processing problems, in that only a single job can use the helper table at a time. It also seems like a much more complex solution to a simple problem.
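
    For illustration, the final step of that helper-table approach could be a tasklet along these lines (a sketch - the table and column names are made up, and the tasklet step's single transaction is assumed to cover both statements):

    Code:
    import javax.sql.DataSource;

    import org.springframework.batch.core.StepContribution;
    import org.springframework.batch.core.scope.context.ChunkContext;
    import org.springframework.batch.core.step.tasklet.Tasklet;
    import org.springframework.batch.repeat.RepeatStatus;
    import org.springframework.jdbc.core.JdbcTemplate;

    public class MergeHelperTableTasklet implements Tasklet {

        private JdbcTemplate jdbcTemplate;

        public void setDataSource(DataSource dataSource) {
            this.jdbcTemplate = new JdbcTemplate(dataSource);
        }

        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
            // invalidate everything, then merge the helper table's contents back in;
            // both statements run inside this step's one transaction
            jdbcTemplate.update("UPDATE lookup SET status = 'invalid'");
            jdbcTemplate.update(
                    "MERGE INTO lookup t USING lookup_helper h ON (t.id = h.id) "
                  + "WHEN MATCHED THEN UPDATE SET t.value = h.value, t.status = 'valid' "
                  + "WHEN NOT MATCHED THEN INSERT (t.id, t.value, t.status) "
                  + "VALUES (h.id, h.value, 'valid')");
            return RepeatStatus.FINISHED;
        }
    }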

    The 'lookup' table is given as an example because it is easy to relate to, but several of our other batch jobs have the same problem: many records must be committed in a single transaction, yet we still want 'chunking' with JDBC batchUpdate to speed up the import, and the business logic divides naturally into reader/processor/writer parts.

    I guess what I am looking for is an interface the ItemWriter could implement that lets it control the transaction directly: chunk commits would do nothing (or perhaps issue a savepoint), while the job commit would be the actual commit.
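
    To sketch the shape of what I mean (not working code - the class name is made up, and transaction suspension, propagation and error handling are all glossed over):

    Code:
    import org.springframework.transaction.PlatformTransactionManager;
    import org.springframework.transaction.TransactionDefinition;
    import org.springframework.transaction.TransactionStatus;

    public class PostponedCommitTransactionManager implements PlatformTransactionManager {

        private final PlatformTransactionManager delegate;
        private TransactionStatus current;

        public PostponedCommitTransactionManager(PlatformTransactionManager delegate) {
            this.delegate = delegate;
        }

        @Override
        public TransactionStatus getTransaction(TransactionDefinition definition) {
            if (current == null) {
                current = delegate.getTransaction(definition); // open once, up front
            }
            return current; // every chunk reuses the same physical transaction
        }

        @Override
        public void commit(TransactionStatus status) {
            // chunk commits do nothing (a savepoint could be issued here instead)
        }

        @Override
        public void rollback(TransactionStatus status) {
            delegate.rollback(current);
            current = null;
        }

        /** Called once at the end of the job: the one real commit. */
        public void commitAll() {
            delegate.commit(current);
            current = null;
        }
    }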

    Any suggestions?

    Thanks!
    Paul

  • #2
    I don't think batchUpdate is available in Spring Batch, but it would be helpful.

    • #3
      Originally posted by tiger.spring:
      I think batchUpdate is not available in Spring batch but it will be helpful.
      There is an ItemWriter implementation that uses batch updates (JdbcBatchItemWriter). Batch updates are also available through the JdbcTemplate.batchUpdate method in Spring JDBC.
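
      For example, the simplest JdbcTemplate.batchUpdate overload sends one statement with many parameter sets to the database as a single JDBC batch (a sketch; the table and column names are illustrative):

      Code:
      import java.util.List;

      import javax.sql.DataSource;

      import org.springframework.jdbc.core.JdbcTemplate;

      public class BatchUpdateSketch {

          // Each Object[] holds the parameters for one row: { value, id }.
          // The whole list is sent to the database as a single JDBC batch.
          public static int[] markValid(DataSource dataSource, List<Object[]> rows) {
              JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
              return jdbcTemplate.batchUpdate(
                      "UPDATE lookup SET value = ?, status = 'valid' WHERE id = ?",
                      rows);
          }
      }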

      • #4
        JdbcTemplate has batchUpdate

        We use JdbcTemplate to do the batchUpdate (I have omitted some special logic that divides the data into INSERTs and UPDATEs, but here is some pseudocode using JDBC batch update):

        Code:
        import java.sql.PreparedStatement;
        import java.sql.SQLException;
        import java.util.List;

        import javax.sql.DataSource;

        import org.springframework.batch.item.ItemWriter;
        import org.springframework.beans.factory.InitializingBean;
        import org.springframework.jdbc.core.BatchPreparedStatementSetter;
        import org.springframework.jdbc.core.JdbcTemplate;

        public class XXXItemWriter implements ItemWriter<X>, InitializingBean {

            private DataSource dataSource;
            private JdbcTemplate jdbcTemplate;

            ...

            public void afterPropertiesSet() throws Exception {
                jdbcTemplate = new JdbcTemplate(dataSource);
            }

            @Override
            public void write(final List<? extends X> items) throws Exception {
                String sql = ...
                // the whole chunk goes to the database as one JDBC batch
                int[] ret = jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
                    @Override
                    public void setValues(PreparedStatement ps, int i) throws SQLException {
                        X row = items.get(i);
                        // ps.setXXX(row.getAbc());
                    }

                    @Override
                    public int getBatchSize() {
                        return items.size();
                    }
                });
                // check ret is correct...
            }
        }
        So what we are considering currently is a Tasklet that does a lot of the logic Spring Batch already does for chunks (full code is given):

        Code:
        import java.util.ArrayList;
        import java.util.List;
        
        import org.slf4j.Logger;
        import org.slf4j.LoggerFactory;
        import org.springframework.batch.core.StepContribution;
        import org.springframework.batch.core.StepExecution;
        import org.springframework.batch.core.scope.context.ChunkContext;
        import org.springframework.batch.core.step.tasklet.Tasklet;
        import org.springframework.batch.item.ExecutionContext;
        import org.springframework.batch.item.ItemProcessor;
        import org.springframework.batch.item.ItemReader;
        import org.springframework.batch.item.ItemStream;
        import org.springframework.batch.item.ItemWriter;
        import org.springframework.batch.item.support.PassThroughItemProcessor;
        import org.springframework.batch.repeat.RepeatStatus;
        import org.springframework.beans.factory.InitializingBean;
        
        public class SingleCommitChunkOrientedTasklet<I,O> implements Tasklet, InitializingBean {
            
            private static final Logger LOGGER = LoggerFactory.getLogger(SingleCommitChunkOrientedTasklet.class);
            
            private ItemReader<I> reader;
            private ItemProcessor<I, O> processor;
            private ItemWriter<O> writer;
            
            private Tasklet preTasklet;
            private Tasklet postTasklet;
            
            private int chunkSize = 1000;
            private int count = 0;
            
            public void setReader(ItemReader<I> reader) {
                this.reader = reader;
            }
            
            public void setProcessor(ItemProcessor<I, O> processor) {
                this.processor = processor;
            }
            
            public void setWriter(ItemWriter<O> writer) {
                this.writer = writer;
            }
            
            public void setChunkSize(int chunkSize) {
                this.chunkSize = chunkSize;
            }
            
            public void setPreTasklet(Tasklet preTasklet) {
                this.preTasklet = preTasklet;
            }
            
            public void setPostTasklet(Tasklet postTasklet) {
                this.postTasklet = postTasklet;
            }
        
            @SuppressWarnings("unchecked")
            @Override
            public void afterPropertiesSet() throws Exception {
                if (processor == null) {
                    processor = (ItemProcessor<I, O>) new PassThroughItemProcessor<I>();
                }
            }
        
            @Override
            public RepeatStatus execute(StepContribution contribution,
                    ChunkContext chunkContext) throws Exception {
                StepExecution stepExecution = prepare(contribution, chunkContext);
                execute(stepExecution);
                return finish(contribution, chunkContext);
            }
        
            private StepExecution prepare(StepContribution contribution,
                    ChunkContext chunkContext) throws Exception {
                
                StepExecution stepExecution = chunkContext.getStepContext().getStepExecution();
                if (preTasklet != null) {
                    while (preTasklet.execute(contribution, chunkContext).isContinuable()) {}
                }
                if (reader instanceof ItemStream) {
                    LOGGER.debug("Opening ItemStream...");
                    ExecutionContext executionContext = stepExecution.getExecutionContext();
                    ((ItemStream)reader).open(executionContext);
                    ((ItemStream)reader).update(executionContext);
                }
                return stepExecution;
            }
        
            private void execute(StepExecution stepExecution) throws Exception {
                I item;
                List<O> items = new ArrayList<O>();
                while ((item = reader.read()) != null) {
                    stepExecution.setReadCount(count);
                    O processedItem = processor.process(item);
                    if (processedItem != null) {
                        items.add(processedItem);
                        if (++count % chunkSize == 0) {
                            LOGGER.debug("Writing item numbers {}-{} ({} items)", new Object[]{count-items.size()+1, count, items.size()});
                            writer.write(items);
                            stepExecution.setWriteCount(count);
                            items = new ArrayList<O>();
                        }
                    }
                }
                stepExecution.setReadCount(count);
                if (! items.isEmpty()) {
                    LOGGER.debug("Writing item numbers {}-{} ({} items)", new Object[]{count-items.size()+1, count, items.size()});
                    writer.write(items);
                    stepExecution.setWriteCount(count);
                }
            }
        
            private RepeatStatus finish(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                if (postTasklet != null) {
                    while (postTasklet.execute(contribution, chunkContext).isContinuable()) {}
                }
                if (reader instanceof ItemStream) {
                    LOGGER.debug("Closing ItemStream...");
                    ((ItemStream)reader).close();
                }
                return RepeatStatus.FINISHED;
            }
        
        }
        Then instead of using
        Code:
            <job id="myJob" xmlns="http://www.springframework.org/schema/batch" >
                <step id="prepare" next="step1">
                    <tasklet ref="myTasklet" />
                </step>
                <step id="step1">
                    <tasklet>
                        <chunk reader="myItemReader" processor="myItemProcessor" writer="myItemWriter" commit-interval="1000" />
                    </tasklet>
                </step>
            </job>
        we have
        Code:
            <bean id="singleCommitTasklet" class="x.y.z.SingleCommitChunkOrientedTasklet" scope="step">
                <property name="preTasklet" ref="myTasklet" />
                <property name="reader" ref="myItemReader" />
                <property name="processor" ref="myItemProcessor" />
                <property name="writer" ref="myItemWriter" />
                <property name="chunkSize" value="1000" />
            </bean>
        
            <job id="myJob" xmlns="http://www.springframework.org/schema/batch" >
                <step id="step1">
                    <tasklet ref="singleCommitTasklet" />
                </step>
            </job>
        It seems to work OK, but of course we will have to add more logic as needed (various listeners and so forth - things that Spring Batch already implements). It would be nice if Spring Batch had something like:

        Code:
        <atomic-chunk reader="myItemReader" processor="myItemProcessor" writer="myItemWriter" chunk-size="1000" />
        Or
        Code:
        <step ... postponeCommit="true">
         ...
        </step>
        I realize jobs won't be 'resumable', but to meet the task's requirements while keeping the programming pattern as similar as possible, I feel the 'postponeCommit' option would be the easiest to implement and would give users a lot of flexibility.

        I know others have had trouble with unit tests and the locking semaphore when using @Transactional - we have had trouble with this too, as we would like to use DbUnit to sync/populate a table, run the job, check the results, and then roll back the entire transaction (job and DbUnit changes alike) so that the database is in the exact same state as when the test started. It would be easy to write a job configuration with postponeCommit="true" for everything and then use @TransactionConfiguration(defaultRollback=true) and @Transactional in the unit test.
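
        For the record, this is the shape of test we would like to write (a sketch - the context file name and assertions are made up, and as noted above this currently trips over the job repository's own transaction handling):

        Code:
        import static org.junit.Assert.assertEquals;

        import org.junit.Test;
        import org.junit.runner.RunWith;
        import org.springframework.batch.core.BatchStatus;
        import org.springframework.batch.core.JobExecution;
        import org.springframework.batch.test.JobLauncherTestUtils;
        import org.springframework.beans.factory.annotation.Autowired;
        import org.springframework.test.context.ContextConfiguration;
        import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
        import org.springframework.test.context.transaction.TransactionConfiguration;
        import org.springframework.transaction.annotation.Transactional;

        @RunWith(SpringJUnit4ClassRunner.class)
        @ContextConfiguration("classpath:my-job-context.xml")
        @TransactionConfiguration(defaultRollback = true)
        @Transactional
        public class MyJobIntegrationTest {

            @Autowired
            private JobLauncherTestUtils jobLauncherTestUtils;

            @Test
            public void jobLeavesDatabaseAsItStarted() throws Exception {
                // populate the input table with DbUnit here...
                JobExecution execution = jobLauncherTestUtils.launchJob();
                assertEquals(BatchStatus.COMPLETED, execution.getStatus());
                // assert on the table contents; the test transaction then rolls
                // everything back, leaving the database exactly as it was
            }
        }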

        Spring Batch has a lot of positive features, but we feel we need more control over our transactions.

        • #5
          Okay, but the chunk will still commit the transaction after every commit-interval's worth of records written by the ItemWriter configured for that chunk.

          The issue here is that you don't want to issue a commit until the whole unit of work is complete.

          While defining a chunk for my job I encountered "chunk-completion-policy". I could not find much documentation for it, but it states:

          Code:
          A transaction will be committed when this policy decides to complete. Defaults to a SimpleCompletionPolicy with chunk size equal to the commit-interval attribute.

          • #6
            A custom CompletionPolicy (extending SimpleCompletionPolicy) looks interesting, but it seems to control the number of reads before continuing to the writer, not the skipping of the commit that follows the writer call.

            Of course, we still wish to read the data in chunks so that we don't hold too much in memory at one time, but a custom completion policy appears to have nothing to do with the writer's commit.

            Should we perhaps try a custom TransactionManager?

            • #7
              Did you try splitting the job into multiple steps, where the first step does the chunk processing and the second step is a tasklet whose only job is to insert the data into the database?

              So, how about this?

              Code:
              <job id="sampleJob">
                  <!-- The first step is the collector: validate, then collect.
                       If all is fine, we move on to daoWriter. If someone is
                       trying to abandon the job, we STOP the job. -->
                  <step id="collector" next="daoWriter">
                      <tasklet>
                          <chunk reader="Reader" processor="Processor" writer="Collector"
                                 commit-interval="10" skip-limit="100">
                              <skippable-exception-classes>
                                  <include class="java.lang.Exception"/>
                                  <exclude class="org.springframework.batch.item.ReaderNotOpenException"/>
                              </skippable-exception-classes>
                          </chunk>
                      </tasklet>
                  </step>

                  <step id="daoWriter">
                      <tasklet ref="DaoWriter" />
                  </step>
              </job>
              So what you can do is validate, then collect the data in (say) an ArrayList and store it in the JobExecutionContext. Note: store it in the JobExecutionContext, as that is the one persisted across the job, not the StepExecutionContext.

              When processing data in your processor, if you decide something is wrong, turn on the terminate-only flag on the StepExecution (setTerminateOnly()). This gives an almost graceful completion - you just get the stack trace of a JobInterruptedException.
              When your first step is done, control goes back to the framework, which checks the terminate-only flag. If it is set, the job stops; otherwise it goes on to your second step. There, a simple Tasklet class should do: iterate over your ArrayList (or whatever you stored) and write it, as in the sketch below. Voila! You have your single commit point.
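
              The second step's tasklet could then be as simple as this sketch (MyItem and the "collectedItems" key are made-up names):

              Code:
              import java.util.List;

              import org.springframework.batch.core.StepContribution;
              import org.springframework.batch.core.scope.context.ChunkContext;
              import org.springframework.batch.core.step.tasklet.Tasklet;
              import org.springframework.batch.item.ExecutionContext;
              import org.springframework.batch.item.ItemWriter;
              import org.springframework.batch.repeat.RepeatStatus;

              public class DaoWriterTasklet implements Tasklet {

                  // MyItem is the domain type collected in the first step (hypothetical)
                  private ItemWriter<MyItem> writer;

                  public void setWriter(ItemWriter<MyItem> writer) {
                      this.writer = writer;
                  }

                  @Override
                  @SuppressWarnings("unchecked")
                  public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                      // fetch the list the first step stored in the JobExecutionContext
                      ExecutionContext jobContext = chunkContext.getStepContext()
                              .getStepExecution().getJobExecution().getExecutionContext();
                      List<MyItem> collected = (List<MyItem>) jobContext.get("collectedItems");
                      // one write call; this tasklet step runs in a single transaction
                      writer.write(collected);
                      return RepeatStatus.FINISHED;
                  }
              }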

              Hope this helps.

              Cheers
