Poller in a loop has no delay, while fixed-rate is set

  • Poller in a loop has no delay, while fixed-rate is set

    I have the following splitter and aggregator config:

    HTML Code:
    <int:poller id="fixedThreadPoolPoller" default="true" fixed-rate="3000"
        task-executor="fixedThreadPoolTaskExecutor" max-messages-per-poll="-1" />
    
    <int:channel id="tasksIncoming"><int:queue /></int:channel>
    
    <int:splitter id="taskToTaskAssetsSplitter" expression="payload.assets" 
        input-channel="tasksIncoming" output-channel="tasksAssetsWaitingForContentFiles" />	
    
    <int:channel id="tasksAssetsWaitingForContentFiles">
        <int:queue message-store="tasksWaitingForAssetsMessageStore" capacity="100" />
    </int:channel>
    
    <int:chain id="moveContentFilesChain" input-channel="tasksAssetsWaitingForContentFiles">
        <int:service-activator ref="ingestTaskHandler" method="moveContentFileIfExists" />
        <int:router expression="payload.gathered" >
            <int:mapping value="false" channel="tasksAssetsWaitingForContentFiles" />
            <int:mapping value="true" channel="tasksAssetsWithContentFilesFound" />
        </int:router>
    </int:chain>
    
    <int:channel id="tasksAssetsWithContentFilesFound">
        <int:queue message-store="tasksAssetsWithContentFilesFoundMessageStore" capacity="100" />
    </int:channel>
    	
    <int:aggregator id="taskAssetsToTaskAggregator" ref="ingestTaskHandler"
        method="aggregateTaskAssets" release-strategy="aggregateTaskAssetsReleaseStrategy"
        input-channel="tasksAssetsWithContentFilesFound" output-channel="tasksReadyToProcess" />
    
    <int:channel id="tasksReadyToProcess"><int:queue /></int:channel>
    The workflow is:
    1) tasks arrive on 'tasksIncoming'
    2) tasks are split into task assets
    3) the split assets are queued in 'tasksAssetsWaitingForContentFiles'
    4) the service activator in 'moveContentFilesChain' checks whether the asset's content file exists in some location; if so, it copies the file to another location and sets 'payload.gathered' to true; if the content file doesn't exist, the message is passed on unchanged
    5) the router in 'moveContentFilesChain' checks whether the file was found and routes the message either back to 'tasksAssetsWaitingForContentFiles' or to 'tasksAssetsWithContentFilesFound'
    6) if the files for all assets are found, the task is aggregated by 'taskAssetsToTaskAggregator' and passed to 'tasksReadyToProcess' for further processing

    The problem is that I want to poll 'moveContentFilesChain' at a fixed rate of 3000 ms, as set in the default poller, but instead, after the first poll, it falls into a continuous loop and there is no delay between successive runs of 'moveContentFilesChain'.

    Please help me correct my config so that 'moveContentFilesChain' polls every 3000 ms and not continuously.

    Thanks in advance,
    Darek

  • #2
    The continuous polling is due to the -1 value for max-messages-per-poll (that means "no limit" on the number of times it will actually call receive() for a given poll interval). If you change that to 1, it will receive at most 1 message per poll.

    If you are using that same poller elsewhere, and do want to keep that setting, then simply provide an explicit (non-default) poller sub-element for the chain where you want the "at most one" behavior.
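
    A minimal sketch of that suggestion, reusing the chain from the first post and assuming a Spring Integration version in which <int:chain> accepts a nested <int:poller> element. The values are illustrative, and the nested poller does not reuse the default poller's task-executor unless one is set on it explicitly:

    ```xml
    <int:chain id="moveContentFilesChain" input-channel="tasksAssetsWaitingForContentFiles">
        <!-- Explicit poller for this chain only; the default poller stays as it is -->
        <int:poller fixed-rate="3000" max-messages-per-poll="1" />
        <int:service-activator ref="ingestTaskHandler" method="moveContentFileIfExists" />
        <int:router expression="payload.gathered">
            <int:mapping value="false" channel="tasksAssetsWaitingForContentFiles" />
            <int:mapping value="true" channel="tasksAssetsWithContentFilesFound" />
        </int:router>
    </int:chain>
    ```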

    Hope that helps.
    -Mark



    • #3
      Well, your 'max-messages-per-poll' is set to -1, which means it will attempt to receive as many messages per single poll as it can (until receive() returns null). Set max-messages-per-poll to the number of messages you want to process per single poll. For example, if you set it to 1, then at most one message will be processed every 3 seconds.



      • #4
        Thanks a lot for your answers. When I set 'max-messages-per-poll' to 1, I do indeed get one poll per 3 seconds, but this means I check only one file every 3 seconds.

        I'd really like to check all files available in the 'tasksAssetsWaitingForContentFiles' queue, then sleep for 3 seconds, then check all available files again, and so on. Is something like this possible?

        Basically, the use case is that some other process provides a package description from a remote resource. I pick it up and read its contents, which are a list of accompanying files that will also arrive soon from that remote location, so I need to wait for all of these files before I proceed. The loop is the most problematic part for me here.

        Thanks a lot,
        Darek



        • #5
          Have you tried 'fixed-delay' instead of 'fixed-rate'? I have a hazy memory of someone describing the difference, and it was to do with this; but I can find no proper description (so I might be wrong).

          Could we get a line added to the manual, guys, expanding on the elusive "As an alternative to 'fixed-rate' you cna also use 'fixed-delay' attribute." [and correct the typo]



          • #6
            Originally posted by dnlegge
            Have you tried 'fixed-delay' instead of 'fixed-rate'?
            I tried it as you suggested, but it didn't help. The only difference when using 'fixed-delay' is that the timer starts counting from task completion rather than from task start.
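
            For reference, a sketch of the two variants side by side. The poller ids are hypothetical, and the task-executor attribute is left out for brevity:

            ```xml
            <!-- fixed-rate: the 3000 ms interval is measured from the start of the previous poll -->
            <int:poller id="rateBasedPoller" fixed-rate="3000" max-messages-per-poll="-1" />

            <!-- fixed-delay: the 3000 ms interval is measured from the completion of the previous poll -->
            <int:poller id="delayBasedPoller" fixed-delay="3000" max-messages-per-poll="-1" />
            ```

            In both variants a single poll with max-messages-per-poll="-1" keeps calling receive() until it returns null, which would explain why a message routed straight back to the same queue is picked up again without any delay.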

            I added expiration check and ended up using a delayer in 'expirationCheckChain':

            HTML Code:
            <task:scheduler id="tasksScheduler" pool-size="10" />
            	
            <task:scheduled-tasks scheduler="tasksScheduler">
                <task:scheduled ref="taskAggregatorMessageStoreReaper" method="run" fixed-rate="1000" />
            </task:scheduled-tasks>
            
            <int:poller id="fixedThreadPoolPoller" default="true" fixed-rate="3000"
                    task-executor="fixedThreadPoolTaskExecutor" max-messages-per-poll="-1" />
            
            <int:channel id="tasksIncoming"><int:queue /></int:channel>
            <int:channel id="tasksAssetsWaitingForContentFiles"><int:queue /></int:channel>
            <int:channel id="tasksForExpirationCheck"><int:queue /></int:channel>
            <int:channel id="tasksExpired"><int:queue /></int:channel>
            <int:channel id="tasksAssetsWithContentFilesFound"><int:queue /></int:channel>
            <int:channel id="tasksReadyToProcess"><int:queue /></int:channel>
            
            <int:splitter id="taskToTaskAssetsSplitter" expression="payload.assets" 
                    input-channel="tasksIncoming" output-channel="tasksAssetsWaitingForContentFiles" />    
            
            <int:chain id="moveContentFilesChain" input-channel="tasksAssetsWaitingForContentFiles">
                <int:service-activator ref="filesHandler" method="moveContentFileIfExists" />
                <int:router expression="payload.gathered" >
                    <int:mapping value="false" channel="tasksForExpirationCheck" />
                    <int:mapping value="true" channel="tasksAssetsWithContentFilesFound" />
                </int:router>
            </int:chain>
            
            <int:chain id="expirationCheckChain" input-channel="tasksForExpirationCheck">
                <int:delayer default-delay="1000" scheduler="tasksScheduler" />
                <int:router expression="payload.expired">
                    <int:mapping value="false" channel="tasksAssetsWaitingForContentFiles" />
                    <int:mapping value="true" channel="tasksExpired" />
                </int:router>
            </int:chain>
            
            <int:service-activator id="taskExpirationHandler" ref="ingestTaskHandler" method="handleTaskAssetExpiration"
                    input-channel="tasksExpired" />
            
            <int:aggregator id="taskAssetsToTaskAggregator" expression="#this[0].payload.task" 
                    message-store="taskAggregatorMessageStore" discard-channel="nullChannel"    
                    input-channel="tasksAssetsWithContentFilesFound" output-channel="tasksReadyToProcess" />
            Last edited by roadrunn; Aug 26th, 2011, 07:41 AM.

