Announcement Announcement Module
Collapse
No announcement yet.
Multithreaded Itemwriter with JPA Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Multithreaded Itemwriter with JPA

    Hello,

    I am using an extended JpaItemWriter to write in a step with a tasklet that is multithreaded(using ThreadPoolExecuter). When the throttle-limit is reached, I get a database exception!!!. It happens around 90% of the time, though sometimes it is succesful. Any input is appreciated..I had made sure that there are around double the number of database connections available in the pool as the throttle-limit.
    My Job uses a concept and code very silimar to parallelJobs sample by using a flag in the database for restartability - the only difference is that instead of updating the processed flag in the database from the ItemProcessor using jdbcTemplate as in the sample, I am doing this directly in ItemWriter using JPA. Has anyone got any idea of this problem?

    Code:
    Exception [EclipseLink-2004] (Eclipse Persistence Services - 2.0.1.v20100213-r6600): org.eclipse.persistence.exceptions.ConcurrencyException
    Exception Description: A signal was attempted before wait() on ConcurrencyManager. This normally means that an attempt was made to 
    commit or rollback a transaction before it was started, or to rollback a transaction twice.
    	at org.eclipse.persistence.exceptions.ConcurrencyException.signalAttemptedBeforeWait(ConcurrencyException.java:84)
    	at org.eclipse.persistence.internal.helper.ConcurrencyManager.releaseReadLock(ConcurrencyManager.java:467)
    	at org.eclipse.persistence.internal.identitymaps.CacheKey.releaseReadLock(CacheKey.java:433)
    	at org.eclipse.persistence.internal.sessions.UnitOfWorkImpl.cloneAndRegisterObject(UnitOfWorkImpl.java:1005)
    	at org.eclipse.persistence.internal.sessions.UnitOfWorkImpl.cloneAndRegisterObject(UnitOfWorkImpl.java:896)
    	at org.eclipse.persistence.internal.sessions.UnitOfWorkIdentityMapAccessor.getAndCloneCacheKeyFromParent(UnitOfWorkIdentityMapAccessor.java:171)
    	at org.eclipse.persistence.internal.sessions.UnitOfWorkIdentityMapAccessor.getFromIdentityMap(UnitOfWorkIdentityMapAccessor.java:110)
    	at org.eclipse.persistence.internal.sessions.IdentityMapAccessor.getFromIdentityMap(IdentityMapAccessor.java:331)
    	at org.eclipse.persistence.internal.sessions.UnitOfWorkImpl.registerExistingObject(UnitOfWorkImpl.java:3883)
    	at org.eclipse.persistence.internal.sessions.UnitOfWorkImpl.registerExistingObject(UnitOfWorkImpl.java:3843)
    	at org.eclipse.persistence.queries.ObjectBuildingQuery.registerIndividualResult(ObjectBuildingQuery.java:362)
    	at org.eclipse.persistence.internal.descriptors.ObjectBuilder.buildWorkingCopyCloneNormally(ObjectBuilder.java:590)
    	at org.eclipse.persistence.internal.descriptors.ObjectBuilder.buildObjectInUnitOfWork(ObjectBuilder.java:551)
    	at org.eclipse.persistence.internal.descriptors.ObjectBuilder.buildObject(ObjectBuilder.java:491)
    	at org.eclipse.persistence.internal.descriptors.ObjectBuilder.buildObject(ObjectBuilder.java:443)
    	at org.eclipse.persistence.queries.ObjectLevelReadQuery.buildObject(ObjectLevelReadQuery.java:635)
    	at org.eclipse.persistence.queries.ReadAllQuery.registerResultInUnitOfWork(ReadAllQuery.java:838)
    	at org.eclipse.persistence.queries.ReadAllQuery.executeObjectLevelReadQuery(ReadAllQuery.java:464)
    	at org.eclipse.persistence.queries.ObjectLevelReadQuery.executeDatabaseQuery(ObjectLevelReadQuery.java:997)
    	at org.eclipse.persistence.queries.DatabaseQuery.execute(DatabaseQuery.java:675)
    	at org.eclipse.persistence.queries.ObjectLevelReadQuery.execute(ObjectLevelReadQuery.java:958)
    	at org.eclipse.persistence.queries.ReadAllQuery.execute(ReadAllQuery.java:432)
    	at org.eclipse.persistence.queries.ObjectLevelReadQuery.executeInUnitOfWork(ObjectLevelReadQuery.java:1021)
    	at org.eclipse.persistence.internal.sessions.UnitOfWorkImpl.internalExecuteQuery(UnitOfWorkImpl.java:2857)
    	at org.eclipse.persistence.internal.sessions.AbstractSession.executeQuery(AbstractSession.java:1225)
    	at org.eclipse.persistence.internal.sessions.AbstractSession.executeQuery(AbstractSession.java:1207)
    	at org.eclipse.persistence.internal.sessions.AbstractSession.executeQuery(AbstractSession.java:1181)
    	at org.eclipse.persistence.internal.jpa.EJBQueryImpl.executeReadQuery(EJBQueryImpl.java:453)
    	at org.eclipse.persistence.internal.jpa.EJBQueryImpl.getSingleResult(EJBQueryImpl.java:714)
    My configuration:

    Code:
     <job id="dailyJob" restartable="true" xmlns="http://www.springframework.org/schema/batch">
    
            <step id="collectAllIds" next="collectData">
                <description>
                    Collects all ids that are updated/inserted after the given beginDate from
                    database.
                    Only records that are in status UPDATED are collected.
                    Records along with the job instance id of this job execution is inserted to the table
                    Job_balance_note with status PENDING.
                </description>
                <tasklet transaction-manager="jpaTransactionManager" allow-start-if-complete="true">
                    <chunk reader="collectAllIdsReader"
                           processor="collectAllIdsProcessor"
                           writer="collectAllIdsWriter"
                           commit-interval="${collectAllIds.commit-interval}"/>
                </tasklet>
            </step>
            <step id="collectData" next="generateNotes">
                <tasklet transaction-manager="jpaTransactionManager" task-executor="preparationTaskExecutor"
                         throttle-limit="20">
                    <chunk reader="collectDataReader" processor="collectDataProcessor"
                           writer="collectDataWriter" commit-interval="1"/>
                </tasklet>
            </step>
            <step id="generateNotes">
                <tasklet transaction-manager="jpaTransactionManager" task-executor="generationTaskExecutor"
                         throttle-limit="20">
                    <chunk reader="generateNoteReader" processor="generateNoteProcessor"
                           writer="generateNoteWriter" commit-interval="1">
                    </chunk>
                </tasklet>
            </step>
    
            <listeners>
                <listener ref="commonJobExecutionListener"/>
            </listeners>
        </job>
        <bean id="preparationTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
            <property name="corePoolSize" value="10"/>
            <property name="maxPoolSize" value="10"/>
        </bean>
    
        <bean id="generationTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
            <property name="corePoolSize" value="10"/>
            <property name="maxPoolSize" value="10"/>
        </bean>
    In the reader/writer, I am injecting emtityManagerFactory for JpaItemWriter/sub classes.
    Last edited by zencv; Sep 21st, 2011, 08:17 AM. Reason: formatted
Working...
X