Remote Partitioning Example and questions

  • Remote Partitioning Example and questions

    I am new to Spring Integration, and I am trying to build a remote-partitioning POC with JMS, based on the VanillaIntegrationTest from the spring-batch-integration unit tests. Here is my resulting config; can anyone tell me if I am on the right track?
    (Spring Batch 2.1.6, Spring Batch Integration 1.2.0, Spring Integration 2.0.3)
    Code:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
       (bunch of xml namespaces deleted)
    >
        <!-- Active-MQ JMS queues partitionRequestsQueue, partitionStagingQueue and partitionRepliesQueue defined in another file -->
        <!-- jobLauncher defined in another file too, using default sync executor -->
    
        <batch:job id="integrationPartitionJob" >
            <batch:step id="integrationPartitionStep-master" >
                <batch:partition step="integrationPartitionStep1" partitioner="integrationPartitioner" handler="partitionHandler" />
            </batch:step>
        </batch:job>
    
        <bean id="integrationPartitioner" class="my.SamplePartitioner" scope="step"/>
    
        <batch:step id="integrationPartitionStep1" >
            <batch:tasklet>
                <bean id="integrationPartitionTasklet" class="my.PartitionTasklet"
                        scope="step">
                    <property name="partitionedValue" value="#{stepExecutionContext[partitionedValue]}"/>
                </bean>
            </batch:tasklet>
        </batch:step>
    
        <!--  Partition Handler: send the partitions to requests channel -->
        <bean id="partitionHandler" class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler" > <!-- scope="step" -->
            <property name="messagingOperations" ref="outboundRequestMessagingTemplate" />
            <property name="replyChannel" ref="inboundPartitionReplies" />
            <property name="stepName" value="integrationPartitionStep1" />
            <property name="gridSize" value="2" />
        </bean>
    
    
        <!-- Outbound Request channel, and bind the channel to JMS partitionRequestsQueue -->
        <integration:channel id="outboundPartitionRequests" />
        <bean id="outboundRequestMessagingTemplate" class="org.springframework.integration.core.MessagingTemplate">
            <property name="defaultChannel" ref="outboundPartitionRequests" />
            <property name="receiveTimeout" value="1000" />
        </bean>
        <int-jms:outbound-channel-adapter connection-factory="batch.jmsFactory"  destination="partitionRequestsQueue" channel="outboundPartitionRequests" />
    
    
        <!-- Inbound Request Channel, from JMS partitionRequestsQueue, handled by partitionStepExecutionRequestHandler, output to outboundPartitionStaging -->
        <integration:channel id="inboundPartitionRequests" />
        <int-jms:message-driven-channel-adapter id="reqQueueAdapter" connection-factory="batch.jmsFactory" destination="partitionRequestsQueue" channel="inboundPartitionRequests"/>
    
    
        <bean id="partitionStepExecutionRequestHandler" class="org.springframework.batch.integration.partition.StepExecutionRequestHandler"
            p:jobExplorer-ref="jobExplorer" p:stepLocator-ref="stepLocator"/>
    
    
        <integration:service-activator ref="partitionStepExecutionRequestHandler" input-channel="inboundPartitionRequests" output-channel="outboundPartitionStaging">
        </integration:service-activator>
    
    
        <!-- Outbound Staging channel, and bind the channel to JMS partitionStagingQueue -->
        <integration:channel id="outboundPartitionStaging" />
        <bean id="outboundStagingMessagingTemplate" class="org.springframework.integration.core.MessagingTemplate">
            <property name="defaultChannel" ref="outboundPartitionStaging" />
            <property name="receiveTimeout" value="1000" />
        </bean>
        <int-jms:outbound-channel-adapter connection-factory="batch.jmsFactory" destination="partitionStagingQueue" channel="outboundPartitionStaging" />
    
        <!-- Inbound Staging Channel, from JMS partitionStagingQueue, handled by partitionHandler (as aggregator), output to outboundPartitionReplies -->
        <integration:channel id="inboundPartitionStaging" />
        <int-jms:message-driven-channel-adapter id="stagingQueueAdapter" connection-factory="batch.jmsFactory" destination="partitionStagingQueue" channel="inboundPartitionStaging"/>
    
        <integration:aggregator ref="partitionHandler" timeout="10000" input-channel="inboundPartitionStaging" output-channel="outboundPartitionReplies" >
        </integration:aggregator>
    
    
        <!-- Outbound Replies channel, and bind the channel to JMS partitionRepliesQueue -->
        <integration:channel id="outboundPartitionReplies" />
        <bean id="outboundRepliesMessagingTemplate" class="org.springframework.integration.core.MessagingTemplate">
            <property name="defaultChannel" ref="outboundPartitionReplies" />
            <property name="receiveTimeout" value="1000" />
        </bean>
        <int-jms:outbound-channel-adapter connection-factory="batch.jmsFactory" destination="partitionRepliesQueue" channel="outboundPartitionReplies" />
    
    
        <!-- Inbound Replies Channel, from JMS partitionRepliesQueue -->
        <integration:channel id="inboundPartitionReplies">
            <integration:queue/>
        </integration:channel>
        <int-jms:message-driven-channel-adapter id="repliesQueueAdapter" connection-factory="batch.jmsFactory" destination="partitionRepliesQueue" channel="inboundPartitionReplies"/>
    
    </beans>
    Basically I am splitting the original requests channel from the original VanillaIntegrationTest (and likewise the staging and replies channels) into one outbound and one inbound channel (both are direct channels). The outbound channel is connected to a JMS outbound-channel-adapter, which sends to the requests JMS queue. A JMS message-driven-channel-adapter then receives the data from JMS and feeds the inbound channel.

    Is that what is supposed to be done?

    I have written a unit test to run it. Most of the time it is fine and the jobLauncher returns a COMPLETED jobExecution. However, it fails once in a while with an NPE in MessageChannelPartitionHandler:
    Code:
    java.lang.NullPointerException
            at org.springframework.batch.integration.partition.MessageChannelPartitionHandler.handle(MessageChannelPartitionHandler.java:135)
            at org.springframework.batch.core.partition.support.PartitionStep.doExecute(PartitionStep.java:104)
            at ....
    Is there anything wrong in my config that causes this?
    =========================
    I have several extra questions that I find hard to answer myself; I hope someone can point me in the right direction:

    1) Can the same set of channels (and hence JMS queues) be shared across concurrent job executions that need partitioning? It seems to me that with my existing config, a 'receive' from those queues may pick up data from other executions (possibly from totally different Jobs). Is there some config I have missed?

    2) I am a bit confused about how the partitions are distributed remotely. For example, I have a JobFactory that creates a separate child application context for each job launch, which contains the job and step. Assume that server1 partitions the work and puts it on the JMS queue. There is no job running on the other servers, which means there is no channel connected to the JMS queue, and hence no other server will pick up the partitioned work from JMS. Does that mean my approach of putting all the above partition-related artifacts in the same file is wrong?

    I hope I am not asking a stupid question...

  • #2
    It looks like the NullPointerException comes when there is a timeout. It's not a very helpful error message so you could raise a JIRA against Spring Batch Admin for that.
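
    As a hedged sketch of one possible mitigation, assuming the timeout in question is the receiveTimeout of the MessagingTemplate that the partitionHandler uses in the config above: that template waits only 1000 ms for the aggregated reply, so a larger value gives the workers more time to finish. The 60000 below is an arbitrary illustrative figure, not a recommendation.

    ```xml
    <!-- Same bean as in the original config; only receiveTimeout differs. -->
    <!-- 60000 ms is an assumed value; size it to the slowest expected partition. -->
    <bean id="outboundRequestMessagingTemplate" class="org.springframework.integration.core.MessagingTemplate">
        <property name="defaultChannel" ref="outboundPartitionRequests" />
        <property name="receiveTimeout" value="60000" />
    </bean>
    ```

    This only makes the timeout less likely; it does not change what happens when the timeout is actually reached.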

    You are correct to assume that you will need a separate JMS Queue per Job per Step that is partitioned in this way. It's not very efficient, but the only alternative I can think of is to use a Message selector (which has other issues). JMS is not really well suited to this problem, and you don't need it to do remote partitioning, so I would question whether it might be easier to use RMI or HTTP, and not use the MessageChannelPartitionHandler at all. I haven't seen it used in real projects much, so it's not surprising you are having teething problems. I hope you persevere though, and we can make it work more smoothly for everyone.

    The remote workers will need to be listening to the channel that you are sending the requests on, so if your ApplicationContext is only created for the duration of a Job execution on the master, that isn't going to work. You need a different ApplicationContext or a different strategy for its lifecycle. I would be tempted to try and use the same context on all nodes, so that would mean it would have to be a long-running context. I can't think of any reason that wouldn't work.
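
    A rough sketch of what such a long-running worker context might contain, reusing the bean and queue names from the config in the first post. It assumes that jobExplorer is defined elsewhere and points at the same job repository database as the master, and that BeanFactoryStepLocator (from spring-batch-integration) is a suitable stepLocator here; namespace declarations are omitted as in the original.

    ```xml
    <!-- Worker-side context, loaded once and kept running on every node (sketch only). -->

    <!-- The step that workers execute, as in the original config -->
    <batch:step id="integrationPartitionStep1">
        <batch:tasklet>
            <bean class="my.PartitionTasklet" scope="step">
                <property name="partitionedValue" value="#{stepExecutionContext[partitionedValue]}"/>
            </bean>
        </batch:tasklet>
    </batch:step>

    <!-- Listen permanently for partition requests on the shared JMS queue -->
    <integration:channel id="inboundPartitionRequests" />
    <int-jms:message-driven-channel-adapter id="reqQueueAdapter" connection-factory="batch.jmsFactory"
        destination="partitionRequestsQueue" channel="inboundPartitionRequests"/>

    <!-- Assumption: the step is looked up by bean name in this context -->
    <bean id="stepLocator" class="org.springframework.batch.integration.partition.BeanFactoryStepLocator" />

    <bean id="partitionStepExecutionRequestHandler"
        class="org.springframework.batch.integration.partition.StepExecutionRequestHandler">
        <property name="jobExplorer" ref="jobExplorer" />
        <property name="stepLocator" ref="stepLocator" />
    </bean>

    <!-- Run the requested step execution, then publish the result to the staging queue -->
    <integration:service-activator ref="partitionStepExecutionRequestHandler"
        input-channel="inboundPartitionRequests" output-channel="outboundPartitionStaging" />
    <integration:channel id="outboundPartitionStaging" />
    <int-jms:outbound-channel-adapter connection-factory="batch.jmsFactory"
        destination="partitionStagingQueue" channel="outboundPartitionStaging" />
    ```

    The job definition, the partitioner, the partitionHandler and the aggregator would then stay in the master's context, which can still be created per launch.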



    • #3
      Cancel that middle point - you don't need a separate JMS destination per Step. You can use Spring Integration JMS gateways, or the JmsInvokingServiceExporter from Spring JMS, and either lets you use JmsReplyTo. Spring Integration also includes a message selector automatically if you specify a reply-destination in your outbound-gateway.
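
      A minimal sketch of the gateway variant, assuming the channel, queue and bean names from the config in the first post; it is not the promised example, only an illustration of where the gateways would replace the paired channel adapters.

      ```xml
      <!-- Master side: one gateway sends each request and waits for its reply. -->
      <!-- Replies arrive on partitionRepliesQueue and are passed to the channel the aggregator reads from. -->
      <int-jms:outbound-gateway connection-factory="batch.jmsFactory"
          request-channel="outboundPartitionRequests"
          request-destination="partitionRequestsQueue"
          reply-destination="partitionRepliesQueue"
          reply-channel="inboundPartitionStaging" />

      <!-- Worker side: the inbound gateway sends the handler's result back to the JMSReplyTo destination. -->
      <int-jms:inbound-gateway connection-factory="batch.jmsFactory"
          request-channel="inboundPartitionRequests"
          request-destination="partitionRequestsQueue" />

      <!-- No output-channel: the return value goes back through the inbound gateway -->
      <integration:service-activator ref="partitionStepExecutionRequestHandler"
          input-channel="inboundPartitionRequests" />
      ```

      One thing to keep in mind with this layout: an outbound gateway is a synchronous request/reply endpoint, so with a direct request channel the partition requests would be sent one at a time. Whether that matters depends on how the request channel is set up.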

      I'm working on an example integration test so it should be on github in a couple of days. I'm still not sure it makes a huge amount of sense, but I have two clients I'm working with this week that want to do this, so maybe I'll figure out why and it will be worth it.



      • #4
        We are working on remote partitioning. Could you please share an example of how to do Spring Batch partitioning on master and slave?
