  • Run Spring Batch job on grids - Proactive Scheduling Partition Handler

    Hello,

    I have written a Partition Handler to execute a Spring Batch job on an infrastructure managed by the ProActive Scheduler.
    The solution consists of two Java classes:
    - the partition handler itself: it creates a ProActive Scheduling job, iterates over the step executions provided by a step execution splitter and creates one (ProActive) task per step execution. The job execution id, step execution id, step name and the name of the Spring job configuration file are passed to each task;
    - StepExecutable, a Java class that implements the execution on the remote node: it loads an ApplicationContext, from which it obtains the step to be executed and the corresponding step execution.
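The task-per-partition scheme described above can be sketched in plain Java, without the ProActive or Spring Batch APIs. This is a hypothetical model, not the author's code: `TaskSpec` stands in for a ProActive `JavaTask`, the argument keys mirror the constants declared in `StepExecutable`, and the ids and step name in `main` are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PartitionTaskSketch {

    // Argument keys, mirroring the constants declared in StepExecutable.
    static final String JOB_EXECUTION_ID = "jobExecutionId";
    static final String STEP_EXECUTION_ID = "stepExecutionId";
    static final String STEP_NAME = "stepName";
    static final String JOB_CONFIG_FILE = "JobConfigFile";

    /** Hypothetical stand-in for a ProActive JavaTask: a name plus string arguments. */
    static final class TaskSpec {
        final String name;
        final Map<String, String> arguments;

        TaskSpec(String name, Map<String, String> arguments) {
            this.name = name;
            this.arguments = Collections.unmodifiableMap(arguments);
        }
    }

    /**
     * Builds one task per partition. Each task is named after its step execution id,
     * so that results can later be matched back to the partition that produced them.
     */
    static List<TaskSpec> buildTasks(long jobExecutionId, List<Long> stepExecutionIds,
                                     String stepName, String jobConfigFile) {
        List<TaskSpec> tasks = new ArrayList<>();
        for (long stepExecutionId : stepExecutionIds) {
            Map<String, String> args = new LinkedHashMap<>();
            args.put(JOB_EXECUTION_ID, Long.toString(jobExecutionId));
            args.put(STEP_EXECUTION_ID, Long.toString(stepExecutionId));
            args.put(STEP_NAME, stepName);
            args.put(JOB_CONFIG_FILE, jobConfigFile);
            tasks.add(new TaskSpec(Long.toString(stepExecutionId), args));
        }
        return tasks;
    }

    public static void main(String[] args) {
        // Hypothetical ids and step name, for illustration only.
        List<TaskSpec> tasks = buildTasks(7L, Arrays.asList(11L, 12L, 13L),
                "partitionedStep", "/launch-context.xml");
        for (TaskSpec t : tasks) {
            System.out.println(t.name + " -> " + t.arguments);
        }
    }
}
```

Everything crosses the wire as strings here because the task arguments in the handler are passed as strings; the remote side parses the ids back to `Long`.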

    As far as I can see, there are three limitations in my approach:
    - I need a job repository accessible from the remote nodes (I use a MySQL database). I think this is the same problem Antony described in his post http://forum.springsource.org/showth...highlight=grid
    - I did not manage to obtain, in my partition handler, the name of the XML file that defines the Spring job (this reference needs to be sent to the remote nodes so that the corresponding step can be found and executed), so for now it is hard-coded as "/launch-context.xml". I think this can be done; it's just that I'm not experienced with Spring.
    - The last limitation, also related to the XML file describing the Spring job, is that this file needs to be accessible on the remote nodes. I hope that in the next version of ProActive Scheduling we'll be able to transfer it automatically.
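Regarding the last limitation, one option on the remote side is to fail fast with an explicit message when the job configuration file is not on the node's classpath, instead of letting the application context fail later with a less specific error. A minimal sketch using only `Class.getResource`; the helper name `requireClasspathResource` is hypothetical and not part of Spring or ProActive.

```java
import java.net.URL;

public class ConfigFileCheck {

    /**
     * Resolves a classpath resource relative to the given anchor class and fails fast
     * if it is missing. A leading "/" makes the path absolute on the classpath, the same
     * convention used when a path such as "/launch-context.xml" is resolved relative to
     * a class.
     */
    static URL requireClasspathResource(Class<?> anchor, String path) {
        URL url = anchor.getResource(path);
        if (url == null) {
            throw new IllegalStateException("Job configuration '" + path
                    + "' is not on the classpath of this node;"
                    + " copy it to the node or add it to the job classpath.");
        }
        return url;
    }

    public static void main(String[] args) {
        try {
            URL url = requireClasspathResource(ConfigFileCheck.class, "/launch-context.xml");
            System.out.println("configuration found at " + url);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Such a check could run at the start of the remote task, before the application context is built.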

    Otherwise, my first test works just fine. I have written a post on the ProActive blog with details about this experience:

    http://blog.activeeon.com/?p=79


    I'll post my code here, in case someone is curious
    (see the replies below; I could not paste it into this message because it was too long).
    Well, that's it. Any comments or opinions are welcome.
    Last edited by esalagea; May 25th, 2009, 06:46 AM.

  • #2
    Partition Handler code

    Code:
    //============================================================================
    // Name        : Spring Batch example with Proactive Scheduler
    // Author      : Emil Salageanu, ActiveEon team
    // Version     : 0.1
    // Copyright   : Copyright ActiveEon 2008-2009, Tous Droits Réservés (All Rights Reserved)
    // Description : Simple example of executing a Spring batch job on the ProActive Scheduler
    //================================================================================
    package com.activeeon.sandbox.spring;
    
    import java.io.File;
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.Map;
    import java.util.Set;
    
    import org.ow2.proactive.scheduler.common.SchedulerAuthenticationInterface;
    import org.ow2.proactive.scheduler.common.SchedulerConnection;
    import org.ow2.proactive.scheduler.common.UserSchedulerInterface;
    import org.ow2.proactive.scheduler.common.job.JobEnvironment;
    import org.ow2.proactive.scheduler.common.job.JobId;
    import org.ow2.proactive.scheduler.common.job.JobResult;
    import org.ow2.proactive.scheduler.common.job.TaskFlowJob;
    import org.ow2.proactive.scheduler.common.task.JavaTask;
    import org.ow2.proactive.scheduler.common.task.TaskResult;
    import org.springframework.batch.core.BatchStatus;
    import org.springframework.batch.core.ExitStatus;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.StepExecution;
    import org.springframework.batch.core.partition.PartitionHandler;
    import org.springframework.batch.core.partition.StepExecutionSplitter;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.util.Assert;
    
    /**
     * A {@link PartitionHandler} that creates a job on the ProActive Scheduler,
     * submits it and waits for its result
     * 
     * @author Emil Salageanu
     * 
     */
    public class ProactiveSchedulerPartitionHandler implements PartitionHandler,
            InitializingBean {
    
        private int gridSize = 1;
    
        private String proactiveSchedulerUrl;
        private String proactiveSchedulerUserName;
        private String proactiveSchedulerPassword;
    
        private Step step;
    
        public void afterPropertiesSet() throws Exception {
            Assert.notNull(step, "A Step must be provided.");
        }
    
        /**
         * Passed to the {@link StepExecutionSplitter} in the
         * {@link #handle(StepExecutionSplitter, StepExecution)} method, instructing
         * it how many {@link StepExecution} instances are required, ideally. The
         * {@link StepExecutionSplitter} is allowed to ignore the grid size in the
         * case of a restart, since the input data partitions must be preserved.
         * 
         * @param gridSize
         *            the number of step executions that will be created
         */
        public void setGridSize(int gridSize) {
            this.gridSize = gridSize;
        }
    
        /**
         * Setter for the {@link Step} that will be used to execute the partitioned
         * {@link StepExecution}. This is a regular Spring Batch step, with all the
         * business logic required to complete an execution based on the input
         * parameters in its {@link StepExecution} context.
         * 
         * @param step
         *            the {@link Step} instance to use to execute business logic
         */
        public void setStep(Step step) {
            this.step = step;
        }
    
        /**
         * @see PartitionHandler#handle(StepExecutionSplitter, StepExecution)
         */
        public Collection<StepExecution> handle(
                StepExecutionSplitter stepExecutionSplitter,
                StepExecution masterStepExecution) throws Exception {
    
            Collection<StepExecution> result = new ArrayList<StepExecution>();
    
            // A job to be scheduled on the ProactiveScheduler
            TaskFlowJob proactiveSchedulerJob = new TaskFlowJob();
            proactiveSchedulerJob.setName("Master Step: "
                    + masterStepExecution.toString());
    
            // the step executions
            Set<StepExecution> stepExecutions = stepExecutionSplitter.split(
                    masterStepExecution, gridSize);
    
            for (StepExecution stepExecution : stepExecutions) {
                String jobExecutionId = stepExecution.getJobExecution().getId()
                        .toString();
                String stepExecutionId = stepExecution.getId().toString();
                String stepName = step.getName();
    
                JavaTask jt = new JavaTask();
                jt.setName(stepExecutionId);
                jt.setExecutableClassName(StepExecutable.class.getName());
                jt.addArgument(StepExecutable.JOB_EXECUTION_ID, jobExecutionId);
                jt.addArgument(StepExecutable.STEP_EXECUTION_ID, stepExecutionId);
                jt.addArgument(StepExecutable.STEP_NAME, stepName);
                jt.addArgument(StepExecutable.JOB_CONFIG_FILE,
                        "/launch-context.xml");
                proactiveSchedulerJob.addTask(jt);
    
            }// for all StepExecution
    
            // the classes of this application are transferred automatically with the job
            this.setJobClasPath(proactiveSchedulerJob);
    
            SchedulerAuthenticationInterface auth = SchedulerConnection
                    .join(proactiveSchedulerUrl);
            UserSchedulerInterface uischeduler = auth.logAsUser(
                    proactiveSchedulerUserName, proactiveSchedulerPassword);
            JobId id = uischeduler.submit(proactiveSchedulerJob);
    
            // blocking loop: poll every 3 seconds until the job result is
            // available (getJobResult returns null while the job is running)
            JobResult jr = null;
            while (jr == null) {
                Thread.sleep(3000);
                jr = uischeduler.getJobResult(id);
            }
    
            Map<String, TaskResult> jobResults = jr.getAllResults();
            // jobResults maps each task name to its TaskResult, and each task was
            // named after its step execution id when the job was built. We
            // therefore iterate over the step executions and look up the result
            // of each one, which lets us handle exceptions thrown by remote tasks.
    
            for (StepExecution stepExecution : stepExecutions) {
                TaskResult taskResult = jobResults.get(stepExecution.getId()
                        .toString());
                try {
                    if (taskResult == null) {
                        throw new IllegalStateException(
                                "No result returned for step execution "
                                        + stepExecution.getId());
                    }
                    // value() rethrows the exception raised by the remote task, if any
                    StepExecution se = (StepExecution) taskResult.value();
                    result.add(se);
                } catch (Throwable t) {
                    // the remote task failed (or returned no result): mark the
                    // local step execution as FAILED, attach the exception to it
                    // and add it to the results
                    ExitStatus exitStatus = ExitStatus.FAILED
                            .addExitDescription("Remote ProActive task failed for this step execution.");
                    stepExecution.setStatus(BatchStatus.FAILED);
                    stepExecution.setExitStatus(exitStatus);
                    stepExecution.addFailureException(t);
                    result.add(stepExecution);

                    // print the stack trace:
                    System.out.println("an error occurred while executing the step "
                            + stepExecution.getId().toString() + ":");
                    t.printStackTrace();
                }
            }
            return result;
        }
    
        public void setProactiveSchedulerUrl(String proactiveSchedulerUrl) {
            this.proactiveSchedulerUrl = proactiveSchedulerUrl;
        }
    
        public void setProactiveSchedulerUserName(String proactiveSchedulerUserName) {
            this.proactiveSchedulerUserName = proactiveSchedulerUserName;
        }
    
        public void setProactiveSchedulerPassword(String proactiveSchedulerPassword) {
            this.proactiveSchedulerPassword = proactiveSchedulerPassword;
        }
    
        protected void setJobClasPath(org.ow2.proactive.scheduler.common.job.Job job) {
    
            String appClassPath = "";
            try {
                File appMainFolder = new File(this.getClass().getProtectionDomain()
                        .getCodeSource().getLocation().toURI());
                appClassPath = appMainFolder.getAbsolutePath();
            } catch (java.net.URISyntaxException e1) {
                e1.printStackTrace();
            }
            JobEnvironment je = new JobEnvironment();
            try {
                je.setJobClasspath(new String[] { appClassPath });
                System.out.println("job classpath: " + appClassPath);
            } catch (java.io.IOException e) {
                e.printStackTrace();
            }
            job.setEnvironment(je);
        }
    
    }
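The blocking loop in `handle` polls every three seconds with no upper bound, so a job that never completes would block the master step indefinitely. A generic sketch of the same polling pattern with a deadline, in plain Java; `pollUntilAvailable` and `ResultProbe` are hypothetical names, and in the handler the probe would wrap a call such as `uischeduler.getJobResult(id)`, assuming (as the loop above does) that it returns `null` until the job has finished.

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;

public class Polling {

    /** Returns a result, or null while the result is not yet available. */
    @FunctionalInterface
    interface ResultProbe<T> {
        T poll() throws Exception;
    }

    /**
     * Calls probe.poll() every interval until it returns a non-null value. The deadline
     * is checked after each unsuccessful poll, so the call may overshoot the timeout by
     * up to one interval before throwing TimeoutException.
     */
    static <T> T pollUntilAvailable(ResultProbe<T> probe, Duration interval, Duration timeout)
            throws Exception {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            T value = probe.poll();
            if (value != null) {
                return value;
            }
            if (System.nanoTime() - deadline >= 0) {
                throw new TimeoutException("No result after " + timeout);
            }
            Thread.sleep(interval.toMillis());
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated scheduler: the result becomes available on the third poll.
        String result = pollUntilAvailable(() -> ++calls[0] < 3 ? null : "finished",
                Duration.ofMillis(10), Duration.ofSeconds(5));
        System.out.println(result + " after " + calls[0] + " polls");
    }
}
```

What to do on timeout (fail the master step, or kill the scheduler job first) is a separate design decision that this sketch leaves open.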

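The result loop relies on a naming convention: each task is named after its step execution id, so the map of `<task name, result>` can be joined back to the partitions. A plain-Java sketch of that join in which every partition produces exactly one outcome, and a missing result is recorded as a failure rather than silently dropped. `PartitionOutcome` and the `Map<String, Object>` of results are hypothetical simplifications of the ProActive `TaskResult` map; here a `Throwable` value stands for a task that failed remotely.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ResultJoin {

    /** Hypothetical per-partition outcome: either a value or the reason it failed. */
    static final class PartitionOutcome {
        final long stepExecutionId;
        final Object value;       // non-null on success
        final Throwable failure;  // non-null on failure

        PartitionOutcome(long stepExecutionId, Object value, Throwable failure) {
            this.stepExecutionId = stepExecutionId;
            this.value = value;
            this.failure = failure;
        }

        boolean failed() {
            return failure != null;
        }
    }

    /** Joins results keyed by task name (= step execution id) back to the partitions. */
    static List<PartitionOutcome> join(List<Long> stepExecutionIds,
                                       Map<String, Object> resultsByTaskName) {
        List<PartitionOutcome> outcomes = new ArrayList<>();
        for (long id : stepExecutionIds) {
            Object raw = resultsByTaskName.get(Long.toString(id));
            if (raw == null) {
                // no result at all for this partition: treat it as a failure
                outcomes.add(new PartitionOutcome(id, null,
                        new IllegalStateException("No result for task " + id)));
            } else if (raw instanceof Throwable) {
                outcomes.add(new PartitionOutcome(id, null, (Throwable) raw));
            } else {
                outcomes.add(new PartitionOutcome(id, raw, null));
            }
        }
        return outcomes;
    }

    public static void main(String[] args) {
        Map<String, Object> results = new HashMap<>();
        results.put("1", "ok");
        results.put("2", new RuntimeException("boom"));
        List<Long> ids = new ArrayList<>();
        ids.add(1L);
        ids.add(2L);
        ids.add(3L);
        for (PartitionOutcome o : join(ids, results)) {
            System.out.println(o.stepExecutionId + (o.failed() ? " FAILED: " + o.failure.getMessage() : " OK"));
        }
    }
}
```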


    • #3
      StepExecutable code

      Code:
      //============================================================================
      // Name        : Spring Batch example with Proactive Scheduler
      // Author      : Emil Salageanu, ActiveEon team
      // Version     : 0.1
      // Copyright   : Copyright ActiveEon 2008-2009, Tous Droits Réservés (All Rights Reserved)
      // Description : Simple example of executing a Spring batch job on the ProActive Scheduler
      //================================================================================
      
      package com.activeeon.sandbox.spring;
      
      import java.io.Serializable;
      import java.util.Map;
      
      import org.ow2.proactive.scheduler.common.task.TaskResult;
      import org.ow2.proactive.scheduler.common.task.executable.JavaExecutable;
      import org.springframework.batch.core.JobInterruptedException;
      import org.springframework.batch.core.Step;
      import org.springframework.batch.core.StepExecution;
      import org.springframework.batch.core.explore.JobExplorer;
      import org.springframework.beans.factory.BeanFactory;
      import org.springframework.context.ApplicationContext;
      import org.springframework.context.support.ClassPathXmlApplicationContext;
      
      public class StepExecutable extends JavaExecutable {
      
      	public static final String JOB_EXECUTION_ID = "jobExecutionId";
      	public static final String STEP_EXECUTION_ID = "stepExecutionId";
      	public static final String STEP_NAME = "stepName";
      	public static final String JOB_CONFIG_FILE = "JobConfigFile";
      
      	private Long jobExecutionId;
      	private Long stepExecutionId;
      	private String stepName;
      	private String configFile;
      
      	@Override
      	public void init(Map<String, String> args) throws Exception {
      		jobExecutionId = Long.parseLong(args.get(JOB_EXECUTION_ID));
      		stepExecutionId = Long.parseLong(args.get(STEP_EXECUTION_ID));
      		stepName = args.get(STEP_NAME);
      		configFile = args.get(JOB_CONFIG_FILE);
      	}
      
      	@Override
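      	public Serializable execute(TaskResult... arg0) throws Throwable {
      
      		// load the job configuration; the file must be on this node's classpath
      		ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
      				configFile, this.getClass());
      		try {
      			System.out.println("ApplicationContext ClassLoader:"
      					+ context.getClass().getClassLoader());
      
      			Step step = (Step) context.getBean(stepName);
      			// the step execution was created on the master side by the step
      			// execution splitter; read it back from the shared job repository
      			JobExplorer je = (JobExplorer) context.getBean("jobExplorer");
      			StepExecution stepExecution = je.getStepExecution(jobExecutionId,
      					stepExecutionId);
      			if (stepExecution == null) {
      				throw new IllegalStateException("No step execution found with id "
      						+ stepExecutionId + " for job execution " + jobExecutionId);
      			}
      
      			try {
      				step.execute(stepExecution);
      			} catch (JobInterruptedException e) {
      				// record the interruption on the step execution so that the
      				// master sees it in the returned result
      				e.printStackTrace();
      				stepExecution.addFailureException(e);
      			}
      			return stepExecution;
      		} finally {
      			// release the resources (e.g. database connections) held by the context
      			context.close();
      		}
      	}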
      
      }
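In `init`, `Long.parseLong(args.get(...))` fails with a bare `NumberFormatException` when an argument is missing or malformed, which is hard to diagnose on a remote node. A small hypothetical helper that names the offending key could be used instead; it is plain Java and assumes the same `Map<String, String>` of task arguments that `init` receives in the code above.

```java
import java.util.HashMap;
import java.util.Map;

public class TaskArguments {

    /** Returns the argument for key, or fails with a message naming the key. */
    static String require(Map<String, String> args, String key) {
        String value = args.get(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing task argument '" + key + "'");
        }
        return value;
    }

    /** Parses a required numeric argument such as a job or step execution id. */
    static long requireLong(Map<String, String> args, String key) {
        String value = require(args, key);
        try {
            return Long.parseLong(value.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "Task argument '" + key + "' is not a number: " + value, e);
        }
    }

    public static void main(String[] a) {
        Map<String, String> args = new HashMap<>();
        args.put("stepExecutionId", "42");
        System.out.println(requireLong(args, "stepExecutionId"));
        try {
            requireLong(args, "jobExecutionId");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With such a helper, `init` would read e.g. `stepExecutionId = TaskArguments.requireLong(args, STEP_EXECUTION_ID);`.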
