Announcement Announcement Module
Collapse
No announcement yet.
Aggregating messages extracted from a queue channel Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Aggregating messages extracted from a queue channel

    Hi,

    In the evening I have to aggregate in a single ZIP file all the files produced and accumulated in a queue during the day.

    I'm currently working on a solution summarized as:

    Code:
    	<int-file:inbound-channel-adapter 
    		directory="${input.directory}"
    		filename-pattern="test*"
    		channel="inbound-channel">
    		
    		<poller fixed-rate="${input.channel}" />
    		
    	</int-file:inbound-channel-adapter>
    
    
    	<channel id="inbound-channel">
    		<queue />
    	</channel>
    	
    
    	<aggregator 
    		input-channel="inbound-channel"
    		output-channel="logging-channel"
                    correlation-strategy-expression="..">
    	
    		<beans:bean class="org. ... .FileToZipAggregator" />
    
    		<poller cron="${aggregator.cron}" max-messages-per-poll="${aggregator.max-messages-per-poll}"  />
    
    	</aggregator>
    The number of messages produced may vary.

    I cannot find a working release strategy for this aggregator: the queue channel does not provide the SEQUENCE_NUMBER and SEQUENCE_SIZE for the message extracted during the poll.

    Do you have any suggestion to meet the requirement?

  • #2
    I assume your cron expression fires once per day, in the evening, and you aggregate all the files at that time.

    The issue is "how do we know when we're done?".

    One way to do what you want is to configure the aggregator with a MessageGroupStore with timeoutOnIdle set to true, and the aggregator's sendPartialResultOnExpiry set to true; and use a MessageGroupStoreReaper.

    Schedule the reaper to run, say, every 10 minutes for an hour or two after your aggregator's cron, with a timeout of, say, 5 minutes.

    When the reaper fires, if a new message has arrived in the last 5 minutes, it won't reap. If, however, there has been no activity on the group, the "partial" group will be released. Subsequent firings of the reaper (today) won't do anything because there's no group.

    You just need to setup your reaper's timeout and schedule to be appropriate for your environment.

    Comment


    • #3
      Originally posted by Gary Russell View Post
      I assume your cron expression fires once per day, in the evening, and you aggregate all the files at that time.
      Yes, correct.

      Originally posted by Gary Russell View Post
      The issue is "how do we know when we're done?".

      One way to do what you want is to configure the aggregator with a MessageGroupStore with timeoutOnIdle set to true, and the aggregator's sendPartialResultOnExpiry set to true; and use a MessageGroupStoreReaper.

      Schedule the reaper to run, say, every 10 minutes for an hour or two after your aggregator's cron, with a timeout of, say, 5 minutes.

      When the reaper fires, if a new message has arrived in the last 5 minutes, it won't reap. If, however, there has been no activity on the group, the "partial" group will be released. Subsequent firings of the reaper (today) won't do anything because there's no group.
      I've seen this http://static.springsource.org/sprin...er.html#reaper section of the reference right before posting here, and it looked like a feasible solution. But... I was sooooo tired that I threw in the towel. What a shame!



      Originally posted by Gary Russell View Post
      You just need to setup your reaper's timeout and schedule to be appropriate for your environment.
      Thank you for your suggestion, Gary. It works like a charm.

      Here is the test flow:

      Code:
      
      	<int-file:inbound-channel-adapter 
      		directory="${input.directory}"
      		filename-pattern="test*"
      		channel="inbound-channel">
      		
      		<poller fixed-rate="${input.channel}" />
      		
      	</int-file:inbound-channel-adapter>
      
      
      	<channel id="inbound-channel"></channel>
      
      	<aggregator 
      		input-channel="inbound-channel"
      		output-channel="logging-channel"
      		message-store="simpleMessageStore"
      		send-partial-result-on-expiry="true">
      		
      		<beans:bean 
      			class="org....ZipFileAggregator">
      			<beans:property name="compressionLevel">
      				<util:constant static-field="java.util.zip.Deflater.BEST_COMPRESSION"/>
      			</beans:property>
      		</beans:bean>
      		
      	</aggregator>		
      
      
      
      	<beans:bean id="simpleMessageStore" class="org.springframework.integration.store.SimpleMessageStore"></beans:bean>
      
      	<beans:bean id="reaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
        		<beans:property name="messageGroupStore" ref="simpleMessageStore"/>
        		<beans:property name="timeout" value="${reaper.timeout}"/>	
      	</beans:bean>
      
      	<task:scheduled-tasks>	
      	  <task:scheduled ref="reaper" method="run" cron="${tasks.reaper.cron}"/>
      	</task:scheduled-tasks>
      Here I've used the SimpleMessageStore implementation just for testing purposes. In the 'real' solution for production use it's used the Jdbc-based implementation .

      Thank you for your useful and super-fast response.

      Maurizio

      Comment


      • #4
        Great - however, as I mentioned, you might want to set the store's timeoutOnIdle to true.

        Otherwise, the reaper reaps based on the group creation time, rather than the group's last update time.

        Comment

        Working...
        X