
  • How to run items in a Chunk concurrently

    Do you have any code snippets that demonstrate how to set up a chunk operation and then have each item in the chunk run concurrently? The chunk is to become the transactional unit.

    The closest I've got is:
    Code:
    <bean id="jobConfiguration_1" class="org.springframework.batch.core.configuration.JobConfiguration">
      <property name="steps">    
        <list>
          <bean id="jc_1_step_1" class="org.springframework.batch.execution.step.ChunkOperationsStepConfiguration">            
            <constructor-arg>
              <bean id="jc_1_module1" class="org.springframework.batch.execution.tasklet.RestartableItemProviderTasklet" scope="step">
                <aop:scoped-proxy />
                <property name="itemProcessor">
                  <bean class="com.zzz.springdemo.batch.processor.MyProcessor">
                  </bean>                
                </property>
                <property name="itemProvider">
                  <bean class="com.zzz.springdemo.batch.provider.MyProvider">
                  </bean>
                </property>                                
              </bean>
            </constructor-arg>
             
            <property name="chunkOperations">
              <bean id="aa" class="org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate">
                <property name="taskExecutor" ref="threadPoolTaskExecutor"/>    
              </bean>            
            </property>
                          
          </bean>
        </list>      
      </property>
    </bean>
    
    
    
    <bean id="myStepExecutor" class="org.springframework.batch.execution.step.simple.SimpleStepExecutor" scope="prototype">
      <property name="transactionManager" ref="txManager" />
      <property name="repository" ref="simpleJobRepository" />
    </bean>
    
    <bean id="threadPoolTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
      <property name="maxPoolSize" value="10"/>
    </bean>

    Am I on the right track? I presume I need to write some code that describes what the 'Chunk' unit of work is?

  • #2
    There is nothing technically wrong with your code sample, and I'm really pleased and impressed that you are using some of those features (e.g. ChunkOperationsStepConfiguration). We must be doing something right. I'm also confused about what you are trying to achieve, and concerned that you might be concentrating too hard on the concurrent part.

    Observation: if items are going to be provided and processed concurrently, then in practice the transactional unit of work is going to be the item (to put it another way, the chunk size has to be one). This, I should add, is not a feature of Spring Batch but of transactions generally: they are normally single-threaded.
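To see why a transaction cannot stretch across items running on other threads: Spring's transaction management keeps the current transaction in thread-bound (ThreadLocal) state, so a task handed to a pool thread runs outside it. The following is a minimal plain-Java sketch of that effect; `TxDemo` and its `currentTx` holder are hypothetical stand-ins, not Spring or Spring Batch API.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical model of a thread-bound transaction (not Spring API).
class TxDemo {
    private static final ThreadLocal<String> currentTx = new ThreadLocal<>();

    // Starts a "transaction" on the calling thread, then reports which
    // transaction id is visible either on that thread or on a pool thread.
    static String txVisibleIn(boolean useWorkerThread) {
        currentTx.set("tx-1"); // "TX start" on the calling thread
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            if (!useWorkerThread) {
                return currentTx.get(); // same thread: the transaction is visible
            }
            // Pool thread: its ThreadLocal slot was never set, so it sees null.
            Future<String> seen = pool.submit(() -> currentTx.get());
            return seen.get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            currentTx.remove(); // "TX end"
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("calling thread: " + txVisibleIn(false));
        System.out.println("pool thread:    " + txVisibleIn(true));
    }
}
```

The pool thread reads `null` because a plain `ThreadLocal` is not inherited by pool threads, which is the same reason a chunk-level transaction cannot cover items processed concurrently.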

    Would asynchronous stepOperations make more sense? Then each chunk is single-threaded and can be transactional. This is more common, but you configure it in the StepExecutor, not the StepConfiguration.

    Also, are you sure that concurrent processing is going to help in your particular scenario? Can you measure the effect of synchronous processing? Are the items really expensive to process, in a way that will benefit from parallel processing? If the processing is I/O bound would it be better to split that up into stages, so that each one can consume as few external cycles as possible?
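For the measurement question, a small harness that runs the same items sequentially and then on a thread pool is usually enough to tell whether concurrency pays off. This sketch assumes a hypothetical `process` method whose cost is simulated with `Thread.sleep`, standing in for an I/O-bound call; meaningful numbers only come from timing your own processor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class TimingHarness {
    // Hypothetical item processor: sleeps to simulate an I/O-bound call.
    static int process(int item, long costMillis) {
        try {
            Thread.sleep(costMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return item * 2;
    }

    static List<Integer> runSequential(List<Integer> items, long cost) {
        List<Integer> out = new ArrayList<>();
        for (int item : items) out.add(process(item, cost));
        return out;
    }

    static List<Integer> runConcurrent(List<Integer> items, long cost, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int item : items) futures.add(pool.submit(() -> process(item, cost)));
            List<Integer> out = new ArrayList<>();
            for (Future<Integer> f : futures) out.add(f.get()); // keeps input order
            return out;
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> items = new ArrayList<>();
        for (int i = 0; i < 20; i++) items.add(i);
        long t0 = System.nanoTime();
        List<Integer> seq = runSequential(items, 20);
        long t1 = System.nanoTime();
        List<Integer> par = runConcurrent(items, 20, 5);
        long t2 = System.nanoTime();
        System.out.printf("sequential: %d ms, concurrent: %d ms, same results: %b%n",
                (t1 - t0) / 1_000_000, (t2 - t1) / 1_000_000, seq.equals(par));
    }
}
```

If the concurrent figure is not clearly better for realistic item costs, the extra complexity around transactions and restart is hard to justify.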



    • #3
      In pseudocode, I was trying to achieve the following:

      Code:
      for each Step
      {
      
        for each chunk
        {
          TX Start
          
            for each item in Chunk
            {
              // asynchronous chunk operation:
              taskExecutor.run(
                Object o = readFromProvider()
                processor.process( o );
              )
            }    
            Wait for all task executors to complete
          
          Update Repository with RESTARTDATA etc
          TX END
        } 
      }
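The pseudocode above corresponds roughly to `ExecutorService.invokeAll`, which returns only when every submitted task has completed (the "wait for all" step). A minimal sketch, assuming a hypothetical `process` method in place of MyProcessor and an `Iterator` in place of MyProvider; unlike the pseudocode, it reads items on the calling thread, on the assumption that the provider is not safe to call from several threads at once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ConcurrentItemsInChunk {
    // Hypothetical processor standing in for MyProcessor.process(o).
    static String process(String item) {
        return item.toUpperCase();
    }

    // One chunk: items are read sequentially, processed concurrently, and the
    // method returns only after every item in the chunk has finished.
    static List<String> processChunk(Iterator<String> provider, int chunkSize, ExecutorService pool) {
        List<Callable<String>> tasks = new ArrayList<>();
        while (tasks.size() < chunkSize && provider.hasNext()) {
            String item = provider.next();  // readFromProvider() on the calling thread
            tasks.add(() -> process(item)); // processor.process(o) on a pool thread
        }
        List<String> results = new ArrayList<>();
        try {
            for (Future<String> f : pool.invokeAll(tasks)) { // wait for all tasks
                results.add(f.get());
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
        return results; // this is where restart data would be saved and the TX ended
    }

    static List<List<String>> runStep(List<String> input, int chunkSize, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        List<List<String>> chunks = new ArrayList<>();
        try {
            Iterator<String> provider = input.iterator();
            while (provider.hasNext()) {
                chunks.add(processChunk(provider, chunkSize, pool));
            }
        } finally {
            pool.shutdown();
        }
        return chunks;
    }

    public static void main(String[] args) {
        System.out.println(runStep(Arrays.asList("a", "b", "c", "d", "e"), 2, 4));
    }
}
```

Because each `process` call runs on a pool thread, a transaction begun on the calling thread before the chunk would not include that work.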
      But your observation is completely correct: transactions generally are normally single-threaded. I've now tried setting up an asynchronous step operation via the StepExecutor, as suggested. There were a number of things I had to do before it started working:

      1. Remove scope="step" from SimpleStepConfiguration

      2. Change from hibernateJobDao to SqlJobDao. This was because Hibernate was failing with a StaleObjectStateException when it tried to update a batch_step row that had already been updated by another thread. It would be nice to have this working with Hibernate.

      After I got it working I noticed that it was not saving any restart data.

      For reference:

      Code:
        <bean id="jobExecutor" class="org.springframework.batch.execution.job.DefaultJobExecutor">
          <property name="jobRepository" ref="simpleJobRepository" />
          <property name="stepExecutorFactory">
            <bean class="org.springframework.batch.execution.step.PrototypeBeanStepExecutorFactory">
              <property name="stepExecutorName" value="myStepExecutor" />
            </bean>
          </property>
        </bean>
        
        <bean id="jobExecutorFacade" class="org.springframework.batch.execution.facade.SimpleJobExecutorFacade">
          <property name="jobRepository" ref="simpleJobRepository" />
          <property name="jobConfigurationLocator" ref="jobConfigurationRegistry"/>
          <property name="jobExecutor" ref="jobExecutor" />
        </bean>  
        
         
        <bean id="myStepExecutor" class="org.springframework.batch.execution.step.simple.SimpleStepExecutor" scope="prototype">
          <property name="transactionManager" ref="txManager" />
          <property name="repository" ref="simpleJobRepository" />  
          <property name="stepOperations">
            <bean class="org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate">
              <property name="taskExecutor" ref="threadPoolTaskExecutor"/>        
            </bean>
          </property>    
        </bean>  
        
        
        <bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor">
        </bean>
        
        <bean id="threadPoolTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
          <property name="corePoolSize" value="5" />
          <property name="maxPoolSize" value="10" />
          <property name="queueCapacity" value="0" />
        </bean>
      
        <!--  Job Launcher 1 -->
        <bean id="jobLauncher_1" class="org.springframework.batch.execution.bootstrap.TaskExecutorJobLauncher">
          <property name="jobExecutorFacade" ref="jobExecutorFacade" />
          <property name="taskExecutor" ref="taskExecutor"/>
          <property name="autoStart" value="false" />
          <property name="jobConfigurationName" value="jobConfiguration_1"/>
        </bean>
        
        
        <bean id="jobConfiguration_1" class="org.springframework.batch.core.configuration.JobConfiguration">
          <property name="steps">    
            <list>
                <bean id="jc_1_step_1" class="org.springframework.batch.execution.step.SimpleStepConfiguration">            
                  <constructor-arg>
                    <bean id="jc_1_module1" class="org.springframework.batch.execution.tasklet.RestartableItemProviderTasklet">
                      <property name="itemProcessor">
                          <bean class="com.zzz.springdemo.batch.processor.MyProcessor">
                          </bean>                
                      </property>
                      <property name="itemProvider">
                        <bean class="com.zzz.springdemo.batch.provider.MyProvider">
                        </bean>
                      </property>                                
                    </bean>
                  </constructor-arg>         
                  <property name="commitInterval" value="10"/>
                  <property name="allowStartIfComplete" value="true" />
                  <property name="saveRestartData" value="true" />
                  <property name="exceptionHandler">
                    <bean class="org.springframework.batch.repeat.exception.handler.SimpleLimitExceptionHandler">
                      <property name="limit" value="5" />
                      <property name="useParent" value="true"/>
                    </bean>
                   </property>             
                </bean>
            </list>      
          </property>
        </bean>
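A note on the threadPoolTaskExecutor settings: with `queueCapacity` set to 0, Spring's `ThreadPoolTaskExecutor` uses a `SynchronousQueue` (direct hand-off), so once all `maxPoolSize` threads are busy, further submissions are rejected rather than queued (with the default abort policy). The sketch below reproduces that with the JDK's `ThreadPoolExecutor`, which the Spring class wraps; it illustrates the pool settings only, not anything specific to Spring Batch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ZeroQueueCapacityDemo {
    // Submits `tasks` blocking tasks to a pool shaped like threadPoolTaskExecutor
    // (core 5, max 10, no queue) and returns how many submissions were rejected.
    static int countRejected(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                5, 10, 60, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),      // queueCapacity = 0: direct hand-off
                new ThreadPoolExecutor.AbortPolicy()); // reject when no thread is free
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // keep this thread busy
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++;
                }
            }
        } finally {
            release.countDown(); // let the busy threads finish
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("rejected of 12: " + countRejected(12));
    }
}
```

So if more than ten chunk tasks were ever submitted at once, the extra ones would fail instead of waiting; a positive `queueCapacity` would make them wait instead.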



      • #4
        So you switched to asynchronous chunks (not items as per the pseudo code above)?

        Why did you have to change the scope of the step configuration? (I'm not sure why it was step scoped to start with actually.)

        You can raise an issue in JIRA about the stale object exception if you would like to track it conveniently. Actually, it is telling you something important: it is a signal that we are trying to do something cruel and unusual. Hibernate has detected a clash where Thread A is trying to overwrite changes made by Thread B, and it is unlikely to be safe to simply ignore this. The data are not business data but metadata about the batch, so maybe we can try to fix it in the framework, at least in the sunny-day case.
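StaleObjectStateException is Hibernate's optimistic-locking failure: an update expected the row to still be in the state (typically the version) it was read in, and another transaction had changed it first. Assuming the batch_step mapping is checked this way, the sketch below replays two chunk threads updating the same step row; `StepRecord` and `update` are hypothetical, and `AtomicReference.compareAndSet` plays the role of the `WHERE version = ?` check.

```java
import java.util.concurrent.atomic.AtomicReference;

class OptimisticLockDemo {
    // Hypothetical stand-in for a versioned batch_step row (immutable snapshot).
    static final class StepRecord {
        final int version;
        final int commitCount;

        StepRecord(int version, int commitCount) {
            this.version = version;
            this.commitCount = commitCount;
        }
    }

    // Like "UPDATE ... SET version = version + 1 ... WHERE version = ?": succeeds
    // only if the row is still exactly the snapshot this writer read.
    static boolean update(AtomicReference<StepRecord> row, StepRecord seen, int newCommitCount) {
        return row.compareAndSet(seen, new StepRecord(seen.version + 1, newCommitCount));
    }

    // Replays one interleaving deterministically: A and B both read version 0,
    // B commits first, then A tries to commit on top of its stale snapshot.
    static boolean[] twoWritersFromSameVersion() {
        AtomicReference<StepRecord> row = new AtomicReference<>(new StepRecord(0, 0));
        StepRecord seenByA = row.get();
        StepRecord seenByB = row.get();
        boolean bCommitted = update(row, seenByB, seenByB.commitCount + 1);
        boolean aCommitted = update(row, seenByA, seenByA.commitCount + 1);
        return new boolean[] {bCommitted, aCommitted};
    }

    public static void main(String[] args) {
        boolean[] r = twoWritersFromSameVersion();
        System.out.println("B committed: " + r[0] + ", A committed: " + r[1]);
    }
}
```

Silently letting A's write win would lose B's commit count, which is why simply suppressing the exception is not safe.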

        Failures and restart data might be an issue in this scenario, though, so I think we should use your example to thrash out the point. Does it make sense at all to have a restartable process when the chunks are asynchronous? Probably not, I think. We might have to wait for 1.1, when we do asynchronous distributed chunk processing, because the batch metadata does not really support restarts in this scenario right now.
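The restart problem can be made concrete. With asynchronous chunks, chunk 3 may commit before chunk 2, so a single "items processed so far" counter is no longer a safe restart point. One hypothetical approach (not something the framework described here provides) is to record which chunks have committed and restart after the longest unbroken run starting at chunk 0:

```java
import java.util.BitSet;

class RestartPoint {
    // Given the indices of chunks that have committed (in any order), returns
    // how many items can safely be skipped on restart: only chunks 0..k-1
    // with no gaps are known to be complete.
    static int safeRestartItem(int[] committedChunks, int commitInterval) {
        BitSet done = new BitSet();
        for (int c : committedChunks) done.set(c);
        int contiguous = done.nextClearBit(0); // first chunk that has NOT committed
        return contiguous * commitInterval;
    }

    public static void main(String[] args) {
        // Chunks 0, 1 and 3 committed; chunk 2 failed or was still running.
        System.out.println(safeRestartItem(new int[] {0, 1, 3}, 10));
    }
}
```

In that example the restart begins at item 20, so chunk 3 would be processed a second time unless its processing is idempotent, which is part of why restartability and asynchronous chunks sit uneasily together.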



        • #5
          I think this is what my above configuration is doing:

          Code:
          for each step
          {
            while provider not exhausted
            {
              count = 0;
              TX start
              while count < commit interval
              {
                taskExecutor.run(
                  Object o = readFromProvider()
                  processor.process( o )
                )
                count ++    
              }
              Update repository
              TX end
            }
          }



          • #6
            I've created a JIRA issue about the Hibernate stale object exception:

            http://opensource.atlassian.com/proj...owse/BATCH-170



            • #7
              Originally posted by markymiddleton: "I think this is what my above configuration is doing:"
              Just for the record, I think this is what it is doing (modulo another technical but overlookable error with the inner loop test):

              Code:
              for each step
              {
                while provider not exhausted
                {
                  taskExecutor.run(
                    count = 0;
                    TX start
                    while count < commit interval
                    {
                      Object o = readFromProvider()
                      processor.process( o )
                      count ++    
                    }
                    Update repository
                    TX end
                  )
                }
              }
              Note that stepOperations is what repeats a chunk, so the whole chunk is processed asynchronously.
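A plain-Java rendering of that corrected picture: each chunk runs on a pool thread, and inside it the items are read and processed sequentially, so a thread-bound transaction could wrap the whole chunk. As a simplification of how the repeat template schedules chunks, each worker here keeps taking one chunk at a time until the provider is exhausted; `Provider`, the `* 10` processing and the `committed` list are hypothetical stand-ins for MyProvider, MyProcessor and the repository update.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class AsyncChunks {
    // Hypothetical provider: synchronized because several chunk threads read from it.
    static final class Provider {
        private final Iterator<Integer> it;
        Provider(List<Integer> items) { it = items.iterator(); }
        synchronized Integer read() { return it.hasNext() ? it.next() : null; } // null = exhausted
    }

    static List<List<Integer>> runStep(List<Integer> items, int commitInterval, int threads) {
        Provider provider = new Provider(items);
        List<List<Integer>> committed = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<Void>> workers = new ArrayList<>();
            for (int t = 0; t < threads; t++) {
                workers.add(() -> {
                    while (true) {
                        List<Integer> chunk = new ArrayList<>(); // "TX start" for this chunk
                        Integer item;
                        while (chunk.size() < commitInterval && (item = provider.read()) != null) {
                            chunk.add(item * 10);                // processor.process(o), same thread
                        }
                        if (chunk.isEmpty()) return null;         // provider exhausted
                        committed.add(chunk);                     // "update repository", "TX end"
                    }
                });
            }
            for (Future<Void> f : pool.invokeAll(workers)) f.get(); // rethrows worker failures
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
        return committed;
    }

    public static void main(String[] args) {
        List<Integer> items = new ArrayList<>();
        for (int i = 0; i < 25; i++) items.add(i);
        System.out.println(runStep(items, 10, 3));
    }
}
```

Because items are pulled from a shared provider, a chunk's items need not be contiguous in the input, another reason restart data is awkward in this mode.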

              Comment

              Working...
              X