  • Spring integration aggregator

    I'm new to Spring Integration and have been trying to set up an aggregator.
    What I'm trying to achieve is an aggregator that combines messages into a configurable block (say 10 messages) but, if there are fewer than 10, sends them on after a configurable time.
    Code:
    <integration:aggregator id="completelyDefinedAggregator"
    		input-channel="aggregatorChannel" 
    		output-channel="JMSChannel" 
    		ref="aggregatorBean"
    		method="add" 
    		release-strategy="aggregatorBean"
    		release-strategy-method="canRelease" 
    		send-partial-result-on-expiry="true"
    		message-store="messageStore" />
    
    	<beans:bean id="aggregatorBean" class="Aggregator">
    		<beans:property name="numberOfRecords" value="${aggregator.numberOfRecords}" />
    	</beans:bean>
    
    	<beans:bean id="messageStore"
    		class="org.springframework.integration.store.SimpleMessageStore" />
    
    	<beans:bean id="reaper"
    		class="org.springframework.integration.store.MessageGroupStoreReaper">
    		<beans:property name="messageGroupStore" ref="messageStore" />
    		<beans:property name="timeout" value="30000" />
    	</beans:bean>
    
    	<task:scheduled-tasks scheduler="scheduler" >
    		<task:scheduled ref="reaper" method="run" fixed-rate="10000"/>
    	</task:scheduled-tasks>
    
    	<task:scheduler id="scheduler" pool-size="1"/>
    I have tried to create an aggregator that allows partial results to be sent, using a MessageGroupStoreReaper on a simple message store.

    This is the method I use to test whether the messages can be released. It works fine, but the remaining messages are never flushed.
    Code:
    	public boolean canRelease(List<String> messages) {
    		// Release once the group holds at least the configured number of records.
    		return messages.size() >= this.getNumberOfRecords();
    	}
    Even stranger, when I run this in debug in Eclipse it seems to work, i.e. the message store is expired as long as I take my time in the add method.

    Code:
    	public String add(List<String> records) {
    		if (records.isEmpty()) {
    			return null;
    		}
    		// Concatenate the records into a single payload.
    		StringBuilder sb = new StringBuilder();
    		for (String record : records) {
    			sb.append(record);
    		}
    		String payload = sb.toString();
    		System.out.println(payload);
    		return payload;
    	}
    Is this a bug, or am I mad?

  • #2
    Anyone got any ideas?

    I did see a thread that suggested that no callbacks were registered with the message store at the time, and that this had been fixed.

    I have used MessageGroupStoreReaper to try to force an expiry, but still no joy. Is this functionality broken?



    • #3
      Hmm, this actually might be a bug.
      Could you please explain the steps to reproduce it, or maybe attach a test? That would be very helpful.



      • #4
        Right, my general flow is:

        file inbound-channel-adapter --> file-to-bytes-transformer --> splitter --> transformer --> aggregator --> jms outbound-channel-adapter

        The idea is to take a CSV file and split every line into a message, then apply a transformer to pad each one to a fixed format. The aggregator is used to combine the messages so that the MQ message length is not exceeded. As each message is fixed length, it's possible to say that the release strategy is N messages.

        This all works fine for multiples of N, but when fewer than N messages are left (i.e. the remaining lines of the file) they are stuck in the message store. As above, I have tried to expire the messages by creating a SimpleMessageStore, which is assigned to the aggregator, and a MessageGroupStoreReaper, which is scheduled to run at a set interval and call the run method. That should expire all the messages, but it doesn't.
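To illustrate the behaviour described above, here is a minimal plain-Java sketch. It is not Spring Integration code: the class `BatchingSketch` and its methods are hypothetical names made up for this example. It groups records into blocks of N and shows that a count-only release leaves the remainder pending until something explicitly expires the group, which is the job the reaper is expected to do.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an aggregator with a count-based release rule.
final class BatchingSketch {
    private final int blockSize;
    private final List<String> pending = new ArrayList<>();
    private final List<String> released = new ArrayList<>();

    BatchingSketch(int blockSize) {
        this.blockSize = blockSize;
    }

    // Called once per incoming record; releases only when the block is full.
    void accept(String record) {
        pending.add(record);
        if (pending.size() >= blockSize) {
            release();
        }
    }

    // Simulates a forced expiry that sends whatever partial block is left.
    void expire() {
        if (!pending.isEmpty()) {
            release();
        }
    }

    private void release() {
        released.add(String.join("", pending));
        pending.clear();
    }

    List<String> released() {
        return released;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

With a block size of 3 and five records, one full block is released and two records stay pending until `expire()` is called.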

        What I'm trying to achieve is the behaviour of TimeoutCountSequenceSizeReleaseStrategy, because when I use that bean directly it is unable to take the MessageGroup as a collection.
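The count-or-timeout decision being asked for can be sketched in plain Java as follows. This is only an illustration of the idea, under the assumption that the age of a group is measured from its oldest message; `CountOrTimeoutSketch` is a hypothetical class and does not reproduce the actual implementation of TimeoutCountSequenceSizeReleaseStrategy.

```java
// Hypothetical release rule: release when the group is full,
// or when its oldest message has waited at least timeoutMs.
final class CountOrTimeoutSketch {
    private final int threshold;
    private final long timeoutMs;

    CountOrTimeoutSketch(int threshold, long timeoutMs) {
        this.threshold = threshold;
        this.timeoutMs = timeoutMs;
    }

    boolean canRelease(int groupSize, long oldestTimestampMs, long nowMs) {
        if (groupSize <= 0) {
            return false; // nothing to send
        }
        boolean full = groupSize >= threshold;
        boolean timedOut = nowMs - oldestTimestampMs >= timeoutMs;
        return full || timedOut;
    }
}
```

A rule like this only answers the question when something asks it. If it is evaluated only when a new message arrives, a partial group will still wait after the last line of the file, which is why a scheduled expiry is needed as well.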

        Hope this helps. Let me know if I can say any more.



        • #5
          Actually, I was able to reproduce it, and it looks like we have a bug. Give me another day and I'll send you more details.



          • #6
            Could you please raise an issue in JIRA (https://jira.springsource.org/browse/INT)? The fact that you are not getting a partial release is a bug. The good news is that I can address it rather quickly (small code change), but you are going to have to rely on the snapshots for a while (about a month).



            • #7
              Has this been raised as a bug? I couldn't see any entry for it in the JIRA timeline.



              • #8
                Yes, it has: https://jira.springsource.org/browse/INT-2013



                • #9
                  Originally posted by oleg.zhurakousky
                  Oleg, in the JIRA issue, is "Reaper will run every 5 sec" a cut-and-paste error? Or am I missing something?



                  • #10
                    Good catch. I just changed it.



                    • #11
                      Sorry Oleg, confused again:

                      Originally posted by oleg.zhurakousky
                      For example
                      <task:scheduled ref="reaper" method="run" fixed-rate="5000"/> - Reaper will run every 5 sec
                      <property name="timeout" value="3000" /> - timeout property of the Reaper is set to 3 sec

                      The 3 seconds becomes meaningless since partial release can happen no earlier than 1 min.
                      I thought the 'fixed-rate' was supposed to be the 1 min (60000ms) referred to in the text.
                      But I might have misunderstood.



                      • #12
                        Well, I have to change it again. Actually, all I needed to change the first time was "Reaper will run every 5 sec" to "Reaper will run every 1 min".
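The timing point discussed above can be checked with a little arithmetic. The sketch below assumes that the reaper runs at multiples of its fixed rate (starting from time 0) and that a run expires any group whose age is at least the timeout. Both are simplifying assumptions, and `ReaperTimingSketch` is a hypothetical helper, not part of Spring Integration.

```java
// Hypothetical helper: when is the first reaper run that can expire a group?
final class ReaperTimingSketch {

    // All values are in milliseconds and assumed to be non-negative;
    // fixedRateMs must be greater than zero.
    static long firstExpiryRun(long createdAtMs, long fixedRateMs, long timeoutMs) {
        // The group becomes eligible once it is at least timeoutMs old.
        long eligibleAt = createdAtMs + timeoutMs;
        // Round up to the next scheduled run (integer ceiling division).
        long runs = (eligibleAt + fixedRateMs - 1) / fixedRateMs;
        return runs * fixedRateMs;
    }

    public static void main(String[] args) {
        // Reaper every 1 min, timeout 3 sec, group created at time 0.
        System.out.println(firstExpiryRun(0L, 60_000L, 3_000L));
        // Reaper every 5 sec, timeout 3 sec, group created at time 0.
        System.out.println(firstExpiryRun(0L, 5_000L, 3_000L));
    }
}
```

Under these assumptions, a 3 second timeout with a reaper that runs once a minute cannot release anything before the 60 second mark, which matches the "no earlier than 1 min" wording in the quoted text.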



                        • #13
                          Originally posted by oleg.zhurakousky
                          Well, I have to change it again. Actually, all I needed to change the first time was "Reaper will run every 5 sec" to "Reaper will run every 1 min".
                          That was my reading of it too.
