
  • Restart/Continue where left off

    Hi,

    Really new to Spring Batch, so I guess this is going to be an easy one.
    I can't figure out how "Restart/Resume failed job" works exactly.
    Here is my sample app:
    - 1 simple job with 1 step.
    - Step has an ItemReader and an ItemWriter.
    - I just read ints from a list of 100 items in ItemReader
    - ItemWriter just outputs the number it gets as param in "write()".
    - I also put a counter in the ItemWriter so it throws an Exception when the number of items it "write()"s reaches 10.

    Now, I run the job with the CommandLineJobRunner.
    I run it once. It stops (throws an Exception) as expected and displays items 1 to 10 (so far so good).
    I run it again (same job params). It displays numbers from 1 to 10 again.
    Why???
    I run it again without changing anything and now it displays numbers from 11 to 20.
    Why???

    I keep doing that, and it looks like the "second" attempt always seems to "try to restart" the previous failed job execution (not sure I'm using the right term here...), while the following run seems to "continue" where the previous job execution left off.
    What I'd like is to always resume a failed job in my case, and avoid restarting the same job from 0.

    I read the User Guide a couple times but I'm still lost.

    Can anyone shed some light on this?

    NB: I'm using Spring Batch 1.1.3-RELEASE-A.

  • #2
    Do you use framework-provided readers and writers? How are they configured? How have you configured your JobRepository?

    • #3
      What is your chunk size? Note that on a restart, it will begin at the end of the last completed chunk. If your chunk size is 50 and you throw an exception after 10 items, it will keep starting at 1. If your chunk size is 1, and you throw an exception on the 10th, it should start on the 10th when you restart.
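The chunk arithmetic described above can be sketched as a tiny plain-Java helper (hypothetical method name, no Spring Batch dependency): on restart, processing resumes at the end of the last fully committed chunk, i.e. the item count rounded down to a chunk boundary.

```java
// Sketch of restart positioning by chunk (commit interval).
// Hypothetical helper, NOT Spring Batch API. Items are 0-indexed here.
public class ChunkRestartSketch {

    // Index of the first item a restarted step would read, given how many
    // items were successfully processed before the failure: only whole
    // chunks are committed, so we round down to the last chunk boundary.
    static int restartIndex(int itemsProcessedBeforeFailure, int chunkSize) {
        int completedChunks = itemsProcessedBeforeFailure / chunkSize;
        return completedChunks * chunkSize;
    }

    public static void main(String[] args) {
        // Chunk size 50, failure after 10 items: no chunk was ever
        // committed, so every restart begins again at item 0.
        System.out.println(restartIndex(10, 50)); // 0

        // Chunk size 1, failure on the 10th item: 9 chunks committed,
        // so a restart begins at the 10th item (index 9).
        System.out.println(restartIndex(9, 1)); // 9
    }
}
```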

      • #4
        @magott:
        My ItemReader extends AbstractBufferedItemReaderItemStream.
        No particular reason, except that I'm interested in the StaxEventItemReader for my project (I need to be able to resume a failed job when reading an XML file). AbstractBufferedItemReaderItemStream seemed to implement ItemStream and handle the update of the execution context the way I want (I think...).
        My ItemWriter just implements the interface ItemWriter.

        Code:
        package test.springbatch;
        
        import java.util.ArrayList;
        import java.util.List;
        
        import org.springframework.batch.item.support.AbstractBufferedItemReaderItemStream;
        import org.springframework.util.ClassUtils;
        
        import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
        
        public class TestItemReader extends AbstractBufferedItemReaderItemStream {
        	
        	private static final List<Integer> NUMBERS = new ArrayList<Integer>();
        	private AtomicInteger mIncrementer = new AtomicInteger(0);	
        	static {
        		for (int i = 0; i < 100; i++) {
        			NUMBERS.add(i);
        		}		
        	}
        	
        	public TestItemReader() {
        		setName(ClassUtils.getShortName(TestItemReader.class));		
        	}
        	
        	@Override
        	protected void doClose() throws Exception {}
        
        	@Override
        	protected void doOpen() throws Exception {}
        	
        	@Override
        	protected Object doRead() throws Exception {
        		if (mIncrementer.get() >= NUMBERS.size()) return null; // >= avoids IndexOutOfBoundsException at index 100
        		return NUMBERS.get(mIncrementer.getAndIncrement());
        	}
        }
        Code:
        package test.springbatch;
        
        import org.springframework.batch.item.ClearFailedException;
        import org.springframework.batch.item.FlushFailedException;
        import org.springframework.batch.item.ItemWriter;
        
        import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
        
        public class TestItemWriter implements ItemWriter {
        	
        	private AtomicInteger mCounter = new AtomicInteger(1);
        
        	public void clear() throws ClearFailedException {}
        
        	public void flush() throws FlushFailedException {}
        
        	public void write(Object pObj) throws Exception {
        		System.out.println("Writing " + pObj);
        		if (mCounter.getAndIncrement() >= 10) throw new Exception("It's alright.");
        	}
        
        }
        @chudak: I didn't specify anything regarding chunk size. The weird thing is that the "read.count" variable is stored properly in the database (i.e. its value is indeed the number of items read so far).


        I have to correct a little bit what I said before. It's actually less predictable than that.
        The first run is always right (obviously).
        The second run sometimes resumes the previous job, sometimes not.
        Successive runs are also "random" (it starts at 27, restarts from 0, or starts again at 9).
        I really think it's just me not understanding some concept.

        Am I not supposed to run a failed job with the same job parameters?
        What do I have to do to resume a failed job?

        • #5
          Originally posted by lpezet
          Am I not supposed to run a failed job with the same job parameters?
          What do I have to do to resume a failed job?
          If you start a job with the SAME parameters as the failed job, it will run the failed job.
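As I understand the identity rule this thread keeps coming back to, job name plus parameters identify a JobInstance, so launching again with identical parameters attaches a new execution to the existing (failed) instance instead of creating a fresh one. A toy model of that lookup (hypothetical class and method names, not Spring Batch API):

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of JobInstance identity. Hypothetical names, NOT Spring Batch
// API: it only illustrates "same name + same parameters -> same instance".
public class JobInstanceSketch {

    // Maps "jobName|parameters" to an instance id; same key -> same instance.
    private final Map<String, Integer> instances = new HashMap<>();
    private int nextId = 0;

    // Returns the instance id a launch with these parameters would attach to.
    int instanceFor(String jobName, String parameters) {
        String key = jobName + "|" + parameters;
        Integer existing = instances.get(key);
        if (existing != null) return existing; // re-launch -> restart of existing instance
        instances.put(key, nextId);
        return nextId++; // new parameters -> brand new instance
    }

    public static void main(String[] args) {
        JobInstanceSketch launcher = new JobInstanceSketch();
        System.out.println(launcher.instanceFor("TestSimpleJob", "uid=123")); // 0
        System.out.println(launcher.instanceFor("TestSimpleJob", "uid=123")); // 0 again: a restart
        System.out.println(launcher.instanceFor("TestSimpleJob", "uid=456")); // 1: new instance
    }
}
```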

          • #6
            Would that resume the job or not? Meaning, would it start from where it left off? (if configured to do so)

            • #7
              Originally posted by lpezet
              Would that resume the job or not? Meaning, would it start from where it left off? (if configured to do so)
              Yes, that's how I'm doing it.

              Code:
        <!-- Prototype job bean -->
        <bean id="simpleJob" class="org.springframework.batch.core.job.SimpleJob" abstract="true">
            <property name="jobRepository" ref="jobRepository" />
            <property name="restartable" value="true" />
            <property name="jobExecutionListeners" ref="provisioningBatchErrorListener"/>
        </bean>

        <!-- Prototype step bean -->
        <bean id="simpleStep" class="org.springframework.batch.core.step.item.SkipLimitStepFactoryBean" abstract="true">
            <property name="transactionManager" ref="transactionManager" />
            <property name="jobRepository" ref="jobRepository" />
            <property name="commitInterval" value="50" />
        </bean>

              • #8
                Thanks for sharing that, chudak.

                I have a similar config:
                Code:
                <bean id="TestSimpleJob" class="org.springframework.batch.core.job.SimpleJob">
                    <property name="jobRepository" ref="test.springbatch.JobRepository" />
                    <property name="restartable" value="true"/>
                    <property name="steps">
                        <list>
                            <bean class="org.springframework.batch.core.step.item.SimpleStepFactoryBean">
                                <property name="jobRepository" ref="test.springbatch.JobRepository" />
                                <property name="transactionManager" ref="test.springbatch.TxManager" />
                                <property name="itemReader"><bean class="test.springbatch.TestItemReader"/></property>
                                <property name="itemWriter"><bean class="test.springbatch.TestItemWriter"/></property>
                            </bean>
                        </list>
                    </property>
                </bean>
                I changed my ItemReader and avoided the AbstractBufferedItemReaderItemStream class.
                I now implement ItemStream and ItemReader.
                Code:
                package test.springbatch;
                import (...);
                public class TestItemReader implements ItemStream, ItemReader {
                	
                	private static final List<Integer> NUMBERS = new ArrayList<Integer>();
                	private AtomicInteger mIncrementer = new AtomicInteger(0);
                	private int mCurrentItemCount;	
                	static {
                		for (int i = 0; i < 100; i++) {
                			NUMBERS.add(i);
                		}		
                	}
                	
                	public void close(ExecutionContext pContext) throws ItemStreamException {}
                	
                	public void mark() throws MarkFailedException {}
                	
                	public void open(ExecutionContext pContext) throws ItemStreamException {
                		if (pContext.containsKey("read.count")) {
                			int oItemCount = Long.valueOf(pContext.getLong("read.count")).intValue();
                			try {
                				jumpToItem(oItemCount);
                			}
                			catch (Exception e) {
                				throw new ItemStreamException("Could not move to stored position on restart", e);
                			}
                
                			mCurrentItemCount = oItemCount;
                		} else {
                			System.out.println("Did not find read.count key in execution context.");
                		}
                	}
                	
                	private void jumpToItem(int pItemCount) {
                		while (mIncrementer.getAndIncrement() < pItemCount) {}
                	}
                
                	public Object read() throws Exception, UnexpectedInputException, NoWorkFoundException, ParseException {
                		if (mIncrementer.get() >= NUMBERS.size()) return null;
                		mCurrentItemCount++;		
                		return NUMBERS.get(mIncrementer.getAndIncrement());
                	}
                	
                	public void reset() throws ResetFailedException {}
                	
                	public void update(ExecutionContext pContext) throws ItemStreamException {
                		pContext.putLong("read.count", mCurrentItemCount);
                	}
                	
                }
                I'm still having some weird/random behavior.
                Sometimes it resumes the failed job up to a certain point (all fine up to the 4th run) and then restarts from "scratch" (from 0).
                It's really frustrating...

                chudak you must have some magic touch

                • #9
                  I'm not sure why the 4th run wouldn't work, but every subsequent one would. Seems very fishy, but from looking at your reader, I can't see anything off hand that's wrong.

                  @deckingraj
                  I don't really understand the question. Did you try to run the failed job again, but it wouldn't let you? If so, it's probably because restart restores the state of the reader from the last commit point. So if you say "throw an exception when i is 5", and it starts again at 4, you will always keep failing.
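That restart-loop behavior can be simulated without the framework. In this sketch (hypothetical names, not Spring Batch code), the reader position is persisted only at commit boundaries, so a writer that deterministically fails on the same item makes every restart resume at the same saved position and fail at the same place:

```java
// Minimal simulation of commit-boundary restart state. NOT Spring Batch
// code; it only illustrates why a deterministic failure repeats forever.
public class RestartLoopSketch {

    // Reader position saved in the "execution context"; survives restarts.
    static int savedPosition = 0;

    // Runs one job execution: reads items one chunk at a time, persists the
    // position after each committed chunk, and stops when the writer would
    // throw. Returns the item index at which the run stopped.
    static int runOnce(int chunkSize, int failOnItem, int totalItems) {
        int pos = savedPosition; // resume from last committed position
        while (pos < totalItems) {
            int chunkEnd = Math.min(pos + chunkSize, totalItems);
            for (int i = pos; i < chunkEnd; i++) {
                if (i == failOnItem) {
                    return i; // rollback: savedPosition is NOT advanced
                }
            }
            pos = chunkEnd;
            savedPosition = pos; // commit: persist position
        }
        return pos;
    }

    public static void main(String[] args) {
        // Chunk size 10, writer always fails on item 5: no chunk ever
        // commits, so every "restart" stops at item 5 again.
        System.out.println(runOnce(10, 5, 100)); // 5
        System.out.println(runOnce(10, 5, 100)); // 5
    }
}
```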

                  • #10
                    Sorry lucasward, maybe I'm not explaining it clearly.
                    The "4th run then random" was really just a one-time experience.
                    It's always pretty random. Sometimes it works for a couple of runs in a row, then gets weird; other times it keeps restarting from 0 for a couple of runs and then starts at 9, etc. I didn't notice any pattern.

                    Every time, I check the db, especially the job_execution and execution_context tables, to see if there are any differences.
                    They all look "similar": the "read.count" is always stored properly, all jobs are marked FAILED, all use the same job instance, etc.

                    I'm still clueless...

                    • #11
                      Are you using this class in a multi-threaded step? If so, how have you configured it?

                      • #12
                        Originally posted by lucasward
                        Are you using this class in a multi-threaded step? If so, how have you configured it?
                        Nope. Just running it from command line.
                        There is really nothing more from what I have posted already.
                        Code:
                        java -Xmx512M -classpath "etc;target/springbatch-test-0.0.1-SNAPSHOT-jar-with-dependencies.jar" org.springframework.batch.core.launch.support.CommandLineJobRunner test_spring_jobs.xml TestSimpleJob uid=123
                        Each "run" I mentioned in previous posts is a single invocation of that command.

                        • #13
                          How it's launched isn't what I was referring to. You're using AtomicInteger, etc., so I assumed you had configured your step to use more than one thread.

                          • #14
                            Hi lucasward,

                            I don't think I did a good job of explaining my problem. Let me try once again.

                            Here is the scenario

                            Run 1
                            - Job Failed(due to the forced exception)
                            - A row is added in the JOB_EXECUTION_TABLE

                            JOB_EXECUTION_ID=0, VERSION=1, JOB_INSTANCE_ID=0, CREATE_TIME=2008-11-17 10:04:07.318, START_TIME=2008-11-17 10:04:07.365, END_TIME=2008-11-17 10:04:08.02, STATUS=FAILED, CONTINUABLE=N, EXIT_CODE=FAILED

                            Run 2
                            - A new Job Passed
                            - Old Job still lying in Failed state
                            - Only one new row added in JOB_EXECUTION_TABLE

                            JOB_EXECUTION_ID=0, VERSION=3, JOB_INSTANCE_ID=0, CREATE_TIME=2008-11-17 10:04:07.318, START_TIME=2008-11-17 10:04:07.365, END_TIME=2008-11-17 10:04:08.02, STATUS=FAILED, CONTINUABLE=N, EXIT_CODE=FAILED

                            JOB_EXECUTION_ID=1, VERSION=3, JOB_INSTANCE_ID=1, CREATE_TIME=2008-11-17 10:09:03.386, START_TIME=2008-11-17 10:09:03.386, END_TIME=2008-11-17 10:09:06.255, STATUS=COMPLETED, CONTINUABLE=N, EXIT_CODE=COMPLETED


                            I understand when you say that the failed job might always fail if I have forced the exception before the writer's last commit, but I am at least expecting a new row in the JOB_EXECUTION table with the same job instance (0 in this case), where the status might still be FAILED. That would prove that Spring Batch tried to invoke the failed job and that it failed again due to bad data or whatever reason, which unfortunately is not happening in my case.

                            I hope I am clear this time.

                            • #15
                              Originally posted by lucasward
                              How it's launched isn't what I was referring to. You're using atomic integer, etc. So I was assuming you've configured your step to have more than one thread.
                              Oh, good catch!
                              So no, I could use a simple int. No threads are specified/used/spawned anywhere.
                              I posted the command line because it just calls the job defined in the Spring config I posted a little earlier, using the ItemReader and ItemWriter I also posted before, neither of which mentions anything about threads, besides the suspicious use of AtomicInteger indeed.
