
  • Dynamic commit-interval

    Hi all,

    I have a Spring Batch application with a flat file as the item reader.

    This flat file contains two different types of records.

    In the definition of my chunk I have to specify a dynamic commit interval.

    The application has to commit each time it reads a new type of record.

    Example:

    1, 'recordType1', 50
    1, 'recordType1', 51
    1, 'recordType1', 52
    COMMIT
    2, 'recordType2', 'foo'
    COMMIT
    1, 'recordType1', 53
    COMMIT
    2, 'recordType2', 'foo'
    COMMIT
    1, 'recordType1', 50
    1, 'recordType1', 50
    1, 'recordType1', 50
    1, 'recordType1', 50
    COMMIT
    2, 'recordType2', 'foo'
    2, 'recordType2', 'foo'
    2, 'recordType2', 'foo'
    2, 'recordType2', 'foo'
    COMMIT


    How can I do this?

    Thanks!

  • #2
    Take a look at the chunk-completion-policy. You can implement the CompletionPolicy interface and configure that to perform some form of logic to determine if the chunk is complete or not.
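
To make the "is the chunk complete?" decision concrete, here is a minimal, Spring-free sketch of the boundary logic such a policy would need for the example in the first post. The class `TypeChangeChunker` and its methods are hypothetical names, not part of Spring Batch, and the record type is assumed to be the first comma-separated field. In a real step the decision would sit behind the `CompletionPolicy` interface, and knowing the next record's type would probably require some kind of look-ahead on the reader (for example a peekable reader); that wiring is not shown here.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not part of Spring Batch: splits records into chunks at each record-type change. */
public class TypeChangeChunker {

    /** Returns the record type, assumed here to be the first comma-separated field. */
    public static String typeOf(String record) {
        return record.split(",", 2)[0].trim();
    }

    /** A chunk is complete when there is no next record or the next record has a different type. */
    public static boolean isChunkComplete(String current, String next) {
        return next == null || !typeOf(current).equals(typeOf(next));
    }

    /** Groups consecutive records of the same type; each inner list stands for one commit. */
    public static List<List<String>> split(List<String> records) {
        List<List<String>> chunks = new ArrayList<>();
        List<String> chunk = new ArrayList<>();
        for (int i = 0; i < records.size(); i++) {
            String current = records.get(i);
            String next = (i + 1 < records.size()) ? records.get(i + 1) : null;
            chunk.add(current);
            if (isChunkComplete(current, next)) {
                chunks.add(chunk);
                chunk = new ArrayList<>();
            }
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<String> records = List.of(
                "1, 'recordType1', 50",
                "1, 'recordType1', 51",
                "1, 'recordType1', 52",
                "2, 'recordType2', 'foo'",
                "1, 'recordType1', 53");
        // One line per chunk, i.e. per commit point in the original example.
        for (List<String> c : split(records)) {
            System.out.println(c.size() + " record(s) -> COMMIT");
        }
    }
}
```

With the five sample records above, the sketch produces three chunks of sizes 3, 1 and 1, matching the first three COMMIT points in the question.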



    • #3
      Hi,

      Thank you for replying. Yes, I'm still looking for a solution.

      Indeed, it works well with chunk-completion-policy. With a little business logic I can commit by block of identical records, but I have a problem using it.

      chunk-completion-policy seems to assume that the entire resource file is correct, because it reads the whole resource file before doing anything else.

      If the resource file has, for example, three correct blocks but the fourth is wrong, then no records are inserted into my DB.
      But I want the first three correct blocks to be persisted.



      • #4
        If you don't want the transaction to roll back, you can skip the exception via the skippable-exception-classes (section 5.1.5 here: http://static.springsource.org/sprin...igureStep.html).



        • #5
          Yes, I know, but I want the transaction to roll back the entire block that contains the bad record, and then the application should stop.

          The previous correct blocks must be fully committed.
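
The requirement stated here (commit each good block, roll back only the block with the bad record, then stop) can be illustrated with a small Spring-free simulation. Everything in it is hypothetical: `BlockCommitSimulation`, the in-memory "committed" list standing in for the database, and the rule that a record containing `BAD` is invalid. It only sketches the intended semantics and is not how Spring Batch implements chunk transactions.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, Spring-free simulation of "commit per block, roll back the bad block, then stop". */
public class BlockCommitSimulation {

    /** Stand-in validation rule (an assumption for this sketch): a record containing "BAD" is invalid. */
    static void validate(String record) {
        if (record.contains("BAD")) {
            throw new IllegalArgumentException("invalid record: " + record);
        }
    }

    /**
     * Processes blocks in order. Each block is staged and then committed as a whole.
     * On the first invalid record the staged block is discarded and processing stops.
     * Returns the records that were committed.
     */
    public static List<String> run(List<List<String>> blocks) {
        List<String> committed = new ArrayList<>();
        for (List<String> block : blocks) {
            List<String> staged = new ArrayList<>();
            try {
                for (String record : block) {
                    validate(record);
                    staged.add(record);
                }
            } catch (IllegalArgumentException e) {
                // "Rollback": nothing staged for this block reaches the committed list.
                System.out.println("Rolled back block, stopping: " + e.getMessage());
                break;
            }
            // "Commit": the whole block becomes visible at once.
            committed.addAll(staged);
        }
        return committed;
    }

    public static void main(String[] args) {
        List<List<String>> blocks = List.of(
                List.of("a1", "a2"),
                List.of("b1"),
                List.of("c1", "BAD", "c3"),
                List.of("d1"));
        System.out.println("Committed: " + run(blocks));
    }
}
```

In this run the first two blocks are committed, the third is discarded as a whole (including `c1`, which was valid), and the fourth is never processed.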



          • #6
            Can you post your job configuration?



            • #7
              Here you are:

              One thing to mention: at first I used the JPA dialect, then I removed it and implemented my own writer (my writer does nothing!). In my processor I inject a service (VenteService) that does some business logic and manages the transaction.



              Code:
              <?xml version="1.0" encoding="UTF-8"?>
              <beans xmlns="http://www.springframework.org/schema/beans"
              	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
              	xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"
              	xmlns:util="http://www.springframework.org/schema/util"
              
              	xsi:schemaLocation="http://www.springframework.org/schema/beans 
              	http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
              	http://www.springframework.org/schema/batch
              	http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
              	http://www.springframework.org/schema/context 
                      http://www.springframework.org/schema/context/spring-context-3.1.xsd 
                      http://www.springframework.org/schema/tx 
                      http://www.springframework.org/schema/tx/spring-tx-3.1.xsd
                      http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-2.0.xsd">
              
              	 <import resource="classpath:/applicationContext-dao.xml" />
              
              	<!-- Instruct Spring to perform declarative transaction management automatically 
              		on annotated classes. -->
              	<tx:annotation-driven transaction-manager="transactionManager" />
              
              	<bean id="venteReader" class="org.springframework.batch.item.file.FlatFileItemReader">
              		<property name="resource" value="classpath:prixvente_000001.txt" />
              
              		<property name="lineMapper">
              			<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
              				<property name="lineTokenizer">
              					<bean
              						class="org.springframework.batch.item.file.transform.PatternMatchingCompositeLineTokenizer">
              						<property name="tokenizers">
              							<map>
              								<entry key="1*" value-ref="line1Tokenizer" />
              								<entry key="2*" value-ref="line2Tokenizer" />
              							</map>
              						</property>
              					</bean>
              				</property>
              
              				<property name="fieldSetMapper">
              					<bean class="com.ubik.newapp.batch.VenteFieldSetMapper">
              					</bean>
              				</property>
              			</bean>
              		</property>
              	</bean>
              
              	<bean id="line1Tokenizer"
              		class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
              		<property name="names"
              			value="ligneType,numeroCondition, debutValidite, finValidite, prixValeur, miseAJour, numeroArticle, originePV, topPrixDeVenteConseille, prixVenteMin, codePrixDeVente" />
              		<property name="delimiter">
              			<util:constant
              				static-field="org.springframework.batch.item.file.transform.DelimitedLineTokenizer.DELIMITER_TAB" />
              		</property>
              	</bean>
              
              	<bean id="line2Tokenizer"
              		class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
              		<property name="names"
              			value="ligneType,numeroCondition,numeroEntiteMagasin,debutValiditeMag,finValiditeMag,miseAJour,extra1,extra2,extra3,extra4,extra5" />
              		<property name="delimiter">
              			<util:constant
              				static-field="org.springframework.batch.item.file.transform.DelimitedLineTokenizer.DELIMITER_TAB" />
              		</property>
              	</bean>
              
              <!-- 	<bean id="venteWriter" class="org.springframework.batch.item.database.JpaItemWriter"> -->
              <!-- 	<property name="entityManagerFactory" ref="entityManagerFactory" /> -->
              <!-- 	</bean> -->
              
              	<bean id="dataSource" destroy-method="close"
              		class="org.apache.commons.dbcp.BasicDataSource">
              		<property name="driverClassName" value="org.postgresql.Driver" />
              		<property name="url" value="jdbc:postgresql://127.0.0.1:5433/ubik" />
              		<property name="username" value="postgres" />
              		<property name="password" value="ubikubik" />
              	</bean>
                  
              <!--     <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager"> -->
              <!--      <property name="dataSource" ref="dataSource"/> -->
              <!--   </bean> -->
              
              	<bean id="jobLauncher"
              		class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
              		<property name="jobRepository" ref="jobRepository" />
              	</bean>
              
              	<bean id="jobRepository"
              		class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
              <!--   		<property name="transactionManager" ref="transactionManager" /> -->
              <!--  		<property name="isolationLevelForCreate" value="PROPAGATION_REQUIRED" />  -->
              	</bean>
              
              	<!-- ========================= BUSINESS OBJECT DEFINITIONS ========================= -->
              
              	<!-- Activates various annotations to be detected in bean classes: Spring's 
              		@Required and @Autowired, as well as JSR 250's @Resource. -->
              	<context:annotation-config />
              
              	<context:component-scan base-package="com.ubik.newapp"
              		scoped-proxy="targetClass" />
              
              	<job id="importVentes" xmlns="http://www.springframework.org/schema/batch" >
              		<step id="readWriteVente">
              			<tasklet >
               				<chunk reader="venteReader" processor="venteProcessor" 
               					reader-transactional-queue="true" processor-transactional="true" writer="venteWriter" chunk-completion-policy="completionPolicy" >
                                 
              				</chunk>
              			</tasklet>
              		</step>
              	</job>
              
              	<bean id="completionPolicy" class="com.ubik.newapp.batch.CompletionPolicy" />
              
               	<bean id="venteProcessor" class="com.ubik.newapp.batch.VenteProcessor" />
              
               	<bean id="venteWriter" class="com.ubik.newapp.batch.VenteWriter" />
              </beans>

              VenteService:
              Code:
              package com.ubik.newapp.service.third.impl;
              
              import org.springframework.beans.factory.annotation.Autowired;
              import org.springframework.stereotype.Service;
              import org.springframework.transaction.PlatformTransactionManager;
              import org.springframework.transaction.TransactionDefinition;
              import org.springframework.transaction.TransactionStatus;
              import org.springframework.transaction.support.DefaultTransactionDefinition;
              
              import com.ubik.newapp.dao.third.VenteDao;
              import com.ubik.newapp.dto.third.Vente;
              import com.ubik.newapp.service.third.VenteService;
              
              @Service
              public class VenteServiceImpl implements VenteService {
              
              	@Autowired
              	private VenteDao venteDao;
              	
              	@Autowired
                  private PlatformTransactionManager transactionManager;
              	
              	private static final int LIGNE_TYPE_MAG = 1;
              	
              	private boolean commit;
              
              	public void create(Vente p) {
              		
              		// Block commit works with:
              		
              		//ISOLATION_DEFAULT
              		//ISOLATION_READ_COMMITTED
              		//ISOLATION_READ_UNCOMMITTED
              		//ISOLATION_SERIALIZABLE
              		//PROPAGATION_MANDATORY
              		//PROPAGATION_REQUIRED
              		//PROPAGATION_SUPPORTS
              		//TIMEOUT_DEFAULT
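              		// Obtain a transaction on every call (joins the current one if present).
              		TransactionStatus transaction = transactionManager.getTransaction(new DefaultTransactionDefinition(
                              TransactionDefinition.PROPAGATION_REQUIRED));
              		
              		venteDao.save(p);
              		// Commit only on every second record of type LIGNE_TYPE_MAG;
              		// in all other cases the transaction obtained above is not committed here.
              		if(commit && p.getLigneType() == LIGNE_TYPE_MAG){
              			transaction.flush();
              			transactionManager.commit(transaction);
              			commit = false;
              		}
              		else if(p.getLigneType()== LIGNE_TYPE_MAG){
              			commit = true;
              		}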
              	}
              }



              • #8
                OK, a few questions:
                1. Why did you configure your reader as a transactional queue? That really should only be true if you're reading from a JMS queue or similar resource. A flat file should not be configured this way.
                2. Why is your processor transactional? What is it doing that prevents it from just participating in the chunk's transaction?



                • #9
                  I can't.



                  • #10
                    [Same job configuration as posted in #7.]



                    • #11
                      I can't...



                      • #12
                        The solution was to override the Spring Batch API to get the expected result.
