
  • Manual chunk commit

    Hello,

    I'm using Spring Batch for data extraction tasks, in most cases from XML to a database. I have one situation in which I need to control the commit interval manually.

    Example: The application reads and processes some records from the XML within a commit interval (chunk). The next record (still in the same chunk) depends on information read previously. When this happens, I need to write and commit the chunk before reading the next record, which depends on the information committed to the database. However, if the dependent records fall in different chunks, there is no problem.

    I can intercept the information through a StepListener and identify whether a record depends on a previous one, but I don't know how to commit the chunk manually.

    Can you help me?

    Thanks from Brazil

  • #2
    The <chunk/> element in the step configuration allows you to inject a chunk-completion-policy that can be used to control the commit. I didn't understand your use case 100%, but I'd be surprised if you can't do something with a completion policy and a listener of some type (or something that implements both). Another common use case is forcing a commit after a timeout or time window.
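
A rough plain-Java sketch of the idea in this reply, for illustration only. It does not use the Spring Batch API: `ChunkPolicy`, `CountOrTimeoutPolicy`, and `ChunkLoop` are hypothetical names, and the loop only imitates a step that keeps reading until the policy reports the chunk complete and then commits. The policy shown ends a chunk after a maximum item count or after a time window, whichever comes first.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.LongSupplier;

/** Hypothetical stand-in for a chunk completion policy (not the Spring Batch interface). */
interface ChunkPolicy {
    void start();           // a new chunk (transaction) begins
    void itemRead();        // one more item was read into the chunk
    boolean isComplete();   // true means: stop reading and commit now
}

/** Ends a chunk after maxItems items or once timeoutMillis has elapsed. */
final class CountOrTimeoutPolicy implements ChunkPolicy {
    private final int maxItems;
    private final long timeoutMillis;
    private final LongSupplier clock;   // injected so that time can be controlled in tests
    private int count;
    private long startedAt;

    CountOrTimeoutPolicy(int maxItems, long timeoutMillis, LongSupplier clock) {
        this.maxItems = maxItems;
        this.timeoutMillis = timeoutMillis;
        this.clock = clock;
    }

    @Override public void start() { count = 0; startedAt = clock.getAsLong(); }
    @Override public void itemRead() { count++; }
    @Override public boolean isComplete() {
        return count >= maxItems || clock.getAsLong() - startedAt >= timeoutMillis;
    }
}

final class ChunkLoop {
    /** Reads items chunk by chunk; each inner list stands for one committed chunk. */
    static <T> List<List<T>> run(Iterator<T> reader, ChunkPolicy policy) {
        List<List<T>> committed = new ArrayList<>();
        while (reader.hasNext()) {
            policy.start();
            List<T> chunk = new ArrayList<>();
            do {
                chunk.add(reader.next());
                policy.itemRead();
            } while (reader.hasNext() && !policy.isComplete());
            committed.add(chunk);   // stands in for "write the chunk and commit"
        }
        return committed;
    }

    public static void main(String[] args) {
        ChunkPolicy policy = new CountOrTimeoutPolicy(3, 60_000, System::currentTimeMillis);
        System.out.println(run(List.of(1, 2, 3, 4, 5, 6, 7).iterator(), policy));
        // prints [[1, 2, 3], [4, 5, 6], [7]] unless a chunk takes longer than a minute
    }
}
```

In a real step the policy would be a bean referenced from the chunk-completion-policy attribute, and the framework would drive the loop.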



    • #3
      I would also need to control the commit-interval dependent on data in a peeked item.

      Is there any concept of a PeekableItemListener?
      How would it be possible to access the reader that is configured on the chunk element from a CompletionPolicy?
      Last edited by joustava; Sep 6th, 2010, 02:03 PM.



      • #4
        Hi, thanks for the reply.

        I will try to explain my question. I need to control the chunk commits depending on the input, after reading and before processing. I will study the completion policy to see whether it helps. Do you have a usage example of a completion policy with manual commit interval control?

        Thanks again!

        Fabiano



        • #5
          There is a PeekableItemReader (and one implementation) in the framework. You even guessed the name right, so we must be doing something right.

          There are no framework samples (except unit tests in core) for chunk completion policy, but you can find a sample here from a user: http://github.com/magott/magott-springbatch-poc.

          In your case I would say you need to write a reader that pulls together a PeekableItemReader and a CompletionPolicy (maybe implements both interfaces, maybe just injects them and changes their state). Because of the state, there may be restrictions on multi-threaded use, unless you take special steps.



          • #6
            Thanks for the hints, they made me implement something like this:

            Code:
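            import org.springframework.batch.item.support.SingleItemPeekableItemReader;
            import org.springframework.batch.repeat.CompletionPolicy;
            import org.springframework.batch.repeat.RepeatContext;
            import org.springframework.batch.repeat.RepeatStatus;
            import org.springframework.batch.repeat.context.RepeatContextSupport;

            /**
             * Custom reader that needs to be configured BOTH as reader AND as
             * chunk-completion-policy to be able to update its internal state.
             * Record is the item type of this job; it exposes getDate().
             */
            public class RecordReader extends SingleItemPeekableItemReader<Record> implements CompletionPolicy {

                // First item of the current chunk; null once the input is exhausted.
                private Record current;

                @Override
                public boolean isComplete(RepeatContext context) {
                    return ((ReaderRepeatContext) context).isComplete();
                }

                @Override
                public boolean isComplete(RepeatContext context, RepeatStatus result) {
                    return ((ReaderRepeatContext) context).isComplete();
                }

                @Override
                public RepeatContext start(RepeatContext parent) {
                    /*
                     * Set first item of the chunk for later comparison.
                     */
                    this.current = invokePeek();
                    return new ReaderRepeatContext(parent);
                }

                @Override
                public void update(RepeatContext context) {
                    /*
                     * Check if the step should finish.
                     * In this case when there are no more records to process.
                     */
                    if (current == null) {
                        context.setCompleteOnly();
                    }
                }

                // Peeks at the next record without consuming it; returns null at end of input.
                private Record invokePeek() {
                    try {
                        return peek();
                    } catch (Exception e) {
                        // Fail loudly: swallowing the error would look like end of input.
                        throw new IllegalStateException("Could not peek at the next record", e);
                    }
                }

                // Custom RepeatContext that decides completion by looking at the next record.
                protected class ReaderRepeatContext extends RepeatContextSupport {

                    public ReaderRepeatContext(RepeatContext parent) {
                        super(parent);
                    }

                    public boolean isComplete() {
                        Record next = invokePeek();

                        // End the chunk at end of input or when the next record has a later date.
                        if (next == null || next.getDate().after(current.getDate())) {
                            current = next;
                            return true;
                        }
                        return false;
                    }

                }

            }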
            And the step is configured something like this:

            Code:
            <step id="chunkStep">
                <tasklet transaction-manager="jobRepository-transactionManager">
                    <chunk reader="peekableReader"
                           processor="scheduleCopyProcessor"
                           writer="scheduleCopyWriter"
                           chunk-completion-policy="peekableReader"/>
                </tasklet>
            </step>

            <bean id="peekableReader" scope="singleton" class="RecordReader">
                <property name="delegate" ref="schedulingReader"/>
            </bean>
            This seems to work for my use case, in which records in a file have a date field and I want to commit records with the same date in one chunk, that is, one commit per newly encountered date.
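
To make the resulting chunk boundaries concrete, here is a small plain-Java simulation of the same peek-and-compare rule. It is only a sketch and does not use Spring Batch: `DatedRecord`, `PeekingReader`, and `DateChunker` are hypothetical names, and the input is assumed to be sorted by date and to contain no null items.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical item type: an id plus the date that decides chunk boundaries. */
final class DatedRecord {
    final int id;
    final LocalDate date;

    DatedRecord(int id, String isoDate) {
        this.id = id;
        this.date = LocalDate.parse(isoDate);
    }
}

/** One-item lookahead over an iterator; peek() returns null at end of input. */
final class PeekingReader<T> {
    private final Iterator<T> delegate;
    private T peeked;

    PeekingReader(Iterator<T> delegate) {
        this.delegate = delegate;
    }

    T peek() {
        if (peeked == null && delegate.hasNext()) {
            peeked = delegate.next();
        }
        return peeked;
    }

    T read() {
        T item = peek();
        peeked = null;
        return item;
    }
}

final class DateChunker {
    /** Each inner list stands for one committed chunk. */
    static List<List<DatedRecord>> chunkByDate(List<DatedRecord> input) {
        PeekingReader<DatedRecord> reader = new PeekingReader<>(input.iterator());
        List<List<DatedRecord>> chunks = new ArrayList<>();
        while (reader.peek() != null) {
            // Remember the date of the first item of the chunk for later comparison.
            LocalDate chunkDate = reader.peek().date;
            List<DatedRecord> chunk = new ArrayList<>();
            do {
                chunk.add(reader.read());
                // Stop at end of input or when the next record has a later date.
            } while (reader.peek() != null && !reader.peek().date.isAfter(chunkDate));
            chunks.add(chunk);
        }
        return chunks;
    }
}
```

With five records dated 2010-09-01, 2010-09-01, 2010-09-02, 2010-09-03, and 2010-09-03, this produces three chunks of sizes 2, 1, and 2.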

            Thanks for the hint, and always open for suggestions.
            Last edited by joustava; Sep 9th, 2010, 09:38 AM.



            • #7
              Guys, the proposed solution works fine! I really liked that!

              In my case, I had to implement the fixed commit interval too, but it worked well!
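
The poster's code is not shown, so the following is only a guess at what combining the two rules might look like, again as a plain-Java sketch without Spring Batch. `BoundedChunker` and its `startsNewGroup` parameter are hypothetical: a chunk ends either when it reaches `maxSize` items or when the peeked item belongs to a different group than the first item of the chunk.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

final class BoundedChunker {
    /**
     * Splits items into chunks. startsNewGroup receives the first item of the
     * current chunk and the peeked next item, and returns true when the peeked
     * item must go into a new chunk.
     */
    static <T> List<List<T>> chunk(List<T> items, int maxSize, BiPredicate<T, T> startsNewGroup) {
        List<List<T>> chunks = new ArrayList<>();
        int next = 0;
        while (next < items.size()) {
            T first = items.get(next);
            List<T> chunk = new ArrayList<>();
            do {
                chunk.add(items.get(next++));
            } while (next < items.size()
                    && chunk.size() < maxSize                          // fixed commit interval
                    && !startsNewGroup.test(first, items.get(next))); // data-driven boundary
            chunks.add(chunk);
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<String> items = List.of("a1", "a2", "a3", "b1");
        System.out.println(chunk(items, 2, (first, peeked) -> first.charAt(0) != peeked.charAt(0)));
        // prints [[a1, a2], [a3], [b1]]
    }
}
```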

              Thanks!!!



              • #8
                I have a similar issue to this one, but I'm unsure if this meets my requirements. Does a chunk completion policy end the chunk? All I want to do is commit what's in the write buffer manually, then continue processing the chunk. How would I go about that?



                • #9
                  A completion policy signals the end of the chunk (i.e. no more items will be taken from the reader in this transaction). Can you describe your use case in a bit more detail? I'm surprised to hear that you want to manipulate the transaction from inside your business logic.



                  • #10
                    We are parsing a log file from a piece of hardware. It has a line that starts a group within the database, then we load the corresponding data into related tables as we go. Sometimes we get a power interrupt, which means that a new group starts immediately on the next line, and we have to mark all previous groups that didn't complete successfully as invalid. The problem we're running into is that sometimes these invalid groups exist only in the chunk write buffer, so we can't update them. We want to flush the write buffer, read any groups that have a certain status from the database, update them to a new status, and then continue from the next line.

                    How would you implement something like that?



                    • #11
                      Could you peek ahead and try to detect the invalid group (as per the original post in this thread), then force a commit using a completion policy and deal with marking incomplete groups in a listener, or possibly the writer? The next chunk will then start with the peeked item (which you know is part of the next group).

                      Or you could do the group validation as a completely separate step?
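
The first suggestion could be pictured roughly as follows. This is a plain-Java simulation, not Spring Batch code, and everything in it is an assumption made for illustration: the class name `GroupLogSketch`, the `START <id>` / `DATA ...` / `END <id>` line format, the status names, and the in-memory map that stands in for the database.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class GroupLogSketch {
    final Map<String, String> db = new LinkedHashMap<>();        // group id -> status
    final List<List<String>> committedChunks = new ArrayList<>();

    void run(List<String> lines, int commitInterval) {
        int next = 0;
        String openGroup = null;   // group started but not yet ended, as seen by the reader
        while (next < lines.size()) {
            List<String> chunk = new ArrayList<>();
            boolean interrupted = false;
            while (next < lines.size() && chunk.size() < commitInterval) {
                String peeked = lines.get(next);
                if (peeked.startsWith("START ") && openGroup != null) {
                    interrupted = true;   // new group begins while another is still open
                    break;                // completion policy role: end the chunk here
                }
                if (peeked.startsWith("START ")) {
                    openGroup = peeked.substring(6);
                } else if (peeked.startsWith("END ")) {
                    openGroup = null;
                }
                chunk.add(peeked);
                next++;
            }
            commit(chunk);
            if (interrupted) {
                // Listener role: every group is now in the database, so mark open ones.
                db.replaceAll((id, status) -> "OPEN".equals(status) ? "INVALID" : status);
                openGroup = null;   // the next chunk starts with the peeked START line
            }
        }
    }

    // Writer role: apply the chunk to the "database" in one go.
    private void commit(List<String> chunk) {
        if (chunk.isEmpty()) {
            return;
        }
        for (String line : chunk) {
            if (line.startsWith("START ")) {
                db.put(line.substring(6), "OPEN");
            } else if (line.startsWith("END ")) {
                db.put(line.substring(4), "COMPLETE");
            }
        }
        committedChunks.add(new ArrayList<>(chunk));
    }

    public static void main(String[] args) {
        GroupLogSketch sketch = new GroupLogSketch();
        sketch.run(List.of("START g1", "DATA a", "END g1", "START g2", "DATA b",
                "START g3", "DATA c", "END g3"), 10);
        System.out.println(sketch.db);
        // prints {g1=COMPLETE, g2=INVALID, g3=COMPLETE}
    }
}
```

The point of the sketch is the ordering: the interrupted group is committed before the update runs, so the update never has to reach into the write buffer.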



                      • #12
                        Originally posted by Dave Syer:
                        Could you peek ahead and try to detect the invalid group (as per the original post in this thread), then force a commit using a completion policy and deal with marking incomplete groups in a listener, or possibly the writer? The next chunk will then start with the peeked item (which you know is part of the next group).

                        Or you could do the group validation as a completely separate step?
                        The group validation as a separate step is an interesting idea that we hadn't thought of. That might work, although right now we're working on a hack to get it working before we go through and completely rewrite the batch job for a new log format.

                        Thanks for your help.

