Announcement Announcement Module
Collapse
No announcement yet.
How to reset the inputProvider enabling the job to process x-times. Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • How to reset the inputProvider enabling the job to process x-times.

    I use the configurations below, - and want the job to process the resultset defined in the inputSource the every time the job is kicked off.

    I debugged into the ItemProviderProcessTasklet and see that the itemProvider the 2nd. time the job is kicked off, returns null:
    Code:
    			Object data = itemProvider.next();
    			if (data == null) {
    				return ExitStatus.FINISHED;
    			}
    which just exists processing....

    Using this configuration
    Code:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    	xmlns:aop="http://www.springframework.org/schema/aop"
    	xmlns:tx="http://www.springframework.org/schema/tx"
    	xmlns:p="http://www.springframework.org/schema/p"
    	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="
    		http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
    		http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.0.xsd
    		http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.0.xsd">
    
        <description>This file defines the job for the ticker process.</description>
        <import resource="classpath:/springAccountBalancerCore-configurer.xml"/>
        <import resource="classpath:/springAccountBalancerCore-dao.xml"/>
        <import resource="classpath:/springAccountBalancerCore-biz.xml"/>
    
        <bean id="simpleTickerJob" class="org.springframework.batch.core.configuration.JobConfiguration" abstract="true">
            <property name="restartable" value="false" />
            <!--<property name="startLimit" value="1000"/>-->
        </bean>
    
        <bean parent="stepScope" />
    	<bean parent="jobConfigurationRegistryBeanPostProcessor" />
    
        <bean id="tickerJob" parent="simpleTickerJob">
    		<property name="steps">
    			<bean id="tickerStep1"
    				class="org.springframework.batch.execution.step.RepeatOperationsStepConfiguration">
                    <property name="tasklet">
    					<bean
    						class="org.springframework.batch.execution.tasklet.RestartableItemProviderTasklet">
                            <property name="itemProvider">
    							<bean class="org.springframework.batch.item.provider.InputSourceItemProvider">
    								<property name="inputSource" ref="hibernateTickerJobInputSource"/>
                                </bean>
    						</property>
    						<property name="itemProcessor">
    							<bean class="no.aftenposten.accountbalancer.batch.processor.SubscriptionLineTickerProcessor">
    								<property name="outputSource" ref="hibernateTickerJobOutputSource"/>
    								<property name="deliveryCalendarManager" ref="deliveryCalendarManager"/>
    								<property name="productLineDao" ref="productLineDao"/>
    								<property name="subscriptionDao" ref="subscriptionDao"/>
                                    <property name="accountingManager" ref="accountingManager" />
                                </bean>
    						</property>
    					</bean>
    				</property>
                    <property name="allowStartIfComplete" value="true"/>
                    <property name="chunkOperations">
    					<bean class="org.springframework.batch.repeat.support.RepeatTemplate">
    						<property name="interceptors" ref="hibernateTickerJobOutputSource"/>
    						<property name="completionPolicy">
    							<bean
    								class="org.springframework.batch.repeat.policy.SimpleCompletionPolicy">
    								<property name="chunkSize" value="1" />
    							</bean>
    						</property>
    					</bean>
    				</property>
    				<property name="stepOperations">
    					<bean class="org.springframework.batch.repeat.support.RepeatTemplate">
    						<property name="exceptionHandler">
    							<bean
    								class="org.springframework.batch.repeat.exception.handler.SimpleLimitExceptionHandler"
    								p:limit="2" p:useParent="true" p:type="java.lang.Exception" />
    						</property>
    					</bean>
    				</property>
    			</bean>
    		</property>
    	</bean>
    
    	<!-- This is a framework class that needs a delegate and also needs to be registered as a RepeatInterceptor in the chunk -->
    	<bean id="hibernateTickerJobOutputSource"                                                             
    		class="org.springframework.batch.io.support.HibernateAwareItemWriter">
    		<property name="sessionFactory" ref="sessionFactoryCoreAb" />
    		<property name="delegate" ref="subscriptionLineTickerJobWriter" />
    	</bean>
    
    
        <bean id="subscriptionLineTickerJobWriter"
            class="no.aftenposten.accountbalancer.batch.outputsource.AccountingWriter" parent="accountingJdbcDao">
        </bean>
    
            <!--class="no.aftenposten.accountbalancer.batch.outputsource.AccountingWriter">-->
            <!--<property name="dataSource" ref="oracleDataSource" />-->
    
        <bean id="hibernateTickerJobInputSource"  class="org.springframework.batch.io.cursor.JdbcCursorInputSource">
            <property name="dataSource" ref="pulsDataSource"/>
    	    <property name="mapper">
                <bean class="no.aftenposten.accountbalancer.batch.mapper.SubscriptionLineMapper"/>
    	    </property>
            <property name="sql" value="select * from SUBSCRIPTION_LINES where active = 1 and subscriptionLineId = 3"/>
    	</bean>
    
    
    </beans>
    How could I enable the inputsource/itemProvider being reset.

    Rgds.

    Geir

  • #2
    Off hand, I would try two things:

    1) Check if the data in the database is changing - that would explain why the cursor doesn't find any data the second time around. Make sure that the 'active' field is still set to 1 for subscriptionLineId 3 and that the entry where subscriptionLineId = 3 is still there after the job runs.

    2) Make sure that a new cursor is being created and that your process isn't rejoining the same cursor each time.

    This might be the case since allowRestartIfComplete is set to true. If you run the same job with the same job identifier more than once the step executor might believe that you are still running the same job and invoke the JdbcCursorInputSource's restart logic.

    For brevity, I've only included a small snippet of the relevant code in DefaultStepExecutor. Check out this code and the code that follows it for more information:
    Code:
    if (step.getStatus() == BatchStatus.COMPLETED
                 && stepConfiguration.isAllowStartIfComplete() == false) {
    	// step is complete, false should be returned, indicated that the
    	// step should
    	// not be started
    	return false;
    }
    (... should this block also be checking if the job configuration has set "restartable" to true? That's another story.)

    Anyway, try setting your allowStartIfComplete to false for the step, then run again and see if that fixes the problem (you may have to force a different job identifier / parameters [depending on which release you're using] to get it to run after you do this.

    Comment


    • #3
      I guess what you're looking for is "not restarting" the same job instance, but creating a new one each time. You achieve this by using different JobIdentifier(M3) or JobParameters(M4-SNAPSHOT) each time you run the job - e.g. by using current timestamp.

      Comment


      • #4
        This looks like a duplicate posting (http://forum.springframework.org/showthread.php?t=49032). All Doug and Robert's ideas were already mentioned there. Please pick up the thread there for more information.

        Comment


        • #5
          Dave, what about adding a check in the DefaultStepExecutor to check if the job is restartable in addition to the step allowing restart if complete?
          Last edited by dkaminsky; Jan 28th, 2008, 03:53 AM. Reason: cut out part where on further inspection I was wrong

          Comment


          • #6
            Doug,

            I'm not sure that an executor would really need to know. We made a decision to not throw an exception from the Repository if an already completed instance is re-run. Perhaps we need to rethink that and throw an exception.

            Comment


            • #7
              Well, the thing is that the JobDao (the SqlJobDao at least) will call createJob automatically if the job is not restartable, and this will create a job instance with the same identifier (parameters) as the completed instance. This seems to be the source of the complication that the allowStartIfCompleted check lets the step think it's restarting even though it's actually part of a completely new job instance (albeit a new job instance with the same parameters).

              I think that in the name of creating more understandable behavior, an exception being thrown at the Repository/Dao level would be a good idea (e.g. a DuplicateJobInstance or DuplicateJobIdentifier/Parameters exception), that way it never makes it to the StepExecutor in the first place. e.g. I have a job that is non-restartable - this means that if it fails, it needs to be run again without depending on any restart data - in other words, must be re-invoked manually. Since I restart manually anyway, I could provide a new identifier (parameters). If it makes it to the StepExecutor and a step is restarted (suppose the step is marked allowRestartIfComplete because it is shared by several jobs), this could violate business rules for the job I'm running.

              Comment


              • #8
                JIRA issue created: http://jira.springframework.org/browse/BATCH-321

                Comment

                Working...
                X