Aggregating groups of varying length

  • Aggregating groups of varying length

    I have a series of objects that I want to send into Spring Integration (SI). They are ordered by the ID of each object, so I can create a correlation id for each related set of them. They arrive in order, one after the other, without delay. However, I don't know how many are in each group; it varies. I was hoping to have the aggregator sort that out for me.

    Some of the sets of objects have 2 in a group and some have 3 or another number so sequence size doesn't really help me.

    I tried setting up an aggregator, giving the messages a high sequence size, and using a reaper to grab groups that have been sitting there for a few seconds; those would be complete.

    However, try as I might, I don't get the right number of items in each group.

    Code:
    <!-- Aggregate by correlation id; groups that expire are sent on as partial results -->
    <aggregator input-channel="inboundDependentObjects"
                expire-groups-upon-completion="true"
                send-partial-result-on-expiry="true"
                message-store="inboundDependentObjectsMessageStore"
                output-channel="aggregatedDependentObjects"/>

    <!-- Reaper that expires groups in the message store, using a 4000 ms timeout -->
    <beans:bean id="inboundDependentObjectsMessageStoreReaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
        <beans:property name="messageGroupStore" ref="inboundDependentObjectsMessageStore"/>
        <beans:property name="timeout" value="4000"/>
    </beans:bean>

    <!-- Run the reaper every 100 ms -->
    <task:scheduled-tasks scheduler="scheduler">
        <task:scheduled ref="inboundDependentObjectsMessageStoreReaper" method="run" fixed-rate="100"/>
    </task:scheduled-tasks>
    Most often I expect groups of 3 but it could vary. With the setup I have now I get one or two per group. I haven't been able to sort it out.

    Perhaps there is a better way that I haven't thought of?

    Thanks,
    Matt

  • #2
    It's not clear what your question is; you can use a custom ReleaseStrategy to release on whatever algorithm you want, e.g. group size >= 2 and time since last arrival > 1000ms.

    Of course, this might release at 2 when #3 might arrive 1ms later.

    It is important to understand that the release strategy is triggered by message arrival (aggregators are passive). That's where the reaper comes in.

    If you can explain exactly what you want, we can try to help. But, reading the above, I hope it's clear you can't have an algorithm "group size >= 2 and last arrival <= 1000ms ago and next arrival won't be in < 10ms".

    However, if the message producer knows the sizes of the groups then simply have it set the sequenceSize/sequenceNumber headers appropriately and you can then use the default strategies.
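
The rule suggested above (group size >= 2 and more than 1000 ms since the last arrival) can be sketched in plain Java. This is only a model of the decision logic, not Spring Integration's `ReleaseStrategy` interface: `ArrivalGroup` and `SizeAndIdleReleaseRule` are hypothetical names, and the clock value is passed in explicitly so the behaviour is easy to follow.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of a message group; not a Spring Integration type. */
final class ArrivalGroup {
    private final List<String> payloads = new ArrayList<>();
    private long lastArrivalMillis;

    /** Records a payload and remembers when it arrived. */
    void add(String payload, long nowMillis) {
        payloads.add(payload);
        lastArrivalMillis = nowMillis;
    }

    int size() {
        return payloads.size();
    }

    long lastArrivalMillis() {
        return lastArrivalMillis;
    }
}

/** Hypothetical rule: release once the group is big enough and has been idle long enough. */
final class SizeAndIdleReleaseRule {
    private final int minSize;
    private final long minIdleMillis;

    SizeAndIdleReleaseRule(int minSize, long minIdleMillis) {
        this.minSize = minSize;
        this.minIdleMillis = minIdleMillis;
    }

    /** True when size >= minSize and strictly more than minIdleMillis have passed since the last arrival. */
    boolean canRelease(ArrivalGroup group, long nowMillis) {
        long idleMillis = nowMillis - group.lastArrivalMillis();
        return group.size() >= minSize && idleMillis > minIdleMillis;
    }
}
```

With `new SizeAndIdleReleaseRule(2, 1000)`, a group holding A and B whose last arrival was at t=10 becomes releasable at t=1011, even if a third message would have turned up at t=1012, which is the caveat mentioned above. In a real aggregator the check only runs when a message arrives, so something else (such as the reaper) has to deal with groups that simply go idle.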



    • #3
      Hi Gary,

      You've given me a few good options to think about so thank you. I'll take another kick at the can however and see what you think.

      To clarify, the data arrives from the message producer in correlation id order and the messages should arrive in quick succession; there should be little or no latency. So, it looks like the following:

      Code:
      Message    CorrelationId
       A                 1
       B                 1
       C                 1
       D                 2
       E                 2
       F                 3
       G                 4
      So, I want to end up with the groups [A,B,C], [D,E], [F] and [G]. The message producer doesn't know the group size unless I write code for it to figure that out, which I was trying to avoid.

      I realize I could have the message producer group these items and send them into SI, however, I was hoping to have the aggregator take care of it.
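
For comparison, the producer-side alternative is small when the items really do arrive in correlation id order: a single pass that starts a new group whenever the id changes. The following is a minimal sketch under that assumption; `ConsecutiveGrouper` and `Item` are hypothetical names, not part of Spring Integration.

```java
import java.util.ArrayList;
import java.util.List;

final class ConsecutiveGrouper {

    /** Hypothetical stand-in for a message: a payload plus its correlation id. */
    static final class Item {
        final String payload;
        final int correlationId;

        Item(String payload, int correlationId) {
            this.payload = payload;
            this.correlationId = correlationId;
        }
    }

    /** Groups consecutive items that share a correlation id; assumes the input is ordered by id. */
    static List<List<String>> group(List<Item> items) {
        List<List<String>> groups = new ArrayList<>();
        List<String> current = null;
        int currentId = 0;
        for (Item item : items) {
            // Start a new group on the first item and whenever the correlation id changes.
            if (current == null || item.correlationId != currentId) {
                current = new ArrayList<>();
                groups.add(current);
                currentId = item.correlationId;
            }
            current.add(item.payload);
        }
        return groups;
    }
}
```

Fed the seven messages from the table above, `group` yields [A, B, C], [D, E], [F] and [G]. The size of each inner list is also the value a producer could place in the sequenceSize header if the aggregator is still wanted downstream.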

      I had thought that if I set a high sequence size (such as 500) that the reaper would release the existing groups that are sitting in the message store after the timeout.

      If it turns out that it would be simpler just to get the producer to do the aggregation or provide the sequence size, then I'm open to that too, but I think it would be better from my end if I can get the aggregator to do it. It would be a more general solution that I want to apply in a number of different cases.

      Open to other suggestions also. I hope I've made things clearer.

      Thanks,
      Matt



      • #4
        In that case, using a large sequenceSize, the reaper will work just fine, as long as you can live with a minimum release time of 4 seconds.

        When A,B,C show up (but no more with the same correlationId), you will see the group of 3 released 4 seconds after C arrives.

        Similarly, D, E will be released 4 seconds after E arrives.

        Running with DEBUG logging should make everything clear.
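
The timing described in this reply can be illustrated with a small plain-Java simulation. It is not how `MessageGroupStoreReaper` is implemented, and the real reaper's timing may depend on version and configuration; `IdleTimeoutStore` and its methods are hypothetical names, and it simply assumes that a group is released once the timeout has passed since its last arrival, as the reply describes.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory store that releases groups after a period with no new arrivals. */
final class IdleTimeoutStore {

    private static final class Group {
        final List<String> payloads = new ArrayList<>();
        long lastArrivalMillis;
    }

    private final Map<Integer, Group> groups = new LinkedHashMap<>();
    private final long timeoutMillis;

    IdleTimeoutStore(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Adds a payload to the group for its correlation id and records the arrival time. */
    void add(int correlationId, String payload, long nowMillis) {
        Group group = groups.get(correlationId);
        if (group == null) {
            group = new Group();
            groups.put(correlationId, group);
        }
        group.payloads.add(payload);
        group.lastArrivalMillis = nowMillis;
    }

    /** One reaper pass: removes and returns every group idle for at least the timeout. */
    List<List<String>> reap(long nowMillis) {
        List<List<String>> released = new ArrayList<>();
        Iterator<Group> it = groups.values().iterator();
        while (it.hasNext()) {
            Group group = it.next();
            if (nowMillis - group.lastArrivalMillis >= timeoutMillis) {
                released.add(group.payloads);
                it.remove();
            }
        }
        return released;
    }
}
```

With a 4000 ms timeout, A, B and C arriving at t=0, 1 and 2 come out together on the first reaper pass at or after t=4002, and D and E (arriving at t=3 and 4) follow at t=4004, regardless of how many messages each group contains.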
