Multithreading with Stax input files exception ?

  • Multithreading with Stax input files exception ?

    Hi,

    I'm defining a batch solution for our product, which is used in insurance and banking.
    I'm currently investigating Spring Batch, which seems a near-perfect fit for our needs.

    Using trunk or 1.0m2, when I try to set up multithreaded processing of an XML input file using StaxEventReaderInputSource and TaskExecutorRepeatTemplate with SimpleAsyncTaskExecutor, I get the following error:

    Code:
    10:09:56,863 DEBUG SimpleAsyncTaskExecutor-1 TransactionTemplate:151 - Initiating transaction rollback on application exception
    org.springframework.dao.DataAccessResourceFailureException: Error while reading from event reader; nested exception is javax.xml.stream.XMLStreamException: ParseError at [row,col]:[1,1]
    Message: Content is not allowed in prolog.
        at org.springframework.batch.io.file.support.StaxEventReaderInputSource.moveCursorToNextFragment(StaxEventReaderInputSource.java:261)
        at org.springframework.batch.io.file.support.StaxEventReaderInputSource.read(StaxEventReaderInputSource.java:88)
        at org.springframework.batch.io.file.support.StaxEventReaderInputSource$$FastClassByCGLIB$$db438604.invoke(<generated>)
        at net.sf.cglib.proxy.MethodProxy.invoke(MethodProxy.java:149)
        at org.springframework.aop.framework.Cglib2AopProxy$CglibMethodInvocation.invokeJoinpoint(Cglib2AopProxy.java:694)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:149)
        at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:131)
        at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:119)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:171)
        at org.springframework.aop.framework.Cglib2AopProxy$DynamicAdvisedInterceptor.intercept(Cglib2AopProxy.java:629)
        at org.springframework.batch.io.file.support.StaxEventReaderInputSource$$EnhancerByCGLIB$$94aa0c51.read(<generated>)
        at org.springframework.batch.item.provider.InputSourceItemProvider.next(InputSourceItemProvider.java:48)
        at org.springframework.batch.execution.tasklet.ItemProviderProcessTasklet.execute(ItemProviderProcessTasklet.java:141)
        at org.springframework.batch.execution.step.simple.SimpleStepExecutor.doTaskletProcessing(SimpleStepExecutor.java:372)
        at org.springframework.batch.execution.step.simple.DefaultStepExecutor.doTaskletProcessing(DefaultStepExecutor.java:61)
        at org.springframework.batch.execution.step.simple.SimpleStepExecutor$2.doInIteration(SimpleStepExecutor.java:347)
        at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:324)
        at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:201)
        at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:131)
        at org.springframework.batch.execution.step.simple.SimpleStepExecutor.processChunk(SimpleStepExecutor.java:334)
        at org.springframework.batch.execution.step.simple.SimpleStepExecutor$1$2.doInTransaction(SimpleStepExecutor.java:220)
        at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:127)
        at org.springframework.batch.execution.step.simple.SimpleStepExecutor$1.doInIteration(SimpleStepExecutor.java:208)
        at org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate$ExecutingRunnable.run(TaskExecutorRepeatTemplate.java:227)
        at java.lang.Thread.run(Thread.java:619)
    Caused by: javax.xml.stream.XMLStreamException: ParseError at [row,col]:[1,1]
    Message: Content is not allowed in prolog.
        at com.sun.org.apache.xerces.internal.impl.XMLStreamReaderImpl.next(XMLStreamReaderImpl.java:588)
        at com.sun.xml.internal.stream.XMLEventReaderImpl.peek(XMLEventReaderImpl.java:271)
        at org.springframework.batch.io.file.support.stax.DefaultTransactionalEventReader.peek(DefaultTransactionalEventReader.java:86)
        at org.springframework.batch.io.file.support.stax.DefaultFragmentEventReader.peek(DefaultFragmentEventReader.java:152)
        at org.springframework.batch.io.file.support.StaxEventReaderInputSource.moveCursorToNextFragment(StaxEventReaderInputSource.java:246)
        ... 24 more

    Maybe I did something wrong. Does anybody know whether this should work?

    G.C.
    Last edited by gcollin; Dec 3rd, 2007, 04:36 AM.

  • #2
    You can't use a multithreaded repeat template for reading a file (at least not yet) because the InputSource does transaction synchronization and a transaction is single threaded. I assume it is probably the processing that you want to parallelise anyway, not the reading. This should be easier in m4, but will still require some additional work if you need restartability. N.B. we only plan to support parallel processing with zero effort after 1.0.
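The "a transaction is single threaded" point in #2 can be illustrated with a small plain-Java sketch. It assumes only that transaction state is bound to the thread that started the transaction, which is how Spring's transaction synchronization is generally described. The names `CURRENT_TX` and `txSeenByWorker` are hypothetical and not part of any Spring API.

```java
import java.util.concurrent.atomic.AtomicReference;

class ThreadBoundStateDemo {
    // Hypothetical stand-in for per-thread transaction state (not a Spring class).
    static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    /** Returns what a freshly started worker thread sees as the current transaction. */
    static String txSeenByWorker() {
        AtomicReference<String> seen = new AtomicReference<>("unset");
        Thread worker = new Thread(() -> seen.set(String.valueOf(CURRENT_TX.get())));
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        CURRENT_TX.set("tx-1"); // "begin" a transaction on the main thread
        System.out.println("main sees:   " + CURRENT_TX.get());
        System.out.println("worker sees: " + txSeenByWorker());
        CURRENT_TX.remove();
    }
}
```

The worker thread does not see the value set by the main thread, so a reader that registers itself with the transaction of one thread cannot rely on that registration from another thread.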



    • #3
      How to parallelise processing ?

      Hi,

      Thanks for your reply.

      You are right, I would like to parallelise the processing. But the only sample with anything parallel in it is adhoc.xml, and I more or less copied its config.

      Where can I find an example of parallel processing?
      Should I wait for m4 for that?

      G. C.



      • #4
        The adhoc job is not parallel processing inside the job - just launching the job asynchronously from the JobLauncher.

        Actually, thinking about it, the Stax input source buffers Stax events in its TX synchronization, so you might actually be able to use it with a TaskExecutorRepeatTemplate - but make sure it is in the stepOperations, not the chunkOperations.

        You will still lose Restartable behaviour, until we go beyond 1.0.



        • #5
          I did stepOperations

          Looking at my config, I see that I did put the parallel part in stepOperations.
          But I think my mistake is that the inputSource was not registered with step scope, so two instances of it were created in different threads.

          I'm currently fighting with my Spring config to change that.
          I'll keep you posted.

          Thx

          Gérard



          • #6
            Example for Parallel Processing

            Hi Gerard,
            Did you get the parallel processing working? Can you give an example here with XML snippets?

            Regards,
            Ramkumar



            • #7
              In fact....

              In fact, I switched to something else.
              I almost convinced my clients that we should use multiple processes instead of multiple threads, at least for now.

              Anyway, I think we could wrap any InputSource in a "synchronized input source" so that threads access it one at a time.

              We would lose parallelism on data access, but we would still have multithreading on processing.

              Regards,

              Gérard
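The "synchronized input source" idea from #7 could look roughly like the sketch below. The `InputSource` interface here is a hypothetical one-method stand-in: only `read()` is known from the stack trace in the first post, and the real Spring Batch interface may declare more. The sketch serializes reads only; it does not attempt to handle transaction rollback or restart.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for the framework's input source: returns null when exhausted. */
interface InputSource {
    Object read();
}

/** Decorator that lets only one thread at a time call the delegate's read(). */
class SynchronizedInputSource implements InputSource {
    private final InputSource delegate;

    SynchronizedInputSource(InputSource delegate) {
        this.delegate = delegate;
    }

    @Override
    public synchronized Object read() {
        return delegate.read();
    }
}

class SynchronizedInputSourceDemo {
    /** Wraps a plain (not thread-safe) iterator as an InputSource. */
    static InputSource fromList(List<?> items) {
        Iterator<?> it = items.iterator();
        return () -> it.hasNext() ? it.next() : null;
    }

    public static void main(String[] args) {
        InputSource source = new SynchronizedInputSource(fromList(Arrays.asList("a", "b", "c")));
        // Read until the source signals exhaustion with null.
        for (Object item = source.read(); item != null; item = source.read()) {
            System.out.println(item);
        }
    }
}
```

Because the lock is held only for the duration of `read()`, worker threads still process their items concurrently once they have them.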



              • #8
                Hi Gerard,
                Thanks for your quick reply.
                "We will lose the parallel processing on data access, but we'll have multi-tasking on processing."
                Can you tell me how you did this? I too am only interested in parallel processing, not in parallel reading of the data.



                • #9
                  Sample I used

                  Here is a snippet of the Spring Batch config I used for parallel processing.
                  The thing is, it does not work for now, because the itemProvider is accessed concurrently by the parallelized itemProcessors, and that is not supported yet.
                  In my opinion, you need to create your own ItemProvider wrapper that synchronizes all access to it.
                  => I'm not sure whether you would lose other Spring Batch features that way, such as cold or hot restart.


                  Code:
                  <bean name="jobLauncher" class="org.springframework.batch.execution.launch.SimpleJobLauncher">
                      <property name="jobRepository" ref="jobRepository" />
                      <property name="jobConfigurationLocator" ref="jobConfigurationRegistry" />
                      <property name="jobExecutor" ref="jobExecutor" />
                      <property name="jobIdentifierFactory" ref="jobIdentifierFactory" />
                  </bean>

                  <bean id="jobConfigurationRegistry" class="org.springframework.batch.execution.configuration.MapJobConfigurationRegistry" />

                  <bean class="org.springframework.batch.execution.configuration.JobConfigurationRegistryBeanPostProcessor">
                      <property name="jobConfigurationRegistry" ref="jobConfigurationRegistry" />
                  </bean>

                  <!--aop:config>
                      <aop:advisor pointcut="execution(* org.springframework.batch.execution..*Repository+.*(..))"
                          advice-ref="txAdvice" />
                  </aop:config>
                  <tx:advice id="txAdvice" transaction-manager="transactionManager">
                      <tx:attributes>
                          <tx:method name="*" />
                      </tx:attributes>
                  </tx:advice-->

                  <bean id="jobExecutor" class="org.springframework.batch.execution.job.DefaultJobExecutor">
                      <property name="jobRepository" ref="jobRepository" />
                      <property name="stepExecutorFactory">
                          <bean class="org.springframework.batch.execution.step.PrototypeBeanStepExecutorFactory">
                              <property name="stepExecutorName" value="stepExecutor" />
                          </bean>
                      </property>
                  </bean>

                  <bean id="stepExecutor" class="org.springframework.batch.execution.step.simple.DefaultStepExecutor"
                      scope="prototype">
                      <property name="transactionManager" ref="sqlTransactionManager" />
                      <property name="repository" ref="jobRepository" />
                      <!-- Support for concurrent execution before the transaction -->
                      <property name="stepOperations" ref="repeater" />
                  </bean>

                  <bean id="sqlTransactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager" lazy-init="true">
                      <property name="dataSource" ref="businessDB" />
                  </bean>

                  <bean id="repeater" class="org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate">
                      <property name="taskExecutor" ref="asyncExecutor" />
                  </bean>

                  <bean id="asyncExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor">
                      <property name="concurrencyLimit" value="2" />
                  </bean>

                  <bean id="simpleJob" class="org.springframework.batch.core.configuration.JobConfiguration"
                      abstract="true">
                      <property name="restartable" value="true" />
                  </bean>

                  <bean id="simpleStep" class="org.springframework.batch.execution.step.SimpleStepConfiguration"
                      abstract="true">
                      <property name="allowStartIfComplete" value="true" />
                      <property name="saveRestartData" value="true" />
                      <property name="exceptionHandler">
                          <bean class="org.springframework.batch.repeat.exception.handler.SimpleLimitExceptionHandler">
                              <property name="limit" value="5" />
                              <property name="useParent" value="true" />
                          </bean>
                      </property>
                      <property name="commitInterval" value="1" />
                  </bean>

                  <bean id="jobConfiguration" parent="simpleJob">
                      <property name="name" value="xmlStaxJob" />
                      <property name="steps">
                          <bean id="step1" parent="simpleStep">
                              <constructor-arg>
                                  <bean class="org.springframework.batch.execution.tasklet.RestartableItemProviderTasklet">
                                      <property name="itemProvider">
                                          <bean class="org.springframework.batch.item.provider.InputSourceItemProvider">
                                              <property name="inputSource">
                                                  <ref bean="fileInputSource" />
                                              </property>
                                          </bean>
                                      </property>
                                      <property name="itemProcessor">
                                          <ref bean="itemProcessor" />
                                      </property>
                                  </bean>
                              </constructor-arg>
                          </bean>
                      </property>
                  </bean>

                  Hope this helps,

                  Gérard
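The ItemProvider wrapper suggested in #9 might be combined with parallel workers along the lines of the sketch below. It is plain Java with no Spring Batch dependency: `ItemProvider` is a hypothetical stand-in with a single `next()` method (the method name appears in the stack trace in the first post), and `processAll` and `synchronizedProvider` are illustrative helpers, not framework API. Items are fetched one at a time under a lock and processed outside it, so only the processing runs in parallel. Restart and transaction handling are deliberately left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Hypothetical stand-in for the framework's item provider: returns null when exhausted. */
interface ItemProvider {
    Object next();
}

class ParallelProcessingSketch {
    /** Serializes access to a provider that is not thread-safe. */
    static ItemProvider synchronizedProvider(ItemProvider delegate) {
        Object lock = new Object();
        return () -> {
            synchronized (lock) {
                return delegate.next();
            }
        };
    }

    /** Starts the given number of workers; each pulls items and processes them outside the lock. */
    static List<Object> processAll(ItemProvider provider, Function<Object, Object> processor, int threads) {
        List<Object> results = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                for (Object item = provider.next(); item != null; item = provider.next()) {
                    results.add(processor.apply(item));
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return results; // order depends on thread scheduling
    }

    public static void main(String[] args) {
        Iterator<Integer> it = Arrays.asList(1, 2, 3, 4, 5, 6).iterator();
        ItemProvider provider = synchronizedProvider(() -> it.hasNext() ? it.next() : null);
        // Two workers, mirroring concurrencyLimit=2 in the configuration above.
        List<Object> squares = processAll(provider, x -> ((Integer) x) * ((Integer) x), 2);
        System.out.println(squares.size() + " items processed");
    }
}
```

The result order is not deterministic, which is one reason restartability becomes harder once processing is parallel: a simple "last item read" position no longer says which items were actually completed.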

