Why doesn't the job process the input source upon restart?

  • Why doesn't the job process the input source upon restart?

    I am kicking off my job via JMX. It runs OK, but when I issue the command once more, I just get this result back, and no input source is processed:

    2008-01-24 19:54:46,776 [INFO ] [SimpleAsyncTaskExecutor-5] org.springframework.batch.execution.launch.SimpleJobLauncher - Launching: ScheduledJobIdentifier: name=tickerJob,key=key,scheduleDate=Thu Jan 24 19:51:00 CET 2008
    2008-01-24 19:54:46,782 [INFO ] [SimpleAsyncTaskExecutor-5] org.springframework.batch.execution.launch.SimpleJobLauncher - Completed successfully: ScheduledJobIdentifier: name=tickerJob,key=key,scheduleDate=Thu Jan 24 19:51:00 CET 2008

    As I am planning to run the command from Quartz once every 24 hours, it would be nice not to have to restart the JVM.

    Do I have to reset something to have the input source read again?

    Surely there should be no need to start up a new VM every time a job with a RestartableItemProviderTasklet is started?

    Geir
    Last edited by geira; Jan 24th, 2008, 01:24 PM.

  • #2
    If you launch the same Job twice, and it was successful the first time, it will not run again - you don't want to process the same data twice. It sounds like you actually have two different instances of the same Job to run. In that case they either have to have different JobIdentifiers (=JobParameters in m4), or you need to set restartable=false in the Job(Configuration). The restartable flag will probably also change its name in m4, because it is confusing, but we haven't settled on a new name yet.
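
To make the identity rule concrete, here is a minimal sketch in plain Java. It is not the Spring Batch API: `JobId` and `Launcher` are hypothetical stand-ins, and the in-memory set is an assumption standing in for the job repository. It only illustrates why a second launch with the same name, key and scheduleDate completes immediately, while a launch with a new scheduleDate runs again.

```java
import java.time.LocalDateTime;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

final class JobIdentityDemo {

    /** Hypothetical identifier: two launches are "the same job" if all fields match. */
    static final class JobId {
        final String name;
        final String key;
        final LocalDateTime scheduleDate;

        JobId(String name, String key, LocalDateTime scheduleDate) {
            this.name = name;
            this.key = key;
            this.scheduleDate = scheduleDate;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof JobId)) {
                return false;
            }
            JobId other = (JobId) o;
            return name.equals(other.name)
                    && key.equals(other.key)
                    && scheduleDate.equals(other.scheduleDate);
        }

        @Override
        public int hashCode() {
            return Objects.hash(name, key, scheduleDate);
        }
    }

    /** Hypothetical launcher that remembers completed identifiers (assumption: in memory). */
    static final class Launcher {
        private final Set<JobId> completed = new HashSet<>();

        /** Returns true if the work ran, false if the identifier had already completed. */
        boolean launch(JobId id, Runnable work) {
            if (completed.contains(id)) {
                return false; // already done: do not process the same data twice
            }
            work.run();
            completed.add(id);
            return true;
        }
    }

    public static void main(String[] args) {
        Launcher launcher = new Launcher();
        LocalDateTime day1 = LocalDateTime.of(2008, 1, 24, 19, 51);
        Runnable work = () -> System.out.println("  reading input source");

        // Same identifier twice: the second launch is skipped.
        System.out.println(launcher.launch(new JobId("tickerJob", "key", day1), work)); // true
        System.out.println(launcher.launch(new JobId("tickerJob", "key", day1), work)); // false

        // A new schedule date makes a new job instance, so it runs again.
        System.out.println(launcher.launch(new JobId("tickerJob", "key", day1.plusDays(1)), work)); // true
    }
}
```

In this sketch the daily Quartz trigger would correspond to passing a fresh scheduleDate on each launch.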



    • #3
      Our case is that we want the job to process a result set from a query once every 24 hours. Is the correct approach to set restartable=false and kick off the same job name once again, or should we do it another way?

      The job didn't fire again, even after setting restartable=false. The response:

      Hibernate: select events0_.SUBSCRIPTIONID as SUBSCRIP7_2_, events0_.EVENTID as EVENTID2_, events0_.EVENTID as EVENTID59_1_, events0_.CHANGEDBY as CHANGEDBY59_1_, events0_.LASTCHANGED as LASTCHAN3_59_1_, events0_.EVENTCODEID as EVENTCOD6_59_1_, events0_.eventDate as eventDate59_1_, events0_.eventText as eventText59_1_, events0_.SUBSCRIPTIONID as SUBSCRIP7_59_1_, eventcodeb1_.EVENTCODEID as EVENTCOD1_60_0_, eventcodeb1_.CHANGEDBY as CHANGEDBY60_0_, eventcodeb1_.LASTCHANGED as LASTCHAN3_60_0_, eventcodeb1_.description as descript4_60_0_ from EVENTS events0_ left outer join EVENT_CODES eventcodeb1_ on events0_.EVENTCODEID=eventcodeb1_.EVENTCODEID where events0_.SUBSCRIPTIONID=?
      Hibernate: select subscripti0_.SUBSCRIPTIONID as SUBSCRI25_1_, subscripti0_.subscriptionLineId as subscrip1_1_, subscripti0_.subscriptionLineId as subscrip1_67_0_, subscripti0_.CHANGEDBY as CHANGEDBY67_0_, subscripti0_.LASTCHANGED as LASTCHAN3_67_0_, subscripti0_.active as active67_0_, subscripti0_.copies as copies67_0_, subscripti0_.dayDiscount as dayDisco6_67_0_, subscripti0_.dayPrice as dayPrice67_0_, subscripti0_.delivery_1 as delivery8_67_0_, subscripti0_.delivery_2 as delivery9_67_0_, subscripti0_.delivery_3 as delivery10_67_0_, subscripti0_.delivery_4 as delivery11_67_0_, subscripti0_.delivery_5 as delivery12_67_0_, subscripti0_.delivery_6 as delivery13_67_0_, subscripti0_.delivery_7 as delivery14_67_0_, subscripti0_.discount as discount67_0_, subscripti0_.distributionCost_1 as distrib16_67_0_, subscripti0_.distributionCost_2 as distrib17_67_0_, subscripti0_.distributionCost_3 as distrib18_67_0_, subscripti0_.distributionCost_4 as distrib19_67_0_, subscripti0_.distributionCost_5 as distrib20_67_0_, subscripti0_.distributionCost_6 as distrib21_67_0_, subscripti0_.distributionCost_7 as distrib22_67_0_, subscripti0_.gross as gross67_0_, subscripti0_.net as net67_0_, subscripti0_.PRODUCTLINEID as PRODUCT26_67_0_, subscripti0_.SUBSCRIPTIONID as SUBSCRI25_67_0_ from SUBSCRIPTION_LINES subscripti0_ where subscripti0_.SUBSCRIPTIONID=?
      Hibernate: select events0_.SUBSCRIPTIONID as SUBSCRIP7_2_, events0_.EVENTID as EVENTID2_, events0_.EVENTID as EVENTID59_1_, events0_.CHANGEDBY as CHANGEDBY59_1_, events0_.LASTCHANGED as LASTCHAN3_59_1_, events0_.EVENTCODEID as EVENTCOD6_59_1_, events0_.eventDate as eventDate59_1_, events0_.eventText as eventText59_1_, events0_.SUBSCRIPTIONID as SUBSCRIP7_59_1_, eventcodeb1_.EVENTCODEID as EVENTCOD1_60_0_, eventcodeb1_.CHANGEDBY as CHANGEDBY60_0_, eventcodeb1_.LASTCHANGED as LASTCHAN3_60_0_, eventcodeb1_.description as descript4_60_0_ from EVENTS events0_ left outer join EVENT_CODES eventcodeb1_ on events0_.EVENTCODEID=eventcodeb1_.EVENTCODEID where events0_.SUBSCRIPTIONID=?
      2008-01-24 21:56:19,469 [INFO ] [SimpleAsyncTaskExecutor-1] org.mule.extras.client.MuleClient - There is already a manager locally available to this client, no need to create a new one
      2008-01-24 21:56:19,546 [INFO ] [SimpleAsyncTaskExecutor-1] org.mule.providers.vm.VMMessageDispatcher - Connected: VMMessageDispatcher{this=301cdb, endpoint=vm://deliverycalendar.deliveryday}
      2008-01-24 21:56:19,610 [INFO ] [SimpleAsyncTaskExecutor-1] org.mule.providers.tcp.TcpMessageDispatcher - Connected: TcpMessageDispatcher{this=4349fe, endpoint=tcp://muletest.aftenposten.no:60701}
      2008-01-24 21:56:24,083 [INFO ] [SimpleAsyncTaskExecutor-1] org.springframework.batch.execution.launch.SimpleJobLauncher - Completed successfully: ScheduledJobIdentifier: name=tickerJob,key=key,scheduleDate=Thu Jan 24 21:55:34 CET 2008
      2008-01-24 21:56:36,247 [INFO ] [SimpleAsyncTaskExecutor-2] org.springframework.batch.execution.launch.SimpleJobLauncher - Launching: ScheduledJobIdentifier: name=tickerJob,key=key,scheduleDate=Thu Jan 24 21:55:34 CET 2008
      2008-01-24 21:56:36,271 [INFO ] [SimpleAsyncTaskExecutor-2] org.springframework.batch.execution.launch.SimpleJobLauncher - Completed successfully: ScheduledJobIdentifier: name=tickerJob,key=key,scheduleDate=Thu Jan 24 21:55:34 CET 2008

      The job only fetched a result set from the database the first time it was kicked off.

      Any suggestions to which approach we should take?

      Rgds.
      Geir
      Last edited by geira; Jan 24th, 2008, 03:00 PM.



      • #4
        Can you look at the BATCH_STEP_EXECUTION table to ensure the step is actually being executed?

        You can also use the NativeJob from Quartz to launch the job in a separate process (jvm).



        • #5
          What does your ItemReader/InputSource do? Maybe there is no data to process on the second run? Can you post your job configuration? (Please use the [code][/code] brackets to post logs and code.)
          Last edited by Dave Syer; Jan 24th, 2008, 06:10 PM. Reason: formatting



          • #6
            I thought that when the job was started once more, the feed/input source would be read again?

            And also that, if rows matched the search criteria in the SQL query, I would get the result set into the input source for processing once more?

            Geir

            Code:
            <?xml version="1.0" encoding="UTF-8"?>
            <beans xmlns="http://www.springframework.org/schema/beans"
            	xmlns:aop="http://www.springframework.org/schema/aop"
            	xmlns:tx="http://www.springframework.org/schema/tx"
            	xmlns:p="http://www.springframework.org/schema/p"
            	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            	xsi:schemaLocation="
            		http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
            		http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.0.xsd
            		http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.0.xsd">
            
                <description>This job imports payments from the accounting system</description>
                <import resource="classpath:/springAccountBalancerCore-configurer.xml"/>
                <import resource="classpath:/springAccountBalancerCore-dao.xml"/>
            
                <bean id="simpleJob"
                    class="org.springframework.batch.core.configuration.JobConfiguration"
                    abstract="true">
                    <property name="restartable" value="false" />
                </bean>
            
            
                <bean parent="stepScope" />
            	<bean parent="jobConfigurationRegistryBeanPostProcessor" />
            
                <bean id="paymentImportJob" parent="simpleJob">
                    <property name="steps">
                        <bean id="step1"
                            class="org.springframework.batch.execution.step.RepeatOperationsStepConfiguration">
                            <property name="tasklet">
                                <bean class="org.springframework.batch.execution.tasklet.RestartableItemProviderTasklet">
                                    <property name="itemProvider">
                                        <bean class="org.springframework.batch.item.provider.InputSourceItemProvider">
                                            <property name="inputSource" ref="fileInputSource" />
                                        </bean>
                                    </property>
                                    <property name="itemProcessor">
                                        <bean
                                            class="no.aftenposten.accountbalancer.batch.processor.PaymentImportLineProcessor">
                                            <property name="outputSource" ref="invoiceManagerOutputSource" />
                                            <property name="invoicingManager" ref="invoicingManager"/>
                                        </bean>
                                    </property>
                                </bean>
                            </property>
                            <property name="chunkOperations">
                                <bean class="org.springframework.batch.repeat.support.RepeatTemplate">
                                    <property name="interceptors" ref="invoiceManagerOutputSource" />
                                    <property name="completionPolicy">
                                        <bean class="org.springframework.batch.repeat.policy.SimpleCompletionPolicy">
                                            <property name="chunkSize" value="1" />
                                        </bean>
                                    </property>
                                </bean>
                            </property>
                            <property name="stepOperations">
                                <bean class="org.springframework.batch.repeat.support.RepeatTemplate">
                                    <property name="exceptionHandler">
                                        <bean class="org.springframework.batch.repeat.exception.handler.SimpleLimitExceptionHandler"
                                            p:limit="0" p:useParent="true" p:type="java.lang.Exception" />
                                    </property>
                                </bean>
                            </property>
                        </bean>
                    </property>
                </bean>
            
                <!-- This is a framework class that needs a delegate and also needs to be registered as a RepeatInterceptor in the chunk -->
                <bean id="invoiceManagerOutputSource"
                    class="no.aftenposten.accountbalancer.batch.outputsource.InvoicingManagerOutputSource" >
                    <property name="invoicingDao" ref="invoicingJdbcDao"/>
                </bean>
            
                <bean id="paymentLineMapper" class="no.aftenposten.accountbalancer.batch.mapper.PaymentFieldSetMapper" />
            
            	<bean id="paymentLineDescriptor"
            		class="org.springframework.batch.io.file.support.transform.DelimitedLineTokenizer">
            		<property name="delimiter" value=";"/>
            	</bean>
            
                <bean id="fileInputSource" class="org.springframework.batch.io.file.support.DefaultFlatFileInputSource">
            		<property name="resource"
            			value="classpath:paymentInput.txt" />
                    <property name="tokenizer" ref="paymentLineDescriptor" />
                    <property name="fieldSetMapper" ref="paymentLineMapper" />
            	</bean>
            
                <bean parent="customEditorConfigurer" />
            
            </beans>



            • #7
              The definition you just showed looks like it reads from a file, not from a database using Hibernate?

              Also, can you post a dump of your status tables?



              • #8
                Here you have the result from the BATCH_STEP_EXECUTION table

                Code:
                ID | VERSION | STEP_ID | JOB_EXECUTION_ID | START_TIME | END_TIME | STATUS | COMMIT_COUNT | TASK_COUNT | TASK_STATISTICS | CONTINUABLE | EXIT_CODE | EXIT_MESSAGE
                1 | 0 | 1 | 1 | 2008-01-25 20:39:00.351 | 2008-01-25 20:39:30.618 | COMPLETED | 2 | 2 | #Fri Jan 25 20:39:30 CET 2008; sqlCursorInput.skippedRrecordCount=0; sqlCursorInput.lastProcessedRowNum=1 | N | COMPLETED | (null)
                2 | 0 | 2 | 2 | 2008-01-25 20:40:13.111 | 2008-01-25 20:40:15.528 | COMPLETED | 1 | 1 | #Fri Jan 25 20:40:15 CET 2008; sqlCursorInput.skippedRrecordCount=0; sqlCursorInput.lastProcessedRowNum=1 | N | COMPLETED | (null)
                3 | 0 | 3 | 3 | 2008-01-25 20:41:03.45 | 2008-01-25 20:41:05.516 | COMPLETED | 1 | 1 | #Fri Jan 25 20:41:05 CET 2008; sqlCursorInput.skippedRrecordCount=0; sqlCursorInput.lastProcessedRowNum=1 | N | COMPLETED | (null)

                We want to process the result of "select * from SUBSCRIPTION_LINES where active = 1 and subscriptionLineId = 3" once every 24 hours and do inserts into other tables, though via JDBC. After having processed the table once, the job does not seem to read it again.

                Code:
                <?xml version="1.0" encoding="UTF-8"?>
                <beans xmlns="http://www.springframework.org/schema/beans"
                	xmlns:aop="http://www.springframework.org/schema/aop"
                	xmlns:tx="http://www.springframework.org/schema/tx"
                	xmlns:p="http://www.springframework.org/schema/p"
                	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                	xsi:schemaLocation="
                		http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
                		http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.0.xsd
                		http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.0.xsd">
                
                    <description>This file defines the job for the ticker process.</description>
                    <import resource="classpath:/springAccountBalancerCore-configurer.xml"/>
                    <import resource="classpath:/springAccountBalancerCore-dao.xml"/>
                    <import resource="classpath:/springAccountBalancerCore-biz.xml"/>
                
                    <bean id="simpleTickerJob" class="org.springframework.batch.core.configuration.JobConfiguration" abstract="true">
                        <property name="restartable" value="false" />
                    </bean>
                
                    <bean parent="stepScope" />
                	<bean parent="jobConfigurationRegistryBeanPostProcessor" />
                
                    <bean id="tickerJob" parent="simpleTickerJob">
                		<property name="steps">
                			<bean id="tickerStep1"
                				class="org.springframework.batch.execution.step.RepeatOperationsStepConfiguration">
                                <property name="tasklet">
                					<bean
                						class="org.springframework.batch.execution.tasklet.RestartableItemProviderTasklet">
                						<property name="itemProvider">
                							<bean class="org.springframework.batch.item.provider.InputSourceItemProvider">
                								<property name="inputSource" ref="hibernateTickerJobInputSource"/>
                							</bean>
                						</property>
                						<property name="itemProcessor">
                							<bean class="no.aftenposten.accountbalancer.batch.processor.SubscriptionLineTickerProcessor">
                								<property name="outputSource" ref="hibernateTickerJobOutputSource"/>
                								<property name="deliveryCalendarManager" ref="deliveryCalendarManager"/>
                								<property name="productLineDao" ref="productLineDao"/>
                								<property name="subscriptionDao" ref="subscriptionDao"/>
                                                <property name="accountingManager" ref="accountingManager" />
                                            </bean>
                						</property>
                					</bean>
                				</property>
                				<property name="chunkOperations">
                					<bean class="org.springframework.batch.repeat.support.RepeatTemplate">
                						<property name="interceptors" ref="hibernateTickerJobOutputSource"/>
                						<property name="completionPolicy">
                							<bean
                								class="org.springframework.batch.repeat.policy.SimpleCompletionPolicy">
                								<property name="chunkSize" value="1" />
                							</bean>
                						</property>
                					</bean>
                				</property>
                				<property name="stepOperations">
                					<bean class="org.springframework.batch.repeat.support.RepeatTemplate">
                						<property name="exceptionHandler">
                							<bean
                								class="org.springframework.batch.repeat.exception.handler.SimpleLimitExceptionHandler"
                								p:limit="2" p:useParent="true" p:type="java.lang.Exception" />
                						</property>
                					</bean>
                				</property>
                			</bean>
                		</property>
                	</bean>
                
                	<!-- This is a framework class that needs a delegate and also needs to be registered as a RepeatInterceptor in the chunk -->
                	<bean id="hibernateTickerJobOutputSource"                                                             
                		class="org.springframework.batch.io.support.HibernateAwareItemWriter">
                		<property name="sessionFactory" ref="sessionFactoryCoreAb" />
                		<property name="delegate" ref="subscriptionLineTickerJobWriter" />
                	</bean>
                
                
                    <bean id="subscriptionLineTickerJobWriter"
                        class="no.aftenposten.accountbalancer.batch.outputsource.AccountingWriter" parent="accountingJdbcDao">
                    </bean>
                
                        <!--class="no.aftenposten.accountbalancer.batch.outputsource.AccountingWriter">-->
                        <!--<property name="dataSource" ref="oracleDataSource" />-->
                
                    <bean id="hibernateTickerJobInputSource"  class="org.springframework.batch.io.cursor.JdbcCursorInputSource">
                        <property name="dataSource" ref="pulsDataSource"/>
                	    <property name="mapper">
                            <bean class="no.aftenposten.accountbalancer.batch.mapper.SubscriptionLineMapper"/>
                	    </property>
                        <property name="sql" value="select * from SUBSCRIPTION_LINES where active = 1 and subscriptionLineId = 3"/>
                	</bean>
                
                
                </beans>
                Last edited by geira; Jan 25th, 2008, 02:04 PM.



                • #9
                  It looks to me as if neither the input source nor its parent and grandparent is scope="step". This is probably just about OK in a standalone job, but in this case where you are re-launching multiple times in the same VM, what you get is the same input source coming back in second and subsequent runs. Not surprisingly then, it says it is exhausted at the beginning of those runs, because it was exhausted at the end of the first one. It needs to be closed and re-opened for each run, and the best way to do that is to make it scope="step".

                  Practically all input sources (a.k.a. ItemReader) and output sources (a.k.a. ItemWriter) are stateful, and therefore require scope="step" so that Spring can handle their lifecycle. This will be simpler when we get an XML namespace, because you won't have to know that this is necessary for the common out-of-the-box components.
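
The effect of re-using one stateful input source across runs can be sketched in plain Java. This is only an illustration under stated assumptions: `CursorReader` and `runJob` are hypothetical names, not Spring Batch classes, and the `Supplier` merely imitates what a per-step scope would provide, namely a fresh instance for each run.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

final class StatefulReaderDemo {

    /** Hypothetical stateful reader: hands out each row once, then returns null. */
    static final class CursorReader {
        private final Iterator<String> cursor;

        CursorReader(List<String> rows) {
            this.cursor = rows.iterator();
        }

        String read() {
            return cursor.hasNext() ? cursor.next() : null;
        }
    }

    /** One "job run": drain the reader and return the number of items processed. */
    static int runJob(CursorReader reader) {
        int count = 0;
        while (reader.read() != null) {
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("line-1", "line-2");

        // Singleton-style: the same exhausted reader comes back on the second run.
        CursorReader shared = new CursorReader(rows);
        System.out.println("shared reader, run 1: " + runJob(shared)); // 2
        System.out.println("shared reader, run 2: " + runJob(shared)); // 0

        // Per-run instance: each run opens a new reader and sees the data again.
        Supplier<CursorReader> perRun = () -> new CursorReader(rows);
        System.out.println("fresh reader, run 1: " + runJob(perRun.get())); // 2
        System.out.println("fresh reader, run 2: " + runJob(perRun.get())); // 2
    }
}
```

The second run on the shared reader processes nothing and still "completes successfully", which matches the symptom in the logs earlier in the thread.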



                  • #10
                    I managed to get it to work by reloading the input source: I fire up a new Spring context every time the job is started.

                    "parent" is the main Spring context that the application is running in.
                    The configuration XML file has the same name as the Spring Batch job.
                    I have exposed the JobStarter over JMX via MBeanExporter.

                    Any thoughts about doing it this way?


                    Code:
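                    import org.apache.commons.logging.Log;
                    import org.apache.commons.logging.LogFactory;
                    import org.springframework.context.support.ClassPathXmlApplicationContext;
                    // Spring Batch imports (JobLauncher, ExitStatus, exceptions) omitted: packages vary by milestone.

                    public class JobStarterImpl extends StartServer implements JobStarter {
                        private static final Log log = LogFactory.getLog(JobStarterImpl.class);

                        private JobLauncher launcher;

                        public void setLauncher(JobLauncher launcher) {
                            this.launcher = launcher;
                        }

                        public void run(final String jobName) {
                            // Run in a separate thread so the JMX call returns immediately.
                            new Thread(new Runnable() {
                                public void run() {
                                    // Load the job XML into a fresh child context on every start,
                                    // so the job gets a newly created input source each time.
                                    // "parent" is the main application context (presumably held by StartServer).
                                    log.info("Loading job xml: " + jobName);
                                    new ClassPathXmlApplicationContext(
                                            new String[] {"batch/jobs/" + jobName + ".xml"}, parent);

                                    try {
                                        log.info("Starting job-" + jobName);
                                        ExitStatus status = launcher.run(jobName).getExitStatus();
                                        log.info("Exit code: " + status.getExitCode());
                                    } catch (NoSuchJobConfigurationException e) {
                                        log.error("No such configuration", e);
                                    } catch (JobExecutionAlreadyRunningException e) {
                                        log.error("Job " + jobName + " is already running");
                                    }
                                }
                            }).start();
                        }
                    }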
