
  • skip-limit and SkipLimitExceededException

    Hi Everyone,

    So far, so good. Spring Batch (v2.0.2) has impressed my boss and the world is good.

    I've hit a roadblock when it comes to handling FlatFileParseException in the FlatFileItemReader.

    My use case is this:

    Load a file with more than 10 parse errors in it, and expect to see the job fail and report the first 10 parse errors.

    What I'm seeing is the job failed, but I would have expected one or all of the following to be populated on the JobExecution:
    - Exit description on the JobExecution to state the message of the SkipLimitExceededException
    - The exception to be in getAllFailureExceptions() or getFailureExceptions()

    This is not the worst part of it though.

    What really hurts is that, because exceeding the skip limit throws an exception, the postProcess method on FaultTolerantChunkProvider is not being called.

    This means I don't get the first 10 calls to the onSkipInRead listener.

    To sum it up:
    - Is there another way to achieve the functionality I'm looking for?
    AND
    - Is it worthwhile raising a JIRA to change this? What I think is required is to modify the exception handling in FaultTolerantChunkProvider to catch the SkipLimitExceededException, call the skip listeners and re-throw it. I'm happy to do the work.

    Cheers, Luke.

    I've included the relevant code below.


    Code:
    	
    <bean id="logSkipTask" class="com.shinetech.nbv.shadow.batch.tasks.LogSkipsTask" scope="step"/>

    <batch:job id="processUsage">
        <batch:step id="loadFile">
            <batch:tasklet>
                <batch:chunk reader="fileReader" processor="readProcessor"
                    writer="NOOPWriter" commit-interval="10000" skip-limit="10">
                    <batch:skippable-exception-classes>
                        org.springframework.batch.item.file.FlatFileParseException
                    </batch:skippable-exception-classes>
                </batch:chunk>
                <batch:listeners>
                    <batch:listener ref="logSkipTask" />
                </batch:listeners>
            </batch:tasklet>
            <batch:next on="*" to="writeFile" />
            <batch:next on="COMPLETED WITH SKIPS" to="logSkips" />
            <batch:fail on="FAILED" exit-code="FAILED" />
        </batch:step>

        <batch:step id="logSkips" next="writeFile">
            <batch:tasklet ref="logSkipTask"/>
        </batch:step>

    The tasklet/listener that handles logging the parse exceptions.
    Code:
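    import java.util.ArrayList;
    import java.util.List;

    import org.springframework.batch.core.StepContribution;
    import org.springframework.batch.core.StepExecution;
    import org.springframework.batch.core.annotation.BeforeStep;
    import org.springframework.batch.core.annotation.OnSkipInRead;
    import org.springframework.batch.core.scope.context.ChunkContext;
    import org.springframework.batch.core.step.tasklet.Tasklet;
    import org.springframework.batch.repeat.RepeatStatus;

    // Doubles as the "logSkips" tasklet and as the listener that collects read skips.
    public class LogSkipsTask implements Tasklet
    {
        private List<Throwable> skipped = new ArrayList<Throwable>();

        public RepeatStatus execute(StepContribution contribution,
                ChunkContext chunkContext) throws Exception
        {
            // log all the skipped exceptions to the execution context.
            return RepeatStatus.FINISHED;
        }

        // Publish the (initially empty) list on the job execution context before the step runs.
        @BeforeStep
        public void initialiseSkips(StepExecution stepExecution)
        {
            stepExecution.getJobExecution().getExecutionContext().put(LogSkipsTask.class.getSimpleName(), skipped);
        }

        // Called for each item that was skipped during read.
        @OnSkipInRead
        public void addSkippedException(Throwable skippedThrowable)
        {
            System.out.println("Adding skipped....");
            skipped.add(skippedThrowable);
        }
    }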

  • #2
    Clarification: the failure exceptions and stacktrace you are looking for are in the step execution (not the job execution).

    I don't see where you are setting the ExitStatus to "COMPLETED WITH SKIPS", which might just be missing from the code snippets, but if you aren't doing that then the tasklet step that stashes the skips is not being executed, so maybe your listener is being called but you never saw the output? (Aside: you didn't include the implementation of that tasklet, but if it is putting skip information in the execution context as per the comment, why do you need to do that?)

    There are definitely unit tests asserting that skip listeners get called in the right place, so we need to understand your implementation a bit better.

    Are we sure that your Tasklet is being converted to a listener correctly? You could try 2.0.4 in case there was a bug in the converter. Or implement the SkipListener interface directly to make sure.

    What makes you so sure that the postProcess method on FaultTolerantChunkProvider is not being called (since it is in the unit tests)?



    • #3
      Thanks for the quick reply Dave,

      Originally posted by Dave Syer:
      Clarification: the failure exceptions and stacktrace you are looking for are in the step execution (not the job execution).
      There is one exception on the step execution and it is the SkipLimitExceededException:

      Code:
      [StepExecution: id=0, name=loadFile, status=FAILED, exitStatus=FAILED, readCount=0, filterCount=0, writeCount=0 readSkipCount=10, writeSkipCount=0, processSkipCount=0, commitCount=0, rollbackCount=1, exitDescription=org.springframework.batch.core.step.skip.SkipLimitExceededException: Skip limit of '10' exceeded

      Originally posted by Dave Syer:
      I don't see where you are setting the ExitStatus to "COMPLETED WITH SKIPS",
      This may be a direction in which I was going wrong. I was following on from the skipSampleJob in the samples. Do I need to have a JobExecutionDecider, like the SkipCheckingDecider? If I use that pattern, will COMPLETED WITH SKIPS be the outcome if the skip limit is exceeded?

      Originally posted by Dave Syer:
      There are definitely unit tests asserting that skip listeners get called in the right place, so we need to understand your implementation a bit better.
      Can you point me in the direction of the unit tests you are thinking of? I'll check them out and set up a test case to explain my implementation.

      Originally posted by Dave Syer:
      Are we sure that your Tasklet is being converted to a listener correctly?
      Yup, it works perfectly when the skip limit is not being exceeded.

      Originally posted by Dave Syer:
      What makes you so sure that postProcess method on FaultTolerantChunkProvider is not being called (since it is in the unit tests)?
      When I put a breakpoint on the postProcess method when the skip-limit is exceeded, it doesn't get hit.

      In the meantime, I've upgraded to 2.0.4 and I'll have a look at the JobExecutionDecider pattern to see if that will work.

      Hopefully, I can get a unit test working that I can send through to demonstrate the use case.

      By the way, I was storing the skips on the execution context so I can get a hold of them through the jobExecution.



      • #4
        The unit tests I was thinking of are the FaultTolerantStepFactory*Tests in spring-batch-core. Since you mention it, I'm not sure that there is a test for the skip limit being exceeded *and* the skip listener being called, so maybe there is a bug.

        It isn't the decider in the sample that's important, it's having a StepExecutionListener that sets the exit status to the custom value "COMPLETED WITH SKIPS".
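
The rule such a listener applies can be sketched in plain Java. This is a minimal illustration, not the sample's code: `SkipExitCode` and `resolve` are hypothetical names, and in a real step the logic would presumably sit in `StepExecutionListener.afterStep`, returning an `ExitStatus` built from the resulting string. The sketch also assumes that a step which has already failed keeps its FAILED code, so exceeding the skip limit would not produce COMPLETED WITH SKIPS.

```java
/** Plain-Java sketch of the exit-code rule; class and method names are hypothetical. */
final class SkipExitCode {

    static final String COMPLETED_WITH_SKIPS = "COMPLETED WITH SKIPS";

    /**
     * Returns the exit code a step should report, given the code it
     * finished with and the number of items it skipped.
     */
    static String resolve(String exitCode, int skipCount) {
        // Assumption: a failed step stays failed, even if it skipped items first.
        if ("FAILED".equals(exitCode)) {
            return exitCode;
        }
        // Any other outcome with at least one skip gets the custom code.
        return skipCount > 0 ? COMPLETED_WITH_SKIPS : exitCode;
    }
}
```

Under these assumptions, the `on="COMPLETED WITH SKIPS"` transition in the job configuration would only be taken by a step that finished normally after skipping at least one item.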

        Nothing wrong with storing skips in the job execution, but I would hesitate unless it was really necessary because it makes an assumption that the items are serializable. If you can deal with them all in the same step that they were skipped in it would be cleaner.



        • #5
          Found a test that looks spot on: FaultTolerantStepFactoryBeanTests.testSkipOverLimitOnRead

          I've attached a patch that adds a listener and the test passes. This is a pain, because it's not the behaviour I'm seeing in my app.

          The only lead I've got at the moment is that it looks like the unit test is doing a retry which leads to postProcess and the listeners being called. This doesn't appear to be happening in my app.

          So I guess it's back to the drawing board in getting a test case up and going.

          Any other ideas?

          Also, I changed the LogSkipsTask to implement SkipListener and got the same behaviour as the annotation configuration.

          Originally posted by Dave Syer:
          Nothing wrong with storing skips in the job execution, but I would hesitate unless it was really necessary because it makes an assumption that the items are serializable. If you can deal with them all in the same step that they were skipped in it would be cleaner.
          Small thing, but it might be worthwhile changing the signature of ExecutionContext.put(String, Object) to put(String, Serializable) to make the Serializable contract more explicit.



          • #6
            Originally posted by lukematthews:
            Found a test that looks spot on: FaultTolerantStepFactoryBeanTests.testSkipOverLimitOnRead
            I added another one with a listener. I think I understand the confusion: the listener is intentionally only called on items in a chunk that commits. If the step fails then a restart would re-present those items so they haven't technically been skipped on the first failure. The chunk that busts the skip limit does not commit, so you don't expect a skip listener callback (for any of those items). If you need to know about read errors (not read skips) then use an ItemReadListener. The idea is that it is useful to know about skips after the fact when the step has succeeded.
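
The commit rule described here can be illustrated with a toy model in plain Java. It is a simplification written for this thread, not Spring Batch's implementation: `ChunkSkipModel`, its `Result` type, and the convention that a line starting with `BAD` stands for a FlatFileParseException are all assumptions. Read errors are recorded as they happen, while skip callbacks are held back until the chunk that contains them commits.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of chunk-oriented reading with a skip limit; not Spring Batch code. */
final class ChunkSkipModel {

    /** Callbacks recorded during one simulated step run. */
    static final class Result {
        final List<String> readErrors = new ArrayList<>();    // reported immediately
        final List<String> skipCallbacks = new ArrayList<>(); // reported only on commit
        boolean failed;
    }

    /**
     * Reads lines in chunks of commitInterval successfully read items.
     * A line starting with "BAD" stands in for a parse failure.
     */
    static Result run(List<String> lines, int commitInterval, int skipLimit) {
        Result result = new Result();
        List<String> pendingSkips = new ArrayList<>(); // skips in the open chunk
        int skipCount = 0;
        int itemsInChunk = 0;

        for (String line : lines) {
            if (line.startsWith("BAD")) {
                result.readErrors.add(line);           // like a read-error callback
                if (++skipCount > skipLimit) {
                    result.failed = true;              // chunk rolls back, pending skips are dropped
                    return result;
                }
                pendingSkips.add(line);
                continue;
            }
            if (++itemsInChunk == commitInterval) {
                result.skipCallbacks.addAll(pendingSkips); // commit: skips become final
                pendingSkips.clear();
                itemsInChunk = 0;
            }
        }
        result.skipCallbacks.addAll(pendingSkips);     // last partial chunk commits
        return result;
    }
}
```

With a commit interval of 10000 and a skip limit of 10, as in the original configuration, a file with 11 bad lines never reaches a commit in this model, so `skipCallbacks` stays empty while `readErrors` holds all 11. With a commit interval of 2 and a skip limit of 2, the input `1, BAD2, BAD3, 4, BAD5` yields skip callbacks for `BAD2` and `BAD3` only.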

            Originally posted by lukematthews:
            Small thing, but it might be worthwhile changing the signature of ExecutionContext.put(String, Object) to put(String, Serializable) to make the Serializable contract more explicit.
            Can't really do that because serialization is in the contract of the ExecutionContextDao implementation, not the ExecutionContext itself. Actually, I don't think even the default JDBC implementation requires java.io.Serializable (I didn't use a capital "S", did I?): it requires that the objects are serializable by XStream.



            • #7
              OK, I'm thoroughly confused now.

              So the idea is, if the file exceeds the skip limit and the chunk does not commit, I won't get notified of the skips, only of the skip limit being exceeded?

              I still don't get why the unit test notifies the listener and my code doesn't.

              Also, I added some assertions to the unit test. What is causing the code to continue reading after the skip limit exceeded exception is thrown?

              I would have expected the following to be true:
              assertEquals(0, stepExecution.getReadCount());
              but instead it's 3 - which means the other items have continued to be read.

              Getting back to basics, my requirement is this:
              - User loads a file; if the file does not conform to the format, the user will be notified of the conformance errors.
              - The process will record up to a specified number of conformance errors. If there are more errors than that, the loading process will stop.

              Am I on the right track with my code? I'll be building on this over the coming weeks, so I need to get the design right.



              • #8
                I'd say you are definitely on track. Your user is actually probably only interested in read errors (things he needs to fix for a successful restart), not skips. The skips would tell him that there were some violations, but they were ignored.



                • #9
                  Originally posted by lukematthews:
                  So the idea is, if the file exceeds the skip limit and the chunk does not commit, I won't get notified of the skips, only of the skip limit being exceeded?
                  Not quite. You get a callback for all the skipped items that committed, but not the ones from the final failed chunk.

                  Originally posted by lukematthews:
                  I still don't get why the unit test notifies the listener and my code doesn't.
                  The chunk size is 2 in the test, so your listener gets a callback for items "2" and "3" when the chunk with "1" and "4" in it commits (after "4" has been removed because of a write skip). It doesn't get a callback for the read error on "5" which trips the skip limit in the next chunk.

                  Originally posted by lukematthews:
                  Also, I added some assertions to the unit test. What is causing the code to continue reading after the skip limit exceeded exception is thrown?

                  I would have expected the following to be true:
                  assertEquals(0, stepExecution.getReadCount());
                  but instead it's 3 - which means the other items have continued to be read.
                  For me the read count is 2, which makes sense ("1" and "4" were successfully read). So it doesn't continue reading after the skip limit is broken.



                  • #10
                    I've gone down the @OnReadError path and the good news is everything is now working.

                    So it ends up looking like this:
                    - For every exception thrown on read (including skippable exceptions), the @OnReadError listener logs it
                    - If the skip limit is exceeded, the job fails with a skip limit exceeded exception.

                    And the code changes are:
                    - I've removed logSkipsTask and the COMPLETED WITH SKIPS decision handling completely (no longer required)
                    - There is no longer a skipListener
                    - The logSkipsTask has effectively been replaced by a job listener that does the following:
                      - @BeforeJob puts my list of skipped/read error exceptions onto the execution context
                      - @OnReadError adds the exceptions to the list
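
The listener described in the list above might look roughly like the following plain-Java stand-in. It is only a sketch: `ReadErrorCollector`, `CONTEXT_KEY`, and the use of a `Map` in place of the job's ExecutionContext are assumptions, and the two methods merely play the roles of the `@BeforeJob` and `@OnReadError` callbacks. Storing message strings rather than the exceptions themselves is a design choice of the sketch, made so that nothing placed in the context depends on the exception type being serializable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Plain-Java stand-in for the listener described above; names are hypothetical. */
final class ReadErrorCollector {

    static final String CONTEXT_KEY = "readErrors";

    private final List<String> messages = new ArrayList<>();

    /** Plays the role of the @BeforeJob method: publish the (still empty) list. */
    void beforeJob(Map<String, Object> executionContext) {
        executionContext.put(CONTEXT_KEY, messages);
    }

    /** Plays the role of the @OnReadError method: record one read failure. */
    void onReadError(Exception e) {
        // Keep only text, so the stored list contains plain strings.
        messages.add(e.getClass().getSimpleName() + ": " + e.getMessage());
    }
}
```

Because the same list instance is published before the job starts, anything holding the context can read the collected messages after the job has failed.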

                    I think I've got enough of an understanding on how it hangs together to get through.

                    Now that I've got it working, I've got an easy one for you: now that the job is failing, the only way I can get to the SkipLimitExceededException is through the stepExecution:

                    Code:
                    for (StepExecution stepExecution : jobExecution.getStepExecutions())
                    {
                        messages.add(stepExecution.getExitStatus().getExitDescription());
                    }

                    None of the failureExceptions arrays on the job or step execution are populated. Is there any reason for that?

                    I really appreciate you taking the time to help me with this Dave.
                    Last edited by lukematthews; Nov 4th, 2009, 05:28 PM.
