  • JdbcCursorItemReader & AsyncTaskExecutor

    Hello, Dave
    How can I configure a job step so that each row from a JdbcCursorItemReader is processed in parallel threads?
    Here is my job configuration:
    HTML Code:
     <job id="myJob" restartable="false">
            <step id="step1">
                <tasklet throttle-limit="20" task-executor="taskExecutor">
                 <!--<tasklet>-->
                    <chunk commit-interval="1"
                           reader="itemReader"
                           processor="itemProcessor">
                           <!--skip-policy="alwaysSkipItemSkipPolicy">-->
                        <writer adapter-method="doExecute">
                            <beans:ref bean="myService"/>
                        </writer>
                    </chunk>
                </tasklet>
            </step>
        </job>
    
    <bean id="itemReader" parent="abstractJdbcItemReader" p:sql="select * from MY_TABLE">
            <property name="rowMapper">
                <bean class="org.springframework.jdbc.core.BeanPropertyRowMapper"
                      p:mappedClass="com.my.proj.entity.SomeEntity"/>
            </property>
        </bean>
    
      <bean id="abstractJdbcItemReader" class="org.springframework.batch.item.database.JdbcCursorItemReader"
              p:dataSource-ref="extendedConnectionDataSource"
              p:useSharedExtendedConnection="true"
              abstract="true"/>
    
    <bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor"/>
    With this configuration I get an exception:
    ERROR [AbstractStep] Encountered an error executing the step
    org.springframework.jdbc.UncategorizedSQLException: Attempt to process next row failed; uncategorized SQLException for SQL [select * from MY_TABLE]; SQL state [null]; error code [17011]; exhausted resultset; nested exception is java.sql.SQLException: Exhausted Resultset
    at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:83)
    at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:80)
    at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:80)
    at org.springframework.batch.item.database.AbstractCursorItemReader.doRead(AbstractCursorItemReader.java:450)
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.read(AbstractItemCountingItemStreamItemReader.java:85)
    at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:90)
    at org.springframework.batch.core.step.item.SimpleChunkProvider.read(SimpleChunkProvider.java:148)
    at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:108)
    at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:367)
    at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:214)
    at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:143)
    at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:103)
    at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:68)
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:386)
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:130)
    at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:264)
    at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:76)
    at org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate$ExecutingRunnable.run(TaskExecutorRepeatTemplate.java:258)
    at java.lang.Thread.run(Thread.java:619)
    Caused by: java.sql.SQLException: exhausted resultset
    at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:112)
    at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:146)
    at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:208)
    at oracle.jdbc.driver.OracleResultSetImpl.getLong(OracleResultSetImpl.java:537)
    at org.jboss.resource.adapter.jdbc.WrappedResultSet.getLong(WrappedResultSet.java:711)
    at org.springframework.jdbc.support.JdbcUtils.getResultSetValue(JdbcUtils.java:161)
    at org.springframework.jdbc.core.BeanPropertyRowMapper.getColumnValue(BeanPropertyRowMapper.java:308)
    at org.springframework.jdbc.core.BeanPropertyRowMapper.mapRow(BeanPropertyRowMapper.java:246)
    at org.springframework.batch.item.database.JdbcCursorItemReader.readCursor(JdbcCursorItemReader.java:138)
    at org.springframework.batch.item.database.AbstractCursorItemReader.doRead(AbstractCursorItemReader.java:445)
    ... 15 more
    If I don't use an AsyncTaskExecutor, my job works fine. But then, I think, all rows from the reader are processed in the same thread, one by one.
    In the JavaDoc of AbstractItemCountingItemStreamItemReader, Robert Kasanicky wrote:
    * Subclasses are inherently *not* thread-safe.
    So is JdbcCursorItemReader not supported in a multi-threaded step?
    For my purposes, do I have to use JdbcPagingItemReader with pageSize=1?

    Thank you,
    Artem Bilan

  • #2
    This is most likely a thread-safety problem.
    You should use a partitioned step for this kind of issue instead.
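
The stack trace in the question is consistent with that diagnosis: one thread can advance the shared cursor past the last row while another is still mapping the current row. As a rough illustration only, the sketch below uses plain Java and hypothetical names (`SimpleReader`, `SynchronizedReader`, `SynchronizedReaderSketch`), none of which are Spring Batch classes, to show how serializing each `read()` call lets several worker threads share one non-thread-safe cursor. Later Spring Batch releases provide a `SynchronizedItemStreamReader` that applies a similar idea.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for a reader contract: returns null once input is exhausted.
interface SimpleReader<T> {
    T read() throws Exception;
}

// Hypothetical wrapper: every read() is serialized, so "advance the cursor"
// and "map the current row" happen as one atomic step.
final class SynchronizedReader<T> implements SimpleReader<T> {
    private final SimpleReader<T> delegate;

    SynchronizedReader(SimpleReader<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public synchronized T read() throws Exception {
        return delegate.read();
    }
}

final class SynchronizedReaderSketch {

    // Not thread-safe on its own: hasNext() and next() are separate steps,
    // much like ResultSet.next() followed by a getXxx() call.
    static SimpleReader<Integer> cursorOver(List<Integer> rows) {
        Iterator<Integer> it = rows.iterator();
        return () -> it.hasNext() ? it.next() : null;
    }

    // Drains the reader from several worker threads and returns every item seen.
    static List<Integer> readAll(SimpleReader<Integer> reader, int threads) {
        ConcurrentLinkedQueue<Integer> seen = new ConcurrentLinkedQueue<>();
        List<Callable<Void>> workers = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            workers.add(() -> {
                Integer item;
                while ((item = reader.read()) != null) {
                    seen.add(item); // "processing" happens outside the reader's lock
                }
                return null;
            });
        }
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            for (Future<Void> f : pool.invokeAll(workers)) {
                f.get(); // rethrows any failure from a worker
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
        return new ArrayList<>(seen);
    }
}
```

With the wrapper in place, each row is handed to exactly one worker; only the read itself is serialized, so the per-item work can still run in parallel.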


    • #3
      Thanks for the reply, though a late one ;-)

      At first, I extended JdbcPagingItemReader and overrode doRead():
      Code:
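      // Groovy syntax (no semicolons)
      import java.util.concurrent.locks.Lock
      import java.util.concurrent.locks.ReentrantLock
      import org.springframework.batch.item.database.JdbcPagingItemReader

      class MyJdbcPagingItemReader<T> extends JdbcPagingItemReader<T> {
      
      	// Serializes doRead() so only one thread touches the paging state at a time
      	private final Lock lock = new ReentrantLock()
      
      	// Last item returned; null after the first page means the input is exhausted
      	volatile T previousResult
      
      	@Override
      	protected T doRead() {
      		lock.lock()
      		try {
      			// Once a read past the first page has returned null, stop reading
      			if (page > 0 && previousResult == null) return null
      			previousResult = super.doRead()
      			return previousResult
      		}
      		finally {
      			lock.unlock()
      		}
      	}
      }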
      And it worked, albeit with some warnings.
      But I have now solved my task with a more flexible solution.
      I'm using JdbcCursorItemReader and an ItemWriterAdapter that delegates to a Spring Integration gateway over an async-executor channel.

      So, the problem is fixed!
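
The general shape of that final approach (read rows on a single thread, hand each one off to an executor) can be sketched in plain Java. This is only an analogy under stated assumptions: it uses `java.util.concurrent` rather than Spring Integration, and `AsyncHandOffSketch`, `processAll`, and the `handler` parameter are hypothetical names, not part of the poster's actual configuration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

final class AsyncHandOffSketch {

    // Reads the cursor on the calling thread only, submits each item to a
    // thread pool, and collects the results in submission order.
    static <T, R> List<R> processAll(Iterator<T> cursor, Function<T, R> handler, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<R>> pending = new ArrayList<>();
            while (cursor.hasNext()) {
                T item = cursor.next(); // the cursor is never shared between threads
                Callable<R> task = () -> handler.apply(item);
                pending.add(pool.submit(task)); // the per-item work runs asynchronously
            }
            List<R> results = new ArrayList<>();
            for (Future<R> f : pending) {
                results.add(f.get()); // waits for completion, rethrows worker failures
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

Because the cursor stays on one thread, the reader needs no locking; the trade-off is that reading itself is not parallel, which is usually acceptable when the per-row work dominates.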
