help to convert ChunkStepIntegrationTests to JMS (multi process)

  • help to convert ChunkStepIntegrationTests to JMS (multi process)

    I am evaluating Spring Batch and I would like to showcase the following scenario.

    A job reads "something" (probably rows from a database) and creates chunks of work. Each chunk is sent to a JMS queue. The listener of that JMS queue reads each chunk and processes it.

    Regarding chunk status, I would expect the handler to update its chunk. If the handler fails X times (rollbacks), the message is sent to a DLQ to avoid poison messages and this DLQ simply updates the chunk as an error.
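The fail-X-times-then-DLQ behaviour is usually configured on the broker side rather than in Spring Batch. A minimal sketch for ActiveMQ (bean ids and the broker URL are assumptions, not from this thread):

```xml
<!-- Sketch only: after 3 failed deliveries (rollbacks), ActiveMQ moves the
     message to its dead-letter queue (ActiveMQ.DLQ by default); a listener
     on that queue can then mark the chunk as failed. -->
<bean id="redeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
    <property name="maximumRedeliveries" value="3"/>
</bean>

<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616"/>
    <property name="redeliveryPolicy" ref="redeliveryPolicy"/>
</bean>
```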

    I had a look at StepIntegrationTests and it does mostly what I want (except that the handling of the response is different; I am not sure whether it can be changed or why it is done that way).

    I have upgraded the config to use JMS endpoints instead, but I have the following configuration problems:

    * There is no multi-threading; all chunks are processed in the thread that actually started the job
    * If I don't set a poller on my JMS out queue, I get "No poller has been defined for endpoint jmsOut"

    What is wrong with my config?

    Code:
    <integration:annotation-config/>

    <integration:channel id="requests"/>
    <integration:channel id="replies">
        <integration:queue/>
    </integration:channel>

    <integration:poller id="defaultPoller" default="true" max-messages-per-poll="1">
        <integration:interval-trigger interval="3000"/>
    </integration:poller>

    <bean id="messagingGateway" class="org.springframework.integration.gateway.SimpleMessagingGateway">
        <property name="requestChannel" ref="requests"/>
        <property name="replyChannel" ref="replies"/>
        <property name="replyTimeout" value="1000"/>
    </bean>

    <jms:message-driven-channel-adapter id="jmsIn" destination="batchChunkInQueue" channel="requests"/>
    <jms:outbound-channel-adapter id="jmsOut" destination="batchChunkOutQueue" channel="replies"/>
    <integration:service-activator input-channel="requests" output-channel="replies" ref="chunkHandler"/>
    where chunkHandler is the regular chunkHandler of the test, and batchChunkInQueue and batchChunkOutQueue are regular ActiveMQ queues.

  • #2
    Originally posted by snicoll:
    Regarding chunk status, I would expect the handler to update its chunk. If the handler fails X times (rollbacks), the message is sent to a DLQ to avoid poison messages and this DLQ simply updates the chunk as an error.
    That's an interesting approach. I think we can implement that in the framework. Currently the chunk fails if the handler reports an error.

    * There is no multi-threading; all chunks are processed in the thread that actually started the job
    That's because you are still subscribing to the requests channel in your service-activator (so the processing is all local). N.B. with Spring Integration 2.0 you can use native jms:channel instances in place of the integration:channels. That would make it simpler, I think (with 1.0 you have to set up four channels: in/out for both client and server).
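A minimal sketch of what that 2.0-style setup could look like (queue names are taken from the posted config; the exact wiring is an assumption, not tested):

```xml
<!-- Sketch only: with Spring Integration 2.0, the JMS queue itself backs
     the channel, so a service-activator subscribed to "requests" in a
     separate worker process consumes remotely instead of locally. -->
<jms:channel id="requests" queue-name="batchChunkInQueue"/>
<jms:channel id="replies" queue-name="batchChunkOutQueue"/>

<!-- worker-side process: subscribes to the JMS-backed requests channel -->
<integration:service-activator input-channel="requests"
                               output-channel="replies"
                               ref="chunkHandler"/>
```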

    * If I don't set a poller on my JMS out queue, I get "No poller has been defined for endpoint jmsOut"
    Maybe you could ask about that one on the Spring Integration forum?



    • #3
      Done.

      http://forum.springsource.org/showth...916#post274916



      • #4
        why not partitioning?

        I am new myself, but since you are hitting a database you have some sort of understanding of your data, so why are you using chunks rather than partitioning your data?

        Given your problem, I would think that you would have multiple JMS queues; your partition SPI would set up an ExecutionContext for each of your JMS queues, and your data would be partitioned to be sent to one of them.
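For what it's worth, the master step for that approach looks roughly like this in Spring Batch XML (ids, the worker step, and the partitioner bean are assumptions, defined elsewhere):

```xml
<!-- Sketch only: the master step fans out to grid-size worker executions.
     For remote workers over JMS, the local handler would be replaced by a
     remoting partition handler. -->
<batch:job id="partitionedJob">
    <batch:step id="master">
        <batch:partition step="worker" partitioner="rangePartitioner">
            <batch:handler grid-size="4" task-executor="taskExecutor"/>
        </batch:partition>
    </batch:step>
</batch:job>
```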

        Any thoughts on this?



        • #5
          Agree broadly that partitioning can be easier for database inputs. Note, though, that JMS is not needed as a transport for partitioning: it doesn't hurt, but you don't need it either; any transport will do. If you did want to use it, you could use the MessageChannelPartitionHandler from Spring Batch Integration (then you could test it locally as well).
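A rough sketch of wiring MessageChannelPartitionHandler (channel ids, the grid size, and the worker step name are assumptions; because it sends over any MessageChannel, the same bean works with a JMS-backed or a purely local channel, which is what makes local testing possible):

```xml
<!-- Sketch only: sends one step-execution request per partition on the
     "requests" channel and aggregates replies from "replies". -->
<bean id="partitionHandler"
      class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
    <property name="messagingOperations">
        <bean class="org.springframework.integration.core.MessagingTemplate">
            <property name="defaultChannel" ref="requests"/>
        </bean>
    </property>
    <property name="replyChannel" ref="replies"/>
    <property name="stepName" value="worker"/>
    <property name="gridSize" value="4"/>
</bean>
```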

          P.S. I moved Spring Batch Integration over to Spring Batch Admin as part of the 1.0.0.M2 release.



          • #6
            Originally posted by rhodebump:
            I am new myself, but since you are hitting a database you have some sort of understanding of your data, so why are you using chunks rather than partitioning your data?

            Given your problem, I would think that you would have multiple JMS queues; your partition SPI would set up an ExecutionContext for each of your JMS queues, and your data would be partitioned to be sent to one of them.

            Any thoughts on this?
            Right. It turned out that this is what we want to do, especially because we have the requirement to limit the number of resources handling a particular job (to avoid a situation where a particular job is executed by 50 concurrent threads). What we're trying to achieve now is the following:

            * Stage the data to process with one row per item (id, jobId, status, sequence number, link to the item to handle)
            * Partition the data using the sequence number where gridSize is the max number of concurrent threads executing the step
            * Send out one message per partition (could be a clustered JMS queue but could be anything else). Processing of the partition is done with a given commit interval where the item is processed and its status updated accordingly
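The sequence-number split in the list above can be sketched in plain Java. This is a hypothetical helper (not Spring Batch API) mirroring the range computation a Partitioner implementation would perform over the staging table:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: divides staged sequence numbers 1..itemCount into at
// most gridSize contiguous ranges, one per concurrent worker.
public class SequenceRangePartitioner {

    // Returns the ranges as {min, max} pairs.
    public static List<long[]> partition(long itemCount, int gridSize) {
        List<long[]> ranges = new ArrayList<>();
        long size = (itemCount + gridSize - 1) / gridSize; // ceiling division
        for (long start = 1; start <= itemCount; start += size) {
            long end = Math.min(start + size - 1, itemCount);
            ranges.add(new long[]{start, end});
        }
        return ranges;
    }

    public static void main(String[] args) {
        // 10 staged items, at most 3 concurrent workers -> prints 1-4, 5-8, 9-10
        for (long[] r : partition(10, 3)) {
            System.out.println(r[0] + "-" + r[1]);
        }
    }
}
```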

            We still have to handle cases like restart and parallel steps (preferably without duplicating the content of the staging table).

            I agree that partitioning is much better than remote chunking in this case. That was part of the learning process!

