Announcement Announcement Module
Collapse
No announcement yet.
Grouping with Aggregator only releases first group Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Grouping with Aggregator only releases first group

    I've been trying to reproduce the behaviour shown at this github page:
    https://github.com/olegz/s12gx.2011/...ion/aggregator

    However, I see only one group released when debugging and in the log.

    Code:
    <channel id="inboundBatchingChannel" />
    
        <aggregator input-channel="inboundBatchingChannel"
                    output-channel="batchChannel"
                    expire-groups-upon-completion="false"
                    send-partial-result-on-expiry="true"
                    message-store="batchMessageStore"
                    release-strategy-expression="size() == 10"
                />
    
        <channel id="batchChannel">
            <interceptors>
                <wire-tap channel="aggregatedBatchLogger"/>
            </interceptors>
        </channel>
    
        <logging-channel-adapter id="aggregatedBatchLogger"
                                 log-full-message="true"
                                 level="DEBUG" logger-name="AGGREGATED-BATCHES-LOGGER"  />
        
        <beans:bean id="reaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
            <beans:property name="messageGroupStore" ref="messageStore"/>
            <beans:property name="timeout" value="5000"/>
        </beans:bean>
    
        <task:scheduled-tasks scheduler="scheduler">
            <task:scheduled ref="reaper" method="run" fixed-rate="100"/>
        </task:scheduled-tasks>
    
        <task:scheduler id="scheduler"/>
    I have 25 items being written to the inbound channel. I'm hoping for 3 groups to be created by the aggregator, but I only see one.

  • #2
    Hello!
    I'm hoping for 3 groups to be created
    Messages are grouped by CorrelationStrategy. In your case it is a default one:
    Code:
    new HeaderAttributeCorrelationStrategy(MessageHeaders.CORRELATION_ID)
    From other side you tell to the aggregator:
    expire-groups-upon-completion="false"
    send-partial-result-on-expiry="true"
    release-strategy-expression="size() == 10"
    So, it creates a MessageGroup based on CORRELATION_ID. And if all your 25 inbound Messages have the same CORRELATION_ID, the aggregator creates the group on first Message and it will by only one group in the MessageStore for these 25 Messages. Further it releases Messages on expiry by reaper as a list of 10 and only once. And, of course, never removes this group, because expire-groups-upon-completion="false". In this case the MessageGroup will be marked as "completed" and won't allow any new Message. It is a scenario "Late arrival" .
    There is a discardChannel on the AbstractCorrelatingMessageHandler. It allows to you to handle "Late arrival" Messages.
    It's about theory.

    And now a question: what do you want to achieve?

    Take care,
    Artem

    Comment


    • #3
      I want to process the objects 10 at a time, and then process whatever is left over at the end. In this case there would be 2 groups of 10 and one group of 5. (25 messages overall)

      The example I cited claims that this can be done and uses the same correlation id throughout. However, I found it doesn't work.

      So, I created a separate correlationId per group and now it works well. (ie. For each set of 10 increment the correlationId ) - This way the aggregator releases each set of 10 and the reaper releases the group of 5 at the end.

      If there is another way I'd be interested to hear about it.

      Thanks,
      Matt

      Comment


      • #4
        expire-groups-upon-completion="true" should fix your wishes with the same correlationId

        Comment


        • #5
          Yes, it works now. This is very good. Thanks so much for the help.

          Comment

          Working...
          X