
  • MaximumConcurrentProducers for a jms:outbound-gateway

    I have created the following setup:

    The sender service interface accepts Strings.
    These Strings are converted to an EBCDIC byte array, which the outbound gateway posts on a queue.
    The gateway waits for a response, then the reverse transformation is applied and the output is written to stdout.

    In the current configuration, messages are processed one at a time.
    Changing the poller should probably introduce parallelism.
    However, if I change it to a task executor with 50 threads, up to 50 outbound requests can be in progress at once.
    I would like to be able to limit this to a certain number.
    The inbound gateway has a parameter called (Max)concurrentConsumers; is there a (Max)concurrentProducers parameter, or another way to solve this?

    I thought that the thread would block during the sendAndReceive call in the JmsOutboundGateway and that, while it was blocked, it would be used to start processing the next Message, but this doesn't seem to be the case.
    As a result, all the threads in the pool can be blocked on I/O while there are messages in the queues that should be processed in the meantime.
    Are threads that are blocked on I/O returned to the thread pool or not?



    Code:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans:beans xmlns="http://www.springframework.org/schema/integration"
    	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:beans="http://www.springframework.org/schema/beans"
    	xmlns:context="http://www.springframework.org/schema/context"
    	xmlns:integration="http://www.springframework.org/schema/integration"
    	xmlns:jms="http://www.springframework.org/schema/integration/jms"
    	xmlns:stream="http://www.springframework.org/schema/integration/stream"
    	xsi:schemaLocation="http://www.springframework.org/schema/beans
    			http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
    			http://www.springframework.org/schema/context
    			http://www.springframework.org/schema/context/spring-context-2.5.xsd
    			http://www.springframework.org/schema/integration
    			http://www.springframework.org/schema/integration/spring-integration-1.0.xsd
    			http://www.springframework.org/schema/integration/jms
    			http://www.springframework.org/schema/integration/jms/spring-integration-jms-1.0.xsd
    			http://www.springframework.org/schema/integration/stream
    			http://www.springframework.org/schema/integration/stream/spring-integration-stream-1.0.xsd">
    
    	<jms:outbound-gateway id="jmsout"
    		reply-destination="replyQueue" request-destination="requestQueue"
    		request-channel="requestChannel" reply-channel="replyChannel">
    		<integration:poller>
    			<integration:interval-trigger interval="1000" />
    		</integration:poller>
    	</jms:outbound-gateway>
    
    
    	<beans:bean id="connectionFactory"
    		class="org.springframework.jms.connection.CachingConnectionFactory">
    		<beans:property name="targetConnectionFactory" ref="mqQueueConnectionFactory" />
    		<beans:property name="sessionCacheSize" value="10" />
    		<beans:property name="cacheProducers" value="false"></beans:property>
    	</beans:bean>
    
    	<beans:bean id="mqQueueConnectionFactory" class="com.ibm.mq.jms.MQQueueConnectionFactory">
    		<beans:property name="queueManager" value="CM11"></beans:property>
    		<beans:property name="hostName" value="geMQCM11"></beans:property>
    		<beans:property name="port" value="1413"></beans:property>
    		<beans:property name="channel" value="GERE01.CM11.01"></beans:property>
    		<beans:property name="transportType" value="1"></beans:property>
    	</beans:bean>
    
    	<beans:bean id="requestQueue" class="com.ibm.mq.jms.MQQueue">
    		<beans:property name="baseQueueName" value="GERE.QL.JRULES_BATCH_REQ.0001"></beans:property>
    	</beans:bean>
    
    	<beans:bean id="replyQueue" class="com.ibm.mq.jms.MQQueue">
    		<beans:property name="baseQueueName" value="GERE.QL.JRULES_BATCH_RSP.0001"></beans:property>
    	</beans:bean>
    
    
    	<integration:poller id="poller" default="true">
    		<integration:interval-trigger interval="1000" />
    	</integration:poller>
    
    	<stream:stdout-channel-adapter id="stdout"
    		channel="EBCDICToASCIITransformerChannel" append-newline="true" />
    
    	<gateway id="sender" service-interface="Sender" />
    
    	<channel id="requestChannel">
    		<queue capacity="10"/>
    	</channel>
    
    	<channel id="replyChannel">
    		<queue capacity="10"/>
    	</channel>
    
    	<channel id="EBCDICToASCIITransformerChannel">
    		<queue capacity="10"/>
    	</channel>
    
    	<channel id="ASCIIToEBCDICTransformerChannel">
    		<queue capacity="10"/>
    	</channel>
    	
    
    	<transformer input-channel="replyChannel"
    		output-channel="EBCDICToASCIITransformerChannel" ref="EBCDICToASCIITransformer"
    		method="convertToASCII" />
    
    	<transformer input-channel="ASCIIToEBCDICTransformerChannel"
    		output-channel="requestChannel" ref="ASCIIToEBCDICTransformer" method="convertToEBCDIC" />
    
    	<beans:bean id="EBCDICToASCIITransformer"
    		class="dexia.gemk.mainframe.ctg.EBCDICASCIITransformer" />
    
    	<beans:bean id="ASCIIToEBCDICTransformer"
    		class="dexia.gemk.mainframe.ctg.EBCDICASCIITransformer" />
    
    
    </beans:beans>
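
    A possible way to cap the number of requests in flight, given as an untested sketch rather than a confirmed answer: point the gateway's poller at a bounded thread pool. A thread that is blocked waiting for the JMS reply stays occupied until the call returns, so the pool size would act as the upper limit on concurrent producers. The bean name boundedExecutor and the pool sizes are hypothetical, and the sketch assumes the poller element accepts a task-executor attribute in this schema version.

    Code:
    	<!-- Hypothetical bounded pool: at most 5 polls, and so at most 5 blocking requests, run at once -->
    	<beans:bean id="boundedExecutor"
    		class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    		<beans:property name="corePoolSize" value="5" />
    		<beans:property name="maxPoolSize" value="5" />
    		<!-- Polls submitted while all 5 threads are busy wait here -->
    		<beans:property name="queueCapacity" value="10" />
    	</beans:bean>

    	<!-- Same gateway as above; only the poller changes (task-executor is an assumed attribute) -->
    	<jms:outbound-gateway id="jmsout"
    		reply-destination="replyQueue" request-destination="requestQueue"
    		request-channel="requestChannel" reply-channel="replyChannel">
    		<integration:poller task-executor="boundedExecutor">
    			<integration:interval-trigger interval="1000" />
    		</integration:poller>
    	</jms:outbound-gateway>

    With this arrangement the executor queue absorbs polls that arrive while every thread is busy. Once that queue is full, ThreadPoolTaskExecutor rejects further submissions under its default policy, so the queue capacity would need to be sized against the polling interval and the expected reply latency.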