Announcement Announcement Module
Collapse
No announcement yet.
Aggregator Issue Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Aggregator Issue

    Hello,

    I'm getting a strange behavior with the aggregator with SI 2.1.4 as compared to SI 2.0.3. Reduced to a test case which just accumulates up to two messages before releasing:

    Code:
    public class AgTest {
    
    	public static void main(String[] args) {
    		ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("beans.xml");
    		MessageChannel ch = (MessageChannel) ctx.getBean("channel1");
    		MessagingTemplate template = new MessagingTemplate();
    		for(int z = 0 ; z < 5 ; z ++) {
    			template.send(ch, MessageBuilder.withPayload(1).build());
    			template.send(ch, MessageBuilder.withPayload(2).build());
    		}
    	}
    	
    	public Integer aggregate(List<Integer> numbers) {
    		return 666;
    	}
    	
    	public Integer correlate(Integer number) {
    		return 1;
    	}
    	
    	public boolean canRelease(List<Integer> numbers) {
    		return numbers.size() == 2;
    	}
    With the configuration:

    Code:
      	<int:channel id="channel1"/>
      	
     	<int:aggregator id="ag" 
    		input-channel="channel1" ref="agm" method="aggregate" 
    		output-channel="channel2"
    		send-partial-result-on-expiry="true"
    		correlation-strategy="agm" correlation-strategy-method="correlate"
    		release-strategy="agm" release-strategy-method="canRelease" />
    	
    	<bean id="agm" class="com.borrar.AgTest"/>
     	
     	<int:channel id="channel2"/>
      
      	<stream:stdout-channel-adapter id="stout" channel="channel2" append-newline="true"/>
    With SI 2.1.4 I just get one "666" on stdout. With SI 2.0.3 I get (as I expected) five times a "666". Is this a change in the behavior?

    regards,

    Diego

  • #2
    Hello, some idea? What could be wrong?

    BTW the documentation regarding the "release" is not clear enough (at least for me.) It starts with the "group released for aggregation", but the rest of the paragraph states that the group could also be "complete" and implies that in the first case the group will not be deleted from the message store... why two cases? for the "partial results on expiration"? so the partial results sent that way will remain in the message store?

    It also introduces a "marked/unmarked messages" concept that is superposed to the "group completed (and ready to be released)" vs "group uncomplete (and not released)":

    From section 5.4.3 for 2.1.4.RELEASE
    "When the group is released for aggregation, all its unmarked messages are processed and then marked so they will not be processed again. If the group is also complete (i.e. if all messages from a sequence have arrived or if there is no sequence defined), then the group is removed from the message store. Partial sequences can be released, in which case the next time the ReleaseStrategy is called it will be presented with a group containing marked messages (already processed) and unmarked messages (potentially a new partial sequence)."

    Comment


    • #3
      This is a change in behavior between 2.0 and 2.1. It is discussed in the migration guide.

      https://github.com/SpringSource/spri...igration-Guide

      By default, once a group (messages with the same correlation) has been marked complete, all subsequent messages for that group are sent to the discard channel (if defined).

      To revert to the previous behavior, you have to set expire-groups-upon-completion="true" on the aggregator...

      Code:
      <int:aggregator id="ag" 
      	input-channel="channel1" ref="agm" method="aggregate" 
      	output-channel="channel2"
      	send-partial-result-on-expiry="true"
      	correlation-strategy="agm" correlation-strategy-method="correlate"
      	release-strategy="agm" release-strategy-method="canRelease" 
      	expire-groups-upon-completion="true" />
      This removes the group and allows a new group to be created the next time a message with the expired correlation arrives.

      The notion of marked/unmarked messages no longer exists but it appears the documentation was not updated. Please open a 'Documentation' JIRA issue for that; thanks. https://jira.springsource.org/browse/INT

      Comment


      • #4
        Thank you Gary.

        https://jira.springsource.org/browse/INT-2832

        One more question: with the new behavior, in order to discard such subsequent messages, the aggregator will have to store every past correlation key? this could exhaust the JVM memory or I'm not understanding at all?

        Comment


        • #5
          No, you are correct - I was expecting that configuring a MessageGroupStoreReaper would remove these "empty" groups but it does not.

          In other words, you should be able to configure a MGSR to run every, say, 30 mins and clean up these released groups - picking an expiry time long enough so that you can be confident no new messages would arrive in that time and the reaper should remove the group.

          Code:
           	<int:aggregator id="ag" 
          		input-channel="channel1" ref="agm" method="aggregate" 
          		output-channel="channel2" 
          		send-partial-result-on-expiry="true" discard-channel="discard"
          		correlation-strategy="agm" correlation-strategy-method="correlate"
          		release-strategy="agm" release-strategy-method="canRelease" 
          		expire-groups-upon-completion="false" 
          		message-store="ms" />
          
            	<bean id="ms" class="org.springframework.integration.store.SimpleMessageStore" />
            	
            	<bean id="reaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
            		<property name="messageGroupStore" ref="ms" />
            		<property name="timeout" value="10000" />
            	</bean>
            	
            	<task:scheduled-tasks>
            		<task:scheduled ref="reaper" method="run" fixed-rate="15000" />
            	</task:scheduled-tasks>
          However, I notice that the reaper only expires groups that actually contain messages.

          This is a bug. I will open another JIRA (unless you beat me to it

          Comment


          • #6
            https://jira.springsource.org/browse/INT-2833

            Comment


            • #7
              Wow , that explains another of my issues. IMHO the expire-groups-upon-completion="true" should be the default. BTW I don't like the aggregator's new role of remembering such imprecise number of "recent past messages" (maybe it could be more useful with a new "remember()" strategy method). Anyway, with the new behavior the MGSR should be highly recommended (or mandatory) to prevent memory exhaustion.

              Thanks a lot for the support. I'm using the aggregator for some time and really helped in my work.

              Comment


              • #8
                Yes; we were talking about this today and, on reflection, it would have been better to default to TRUE. But, unfortunately, 2.1 has been out for nearly a year and 2.2 is about to be released so we couldn't make such a change right now. We might consider it in 3.0.

                The main driver for the change was to resolve a number of issues, including the proper handling of late-arriving messages and a (previously) dysfunctional resequencer which used the same code base.

                Comment

                Working...
                X