mapping new rows from DB by restarting the job

  • mapping new rows from DB by restarting the job

    I have created a job, and I check the condition for re-running the step in a job decider. My problem is that when the step runs again, it fetches the same records (1 to 10), because my SQL is "select * from userdetails limit 10". I want it to fetch from the 11th record onward when the job is restarted.
    Code:
    <job id="batchJob" job-repository="jobRepository" incrementer="myJobParametersIncrementer" xmlns="http://www.springframework.org/schema/batch">
    		<step id="updateData" next="limitDecision">
    			<tasklet transaction-manager="transactionManager" allow-start-if-complete="true">
    				<chunk reader="userReader" writer="userWriter"
    					commit-interval="10">
    					<listeners>
    						<listener ref="stepExecListener" />
    					</listeners>
    				</chunk>
    			</tasklet>
    		</step>
    		<decision id="limitDecision" decider="limitDecider">
    			<next on="CONTINUE" to="updateData" />
    			<end on="COMPLETED" />
    		</decision>
    </job>
    	
    	<bean id="myJobParametersIncrementer" class="com.inf.repeat.MyJobParametersIncrementer" />
    
    	<bean id="limitDecider" class="com.inf.repeat.LimitDecider">
    		<constructor-arg name="dataSource" ref="jpaDataSource" />
    	</bean>
    
    	<bean id="stepExecListener" class="com.inf.repeat.LoopStepExecutionListener">
    		<constructor-arg name="dataSource" ref="jpaDataSource" />
    	</bean>
    
    	<bean id="dataTasklet" class="com.inf.mapper.DataTasklet"
    		scope="step">
    		<constructor-arg name="dataSource" ref="jpaDataSource" />
    	</bean>
    
    	<bean id="userReader"
    		class="org.springframework.batch.item.database.JdbcCursorItemReader"
    		scope="step">
    		<property name="dataSource" ref="jpaDataSource" />
    		<property name="sql" value="select * from userdetails limit 10 " />
    		<property name="rowMapper" ref="dbObjectRowMapper" />
    	</bean>
    
    	<bean id="userWriter"
    		class="org.springframework.batch.item.database.JdbcBatchItemWriter"
    		scope="step">
    		<property name="dataSource" ref="jpaDataSource" />
    		<property name="sql" value="insert into newuserdetails (name,gender) values (?,?)" />
    		<property name="itemPreparedStatementSetter" ref="newPreparedStatementSetter" />
    	</bean>
    Code:
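    import javax.sql.DataSource;

    import org.springframework.batch.core.JobExecution;
    import org.springframework.batch.core.StepExecution;
    import org.springframework.batch.core.job.flow.FlowExecutionStatus;
    import org.springframework.batch.core.job.flow.JobExecutionDecider;
    import org.springframework.batch.item.ExecutionContext;
    import org.springframework.jdbc.core.JdbcTemplate;

    public class LimitDecider implements JobExecutionDecider {

    	private final JdbcTemplate jdbcTemplate;
    	private static final String SQL = "select count(*) from userdetails";

    	public LimitDecider(DataSource dataSource) {
    		this.jdbcTemplate = new JdbcTemplate(dataSource);
    	}

    	@Override
    	public FlowExecutionStatus decide(JobExecution jobExecution,
    			StepExecution stepExecution) {
    		// Total number of rows in userdetails.
    		int countTotalRows = jdbcTemplate.queryForObject(SQL, Integer.class);
    		ExecutionContext context = jobExecution.getExecutionContext();
    		// Default to 0 so the first pass does not fail on a missing key.
    		int oldValue = context.getInt("newStart", 0);
    		int newValue = context.getInt("start", 0);
    		int startValue = oldValue + newValue;
    		context.put("newStart", startValue);

    		// Loop back to the step while the running offset is within the table.
    		if (startValue <= countTotalRows) {
    			return new FlowExecutionStatus("CONTINUE");
    		} else {
    			return new FlowExecutionStatus("COMPLETED");
    		}
    	}
    }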
    Code:
    public class LoopStepExecutionListener extends ChunkListenerSupport implements StepExecutionListener{
    
    	private StepExecution stepExecution;
    
    	@Override
    	public void beforeStep(StepExecution stepExecution) {
    		this.stepExecution = stepExecution;
    	}
    
    	@Override
    	public void beforeChunk() {
    		stepExecution.getJobExecution().getExecutionContext().put("start", stepExecution.getReadCount());
    	}
    	
    	@Override
    	public void afterChunk() {
    		
    	}
    
    	@Override
    	public ExitStatus afterStep(StepExecution stepExecution) {
    		
    		return null;
    	}
    }

  • #2
    I don't understand why you would want to select a different set of records when you restart a particular instance. Can you explain your use case a bit more? Normally, restarting a job should resume it where it left off.



    • #3
      Thanks, snicoll, for the quick reply.

      I think an example will make it clearer.

      Example: assume I have 50 records in my database and a reader that reads 10 records at a time. My decider then checks whether more records remain in the DB and, if so, the step reads them. So the reader should execute 5 times: first it should read 1-10, then 11-20, then 21-30, and so on.

      If you have an alternative idea, please let me know.
      Thank you.



      • #4
        I think you can save the start position in the step ExecutionContext so that it is persisted, then pass it to the SQL as a parameter when the step restarts.
        Something like:
        select * from table limit #{stepExecutionContext['startPos']}, 10
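
The offset idea above can be sketched in plain Java, without any Spring classes, to show how a saved start position maps onto MySQL-style "limit offset, count" windows. The class and method names (OffsetWindows, windowSql, startPositions) are hypothetical and exist only for this illustration; the table name and page size come from the original question.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (no Spring classes): maps a saved start position
// onto "limit offset, count" windows. All names here are hypothetical.
final class OffsetWindows {

    // Builds the query for one window; startPos is the 0-based row offset.
    static String windowSql(int startPos, int pageSize) {
        return "select * from userdetails limit " + startPos + ", " + pageSize;
    }

    // Lists the start offsets needed to cover totalRows in windows of pageSize.
    static List<Integer> startPositions(int totalRows, int pageSize) {
        List<Integer> starts = new ArrayList<>();
        for (int start = 0; start < totalRows; start += pageSize) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // 50 rows in windows of 10, as in the example from post #3.
        for (int start : startPositions(50, 10)) {
            System.out.println(windowSql(start, 10));
        }
    }
}
```

Note that without an ORDER BY clause the database does not guarantee a stable row order between queries, so a real query of this kind would normally sort on a key column.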



        • #5
          Originally posted by biswaranjan
          Thanks, snicoll, for the quick reply.

          I think an example will make it clearer.

          Example: assume I have 50 records in my database and a reader that reads 10 records at a time. My decider then checks whether more records remain in the DB and, if so, the step reads them. So the reader should execute 5 times: first it should read 1-10, then 11-20, then 21-30, and so on.

          If you have an alternative idea, please let me know.
          Thank you.

          I don't understand why you need a decider for that. Your SQL query should retrieve the full set of data that your job instance is meant to process. If that yields 50 results and you use chunk-oriented processing with a chunk size of 10, you get that behavior out of the box.

          Obviously, if your job failed somewhere and you want to restart it, you need to resume from where you left off, but JdbcCursorItemReader does that by default (see saveState).
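
To make the point above concrete, here is a small plain-Java simulation of chunked reading over a single result set with a saved position. It is only a sketch of the idea and not Spring Batch's actual implementation; the class ChunkedReadSketch and its methods are hypothetical names, and the in-memory list stands in for the rows returned by one unrestricted query.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation of chunked reading with a saved position.
// Hypothetical names; this is not how Spring Batch is implemented.
final class ChunkedReadSketch {

    private final List<String> rows; // stands in for the result of a single query
    private int position;            // stands in for the saved read position

    ChunkedReadSketch(List<String> rows, int savedPosition) {
        this.rows = rows;
        this.position = savedPosition;
    }

    // Returns the next chunk of up to chunkSize rows, or an empty list when exhausted.
    List<String> readChunk(int chunkSize) {
        int end = Math.min(position + chunkSize, rows.size());
        List<String> chunk = new ArrayList<>(rows.subList(position, end));
        position = end;
        return chunk;
    }

    // Reads chunks until the rows run out and returns how many chunks were produced.
    static int countChunks(ChunkedReadSketch reader, int chunkSize) {
        int chunks = 0;
        while (!reader.readChunk(chunkSize).isEmpty()) {
            chunks++;
        }
        return chunks;
    }

    // Builds n placeholder rows named user1..userN.
    static List<String> sampleRows(int n) {
        List<String> rows = new ArrayList<>();
        for (int i = 1; i <= n; i++) {
            rows.add("user" + i);
        }
        return rows;
    }

    public static void main(String[] args) {
        // Fresh run: all 50 rows are read in chunks of 10.
        System.out.println(countChunks(new ChunkedReadSketch(sampleRows(50), 0), 10));
        // Simulated restart after 20 rows were already processed.
        System.out.println(countChunks(new ChunkedReadSketch(sampleRows(50), 20), 10));
    }
}
```

In this sketch the query itself never changes between runs; only the saved position does, which is why no decider or "limit" clause is needed to walk through all rows.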
