Multi-Threaded Job: Threads not exiting
  • Multi-Threaded Job:Threads not exiting

    Hello,

    I have been trying to make a Spring Batch step multi-threaded for performance gains. It's a chunk-oriented step with a JdbcCursorItemReader, a processor, and a writer that writes to the database using JPA.

    I want to drive all threads of the job from the unique values in the JdbcCursorItemReader result set, so I added a synchronized ItemReader with the JdbcCursorItemReader as its delegate.

    Code:
    <bean id="accountReader" class="org.springframework.batch.item.database.JdbcCursorItemReader" scope="step">
        <property name="dataSource" ref="someDataSource"/>
        <property name="sql" 
           value="select distinct(account_number) from some_table where cycle_gid = #{jobParameters['cycleId']}" />        
        <property name="rowMapper">
    	   <bean class="org.springframework.jdbc.core.SingleColumnRowMapper">
    	       	<constructor-arg value="java.lang.Long" />
    	   </bean>
         </property>
    </bean>
    
    <bean id="synchedAccountReader" class="com.reader.SynchronizingItemReader">
         <property name="delegate" ref="accountReader"/>
    </bean>
    
    
    <bean id="calculateProcessor" class="com.processor.AppliedTransactionProcessor" scope="step">
    	<property name="cycleId" value="#{jobParameters['cycleId']}" />
    	<property name="cycleBeginDate" value="#{jobParameters['cycleBeginDate']}" />
    </bean>
    
    <bean id="summaryWriter" class="com.writer.SummaryWriter" />
    
    <job id="dailyTransactionJob" xmlns="http://www.springframework.org/schema/batch"  restartable="false">
    	<step id="getAppliedTransactions">
    		<tasklet transaction-manager="emTxManager" task-executor="taskExecutor">
    			<chunk reader="synchedAccountReader" processor="calculateProcessor"
    					writer="summaryWriter" commit-interval="1000" />
    		</tasklet>
    	</step>
    </job>
    
          
    <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    	<property name="corePoolSize" value="3" />
    	<property name="maxPoolSize" value="3" />		
    </bean>
    
    <bean class="org.springframework.batch.core.scope.StepScope" />
    Here is SynchronizingItemReader.groovy, courtesy of Spring Batch in Action. I also added a final List of processed accounts, so in debug mode I can check the total pool of accounts processed across threads. On a full run of 30,000 accounts, no collisions occurred, but the job never exits.

    Code:
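    import org.springframework.batch.item.ExecutionContext;
    import org.springframework.batch.item.ItemReader;
    import org.springframework.batch.item.ItemStream;
    import org.springframework.batch.item.ItemStreamException;
    
    public class SynchronizingItemReader implements ItemReader<Long>, ItemStream {
    	private ItemReader<Long> delegate;
            private final List<Long> processedAccounts = new ArrayList<Long>();
    	// synchronized so that only one thread at a time advances the shared cursor
    	public synchronized Long read() throws Exception {		
    		Long account = delegate.read();	
    		String threadName = Thread.currentThread().getName();
                   if (account == null) {
    			System.out.println("$threadName: - DONE");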
    		} else {
    			if (!processedAccounts.contains(account)) {
    				processedAccounts.add(account);	
    			} else {
    				throw new Exception("BAD");
    			}
    		}	
    		return account;
    	}
    
    	public ItemReader<Long> getDelegate() {
    		return delegate;
    	}
    
    	public void setDelegate(ItemReader<Long> delegate) {
    		this.delegate = delegate;
    	}
    
    	// Stream
    	
    	public void close() throws ItemStreamException {
    		if (this.delegate instanceof ItemStream) {
    			((ItemStream)this.delegate).close();
    		}
    	}
    
    	public void open(ExecutionContext context) throws ItemStreamException {
    		if (this.delegate instanceof ItemStream) {
    			((ItemStream)this.delegate).open(context);
    		}
    	}
    
    	public void update(ExecutionContext context) throws ItemStreamException {
    		if (this.delegate instanceof ItemStream) {
    			((ItemStream)this.delegate).update(context);
    		}
    	}
    
    }

    At first I thought it was an entity manager lock on the writer side, so I commented out all writes in the writer and just output logs. It still freezes. (InvoiceSummaryWriter.groovy)
    Code:
    @Slf4j
    @Scope("step")
    public class InvoiceSummaryWriter implements ItemWriter<InvoiceSummary> {
    
    	@Resource
    	InvoiceSummaryDaoInterface invoiceSummaryDao;
    
    	
    	@Override
    	public void write(List<? extends InvoiceSummary> items) throws Exception {
    		def size = items.size();
    		String threadName = Thread.currentThread().getName();				
    		items.each {
    			/*InvoiceSummary invSummary = invoiceSummaryDao.getInvoiceSummaryByCustomerNumber(it.customerNumber,it.billingCycleId);
    			if (!invSummary) {
    				invSummary = it;
    			} else {
    				//this.mergeItem(invSummary, it);
    			}
    			invoiceSummaryDao.saveOrUpdate(invSummary);
    			System.out.println("Thread $threadName:Saving Invoice Summary $invSummary");*/
    		}
    		//invoiceSummaryDao.em.flush();	
    		System.out.println("Thread $threadName:Supposed to flush $size items");
    
    	}
    
    	protected void mergeItem(InvoiceSummary dbSummary, InvoiceSummary itemToMerge) {
    		["previousBalance","creditDebit","payments","over30Days","over60Days","over90Days","over120Days","currentBalance"].each{		
    			if (itemToMerge."$it" != null) {	
    				dbSummary."$it" = (itemToMerge."$it");
    			}
    		}
    	}
    }

    My last attempt to understand what was going on was to:
    • Reduce the thread count to 2
    • Change the commit interval to 1
    • Make the driving query return only 1 account


    The output was as follows, with the job hanging at the end:
    taskExecutor-2: - DONE
    taskExecutor-2: - DONE
    taskExecutor-2: - DONE
    taskExecutor-2: - DONE
    Thread taskExecutor-1:Supposed to flush 1 items
    taskExecutor-2: - DONE


    When I remove the task-executor and run it as a single thread, the output is:
    Thread main:Supposed to flush 1 items
    main: - DONE

    Any suggestions or tips on troubleshooting this issue would be greatly appreciated.
    Last edited by emckissick; Dec 3rd, 2012, 05:43 PM.

  • #2
    Update

    I believe the way I'm launching Spring Batch may be causing my problem. When I run the job from a test class using SpringJUnit4ClassRunner, my JVM exits.

    Code:
    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(classes=AppConfig.class)
    @Slf4j
    public class MultithreadedStepTest {
    
    	@Autowired
    	private JobLauncher launcher;
    	
    	@Autowired
    	private JobRegistry jobRegistry;
    	@Autowired
    	@Qualifier("dailyTransactionJob")
    	private Job testMultiThreadedJob;
    
    	@Resource
    	BillingCycleDao cycleDao;
    	
    	@Test
    	public void testMultithreadedStep() throws Exception {
    		long count = 55;
    		BillingCycle cycle = cycleDao.getCurrentBillingCycle();
    		JobExecution multiThreadedJobExec = launcher.run(
    				 jobRegistry.getJob("dailyTransactionJob"),
    			new JobParametersBuilder()
    				.addLong("count",count)
    				.addLong("cycleId",cycle.getBillingCycleId())
    				.addDate("cycleBeginDate", cycle.getBeginDate())
    				.toJobParameters()
    		);
    	println multiThreadedJobExec.isStopping();
    	println multiThreadedJobExec.getExitStatus();
    	}
    }
    Pertinent logging
    taskExecutor-1: - DONE
    5800 [main] INFO org.springframework.batch.core.launch.support.SimpleJobLauncher - Job: [FlowJob: [name=dailyTransactionJob]] completed with the following parameters: [{count=55, cycleId=161, cycleBeginDate=1351742400000}] and the following status: [COMPLETED]
    false
    exitCode=COMPLETED;exitDescription=
    5819 [Thread-1] INFO org.springframework.context.support.GenericApplicationContext - Closing org.springframework.context.support.GenericApplicationContext@12d56b37: startup date [Tue Dec 04 12:25:22 EST 2012]; root of context hierarchy
    5820 [Thread-1] INFO org.springframework.beans.factory.support.DefaultListableBeanFactory - Destroying singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@1fc4f0f8: defining beans [org.springframework.context.annotation.internalConfigurationAnnotationProcessor,org.springframework.context.annotation.internalAutowiredAnnotationProcessor,org.springframework.context.annotation.internalRequiredAnnotationProcessor,org.springframework.context.annotation.internalCommonAnnotationProcessor,org.springframework.context.annotation.internalPersistenceAnnotationProcessor,appConfig,org.springframework.context.annotation.ConfigurationClassPostProcessor$ImportAwareBeanPostProcessor#0,com.carfax.soa.config.DaoAppConfig#0,billingCycleDao,soaTransactionDao,invoiceSummaryDao,soaDailyAgingDao,appProperties,soaBatchDriver,calculateBalanceProcessor,propertyPlaceholderConfigurer,billingAdminDataSource,billingEmFactory,org.springframework.orm.jpa.support.PersistenceAnnotationBeanPostProcessor#0,org.springframework.aop.config.internalAutoProxyCreator,org.springframework.transaction.annotation.AnnotationTransactionAttributeSource#0,org.springframework.transaction.interceptor.TransactionInterceptor#0,org.springframework.transaction.config.internalTransactionAdvisor,jpaTransactionManager,jobRepository,jobLauncher,jobRegistry,org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor#0,transactionManager,billingAdminTxManager,org.springframework.batch.core.scope.internalStepScope,org.springframework.beans.factory.config.CustomEditorConfigurer,org.springframework.batch.core.configuration.xml.CoreNamespacePostProcessor,getPreviousBalance,calcPrevBalanceJob,getAppliedTransactions,dailyTransactionJob,synchedCustMastReader,custMastReader,calculateAppliedTxProcessor,taskExecutor,invoiceSummaryWriter,scopedTarget.calculateBalanceProcessor,scopedTarget.custMastReader,scopedTarget.calculateAppliedTxProcessor,scopedTarget.invoiceSummaryWriter]; root of factory hierarchy
    5822 [Thread-1] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Shutting down ExecutorService 'taskExecutor'
    5825 [Thread-1] INFO org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean - Closing JPA EntityManagerFactory for persistence unit 'applicationPU'
    5825 [Thread-1] INFO org.hibernate.impl.SessionFactoryImpl - closing

    However, when I call the same job from my driver class, it freezes.

    Code:
    class SoaBatchDriver {
    
    	@Autowired
    	private JobLauncher jobLauncher;
    
    	@Autowired
    	private JobRegistry jobRegistry;
    
    	@Resource
    	BillingCycleDao cycleDao;
    	
    	static main(args) {
    	
    		String instruction = args.length > 0 ? args[0] : "";
    		ApplicationContext context = new AnnotationConfigApplicationContext(AppConfig);	//initialize Spring Context
    		SoaBatchDriver batchProcess = context.getBean(SoaBatchDriver);
    		batchProcess.run(instruction);
    		
    	}
    
    	public void run(String instruction) {
    		
    		BillingCycle cycle = cycleDao.getCurrentBillingCycle();
    		JobExecution execution;
    		Job dailyJob = jobRegistry.getJob("dailyTransactionJob");
    				execution = jobLauncher.run(dailyJob, new JobParametersBuilder()
    						.addLong("cycleId", cycle.billingCycleId)
    						.addDate("cycleBeginDate", cycle.beginDate)
    						.toJobParameters()
    						);
    							
    		println execution.getExitStatus();
    	}
    }
    The logging for this is
    5262 [main] INFO org.springframework.batch.core.launch.support.SimpleJobLauncher - Job: [FlowJob: [name=dailyTransactionJob]] completed with the following parameters: [{cycleId=161, cycleBeginDate=1351742400000}] and the following status: [COMPLETED]
    false
    exitCode=COMPLETED;exitDescription=
    Both jobs are marked as COMPLETED, and isStopping() returns false in both cases. But my test exits and my driver doesn't.
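
    One possible explanation, offered as an assumption since nothing in the logs above confirms it: thread pool worker threads are non-daemon by default, and the JVM only exits on its own once every non-daemon thread has ended. The test log contains a "Shutting down ExecutorService 'taskExecutor'" line and the driver log does not, which would fit that picture. The sketch below uses a plain java.util.concurrent pool rather than the Spring taskExecutor bean; LingeringThreads, newPool, aliveNonDaemonWorkers and shutdownAndJoin are made-up names for illustration only.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

public class LingeringThreads {

    // Every worker thread created by newPool(), recorded so it can be inspected later.
    static final List<Thread> WORKERS = new CopyOnWriteArrayList<>();

    // Builds a fixed-size pool whose workers are explicitly non-daemon,
    // matching what Executors.defaultThreadFactory() does.
    static ExecutorService newPool(int size) {
        ThreadFactory factory = task -> {
            Thread t = new Thread(task, "demo-worker-" + WORKERS.size());
            t.setDaemon(false);
            WORKERS.add(t);
            return t;
        };
        return Executors.newFixedThreadPool(size, factory);
    }

    // Counts recorded workers that are still alive and would keep the JVM running.
    static long aliveNonDaemonWorkers() {
        return WORKERS.stream().filter(t -> t.isAlive() && !t.isDaemon()).count();
    }

    // Shuts the pool down and waits for its workers to end.
    // Returns true when no recorded worker is left alive.
    static boolean shutdownAndJoin(ExecutorService pool) {
        pool.shutdown();
        try {
            for (Thread t : WORKERS) {
                t.join(TimeUnit.SECONDS.toMillis(5));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return aliveNonDaemonWorkers() == 0;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = newPool(3);
        pool.submit(() -> System.out.println("work done")).get();
        // The task has finished, but its idle worker thread is still alive.
        System.out.println("alive after work: " + aliveNonDaemonWorkers());
        shutdownAndJoin(pool);
        System.out.println("alive after shutdown: " + aliveNonDaemonWorkers());
    }
}
```

    If the same thing is happening in the driver, a thread dump of the hung process (for example with jstack) should show the idle taskExecutor-N threads still waiting for work.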



    • #3
      I ended up manually closing the context and calling the exit() method of CommandLineJobRunner.

      Code:
      class SoaBatchDriver {
      
      	@Autowired
      	private JobLauncher jobLauncher;
      
      	@Autowired
      	private JobRegistry jobRegistry;
      
      	@Resource
      	BillingCycleDao cycleDao;
      	
      	private ExitCodeMapper exitCodeMapper = new SimpleJvmExitCodeMapper();
      	static main(args) {
      	
      		String instruction = args.length > 0 ? args[0] : "";
      		ApplicationContext context = new AnnotationConfigApplicationContext(AppConfig);	//initialize Spring Context
      		SoaBatchDriver batchProcess = context.getBean(SoaBatchDriver);
      		int result = batchProcess.run(instruction);
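      		context.close();
      		// 'command' was not defined in the snippet as posted; assuming a CommandLineJobRunner instance
      		CommandLineJobRunner command = new CommandLineJobRunner();
      		command.exit(result);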
      		
      	}
      
      	public int run(String instruction) {
      		
      		BillingCycle cycle = cycleDao.getCurrentBillingCycle();
      		JobExecution execution;
      		Job dailyJob = jobRegistry.getJob("dailyTransactionJob");
      				execution = jobLauncher.run(dailyJob, new JobParametersBuilder()
      						.addLong("cycleId", cycle.billingCycleId)
      						.addDate("cycleBeginDate", cycle.beginDate)
      						.toJobParameters()
      						);
      							
      		return exitCodeMapper.intValue(execution.getExitStatus().getExitCode());
      	}
      }
      The job now completes and exits. I've run a few tests on sample data and so far so good.
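
      A small variation on the same idea, as a hedged sketch rather than the code I actually run: do the cleanup in a finally block so the pool is released even when the job throws, and turn the outcome into an exit code afterwards. It uses a plain ExecutorService as a stand-in for the Spring context and its taskExecutor; DriverShutdownSketch, runJob and the 0/1 exit codes are hypothetical names and values chosen for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DriverShutdownSketch {

    // Hypothetical exit codes for this sketch: 0 for success, 1 for anything else.
    static final int EXIT_OK = 0;
    static final int EXIT_ERROR = 1;

    // Runs the job on the pool and always shuts the pool down afterwards.
    // The pool.shutdown() call stands in for context.close() in the driver.
    static int runJob(ExecutorService pool, Callable<String> job) {
        try {
            String status = pool.submit(job).get();
            return "COMPLETED".equals(status) ? EXIT_OK : EXIT_ERROR;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return EXIT_ERROR;
        } catch (ExecutionException e) {
            // The job itself threw; report failure instead of propagating.
            return EXIT_ERROR;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int code = runJob(Executors.newFixedThreadPool(3), () -> "COMPLETED");
        // A real driver would pass this value to System.exit(code).
        System.out.println("exit code: " + code);
    }
}
```

      With the pool shut down in finally, the worker threads end once they go idle, so the JVM can exit on its own and the explicit exit call only serves to report the status to the caller.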
