Announcement Announcement Module
Collapse
No announcement yet.
multiple jobs are triggered from quartz scheduler even if the job is a StatefuJob Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • multiple jobs are triggered from quartz scheduler even if the job is a StatefuJob

    Hi All,

    First I couldn't find a Spring-Quartz-Integration forum so putting it here, sorry if putting in wrong one.

    Scenario:
    I have a spring-batch project where the scheduler is Quartz API and am using a cron trigger for it. I have few jobs which are scheduled in 30 seconds, and am running in clustered environments so most of the time the jobs are completed within 30 seconds, but in some cases (depending on the data that my reader reads and processor processes) it executes more than 30 seconds and I don't want a concurrent job to be fired when another one is still running.

    I am using spring 3.0.5.RELEASE, spring-batch-2.1.7.RELEASE and quartz-1.8.6.

    Below are my configuration:
    pom.xml

    All other related dependencies (spring & others) are excluded for brevity.

    configuration for spring applicationContext.xml
    Code:
    			
    	<!-- quartz scheduler factory which creates all job triggers -->
    	<bean id="quartzScheduler" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
    		<property name="dataSource" ref="quartzDataSource"/>
            <property name="transactionManager" ref="quartzTransactionManager"/>
            <property name="overwriteExistingJobs" value="true"/>
            <property name="autoStartup" value="true" />
            <property name="waitForJobsToCompleteOnShutdown" value="true"/>
    		<property name="applicationContextSchedulerContextKey" value="applicationContext"/>
    		<property name="schedulerContextAsMap">
    			<map>
    				<entry key="jobLocator" value-ref="jobRegistry" />
    				<entry key="jobLauncher" value-ref="jobLauncher" />
    			</map>
    		</property>
    		<property name="triggers">
    			<list>
    				<ref bean="testJobTrigger" />
    				<ref bean="testJobOneTrigger" />
    			</list>
    		</property>
    		<property name="quartzProperties">
                <props>
                	<!-- below values can also be externalized from property file loaded via spring if needed, instead of hard coding -->
                    <prop key="org.quartz.scheduler.instanceName">TestBatchScheduler</prop>
                    <prop key="org.quartz.scheduler.instanceId">AUTO</prop>
                    <prop key="org.quartz.jobStore.misfireThreshold">100</prop>
                    <prop key="org.quartz.jobStore.class">org.quartz.impl.jdbcjobstore.JobStoreTX</prop>
                    <prop key="org.quartz.jobStore.driverDelegateClass">org.quartz.impl.jdbcjobstore.oracle.OracleDelegate</prop>
                    <prop key="org.quartz.jobStore.tablePrefix">QRTZ_</prop>
                    <prop key="org.quartz.jobStore.isClustered">true</prop>
                    <!-- <prop key="org.quartz.threadPool.class">org.quartz.simpl.SimpleThreadPool</prop> -->
                    <prop key="org.quartz.threadPool.class">org.springframework.scheduling.quartz.SimpleThreadPoolTaskExecutor</prop>
                    <prop key="org.quartz.threadPool.threadCount">30</prop>
                    <prop key="org.quartz.threadPool.threadPriority">5</prop>
                </props>
          </property>
    	</bean>
    
    	<bean id="testJobTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean">
    		<property name="jobDetail" ref="testJobDetail" />
    		<property name="misfireInstructionName" value="MISFIRE_INSTRUCTION_DO_NOTHING"/>
    		<property name="cronExpression" value="20/30 * * * * ?" />
    	</bean>
    	
    	<bean id="testJobDetail" class="org.springframework.scheduling.quartz.JobDetailBean">
    		<property name="jobClass" value="mycompany.spring.batch.test.quartz.QuartzJobLauncher" />
    		<property name="group" value="test-batch" />
    		<property name="jobDataAsMap">
    			<map>
    				<entry key="jobName" value="testJob"/>
    			</map>
    		</property>
    	</bean>
    	
    	<bean id="testJobOneTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean">
    		<property name="jobDetail" ref="testJobOneDetail" />
    		<property name="misfireInstructionName" value="MISFIRE_INSTRUCTION_DO_NOTHING"/>
    		<property name="cronExpression" value="25/30 * * * * ?" />
    	</bean>
    	
    	<bean id="testJobOneDetail" class="org.springframework.scheduling.quartz.JobDetailBean">
    		<property name="jobClass" value="mycompany.spring.batch.test.quartz.QuartzJobLauncher" />
    		<property name="group" value="test-batch" />
    		<property name="jobDataAsMap">
    			<map>
    				<entry key="jobName" value="testJobOne"/>
    			</map>
    		</property>
    	</bean>
    	
    	<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    		<property name="jobRepository" ref="jobRepository" />
    		<property name="taskExecutor">
    			<bean class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
    		</property>
    	</bean>
    
    	<batch:job id="testJobOne" job-repository="jobRepository">
    		 <batch:step id="testJobOneStep">
    			<batch:tasklet transaction-manager="quartzTransactionManager"	ref="testTaskletOne"/>
    		</batch:step> 
    	</batch:job>
    	
    	<bean id="testTaskletOne" class="mycompany.spring.batch.test.TestTaskletOne"/>
            <batch:job id="testJob" job-repository="jobRepository">
    		<batch:step id="testJobStep">
    			<batch:tasklet transaction-manager="quartzTransactionManager"	ref="testTasklet"/>
    		</batch:step> 
    	</batch:job>
    	
    	<bean id="testTasklet" class="mycompany.spring.batch.test.TestTasklet"/>
    
        <bean id="quartzDataSource" class="com.mchange.v2.c3p0.ComboPooledDataSource" lazy-init="true" destroy-method="close">
    		<property name="driverClass" value="${batch.jdbc.driver}"/>  	
     		<property name="jdbcUrl" value="${batch.jdbc.url}"/>
    		<property name="user" value="${batch.jdbc.user}"/>  
         	<property name="password" value="${batch.jdbc.password}"/>  
         	<property name="maxPoolSize" value="${batch.jdbc.connection.pool.maxsize}"/>
            <property name="minPoolSize" value="${batch.jdbc.connection.pool.minsize}"/>
       </bean>
    
    	<bean id="quartzTransactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    		<property name="dataSource" ref="quartzDataSource" />
    	</bean>
        
        <bean id="jobRepository"
    		class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
    		<property name="dataSource" ref="quartzDataSource" />
    		<property name="transactionManager" ref="quartzTransactionManager" />
    	</bean>
    		
    	<!-- define a job registry over here, we can define our own , here one in Map implementation -->
    	<bean id="jobRegistry" class="org.springframework.batch.core.configuration.support.MapJobRegistry" />
    	
    	<bean class="org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor">
    		<property name="jobRegistry" ref="jobRegistry"/>
    	</bean>
    
    	<bean id="jobExplorer"
    		class="org.springframework.batch.core.explore.support.JobExplorerFactoryBean">
    		<property name="dataSource" ref="quartzDataSource"/>
    	</bean>
    	
    	<bean id="lobHandler" class="org.springframework.jdbc.support.lob.DefaultLobHandler" />
    
    	<bean id="incrementerParent" class="org.springframework.jdbc.support.incrementer.OracleSequenceMaxValueIncrementer">
    		<property name="dataSource" ref="quartzDataSource" />
    		<property name="incrementerName" value="ID" />
    	</bean>


    Other configurations or codes are put in below as it crosses max limit.

    In my console I can see multiple jobs are fired, and quartz tables has the next firetime is updated by adding schduled rate to current execution time. I couldn't try new quartz API (2.X) with respective annotations of StatefulJob as I have to upgrade to spring 3.1 as 3.0.5 doesn't support it , and it will be very costly for me to test again my whole applications.

    I am not sure where I am going wrong, appreciate for any pointers.

    Thanks in advance
    Ajit Das

  • #2
    pom.xml
    Code:
            <dependency>
    			<groupId>org.quartz-scheduler</groupId>
    			<artifactId>quartz</artifactId>
    			<!-- quartz 2.x not compatible with Spring 3.0.x -->
    			<version>1.8.6</version>
    		</dependency>
    
    		<dependency>
    			<groupId>org.quartz-scheduler</groupId>
    			<artifactId>quartz-oracle</artifactId>
    			<version>1.8.6</version>
    		</dependency>
            <dependency>
    			<groupId>org.springframework.batch</groupId>
    			<artifactId>spring-batch-core</artifactId>
    			<version>2.1.7.RELEASE</version>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.batch</groupId>
    			<artifactId>spring-batch-infrastructure</artifactId>
    			<version>2.1.7.RELEASE</version>
    		</dependency>

    code for QuartzJobLauncher.java

    Code:
    public class QuartzJobLauncher extends QuartzJobBean implements StatefulJob{
    
    	/** Map parameter for getting the job name from job invocation */
    	private static final String JOB_NAME = "jobName";
    
    	/** Map parameter for getting the job launch date from job invocation */
    	private static final String JOB_LAUNCH_DATE = "job.launch.date";
    
    
    	/**
    	 * Execute the Quartz Job bean which is passed with a key name 'jobName' in the JobDataMap.
    	 * 
    	 * @param context execution context
    	 */
    	@SuppressWarnings("unchecked")
    	protected void executeInternal(final JobExecutionContext context) {
    
    		final Map<String,Object> jobDataMap = context.getMergedJobDataMap();
    		// add the current date to job parameters in order to run the same job
    		// with same parameters multiple times
    		if (jobDataMap.get(JOB_LAUNCH_DATE) == null) {
    			final Calendar calender = Calendar.getInstance();
    			final Date currentDate = calender.getTime();
    			jobDataMap.put(JOB_LAUNCH_DATE, currentDate);
    		}
    		final String jobName = (String) jobDataMap.get(JOB_NAME);
    		if (jobName == null) {
    			throw new IllegalStateException(
    					"Job Name can't be null, " 
    							+"must be passed in the JobDataMap while executing the batch");
    		}
    		final JobParameters jobParameters = getJobParametersFromJobMap(jobDataMap);
    		try {
    			// added for removing the serialization issue
    			SchedulerContext schedulerContext = context.getScheduler().getContext();
    			final ApplicationContext applicationContext = (ApplicationContext) schedulerContext
    					.get("applicationContext");
    			final JobLauncher jobLauncher = applicationContext.getBean(
    					"jobLauncher", JobLauncher.class);
    			final JobLocator jobLocator = applicationContext.getBean("jobRegistry",
    					JobLocator.class);
    			final Job jobToRun = jobLocator.getJob(jobName);
    
    			jobLauncher.run(jobToRun, jobParameters);
    		} catch (SchedulerException e) {
    			throw new IllegalStateException(jobName + " has exception");
    		}
    	}
    
    	/**
    	 * Get the job parameters passed from configured job map from spring context xml.
    	 * 
    	 * @param jobDataMap the Map
    	 * @return JobParameters from job data map
    	 */
    	private JobParameters getJobParametersFromJobMap(
    			final Map<String, Object> jobDataMap) {
    		final JobParametersBuilder builder = new JobParametersBuilder();
    
    		for (Map.Entry<String,Object> entry : jobDataMap.entrySet()) {
    			final String key = (String) entry.getKey();
    			final Object value = entry.getValue();
    			if (((value instanceof String)) && (!key.equals(JOB_NAME))) {
    				builder.addString(key, (String) value);
    			} else if (((value instanceof Float))
    					|| ((value instanceof Double))) {
    				builder.addDouble(key,
    						Double.valueOf(((Number) value).doubleValue()));
    			} else if (((value instanceof Integer))
    					|| ((value instanceof Long))) {
    				builder.addLong(key, Long.valueOf(((Number) value).longValue()));
    			} else if ((value instanceof Date)) {
    				builder.addDate(key, (Date) value);
    			}/* else {
    				//ignore 
    				log.debug("JobDataMap contains values which are not job parameters (ignoring).");
    			}*/
    		}
    
    		return builder.toJobParameters();
    	}
    
    
    }
    java code for TestTasklet.java

    Code:
    public class TestTasklet implements Tasklet {
        @Override
        public RepeatStatus execute(final StepContribution contribution,
                final ChunkContext chunkContext)  {
        	Random random = new Random();
        	int randomNo = random.nextInt(1000);
            System.out.println("Tasklet TestTasklet###"+randomNo+" for spring-test started @ "+new Date());
            //some code that executes for more than schedule time
            System.out.println("Tasklet TestTasklet###"+randomNo+" for spring-test ended   @ "+new Date());
            return RepeatStatus.FINISHED;
        }
    
    
    }
    java code for TestTaskletOne.java (same as TestTasklet.java)

    Comment

    Working...
    X