Coordinate Execution Order of Jobs
  • Coordinate Execution Order of Jobs

    Say you have a resource, for example a file, and a bunch of jobs that use this file. The jobs' execution order is important, and some job types should cancel earlier jobs of the same type that touch the same file, because only the last one matters.

    Does Spring Batch provide some help for this situation, or do you need to manually search for earlier jobs that have a certain parameter?

    Does Spring Batch support preserving execution order based on some resource/job parameter in a situation where multiple jobs could run in parallel?
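For what it's worth, the manual bookkeeping described above (keep only the latest pending job of a given type per file) can be sketched in plain Java with a map keyed on type and file. All names here are hypothetical illustrations, not Spring Batch API:

```java
import java.util.*;

// Minimal sketch: a queue that cancels an earlier pending job of the same
// type touching the same file when a newer one arrives. Hypothetical types,
// not Spring Batch API.
class LastWinsJobQueue {
    // Pending jobs in submission order, keyed by (jobType, file).
    private final LinkedHashMap<String, String> pending = new LinkedHashMap<>();

    // Submit a job; an earlier pending job with the same type and file is
    // dropped, and the new one takes the latest position in the order.
    void submit(String jobType, String file, String jobId) {
        String key = jobType + "|" + file;
        pending.remove(key);          // cancel the superseded job, if any
        pending.put(key, jobId);      // re-insert at the tail
    }

    // Execution order: submission order, with superseded jobs gone.
    List<String> executionOrder() {
        return new ArrayList<>(pending.values());
    }
}
```

So submitting `("index", "a.txt", "j1")`, then `("clean", "a.txt", "j2")`, then `("index", "a.txt", "j3")` leaves only `j2` and `j3` pending, with `j3` last.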

  • #2
    In Batch 2.1 there is native XML support for FlowStep and JobStep, e.g.

    Code:
    <job id="job1" ...>
    
    </job>
    
    <job id="job2" ...>
    
    </job>
    
    <job id="job3" ...>
       <step>
          <job ref="job1"/>
       </step>
       <step>
          <job ref="job2"/>
       </step>
    </job>
    So you can control the execution order (and preserve it on a restart), execute jobs in parallel, or run them conditionally (with <next on="..."/> or <decision .../>), etc.
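The conditional wiring (`<next on="..."/>` / `<decision .../>`) amounts to a transition table from a step's exit status to the next step. A plain-Java sketch of that idea, with hypothetical names rather than the actual Spring Batch implementation:

```java
import java.util.*;
import java.util.function.Supplier;

// Sketch of status-driven step transitions, as <next on="..."/> expresses
// them. Hypothetical stand-in, not Spring Batch internals.
class FlowSketch {
    private final Map<String, Supplier<String>> steps = new HashMap<>();
    // "step:STATUS" -> next step name
    private final Map<String, String> transitions = new HashMap<>();

    void step(String name, Supplier<String> body) { steps.put(name, body); }
    void next(String from, String onStatus, String to) { transitions.put(from + ":" + onStatus, to); }

    // Run from a starting step until no transition matches the exit status,
    // returning the steps executed in order.
    List<String> run(String start) {
        List<String> executed = new ArrayList<>();
        String current = start;
        while (current != null) {
            executed.add(current);
            String status = steps.get(current).get();
            current = transitions.get(current + ":" + status);
        }
        return executed;
    }
}
```

Restart preservation in the real framework then amounts to replaying this table from the last completed step instead of from the start.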



    • #3
      Dave, I have a similar case, but not exactly the same. I need to download 5 files from the same FTP site and save them to a database every day. Right now I configure it as one job and pass in different parameters. Is there a way I can group them into one job? Otherwise, I'll have to set up 5 schedules for the same job.



      • #4
        I didn't quite understand what was different. You can use a single job to process 5 files (as 5 steps).



        • #5
          Not able to run parallel jobs

          Hi, I am trying to run two jobs, each scheduled to run every minute.
          Here are my two jobs in the database:
          INSERT INTO t5c_batch_jobs (job_id, job_name, job_group_name, job_parameters, cron_expr) VALUES (2, 'job1', 'mail-job', 'smtpserver=q3tech.com;from=[email protected];[email protected];subject=mail subject;body=mail body', '0/60 * * * * ?');
          INSERT INTO batch_jobs_steps (step_id, step_name, step_reader_class, step_writer_class, step_processor_class, commit_interval, start_limit, job_id) VALUES (2, 'sendMail', 'com.t5c.batch.reader.MailJobItemReader', 'com.t5c.batch.writer.MailJobItemWriter', 'com.t5c.batch.processor.MailJobItemProcessor', 5, 100, 2);
          INSERT INTO t5c_batch_jobs (job_id, job_name, job_group_name, job_parameters, cron_expr) VALUES (1, 'job_cards', 'job_cards_test', 'cardhandler=getCards', '0/60 * * * * ?');
          INSERT INTO batch_jobs_steps (step_id, step_name, step_reader_class, step_writer_class, step_processor_class, commit_interval, start_limit, job_id) VALUES (1, 'job_cards', 'com.t5c.batch.reader.CardDataItemReader', 'com.t5c.batch.writer.CardDataItemWriter', 'com.t5c.batch.processor.CardDataItemProcessor', 1, 100, 1);
          My Java class to configure these jobs:
<code>
public class BatchJobScheduler {

    private static Log sLog = LogFactory.getLog(BatchJobScheduler.class);
    private ApplicationContext ac;

    // Fields implied by the usage below
    private static JobMetaDataFeeder metadataFeeder;
    private static String mDriverClass;
    private static String mConnectionUrl;
    private static String mUser;
    private static String mPassword;
    private DataSourceTransactionManager mTransactionManager;
    private SimpleJobLauncher mJobLauncher;
    private SimpleJob job;
    private JobRepository mJobRepository;
    private SimpleStepFactoryBean<String, Object> stepFactory;
    private MapJobRegistry mapJobRegistry;
    private JobDetailBean jobDetail;
    private CronTriggerBean cronTrigger;
    private SchedulerFactoryBean schedulerFactory;

    static {
        try {
            loadProperties();
            metadataFeeder = new JobMetaDataFeeder();
            metadataFeeder.configureDataSource(mDriverClass, mConnectionUrl,
                    mUser, mPassword);
        } catch (Exception e) {
            sLog.error("Failed to initialize scheduler", e);
        }
    }

    private static void loadProperties() throws FileNotFoundException,
            IOException {
        Properties properties = new Properties();
        InputStream is;
        if (BatchJobScheduler.class.getClassLoader() != null) {
            is = BatchJobScheduler.class.getClassLoader().getResourceAsStream(
                    "batch.properties");
        } else {
            is = System.class.getClassLoader().getResourceAsStream(
                    "batch.properties");
        }
        properties.load(is);
        mDriverClass = properties.getProperty("batch.jdbc.driver");
        mConnectionUrl = properties.getProperty("batch.jdbc.url");
        mUser = properties.getProperty("batch.jdbc.user");
        mPassword = properties.getProperty("batch.jdbc.password");
    }

    public void start(WebApplicationContext wac) throws Exception {
        try {
            ac = new FileSystemXmlApplicationContext("batch-spring.xml");
            mTransactionManager = (DataSourceTransactionManager) ac
                    .getBean("mTransactionManager");
            List<JobMetadata> jobsMetaData = getJobsData(mDriverClass,
                    mConnectionUrl, mUser, mPassword, null);
            createAndRunScheduler(jobsMetaData);
        } catch (Exception e) {
            sLog.error("Failed to start scheduler", e);
            throw e;
        }
    }

    @SuppressWarnings("unchecked")
    public List<CronTriggerBean> getJobTriggers(List<JobMetadata> jobsMetaData)
            throws Exception {
        List<CronTriggerBean> triggers = new ArrayList<CronTriggerBean>();
        for (JobMetadata jobMetadata : jobsMetaData) {
            mJobLauncher = (SimpleJobLauncher) ac.getBean("mJobLauncher");
            job = (SimpleJob) ac.getBean("job");
            job.setName(jobMetadata.getJobName());
            ArrayList<Step> steps = new ArrayList<Step>();
            for (StepMetadata stepMetadata : jobMetadata.getSteps()) {
                mJobRepository = (JobRepository) ac
                        .getBean("mRepositoryFactory");
                stepFactory = new SimpleStepFactoryBean<String, Object>();
                stepFactory.setTransactionManager(mTransactionManager);
                stepFactory.setJobRepository(mJobRepository);
                stepFactory.setCommitInterval(stepMetadata.getCommitInterval());
                stepFactory.setStartLimit(stepMetadata.getStartLimit());
                T5CItemReader itemReader = (T5CItemReader) BeanUtils
                        .instantiateClass(Class.forName(stepMetadata
                                .getStepReaderClass()));
                itemReader
                        .setItems(getItemList(jobMetadata.getJobParameters()));
                stepFactory.setItemReader(itemReader);
                stepFactory.setItemProcessor((ItemProcessor) BeanUtils
                        .instantiateClass(Class.forName(stepMetadata
                                .getStepProcessorClass())));
                stepFactory.setItemWriter((ItemWriter) BeanUtils
                        .instantiateClass(Class.forName(stepMetadata
                                .getStepWriterClass())));
                stepFactory.setBeanName(stepMetadata.getStepName());
                steps.add((Step) stepFactory.getObject());
            }
            job.setSteps(steps);
            ReferenceJobFactory jobFactory = new ReferenceJobFactory(job);
            mapJobRegistry = (MapJobRegistry) ac.getBean("jobRegistry");
            mapJobRegistry.register(jobFactory);
            jobDetail = (JobDetailBean) ac.getBean("jobDetail");
            jobDetail.setJobClass(BatchJobLauncher.class);
            jobDetail.setGroup(jobMetadata.getJobGroupName());
            jobDetail.setName(jobMetadata.getJobName());
            Map<String, Object> jobDataMap = new HashMap<String, Object>();
            jobDataMap.put("jobName", jobMetadata.getJobName());
            jobDataMap.put("jobLocator", mapJobRegistry);
            jobDataMap.put("jobLauncher", mJobLauncher);
            jobDataMap.put("timestamp", new Date());
            jobDetail.setJobDataAsMap(jobDataMap);
            cronTrigger = (CronTriggerBean) ac.getBean("cronTrigger");
            cronTrigger.setJobDetail(jobDetail);
            cronTrigger.setJobName(jobMetadata.getJobName());
            cronTrigger.setJobGroup(jobMetadata.getJobGroupName());
            cronTrigger.setCronExpression(jobMetadata.getCronExpression());
            triggers.add(cronTrigger);
        }
        return triggers;
    }

    private void createAndRunScheduler(List<JobMetadata> jobsMetaData)
            throws Exception {
        schedulerFactory = new SchedulerFactoryBean();
        List<CronTriggerBean> triggerList = getJobTriggers(jobsMetaData);
        Trigger[] triggers = new Trigger[triggerList.size()];
        int triggerCount = 0;
        for (CronTriggerBean trigger : triggerList) {
            triggers[triggerCount] = trigger;
            triggerCount++;
        }
        schedulerFactory.setTriggers(triggers);
        schedulerFactory.afterPropertiesSet();
    }

    private List<JobMetadata> getJobsData(String driverClass,
            String connectionURL, String user, String password, String query)
            throws SQLException, ClassNotFoundException {
        metadataFeeder.createJobMetadata(query);
        return metadataFeeder.getJobsMetadata();
    }

    private List<String> getItemList(String jobParameterString) {
        List<String> itemList = new ArrayList<String>();
        String[] parameters = jobParameterString.split(";");
        for (String string : parameters) {
            String[] mapKeyValue = string.split("=");
            if (mapKeyValue.length == 2) {
                itemList.add(mapKeyValue[0] + ":" + mapKeyValue[1]);
            } else {
                // invalid job parameter entry
                System.out.println("exception for invalid job parameters");
            }
        }
        return itemList;
    }
}
</code>

          Spring file:

<code>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd">

    <bean id="mDataSource" class="org.apache.commons.dbcp.BasicDataSource">
        <property name="driverClassName" value="org.postgresql.Driver" />
        <property name="url" value="jdbc:postgresql://localhost:5433/batch_metadata" />
        <property name="username" value="t5c_dev" />
        <property name="password" value="t5c_dev" />
    </bean>

    <bean id="mTransactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="mDataSource" />
    </bean>

    <bean id="mRepositoryFactory" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
        <property name="dataSource" ref="mDataSource" />
        <property name="transactionManager" ref="mTransactionManager" />
    </bean>

    <bean id="mJobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="mRepositoryFactory" />
        <property name="taskExecutor">
            <bean class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
        </property>
    </bean>

    <bean id="jobRegistry" class="org.springframework.batch.core.configuration.support.MapJobRegistry">
    </bean>

    <bean id="t5cItemReader" class="com.t5c.batch.reader.T5CItemReader">
    </bean>
    <bean id="t5cItemWriter" class="com.t5c.batch.writer.T5CItemWriter">
    </bean>
    <bean id="t5cItemProcessor" class="com.t5c.batch.processor.T5CItemProcessor">
    </bean>
    <bean id="jobDetail" class="org.springframework.scheduling.quartz.JobDetailBean">
    </bean>
    <bean id="cronTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean">
        <property name="name" value="cronTrigger" />
    </bean>
    <bean id="job" class="org.springframework.batch.core.job.SimpleJob">
        <property name="jobRepository" ref="mRepositoryFactory" />
        <property name="restartable" value="true" />
    </bean>

</beans>
</code>

          Whenever I run the start method, I get output only from the job that is last in the trigger list added to the factory.
          I couldn't figure out why this is happening.
          Regards
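One thing that may be worth checking in the code above: the loop fetches the same singleton `job` bean for every trigger and reconfigures it in place, so every trigger ends up pointing at one shared object carrying the last configuration. That aliasing effect is plain Java behavior, sketched here with hypothetical stand-ins rather than the actual beans:

```java
import java.util.*;

// Sketch: mutating one shared object inside a loop leaves every reference
// holding the final state. Hypothetical stand-in for a singleton job bean.
class JobBean {
    String name;
}

class AliasingDemo {
    static List<String> namesSeenByTriggers(List<String> jobNames) {
        JobBean shared = new JobBean();            // like fetching one singleton bean
        List<JobBean> registered = new ArrayList<>();
        for (String name : jobNames) {
            shared.name = name;                    // reconfigure the same instance
            registered.add(shared);                // every "trigger" references it
        }
        List<String> seen = new ArrayList<>();
        for (JobBean b : registered) {
            seen.add(b.name);                      // all entries show the last name
        }
        return seen;
    }
}
```

If that is what is happening here, declaring the `job` bean with prototype scope (so each `getBean` call returns a fresh instance) would be one way to avoid sharing.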



          • #6
            Originally posted by Dave Syer
            I didn't quite understand what was different. You can use a single job to process 5 files (as 5 steps).
            Each file will take 3 steps, which means I'd need to repeat the steps for all of them. Certainly you can configure 15 steps in one job; I just don't feel that's efficient enough.



            • #7
              Maybe you could use an external flow:

              Code:
              <job ...>
                  <flow id="one" parent="flow" next="two"/>
                  <flow id="two" parent="flow" next="three"/>
                  <flow id="three" parent="flow" next="four"/>
                  <flow id="four" parent="flow" next="five"/>
                  <flow id="five" parent="flow"/>
              </job>
              <flow id="flow">
                 <step .../>
                 <step .../>
                 <step .../>
              </flow>
              Which configuration approach is best depends on the structure and input parameters of the steps.
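The appeal of the external flow above is defining the 3-step pipeline once and running it against each file, instead of copying the same steps five times. The reuse can be pictured in plain Java; names here are hypothetical illustrations, not Spring Batch API:

```java
import java.util.*;
import java.util.function.Function;

// Sketch: one pipeline definition applied to several input files in turn,
// instead of repeating the same steps per file. Hypothetical, not Spring API.
class ReusableFlow {
    private final List<Function<String, String>> steps = new ArrayList<>();

    // Append a step to the shared pipeline definition.
    ReusableFlow step(Function<String, String> s) {
        steps.add(s);
        return this;
    }

    // Run every step against one file's data, returning the final result.
    String runFor(String file) {
        String data = file;
        for (Function<String, String> s : steps) {
            data = s.apply(data);
        }
        return data;
    }
}
```

A single definition such as `new ReusableFlow().step(f -> "downloaded:" + f).step(d -> "parsed:" + d).step(p -> "saved:" + p)` can then be invoked once per file, which is what the `parent="flow"` references express in the XML.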



              • #8
                Thank you, Dave. This is exactly what I'm looking for. Is there a target date yet for the 2.1 release? I'm trying to get 2.1.0.M3 but couldn't find it. The links on the http://static.springsource.org/sprin...downloads.html page either don't have it or take forever to open (e.g. http://s3browse.com/explore/maven.sp...ramework/batch).



                • #9
                  M3 is on that download page (follow the link to Milestones: http://static.springsource.org/downl...?project=BATCH). As always, JIRA has an estimate of the release date for 2.1: currently it is sometime in January.

                  s3browse is no longer available, I think, so to browse S3 you need to get a client like S3Fox or S3Organizer.



                  • #10
                    Great, thanks a lot, Dave. Happy holidays to all Spring folks and Spring fans!
