
  • Aggregator Cleanup

    I am currently using an aggregator to aggregate my messages in batches of 10. The total number of messages is never a multiple of 10, so not all of them get processed. For instance, if I have 35 messages, only 30 are sent along and 5 remain unprocessed.

    How do I force the aggregator to send the remaining 5 messages when I know it is complete? Can I send a message to the aggregator to initiate this?


  • #2
    I can think of two options off the top of my head.

    1) you can set the flag to send incomplete aggregates on timeout

    2) you can send a Message that somehow signifies completion of a group to the Aggregator

    Option 1 is obviously the simplest, but you might not be able to find a timeout value that is long enough for normal groups to complete yet short enough to avoid an excessive delay for incomplete groups.

    Option 2 would require that the correlationStrategy knows to correlate that "End of Sequence" message to the proper group, and then your aggregation strategy would need to exclude that special message when combining the result.
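As a rough illustration of Option 2, here is a minimal sketch in plain Java. It does not use the Spring Integration API: `Msg`, `EndMarkerAggregation`, and all method names are hypothetical stand-ins for a message, a correlation strategy, a completion check, and an aggregation step. It assumes the "End of Sequence" marker carries the same group id as the data messages.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a framework message (not the Spring Integration Message type).
final class Msg {
    final String groupId;        // value a correlation strategy would return
    final String payload;        // ignored for the end-of-sequence marker
    final boolean endOfSequence; // true only for the marker message

    Msg(String groupId, String payload, boolean endOfSequence) {
        this.groupId = groupId;
        this.payload = payload;
        this.endOfSequence = endOfSequence;
    }
}

final class EndMarkerAggregation {
    // Correlation: the marker is routed to the same group as the data messages.
    static String correlationKey(Msg m) {
        return m.groupId;
    }

    // Completion: the group is complete as soon as it contains the marker.
    static boolean isComplete(List<Msg> group) {
        for (Msg m : group) {
            if (m.endOfSequence) {
                return true;
            }
        }
        return false;
    }

    // Aggregation: combine the payloads, leaving the marker out of the result.
    static List<String> aggregate(List<Msg> group) {
        List<String> result = new ArrayList<>();
        for (Msg m : group) {
            if (!m.endOfSequence) {
                result.add(m.payload);
            }
        }
        return result;
    }
}
```

In a real aggregator these three pieces would be plugged into whatever correlation, completion, and aggregation hooks the framework version provides.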

    Does that sound like it would address the problem? I'm interested in your feedback, since something along the lines of Option 2 will almost definitely be included in 2.0.

    I guess a third option would be to somehow set the sequence size of that incomplete group. I would need to know a bit more about how you are creating and sending these messages to the Aggregator.



    • #3
      Thanks for your quick response. The messages I am processing are a batch of XML messages. The first message in the batch is a START message and the last message is a STOP message. Both the START and STOP messages contain an indicator of the source of the message, so I know when a source STARTS and STOPS. I am grouping the messages in between the START and the STOP by creating a correlation id from the source id, a secondary id, and a batch counter, so I can send a batch of (source id, secondary id) objects repeatedly.

      As you say, Option 1 would be simplest, but I like Option 2 better because I already have the STOP message in place and there's no need to wait.

      However, it's not clear to me what the 'proper group' is. My assumption is that it could be building multiple groups of messages to send and when the STOP is sent it would need to send all the groups?

      How do I tell the aggregator to send the groups it has accumulated?

      Will it send them all at once or one group at a time?

      Thanks for your help,


      • #4

        If I understand things right, you have the following setting:

        - multiple groups of messages of variable sizes, grouped by their source
        - you want to send them in batches of 10, but since the total number is not a multiple of 10, the last group might be smaller.

        Here's what you need to do:

        1) Either set the correlation id of the message to the source id, or use a CorrelationStrategy that returns the id of the source (so that all messages from the same source are grouped together).

        2) Create a custom completion strategy that verifies that the group is either of size 10 or contains the STOP message.

        3) Disable the tracking of correlation ids.

        What is going to happen: messages accumulate in the aggregator until the condition from point 2 is fulfilled; at that point aggregation takes place and the first batch is sent. Then the whole process starts again. Disabling the tracking of correlation ids is necessary because otherwise, after aggregation, subsequent messages with the same correlation id will be rejected.
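The three steps above can be sketched in plain Java as follows. This is only an illustration of the behaviour, not the Spring Integration API: `BatchingAggregator`, `accept`, and the `"STOP"` payload convention are hypothetical. Removing the group from the map after each release plays the role of "not tracking" completed correlation ids, so later messages with the same id start a fresh group.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory aggregator; names are illustrative only.
final class BatchingAggregator {
    static final String STOP = "STOP"; // assumed marker payload

    private final int batchSize;
    // One open group per source id (step 1: correlate by source).
    private final Map<String, List<String>> groups = new HashMap<>();

    BatchingAggregator(int batchSize) {
        this.batchSize = batchSize;
    }

    // Returns the released batch, or null while the group is still incomplete.
    // A STOP for a source with no pending messages releases an empty batch.
    List<String> accept(String sourceId, String payload) {
        List<String> group = groups.computeIfAbsent(sourceId, k -> new ArrayList<>());
        boolean stop = STOP.equals(payload);
        if (!stop) {
            group.add(payload);
        }
        // Step 2: complete when the batch is full or the STOP message arrived.
        if (stop || group.size() >= batchSize) {
            // Step 3: forget the group so the same id can be reused afterwards.
            groups.remove(sourceId);
            return group;
        }
        return null;
    }
}
```

With a batch size of 10 and 35 messages from one source, this releases three batches of 10 as the messages arrive and a final batch of 5 when STOP is received.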

        Hope this helps,


        • #5
          Thank you Marius for your response. It is helpful.

          I am grouping the messages by a compound key (source, track).

          When each (source, track) key reaches the batch number, say 10, the (source, track) batch of messages is sent.

          When the STOP message arrives, it is possible that I will have multiple groups of messages (MessageBarriers) to send.

          At this point I will send all the MessageBarriers that aren't empty.

          Thanks again for your time,


          • #6

            So there's a single STOP message that applies to all the groups?

            I think that in this case you may need to customize the current Aggregator implementation (e.g. by creating a subclass) to execute a forced "flush" of all the currently held groups, once the STOP message arrives.
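To make the "flush" idea concrete, here is a minimal sketch in plain Java. It is not a subclass of any Spring Integration class and does not use the framework's barrier handling: `FlushingAggregator`, `add`, and `flush` are hypothetical names, and the compound key is assumed to be built from source and track ids that do not contain the `|` separator.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical aggregator that batches per (source, track) and can flush on STOP.
final class FlushingAggregator {
    private final int batchSize;
    // Open groups keyed by "source|track", kept in insertion order.
    private final Map<String, List<String>> groups = new LinkedHashMap<>();

    FlushingAggregator(int batchSize) {
        this.batchSize = batchSize;
    }

    // Adds a payload to its group; returns a full batch, or null if not yet full.
    List<String> add(String source, String track, String payload) {
        String key = source + "|" + track;
        List<String> group = groups.computeIfAbsent(key, k -> new ArrayList<>());
        group.add(payload);
        if (group.size() < batchSize) {
            return null;
        }
        groups.remove(key); // released groups are forgotten so the key can be reused
        return group;
    }

    // Called when the STOP message for a source arrives: releases every group
    // still held for that source. Groups only exist once a message has been
    // added, so none of the released groups is empty.
    List<List<String>> flush(String source) {
        List<List<String>> released = new ArrayList<>();
        Iterator<Map.Entry<String, List<String>>> it = groups.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, List<String>> entry = it.next();
            if (entry.getKey().startsWith(source + "|")) {
                released.add(entry.getValue());
                it.remove();
            }
        }
        return released;
    }
}
```

Each released group would then be aggregated and sent as its own message, one group at a time.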



            • #7
              Thanks. That's what I was thinking.


              • #8
                It's not clear to me how to flush the groups. For some reason the barriers in AbstractMessageBarrierHandler are always empty, although there are messages that haven't been sent.

                Also, if I customize the current Aggregator implementation (presumably by extending AbstractMessageAggregator), how do I configure it in XML? Do I configure it as a <bean> and configure the <aggregator> using the <bean> definition, or should I be configuring it as an <aggregator> directly (somehow)?
                Last edited by cmdunn01886; Jul 21st, 2009, 12:19 PM.