Announcement Announcement Module
Collapse
No announcement yet.
problem with skiplimitstep Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • problem with skiplimitstep

    hi, i'm using SpringBatch 1.1
    I have a job with 20 steps (my problem is limited to the first 3 steps). The first one parses a flat file and puts the lines in a staging table. The second step reads from the staging table, converts the items into an object (with a date property, so I have a date parser) and puts the resulting objects in tables; this step has a commit interval of 12. I use a third step to make a second pass over the staging table to save the good entities that were rolled back, so this step has the same configuration but with a commit interval of 1. These steps are skip-limit steps, which skip java.lang.Exception with a skip limit of 1000 (I have 100 items to process in my test case).
    My problem is: the first step completes successfully, the second one completes successfully with 20 rolled-back items (I have 2 items with a malformed date, so I throw java.text.ParseException), but the third fails after processing all the remaining items and throws the same exception.

    this is my job config:
    Code:
    	<bean id="migrationJob" parent="simpleJob">
    		<property name="steps">
    			<list>
    				<ref bean="stagingClientStep" />
    				<ref bean="loadingClientStep" />
    				<ref bean="retryLoadingClientStep" />
                                     ....
    
    </bean>
    	<!--
    	  Staging step. Inherits the simpleStep template and overrides its
    	  commitInterval with 100. The clientItemReader bean is wired both as the
    	  item reader and as a registered stream. Items are written by a
    	  StagingItemWriter that uses the shared dataSource and lobHandler, an
    	  incrementer named BATCH_STAGING_SEQ, and "Numcli" as the id property.
    	-->
    	<bean id="stagingClientStep" parent="simpleStep">
    		<property name="commitInterval" value="100" />
    		<property name="streams" ref="clientItemReader" />
    		<property name="itemReader" ref="clientItemReader" />
    		<property name="itemWriter">
    			<bean
    				class="aaaa.commons.integration.batch.springbatch.util.StagingItemWriter">
    				<property name="dataSource" ref="dataSource" />
    				<property name="lobHandler" ref="lobHandler" />
    				<property name="incrementer">
    					<bean parent="incrementerParent">
    						<property name="incrementerName" value="BATCH_STAGING_SEQ" />
    					</bean>
    				</property>
    				<property name="idProperty" value="Numcli" />
    			</bean>
    		</property>
    	</bean>
    
    	<!--
    	  Loading step. Inherits the skipLimitStep template (and therefore its
    	  skipLimit / skippableExceptionClasses settings and the simpleStep
    	  defaults). Runs with a SimpleAsyncTaskExecutor, reads through an inline
    	  StagingItemReader and writes through the clientItemWriter bean.
    	  NOTE(review): no itemType property is set on the inline reader here,
    	  while the StagingItemReader posted later in this thread binds itemType
    	  into its key query; confirm whether it is set elsewhere.
    	-->
    	<bean id="loadingClientStep" parent="skipLimitStep">
    		<property name="taskExecutor">
    			<bean class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
    		</property>
    		<property name="itemReader">
    			<bean
    				class="aaaaa.commons.integration.batch.springbatch.util.StagingItemReader">
    				<property name="lobHandler" ref="lobHandler" />
    				<property name="dataSource" ref="dataSource" />
    			</bean>
    		</property>
    		<property name="itemWriter" ref="clientItemWriter" />
    	</bean>
    
    
    	<!--
    	  Second pass over the staging data. Identical to loadingClientStep (its
    	  parent) except that commitInterval is overridden with 1.
    	-->
    	<bean id="retryLoadingClientStep" parent="loadingClientStep">
    		<property name="commitInterval" value="1" />
    	</bean>
    
    
    	<!--
    	  Abstract template for skip-tolerant steps. Built by
    	  SkipLimitStepFactoryBean on top of the simpleStep template, with a skip
    	  limit of 1000 and java.lang.Exception declared as the skippable
    	  exception class.
    	-->
    	<bean id="skipLimitStep" class="org.springframework.batch.core.step.item.SkipLimitStepFactoryBean"
    		parent="simpleStep" abstract="true">
    		<property name="skipLimit" value="1000" />
    		<property name="skippableExceptionClasses"
                  value="java.lang.Exception"/>
    	</bean>
    
    
    	<!--
    	  Abstract base template shared by every step in this configuration.
    	  Built by SimpleStepFactoryBean; supplies the JPA transaction manager,
    	  the job repository, a throttleLimit of 5, a default commitInterval of 12
    	  (overridden by some child steps) and the itemFailureListener.
    	-->
    	<bean id="simpleStep" class="org.springframework.batch.core.step.item.SimpleStepFactoryBean"
    		abstract="true">
    		<property name="transactionManager" ref="jpaTransactionManager" />
    		<property name="jobRepository" ref="jobRepository" />
    		<property name="throttleLimit" value="5" />
    		<property name="commitInterval" value="12" />
    		<property name="listeners">
            	<ref bean="itemFailureListener"/>
        	</property>
    	</bean>
    and this is the exception trace:


    Code:
    15266  [           main] ERROR - 2009-02-04 10:43:12,254 - org.springframework.batch.core.step.AbstractStep.execute(195) | Encountered an error executing the step: class org.springframework.batch.item.adapter.DynamicMethodInvocationException: java.lang.reflect.InvocationTargetException
    15528  [           main] ERROR - 2009-02-04 10:43:12,516 - org.springframework.batch.core.launch.support.CommandLineJobRunner.start(209) | Job Terminated in error:
    org.springframework.batch.item.adapter.DynamicMethodInvocationException: java.lang.reflect.InvocationTargetException
            at org.springframework.batch.item.adapter.AbstractMethodInvokingDelegator.doInvoke(AbstractMethodInvokingDelegator.java:108)
            at org.springframework.batch.item.adapter.AbstractMethodInvokingDelegator.invokeDelegateMethodWithArgument(AbstractMethodInvokingDelegator.java:62)
            at org.springframework.batch.item.adapter.ItemWriterAdapter.write(ItemWriterAdapter.java:35)
            at org.springframework.batch.item.support.DelegatingItemWriter.write(DelegatingItemWriter.java:45)
            at org.springframework.batch.core.step.item.BatchListenerFactoryHelper$2.write(BatchListenerFactoryHelper.java:99)
            at org.springframework.batch.core.step.item.SimpleItemHandler.doWrite(SimpleItemHandler.java:105)
            at org.springframework.batch.core.step.item.SkipLimitStepFactoryBean$StatefulRetryItemHandler$1.doWithRetry(SkipLimitStepFactoryBean.java:420)
            at org.springframework.batch.retry.callback.RecoveryRetryCallback.doWithRetry(RecoveryRetryCallback.java:109)
            at org.springframework.batch.retry.support.RetryTemplate.execute(RetryTemplate.java:168)
            at org.springframework.batch.core.step.item.SkipLimitStepFactoryBean$StatefulRetryItemHandler.write(SkipLimitStepFactoryBean.java:436)
            at org.springframework.batch.core.step.item.SimpleItemHandler.handle(SimpleItemHandler.java:71)
            at org.springframework.batch.core.step.item.ItemOrientedStep$2.doInIteration(ItemOrientedStep.java:367)
            at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:346)
            at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:212)
            at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:143)
            at org.springframework.batch.core.step.item.ItemOrientedStep.processChunk(ItemOrientedStep.java:360)
            at org.springframework.batch.core.step.item.ItemOrientedStep$1.doInIteration(ItemOrientedStep.java:248)
            at org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate$ExecutingRunnable.run(TaskExecutorRepeatTemplate.java:215)
            at java.lang.Thread.run(Thread.java:595)
    Caused by: java.lang.reflect.InvocationTargetException
            at sun.reflect.GeneratedMethodAccessor112.invoke(Unknown Source)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
            at java.lang.reflect.Method.invoke(Method.java:585)
            at org.springframework.util.MethodInvoker.invoke(MethodInvoker.java:276)
            at org.springframework.batch.item.adapter.AbstractMethodInvokingDelegator.doInvoke(AbstractMethodInvokingDelegator.java:105)
            ... 18 more
    Caused by: java.text.ParseException: Unparseable date: ""
            at java.text.DateFormat.parse(DateFormat.java:335)
            at aaaaaaa.core.business.services.impl.ClientContratServiceImpl.createHolderMoral(ClientContratServiceImpl.java:1286)
            at aaaaaaa.core.business.services.impl.ClientContratServiceImpl.saveClient(ClientContratServiceImpl.java:322)
            at sun.reflect.GeneratedMethodAccessor113.invoke(Unknown Source)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
            at java.lang.reflect.Method.invoke(Method.java:585)
            at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:307)
            at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:182)
            at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:149)
            at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:106)
            at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:171)
            at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
            at $Proxy28.saveClient(Unknown Source)
            ... 23 more

  • #2
    If loadingClientStep is unable to parse certain dates, and retryLoadingClientStep performs the exact same processing, then wouldn't you expect retryLoadingClientStep to skip the same records? After all, the record hasn't changed and suddenly become parseable. What is the purpose of using commitInterval=1 the second time through?

    Comment


    • #3
      Based on the exception, I would say you're using either an ItemReaderAdaptor or an ItemWriterAdaptor, and that the method name you gave was incorrect for the class specified.

      Comment


      • #4
        Originally posted by DHGarrette View Post
        If loadingClientStep is unable to parse certain dates, and retryLoadingClientStep performs the exact same processing, then wouldn't you expect retryLoadingClientStep to skip the same records? After all, the record hasn't changed and suddenly become parseable. What is the purpose of using commitInterval=1 the second time through?
        in the first time, i have 100 items, 10 transactions (i commit every 10 item proceed), 2 of this transaction are rollbacked, with 20 items, but i have only 2 bad items, and 18 of this 20 items are good and must be treated, this is why i set commitInterval to 1 and make an other pass.

        Comment


        • #5
          Originally posted by h View Post
          in the first time, i have 100 items, 10 transactions (i commit every 10 item proceed), 2 of this transaction are rollbacked, with 20 items, but i have only 2 bad items, and 18 of this 20 items are good and must be treated, this is why i set commitInterval to 1 and make an other pass.
          Actually, Spring Batch does this for you. When you skip a record, the transaction is rolled back. However, the framework will automatically reprocess the chunk (excluding any skipped records). This ensures that only the bad records are left unprocessed.

          Comment


          • #6
            Originally posted by lucasward View Post
            Based on the exception, I would say you're using either an ItemReaderAdaptor or an ItemWriterAdaptor, and that the method name you gave was incorrect for the class specified.
            I'm using an ItemWriterAdapter, but the method name is correct, because the same item writer is used in the second step, and the exception is thrown by this method (look at the log: aaaaaaa.core.business.services.impl.ClientContratServiceImpl.saveClient).

            Comment


            • #7
              Originally posted by DHGarrette View Post
              Actually, Spring Batch does this for you. When you skip a record, the transaction is rolled back. However, the framework will automatically reprocess the chunk (excluding any skipped records). This ensures that only the bad records are left unprocessed.
              Thank you for your response, but in my case the chunk is never reprocessed.
              I removed the retry step from my job and ran the test; I have the same problem in another step.
              So now I have 3 problems:
              in the BATCH_STEP_EXECUTION table I have 0 in READ_SKIP_COUNT and WRITE_SKIP_COUNT,
              my chunks are never reprocessed,
              and my job fails with fewer than 1000 exceptions.

              this is the log:

              Code:
              96000  [           main] DEBUG - 2009-02-04 16:26:28,147 - JdbcJobExecutionDao.updateJobExecution(158) | Truncating long message before update of JobExecution: JobExecution: id=864, startTime=Wed Feb 04 16:24:58 CET 2009, endTime=Wed Feb 04 16:26:28 CET 2009, job=[JobInstance: id=864, JobParameters=[{schedule.date=Wed Feb 04 16:24:51 CET 2009}{}{}{}], Job=[migrationJob]]
              96062  [           main] INFO - 2009-02-04 16:26:28,209 - SimpleJobLauncher$1.run(90) | Job: [SimpleJob: [name=migrationJob]] failed with the following parameters: [{schedule.date=Wed Feb 04 16:24:51 CET 2009}{}{}{}]
              org.springframework.batch.item.adapter.DynamicMethodInvocationException: java.lang.reflect.InvocationTargetException
              	at org.springframework.batch.item.adapter.AbstractMethodInvokingDelegator.doInvoke(AbstractMethodInvokingDelegator.java:108)
              	at org.springframework.batch.item.adapter.AbstractMethodInvokingDelegator.invokeDelegateMethodWithArgument(AbstractMethodInvokingDelegator.java:62)
              	at org.springframework.batch.item.adapter.ItemWriterAdapter.write(ItemWriterAdapter.java:35)
              	at org.springframework.batch.item.support.DelegatingItemWriter.write(DelegatingItemWriter.java:45)
              	at org.springframework.batch.core.step.item.BatchListenerFactoryHelper$2.write(BatchListenerFactoryHelper.java:99)
              	at org.springframework.batch.core.step.item.SimpleItemHandler.doWrite(SimpleItemHandler.java:105)
              	at org.springframework.batch.core.step.item.SkipLimitStepFactoryBean$StatefulRetryItemHandler$1.doWithRetry(SkipLimitStepFactoryBean.java:420)
              	at org.springframework.batch.retry.callback.RecoveryRetryCallback.doWithRetry(RecoveryRetryCallback.java:109)
              	at org.springframework.batch.retry.support.RetryTemplate.execute(RetryTemplate.java:168)
              	at org.springframework.batch.core.step.item.SkipLimitStepFactoryBean$StatefulRetryItemHandler.write(SkipLimitStepFactoryBean.java:436)
              	at org.springframework.batch.core.step.item.SimpleItemHandler.handle(SimpleItemHandler.java:71)
              	at org.springframework.batch.core.step.item.ItemOrientedStep$2.doInIteration(ItemOrientedStep.java:367)
              	at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:346)
              	at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:212)
              	at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:143)
              	at org.springframework.batch.core.step.item.ItemOrientedStep.processChunk(ItemOrientedStep.java:360)
              	at org.springframework.batch.core.step.item.ItemOrientedStep$1.doInIteration(ItemOrientedStep.java:248)
              	at org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate$ExecutingRunnable.run(TaskExecutorRepeatTemplate.java:215)
              	at java.lang.Thread.run(Thread.java:595)
              Caused by: java.lang.reflect.InvocationTargetException
              	at sun.reflect.GeneratedMethodAccessor274.invoke(Unknown Source)
              	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
              	at java.lang.reflect.Method.invoke(Method.java:585)
              	at org.springframework.util.MethodInvoker.invoke(MethodInvoker.java:276)
              	at org.springframework.batch.item.adapter.AbstractMethodInvokingDelegator.doInvoke(AbstractMethodInvokingDelegator.java:105)
              	... 18 more
              Caused by: java.lang.Exception: null found on DB when search contract (link.Numctr) 335031212525
              	at aaaaaaa.aaa.core.business.services.impl.ClientContratServiceImpl.saveLink(ClientContratServiceImpl.java:818)
              	at sun.reflect.GeneratedMethodAccessor275.invoke(Unknown Source)
              	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
              	at java.lang.reflect.Method.invoke(Method.java:585)
              	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:310)
              	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:182)
              	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:149)
              	at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:106)
              	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:171)
              	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
              	at $Proxy28.saveLink(Unknown Source)
              	... 23 more
              96062  [           main] ERROR - 2009-02-04 16:26:28,209 - CustomCommandLineJobRunner.start(94) | Job Terminated in error:
              org.springframework.batch.item.adapter.DynamicMethodInvocationException: java.lang.reflect.InvocationTargetException
              	at org.springframework.batch.item.adapter.AbstractMethodInvokingDelegator.doInvoke(AbstractMethodInvokingDelegator.java:108)
              	at org.springframework.batch.item.adapter.AbstractMethodInvokingDelegator.invokeDelegateMethodWithArgument(AbstractMethodInvokingDelegator.java:62)
              	at org.springframework.batch.item.adapter.ItemWriterAdapter.write(ItemWriterAdapter.java:35)
              	at org.springframework.batch.item.support.DelegatingItemWriter.write(DelegatingItemWriter.java:45)
              	at org.springframework.batch.core.step.item.BatchListenerFactoryHelper$2.write(BatchListenerFactoryHelper.java:99)
              	at org.springframework.batch.core.step.item.SimpleItemHandler.doWrite(SimpleItemHandler.java:105)
              	at org.springframework.batch.core.step.item.SkipLimitStepFactoryBean$StatefulRetryItemHandler$1.doWithRetry(SkipLimitStepFactoryBean.java:420)
              	at org.springframework.batch.retry.callback.RecoveryRetryCallback.doWithRetry(RecoveryRetryCallback.java:109)
              	at org.springframework.batch.retry.support.RetryTemplate.execute(RetryTemplate.java:168)
              	at org.springframework.batch.core.step.item.SkipLimitStepFactoryBean$StatefulRetryItemHandler.write(SkipLimitStepFactoryBean.java:436)
              	at org.springframework.batch.core.step.item.SimpleItemHandler.handle(SimpleItemHandler.java:71)
              	at org.springframework.batch.core.step.item.ItemOrientedStep$2.doInIteration(ItemOrientedStep.java:367)
              	at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:346)
              	at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:212)
              	at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:143)
              	at org.springframework.batch.core.step.item.ItemOrientedStep.processChunk(ItemOrientedStep.java:360)
              	at org.springframework.batch.core.step.item.ItemOrientedStep$1.doInIteration(ItemOrientedStep.java:248)
              	at org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate$ExecutingRunnable.run(TaskExecutorRepeatTemplate.java:215)
              	at java.lang.Thread.run(Thread.java:595)
              Caused by: java.lang.reflect.InvocationTargetException
              	at sun.reflect.GeneratedMethodAccessor274.invoke(Unknown Source)
              	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
              	at java.lang.reflect.Method.invoke(Method.java:585)
              	at org.springframework.util.MethodInvoker.invoke(MethodInvoker.java:276)
              	at org.springframework.batch.item.adapter.AbstractMethodInvokingDelegator.doInvoke(AbstractMethodInvokingDelegator.java:105)
              	... 18 more
              Caused by: java.lang.Exception: null found on DB when search contract (link.Numctr) 335031212525
              	at aaaaaa.aaa.core.business.services.impl.ClientContratServiceImpl.saveLink(ClientContratServiceImpl.java:818)
              	at sun.reflect.GeneratedMethodAccessor275.invoke(Unknown Source)
              	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
              	at java.lang.reflect.Method.invoke(Method.java:585)
              	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:310)
              	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:182)
              	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:149)
              	at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:106)
              	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:171)
              	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
              	at $Proxy28.saveLink(Unknown Source)
              	... 23 more

              Comment


              • #8
                and this is my StagingItemReader:


                Code:
                /**
                 * Thread-safe database {@link ItemReader} implementing the process indicator
                 * pattern.
                 * <p>
                 * When the stream is opened, the IDs of the <code>BATCH_STAGING</code> rows
                 * that belong to the current job, carry the configured item type and are
                 * still flagged {@link StagingItemWriter#NEW} are loaded once. Each call to
                 * {@link #read()} takes the next ID, deserializes the row's
                 * <code>VALUE</code> column and flags the row as
                 * {@link StagingItemWriter#DONE}. IDs handed out since the last
                 * {@link #mark()} are remembered in a buffer bound through
                 * {@link TransactionSynchronizationManager} so that {@link #reset()} can
                 * replay them.
                 */
                public class StagingItemReader extends JdbcDaoSupport implements ItemStream, ItemReader, StepExecutionListener {
                
                	// Key for buffer in transaction synchronization manager
                	private static final String BUFFER_KEY = StagingItemReader.class.getName() + ".BUFFER";
                
                	private static final Log logger = LogFactory.getLog(StagingItemReader.class);
                
                	private StepExecution stepExecution;
                
                	private LobHandler lobHandler = new DefaultLobHandler();
                
                	// Monitor guarding the shared key iterator; final so it can never be swapped.
                	private final Object lock = new Object();
                
                	private volatile boolean initialized = false;
                
                	private volatile Iterator keys;
                
                	private String itemType;
                
                	/**
                	 * Public setter for the {@link LobHandler}.
                	 * 
                	 * @param lobHandler the {@link LobHandler} to set (defaults to
                	 * {@link DefaultLobHandler}).
                	 */
                	public void setLobHandler(LobHandler lobHandler) {
                		this.lobHandler = lobHandler;
                	}
                
                	/**
                	 * Marks the reader as closed, drops the loaded keys and unbinds the
                	 * buffer resource if one is currently bound.
                	 * 
                	 * @param executionContext not used by this implementation
                	 */
                	public void close(ExecutionContext executionContext) {
                		initialized = false;
                		keys = null;
                		if (TransactionSynchronizationManager.hasResource(BUFFER_KEY)) {
                			TransactionSynchronizationManager.unbindResource(BUFFER_KEY);
                		}
                	}
                
                	/**
                	 * Loads the staging keys the first time it is called; later calls are
                	 * no-ops while the keys are still present.
                	 * 
                	 * @param executionContext not used by this implementation
                	 */
                	public void open(ExecutionContext executionContext) {
                		// Can be called from multiple threads because of lazy initialisation...
                		synchronized (lock) {
                			if (keys == null) {
                				keys = retrieveKeys().iterator();
                				logger.info("Keys obtained for staging.");
                				initialized = true;
                			}
                		}
                	}
                
                	/**
                	 * Queries the IDs of all staging rows for the current job that are
                	 * flagged {@link StagingItemWriter#NEW} and have the configured item
                	 * type, ordered by ID.
                	 * 
                	 * @return list of {@link Long} IDs
                	 */
                	private List retrieveKeys() {
                
                		synchronized (lock) {
                
                			return getJdbcTemplate().query(
                
                			"SELECT ID FROM BATCH_STAGING WHERE JOB_ID=? AND PROCESSED=? AND ITEM_TYPE=? ORDER BY ID",
                
                			new Object[] { stepExecution.getJobExecution().getJobId(), StagingItemWriter.NEW, itemType },
                
                			new RowMapper() {
                				public Object mapRow(ResultSet rs, int rowNum) throws SQLException {
                					// valueOf avoids allocating a fresh wrapper for every row
                					return Long.valueOf(rs.getLong(1));
                				}
                			}
                
                			);
                
                		}
                
                	}
                
                	/**
                	 * Reads the next staged item.
                	 * 
                	 * @return the deserialized item, or <code>null</code> when no key is left
                	 * @throws OptimisticLockingFailureException if the row could not be
                	 * flipped from NEW to DONE (the update did not touch exactly one row)
                	 * @throws Exception declared by the {@link ItemReader} contract
                	 */
                	public Object read() throws Exception {
                		Long id = doRead();
                
                		if (id == null) {
                			return null;
                		}
                		Object result = getJdbcTemplate().queryForObject("SELECT VALUE FROM BATCH_STAGING WHERE ID=?",
                				new Object[] { id }, new RowMapper() {
                					public Object mapRow(ResultSet rs, int rowNum) throws SQLException {
                						byte[] blob = lobHandler.getBlobAsBytes(rs, 1);
                						return SerializationUtils.deserialize(blob);
                					}
                				});
                		// Update now - changes will rollback if there is a problem later.
                		int count = getJdbcTemplate().update("UPDATE BATCH_STAGING SET PROCESSED=? WHERE ID=? AND PROCESSED=?",
                				new Object[] { StagingItemWriter.DONE, id, StagingItemWriter.NEW });
                		if (count != 1) {
                			throw new OptimisticLockingFailureException("The staging record with ID=" + id
                					+ " was updated concurrently when trying to mark as complete (updated " + count + " records).");
                		}
                		return result;
                	}
                
                	/**
                	 * Picks the next key: first from the replay buffer, otherwise from the
                	 * shared key iterator (in which case the key is also recorded in the
                	 * buffer).
                	 * 
                	 * @return the next key, or <code>null</code> when both sources are
                	 * exhausted
                	 * @throws ReaderNotOpenException if the reader has not been opened
                	 */
                	private Long doRead() {
                		if (!initialized) {
                			throw new ReaderNotOpenException("ItemStream must be open before it can be read.");
                		}
                
                		Long key = getBuffer().next();
                		if (key == null) {
                			synchronized (lock) {
                				if (keys.hasNext()) {
                					Assert.state(TransactionSynchronizationManager.isActualTransactionActive(),
                							"Transaction not active for this thread.");
                					Long next = (Long) keys.next();
                					getBuffer().add(next);
                					key = next;
                					logger.debug("Retrieved key from list: " + key);
                				}
                			}
                		}
                		else {
                			logger.debug("Retrieved key from buffer: " + key);
                		}
                		return key;
                
                	}
                
                	/**
                	 * Returns the buffer bound under {@link #BUFFER_KEY}, binding a new one
                	 * first if none is bound yet.
                	 */
                	private StagingBuffer getBuffer() {
                		if (!TransactionSynchronizationManager.hasResource(BUFFER_KEY)) {
                			TransactionSynchronizationManager.bindResource(BUFFER_KEY, new StagingBuffer());
                		}
                		return (StagingBuffer) TransactionSynchronizationManager.getResource(BUFFER_KEY);
                	}
                
                	/**
                	 * No recovery is attempted.
                	 * 
                	 * @return always <code>false</code>
                	 */
                	public boolean recover(Object data, Throwable cause) {
                		return false;
                	}
                
                	/**
                	 * Remembers the keys handed out since the last commit so that they can be
                	 * replayed after a rollback.
                	 */
                	private static class StagingBuffer {
                
                		// Keys handed out since the last commit.
                		private final List list = new ArrayList();
                
                		// Replay cursor; empty unless a rollback has just happened.
                		private Iterator iter = new ArrayList().iterator();
                
                		/**
                		 * @return the next key to replay, or <code>null</code> if there is
                		 * nothing to replay
                		 */
                		public Long next() {
                			if (iter.hasNext()) {
                				return (Long) iter.next();
                			}
                			return null;
                		}
                
                		public void add(Long next) {
                			list.add(next);
                		}
                
                		/**
                		 * Restarts the replay cursor over a snapshot of the recorded keys.
                		 */
                		public void rollback() {
                			logger.debug("Resetting buffer on rollback: " + list);
                			iter = new ArrayList(list).iterator();
                		}
                
                		/**
                		 * Forgets all recorded keys and empties the replay cursor.
                		 */
                		public void commit() {
                			logger.debug("Clearing buffer on commit: " + list);
                			list.clear();
                			iter = new ArrayList().iterator();
                		}
                
                		public String toString() {
                			return "list=" + list + "; iter.hasNext()=" + iter.hasNext();
                		}
                	}
                
                	/**
                	 * Mark is supported in a multi- as well as a single-threaded environment.
                	 * The state backing the mark is a buffer, and access is synchronized, so
                	 * multiple threads can be accommodated. Buffers are stored as transaction
                	 * resources (using
                	 * {@link TransactionSynchronizationManager#bindResource(Object, Object)}),
                	 * so they are thread bound.
                	 * 
                	 * @see org.springframework.batch.item.ItemReader#mark()
                	 */
                	public void mark() {
                		getBuffer().commit();
                	}
                
                	/**
                	 * Rewinds to the last mark by replaying the keys recorded in the buffer.
                	 * 
                	 * @see org.springframework.batch.item.ItemReader#reset()
                	 */
                	public void reset() {
                		getBuffer().rollback();
                	}
                
                	/**
                	 * Nothing is stored in the execution context by this reader.
                	 * 
                	 * @param executionContext not used by this implementation
                	 */
                	public void update(ExecutionContext executionContext) {
                	}
                
                	/**
                	 * No-op step callback.
                	 * 
                	 * @return always <code>null</code>
                	 */
                	public ExitStatus afterStep(StepExecution stepExecution) {
                		return null;
                	}
                
                	/**
                	 * Captures the step execution; the key query reads the job id from it.
                	 * 
                	 * @param stepExecution the execution of the step about to run
                	 */
                	public void beforeStep(StepExecution stepExecution) {
                		this.stepExecution = stepExecution;
                	}
                
                	/**
                	 * No-op error callback.
                	 * 
                	 * @return always <code>null</code>
                	 */
                	public ExitStatus onErrorInStep(StepExecution stepExecution, Throwable e) {
                		return null;
                	}
                
                	public String getItemType() {
                	    return itemType;
                	}
                
                	public void setItemType(String itemType) {
                	    this.itemType = itemType;
                	}
                
                }

                Comment

                Working...
                X