creating spring batch FLOW JOB dynamically

  • creating spring batch FLOW JOB dynamically

    Hi All,

    I am trying to create a Spring Batch job dynamically.

    I found a thread in this forum, http://forum.springsource.org/showth...bs-dynamically, that describes how to create a SEQUENTIAL JOB dynamically. It helped me a lot.

    Now I am trying to create a SPLIT/FLOW JOB dynamically, but I have not been able to.

    Can you help me create a FLOW JOB dynamically? If anyone has done this, please share sample code showing how to create a SPLIT/FLOW JOB dynamically.


    Thanks
    Parag Phatowali
    Last edited by paragflume; Feb 14th, 2013, 09:14 AM.

  • #2
    Hi All,


    I was finally able to create a Spring Batch flow job programmatically (dynamically).

    Here is the sample code:
    Code:
    
    import java.util.ArrayList;
    import java.util.List;
    
    import org.springframework.batch.core.ExitStatus;
    import org.springframework.batch.core.JobExecution;
    import org.springframework.batch.core.JobParameters;
    import org.springframework.batch.core.JobParametersBuilder;
    import org.springframework.batch.core.StepExecution;
    import org.springframework.batch.core.job.flow.FlowExecutionStatus;
    import org.springframework.batch.core.job.flow.FlowJob;
    import org.springframework.batch.core.job.flow.JobExecutionDecider;
    import org.springframework.batch.core.job.flow.support.SimpleFlow;
    import org.springframework.batch.core.job.flow.support.StateTransition;
    import org.springframework.batch.core.job.flow.support.state.DecisionState;
    import org.springframework.batch.core.job.flow.support.state.EndState;
    import org.springframework.batch.core.job.flow.support.state.StepState;
    import org.springframework.batch.core.launch.JobLauncher;
    import org.springframework.batch.core.repository.JobRepository;
    import org.springframework.batch.core.step.tasklet.Tasklet;
    import org.springframework.batch.core.step.tasklet.TaskletStep;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    import org.springframework.transaction.PlatformTransactionManager;
    
    public class JoinStart {
    
    	public static void main(String[] args) throws Exception {
    		JoinStart s = new JoinStart();
    		s.runShellAction("");
    	}
    
    	public void runShellAction(String command) throws Exception {
    
    
    		ApplicationContext context = new ClassPathXmlApplicationContext("context.xml");
    
    		JobRepository jobRepository = (JobRepository) context.getBean("jobRepository");
    
    		PlatformTransactionManager transactionManager = (PlatformTransactionManager) context.getBean("transactionManager");
    
    		JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
    
    		JobParameters jobPerameter = new JobParametersBuilder().addDate("date_param", new java.util.Date()).toJobParameters();
    
    		Tasklet t1 = new TJoin("join1",5);
    		TaskletStep ts1 = new TaskletStep("join1");
    		ts1.setTasklet(t1);
    		ts1.setJobRepository(jobRepository);
    		ts1.setTransactionManager(transactionManager);
    
    		Tasklet t2 = new TJoin("join2",5);
    		TaskletStep ts2 = new TaskletStep("join2");
    		ts2.setTasklet(t2);
    		ts2.setJobRepository(jobRepository);
    		ts2.setTransactionManager(transactionManager);
    
    		Tasklet t3 = new TJoin("join3",5);
    		TaskletStep ts3 = new TaskletStep("join3");
    		ts3.setTasklet(t3);
    		ts3.setJobRepository(jobRepository);
    		ts3.setTransactionManager(transactionManager);
    		
    		Tasklet t4 = new TJoin("join4",5);
    		TaskletStep ts4 = new TaskletStep("join4");
    		ts4.setTasklet(t4);
    		ts4.setJobRepository(jobRepository);
    		ts4.setTransactionManager(transactionManager);
    
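    		// This decider always returns SWITCH_A, so the flow always continues with join3.
    		// A real decider would inspect the job or step execution before choosing a status.
    		JobExecutionDecider decider = new JobExecutionDecider() {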
    
    			@Override
    			public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
    
    				return new FlowExecutionStatus("SWITCH_A");
    			}
    		};
    
    
    		FlowJob flowJob = new FlowJob("flow_job");
    
    		SimpleFlow simpleFlow = new SimpleFlow("simple_job");
    		List<StateTransition> transitions = new ArrayList<StateTransition>();
    
    
    		transitions.add(StateTransition.createStateTransition(new StepState(ts1), "decision"));

    		// Build the decision state once and reuse it for each outgoing transition.
    		DecisionState decision = new DecisionState(decider, "decision");
    		// Without a pattern, the transition acts as the default ("*") route.
    		transitions.add(StateTransition.createStateTransition(decision, "join2"));
    		transitions.add(StateTransition.createStateTransition(decision, "SWITCH_A", "join3"));
    		transitions.add(StateTransition.createStateTransition(decision, "SWITCH_B", "join4"));
    
    		transitions.add(StateTransition.createStateTransition(new StepState(ts2), ExitStatus.COMPLETED.getExitCode(), "end2"));
    		transitions.add(StateTransition.createStateTransition(new StepState(ts2), ExitStatus.FAILED.getExitCode(), "end3"));
    
    		transitions.add(StateTransition.createEndStateTransition(new EndState(FlowExecutionStatus.COMPLETED, "end2")));
    		transitions.add(StateTransition.createEndStateTransition(new EndState(FlowExecutionStatus.FAILED, "end3")));
    
    		transitions.add(StateTransition.createStateTransition(new StepState(ts3), ExitStatus.COMPLETED.getExitCode(), "end4"));
    		transitions.add(StateTransition.createStateTransition(new StepState(ts3), ExitStatus.FAILED.getExitCode(), "end5"));
    
    		transitions.add(StateTransition.createEndStateTransition(new EndState(FlowExecutionStatus.COMPLETED, "end4")));
    		transitions.add(StateTransition.createEndStateTransition(new EndState(FlowExecutionStatus.FAILED, "end5")));
    		
    		
    		transitions.add(StateTransition.createStateTransition(new StepState(ts4), ExitStatus.COMPLETED.getExitCode(), "end6"));
    		transitions.add(StateTransition.createStateTransition(new StepState(ts4), ExitStatus.FAILED.getExitCode(), "end7"));
    
    		transitions.add(StateTransition.createEndStateTransition(new EndState(FlowExecutionStatus.COMPLETED, "end6")));
    		transitions.add(StateTransition.createEndStateTransition(new EndState(FlowExecutionStatus.FAILED, "end7")));
    
    
    		simpleFlow.setStateTransitions(transitions);
    		simpleFlow.afterPropertiesSet();
    
    		flowJob.setFlow(simpleFlow);
    		flowJob.setJobRepository(jobRepository);
    		flowJob.afterPropertiesSet();
    
    		try {
    			jobLauncher.run(flowJob, jobPerameter);
    		} catch (Exception e) {
    			System.out.println(" Job Launching Error : " + e);
    		}
    		
    	}
    	
    	
    
    }
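
    For anyone reading the transition list above, here is a rough plain-Java model of how the status returned by the decider picks the next state. It does not use Spring Batch at all: the class TransitionTable and its methods are made up for illustration, and it reduces pattern matching to "exact status, otherwise the * default", which is simpler than what the framework supports.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not a Spring Batch class: models "state + status -> next state".
class TransitionTable {

    // state name -> (status pattern -> next state name)
    private final Map<String, Map<String, String>> rows =
            new LinkedHashMap<String, Map<String, String>>();

    void add(String state, String pattern, String next) {
        Map<String, String> row = rows.get(state);
        if (row == null) {
            row = new LinkedHashMap<String, String>();
            rows.put(state, row);
        }
        row.put(pattern, next);
    }

    // Prefer an exact status match, otherwise fall back to the "*" default.
    String next(String state, String status) {
        Map<String, String> row = rows.get(state);
        if (row == null) {
            return null; // end state or unknown state
        }
        String exact = row.get(status);
        return exact != null ? exact : row.get("*");
    }

    // The same routing as the transition list in the post above.
    static TransitionTable sample() {
        TransitionTable t = new TransitionTable();
        t.add("join1", "*", "decision");
        t.add("decision", "*", "join2");
        t.add("decision", "SWITCH_A", "join3");
        t.add("decision", "SWITCH_B", "join4");
        t.add("join2", "COMPLETED", "end2");
        t.add("join2", "FAILED", "end3");
        t.add("join3", "COMPLETED", "end4");
        t.add("join3", "FAILED", "end5");
        t.add("join4", "COMPLETED", "end6");
        t.add("join4", "FAILED", "end7");
        return t;
    }

    public static void main(String[] args) {
        TransitionTable t = sample();
        System.out.println(t.next("decision", "SWITCH_A"));       // join3
        System.out.println(t.next("decision", "SOMETHING_ELSE")); // join2
    }
}
```

    Since the decider in the post always returns SWITCH_A, only the join1 -> decision -> join3 -> end4 path is exercised unless the decider is changed.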

    The Tasklet :

    Code:
    import java.io.File;
    
    import org.springframework.batch.core.StepContribution;
    import org.springframework.batch.core.scope.context.ChunkContext;
    import org.springframework.batch.core.step.tasklet.Tasklet;
    import org.springframework.batch.repeat.RepeatStatus;
    
    public class TJoin implements Tasklet {
    
    	private String name;
    	private int i;
    
    	public int getI() {
    		return i;
    	}
    
    	public void setI(int i) {
    		this.i = i;
    	}
    
    	public String getName() {
    		return name;
    	}
    
    	public void setName(String name) {
    		this.name = name;
    	}
    
    	@Override
    	public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
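    		try {
    			// Marker file showing that this step ran; the target directory must already exist.
    			File file = new File("d:\\ttt\\" + this.getName() + ".TXT");
    			System.out.println(" Thread " + this.getName() + " is going to sleep");
    			// i is the sleep time in seconds.
    			Thread.sleep(1000 * i);
    			System.out.println(" Thread " + this.getName() + " woke up");
    			file.createNewFile();
    		} catch (Exception e) {
    			// The exception is swallowed here, so the step still completes
    			// and the FAILED transitions in the flow are never taken.
    			System.out.println(" Error :: " + e);
    		}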
    		return RepeatStatus.FINISHED;
    	}
    
    	public TJoin(String name, int i) {
    		super();
    		this.name = name;
    		this.i = i;
    	}
    
    }
    Last edited by paragflume; Mar 6th, 2013, 05:45 AM.

    • #3
      Hi All,

      Here is an example of how to create split flow jobs (Spring Batch flows that run in parallel):

      Code:
      import java.util.ArrayList;
      import java.util.List;
      
      import org.springframework.batch.core.JobParameters;
      import org.springframework.batch.core.JobParametersBuilder;
      import org.springframework.batch.core.job.flow.Flow;
      import org.springframework.batch.core.job.flow.FlowExecutionStatus;
      import org.springframework.batch.core.job.flow.FlowJob;
      import org.springframework.batch.core.job.flow.support.SimpleFlow;
      import org.springframework.batch.core.job.flow.support.StateTransition;
      import org.springframework.batch.core.job.flow.support.state.EndState;
      import org.springframework.batch.core.job.flow.support.state.SplitState;
      import org.springframework.batch.core.job.flow.support.state.StepState;
      import org.springframework.batch.core.launch.JobLauncher;
      import org.springframework.batch.core.launch.support.SimpleJobLauncher;
      import org.springframework.batch.core.repository.JobRepository;
      import org.springframework.batch.core.step.tasklet.Tasklet;
      import org.springframework.batch.core.step.tasklet.TaskletStep;
      import org.springframework.context.ApplicationContext;
      import org.springframework.context.support.ClassPathXmlApplicationContext;
      import org.springframework.core.task.SimpleAsyncTaskExecutor;
      import org.springframework.transaction.PlatformTransactionManager;
      
      public class JoinParallelStart {
      
      	public static void main(String[] args) throws Exception {
      		JoinParallelStart s = new JoinParallelStart();
      		s.runShellAction("");
      	}
      
      	public void runShellAction(String command) throws Exception {
      
      		ApplicationContext context = new ClassPathXmlApplicationContext("context.xml");
      
      		JobRepository jobRepository = (JobRepository) context.getBean("jobRepository");
      
      		PlatformTransactionManager transactionManager = (PlatformTransactionManager) context.getBean("transactionManager");
      
      		JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
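      		// Assumes the jobLauncher bean is a SimpleJobLauncher. With an async task executor,
      		// run() returns without waiting for the job to finish.
      		SimpleJobLauncher simpleJobLauncher = (SimpleJobLauncher) jobLauncher;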
      		simpleJobLauncher.setJobRepository(jobRepository);
      		simpleJobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
      		jobLauncher = simpleJobLauncher;
      
      		JobParameters jobPerameter = new JobParametersBuilder().addDate("date_param", new java.util.Date()).toJobParameters();
      
      		Tasklet t1 = new TJoin("join1", 270);
      		TaskletStep ts1 = new TaskletStep("join1");
      		ts1.setTasklet(t1);
      		ts1.setJobRepository(jobRepository);
      		ts1.setTransactionManager(transactionManager);
      
      		Tasklet t2 = new TJoin("join2", 30);
      		TaskletStep ts2 = new TaskletStep("join2");
      		ts2.setTasklet(t2);
      		ts2.setJobRepository(jobRepository);
      		ts2.setTransactionManager(transactionManager);
      
      		Tasklet t3 = new TJoin("join3", 30);
      		TaskletStep ts3 = new TaskletStep("join3");
      		ts3.setTasklet(t3);
      		ts3.setJobRepository(jobRepository);
      		ts3.setTransactionManager(transactionManager);
      
      		Tasklet t4 = new TJoin("join4", 30);
      		TaskletStep ts4 = new TaskletStep("join4");
      		ts4.setTasklet(t4);
      		ts4.setJobRepository(jobRepository);
      		ts4.setTransactionManager(transactionManager);
      
      		FlowJob flowJob = new FlowJob("flow_job");
      
      		SimpleFlow simpleFlow1 = new SimpleFlow("simple_job_1");
      		List<StateTransition> transitions1 = new ArrayList<StateTransition>();
      
      		transitions1.add(StateTransition.createStateTransition(new StepState(ts1), "join2"));
      		transitions1.add(StateTransition.createStateTransition(new StepState(ts2), "end0"));
      		transitions1.add(StateTransition.createEndStateTransition(new EndState(FlowExecutionStatus.COMPLETED, "end0")));
      
      		simpleFlow1.setStateTransitions(transitions1);
      
      		SimpleFlow simpleFlow2 = new SimpleFlow("simple_job_2");
      		List<StateTransition> transitions2 = new ArrayList<StateTransition>();
      
      		transitions2.add(StateTransition.createStateTransition(new StepState(ts3), "join4"));
      		transitions2.add(StateTransition.createStateTransition(new StepState(ts4), "end1"));
      		transitions2.add(StateTransition.createEndStateTransition(new EndState(FlowExecutionStatus.COMPLETED, "end1")));
      		simpleFlow2.setStateTransitions(transitions2);
      
      		List<Flow> flows = new ArrayList<Flow>();
      		flows.add(simpleFlow1);
      		flows.add(simpleFlow2);
      
      		// The split runs both flows concurrently and finishes once every flow has finished.
      		SplitState splitState = new SplitState(flows, "split_00");
      		splitState.setTaskExecutor(new SimpleAsyncTaskExecutor());
      
      		SimpleFlow ssplitFlow3 = new SimpleFlow("split_flow_3");
      		List<StateTransition> transitions3 = new ArrayList<StateTransition>();
      		transitions3.add(StateTransition.createStateTransition(splitState, "end_split_0"));
      		transitions3.add(StateTransition.createEndStateTransition(new EndState(FlowExecutionStatus.COMPLETED, "end_split_0")));
      		ssplitFlow3.setStateTransitions(transitions3);
      
      		flowJob.setFlow(ssplitFlow3);
      		flowJob.setJobRepository(jobRepository);
      		flowJob.afterPropertiesSet();
      
      		try {
      			jobLauncher.run(flowJob, jobPerameter);
      		} catch (Exception e) {
      			System.out.println(" Job Launching Error : " + e);
      		}
      
      	}
      
      }
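
      To make the shape of the split easier to see, here is a plain-JDK sketch of the same idea: two flows run at the same time, each flow runs its own steps in order, and the caller continues only after both flows are done. It is only an analogy and does not use Spring Batch. SplitSketch, flow and runSplit are hypothetical names, and how the framework combines the flow statuses is not modelled.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical plain-JDK model of the split; none of these names come from Spring Batch.
class SplitSketch {

    // One "flow": runs its steps in order and records each finished step.
    static Callable<String> flow(final List<String> finished, final String... steps) {
        return new Callable<String>() {
            public String call() {
                for (String step : steps) {
                    finished.add(step);
                }
                return "COMPLETED";
            }
        };
    }

    static List<String> runSplit() throws Exception {
        final List<String> finished =
                Collections.synchronizedList(new ArrayList<String>());

        // Same grouping as the post: join1 -> join2 in one flow, join3 -> join4 in the other.
        List<Callable<String>> flows = new ArrayList<Callable<String>>();
        flows.add(flow(finished, "join1", "join2"));
        flows.add(flow(finished, "join3", "join4"));

        ExecutorService pool = Executors.newFixedThreadPool(flows.size());
        try {
            // invokeAll blocks until every flow has finished.
            for (Future<String> result : pool.invokeAll(flows)) {
                result.get(); // rethrows if a flow failed
            }
        } finally {
            pool.shutdown();
        }
        return finished;
    }

    public static void main(String[] args) throws Exception {
        // The interleaving between the two flows can differ from run to run.
        System.out.println(runSplit());
    }
}
```

      The only ordering guaranteed here is inside each flow (join1 before join2, join3 before join4), which matches what the two SimpleFlow definitions in the post describe.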

      • #4
        Hi All,

        This link helped me a lot: https://fisheye.springsource.org/bro...s.java?hb=true


        Thanks
        Parag Phatowali

        • #5
          Request to post the context.xml file

          Hi, I am trying to achieve the same thing with some modifications. Can you please post the context.xml file? I need to see which classes you are using for the repositories, the transaction manager, and everything else, because I need to make some changes for my scenario.

          Thank you in advance!!!
