Announcement Announcement Module
Collapse
No announcement yet.
Remote chunking - slave nodes idle! Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Remote chunking - slave nodes idle!

    Hi,

    I'm using the Remote Chunking strategy. Suppose I have 150,000 records in the database. I've set both pageSize and commit-interval to 1000, and I've started one master node and 9 slave nodes. To my surprise, only 3 slave nodes are processing items while the others sit idle, listening to the queue but doing nothing. If I stop one of the working slaves, one of the idle slaves starts processing items, and so on. It seems that the other slaves only start working after a previous slave has completely finished its work (i.e. after processing 1000 records, as defined by commit-interval).

    However, when I reduce commit-interval to a lower value, say 100, all the slaves work together. Why does this happen? I can't understand this behaviour. I really want all the slaves working together as fast as they can, reading and processing 1,000 or more records at a time.

    This is my configuration:

    masterBatchContext.xml

    Code:
    <?xml version="1.0" encoding="UTF-8"?>
             ...
    	<!-- Master-node batch configuration. The subscriptionsLoad step reads
    	     subscriptions with the JDBC paging reader defined here; the
    	     remote-chunking beans at the end of this file hand its chunks to
    	     chunkWriter, which sends them through messagingGateway and collects
    	     results from subscriptionReplies. -->
    	<!-- Spring Batch Job Registry -->
    	<bean id="jobRegistry"
    		class="org.springframework.batch.core.configuration.support.MapJobRegistry" />
    
    	<!-- Spring Batch Job Launcher -->
    	<bean id="jobLauncher"
    		class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    		<property name="jobRepository" ref="jobRepository" />
    	</bean>
    
    	<!-- Spring Batch Job Repository (backed by subscriptionDS) -->
    	<bean id="jobRepository"
    		class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean"
    		p:dataSource-ref="subscriptionDS" p:transactionManager-ref="transactionManager" />
    
    	<!-- Spring Batch Job Registry Processor -->
    	<bean
    		class="org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor">
    		<property name="jobRegistry" ref="jobRegistry" />
    	</bean>
    
    	<!-- Spring Batch Subscriptions Reader -->
    	<!-- Step-scoped JDBC paging reader over subscriptionDS: fetches pageSize
    	     (1000) rows per query, paging on the "id" sort key. -->
    	<bean id="subscriptionPageReader"
    		class="org.springframework.batch.item.database.JdbcPagingItemReader"
    		scope="step">
    		<property name="dataSource" ref="subscriptionDS" />
    		<property name="queryProvider">
    			<bean
    				class="org.springframework.batch.item.database.support.SqlServerPagingQueryProvider">
    				<property name="selectClause" value="..." />
    				<property name="fromClause" value="..." />
    				<property name="whereClause" value="..." />
    				<property name="sortKey" value="id" />
    			</bean>
    		</property>
    		<property name="pageSize" value="1000" />
    		<property name="rowMapper" ref="subscriptionRowMapper" />
    	</bean>
    
    	<bean id="subscriptionRowMapper"
    		class="com.m4u.subscription.core.batch.reader.SubscriptionRowMapper" />
    
    	<!-- Spring Batch Subscriptions Processor -->
    	<!-- subscriptionClient is defined in jms-context.xml. -->
    	<bean id="subscriptionProcessor"
            class="com.m4u.subscription.core.batch.processor.SubscriptionProcessor">
            <property name="client" ref="subscriptionClient"/>
        </bean>
    
    	<!-- Spring Batch Subscriptions Writer -->
    	<bean id="subscriptionWriter"
    		class="com.m4u.subscription.core.batch.writer.SubscriptionWriter" />
    
    	<!-- Spring Batch Job [Subscription Renew Job] -->
    	<!-- commit-interval sets how many items make up one chunk (here 1000,
    	     the same value as the reader's pageSize).
    	     NOTE(review): because chunkHandler below is configured with this
    	     step, each chunk presumably travels to a consumer as one message,
    	     so this value also sets how much work a single request carries. -->
    	<job id="sBatchRenewJob" xmlns="http://www.springframework.org/schema/batch">
    		<step id="subscriptionsLoad">
    			<tasklet>
    				<chunk reader="subscriptionPageReader" processor="subscriptionProcessor"
    					writer="subscriptionWriter" commit-interval="1000" />
    			</tasklet>
    		</step>
    	</job>
    
    	<!-- Remote Chunking Config -->
    	<!-- Configured with the subscriptionsLoad step and chunkWriter.
    	     NOTE(review): jms-context.xml (imported by masterIntegrationContext.xml)
    	     registers a JMS listener that calls chunkHandler.handleChunk; if both
    	     master files share one application context, this bean presumably
    	     also makes the master consume chunk requests. Confirm that is
    	     intended. -->
    	<bean id="chunkHandler"
    		class="org.springframework.batch.integration.chunk.RemoteChunkHandlerFactoryBean">
    		<property name="chunkWriter" ref="chunkWriter" />
    		<property name="step" ref="subscriptionsLoad" />
    	</bean>
    
    	<!-- Step-scoped writer that sends chunks through messagingGateway and
    	     waits for results on subscriptionReplies.
    	     NOTE(review): throttleLimit is not set, so the class default
    	     presumably caps how many chunks may be in flight at once, which in
    	     turn bounds how many consumers can be busy simultaneously. Check the
    	     default for the Spring Batch version in use and consider raising it
    	     to at least the number of consumers. -->
    	<bean id="chunkWriter"
    		class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter"
    		scope="step">
    		<property name="messagingOperations" ref="messagingGateway" />
    		<property name="replyChannel" ref="subscriptionReplies" />
    		<property name="maxWaitTimeouts" value="1000" />
    	</bean>
    </beans>
    masterIntegrationContext.xml (JMS config for master node)

    Code:
    <?xml version="1.0" encoding="UTF-8"?>
             ...
    	<!-- Master-node messaging.
    	     Requests: messagingGateway sends to the subscriptionRequests channel,
    	     and an outbound adapter forwards that channel to the JMS destination
    	     "subscriptionRequests".
    	     Replies: an interceptor on the subscriptionReplies channel polls JMS
    	     through jmsTemplate (default destination "subscriptionReplies", see
    	     jms-context.xml) and sends what it receives to "incoming"; the
    	     transformer then forwards it to subscriptionReplies. -->
    	<import resource="classpath:/jms-context.xml" />
    
    	<!-- Template used by chunkWriter (masterBatchContext.xml) as its
    	     messagingOperations. receiveTimeout is presumably in milliseconds. -->
    	<bean id="messagingGateway" class="org.springframework.integration.core.MessagingTemplate">
    		<property name="defaultChannel" ref="subscriptionRequests" />
    		<property name="receiveTimeout" value="1000" />
    	</bean>
    
    	<int:channel id="subscriptionRequests" />
    	<int:channel id="incoming" />
    
    	<int-jms:outbound-channel-adapter
    		channel="subscriptionRequests" connection-factory="connectionFactory"
    		destination-name="subscriptionRequests" />
    
    	<!-- Passes each reply from "incoming" through headerExtractor.extract
    	     and sends the result to subscriptionReplies. -->
    	<int:transformer input-channel="incoming"
    		output-channel="subscriptionReplies" ref="headerExtractor" method="extract" />
    
    	<bean id="headerExtractor"
    		class="org.springframework.batch.integration.chunk.JmsRedeliveredExtractor" />
    
    	<!-- In-memory queue channel used by chunkWriter as its replyChannel.
    	     Its MessageSourcePollerInterceptor presumably polls the JMS reply
    	     destination when this channel is read and pushes the result onto
    	     "incoming". -->
    	<int:channel id="subscriptionReplies">
    		<!-- <int:queue message-store="messageStore" />-->
    		<int:queue />
    		<int:interceptors>
    			<bean id="pollerInterceptor"
    				class="org.springframework.batch.integration.chunk.MessageSourcePollerInterceptor">
    				<property name="messageSource">
    					<bean
    						class="org.springframework.integration.jms.JmsDestinationPollingSource">
    						<constructor-arg>
    							<ref bean="jmsTemplate" />
    						</constructor-arg>
    					</bean>
    				</property>
    				<property name="channel" ref="incoming" />
    			</bean>
    		</int:interceptors>
    	</int:channel>
    </beans>
    slaveBatchContext.xml

    Code:
    <?xml version="1.0" encoding="UTF-8"?>
             ...
    	<!-- Slave-node batch configuration. It defines the chunkHandler bean that
    	     the JMS listener in jms-context.xml (imported through
    	     slaveIntegrationContext.xml) invokes via handleChunk for each chunk
    	     request. -->
    	<import resource="classpath:/slaveIntegrationContext.xml" />
    
    	<!-- Chunk handler backed by a SimpleChunkProcessor configured with the
    	     processor and writer beans below. These are the same classes the
    	     master uses for subscriptionProcessor and subscriptionWriter. -->
    	<bean id="chunkHandler"
    		class="org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler">
    		<property name="chunkProcessor">
    			<bean class="org.springframework.batch.core.step.item.SimpleChunkProcessor">
    				<property name="itemWriter" ref="writer" />
    				<property name="itemProcessor" ref="processor" />
    			</bean>
    		</property>
    	</bean>
    
    	<bean id="writer"
    		class="com.m4u.subscription.core.batch.writer.SubscriptionWriter" />
    
    	<!-- subscriptionClient is defined in jms-context.xml. -->
    	<bean id="processor"
    		class="com.m4u.subscription.core.batch.processor.SubscriptionProcessor">
    		<property name="client" ref="subscriptionClient"/>
    	</bean>
    
    </beans>
    slaveIntegrationContext.xml (JMS config for slave nodes)

    Code:
    <?xml version="1.0" encoding="UTF-8"?>
            ...
    	<!-- Slave-node messaging. Unprefixed elements are presumably in the
    	     Spring Integration namespace; plain beans use the "beans:" prefix.
    	     Both channels are wire-tapped to logChannel, which logs at DEBUG.
    	     NOTE(review): the JMS listener in jms-context.xml reads the
    	     "subscriptionRequests" JMS destination directly and replies to the
    	     "subscriptionReplies" destination. Nothing visible in these listings
    	     sends to or reads from the two channels below, so confirm they are
    	     still needed. -->
    	<beans:import resource="classpath:/jms-context.xml" />
    
    	<channel id="subscriptionRequests">
    		<interceptors>
    			<wire-tap channel="logChannel" />
    		</interceptors>
    	</channel>
    
    	<channel id="subscriptionReplies">
    		<queue />
    		<interceptors>
    			<wire-tap channel="logChannel" />
    		</interceptors>
    	</channel>
    
    	<logging-channel-adapter auto-startup="true"
    		id="logChannel" level="DEBUG" />
    
    	<!-- Referenced by the listener container in jms-context.xml
    	     (transaction-manager="transactionManager"). -->
    	<beans:bean id="transactionManager"
    		class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
    </beans:beans>
    jms-context.xml (JMS config that is common to both master and slave nodes)

    Code:
    <?xml version="1.0" encoding="UTF-8"?>
         ...
    	<!-- JMS infrastructure imported by both masterIntegrationContext.xml and
    	     slaveIntegrationContext.xml, so every bean below exists on every
    	     node, the master included. ${activemq.url} and
    	     ${subscription.server.url} are resolved from
    	     subscriptionbatch.properties. -->
        <context:property-placeholder location="classpath:subscriptionbatch.properties" ignore-unresolvable="true"/>
        
    	<!-- Pooled ActiveMQ connection factory. ${activemq.url} is wrapped in a
    	     failover: URL with randomize=false.
    	     NOTE(review): no consumer prefetch option is set on the broker URL,
    	     so ActiveMQ's default queue prefetch applies. With a large prefetch,
    	     the first consumers to connect can buffer several chunk requests
    	     while later consumers stay idle. Consider a small prefetch (for
    	     example via jms.prefetchPolicy options) and verify against the
    	     ActiveMQ prefetch documentation. -->
    	<bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
    		destroy-method="stop">
    		<property name="connectionFactory">
    			<bean class="org.apache.activemq.ActiveMQConnectionFactory">
    				<property name="brokerURL" value="failover:(${activemq.url})?randomize=false" />
    			</bean>
    		</property>
    	</bean>
    
    	<!-- Used by the master's JmsDestinationPollingSource to read replies.
    	     Default destination is "subscriptionReplies" and sessions are
    	     transacted. receiveTimeout is 100, presumably milliseconds. -->
    	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    		<property name="connectionFactory" ref="connectionFactory" />
    		<property name="defaultDestinationName" value="subscriptionReplies" />
    		<property name="receiveTimeout" value="100" />
    		<property name="sessionTransacted" value="true" />
    	</bean>
    
    	<!-- Consumes the "subscriptionRequests" queue and hands each message to
    	     the chunkHandler bean's handleChunk method. The result is sent to
    	     the "subscriptionReplies" queue, inside transactions managed by
    	     transactionManager.
    	     NOTE(review): because this file is also imported on the master, the
    	     master's chunkHandler (masterBatchContext.xml) is presumably
    	     registered as a consumer too. No concurrency attribute is set, so
    	     each node runs the container's default number of consumers. Confirm
    	     both points are intended. -->
    	<jms:listener-container connection-factory="connectionFactory"
    		transaction-manager="transactionManager" acknowledge="transacted">
    		<jms:listener destination="subscriptionRequests"
    			response-destination="subscriptionReplies" ref="chunkHandler" method="handleChunk" />
    	</jms:listener-container>
    	
    	<!-- Client injected into SubscriptionProcessor (property "client") on
    	     both master and slave. It is constructed with
    	     ${subscription.server.url}. -->
    	<bean id="subscriptionClient" class="com.m4u.subscription.core.jaxrs.client.SubscriptionJAXRSClient">
    	   <constructor-arg type="java.lang.String" value="${subscription.server.url}" />
    	</bean>
    
    </beans>
    Any suggestions?

  • #2
    Hi Tiago,
    I am learning how to scale jobs with Spring Batch.
    Your code is helping me a lot.

    But I have a few questions. Please help.

    In your masterBatchContext.xml, the job configuration has a processor, "subscriptionProcessor", which references the "subscriptionClient" bean defined in your jms-context.xml. That bean is of type com.m4u.subscription.core.jaxrs.client.SubscriptionJAXRSClient.
    So I need to know what kind of client that is, what it implements, and how the data is transferred to the slaves.

    Also, please explain the flow of your code, i.e. how the data moves through the different channels and destinations between the master and the slaves. I know the basic concept of remote chunking; I just need to understand how it happens in your code.

    Please help.
    Thanks.

    Comment

    Working...
    X