  • Issues while using AsyncItemProcessor and ThreadPoolExecutor

    Hi All,

    We are evaluating some features of Spring Batch to address one of our business problems. Our batch reads a flat file containing over 0.5 million accounts via FlatFileItemReader, makes an account validation call (web service) against one of our middleware systems via an implementation of ItemProcessor, and persists the account details in one of our databases. Because of the large volume, we initially planned a multi-threaded implementation, delegating our ItemProcessor/ItemWriter logic to AsyncItemProcessor/AsyncItemWriter, with a ConcurrentTaskExecutor/ThreadPoolTaskExecutor injected as a dependency.

    Of late, we have been facing issues during execution (the batch fails automatically after a certain point). Our implementation looks much like the configuration below.


    Code:
    <batch:step id="stepA">
        <batch:tasklet task-executor="taskExecutor"
            throttle-limit="20">
            <batch:chunk reader="itemReader" processor="asyncItemProcessor"
                writer="asyncItemWriter" commit-interval="50"/>
        </batch:tasklet>
    </batch:step>

    Code:
    <bean id="asyncItemProcessor" scope="step"
        class="org.springframework.batch.integration.async.AsyncItemProcessor">
        <property name="delegate">
            <bean class="my.MyItemProcessor">
            -----------------------------
            -----------------------------
            </bean>
        </property>
        <property name="taskExecutor" ref="taskExecutor1" />
    </bean>

    <bean id="asyncItemWriter" scope="step"
        class="org.springframework.batch.integration.async.AsyncItemWriter">
        <property name="delegate">
            <bean class="my.MyItemWriter">
                ----------------------------------
                ----------------------------------
                ----------------------------------
            </bean>
        </property>
    </bean>

    <bean id="taskExecutor"
        class="org.springframework.scheduling.concurrent.ConcurrentTaskExecutor" scope="step">
        <property name="concurrentExecutor">
            <bean
                class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
                <property name="corePoolSize" value="10" />
                <property name="maxPoolSize" value="90" />
                <property name="queueCapacity" value="200" />
            </bean>
        </property>
    </bean>

    <bean id="taskExecutor1" parent="taskExecutor"/>

    During execution of "stepA", after a certain point the ThreadPoolTaskExecutor exits with the exception below.

    Code:
    2011-06-06 10:04:57,387 ERROR []  - Encountered an error executing the step
    org.springframework.core.task.TaskRejectedException: Executor [[email protected]f2db2d] did not accept task: java.util.concurrent.FutureTask@12b9f79
    	at org.springframework.core.task.support.TaskExecutorAdapter.execute(TaskExecutorAdapter.java:67)
    	at org.springframework.scheduling.concurrent.ConcurrentTaskExecutor.execute(ConcurrentTaskExecutor.java:95)
    	at org.springframework.batch.integration.async.AsyncItemProcessor.process(OdnsfAsyncItemProcessor.java:83)
            ---------------------------------------------------------------------- 
            ----------------------------------------------------------------------
            ----------------------------------------------------------------------
            ----------------------------------------------------------------------
    	at org.springframework.batch.core.step.item.SimpleChunkProcessor.doProcess(SimpleChunkProcessor.java:125)
            ---------------------------------------------------------------------- 
            ----------------------------------------------------------------------
            ----------------------------------------------------------------------
            ----------------------------------------------------------------------
    	at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:265)
    	at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:76)
    	at org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate$ExecutingRunnable.run(TaskExecutorRepeatTemplate.java:258)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    	at java.lang.Thread.run(Unknown Source)


    NOTE: When we inject a SimpleAsyncTaskExecutor as the dependency, both on the chunk tasklet and on the AsyncItemProcessor, the implementation works.

    Judging by the error, it looks likely that the ThreadPoolExecutor could not accept more work with the pool and queue limits we configured.

    Could someone advise on tuning it, and possibly point us to best practices we can follow for requirements like these?

    Thanks in advance

    Rahul

  • #2
    I think you might be able to get some advice about how to set up your task executor from the main Spring forum (it's not a Batch problem per se). If I were you I'd use corePoolSize=maxPoolSize, queueCapacity=0 and rejection policy CALLER_RUNS (the last point is the most important for ensuring that tasks are never rejected causing the job to fail). Spring 3.0 has a namespace convenience for all of this.
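
    For readers who want to see the effect of these settings outside Spring, the following is a minimal plain-JDK sketch, not the poster's configuration. It assumes that a fixed-size java.util.concurrent.ThreadPoolExecutor with a SynchronousQueue is a fair stand-in for corePoolSize=maxPoolSize with queueCapacity=0. The class name RejectionPolicyDemo, the pool size of 2, the 6 tasks and the 200 ms sleep are all illustrative assumptions. It compares the default abort behaviour with the caller-runs policy.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of Spring Batch or of the configuration in this thread.
class RejectionPolicyDemo {

    /** Outcome of one run: how many tasks finished and how many the pool refused. */
    static final class Result {
        final int completed;
        final int rejected;

        Result(int completed, int rejected) {
            this.completed = completed;
            this.rejected = rejected;
        }
    }

    /**
     * Submits slow tasks to a pool whose core and maximum sizes are equal and
     * which has no queue capacity (SynchronousQueue), using the given handler
     * for tasks the pool cannot accept.
     */
    static Result run(int poolSize, int tasks, RejectedExecutionHandler handler) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(), handler);
        AtomicInteger completed = new AtomicInteger();
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                pool.execute(() -> {
                    sleepQuietly(200); // stand-in for a slow web service call
                    completed.incrementAndGet();
                });
            } catch (RejectedExecutionException e) {
                rejected++; // with AbortPolicy the submitting thread sees the failure
            }
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new Result(completed.get(), rejected);
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        Result abort = run(2, 6, new ThreadPoolExecutor.AbortPolicy());
        Result callerRuns = run(2, 6, new ThreadPoolExecutor.CallerRunsPolicy());
        System.out.println("AbortPolicy:      completed=" + abort.completed
                + " rejected=" + abort.rejected);
        System.out.println("CallerRunsPolicy: completed=" + callerRuns.completed
                + " rejected=" + callerRuns.rejected);
    }
}
```

    With the abort policy, some submissions fail once both workers are busy. With caller-runs, the submitting thread executes the overflow itself, so every task completes and submission is throttled naturally.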

    • #3
      Thanks a lot, Dave. That helped.
      Apologies for posting in the wrong forum.
      Before closing this out, I have a few questions.

      1). The suggested configuration behaves much like what SimpleAsyncItemProcessor does. Is there a way to optimize the ThreadPoolTaskExecutor configuration further in the context of Spring Batch, especially when working with a high volume of data?

      2). In most of the samples, SimpleAsyncItemProcessor is preferred over other AsyncTask implementations. Can this be generalized as a best practice?

      If these questions are outside the scope of this forum, I don't mind raising them in the main Spring forum.

      Thanks in advance

      Rahul

      • #4
        Spring Batch isn't really very special here. You should read the Javadocs for ThreadPoolTaskExecutor and also study the java.util.concurrent Executor that it is based on. The default settings are optimized to prevent excessive context switching and thread creation, at the expense of sometimes using more memory, and (sometimes) not executing tasks as fast as possible. This is mainly applicable for scenarios where the work done by each task is trivial. Batch applications tend to do a lot of stuff in each task (i.e. long-lived Runnables), so the default settings are not appropriate. To optimize fully you would need to measure total throughput in a realistic Step, but you can, as a rule of thumb, use the suggestions I made above and ensure that you don't use a pool with orders of magnitude more threads than you have physical processors.

        SimpleAsyncTaskExecutor is only used in the samples because it is simple to set up (I assume that's what you mean?). ThreadPoolTaskExecutor should be preferred for most real-life applications (maybe we should switch the samples to emphasise this).
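
        As an illustration of the rule of thumb about pool size versus processor count, here is a small plain-JDK sketch. The class PoolSizing, its method names and the "threads per processor" ceiling are hypothetical and chosen only for this example; the right ceiling still has to come from measuring throughput in a realistic Step. Note that Runtime.availableProcessors() reports the processors available to the JVM, which may differ from the number of physical cores.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper; the names and the ceiling factor are assumptions for illustration.
class PoolSizing {

    /**
     * Caps a requested pool size at processors * maxThreadsPerProcessor,
     * never returning less than 1.
     */
    static int boundedPoolSize(int requested, int processors, int maxThreadsPerProcessor) {
        int ceiling = Math.max(1, processors * maxThreadsPerProcessor);
        return Math.max(1, Math.min(requested, ceiling));
    }

    /**
     * Builds a pool with core size == max size, no queue, and the caller-runs
     * policy, sized relative to the processors the JVM reports.
     */
    static ThreadPoolExecutor newFixedCallerRunsPool(int requested, int maxThreadsPerProcessor) {
        int size = boundedPoolSize(
                requested, Runtime.getRuntime().availableProcessors(), maxThreadsPerProcessor);
        return new ThreadPoolExecutor(
                size, size, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    public static void main(String[] args) {
        // A request for 90 threads is capped at 4 threads per reported processor.
        ThreadPoolExecutor pool = newFixedCallerRunsPool(90, 4);
        System.out.println("pool size = " + pool.getCorePoolSize());
        pool.shutdown();
    }
}
```

        A higher ceiling may suit tasks that mostly wait on I/O, such as web service calls, while CPU-bound tasks usually gain little beyond roughly one thread per processor.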
