Announcement Announcement Module
No announcement yet.
default behavior of Aggregator Page Title Module
Move Remove Collapse
Conversation Detail Module
  • Filter
  • Time
  • Show
Clear All
new posts

  • default behavior of Aggregator

    Hello All,

    I'm trying to configure a simple aggregator which will correlate based on the value in the header key "CORRELATION_ID" and release if no new messages have arrived w/in 3 seconds. Going through the 2.1.4.RELEASE reference, section 5.4.4. suggests to me that the following should work:

    <bean id="timeout"
    		<constructor-arg name="threshold" value="1000" />
    		<constructor-arg name="timeout" value="3000" />
    <int:aggregator input-channel="channel3" output-channel="channel4" discard-channel="channel4"
    		send-partial-result-on-expiry="true" >
    but it doesn't, throwing a IllegalStateException: "Null correlation not allowed. Maybe the CorrelationStrategy is failing?" I thought the correlation-strategy was optional and used the CORRELATION_ID header by default.

    If I provide an explicit correlation-strategy like this:

    <bean id="correlationStrategy" class="org.springframework.integration.aggregator.HeaderAttributeCorrelationStrategy">
    		<constructor-arg name="attributeName" value="CORRELATION_ID" />
    then the AbstractMessageHandler throws a IllegalStateException "Invalid method parameter for payload: was expecting collection".

    Is there a simpler way to go about this? Am I misunderstanding the docs? Any help or clarification would be much appreciated.



  • #2
    CORRELATION_ID is the constant; the value is "correlationId" - see MessageHeaders.

    Correlation is NOT optional - the aggregator needs something to aggregate on. If you don't have a correlationId header, you need to supply one, or use a correlation strategy that uses some other technique to group messages. If all messages are in the same 'group' and you always want to release based on time, use correlation-strategy-expression="'foo'" - note single quotes around foo within double quotes - foo is a constant (literatl).

    Finally, the aggregator (and release strategy) are passive components; their algorithms are triggered by a message arrival. You can't have a release strategy that

    release[s] if no new messages have arrived w/in 3 seconds
    See callout #8 in the section 5.4.4 you cited:

    Indicates that expired messages should be aggregated and sent to the 'output-channel' or 'replyChannel' once their containing MessageGroup is expired (see MessageGroupStore.expireMessageGroups(long)). One way of expiring MessageGroups is by configuring a MessageGroupStoreReaper. However MessageGroups can alternatively be expired by simply calling MessageGroupStore.expireMessageGroup(groupId). That could be accomplished via a Control Bus operation or by simply invoking that method if you have a reference to the MessageGroupStore instance. Otherwise by itself this attribute has no behavior. It only serves as an indicator of what to do (discard or send to the output/reply channel) with Messages that are still in the MessageGroup that is about to be expired. Optional.
    Configuration of a MessageGroupStoreReaper (how often it runs etc) is described near the bottom of section 5.4.5.


    • #3
      Thanks Gary, I think I have it now and I really appreciate your help.

      One further question - could you please elaborate on the "send-timeout" for an Aggregator described as:

      "The timeout interval for sending the aggregated messages to the output or reply channel. Optional."



      • #4
        elaborate on the "send-timeout" for an Aggregator
        This attribute doesn't come into play in most scenarios.

        The send-timeout really only applies if the output-channel might block. This is somewhat rare; one example is if the output-channel is a QueueChannel with a capacity limit, and the queue is full. Then, if space is not available within the send-timeout, an exception is thrown.