  • SI Aggregator - Timeout release strategy

    Hi,

    I have configured a producer and a consumer. The producer sends a message every 3 seconds. An aggregator is configured with release-strategy="timeout" and a timeout of 7 seconds. The messages are aggregated and sent to the output channel at the 9th second (as soon as the 3rd message is received), although I expected the group to time out at the 7th second and aggregate only 2 messages.

    I am using org.springframework.integration.aggregator.TimeoutCountSequenceSizeReleaseStrategy. From the code, canRelease(..) is called for every message received on the input channel and returns true only after the 3rd message is received.

    Is there a way to make the timeout fire as defined, i.e. at 7 seconds, and aggregate only the messages that are in the queue at that time?

    Context.xml

    Code:
    <task:scheduled-tasks scheduler="taskScheduler">
        <task:scheduled ref="producer" method="run" fixed-rate="3000" />
    </task:scheduled-tasks>

    <bean id="timeout"
        class="org.springframework.integration.aggregator.TimeoutCountSequenceSizeReleaseStrategy">
        <constructor-arg name="threshold" value="1000" />
        <constructor-arg name="timeout" value="7000" />
    </bean>

    <bean id="correlationStrategy" class="com.test.MyCorrelationStrategy" />
    <int:aggregator input-channel="channel_one"
        output-channel="channel_two" release-strategy="timeout"
        correlation-strategy="correlationStrategy" method="aggregate">
        <bean class="com.test.MyAggreegator" />
    </int:aggregator>
    Regards,
    Pranav

  • #2
    Hi Pranav,

    Can you please post the whole setup? It will help me see more details and give you useful feedback on your query.


    • #3
      What does your correlation strategy do? Can you show us how you are building the message that is sent to the input channel? We can only explain how the release happens after three messages once we have seen that.

      The release strategy is invoked only when a message arrives. A release happens only when a message arrives and one of the following holds: 7000 ms (as set by you) have elapsed since the first message in the group arrived, the group is complete, or the threshold for the maximum number of messages in the group has been reached (1000 in your case).

      If HeaderAttributeCorrelationStrategy is used, the release will happen after the 4th message in the group has arrived.
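
To make the timing described here concrete, below is a small plain-Java sketch. It is not the Spring Integration source. It assumes the first message arrives at t = 0 and one more arrives every 3000 ms, and it models only the count threshold and the elapsed-time check; the sequence-size completeness check is left out. The names ArrivalDrivenRelease, canRelease and firstRelease are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of a timeout-or-count release check that is evaluated
// only when a message arrives. Illustration only, not library code.
class ArrivalDrivenRelease {

    // True when the group may be released at time "now" (milliseconds).
    static boolean canRelease(List<Long> arrivals, long now, int threshold, long timeoutMs) {
        long elapsed = now - arrivals.get(0); // time since the first message in the group
        return arrivals.size() >= threshold || elapsed > timeoutMs;
    }

    // Feeds the arrivals one by one and returns {releaseTimeMs, groupSize},
    // or null if no arrival ever triggers a release.
    static long[] firstRelease(long[] arrivalTimes, int threshold, long timeoutMs) {
        List<Long> group = new ArrayList<>();
        for (long t : arrivalTimes) {
            group.add(t);
            // The check runs here and nowhere else: no arrival, no check.
            if (canRelease(group, t, threshold, timeoutMs)) {
                return new long[] {t, group.size()};
            }
        }
        return null;
    }

    public static void main(String[] args) {
        long[] arrivals = {0, 3000, 6000, 9000, 12000};
        long[] r = firstRelease(arrivals, 1000, 7000);
        System.out.println("released at " + r[0] + " ms with " + r[1] + " messages");
    }
}
```

Under these assumptions the check first passes at 9000 ms, on the 4th arrival, because nothing evaluates it between 6000 ms and 9000 ms.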


      • #4
        Hi Anmol,

        Could you please explain the threshold for the maximum number of messages in the group? Does it mean that when the message count reaches 1000, the aggregator releases the messages?


        • #5
          Yes, if the message group size >= threshold


          p.s. My name is Amol and not Anmol


          • #6
            Hi Amol,

            Please explain this in detail so that my understanding is completely clear. In scenarios where we want a message group in the message store to expire after a fixed interval, can we also use org.springframework.integration.aggregator.TimeoutCountSequenceSizeReleaseStrategy and so avoid the message reaper, i.e. the org.springframework.integration.store.MessageGroupStoreReaper that we use to expire message groups in the message store?

            My scenario is this: in the release strategy I aggregate messages into groups of 10, and when the count reaches 10 I release them. At the same time I need to track a timeout for those messages, so I have configured the message reaper to expire the message group. If fewer than 10 messages arrive within that period (say only seven), those messages should still be released, because we cannot wait any longer. For this I have to add a separate reaper. Would org.springframework.integration.aggregator.TimeoutCountSequenceSizeReleaseStrategy work the same way?
            Last edited by SARAL SAXENA; Nov 22nd, 2011, 12:06 PM.


            • #7
              No, TimeoutCountSequenceSizeReleaseStrategy cannot be used instead of MessageGroupStoreReaper. As I stated earlier, the release strategy for a message group kicks in only when a message for the group arrives, whereas a reaper runs periodically as scheduled and expires message groups as configured.

              For example, say you want a group to time out 30 seconds after its first message arrives.

              If you have a TimeoutCountSequenceSizeReleaseStrategy configured with a threshold of 10 messages and a timeout of 30 seconds, and you receive 7 of the 10 messages within 30 seconds, then without a reaper the group will never expire if no more messages for the group arrive.
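
The 7-of-10 example can be played through in a toy plain-Java model. This is only a sketch under stated assumptions, not Spring Integration code: time advances in 1000 ms ticks, the arrival times are made up, and the "sweep" stands in for what a scheduled reaper does. The names PassiveGroupDemo, releaseOnArrival and releaseTime are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model: a group that is checked on arrival (release strategy) and,
// optionally, by a periodic sweep (the job a reaper would do).
class PassiveGroupDemo {

    static final int THRESHOLD = 10;
    static final long TIMEOUT_MS = 30_000;

    // Arrival-driven check: only ever called when a message is added.
    static boolean releaseOnArrival(List<Long> arrivals, long now) {
        return arrivals.size() >= THRESHOLD || now - arrivals.get(0) > TIMEOUT_MS;
    }

    // Time at which the group leaves the store, or -1 if it never does within
    // the simulated window. sweepEveryMs <= 0 means "no sweep configured".
    static long releaseTime(long[] arrivalTimes, long sweepEveryMs, long horizonMs) {
        List<Long> group = new ArrayList<>();
        int next = 0;
        for (long now = 0; now <= horizonMs; now += 1000) {
            while (next < arrivalTimes.length && arrivalTimes[next] <= now) {
                group.add(arrivalTimes[next++]);
                if (releaseOnArrival(group, now)) {
                    return now;
                }
            }
            boolean sweepDue = sweepEveryMs > 0 && now % sweepEveryMs == 0;
            if (sweepDue && !group.isEmpty() && now - group.get(0) >= TIMEOUT_MS) {
                return now; // expired by the sweep; the partial group goes out
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long[] sevenMessages = {0, 3000, 6000, 9000, 12000, 15000, 18000};
        System.out.println("without sweep: " + releaseTime(sevenMessages, 0, 120_000));
        System.out.println("with sweep:    " + releaseTime(sevenMessages, 5000, 120_000));
    }
}
```

In this model the seven messages stay in the store for the whole window when nothing sweeps it (result -1), and leave at 30000 ms when a sweep runs every 5000 ms.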


              • #8
                I have configured org.springframework.integration.store.MessageGroupStoreReaper and it's all fixed.

                Context.xml

                Code:
                <?xml version="1.0" encoding="UTF-8"?>
                <beans xmlns="http://www.springframework.org/schema/beans"
                	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
                	xmlns:int-feed="http://www.springframework.org/schema/integration/feed"
                	xmlns:task="http://www.springframework.org/schema/task"
                	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
                			http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
                			http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd
                			http://www.springframework.org/schema/integration/feed http://www.springframework.org/schema/integration/feed/spring-integration-feed-2.0.xsd">
                
                	<int:poller id="pooler" default="true" fixed-delay="1000"></int:poller>
                	<int:channel id="channel_one">
                		<int:queue capacity="10" />
                	</int:channel>
                
                	<int:channel id="channel_two">
                		<int:queue capacity="10" />
                	</int:channel>
                
                	<bean id="producer" class="com.test.MessageProducer">
                		<property name="channel" ref="channel_one" />
                	</bean>
                	<task:scheduled-tasks scheduler="taskScheduler">
                		<task:scheduled ref="producer" method="run" fixed-rate="3000" />
                	</task:scheduled-tasks>
                
                	<bean id="timeout"
                		class="org.springframework.integration.aggregator.TimeoutCountSequenceSizeReleaseStrategy">
                		<constructor-arg name="threshold" value="1000" />
                		<constructor-arg name="timeout" value="7000" />
                	</bean>
                
                	<bean id="correlationStrategy" class="com.test.MyCorrelationStrategy" />
                	<int:aggregator input-channel="channel_one"
                		output-channel="channel_two" release-strategy="timeout"
                		send-partial-result-on-expiry="true" correlation-strategy="correlationStrategy"
                		message-store="myMessageStore" method="aggregate">
                		<bean class="com.test.MyAggreegator" />
                	</int:aggregator>
                
                	<bean id="myMessageStore" class="org.springframework.integration.store.SimpleMessageStore" />
                	
                	<bean id="myReaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
                		<property name="messageGroupStore" ref="myMessageStore" />
                		<property name="timeout" value="5000" />
                	</bean>
                	
                	<task:scheduler id="myScheduler" />
                	
                	<task:scheduled-tasks scheduler="myScheduler">
                		<task:scheduled ref="myReaper" method="run" fixed-rate="1000"/>
                	</task:scheduled-tasks>
                
                	<bean id="consumer" class="com.test.MessageConsumer" />
                	<int:service-activator input-channel="channel_two"
                		ref="consumer" />
                </beans>
                Thanks Amol and Saral


                • #9
                  What was your use case? The release strategy and the message group reaper are quite different and are intended for different situations.


                  • #10
                    Hi Pranav,
                    Thanks for the explanation, it is working now. I configured the correlation strategy through its expression attribute using SpEL, and it works. Thanks a lot!
                    Last edited by SARAL SAXENA; Nov 29th, 2011, 01:57 AM.


                    • #11
                      Originally posted by SARAL SAXENA View Post
                      Hi Amol,

                      As you have said....



                      Please explain the concept of a release strategy for a message group. I am clear on the message reaper but not on the message group. If possible, please explain with an example.
                      Hi,
                      Is there any way to configure only the release strategy, without a message reaper? I want a message group of 10 to be formed within 30 seconds, and if only 7 messages arrive in those 30 seconds, that group of 7 should be released. Can't the release strategy alone handle this case?


                      • #12
                        NO.

                        As we have said to you before, the aggregator is a passive component and only acts when messages arrive. To activate it outside of message arrival, you need some other component (the reaper).


                        • #13
                          Hi Gary,

                          We can set the release strategy in the XML configuration file itself, like this:

                          Code:
                          <bean id="timeout"
                              class="org.springframework.integration.aggregator.TimeoutCountSequenceSizeReleaseStrategy">
                              <constructor-arg name="threshold" value="1000" />
                              <constructor-arg name="timeout" value="7000" />
                          </bean>
                          Can we also define the correlation strategy bean in the XML file, to group messages by the same correlation ID? Right now my application has a separate POJO bean for the correlation strategy, and I pass a reference to it to the aggregator in the XML. I want the correlation strategy to live in the XML file itself, just like the release strategy. Is that possible? Could you please advise and show some sample tags in XML?


                          • #14
                            You can use org.springframework.integration.aggregator.HeaderAttributeCorrelationStrategy.
                            Define a bean like this:


                            Code:
                                    <bean id="correlationStrategy" class="org.springframework.integration.aggregator.HeaderAttributeCorrelationStrategy">
                            		<constructor-arg value="CORRELATING_ID"/>
                            	</bean>
                            Reference this bean through the correlation-strategy attribute of the aggregator definition in the config.

                            This will aggregate messages based on the value of the CORRELATING_ID header.
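
As a rough picture of what grouping by a header value means, here is a self-contained plain-Java sketch. It does not use Spring Integration at all; Msg is a hypothetical minimal message type and HeaderCorrelationDemo.group is an illustrative helper, so treat it as a model of the idea rather than of the library's behaviour.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy stand-in for header-based correlation: messages carrying the same
// value under a chosen header name end up in the same group.
class HeaderCorrelationDemo {

    // Hypothetical minimal message: headers plus a payload.
    static final class Msg {
        final Map<String, Object> headers;
        final String payload;

        Msg(Map<String, Object> headers, String payload) {
            this.headers = headers;
            this.payload = payload;
        }
    }

    // Groups payloads by the value found under headerName, in arrival order.
    static Map<Object, List<String>> group(List<Msg> messages, String headerName) {
        Map<Object, List<String>> groups = new LinkedHashMap<>();
        for (Msg m : messages) {
            Object key = m.headers.get(headerName); // the correlation key
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(m.payload);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Msg> in = List.of(
            new Msg(Map.of("CORRELATING_ID", "A"), "a1"),
            new Msg(Map.of("CORRELATING_ID", "B"), "b1"),
            new Msg(Map.of("CORRELATING_ID", "A"), "a2"));
        System.out.println(group(in, "CORRELATING_ID"));
    }
}
```

Here the two messages with CORRELATING_ID "A" land in one group and the "B" message in another.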


                            • #15
                              Hi Amol,

                              In my application I am using the correlation bean as follows:

                              Code:
                              import org.springframework.integration.Message;
                              
                              public class CorrelationBean {
                              	//Not grouping based on correlation ids 
                              	public String correlationStrategy(Message<String> request){
                              		return "NO GROUPING";
                              	}
                              }
                              As seen above, I am technically not grouping by ID at all, and I pass a reference to this POJO in my configuration as follows:
                              Code:
                              <bean id="correlationBean" class="com.walgreens.ods.producer.CorrelationBean" scope="prototype" />
                              <si:aggregator id="aggregator_2" input-channel="aggregator-input-channel_2" output-channel="aggregator-output-channel_2"
                              		
                              		ref="sampleAggregator" method="aggregateMessagaes"  		
                              		correlation-strategy="correlationBean" correlation-strategy-method="correlationStrategy" 
                              		release-strategy="releaseStrategyBean" release-strategy-method="releaseStrategy" 
                              		message-store="messageStore2"
                              		order="1" 
                              		send-partial-result-on-expiry="true"		
                              		send-timeout = "5000"
                              		>
                              	</si:aggregator>

                              I want to remove this POJO and keep all the configuration in the XML file itself, as I have done for the release strategy. Please guide me on how the above bean would look in the XML file. Thanks in advance!
