Aggregator aggregates same group many times

  • Aggregator aggregates same group many times

    Hi,

    I'm trying to use the SI aggregator. Here is my configuration:

    <aggregator id="aggId" input-channel="aggregatorChannel" output-channel="aggregatedChannel" ref="myAggregator" method="aggregate"
    completion-strategy="myStrategy" completion-strategy-method="checkCompleteness" timeout="20000" send-partial-result-on-timeout="true" send-timeout="1000000" />

    In the checkCompleteness method, I return true if the size of the list is greater than 20 (i.e., the aggregation is done when 20 messages are received or after a timeout of 20 seconds).

    The aggregate method simply puts all the messages to be aggregated in one message and returns it.
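
    For readers following along, here is a minimal sketch of what the two methods described above might look like. It is plain Java with no Spring types; the class names `CompletionCheck` and `SimpleAggregator`, the `String` payloads, and the `THRESHOLD` constant are assumptions for illustration, not the poster's actual classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the "myStrategy" bean; not the poster's class.
class CompletionCheck {
    // Assumed from the post: complete when the list size is greater than 20.
    static final int THRESHOLD = 20;

    // Returns true once the group holds more than THRESHOLD messages.
    boolean checkCompleteness(List<String> messages) {
        return messages.size() > THRESHOLD;
    }
}

// Hypothetical stand-in for the "myAggregator" bean; not the poster's class.
class SimpleAggregator {
    // Copies every payload of the group into one list, standing in for
    // "puts all the messages to be aggregated in one message".
    List<String> aggregate(List<String> messages) {
        return new ArrayList<>(messages);
    }
}
```

    Note that with a strict "greater than" comparison the check first returns true on the 21st message, not the 20th.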

    I ran a test that sends 100 messages to the input channel, and I noticed that the aggregator aggregates the same group of messages many times. The logs show that each of these aggregations is done by a different thread.

    The logs also show that, after a group of messages has been aggregated for the first time, no further message with the same correlation id as this group is received.

    I believe I did something wrong in my config.

    Thanks in advance for your help!

  • #2
    I don't see anything wrong with that particular configuration, but let's start by saying that something is wrong... somewhere.

    Now, could you please post:

    - the code that sends the messages
    - the completion strategy
    - the aggregator

    Thanks,
    Marius



    • #3
      Thank you for your reply.

      I have attached the Java classes and the XML config.

      Just to clarify what I'm trying to do with the Aggregator:

      Each message I receive has a type. I want to do a first processing step on each message individually, then aggregate all messages with the same type and do a second processing step.

      So I pass the message through a transformer that adds the correlation id once the first processing step is done. At first I used the type of the message as the correlation id, but I noticed that once a first aggregation is done for a certain type, the aggregator ignores all messages of this type that arrive after this aggregation (whereas what I want is to aggregate these newly arriving messages again and forget about the already aggregated ones). So I understood that I have to change the correlation id of messages once that correlation id has been used for an aggregation. I then added an AggregationIdProvider that provides me with ids that have never been used for a certain type.

      To avoid losing a message to which the transformer assigned an already used id, I used the discard-channel attribute to route discarded messages back to the input channel of the transformer that assigns correlation ids, so that they get a new correlation id (I might add a check to keep a message from looping indefinitely between the transformer and the aggregator)!
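
      The id scheme described above could be sketched roughly as follows. This is not the attached AggregationIdProvider, whose code is not shown in the thread; the method names `currentId` and `markAggregated` and the `type-generation` id format are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: hands out a correlation id per message type and
// moves to a fresh id once the current one has been used for an aggregation.
class AggregationIdProvider {
    // Number of aggregations already completed for each type.
    private final Map<String, Integer> generationByType = new HashMap<>();

    // Correlation id currently open for the given type, e.g. "A-0".
    synchronized String currentId(String type) {
        return type + "-" + generationByType.getOrDefault(type, 0);
    }

    // To be called when the group for currentId(type) has been aggregated,
    // so that later messages of this type get an id never used before.
    synchronized void markAggregated(String type) {
        generationByType.merge(type, 1, Integer::sum);
    }
}
```

      Both methods are synchronized because the logs mentioned in the thread show aggregation running on several threads; whether the real provider needs this depends on how it is wired in.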


      If you run the test (which sends 100 messages and aggregates in groups of 30 or every 20 seconds), you will see logs like this:

      Aggregating [30] messages...

      and you will notice that the same group of messages is aggregated many times!

      Thanks in advance for your help!



      • #4
        OK, thanks, I'm going to look into it some time in the next couple of days.
