Announcement Announcement Module
Collapse
No announcement yet.
Unable to send chunk to ActiveMQ Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Unable to send chunk to ActiveMQ

    Hi this is my configuration for master

    mater_integration.xml
    Code:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
    	xmlns:int="http://www.springframework.org/schema/integration"
    	xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.2.xsd
    		http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.2.xsd
    		http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    
    	<import resource="jmscontext_common.xml" />
    	
    	<int:channel id="requestChannel" />
    	<int:channel id="incoming" />
    
    	<bean id="messagingGateway" class="org.springframework.integration.core.MessagingTemplate">
    		<property name="defaultChannel" ref="requestChannel"></property>
    		<property name="receiveTimeout" value="1000"></property>
    	</bean>
    	
    	<int-jms:outbound-channel-adapter
    		connection-factory="connectionFactory" channel="requestChannel"
    		destination-name="requests" />
    
    	<bean id="headerExtractor"
    		class="org.springframework.batch.integration.chunk.JmsRedeliveredExtractor" />
    
    	<int:transformer input-channel="incoming"
    		output-channel="replies" ref="headerExtractor" method="extract" />
    
    	<int:channel id="replies" scope="thread">
    		<int:queue />
    		<int:interceptors>
    			<bean id="pollerInterceptor"
    				class="org.springframework.batch.integration.chunk.MessageSourcePollerInterceptor">
    				<property name="messageSource">
    					<bean
    						class="org.springframework.integration.jms.JmsDestinationPollingSource">
    						<constructor-arg>
    							<bean class="org.springframework.jms.core.JmsTemplate">
    								<property name="connectionFactory" ref="connectionFactory" />
    								<property name="defaultDestinationName" value="replies" />
    								<property name="receiveTimeout" value="100" />
    							</bean>
    						</constructor-arg>
    					</bean>
    				</property>
    				<property name="channel" ref="incoming" />
    			</bean>
    		</int:interceptors>
    	</int:channel>
    
    	<bean id="chunkWriter"
    		class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter"
    		scope="step">
    		<property name="messagingOperations" ref="messagingGateway" />
    		<property name="replyChannel" ref="replies" />
    		<property name="maxWaitTimeouts" value="10" />
    	</bean>
    </beans>

    This is jmscontext_common.xml
    Code:
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    		<property name="targetConnectionFactory">
    			<bean class="org.apache.activemq.ActiveMQConnectionFactory">
    				<property name="brokerURL" value="tcp://localhost:61616"></property>
    			</bean>
    		</property>
    	</bean>
    
    	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    		<property name="connectionFactory" ref="connectionFactory"></property>
    		<property name="defaultDestinationName" value="replies"></property>
    		<property name="receiveTimeout" value="1000"></property>
    		<property name="sessionTransacted" value="true"></property>
    	</bean>
    This is my job that i am trying to submit
    Code:
    <batch:job id="mysimplejob" job-repository="jobRepository">
    	<batch:step id="mysimplestep" >
    		<batch:tasklet>
    			<batch:chunk commit-interval="10" reader="dummyReader" writer="dummyWriter"></batch:chunk>
    		</batch:tasklet>
    	</batch:step>
    </batch:job>
    <bean id="dummyReader" class="org.mkcl.batchadmin.jobs.DummyReader"></bean>
    <bean id="dummyWriter" class="org.mkcl.batchadmin.jobs.DummyWriter"> </bean>
    	
    	<bean id="chunkHandler"
    		class="org.springframework.batch.integration.chunk.RemoteChunkHandlerFactoryBean">
    		<property name="chunkWriter" ref="chunkWriter" />
    		<property name="step" ref="mysimplestep" />
    	</bean>
    </beans>
    When i see ActiveMQ admin, even the queue has not been created.
    The control goes from reader to writer that's it.

    Here is the code for DummyReader and DummyWriter
    Code:
    public class DummyReader implements ItemReader<String>, ItemStream {
    
    	private ItemReader<String> delegate;
    @Override
    	public String read() throws UnexpectedInputException, ParseException, NonTransientResourceException, Exception{
    		System.out.println("Reading...");
    		return "Data";
    	}
    Code:
    public class DummyWriter implements ItemWriter<String>{
    
    
    	@Override
    	public void write(List<? extends String> items) throws Exception {
    		System.out.println("From jobs xml..."+items);
    		
    	}
    
    }

    So I am getting the output
    Reading......
    Reading......
    Reading......
    Reading......
    Reading......
    Reading......
    Reading......
    Reading......
    From jobs xml...Data
    Reading...
    Reading...
    Reading...
    Reading...
    Reading...
    Reading...
    Reading...
    From jobs xml...Data

    and so on..


    What is wrong here..?
    Please help.
    At least queue should be created for slaves to consume..
Working...
X