Announcement Announcement Module
Collapse
No announcement yet.
Aggregator allows messages sent to complete groups to form new groups Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Aggregator allows messages sent to complete groups to form new groups

    The documentation states that "The default behaviour is to send messages with the same correlation as a completed group to the discard-channel.". I have configured an aggregator using a timeout strategy.

    Code:
         <aggregator input-channel="aggregationChannel"
              output-channel="outputChannel" 
              message-store="volatileMessageStore"
              send-partial-result-on-expiry="true" 
              expire-groups-upon-completion="false" 
              discard-channel="nullChannel" 
              group-timeout="2000"
              release-strategy-expression="false" 
              correlation-strategy-expression="headers['BATCH_NAME']" />
    I then created a unit test that sends messages to an aggregator, allows the group to complete and then send another message with the same correlation as the first. I would have expected to not receive a message the second time but I do.

    Code:
             // Send a message to a splitter that will make copies of the message and forward them to the aggregator
              inputChannel.send(MessageBuilder.withPayload("test").setHeader("BATCH_NAME", "001").build());
              
              // We should get one message on the output channel within 2 sec of the last message being added to the group
              Message<?> outMessage = outputChannel.receive(5000);
              assertNotNull(outMessage);
              
              // The MessageGroup for this batch has now been closed and aggregation triggered
              inputChannel.send(MessageBuilder.withPayload(pr).setHeader("BATCH_NAME", "001").build());
              
              // I shouldn't receive anything this time as the additional invoices for the batch would have been sent to the discard channel
              outMessage = outputChannel.receive(5000);
              
              assertNull(outMessage);
    Is there any way I can raise an exception when messages arrive for a messagegroup that has already been aggregated? When using a timeout to complete a messagegroup this is possible if the timeouts are not configured correctly.

  • #2
    Your use case is one that is not currently supported out-of-the-box.

    If a group completes normally and expire-groups-on-completion is false, the group metadata (group is complete state) is retained so that late messages can be detected and discarded.

    If you are using a MessageGroupStoreReaper, you can set minimumTimeoutForEmptyGroups which means these empty groups can be cleaned up on a longer schedule than partial groups, so we keep the group around for a longer time for discarding late messages, but remove them eventually, rather than causing a memory leak by keeping them forever.

    Expired groups are completely different to completed groups. Expired groups (either by timeout, or expire-groups-on-completion="true") are released (or discarded, depending on send-partial-groups-on-expiry) after the timeout, and the group is completely removed from the database.

    It sounds like you want a partially released group (due to expiry/group timeout) to be marked as 'complete' instead of expired.

    That way, late arriving messages for an "expired" group would be sent to the discard channel (where you can throw an exception or whatever you want to do).

    This is a reasonable request and I would encourage you to submit an Improvement JIRA issue (https://jira.spring.io/browse/INT).

    The good news is you can easily implement your own subclass to do what you want.

    In 4.0, we made most of the aggregator's methods protected to allow them to be overridden.

    You will notice that in forceComplete() we set a boolean 'remove' to 'true' by default.

    https://github.com/spring-projects/s...dler.java#L460

    It is set to false later if we detect that the group has changed since it was selected, or if it's an empty group that should not be expired yet.

    So, subclass AggregatingMessageHandler; override it's superclass (AbstractCorrelatingMessagehandler) forceComplete() method.

    Copy the code and change the default value of 'remove' to false. It will only be set to true later if a group is empty and minimumTimeoutForEmptyGroups has been exceeded - set it to a large enough value to keep the empty group around (not needed if you are not running a message group store reaper, but remember these empty groups will stick around forever otherwise).

    So, that will stop the group from being removed, but we still need to mark the group complete and remove the messages that were released (turn it into an empty group).

    To do that, call afterRelease() after the group is released...

    https://github.com/spring-projects/s...dler.java#L497
    Code:
                            if (releaseStrategy.canRelease(groupNow)) {
                                this.completeGroup(correlationKey, groupNow);
                            }
                            else {
                                this.expireGroup(correlationKey, groupNow);
                            }
                            afterRelease(groupNow, null);
    The second argument can be null; it's not used in the aggregator's afterRelease() method.


    To invoke your custom aggregator, define it as a <bean/> and then reference it from a
    Code:
    <service-activator ref="myAgg" input-channel="foo"/>
    Note: no output channel - that must be supplied to your bean in its outputChannel property.


    Hope that helps.

    Comment


    • #3
      Thanks Gary. I hadn't understood the difference between group expiry and group completion.

      My use case is that I have a splitter that splits a huge file as a stream. Because of this it does not know the number of messages that will be split ahead of time so the 'size' header is not set. That is why I want the aggregator to use the timeout to complete the group and why it is important that a group only completes once.

      I will raise an improvement issue. I might also include one for a streaming splitter as I implemented my own to avoid loading a big message into memory.

      Comment


      • #4
        I discovered that there is already an improvement issue for a StreamingMessageSplitter which would be more elegant than my solution.

        https://jira.spring.io/browse/INT-651

        The corresponding Aggregator improvment issue is:

        https://jira.spring.io/browse/INT-3420

        Comment


        • #5
          Unfortunately the forceComplete() method in AbstractCorrelatingMessagehandler is private and so cannot be overridden in a subclass. I can't see any way to introduce the behaviour you described without copying and modifying both AggregatingMessageHandler and AbstractCorrelatingMessagehandler in their entirity.

          Comment

          Working...
          X