
  • aggregator and partitionHandler

    Hi guys,

    I'm working on a partitioned job POC using Spring Batch and Spring Integration and have hit a snag that I'm hoping someone can help me with. Although the issue arises in a Spring Batch job, I'm posting it here because the real problem is with its use of Spring Integration.

    When I run my job with the configuration below, the data is partitioned correctly, the partitions are sent to the slaves, and the slaves process each step successfully to completion. However, the job ends in failure with a NullPointerException in MessageChannelPartitionHandler's handle method when it attempts to receive the replies. I know this is caused by a timeout. The underlying problem seems to be that none of the messages get past the aggregator. In jconsole I can see messages being put on and pulled off both the requestsQueue and the stagingQueue, but no messages make it to the repliesQueue. Any help is appreciated! I have seen this problem reported via Google but have not found an answer.

    job.xml
    Code:
    <beans:bean id="partitionHandler" class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
    		<beans:property name="stepName" value="step1"/>
    		<beans:property name="gridSize" value="2"/>
    		<beans:property name="messagingOperations">
    			<beans:bean class="org.springframework.integration.core.MessagingTemplate">
    				<beans:property name="defaultChannel" ref="outbound-requests"/>
    				<beans:property name="receiveTimeout" value="100000"/>
    			</beans:bean>
    		</beans:property>
    	</beans:bean>
    
    	<int:channel id="outbound-requests"/>
    	<int-jms:outbound-channel-adapter connection-factory="connectionFactory" destination="requestsQueue" channel="outbound-requests"/>
    
    	<int:channel id="inbound-requests"/>
    	<int-jms:message-driven-channel-adapter connection-factory="connectionFactory" destination="requestsQueue" channel="inbound-requests"/>
    
    
    
    	<int:service-activator ref="stepExecutionRequestHandler" input-channel="inbound-requests" output-channel="outbound-staging">
    
        </int:service-activator>
    
    	<beans:bean id="stepExecutionRequestHandler" class="org.springframework.batch.integration.partition.StepExecutionRequestHandler">
    		<beans:property name="jobExplorer" ref="jobExplorer"/>
    		<beans:property name="stepLocator" ref="stepLocator"/>
    	</beans:bean>
    
    	<int:channel id="outbound-staging"/>
    	<int-jms:outbound-channel-adapter connection-factory="connectionFactory" destination="stagingQueue" channel="outbound-staging"/>
    
    	<int:channel id="inbound-staging"/>
    	<int-jms:message-driven-channel-adapter connection-factory="connectionFactory" destination="stagingQueue" channel="inbound-staging"/>
    	<int:channel id="channel2"></int:channel>
    	<int:aggregator ref="partitionHandler" input-channel="inbound-staging" output-channel="channel2" />
    	
    
    	<int-jms:outbound-channel-adapter connection-factory="connectionFactory" destination="repliesQueue" channel="channel2"/>
    
    	<int:channel id="inbound-replies">
    		<int:queue/>
    	</int:channel>
    	<int-jms:message-driven-channel-adapter connection-factory="connectionFactory" destination="repliesQueue" channel="inbound-replies"/>

  • #2
    Any help? Is this a bug, or does it have something to do with the correlation ID?



    • #3
      I suggest you run with debug logging and follow the messages through the system(s); it should become obvious. If you can't figure out what's happening, post the logs here (in a zip).
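
      For anyone following along, a minimal sketch of what turning on debug logging could look like. It assumes log4j 1.x with an XML configuration file on the classpath, which the thread does not confirm; the appender name and the conversion pattern are illustrative choices, not taken from the poster's setup.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<!-- Assumption: log4j 1.x is the logging backend; adapt for other frameworks. -->
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">

  <!-- Console appender; the name "console" is an arbitrary choice -->
  <appender name="console" class="org.apache.log4j.ConsoleAppender">
    <layout class="org.apache.log4j.PatternLayout">
      <param name="ConversionPattern" value="%d %p %t [%c] - &lt;%m&gt;%n"/>
    </layout>
  </appender>

  <!-- DEBUG for the messaging flow: channels, handlers, aggregator -->
  <logger name="org.springframework.integration">
    <level value="DEBUG"/>
  </logger>

  <!-- DEBUG for the partition handler and step execution request handler -->
  <logger name="org.springframework.batch.integration">
    <level value="DEBUG"/>
  </logger>

  <!-- Everything else stays at INFO to keep the logs readable -->
  <root>
    <level value="INFO"/>
    <appender-ref ref="console"/>
  </root>

</log4j:configuration>
```

      With something like this on the master and on each slave, each message can be followed by its id and correlationId headers from one channel to the next.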



      • #4
        Thanks, Gary.

        I have modified the code a bit based on other posts in the forum. Here is a brief description of the setup:
        JVM1: my controller (calling 2 slaves)
        JVM2: ActiveMQ running on localhost
        JVM3: slave 1
        JVM4: slave 2

        When I run the code, it invokes only JVM3, twice, and never invokes my second slave. Why?
        Code:
        [JVM4 log]
        2012-06-27 15:42:41,590 INFO org.springframework.jms.listener.DefaultMessageListenerContainer#0-2 [org.springframework.jms.listener.DefaultMessageListenerContainer] - <Successfully refreshed JMS Connection>
        2012-06-27 15:43:01,569 DEBUG InactivityMonitor WriteCheck [org.apache.activemq.transport.InactivityMonitor] - <9992 ms elapsed since last write check.>
        2012-06-27 15:43:11,568 DEBUG InactivityMonitor WriteCheck [org.apache.activemq.transport.InactivityMonitor] - <9999 ms elapsed since last write check.>
        [JVM3 log]
        2012-06-27 15:42:36,977 DEBUG org.springframework.jms.listener.DefaultMessageListenerContainer#0-3 [org.apache.activemq.transport.WireFormatNegotiator] - <Sending: WireFormatInfo { version=6, properties={CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}>
        2012-06-27 15:42:37,014 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616 [org.apache.activemq.transport.WireFormatNegotiator] - <Received WireFormat: WireFormatInfo { version=6, properties={CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}>
        2012-06-27 15:42:37,014 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616 [org.apache.activemq.transport.WireFormatNegotiator] - <tcp://localhost/127.0.0.1:61616 before negotiation: OpenWireFormat{version=6, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false}>
        2012-06-27 15:42:37,016 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616 [org.apache.activemq.transport.WireFormatNegotiator] - <tcp://localhost/127.0.0.1:61616 after negotiation: OpenWireFormat{version=6, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false}>
        [Controller log]
        
        2012-06-27 15:43:24,627 DEBUG main [org.springframework.batch.integration.partition.MessageChannelPartitionHandler] - <Sending request: [Payload=StepExecutionRequest: [jobExecutionId=0, stepExecutionId=1, stepName=step1]][Headers={timestamp=1340808204627, id=cde843fd-0286-4d01-978b-6006f99e9b10, replyChannel=org.springframework.integration.channel.QueueChannel@4fac7f84, correlationId=0:step1, sequenceSize=2, sequenceNumber=0}]>
        2012-06-27 15:43:24,627 DEBUG main [org.springframework.batch.integration.partition.MessageChannelPartitionHandler] - <Sending request: [Payload=StepExecutionRequest: [jobExecutionId=0, stepExecutionId=2, stepName=step1]][Headers={timestamp=1340808204627, id=7247b7d9-fb37-4fcc-b3fa-ca8c46ddfd9a, replyChannel=org.springframework.integration.channel.QueueChannel@4fac7f84, correlationId=0:step1, sequenceSize=2, sequenceNumber=1}]>
        2012-06-27 15:43:24,629 DEBUG taskExecutor-1 [org.apache.activemq.transport.WireFormatNegotiator] - <Sending: WireFormatInfo { version=6, properties={CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}>
        2012-06-27 15:43:24,632 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616 [org.apache.activemq.transport.WireFormatNegotiator] - <Received WireFormat: WireFormatInfo { version=6, properties={CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}>
        2012-06-27 15:43:24,632 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616 [org.apache.activemq.transport.WireFormatNegotiator] - <tcp://localhost/127.0.0.1:61616 before negotiation: OpenWireFormat{version=6, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false}>
        2012-06-27 15:43:24,634 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616 [org.apache.activemq.transport.WireFormatNegotiator] - <tcp://localhost/127.0.0.1:61616 after negotiation: OpenWireFormat{version=6, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false}>
        2012-06-27 15:43:25,229 DEBUG taskExecutor-1 [org.apache.activemq.ActiveMQMessageConsumer] - <remove: ID:MW7EZEKHNX8CHR-56061-1340808204231-0:2:1:1, lastDeliveredSequenceId:0>
        2012-06-27 15:43:25,244 DEBUG taskExecutor-1 [org.apache.activemq.transport.tcp.TcpTransport] - <Stopping transport tcp://localhost/127.0.0.1:61616>
        2012-06-27 15:43:25,269 DEBUG taskExecutor-1 [org.apache.activemq.transport.WireFormatNegotiator] - <Sending: WireFormatInfo { version=6, properties={CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}>
        2012-06-27 15:43:25,274 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616 [org.apache.activemq.transport.WireFormatNegotiator] - <Received WireFormat: WireFormatInfo { version=6, properties={CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}>
        2012-06-27 15:43:25,274 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616 [org.apache.activemq.transport.WireFormatNegotiator] - <tcp://localhost/127.0.0.1:61616 before negotiation: OpenWireFormat{version=6, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false}>
        2012-06-27 15:43:25,274 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616 [org.apache.activemq.transport.WireFormatNegotiator] - <tcp://localhost/127.0.0.1:61616 after negotiation: OpenWireFormat{version=6, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false}>
        2012-06-27 15:43:25,754 DEBUG taskExecutor-1 [org.apache.activemq.ActiveMQMessageConsumer] - <remove: ID:MW7EZEKHNX8CHR-56061-1340808204231-0:3:1:1, lastDeliveredSequenceId:0>
        2012-06-27 15:43:25,769 DEBUG taskExecutor-1 [org.apache.activemq.transport.tcp.TcpTransport] - <Stopping transport tcp://localhost/127.0.0.1:61616>
        2012-06-27 15:43:25,792 DEBUG main [org.springframework.batch.integration.partition.MessageChannelPartitionHandler] - <Received replies: [Payload=[StepExecution: id=1, version=9, name=step1.master:partition1, status=COMPLETED, exitStatus=COMPLETED, readCount=6, filterCount=0, writeCount=6 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=7, rollbackCount=0, exitDescription=, StepExecution: id=2, version=10, name=step1.master:partition0, status=COMPLETED, exitStatus=COMPLETED, readCount=7, filterCount=0, writeCount=7 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=8, rollbackCount=0, exitDescription=]][Headers={timestamp=1340808205792, id=821af29b-b3a8-4aff-9d7f-0111a075abf4, correlationId=0:step1, replyChannel=org.springframework.integration.channel.QueueChannel@4fac7f84}]>
        2012-06-27 15:43:25,799 DEBUG main [org.springframework.batch.core.step.AbstractStep] - <Step execution success: id=0>
        2012-06-27 15:43:25,809 DEBUG main [org.springframework.batch.core.step.AbstractStep] - <Step execution complete: StepExecution: id=0, version=2, name=step1.master, status=COMPLETED, exitStatus=COMPLETED, readCount=13, filterCount=0, writeCount=13 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=15, rollbackCount=0>
        2012-06-27 15:43:25,814 DEBUG main [org.springframework.batch.core.job.flow.support.SimpleFlow] - <Completed state=geocodingJob.step1.master with status=COMPLETED>
        [My code]
        <beans:bean id="partitionHandler" class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
        		<beans:property name="stepName" value="step1"/>
        		<beans:property name="gridSize" value="2"/>
        		<beans:property name="messagingOperations">
        			<beans:bean class="org.springframework.integration.core.MessagingTemplate">
        				<beans:property name="defaultChannel" ref="requests"/>
        				<beans:property name="receiveTimeout" value="999999"/>
        			</beans:bean>
        		</beans:property>
        	</beans:bean>
        	
        
        
        	<task:executor id="taskExecutor"  />
        
        	<int:channel id="requests">
        		<int:queue  />
        	</int:channel>
        
        <int:channel id="staging" />
        
        
        	<int:service-activator ref="stepExecutionRequestHandler" input-channel="worker" />
        
            <int:aggregator ref="partitionHandler" input-channel="staging" />
        
        
        <int-jms:outbound-gateway request-channel="requests" request-destination-name="bos.job.slave.requests" reply-channel="staging" receive-timeout="30000">
        		<int:poller fixed-rate="200" task-executor="taskExecutor" />
        	</int-jms:outbound-gateway>
        
        <int-jms:inbound-gateway request-channel="worker" request-destination-name="bos.job.slave.requests"  concurrent-consumers="3" />
        
        
        <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" >
        		<property name="brokerURL" value="tcp://localhost:61616"/>
        	</bean>
        
              
          
        	



        • #5
          Given that you have concurrent-consumers="3", it's perfectly possible for both partitions to land on just one of the slaves. Set it to 1, or use at least 4 partitions, to ensure both slaves get some work.
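
          As a rough sketch of the two options, applied to the configuration posted in #4: the bean ids, channel names and destination name are copied from that post, and only the concurrent-consumers and gridSize values change. The two fragments are alternatives, and how the broker dispatches messages across consumers remains up to the broker.

```xml
<!-- Option 1 (slave side): a single consumer per slave JVM, so one slave
     cannot pick up both partition requests concurrently -->
<int-jms:inbound-gateway request-channel="worker"
                         request-destination-name="bos.job.slave.requests"
                         concurrent-consumers="1" />

<!-- Option 2 (master side): keep 3 consumers per slave, but create at least
     4 partitions so one slave's consumers cannot take all of them at once -->
<beans:bean id="partitionHandler" class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
	<beans:property name="stepName" value="step1"/>
	<beans:property name="gridSize" value="4"/>
	<beans:property name="messagingOperations">
		<beans:bean class="org.springframework.integration.core.MessagingTemplate">
			<beans:property name="defaultChannel" ref="requests"/>
			<beans:property name="receiveTimeout" value="999999"/>
		</beans:bean>
	</beans:property>
</beans:bean>
```

          Option 1 keeps the partition count as it is and limits each slave to one partition at a time; option 2 keeps per-slave concurrency and relies on having more partitions than one slave has consumers.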



          • #6
            Thanks, Gary! It worked.
