Announcement Announcement Module
Collapse
No announcement yet.
Spring Integration Poller configuration Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Spring Integration Poller configuration

    Hi,

    I have been trying to work out how to correctly configure a poller in Spring Integration but I have run into some problems (I raised a similar post a few weeks ago, but I didn't get any reply; see http://forum.springsource.org/showthread.php?t=93555).

    Basically, I would like to emulate event-driven behaviour using a poller. This is described in the Spring Integration Reference Manual as follows:

    The 'receiveTimeout' property specifies the amount of time the poller should wait if no messages are available when it invokes the receive operation. For example, consider two options that seem similar on the surface but are actually quite different: the first has an interval trigger of 5 seconds and a receive timeout of 50 milliseconds while the second has an interval trigger of 50 milliseconds and a receive timeout of 5 seconds. The first one may receive a message up to 4950 milliseconds later than it arrived on the channel (if that message arrived immediately after one of its poll calls returned). On the other hand, the second configuration will never miss a message by more than 50 milliseconds. The difference is that the second option requires a thread to wait, but as a result it is able to respond much more quickly to arriving messages. This technique, known as "long polling", can be used to emulate event-driven behavior on a polled source.
    In an attempt to use the "long polling" technique described, I have created a poller and task executor as shown below. The poller has a timeout of 5 seconds and a trigger interval of 50 ms, and I have 20 threads in my pool:

    Code:
    	<si:service-activator input-channel="publishChannel" ref="publishErrorHandler" method="passThrough">
    		<si:poller receive-timeout="5000"
    				task-executor="publicationTaskExecutor">
    			<si:interval-trigger interval="50"/>
    			<si:transactional transaction-manager="databaseTransactionManager"/>
    		</si:poller>
    	</si:service-activator>
    
    ...
    
    	<bean id="publicationTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    		<property name="corePoolSize" value="20"/>
    		<property name="maxPoolSize" value="20"/>
    		<property name="waitForTasksToCompleteOnShutdown" value="true"/>
    		<property name="daemon" value="false"/>
    		<property name="threadNamePrefix" value="publicationService-"/>
    	</bean>
    Now this initially appeared to be working fine, but during testing we started to see problems with memory depletion, and this was traced to the above configuration.

    The problem is that the poller keeps scheduling new tasks even though all the threads are blocked waiting for either (i) a new message to arrive, or (ii) the timeout to expire. Given that there are 20 threads executing tasks with a 5 second timeout, they will be executed at a rate of 4 per second (5000/20 = 250ms). But, new tasks are being scheduled at a rate of 20 per second, so the internal queue in the task executor will grow at a rate of 16 per second (while the process is idle), so we essentially have a memory leak.

    Obviously, this is not right, but how do I go about correctly configuring a poller for "long polling". Should I be using a different type of task executor? Should I be specifying a queue capacity of zero on the task executor, and then setting a 'Discard' policy for the rejection policy?

    I can tweak the interval and timeout values until I get something that's workable, but I'd like to understand how this is supposed to work.

    Regards
    -Matt

  • #2
    That's good feedback on the reference guide (if you raise a JIRA you can be sure it won't be forgotten in the race to 2.0).

    I think that queue size of 0 makes a lot of sense in this case; in fact always for a poller, which is why it is the default, I think (the default rejection policy is CALLER_RUNS, which might also be useful).

    Note that Spring 3 has support for configuring the task executor in an XML namespace (<task:executor/>).

    Comment


    • #3
      Hi Dave,

      Thanks for your reply. However, the default queue size for a task executor is actually unbounded (Integer.MAX_VALUE) rather than zero, unless I'm missing your point.

      Code:
      public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements SchedulingTaskExecutor {
      
      	private final Object poolSizeMonitor = new Object();
      
      	private int corePoolSize = 1;
      
      	private int maxPoolSize = Integer.MAX_VALUE;
      
      	private int keepAliveSeconds = 60;
      
      	private boolean allowCoreThreadTimeOut = false;
      
      	private int queueCapacity = Integer.MAX_VALUE;
      
      	private ThreadPoolExecutor threadPoolExecutor;
      
      	...
      Regarding the namespace support, it looks like there's various different ways to create a task executor. I had been using the SI namespace support (i.e. <thread-pool-task-executor>), but that didn't allow me to override the queue size. As you suggest, the Spring 3.0 <task:executor/> element looks like a better option (and would appear to make SI's <thread-pool-task-executor/> element redundant).

      Regards
      -Matt

      Comment


      • #4
        Matt,

        The <thread-pool-task-executor> should be exposing its "queue-capacity" attribute. Do you not see it? What version are you using?

        And, yes, in Spring Integration 2.0, the <thread-pool-task-executor> has been removed since the core Spring 3.0 'task' namespace is available.

        Regards,
        Mark

        Comment


        • #5
          Sorry for the late reply. Yes, you're right - there is a 'queue-capacity' attribute on the <thread-pool-task-executor> (not sure why I missed that before). I've also raised a task in Jira requesting an update to the reference manual to explain this more clearly. Please see: https://jira.springsource.org/browse/INT-1450

          Thanks
          -Matt

          Comment

          Working...
          X