Announcement Announcement Module
Collapse
No announcement yet.
OptimisticLockingFailure/ConcurrentModification Exceptions with Multithreaded Step Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • OptimisticLockingFailure/ConcurrentModification Exceptions with Multithreaded Step

    Hi all. I'm in desparate need of some help. (This is a long one, so prepare yourself...)

    We have a flow that we're trying to implement. All pieces work correctly, but we're looking for areas where we can speed it up. We call a series of web services and would like to make these steps multi-threaded.

    Here's the step def:
    Code:
        <batch:step id="listenersParentStep" abstract="true">
            <!-- these listeners will be used for each child step -->
            <batch:tasklet>
                <batch:listeners>
                    <batch:listener ref="detailLogStepExecutionListener"/>
                    <batch:listener ref="detailLogSkipListener"/>
                </batch:listeners>
            </batch:tasklet>
        </batch:step>
        <batch:step id="validation" parent="listenersParentStep">
                <batch:tasklet task-executor="taskExecutor">
                    <batch:chunk commit-interval="50" skip-limit="${skip.limit}"
                                 reader="threadSafeValidationReader" processor="cdfItemValidateProcessor"
                                 writer="threadSafeValidationWriter">
                        <batch:skippable-exception-classes>
                            <batch:include class="org.oclcwcp.batch.commons.exception.ValidationServiceException"/>
                        </batch:skippable-exception-classes>
                    </batch:chunk>
                </batch:tasklet>
                <batch:next on="*" to="tlInstructions"/>
                <batch:next on="FAILED" to="tlOrderClose"/>
            </batch:step>
    Here are the bean defs:
    Code:
    <bean id="threadSafeValidationReader" class="org.oclcwcp.batch.commons.reader.ThreadSafeItemStreamReader" scope="step">
            <property name="delegate" ref="cdfXmlItemReader"/>
        </bean>
    
        <bean id="threadSafeValidationWriter" class="org.oclcwcp.batch.commons.writer.ThreadSafeItemStreamWriter" scope="step">
            <property name="delegate" ref="cdfItemValidateWriter"/>
        </bean>
    
        <bean id="cdfItemValidateWriter" scope="step" parent="txtWriterParent">
            <property name="resource" value="file:#{jobParameters['toolf.out.working.directory']}#{jobParameters['toolf.out.input.file.name.without.path']}.WCPVALIDATE"/>
            <property name="saveState" value="false"/>
        </bean>
    
        <bean id="cdfUnmarshaller" class="org.oclcwcp.batch.commons.xml.CDFUnmarshaller"/>
        <bean id="cdfItemReaderParent" class="org.springframework.batch.item.xml.StaxEventItemReader" scope="prototype">
            <property name="fragmentRootElementName" value="CDFRec"/>
            <property name="unmarshaller" ref="cdfUnmarshaller"/>
        </bean>
        <bean id="cdfXmlItemReader" parent="cdfItemReaderParent" scope="step">
            <property name="resource" value="file:#{jobParameters['toolf.input.file.name']}"/>
            <property name="saveState" value="false"/>
        </bean>
    
        <bean id="cdfItemValidateProcessor" class="org.oclcwcp.batch.commons.processor.CDFItemProcessor" scope="step">
            <property name="wcpValidationServiceClient" ref="validationServiceCommonClient"/>
            <property name="orderId" value="#{jobParameters['toolf.orderId']}"/>
            <property name="vendorInstitutionSymbol" value="#{jobParameters['toolf.institutionSymbol']}"/>
        </bean>
    
        <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
            <property name="corePoolSize" value="10"/>
            <property name="maxPoolSize" value="20"/>
        </bean>
    The reader and writer are basically the same:
    Code:
    class ThreadSafeItemStreamReader<T> implements ItemStreamReader<T>, InitializingBean, StepExecutionListener {
        ItemReader<T> delegate
    
        T read() throws Exception {
            synchronized (this){
                return delegate.read()
            }
        }
    
        void open(ExecutionContext executionContext) {
            if(delegate instanceof ItemStream){
                ((ItemStream)delegate).open(executionContext)
            }
        }
    
        void update(ExecutionContext executionContext) {
            if(delegate instanceof ItemStream){
                ((ItemStream)delegate).update(executionContext)
            }
        }
    
        void close() {
            if(delegate instanceof ItemStream){
                ((ItemStream)delegate).close()
            }
        }
    
        void afterPropertiesSet() throws Exception {
            Assert.notNull(delegate, "delegate must be set")
        }
    
        void beforeStep(StepExecution stepExecution) {
            if(delegate instanceof StepExecutionListener){
                ((StepExecutionListener)delegate).beforeStep(stepExecution)
            }
        }
    
        ExitStatus afterStep(StepExecution stepExecution) {
            if(delegate instanceof StepExecutionListener){
                return ((StepExecutionListener)delegate).afterStep(stepExecution)
            }
    
            return null
        }
    }
    This is a snippet of the exception:
    Code:
    rg.springframework.dao.OptimisticLockingFailureException: Attempt to update step execution id=10844 with wrong version (4), where current version is 5
    	at org.springframework.batch.core.repository.dao.JdbcStepExecutionDao.updateStepExecution(JdbcStepExecutionDao.java:185)
    	at org.springframework.batch.core.repository.support.SimpleJobRepository.update(SimpleJobRepository.java:171)
    	at sun.reflect.GeneratedMethodAccessor122.invoke(Unknown Source)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    	at java.lang.reflect.Method.invoke(Method.java:597)
    	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:309)
    	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
    	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
    	at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:110)
    	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
    	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:202)
    	at $Proxy34.update(Unknown Source)
    	at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:432)
    	at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:130)
    	at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:264)
    	at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:76)
    	at org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate$ExecutingRunnable.run(TaskExecutorRepeatTemplate.java:258)
                 ...
    
    Caused by: java.util.ConcurrentModificationException
                    at java.util.AbstractList$Itr.checkForComodification(AbstractList.java:372)
                    at java.util.AbstractList$Itr.next(AbstractList.java:343)
                    at com.thoughtworks.xstream.converters.collections.CollectionConverter.marshal(CollectionConverter.java:54)
                    ....
                    at com.thoughtworks.xstream.XStream.toXML(XStream.java:805)
                    at org.springframework.batch.core.repository.dao.XStreamExecutionContextStringSerializer.serialize(XStreamExecutionContextStringSerializer.java:43)
                    at org.springframework.batch.core.repository.dao.JdbcExecutionContextDao.serializeContext(JdbcExecutionContextDao.java:212)
                    at org.springframework.batch.core.repository.dao.JdbcExecutionContextDao.updateExecutionContext(JdbcExecutionContextDao.java:134)
                    at org.springframework.batch.core.repository.support.SimpleJobRepository.updateExecutionContext(SimpleJobRepository.java:184)
                    at sun.reflect.GeneratedMethodAccessor121.invoke(Unknown Source)
                    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
                    at java.lang.reflect.Method.invoke(Method.java:597)
                    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:309)
                    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
                    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
                    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:110)
                    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
                    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:202)
                    at $Proxy34.updateExecutionContext(Unknown Source)
                    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:429)
    We are using the JobRepositoryFactoryBean and not the MapJobRepositoryFactoryBean as others who have had a similar issue were using.

    Ideas? Anyone?

  • #2
    were u able to fix the issue? if not, try to clear your spring batch tables.

    Comment

    Working...
    X