  • best practice for concurrent Job

    I once raised a similar question before, but it seems that discussion moved on to another topic :P

    I have some jobs that may be invoked concurrently (i.e., the same Job is invoked multiple times at the same time, with different parameters of course).

    In my tasklet, reader, or writer, we may have to keep stateful information during execution. As all these beans are singletons, we cannot have the same Job launched multiple times concurrently, as the stateful information inside would get messed up.

    So, is the following the best practice suggested by Spring Batch?
    For each job submission, create a new application context (probably using the current application context as its parent), get the job launcher from it, and run the Job. That is, one application context per job execution.
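
    For illustration, here is a minimal sketch of that pattern (the method and file names are hypothetical, imports are omitted as in the snippets below, and a synchronous JobLauncher is assumed so the context can be closed right after the run):

    Code:
    // Sketch: one child application context per job execution.
    // Assumes each job and a "jobLauncher" bean are defined in "<jobName>-context.xml".
    public JobExecution launchInFreshContext(ApplicationContext parent,
            String jobName, JobParameters params) throws Exception {
        ConfigurableApplicationContext child = new ClassPathXmlApplicationContext(
                new String[] { jobName + "-context.xml" }, parent);
        try {
            JobLauncher launcher = (JobLauncher) child.getBean("jobLauncher");
            Job job = (Job) child.getBean(jobName);
            return launcher.run(job, params);
        } finally {
            // dispose of the context and all of its stateful singletons
            child.close();
        }
    }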

  • #2
    That is correct. The samples contain two examples of this pattern: the quartz sample and the jmx one (TaskExecutorLauncher). Both use a custom JobFactory, which is also in the samples currently.

    • #3
      adrianshum,

      Can you please share how you are invoking the same job concurrently? A small code snippet would be helpful.

      Thanks!

      • #4
        Originally posted by hailspring:
        adrianshum,

        Can you please share how you are invoking the same job concurrently? A small code snippet would be helpful.

        Thanks!
        Nothing special, in fact...

        Just a message-driven POJO, which gets the JobLauncher and invokes the job:

        Code:
        public class JobRequestWorker implements MessageListener {
          private JobLocator jobLocator;
          private JobLauncher jobLauncher;

          public void onMessage(Message msg) {
              if (msg instanceof ObjectMessage) {
                try {
                    // the request travels as the payload of the ObjectMessage
                    SubmitNewJobRequest req =
                        (SubmitNewJobRequest) ((ObjectMessage) msg).getObject();
                    Job job = this.jobLocator.getJob(req.getJobName());
                    this.jobLauncher.run(job, req.getJobParameters());
                } catch (Exception e) {
                    // handle JMSException, NoSuchJobException, JobExecutionException, ...
                }
              }
          }
        }
        Something like this, at a high level.

        • #5
          Originally posted by Dave Syer:
          That is correct. The samples contain two examples of this pattern: the quartz sample and the jmx one (TaskExecutorLauncher). Both use a custom JobFactory, which is also in the samples currently.
          I have taken that approach already (using ClassPathXmlApplicationContextJobFactory). Are there any plans to put ClassPathXmlApplicationContextJobFactory into the distribution, as it seems to be quite a standard way to do this?

          • #6
            Hi, I am having issues running the same job concurrently. Could you point me in the right direction, please?

            I am using Quartz to trigger my job. The same job can be triggered at the same time but with different parameters, and the first job has its job parameters overwritten by the second. I previously thought that, provided the job had different parameters, a new job instance would be created; I understand now that this only applies to job restartability.

            I use the SimpleAsyncTaskExecutor, as described in the samples, for other jobs; however, I want to run these jobs synchronously. I have also read in the Spring Batch documentation that the readers and writers are not designed for multi-threaded use.

            Is it possible to run the same job, with different job parameters, synchronously, safely in a multi-threaded environment?

            Many thanks.

            • #7
              Is it possible to run the same job, with different job parameters, synchronously, safely in a multi-threaded environment?
              It is, if your batch artefacts (readers, writers, etc.) are stateless, but most of the time they're not. The solution in the latest versions of Spring Batch is the "step" scope: the objects are created only for a specific execution of a step, so multiple instances of a job don't interfere with each other.
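
              For illustration, a step-scoped bean is declared like this (a hypothetical writer bean, shown only for the scope="step" attribute; the jobParameters expression is resolved late, once per step execution):

              Code:
              <!-- each step execution gets its own instance of this writer,
                   so concurrent job instances no longer share its state -->
              <bean id="exportWriter" scope="step"
                    class="org.springframework.batch.item.file.FlatFileItemWriter">
                  <property name="resource"
                            value="file:#{jobParameters['EXPORT_FILE_NAME']}" />
                  <property name="lineAggregator">
                      <bean class="org.springframework.batch.item.file.transform.PassThroughLineAggregator" />
                  </property>
              </bean>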

              • #8
                Ok, thanks. I should have stated the version I am using: Spring Batch 2.0.2. I do use the step scope on my jobs, to late-bind the query used by the reader and to set the filename the writer writes to. Since the step scope is set at the job level and not on the job launcher, wouldn't this mean that Spring Batch has already decided to use the same job instance, which therefore uses the same job parameters and ultimately overwrites them during the launch of subsequent concurrent jobs/threads?

                I appreciate your help here. My end result is two jobs, scheduled for the same time with different job parameters, writing to two different files, but their contents are mangled and the header/footer totals that are stored on the execution context are incorrect.

                Would the latest version of Spring Batch provide different behavior?

                Thanks,

                • #9
                  As soon as you use different job parameters, Spring Batch creates a new job instance and a new corresponding job execution. So your two job instances shouldn't overlap, unless they share the same stateful component (one without the step scope). Are you sure the item writer bean (I guess you are referring to an item writer) is a step-scoped bean? (Don't hesitate to provide your configuration and the Java launching code.)

                  The latest version of Spring Batch should behave the same here (unless you hit a bug that has been corrected in the 2.1 branch).

                  • #10
                    This is what I thought; however, it is not the behavior I am seeing. The two jobs do contain unique job parameters; in fact, a new Date() is set, guaranteeing uniqueness.

                    Code:
                    // Execute the batch job
                    jobLauncher.run(batchExportJob, new JobParametersBuilder()
                        .addDate(BatchProcessingMessageCodes.DATE_TIME_STAMP, new Date())
                        .addString(BatchProcessingMessageCodes.EXPORT_FILE_NAME, parameters.getExportFileName())
                        .addString(BatchProcessingMessageCodes.FAILURE_FILE_NAME, parameters.getFailureFileName())
                        .addString(BatchProcessingMessageCodes.HEADER_FILE_PROCESS_DATE, parameters.getProcessDate())
                        .addString(BatchProcessingMessageCodes.EXTENSION, BatchProcessingMessageCodes.CSV_FILE_EXTENSION)
                        .addString(BatchProcessingMessageCodes.WHERE_CLAUSE, parameters.getWhereClause())
                        .addString(BatchProcessingMessageCodes.ENCRYPTION_KEY, BatchProcessingMessageCodes.ENCRYPTION_TRANSACTION_EXPORT)
                        .toJobParameters());
                    Code:
                    	<bean id="jobLauncher" 
                    		class="org.springframework.batch.core.launch.support.SimpleJobLauncher" >
                    		<property name="jobRepository" ref="jobRepository" />
                    	</bean>
                    Code:
                    	<batch:job id="batchExportJob" >
                    
                    		<batch:step id="findTransactionsReadyForExport" >
                    			<batch:tasklet transaction-manager="jpaTransactionManager" >
                    				<batch:chunk reader="transactionReader" processor="transactionProcessor"
                    					writer="transactionWriter" commit-interval="100" >
                    				</batch:chunk>
                    				<batch:listeners>
                    					<batch:listener	class="net.companyname.batch.listener.SkipCheckingListener" />
                    					<batch:listener ref="exportTransactionItemFailedLoggerListener" />
                    					<batch:listener ref="exportTransactionFileHeaderCallback" />
                    					<batch:listener ref="exportTransactionFileFooterCallback" />
                    				</batch:listeners>
                    			</batch:tasklet>
                    			<batch:fail on="FAILED" exit-code="FAILED" />
                    			<batch:next on="*" to="updateStatus" />
                    		</batch:step>
                    		
                    		<batch:step id="updateStatus">
                    			<batch:tasklet transaction-manager="jpaTransactionManager" ref="updateStatuses">
                    				<batch:listeners>
                    					<batch:listener ref="exportTransactionItemFailedLoggerListener" />
                    				</batch:listeners>
                    			</batch:tasklet>
                    			<batch:fail on="FAILED" exit-code="FAILED" />
                    			<batch:end on="*" exit-code="COMPLETED" />
                    		</batch:step>		
                    
                    	</batch:job>
                    	
                    	<bean id="transactionReader"
                    		class="org.springframework.batch.item.database.JdbcCursorItemReader"
                    		scope="step">
                    		<property name="dataSource" ref="dataSource" />
                    		<property name="sql"
                    			value="select t.id from transaction t join batch b on t.batch_id = b.id #{jobParameters[EXPORT_TRANSACTION_WHERE_CLAUSE]}" />
                    		<property name="rowMapper">
                    			<bean class="net.targetgroup.accounts.base.batch.mapper.IdRowMapper" />
                    		</property>
                    	</bean>
                    	
                    	<bean id="transactionProcessor"
                    		class="net.targetgroup.accounts.base.batch.processor.ExportTransactionProcessor">
                    		<property name="failListener" ref="exportTransactionItemFailedLoggerListener" />
                    	</bean>
                    	
                    	<bean id="transactionWriter"
                    		class="net.targetgroup.accounts.base.batch.writer.ExportTransactionWriter">
                    		<property name="delegate">
                    			<bean class="org.springframework.batch.item.file.FlatFileItemWriter">
                    				<property name="lineAggregator">
                    						<bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
                    							<property name="delimiter" value=","/>
                    							<property name="fieldExtractor">
                    								<bean class="net.targetgroup.accounts.base.batch.BeanWrapperFieldExtractor">
                    									<property name="names" value="some,comma,delimited,values"/>
                    								</bean>
                    							</property>
                    					</bean>
                    				</property>
                    				<property name="headerCallback" ref="exportTransactionFileHeaderCallback" />
                    				<property name="footerCallback" ref="exportTransactionFileFooterCallback" />
                    			</bean>
                    		</property>
                    	</bean>
                    You can see that I pass a new Date() as a parameter, so the two jobs are different. By the time the code gets to the header, the first thread has had its parameters replaced by the second. I have put breakpoints in and have not yet identified the point at which they get replaced. I can see that the Spring code does create a new instance, but it then gets overwritten further on.

                    It is worth mentioning that I get the StepExecution in the header and the processor by implementing StepExecutionListener and using beforeStep(StepExecution stepExecution) to set it.

                    Apologies for the long post!

                    • #11
                      You can see that I pass a new Date() as a parameter, so the two jobs are different. By the time the code gets to the header, the first thread has had its parameters replaced by the second.
                      OK for the Date parameter creating a new instance, but are the other parameters different (getExportFileName(), for example)? If not, some step-scoped writers are perhaps writing different things to the same file.

                      It is worth mentioning that I get the StepExecution in the header and the processor by implementing StepExecutionListener and using beforeStep(StepExecution stepExecution) to set it.
                      Are those beans (header callback, processor) step-scoped? If not, they should be; otherwise the job instances would indeed interfere (they'd share the exact same instances). From your configuration, only the JdbcCursorItemReader is step-scoped.
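
                      As an illustration of that fix, the processor from the configuration above would be declared with scope="step" (the header and footer callback beans would need the same treatment if they hold per-execution state):

                      Code:
                      <!-- scope="step" gives each step execution its own processor instance -->
                      <bean id="transactionProcessor" scope="step"
                          class="net.targetgroup.accounts.base.batch.processor.ExportTransactionProcessor">
                          <property name="failListener" ref="exportTransactionItemFailedLoggerListener" />
                      </bean>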

                      • #12
                        Spot on: it was the processor not having scope="step" (the writer did; I was experimenting at the time I copied the configuration into my previous reply).

                        The correct rows are now being written to the correct files. My footer totals are incorrect for one file, but I'm sure that is another matter for me to look at.

                        Thanks for all your help and your quick responses. It really makes a difference.
