  • Spring Batch Integration

    I have the following process:

    1. I read data from a file (10000 records).
    2. I optimize each record based upon a model that specifies constraints and an objective function.
    3. I write output data satisfying the constraints and maximizing the objective function to another file.

    Because step 2 is very CPU-intensive, it has to be executed in parallel over multiple threads, or even on a grid.

    Because it's a lot of data in flat files, Spring Batch seemed an obvious choice. However, because we want to optimize multiple records in parallel, Spring Integration was also an obvious choice.
    During a Spring training, Iwein (Spring Integration) told me that there is such a thing as Spring Batch Integration, which is not commonly used but has a lot of potential.

    The feature that seems to apply to this problem is chunking using Spring Batch Integration. However, I have read the code and the configuration files, and it isn't clear to me exactly how it works.

    Can you use an ItemReader to read records from the file, pass them to multiple ItemProcessors (using Spring Integration), collect the results of the multiple ItemProcessors, and write them to file using an ItemWriter?

  • #2
    Originally posted by bassegio14
    Can you use an ItemReader to read records from the file, pass them to multiple ItemProcessors (using Spring Integration), collect the results of the multiple ItemProcessors, and write them to file using an ItemWriter?
    That's not really the way the ChunkMessageChannelItemWriter works - it is an ItemWriter, so it is the end of the line for the items, and the actual physical writing of your output file would happen in the ChunkHandler (the remote listener).

    That might work in your case, or it might not, depending on how the remote workers were deployed and provisioned, and whether they had access to the output file you want them to write.

    It might be better in your case to use a special ItemProcessor with generic type <?,Future<?>> (substitute your strongly typed input/output for the ?), and use it to send a message to a Spring Integration message channel for each item, building a Future with the result. The ItemWriter would then just have to call Future.get() on each item before writing to the output file, probably by delegating to an existing file writer, along the lines of the sketch below.
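
    As a rough sketch of that writer side (assuming the processor hands back a Future for each item, and that the delegate is an existing writer such as a FlatFileItemWriter - the class name here is illustrative, not an existing Spring Batch class):

    Code:
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Future;

    import org.springframework.batch.item.ItemWriter;

    public class FutureUnwrappingItemWriter<T> implements ItemWriter<Future<T>> {

    	// an existing writer, e.g. a FlatFileItemWriter<T>, does the physical writing
    	private ItemWriter<T> delegate;

    	public void setDelegate(ItemWriter<T> delegate) {
    		this.delegate = delegate;
    	}

    	public void write(List<? extends Future<T>> items) throws Exception {
    		List<T> results = new ArrayList<T>();
    		for (Future<T> future : items) {
    			// blocks until the background processing of this item has finished
    			results.add(future.get());
    		}
    		// the chunk list is in read order, so the output stays in input order
    		delegate.write(results);
    	}
    }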

    We didn't include this pattern in Spring Batch Integration yet because it has only very weak transactional semantics (your items are not being processed in a transaction). Most people who use Spring Batch care deeply about this, but you obviously don't, so it might be a good option. I might try to find some time to play with that pattern and commit something in the next week or so, but let us know if you make progress or need more clarification.

    Spring Batch Integration is pretty much open-ended at the minute - just waiting for community feedback. So if you are reading this, please use it and make some comments or suggestions.



    • #3
      It seems to be a good solution, but I have a question.

      What do you mean by building a Future with the result?
      I presume the send will be asynchronous, because otherwise there is no parallelism.
      So the result will have to be something like a correlation id, so that in the get method of the Future you call the receive method on the reply channel with the correlation id as a filter.
      Did you have something like this in mind, or do you have another solution?



      • #4
        "Something like that" is precisely correct (I did say I hadn't tried it yet).



        • #5
          This works pretty well:

          Code:
          public Future<O> process(final I item) throws Exception {
          	// Wrap the request/reply in a Callable so it can run on a background thread
          	FutureTask<O> task = new FutureTask<O>(new Callable<O>() {
          		@SuppressWarnings("unchecked")
          		public O call() throws Exception {
          			// blocking request/reply through the Spring Integration gateway
          			return (O) messagingGateway.sendAndReceive(item);
          		}
          	});
          	// start processing asynchronously; the ItemWriter calls get() on the result later
          	taskExecutor.execute(task);
          	return task;
          }
          Plenty of options to tweak there in the TaskExecutor and MessagingGateway, so the ItemProcessor is pretty thin. I'll play with it a bit before committing, but it looks sound.

          N.B. with Spring 3.0, all you need to do is annotate your ItemProcessor as @Async (no need for the TaskExecutor).
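
          For example, with Spring 3.0 it might look something like this (just a sketch, with the same I/O generics as above: it assumes <task:annotation-driven/> is configured so the @Async proxy kicks in):

          Code:
          import java.util.concurrent.Future;

          import org.springframework.batch.item.ItemProcessor;
          import org.springframework.integration.gateway.SimpleMessagingGateway;
          import org.springframework.scheduling.annotation.Async;
          import org.springframework.scheduling.annotation.AsyncResult;

          public class AsyncItemProcessor<I, O> implements ItemProcessor<I, Future<O>> {

          	private SimpleMessagingGateway messagingGateway;

          	public void setMessagingGateway(SimpleMessagingGateway messagingGateway) {
          		this.messagingGateway = messagingGateway;
          	}

          	@Async // Spring runs this on a background thread; no explicit TaskExecutor here
          	public Future<O> process(I item) throws Exception {
          		// blocking request/reply through the Spring Integration gateway
          		@SuppressWarnings("unchecked")
          		O result = (O) messagingGateway.sendAndReceive(item);
          		// wrap the finished value; the caller already got the proxy's Future
          		return new AsyncResult<O>(result);
          	}
          }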
          Last edited by Dave Syer; May 12th, 2009, 04:51 AM.



          • #6
            Dave,

            What do you mean by "very weak transactional semantics"?
            And why aren't the items processed in a transaction?
            Is that because of the parallelism?
            If something goes wrong during the processing, is there a major difference compared to treating the items one by one?



            • #7
              Originally posted by bassegio14
              And why aren't the items processed in a transaction?
              Because a transaction is thread-bound. If you start work in a new thread, by definition it is not included in the current transaction. That's what I mean by "very weak transactional semantics."
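
              To see what thread-bound means in practice (a contrived sketch; TransactionSynchronizationManager is real Spring API, the surrounding class is made up for illustration):

              Code:
              import org.springframework.transaction.support.TransactionSynchronizationManager;

              public class ThreadBoundDemo {

              	// imagine this being called inside the step's chunk transaction
              	public void process(final Object item) {
              		// prints true: this thread is bound to the chunk transaction
              		System.out.println(TransactionSynchronizationManager.isActualTransactionActive());

              		new Thread(new Runnable() {
              			public void run() {
              				// prints false: transaction state lives in a ThreadLocal,
              				// so work done here will not roll back with the chunk
              				System.out.println(TransactionSynchronizationManager.isActualTransactionActive());
              			}
              		}).start();
              	}
              }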

              If something goes wrong during the processing, is there a major difference compared to treating the items one by one?
              Don't understand the question. What do you mean by "one by one"?



              • #8
                Don't understand the question. What do you mean by "one by one"?
                With Spring Integration, there is no guarantee of FIFO.
                If you send message A before message B, the reply to B can arrive before the reply to A.

                With "one by one", I mean if item A occurs in the input file before item B, the result of item A will occur in the output file before the result of item B.

                Consequently, if something goes wrong during the treatment of item A, it is not possible that the result of processing item B has already been written to the file.
                It is, however, possible that the result of the treatment of the item before item A hasn't yet been written to the file because of the commit interval.

                Is this how it works?



                • #9
                  Actually the model above is FIFO by virtue of the List<Future<?>>. The writer always writes the items in the order they were read, but the processing can happen out of sequence, since it is in a background thread.
