Announcement Announcement Module
No announcement yet.
need clarification about inbound channel adapter and pollers Page Title Module
Move Remove Collapse
Conversation Detail Module
  • Filter
  • Time
  • Show
Clear All
new posts

  • need clarification about inbound channel adapter and pollers

    I've bumped into a strange poller issue with JMS inbound adapter, and i guess i have some miss-understanding (or is it a bug?).

    in my SI context XML:

    <int:inbound-channel-adapter id="masterQ" ref="QMgr" method="getU" channel="jobInput" >
        	<int:poller fixed-rate="20000" />
    --some processing and then put a message on a JMS - "queue-1" (using int-jms:outbound-channel-adapter)
    <int-jms:inbound-channel-adapter id="jmsIn-1" destination="queue-1" channel="fromJMS-1">
    	<int:poller fixed-rate="4000"  />
    --some more processing and then put a message on a JMS - "queue-2" (using int-jms:outbound-channel-adapter)
    <int-jms:inbound-channel-adapter id="jmsIn-2" destination="queue-2" channel="fromJMS-2">
    	<int:poller fixed-rate="4000"  />
    the flow works sometimes as expected. The masterQ is putting a message on queue-1, then jmsIn-1 takes it and put it on queue-2. so far all is good.
    the problem is that jmsIn-2 (jms:inbound) almost never reads the message from it's queue.

    The log message is:
    [org.springframework.integration.endpoint.SourcePol lingChannelAdapter] Received no Message during the poll, returning 'false'

    But - if i turn-off (stop via JMX) the inbound of jmsIn-1, then the 2nd adapter is able to receive JMS.

    I suspect that it's related to the poller and the exact fixed-rate (4000), because when i play with the values, things get to work "more occasionally"

    Can someone please explain this? why does the pollers 'shade'/block each-other?


  • #2
    It's hard to say without seeing your complete configuration, and logs, but I suspect you are experiencing thread starvation.

    The default TaskScheduler bean ("taskScheduler") has only 10 threads; polled JMS adapters (with default configuration) are not "good citizens" because they consume (block) the poller thread until a message arrives.

    You can increase the threads by overriding the default "taskScheduler" bean (but it needs some other configuration too - see DefaultConfiguringBeanFactoryPostProcessor.registe rTaskScheduler). Or, probably easier - give your pollers a reference to a task executor so the scheduler hands off the work to another thread and is freed up. You could also configure the adapters to use a receiveTimeout instead of blocking the thread indefinitely - simply inject a JmsTemplate with the appropriate timeout.

    Finally, consider using <int-jms:message-driven-channel-adapter/>s instead - they don't use threads from the default scheduler.


    • #3
      Thank you Gary.
      since i couldn't upload my project configuration, i've create a new demo project to play with.
      I've noticed that by replacing the 'fix-rate' with 'fix-delay' on the jms-inbound, the starvation is less of a problem.

      Regarding your comment with JMS default configuration - I'd like to ask if there are any recommendation for ActiveMQ as JMS broker?

      thanks again!

      	<context:annotation-config />
      	<int:annotation-config />
      	<!-- Config JMX server -->
      	<context:mbean-server />
      	<int-jmx:mbean-export id="jmxMbeanExport" default-domain="" />
      	<context:mbean-export default-domain="" />
      	<context:component-scan base-package="com.yariv.issue.service" />
      	<aop:aspectj-autoproxy />
      	<context:property-placeholder location="classpath:/" />
      	<!-- add message history to every message. -->
      	<int:message-history />
      	<!-- Load JMS configuration -->
      	<import resource="classpath:/META-INF/spring/integration/jms-config.xml" />
      	<bean id="masterMgr" class="com.yariv.issue.service.MasterMgr" />
      	<int:inbound-channel-adapter id="masterQ"
      		ref="masterMgr" method="getRecords" channel="jobInput" auto-startup="true">
      		<int:poller fixed-rate="${master.poller}">
      	<int:channel id="jobInput">
      			<int:wire-tap channel="infoLogM" />
      	<int-jms:outbound-channel-adapter id="jmsOut-1"
      		destination="queue-1" extract-payload="false" channel="jobInput" />
      	<int:logging-channel-adapter id="infoLogM"
      		level="WARN" logger-name="-> MASTER" expression="payload" />
      	<!-- from here the client-1 starts -->
      	<!-- pull JMS messages from Q -->
      	<int-jms:inbound-channel-adapter id="jmsIn-1"
      		destination="queue-1" channel="fromJMS-1" auto-startup="true">
      		<int:poller fixed-rate="${client.poller}"
      			max-messages-per-poll="1" />
      	<int:channel id="fromJMS-1">
      			<int:wire-tap channel="infoLogC1" />
      	<int:channel id="asApiRequest" />
      	<!-- Bean for ApiTransformer -->
      	<bean id="clientTrans" class="com.yariv.issue.service.ClientTrans" />
      	<!-- ApiTransformer call method toApiRequest -->
      	<int:transformer input-channel="fromJMS-1"
      		output-channel="asApiRequest" id="doSomething" ref="clientTrans"
      		method="getTicket" />
      	<int:logging-channel-adapter id="infoLogC1"
      		level="WARN" logger-name="-> CLIENT-1" expression="payload" />
      	<int-jms:outbound-channel-adapter id="jmsOut-2"
      		destination="queue-2" extract-payload="false" channel="asApiRequest" />
      	<!-- start client-2 -->
      		<int:channel id="fromJMS-2">
      			<int:wire-tap channel="infoLogC2" />
      	<int:channel id="single-request"></int:channel>
      	<int:delayer id="delayRequest" input-channel="fromJMS-2"
      		output-channel="single-request" default-delay="${delay.period}" 
      	<task:scheduler id="myScheduler" pool-size="10" />
      	<int-jms:inbound-channel-adapter id="jmsIn-2"
      		destination="queue-2" channel="fromJMS-2" auto-startup="true">
      		<int:poller fixed-rate="${client.poller}"
      			max-messages-per-poll="1" />
      	<int:logging-channel-adapter id="infoLogC2"
      		level="WARN" logger-name="-> CLIENT-2" expression="payload" />
      	<int:outbound-channel-adapter id="updateRequest" ref="clientTrans" channel="single-request" method="endFlow"/>
      and the jms-config:
      	<bean id="activemqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
      		<property name="brokerURL">
      	<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
      		<property name="targetConnectionFactory" ref="activemqConnectionFactory" />
      		<property name="sessionCacheSize" value="10"/>
      		<property name="cacheProducers" value="false"/>
      	<!-- Define JMS message sender -->
      	<bean id="jmsProducer" class="org.springframework.jms.core.JmsTemplate">
      		<constructor-arg ref="connectionFactory" />
      		<property name="pubSubDomain" value="false" />
       	<bean id="queue-1" class="org.apache.activemq.command.ActiveMQQueue">
      		<constructor-arg value="queue-1"/>
      	<bean id="queue-2" class="org.apache.activemq.command.ActiveMQQueue">
      		<constructor-arg value="queue-2"/>