Announcement Announcement Module
Collapse
No announcement yet.
Remote Chunking - how to completely offload all the processing to slave nodes Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Remote Chunking - how to completely offload all the processing to slave nodes

    Hi all,

    I'm using the remote chunking strategy and I don't want the master node to process items. Instead, I want it to only execute the query and let the slaves take care of the processing. How can I achieve this?

    With my current configuration the master is not only executing the query, but it is processing items as well.

    This is my configuration:

    masterBatchContext.xml
    Code:
    <?xml version="1.0" encoding="UTF-8"?>
    	<import resource="classpath:masterIntegrationContext.xml" />
    
    	<!-- DS config -->
    	<context:property-placeholder location="classpath:jdbc.properties"
    		ignore-unresolvable="true" />
    
    	<bean id="subscriptionDS" class="org.apache.commons.dbcp.BasicDataSource"
    		destroy-method="close" p:driverClassName="${jdbc.driverClassName}"
    		p:url="${jdbc.url}" p:username="${jdbc.username}" p:password="${jdbc.password}" />
    
    	<bean id="sessionFactory"
    		class="org.springframework.orm.hibernate4.LocalSessionFactoryBean">
    		<property name="dataSource" ref="subscriptionDS" />
    		<property name="configLocation">
    			<value>classpath:hibernate.cfg.xml</value>
    		</property>
    		<property name="hibernateProperties">
    			<props>
    				...
    			</props>
    		</property>
    	</bean>
    
    	<bean
    		class="org.springframework.dao.annotation.PersistenceExceptionTranslationPostProcessor" />
    
    	<bean class="org.springframework.orm.hibernate4.HibernateExceptionTranslator" />
    
    	<bean id="transactionManager"
    		class="org.springframework.orm.hibernate4.HibernateTransactionManager">
    		<property name="sessionFactory" ref="sessionFactory" />
    	</bean>
    
    	<!-- Spring Batch Job Registry -->
    	<bean id="jobRegistry"
    		class="org.springframework.batch.core.configuration.support.MapJobRegistry" />
    
    	<!-- Spring Batch Job Launcher -->
    	<bean id="jobLauncher"
    		class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    		<property name="jobRepository" ref="jobRepository" />
    	</bean>
    
    	<!-- Spring Batch Job Repository -->
    	<bean id="jobRepository"
    		class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean"
    		p:dataSource-ref="subscriptionDS" p:transactionManager-ref="transactionManager" />
    
    	<!-- Spring Batch Job Registry Processor -->
    	<bean
    		class="org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor">
    		<property name="jobRegistry" ref="jobRegistry" />
    	</bean>
    
    	<!-- Spring Batch Subscriptions Reader -->
    	<bean id="subscriptionPageReader"
    		class="org.springframework.batch.item.database.JdbcPagingItemReader"
    		scope="step">
    		<property name="dataSource" ref="subscriptionDS" />
    		<property name="queryProvider">
    			<bean
    				class="org.springframework.batch.item.database.support.SqlServerPagingQueryProvider">
    				<property name="selectClause" value="..." />
    				<property name="fromClause" value="..." />
    				<property name="whereClause" value="..." />
    				<property name="sortKey" value="id" />
    			</bean>
    		</property>
    		<property name="pageSize" value="1000" />
    		<property name="rowMapper" ref="subscriptionRowMapper" />
    	</bean>
    
    	<bean id="subscriptionRowMapper"
    		class="com.m4u.subscription.core.batch.reader.SubscriptionRowMapper" />
    
    	<!-- Spring Batch Subscriptions Processor -->
    	<!-- Item processor referenced by the "subscriptionsLoad" chunk definition;
    	     its "client" property is injected with the shared subscriptionClient bean. -->
    	<bean id="subscriptionProcessor"
    		class="com.m4u.subscription.core.batch.processor.SubscriptionProcessor">
    		<property name="client" ref="subscriptionClient" />
    	</bean>
    
    	<!-- Spring Batch Subscriptions Writer -->
    	<bean id="subscriptionWriter"
    		class="com.m4u.subscription.core.batch.writer.SubscriptionWriter" />
    
    	<!-- Spring Batch Job [Subscription Renew Job] -->
    	<job id="sBatchRenewJob" xmlns="http://www.springframework.org/schema/batch">
    		<step id="subscriptionsLoad">
    			<tasklet>
    				<chunk reader="subscriptionPageReader" processor="subscriptionProcessor"
    					writer="subscriptionWriter" commit-interval="50" />
    			</tasklet>
    		</step>
    	</job>
    
    	<!-- Remote Chunking Config -->
    	<!-- Factory bean wired with the "subscriptionsLoad" step and the "chunkWriter" bean.
    	     NOTE(review): presumably the factory replaces the step's local chunk processing
    	     with chunkWriter and exposes a handler meant for worker processes; confirm
    	     against the Spring Batch Integration documentation. -->
    	<bean id="chunkHandler"
    		class="org.springframework.batch.integration.chunk.RemoteChunkHandlerFactoryBean">
    		<property name="chunkWriter" ref="chunkWriter" />
    		<property name="step" ref="subscriptionsLoad" />
    	</bean>
    
    	<bean id="chunkWriter"
    		class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter"
    		scope="step">
    		<property name="messagingOperations" ref="messagingGateway" />
    		<property name="replyChannel" ref="subscriptionReplies" />
    		<property name="maxWaitTimeouts" value="1000" />
    	</bean>
    
    </beans>
    masterIntegrationContext.xml (JMS config for master)

    Code:
    <?xml version="1.0" encoding="UTF-8"?>
    	<import resource="classpath:/jms-context.xml" />
    
    	<bean id="messagingGateway" class="org.springframework.integration.core.MessagingTemplate">
    		<property name="defaultChannel" ref="subscriptionRequests" />
    		<property name="receiveTimeout" value="1000" />
    	</bean>
    
    	<int:channel id="subscriptionRequests" />
    	<int:channel id="incoming" />
    
    	<int-jms:outbound-channel-adapter
    		channel="subscriptionRequests" connection-factory="connectionFactory"
    		destination-name="subscriptionRequests" />
    
    	<int:transformer input-channel="incoming"
    		output-channel="subscriptionReplies" ref="headerExtractor" method="extract" />
    
    	<bean id="headerExtractor"
    		class="org.springframework.batch.integration.chunk.JmsRedeliveredExtractor" />
    
    	<int:channel id="subscriptionReplies">
    		<int:queue />
    		<int:interceptors>
    			<bean id="pollerInterceptor"
    				class="org.springframework.batch.integration.chunk.MessageSourcePollerInterceptor">
    				<property name="messageSource">
    					<bean
    						class="org.springframework.integration.jms.JmsDestinationPollingSource">
    						<constructor-arg>
    							<ref bean="jmsTemplate" />
    						</constructor-arg>
    					</bean>
    				</property>
    				<property name="channel" ref="incoming" />
    			</bean>
    		</int:interceptors>
    	</int:channel>
    
    </beans>
    slaveBatchContext.xml

    Code:
    <?xml version="1.0" encoding="UTF-8"?>
    	<import resource="classpath:/slaveIntegrationContext.xml" />
    
    	<bean id="chunkHandler"
    		class="org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler">
    		<property name="chunkProcessor">
    			<bean class="org.springframework.batch.core.step.item.SimpleChunkProcessor">
    				<property name="itemWriter" ref="writer" />
    				<property name="itemProcessor" ref="processor" />
    			</bean>
    		</property>
    	</bean>
    
    	<bean id="writer"
    		class="com.m4u.subscription.core.batch.writer.SubscriptionWriter" />
    
    	<bean id="processor"
    		class="com.m4u.subscription.core.batch.processor.SubscriptionProcessor">
    		<property name="client" ref="subscriptionClient"/>
    	</bean>
    
    </beans>
    slaveIntegrationContext.xml (JMS config for slave)

    Code:
    <?xml version="1.0" encoding="UTF-8"?>
    	<beans:import resource="classpath:/jms-context.xml" />
    
    	<channel id="subscriptionRequests">
    		<interceptors>
    			<wire-tap channel="logChannel" />
    		</interceptors>
    	</channel>
    
    	<channel id="subscriptionReplies">
    		<queue />
    		<interceptors>
    			<wire-tap channel="logChannel" />
    		</interceptors>
    	</channel>
    
    	<logging-channel-adapter auto-startup="true"
    		id="logChannel" level="DEBUG" />
    
    	<beans:bean id="transactionManager"
    		class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
    		
    	<jms:listener-container connection-factory="connectionFactory"
    		transaction-manager="transactionManager" acknowledge="transacted">
    		<jms:listener destination="subscriptionRequests"
    			response-destination="subscriptionReplies" ref="chunkHandler" method="handleChunk" />
    	</jms:listener-container>
    
    </beans:beans>
    jms-context.xml (JMS config that is common to both master and slaves)

    Code:
    <?xml version="1.0" encoding="UTF-8"?>
        <context:property-placeholder location="classpath:subscriptionbatch.properties" ignore-unresolvable="true"/>
        
    	<bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
    		destroy-method="stop">
    		<property name="connectionFactory">
    			<bean class="org.apache.activemq.ActiveMQConnectionFactory">
    				<property name="brokerURL" value="failover:(${activemq.url})?randomize=false" />
    			</bean>
    		</property>
    	</bean>
    
    	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    		<property name="connectionFactory" ref="connectionFactory" />
    		<property name="defaultDestinationName" value="subscriptionReplies" />
    		<property name="receiveTimeout" value="100" />
    		<property name="sessionTransacted" value="true" />
    	</bean>
    
    	<!--
    	  The chunk request listener is intentionally NOT declared in this shared file.
    	  This file is imported by both the master and the slave integration contexts, and
    	  the master context also defines a bean named "chunkHandler". A listener declared
    	  here would attach the master's chunkHandler to the "subscriptionRequests"
    	  destination, so the master would consume and process chunks itself.
    	  The listener container (destination "subscriptionRequests", response destination
    	  "subscriptionReplies", ref "chunkHandler", method "handleChunk") belongs only in
    	  slaveIntegrationContext.xml, where it is already declared.
    	-->
    	
    	<!-- Client bean shared by master and slave contexts; constructed with the
    	     value of the subscription.server.url property. -->
    	<bean id="subscriptionClient" class="com.m4u.subscription.core.jaxrs.client.SubscriptionJAXRSClient">
    		<constructor-arg type="java.lang.String" value="${subscription.server.url}" />
    	</bean>
    
    </beans>
    Thanks in advance.
Working...
X