Processor missed call in a retry

  • Processor missed call in a retry

    Hello,

    We are facing a behaviour that we cannot understand. The code below is the parent step of our test step. We've created 3 kinds of exceptions:
    • Fatal exceptions: if a reader, processor, or writer throws one of these exceptions, the batch fails immediately.
    • Ignorable exceptions: the batch can retry processing or writing, and the skip policy returns true for this kind of exception.
    • Non-ignorable exceptions: the batch can retry, but the skip policy doesn't skip these exceptions.

    Code:
    <bean id="abstractGenericStep" class="org.springframework.batch.core.step.item.FaultTolerantStepFactoryBean" abstract="true">
        <property name="transactionManager" ref="transactionManager" />
        <property name="jobRepository" ref="jobRepository" />
        <property name="backOffPolicy" ref="timeIntervalPolicy" />
        <property name="retryPolicy">
            <bean id="defaultRetryPolicy" class="org.springframework.batch.retry.policy.CompositeRetryPolicy">
                <property name="policies">
                    <set>
                        <ref bean="timeoutRetryPolicy" />
                        <ref bean="attemptsCounterRetryPolicy" />
                        <bean id="exceptionsFilterRetryPolicy" class="com.bsb.sf.batch.service.exception.ExceptionsFilterRetryPolicy">
                            <property name="batchExceptionManager" ref="batchExceptionManager" />
                        </bean>
                    </set>
                </property>
            </bean>
        </property>
        <property name="skipPolicy" ref="defaultSkipPolicy" />
        <property name="allowStartIfComplete" value="false" />
    </bean>
    The "timeoutRetryPolicy" and "attemptsCounterRetryPolicy" beans are provided by Spring Batch (org.springframework.batch.retry.policy.TimeoutRetryPolicy and org.springframework.batch.retry.policy.SimpleRetryPolicy), while "exceptionsFilterRetryPolicy" is our own retry policy. It returns false for fatal exceptions and true otherwise. The default skip policy skips ignorable exceptions and doesn't skip non-ignorable exceptions.
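
A minimal Java sketch of the two decisions described above, for readers following along. The exception classes and the `ExceptionCategories` helper are hypothetical stand-ins, not the classes from the original project or from Spring Batch; the real policies would implement the framework's retry and skip policy interfaces instead of plain static methods.

```java
// Hypothetical stand-ins for the three exception categories in the post.
class FatalBatchException extends RuntimeException {
    FatalBatchException(String message) { super(message); }
}

class IgnorableBatchException extends RuntimeException {
    IgnorableBatchException(String message) { super(message); }
}

class NonIgnorableBatchException extends RuntimeException {
    NonIgnorableBatchException(String message) { super(message); }
}

final class ExceptionCategories {

    private ExceptionCategories() { }

    // Models the custom retry policy: false for fatal exceptions, true otherwise.
    static boolean canRetry(Throwable t) {
        return !(t instanceof FatalBatchException);
    }

    // Models the default skip policy: only ignorable exceptions are skipped.
    static boolean shouldSkip(Throwable t) {
        return t instanceof IgnorableBatchException;
    }

    public static void main(String[] args) {
        Throwable[] samples = {
            new FatalBatchException("fatal"),
            new IgnorableBatchException("ignorable"),
            new NonIgnorableBatchException("non-ignorable")
        };
        for (Throwable t : samples) {
            System.out.println(t.getClass().getSimpleName()
                    + ": canRetry=" + canRetry(t)
                    + ", shouldSkip=" + shouldSkip(t));
        }
    }
}
```

Under this model a non-ignorable exception is the only category that can be retried but never skipped, which is the case where retries can be exhausted without a skip being possible.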


    We have a commit-interval of 5. The first five items are written correctly. We throw an exception when the batch writes the 6th and 7th items. That exception is a fatal exception, so the batch must stop. Unfortunately, the processor is called again to process the 6th item. An ExhaustedRetryException is thrown after that processing and the batch is stopped.


    Is our configuration correct?
    Last edited by sebge2; Mar 8th, 2010, 11:47 AM.

  • #2
    More precisely, here is the stack trace of the ExhaustedRetryException:
    Code:
    2010-03-08 17:24:37 [main] AbstractStep [ERROR] Encountered an error executing the step
    org.springframework.batch.retry.ExhaustedRetryException: Retry exhausted after last attempt in recovery path, but exception is not skippable.; nested exception is com.bsb.sf.batch.exception.DatabaseReaderTableNotExist: Exception planned to be launch at item 7
    	at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor$5.recover(FaultTolerantChunkProcessor.java:376)
    	at org.springframework.batch.retry.support.RetryTemplate.handleRetryExhausted(RetryTemplate.java:414)
    	at org.springframework.batch.retry.support.RetryTemplate.doExecute(RetryTemplate.java:283)
    	at org.springframework.batch.retry.support.RetryTemplate.execute(RetryTemplate.java:187)
    	at org.springframework.batch.core.step.item.BatchRetryTemplate.execute(BatchRetryTemplate.java:213)
    	at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor.write(FaultTolerantChunkProcessor.java:389)
    	at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:178)
    	at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:74)
    	at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:347)
    	at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:128)
    	at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:261)
    	at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:76)
    	at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:367)
    	at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:214)
    	at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:143)
    	at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:247)
    	at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:196)
    	at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:115)
    	at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:61)
    	at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:60)
    	at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:144)
    	at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:124)
    	at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:99)
    	at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:281)
    	at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:120)
    	at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:49)
    	at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:114)
    	at com.bsb.sf.batch.service.DefaultJobLauncher.run(DefaultJobLauncher.java:70)
    	at com.bsb.sf.batch.service.impl.JobEngineManagerImpl.startJob(JobEngineManagerImpl.java:64)
    	at com.bsb.sf.batch.service.staging.support.AbstractStagingTest.startErrorJob(AbstractStagingTest.java:64)
    	at com.bsb.sf.batch.service.staging.StagedItemCompositeWriterTest.testErrorFlagging(StagedItemCompositeWriterTest.java:117)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    	at java.lang.reflect.Method.invoke(Method.java:585)
    	at org.springframework.test.context.junit4.SpringTestMethod.invoke(SpringTestMethod.java:160)
    	at org.springframework.test.context.junit4.SpringMethodRoadie.runTestMethod(SpringMethodRoadie.java:233)
    	at org.springframework.test.context.junit4.SpringMethodRoadie$RunBeforesThenTestThenAfters.run(SpringMethodRoadie.java:333)
    	at org.springframework.test.context.junit4.SpringMethodRoadie.runWithRepetitions(SpringMethodRoadie.java:217)
    	at org.springframework.test.context.junit4.SpringMethodRoadie.runTest(SpringMethodRoadie.java:197)
    	at org.springframework.test.context.junit4.SpringMethodRoadie.run(SpringMethodRoadie.java:143)
    	at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.invokeTestMethod(SpringJUnit4ClassRunner.java:160)
    	at org.junit.internal.runners.JUnit4ClassRunner.runMethods(JUnit4ClassRunner.java:51)
    	at org.junit.internal.runners.JUnit4ClassRunner$1.run(JUnit4ClassRunner.java:44)
    	at org.junit.internal.runners.ClassRoadie.runUnprotected(ClassRoadie.java:27)
    	at org.junit.internal.runners.ClassRoadie.runProtected(ClassRoadie.java:37)
    	at org.junit.internal.runners.JUnit4ClassRunner.run(JUnit4ClassRunner.java:42)
    	at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:97)
    	at org.junit.runner.JUnitCore.run(JUnitCore.java:130)
    	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:94)
    	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:165)
    	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:60)
    Caused by: com.bsb.sf.batch.exception.DatabaseReaderTableNotExist: Exception planned to be launch at item 7
    	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
    	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
    	at java.lang.reflect.Constructor.newInstance(Constructor.java:494)
    	at com.bsb.sf.incubator.batch.simple.ExceptionItemWriterCounter.write(ExceptionItemWriterCounter.java:59)
    	at com.bsb.sf.batch.service.staging.StagedItemCompositeWriter.write(StagedItemCompositeWriter.java:54)
    	at org.springframework.batch.core.step.item.SimpleChunkProcessor.writeItems(SimpleChunkProcessor.java:156)
    	at org.springframework.batch.core.step.item.SimpleChunkProcessor.doWrite(SimpleChunkProcessor.java:137)
    	at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor$3.doWithRetry(FaultTolerantChunkProcessor.java:302)
    	at org.springframework.batch.retry.support.RetryTemplate.doExecute(RetryTemplate.java:238)
    	... 49 more
    We've tried to debug to see what happens when DatabaseReaderTableNotExist is thrown.
    We've seen that our retry policy seems to decide correctly that the exception should not be skipped. The exception is re-thrown and then arrives in RepeatTemplate.executeInternal(), where it is forwarded to an instance of ExceptionHandler. I assume that handler is supposed to decide whether the exception is fatal; it simply decides that it is not fatal, and the while() loop in RepeatTemplate.executeInternal() continues. Am I wrong, or isn't that the place where it should be decided that the step must stop? (Maybe we forgot some configuration.)
    Last edited by Nicolas Vanhoren; Mar 8th, 2010, 11:53 AM.



    • #3
      Where is the fatal exception thrown? If it's in an ItemWriter then there is a scan through the chunk looking for the failed item so we expect the items to be processed again one-by-one. If it generates that exception again, then the step should stop.



      • #4
        Yes, it's thrown by the writer.

        "If it generates that exception again, then the step should stop." But if we have set an attempt counter greater than 1 (the counter starts when it starts to process one-by-one), it must try 2 more times, right?

        But why is the processor called to process the first entity of the failed commit-interval if the retry policy returns false (canRetry = false)?
        Last edited by sebge2; Mar 9th, 2010, 03:07 AM.



        • #5
          Also, only the processor is called a second time, not the writer, and it is called with only one of the two items that failed in the writer. Of course, that processor never throws any error (in our case, it just does nothing).
          So, after our writer throws an exception, that exception (DatabaseReaderTableNotExist) is stored, the processor is called once, then some class decides to retrieve the previous exception, wrap it in an ExhaustedRetryException and throw it. That exception is caught in RepeatTemplate, which decides it's a fatal exception and re-throws it.



          • #6
            That sounds like the expected behaviour. The Step is not aware when it comes to the ItemProcessor that your item is going to be processed as a failure in the ItemWriter. If your ItemProcessor is transactional or otherwise idempotent you shouldn't care if it is called multiple times. You can prevent it from being called again for all items by marking the step as processor-transactional="false" (not the default).



            • #7
              Ok, I can understand the processor should support being called multiple times. But why isn't the writer called a second time too?



              • #8
                Originally posted by Dave Syer View Post
                That sounds like the expected behaviour. The Step is not aware when it comes to the ItemProcessor that your item is going to be processed as a failure in the ItemWriter. If your ItemProcessor is transactional or otherwise idempotent you shouldn't care if it is called multiple times. You can prevent it from being called again for all items by marking the step as processor-transactional="false" (not the default).
                We're OK with the reprocessing. What we're not OK with is the writer not being called again. Here is what we expect.

                Chunk With 5 items:
                * 5 read, 5 processed, exception in the writer -> rollback
                * 1 read, 1 processed, 1 written -> commit
                * 1 read, 1 processed, 1 written -> commit
                * 1 read, 1 processed, 1 written -> commit
                * 1 read, 1 processed, exception -> rollback
                * 1 read, 1 processed, 1 written -> commit
                * Job FAILED

                If this is not the actual behavior, can you explain why please?



                • #9
                  Let's be more specific about your example. 5 items each identifiable by a letter. "d" is going to deterministically cause the writer to fail.

                  Here's what happens with retry-limit=0, skip-limit>0, commit-interval=5 (the reads are cached unless reader-transactional="true"):

                  * [a, b, c, d, e] read; [a, b, c, d, e] processed; [a, b, c, d, e] passed to writer, exception!, rollback
                  * [a] processed, [a] written
                  * [b] processed, [b] written
                  * [c] processed, [c] written
                  * [d] processed, [d] passed to writer, exception!, rollback
                  * [d] skipped (not passed to processor or writer)
                  * [e] processed, [e] written

                  "d" has to be passed to the writer twice, otherwise there is no way to know that it caused an error.

                  "d" is passed to the processor twice by default (can be changed with processor-transactional="false").
                  Last edited by Dave Syer; Mar 10th, 2010, 09:35 AM. Reason: Corrected error (d is not processed on the skip tx)
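
The sequence above can be replayed with a small stand-alone Java model. It is only a toy trace generator, assuming retry-limit=0, skip-limit>0 and a writer that fails deterministically on one item; `ChunkScanSketch` and its `run` method are invented for illustration and do not use any Spring Batch classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ChunkScanSketch {

    private ChunkScanSketch() { }

    // Returns the ordered events for one chunk in which `poison` always fails in the writer.
    static List<String> run(List<String> chunk, String poison) {
        List<String> trace = new ArrayList<>();

        // First attempt: the whole chunk is processed and written in one transaction.
        trace.add("process " + chunk);
        if (!chunk.contains(poison)) {
            trace.add("write " + chunk);
            return trace;
        }
        trace.add("write " + chunk + " -> exception, rollback");

        // Scan: items are replayed one by one to find the one that fails.
        for (String item : chunk) {
            trace.add("process [" + item + "]");
            if (item.equals(poison)) {
                // The failing item reaches the writer a second time, then is skipped
                // without being passed to the processor or writer again.
                trace.add("write [" + item + "] -> exception, rollback");
                trace.add("skip [" + item + "]");
            } else {
                trace.add("write [" + item + "]");
            }
        }
        return trace;
    }

    public static void main(String[] args) {
        for (String event : run(Arrays.asList("a", "b", "c", "d", "e"), "d")) {
            System.out.println(event);
        }
    }
}
```

In this model every item is processed twice (once with the full chunk, once alone), which corresponds to the default described in the post; it does not model processor-transactional="false".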



                  • #10
                    About recovering from an exception thrown by a writer: we have just run new, more precise tests, which show the following behaviour:

                    Chunk: a, b
                    - process of a, b
                    - write of a, b => exception
                    - process of a
                    - write of a, b => exception
                    - process of a
                    - write of a => exception
                    - process of b
                    - write of b

                    If an exception is thrown by a writer, what's the normal behaviour (with the default values for reader-transactional and processor-transactional)? Must it process and write each item of the commit-interval one-by-one?

                    Code:
                    ExceptionTestProcessor [TRACE] successful processing [account=880b2609-974d-43d5-8f44-2dbd9627800f-123123-0(#10, number = 0)]
                    ExceptionTestProcessor [TRACE] successful processing [account=880b2609-974d-43d5-8f44-2dbd9627800f-123123-1(#11, number = 1)]
                    ExceptionTestWriter [TRACE] Item 1/2: account=880b2609-974d-43d5-8f44-2dbd9627800f-123123-0(#10, number = 0)
                    ExceptionTestWriter [TRACE] Item 2/2: account=880b2609-974d-43d5-8f44-2dbd9627800f-123123-1(#11, number = 1)
                    ExceptionTestWriter [TRACE] error writing [account=880b2609-974d-43d5-8f44-2dbd9627800f-123123-0(#10, number = 0)]
                    
                    ExceptionTestProcessor [TRACE] successful processing [account=880b2609-974d-43d5-8f44-2dbd9627800f-123123-0(#10, number = 0)]
                    ExceptionTestWriter [TRACE] Item 1/2: account=880b2609-974d-43d5-8f44-2dbd9627800f-123123-0(#10, number = 0)
                    ExceptionTestWriter [TRACE] Item 2/2: account=880b2609-974d-43d5-8f44-2dbd9627800f-123123-1(#11, number = 1)
                    ExceptionTestWriter [TRACE] error writing [account=880b2609-974d-43d5-8f44-2dbd9627800f-123123-0(#10, number = 0)]
                    
                    ExceptionTestProcessor [TRACE] successful processing [account=880b2609-974d-43d5-8f44-2dbd9627800f-123123-0(#10, number = 0)]
                    ExceptionTestWriter [TRACE] Item 1/2: account=880b2609-974d-43d5-8f44-2dbd9627800f-123123-0(#10, number = 0)
                    ExceptionTestWriter [TRACE] Item 2/2: account=880b2609-974d-43d5-8f44-2dbd9627800f-123123-1(#11, number = 1)
                    ExceptionTestWriter [TRACE] error writing [account=880b2609-974d-43d5-8f44-2dbd9627800f-123123-0(#10, number = 0)]
                    
                    ExceptionTestProcessor [TRACE] successful processing [account=880b2609-974d-43d5-8f44-2dbd9627800f-123123-0(#10, number = 0)]
                    ExceptionTestWriter [TRACE] Item 1/1: account=880b2609-974d-43d5-8f44-2dbd9627800f-123123-0(#10, number = 0)
                    ExceptionTestWriter [TRACE] error writing [account=880b2609-974d-43d5-8f44-2dbd9627800f-123123-0(#10, number = 0)]
                    
                    ExceptionTestProcessor [TRACE] successful processing [account=880b2609-974d-43d5-8f44-2dbd9627800f-123123-1(#11, number = 1)]
                    ExceptionTestWriter [TRACE] Item 1/1: account=880b2609-974d-43d5-8f44-2dbd9627800f-123123-1(#11, number = 1)
                    ExceptionTestProcessor [TRACE] successful processing [account=880b2609-974d-43d5-8f44-2dbd9627800f-123123-1(#11, number = 1)]
                    ExceptionTestWriter [TRACE] Item 1/1: account=880b2609-974d-43d5-8f44-2dbd9627800f-123123-1(#11, number = 1)
                    Last edited by sebge2; Mar 15th, 2010, 11:45 AM.



                    • #11
                      I can't follow the log fragment without more detail on the implementation. Can you set up a failing test case and post that?



                      • #12
                        The test case is: commit-interval of 2, max-retries = 1, exception thrown every time the item a is written.

                        This is the behaviour we have seen:
                        Code:
                        Chunk: a, b
                        - process of a, b
                        - write of a, b => ignorable exception
                        - process of a
                        - write of a, b => ignorable exception
                        - process of a
                        - write of a => ignorable exception
                        - process of b
                        - write of b
                        We're expecting:
                        Code:
                        Chunk: a, b
                        - process of a, b
                        - write of a, b => exception
                        - process of a
                        - write of a => exception
                        - process of a
                        - write of a => exception
                        - process of b
                        - write of b
                        It seems that once it has tried one time to write a, b (max-retries = 1), and the exception is ignorable (it is skipped), it begins to process and write one-by-one. But if the exception cannot be skipped (non-ignorable), it never processes and writes one-by-one, and we get the following result:
                        Code:
                        Chunk: a, b
                        - process of a, b
                        - write of a, b => ignorable exception
                        - process of a => can be retried + cannot be skipped
                        We are using templates; I hope the following code is enough to understand the config file. The exception is thrown by the first partition of step1.partition (the second partition is executed successfully).

                        Code:
                        <job id="templateJob" parent="abstractGenericJob"
                             xmlns="http://www.springframework.org/schema/batch">
                            <step id="preProcessing" next="staging" parent="templateJob.internal.preProcessing"/>
                            <step id="staging" next="step1.partition" parent="templateJob.internal.staging"/>
                            <step id="step1.partition" next="step2.partition" parent="templateJob.internal.partition1"/>
                            <step id="step2.partition" parent="templateJob.internal.partition2">
                                <next on="COMPLETED" to="postProcessing"/>
                                <end on="COMPLETED WITH *"/>
                            </step>
                            <step id="postProcessing" parent="templateJob.internal.postProcessing"/>
                        </job>

                        <!-- Step 2 -->
                        <step id="templateJob.internal.partition2" parent="abstractPartitionerStagedStep"
                              xmlns="http://www.springframework.org/schema/batch">
                            <partition step="step2" partitioner="step2.partitioner">
                                <handler grid-size="${step2.grid.size}" task-executor="step2.taskExecutor"/>
                            </partition>
                        </step>

                        <step id="step2" parent="abstractStagedPartitionStep"
                              xmlns="http://www.springframework.org/schema/batch">
                            <!-- outside a partition must use abstractStagedStep -->
                            <tasklet>
                                <chunk reader="stagingItemReader" processor="step2.internal.itemProcessor"
                                       writer="step2.internal.itemWriter" commit-interval="${step2.commit.interval}"/>
                            </tasklet>
                        </step>
                        Last edited by sebge2; Mar 18th, 2010, 07:49 AM.



                        • #13
                          Moreover, when I've looked at the log, I've seen that our SkipPolicy receives a negative skipCount for that test (i < 0):

                          Code:
                              public boolean shouldSkip(final Throwable throwable, final int i) throws SkipLimitExceededException {
                                  try {
                                      final boolean isSkipped = delegateSkipPolicy.shouldSkip(throwable, i);
                                      if (!isSkipped) {
                                          logger.debug("Not skip " + throwable + " with number of skipped  [" + i + "]");
                                          callErrorListeners(throwable);
                                      } else {
                                          logger.debug("Skip " + throwable + " with number of skipped [" + i + "]");
                                      }
                          
                                      return isSkipped;
                                  }
                                  catch (SkipLimitExceededException e) {
                                      callErrorListeners(throwable);
                                      throw e;
                                  }
                              }
                          The log:
                          Code:
                          2010-03-18 13:41:06 [lazyBindingProxy.step1.taskExecutor#sysinit-2] DefaultSkipPolicy [DEBUG] It's ignorable, so SKIP
                          2010-03-18 13:41:06 [lazyBindingProxy.step1.taskExecutor#sysinit-2] StagingSkipPolicy [DEBUG] Skip com.bsb.sf.batch.test.exception.MyIgnorableException: Throwing again an exception for [account=11fb0177-526d-4932-8245-d0f6c6a37061-123123-0(#10, number = 0)] (retryCount=1) with number of skipped [-1]



                          • #14
                            That's expected, and if it is the cause of all the confusion I'm sorry (we can make the documentation of SkipPolicy more explicit). A call with skipCount=-1 means "is this a skippable exception, never mind how many skips?" Your custom skip policy might need to be aware of that, I suppose.
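
A minimal sketch of what "being aware of that" could look like, assuming a policy that also enforces a skip limit. The `SkipDecision` interface only mirrors the shouldSkip(Throwable, int) shape shown earlier in the thread; it, `SampleIgnorableException` and `LimitAwareSkipDecision` are hypothetical names, and the `IllegalStateException` stands in for the framework's SkipLimitExceededException.

```java
// Hypothetical interface mirroring the shouldSkip(Throwable, int) method from the thread.
interface SkipDecision {
    boolean shouldSkip(Throwable t, int skipCount);
}

// Hypothetical stand-in for an exception type the policy treats as skippable.
class SampleIgnorableException extends RuntimeException {
    SampleIgnorableException(String message) { super(message); }
}

final class LimitAwareSkipDecision implements SkipDecision {

    private final int skipLimit;

    LimitAwareSkipDecision(int skipLimit) {
        this.skipLimit = skipLimit;
    }

    @Override
    public boolean shouldSkip(Throwable t, int skipCount) {
        boolean skippableType = t instanceof SampleIgnorableException;

        // A negative count only asks "is this exception type skippable?",
        // so the limit check and any per-skip side effects are bypassed.
        if (skipCount < 0) {
            return skippableType;
        }

        // A real count: enforce the limit before agreeing to skip.
        if (skippableType && skipCount >= skipLimit) {
            throw new IllegalStateException("Skip limit of " + skipLimit + " exceeded");
        }
        return skippableType;
    }

    public static void main(String[] args) {
        SkipDecision policy = new LimitAwareSkipDecision(2);
        System.out.println(policy.shouldSkip(new SampleIgnorableException("x"), -1));
        System.out.println(policy.shouldSkip(new RuntimeException("y"), -1));
    }
}
```

The same guard could be used in a wrapping policy like the one posted above, for example to avoid calling error listeners when the count is negative; whether that is desirable depends on what the listeners are meant to record.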
