  • TaskExecutorPartitionHandler - how to set up the step

    Hi all,

    I would like to build a batch job that updates some data in a DB table. My idea is to select the "oldest"/"next to be updated" rows or IDs from the table, fetch the new data for those rows in parallel via a web service, store the new information per thread/row and proceed.

    I would therefore like to use the TaskExecutorPartitionHandler to "execute the partitioned {@link Step} locally in multiple threads" (as the Javadoc tells me).

    My Bean is:
    Code:
        @Bean
        public PartitionHandler partitionHandler() throws Exception
        {
            TaskExecutorPartitionHandler partitionHandler = new TaskExecutorPartitionHandler();
            partitionHandler.setGridSize(gridSize);
            partitionHandler.setStep(step);
            partitionHandler.setTaskExecutor(taskExecutor());
    
            return partitionHandler;
        }
    I have set up the Job, the JobRepository, the master step, the StepExecutionSplitter and the TaskExecutor as:
    Code:
        @Bean
        public Job myJob() throws Exception
        {
            SimpleJob job = new SimpleJob("My Job");
            job.setJobRepository(jobRepository());
            job.addStep(stepMaster());
            
            return job;
        }
    
        @Bean
        public JobRepository jobRepository() throws Exception
        {
            MapJobRepositoryFactoryBean jobRepositoryFactory = new MapJobRepositoryFactoryBean();
            jobRepositoryFactory.setTransactionManager(jpaTransactionManager);
            jobRepositoryFactory.setIsolationLevelForCreate("ISOLATION_DEFAULT");
    
            return jobRepositoryFactory.getJobRepository();
        }
    
        @Bean
        public Step stepMaster() throws Exception
        {
            PartitionStep stepMaster = new PartitionStep();
            stepMaster.setPartitionHandler(partitionHandler());
            stepMaster.setStepExecutionSplitter(stepExecutionSplitter());
            stepMaster.setJobRepository(jobRepository());
    
            return stepMaster;
        }
    
        @Bean
        public StepExecutionSplitter stepExecutionSplitter() throws Exception
        {
            SimpleStepExecutionSplitter stepExecutionSplitter = new SimpleStepExecutionSplitter();
            stepExecutionSplitter.setJobRepository(jobRepository());
            stepExecutionSplitter.setStepName("My step");
            stepExecutionSplitter.setPartitioner(partitioner); // [1]
    
            return stepExecutionSplitter;
        }
    
        @Bean
        public TaskExecutor taskExecutor()
        {
            SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
            return taskExecutor;
    
        }
    In the partitioner [1] I use a repository to get some data (IDs) and store them in a map of ExecutionContexts. All missing variable declarations are @Autowired - so far no problem.
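
    For illustration, a partitioner along those lines might look roughly like the sketch below (the class and repository names are made up, and the IDs are assumed to form a contiguous range that is split into "minId"/"maxId" slices, matching the keys read later in this thread):
    Code:
        import java.util.HashMap;
        import java.util.Map;

        import org.springframework.batch.core.partition.support.Partitioner;
        import org.springframework.batch.item.ExecutionContext;
        import org.springframework.beans.factory.annotation.Autowired;
        import org.springframework.stereotype.Component;

        @Component
        public class MyRangePartitioner implements Partitioner
        {
            @Autowired
            private MyIdRepository idRepository; // hypothetical repository that knows the id boundaries

            @Override
            public Map<String, ExecutionContext> partition(int gridSize)
            {
                int min = idRepository.findMinId(); // placeholder query
                int max = idRepository.findMaxId(); // placeholder query
                int rangeSize = (max - min) / gridSize + 1;

                Map<String, ExecutionContext> contexts = new HashMap<>();
                for (int i = 0; i < gridSize; i++)
                {
                    // One ExecutionContext per partition, carrying the id range for that slice.
                    ExecutionContext context = new ExecutionContext();
                    context.putInt("minId", min + i * rangeSize);
                    context.putInt("maxId", Math.min(min + (i + 1) * rangeSize - 1, max));
                    contexts.put("partition" + i, context);
                }
                return contexts;
            }
        }
    Each ExecutionContext returned by the partitioner becomes one StepExecution, so the number of entries in this map determines how many partitions (and therefore worker threads) actually get work.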

    My question is: how do I set up the next step (ItemProcessor, ItemWriter) to fire the web requests based on the ExecutionContexts I created and store the received data back?
    I also see that the job only starts the first (gridSize) threads and then stops. What's the issue there? I'm starting it with a @Scheduled method in conjunction with a class implementing the JobLauncher.
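
    For reference, a scheduled launch of this kind might look roughly as follows (a sketch only; the interval, the parameter name and the bean wiring are assumptions):
    Code:
        import org.springframework.batch.core.Job;
        import org.springframework.batch.core.JobParameters;
        import org.springframework.batch.core.JobParametersBuilder;
        import org.springframework.batch.core.launch.JobLauncher;
        import org.springframework.beans.factory.annotation.Autowired;
        import org.springframework.scheduling.annotation.Scheduled;
        import org.springframework.stereotype.Component;

        @Component
        public class MyJobScheduler
        {
            @Autowired
            private JobLauncher jobLauncher; // e.g. a SimpleJobLauncher configured elsewhere

            @Autowired
            private Job myJob;

            // Requires @EnableScheduling on a configuration class; the interval is hypothetical.
            @Scheduled(fixedDelay = 60000)
            public void launchJob() throws Exception
            {
                // Unique parameters so every run creates a new JobInstance.
                JobParameters parameters = new JobParametersBuilder()
                        .addLong("run.timestamp", System.currentTimeMillis())
                        .toJobParameters();

                jobLauncher.run(myJob, parameters);
            }
        }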

    Thanks for your help.

    Best regards, Ullrich

  • #2
    Hi,

    In the meantime I was able to figure out what I was missing, thanks to the book "Spring Batch in Action", p. 394 ff.

    > My question is: how do I set up the next step (ItemProcessor, ItemWriter) to fire the web requests based on the ExecutionContexts I created and store the received data back?

    Until now I thought the StepExecutionSplitter was somehow an ItemReader, providing the already-read chunks/items. Instead, the StepExecutionSplitter prepares the ExecutionContext for each partition's ItemReader - so we need one!
    Code:
        @Bean
        public Step myRealStep() throws Exception
        {
        SimpleStepFactoryBean<MyEntity, MyEntity> stepFactory = new SimpleStepFactoryBean<>();
            stepFactory.setTransactionManager(transactionManager);
            stepFactory.setJobRepository(jobRepository());
            stepFactory.setItemReader(itemReader);
            stepFactory.setItemProcessor(itemProcessor);
            stepFactory.setItemWriter(itemWriter);
    
            return (Step) stepFactory.getObject();
        }
    The PartitionHandler then looks like:
    Code:
        @Bean
        public PartitionHandler partitionHandler() throws Exception
        {
            TaskExecutorPartitionHandler partitionHandler = new TaskExecutorPartitionHandler();
            partitionHandler.setGridSize(gridSize);
            partitionHandler.setStep(myRealStep());
            partitionHandler.setTaskExecutor(taskExecutor());
    
            return partitionHandler;
        }
    Then I was able to declare the ItemReader (and Processor/Writer) as:
    Code:
    @Component
    public class MyItemReader implements ItemReader<MyEntity>, ItemStream
    {
        private int from;
        private int to;
        private int current;

        @Override
        public MyEntity read() throws Exception, UnexpectedInputException, ParseException,
                NonTransientResourceException
        {
            // Fetch the desired entity with JPA, Hibernate or something similar.
            // Don't forget: this method is called until it returns null, so save the internal
            // state and return one entity after another until you reach the boundary.
            if (current > to)
            {
                return null; // signals the end of this partition
            }
            // Placeholder: load and return the entity for the id "current" here, then increment it.
            throw new UnsupportedOperationException("load the entity for id " + current);
        }

        @Override
        public void open(ExecutionContext executionContext) throws ItemStreamException
        {
            from = executionContext.getInt("minId");
            to = executionContext.getInt("maxId");
            current = from;
        }

        @Override
        public void update(ExecutionContext executionContext) throws ItemStreamException
        {
            // Optionally store the current position so a restart can resume here.
        }

        @Override
        public void close() throws ItemStreamException
        {
        }
    }
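    The ItemProcessor and ItemWriter are then plain chunk components. A rough sketch (the web service client, the repository and the entity accessors below are placeholders, each class would live in its own file):
    Code:
        import java.util.List;

        import org.springframework.batch.item.ItemProcessor;
        import org.springframework.batch.item.ItemWriter;
        import org.springframework.beans.factory.annotation.Autowired;
        import org.springframework.stereotype.Component;

        @Component
        public class MyItemProcessor implements ItemProcessor<MyEntity, MyEntity>
        {
            @Autowired
            private MyWebServiceClient webServiceClient; // hypothetical client that fires the web request

            @Override
            public MyEntity process(MyEntity item) throws Exception
            {
                // Fetch the new information for this row and copy it onto the entity.
                item.setNewInformation(webServiceClient.fetchNewInformation(item.getId()));
                return item;
            }
        }

        // In a separate file:
        @Component
        public class MyItemWriter implements ItemWriter<MyEntity>
        {
            @Autowired
            private MyEntityRepository repository; // hypothetical JPA repository

            @Override
            public void write(List<? extends MyEntity> items) throws Exception
            {
                // Store the updated rows back into the table.
                for (MyEntity item : items)
                {
                    repository.save(item);
                }
            }
        }
    Wire these two beans into the SimpleStepFactoryBean above via setItemProcessor() and setItemWriter(), just like the reader.
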
    > I also see that the job only starts the first (gridSize) threads and then stops. What's the issue there?

    I was simply creating too few ExecutionContexts with too little data. So nothing special here.

    Hope this helps someone.

    Regards, Ullrich
