  • Partitioning using Spring Batch Integration JMS getting single threaded

    I'm trying to implement partitioning in a Spring Batch job using Spring Batch Integration. For the test scenario, I have it partitioned into 20 partitions. When I run the job, I see them getting processed sequentially, one partition after the next instead of getting processed in parallel.

    I have a feeling something is messed up in my configuration around the Spring Integration queues/gateways. I've combed through my Spring Integration configuration, specifically the JMS inbound gateway, and I can see that concurrent-consumers and max-concurrent-consumers are both set. Nothing stands out in the JMS outbound gateway configuration either.

    Any ideas as to what I could be missing? Any help would be greatly appreciated.

    <!-- Partition Handler -->
    <bean id="customer.test.partition.jms.handler" class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
    <property name="messagingOperations">
    <bean class="org.springframework.integration.core.MessagingTemplate">
    <property name="defaultChannel" ref="customer.test.partition.jms.requests"/>
    <property name="receiveTimeout" value="500000000"/>
    </bean>
    </property>
    <property name="stepName" value="customer.test.partition.jms.addressLineUpdateStep" />
    <property name="gridSize" value="20" />
    </bean>


    <!-- Master Configuration -->
    <int:channel id="customer.test.partition.jms.requests">
    <int:queue />
    </int:channel>
    <int:channel id="customer.test.partition.jms.reply"/>

    <task:executor id="customer.partition.jms.requestTaskExecutor" pool-size="20" />
    <int-jms:outbound-gateway connection-factory="springbatch.jmsConnectionFactory"
    request-channel="customer.test.partition.jms.requests"
    request-destination="springbatch.partition.jms.requestsQueue"
    reply-channel="customer.test.partition.jms.reply"
    receive-timeout="500000000">
    <int-jms:reply-listener/>
    <int:poller task-executor="customer.partition.jms.requestTaskExecutor" />
    </int-jms:outbound-gateway>
    <int:aggregator input-channel="customer.test.partition.jms.reply" ref="customer.test.partition.jms.handler"/>



    <!-- Slave Configuration -->
    <int:channel id="customer.test.partition.jms.worker"/>
    <int-jms:inbound-gateway request-channel="customer.test.partition.jms.worker"
    connection-factory="springbatch.jmsConnectionFactory"
    request-destination="springbatch.partition.jms.requestsQueue"
    concurrent-consumers="20"
    max-concurrent-consumers="50"
    max-messages-per-task="1"/>

    <int:service-activator input-channel="customer.test.partition.jms.worker" ref="springbatch.stepExecutionRequestHandler"/>
    Last edited by sathya_sn; Aug 23rd, 2013, 06:15 PM.

  • #2
    Two things:
    1. Please use the code tags so we can better read your XML.
    2. Gateways are single threaded. You need to use an executor channel to get multithreading working. You can take a look at my example on GitHub here: https://github.com/mminella/Spring-Batch-Talk-2.0
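
    To make the threading point concrete, here is a minimal plain-Java analogy. It is not Spring Integration code: the class name `DispatchThreadsSketch` and the helper `threadsUsed` are made up for illustration. A direct hand-off runs every handler on the sender's thread, while handing messages to a task executor lets a pool of threads pick them up, which is roughly what an executor channel's dispatcher does.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration only; none of these names come from Spring.
class DispatchThreadsSketch {

    // Sends `messages` no-op handlers through the dispatcher and returns
    // the names of the threads that actually ran them.
    static Set<String> threadsUsed(Executor dispatcher, int messages) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(messages);
        for (int i = 0; i < messages; i++) {
            dispatcher.execute(() -> {
                names.add(Thread.currentThread().getName());
                done.countDown();
            });
        }
        try {
            done.await(); // wait until every handler has run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for handlers", e);
        }
        return names;
    }

    public static void main(String[] args) {
        // Direct dispatch: the handler runs on the calling thread.
        Executor direct = Runnable::run;
        // Executor dispatch: handlers run on pool threads (pool size 20 mirrors the thread's gridSize).
        ExecutorService pool = Executors.newFixedThreadPool(20);
        try {
            System.out.println("direct:   " + threadsUsed(direct, 20));
            System.out.println("executor: " + threadsUsed(pool, 20));
        } finally {
            pool.shutdown();
        }
    }
}
```

    With the direct dispatcher only the caller's thread name shows up; with the pool the handlers run on pool threads instead. How many distinct pool threads appear depends on timing, so the sketch makes no claim about that number.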


    • Mani_Prabhu commented
      Hi mminella, could you help me with a Spring remote partitioning example (a sample with master and slave)?

  • #3
    Michael, thanks for the response. I changed the requests channel to an executor channel and removed the poller from the outbound gateway. Now the requests channel is configured identically to your example. The outbound gateway configuration is different since I'm using JMS, but it looks very close to me. I haven't configured any of the channels as "int-jms" channels since I'm assuming the gateway will translate them to JMS queues.

    With this configuration it is still single threaded. Also, the messages don't even seem to be getting to the JMS destination (if I believe the MessageCount in the JBoss JMX console). I guess I'm missing something. Does anything stand out in the configuration below?

    Thanks again!

    Partition Handler configuration
    Code:
        <!-- Partition Handler -->    
        <bean id="customer.test.partition.jms.handler" class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
            <property name="messagingOperations">
                <bean class="org.springframework.integration.core.MessagingTemplate">
                    <property name="defaultChannel" ref="customer.test.partition.jms.requests"/>
                    <property name="receiveTimeout" value="500000000"/>
                </bean>
            </property>
            <property name="stepName" value="customer.test.partition.jms.addressLineUpdateStep" />        
            <property name="gridSize" value="20" />        
        </bean>
    Configuration for the Master
    Code:
    <!-- Master Configuration -->                
        <task:executor id="customer.partition.jms.requestQueueExecutor" pool-size="20" />    
        <int:channel id="customer.test.partition.jms.requests">
            <int:dispatcher task-executor="customer.partition.jms.requestQueueExecutor"/>    
        </int:channel>        
        <int:channel id="customer.test.partition.jms.reply"/>
                        
        <int-jms:outbound-gateway 
            connection-factory="springbatch.jmsConnectionFactory"
            request-channel="customer.test.partition.jms.requests"         
            request-destination="springbatch.partition.jms.requestsQueue"
            reply-channel="customer.test.partition.jms.reply"         
            receive-timeout="500000000">
            <int-jms:reply-listener/>
        </int-jms:outbound-gateway>
        <int:aggregator input-channel="customer.test.partition.jms.reply" ref="customer.test.partition.jms.handler"/>
    Slave Configuration
    Code:
        <!--     Slave Configuration -->
        <int:channel id="customer.test.partition.jms.worker"/>        
        <int-jms:inbound-gateway request-channel="customer.test.partition.jms.worker"
            connection-factory="springbatch.jmsConnectionFactory"
            request-destination="springbatch.partition.jms.requestsQueue"        
            concurrent-consumers="20"
            max-concurrent-consumers="50" 
            max-messages-per-task="1"/>
            
        <int:service-activator input-channel="customer.test.partition.jms.worker" ref="springbatch.stepExecutionRequestHandler"/>



    • #4
      Bad partitioning logic

      This is a little embarrassing... Bad data, combined with the way my partitioner works, ended up allocating all the records to be processed to just one partition. This gave me the impression that the partitions were being processed single threaded, when in fact all but one of the partitions had nothing to process and had already completed.

      With the bad data cleaned up, records are allocated evenly across all the partitions and are processed concurrently.
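
      For anyone who lands here with the same symptom, a quick sanity check is to count how many records each partition would receive before suspecting the threading configuration. The sketch below is hypothetical and does not use Spring Batch: the class `PartitionSkewCheck` is made up, and it assumes a partitioner that assigns each record by `key modulo gridSize`, which is not necessarily how the partitioner in this thread worked. The "bad data" case assumes every record carries the same key.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper for illustration; not part of Spring Batch.
class PartitionSkewCheck {

    // Assumed assignment rule: partition index = key modulo gridSize.
    static int partitionFor(long key, int gridSize) {
        return (int) Math.floorMod(key, (long) gridSize);
    }

    // Counts how many keys land in each of the gridSize partitions.
    static int[] countPerPartition(List<Long> keys, int gridSize) {
        int[] counts = new int[gridSize];
        for (long key : keys) {
            counts[partitionFor(key, gridSize)]++;
        }
        return counts;
    }

    // Number of partitions that actually have work to do.
    static int nonEmptyPartitions(int[] counts) {
        int nonEmpty = 0;
        for (int count : counts) {
            if (count > 0) {
                nonEmpty++;
            }
        }
        return nonEmpty;
    }

    public static void main(String[] args) {
        int gridSize = 20;

        // Clean data: distinct keys 1..100 spread across the partitions.
        List<Long> cleanKeys = new ArrayList<>();
        for (long id = 1; id <= 100; id++) {
            cleanKeys.add(id);
        }

        // Bad data (assumed shape): every record has the same key.
        List<Long> badKeys = Collections.nCopies(100, 0L);

        System.out.println("clean data: " + nonEmptyPartitions(countPerPartition(cleanKeys, gridSize))
                + " of " + gridSize + " partitions have work");
        System.out.println("bad data:   " + nonEmptyPartitions(countPerPartition(badKeys, gridSize))
                + " of " + gridSize + " partitions have work");
    }
}
```

      If only one partition has work, the other workers finish almost immediately and the job looks sequential even though the consumers are running concurrently.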
