2.0.0M4 Aggregator timeout attribute, where did it go?

  • 2.0.0M4 Aggregator timeout attribute, where did it go?

    Hello and thanks for the work on a product which appears to have great potential and save me lots of work!

    I've noticed a number of changes in my migration from M3 to M4 related to the release strategies and the "timeout" attribute for the aggregator tag. I've searched a bit and have been unsuccessful in determining how I'm supposed to set the timeout value.

    Do I need to create my own "Message Store", then reference the store inside the aggregator? Does the new MessageGroupStoreReaper play some part in the newly missing "timeout" attribute?


    <code>
    <!-- M3 version -->
    <aggregator input-channel="appChannel"
    output-channel="collectedChannel"
    correlation-strategy="partyCorrelationStrategy"
    completion-strategy="falseCompletionStrategy"
    send-partial-result-on-timeout="true"
    timeout="60000">
    </aggregator>
    </code>

    <code>
    <!-- M4 version -->
    <aggregator input-channel="appChannel" output-channel="collectedChannel" discard-channel="logger"
    correlation-strategy="partyCorrelationStrategy"
    release-strategy="falseReleaseStrategy"
    send-partial-result-on-expiry="true">
    </aggregator>

    </code>

  • #2
    The MessageGroupStoreReaper is indeed the location of the expiry responsibility (which used to be on a timer inside the aggregator endpoint itself). So if you really need that feature then you just set up an explicit message store instance (instead of using the default in the aggregator), then share that store between the aggregator and a reaper and put the latter in a <task:scheduled-tasks/> (there is an example in the store test package in SVN).

    Note, however, that there are some old M3 use cases that may not require explicit expiry. In particular, if all you need is protection against a memory leak as long as the whole sequence eventually arrives, then the ReleaseStrategy handles that. And in cases where there is no sequence and you are just grouping a stream of correlated messages, the MessageGroup is always complete as soon as it is released, so there is often no need for a timeout either - you might be happy just to have unprocessed messages expire on ApplicationContext shutdown.
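To illustrate the division of labour described above (the store holds the groups, the reaper periodically asks it to expire old ones, and interested parties are notified through callbacks), here is a minimal sketch in plain Java. It is not the Spring Integration API: `SketchGroupStore`, `SketchReaper`, `expireGroups` and the injected clock are all hypothetical names chosen for illustration only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

// Hypothetical stand-in for a message group store; NOT the real framework class.
class SketchGroupStore {
    // correlation key -> messages collected so far
    private final Map<String, List<String>> groups = new HashMap<>();
    // correlation key -> time (ms) at which the group was first created
    private final Map<String, Long> createdAt = new HashMap<>();
    private final List<Consumer<List<String>>> expiryCallbacks = new ArrayList<>();

    void add(String key, String message, long nowMillis) {
        groups.computeIfAbsent(key, k -> new ArrayList<>()).add(message);
        createdAt.putIfAbsent(key, nowMillis);
    }

    void registerExpiryCallback(Consumer<List<String>> callback) {
        expiryCallbacks.add(callback);
    }

    // Removes every group at least timeoutMillis old and passes it to each callback.
    int expireGroups(long timeoutMillis, long nowMillis) {
        int expired = 0;
        Iterator<Map.Entry<String, Long>> it = createdAt.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() >= timeoutMillis) {
                List<String> group = groups.remove(entry.getKey());
                it.remove();
                for (Consumer<List<String>> callback : expiryCallbacks) {
                    callback.accept(group);
                }
                expired++;
            }
        }
        return expired;
    }
}

// Hypothetical reaper: a Runnable that a scheduler would invoke at a fixed rate.
class SketchReaper implements Runnable {
    private final SketchGroupStore store;
    private final long timeoutMillis;
    private final LongSupplier clock; // injected so the sketch can be tested

    SketchReaper(SketchGroupStore store, long timeoutMillis, LongSupplier clock) {
        this.store = store;
        this.timeoutMillis = timeoutMillis;
        this.clock = clock;
    }

    @Override
    public void run() {
        store.expireGroups(timeoutMillis, clock.getAsLong());
    }
}
```

The point of the sketch is that the aggregator only sees expired groups if it registered a callback on the same store instance that the reaper is working against.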



    • #3
      MessageGroupStoreReaper, aggregator and messageStore via xml not working either

      Dave,
      Thanks for the reply and it is quite possible that I'm making it harder than it has to be. My current process is to collect an unknown number of messages (i.e. no sequence numbers) generated by the database which is where the timeout came in handy.

      I did attempt to use the MessageGroupStoreReaper with the following XML configuration and specified the messageStore in the aggregator tag. This did not appear to work, and I think that is because two message stores were created and the one I specified did not have listeners registered.

      I have not had a chance to reproduce the scenario with a test case and have reverted back to 1.0.4 for the time being.

      <code>
      <beans:bean id="messageStore" class="org.springframework.integration.store.SimpleMessageStore" />

      <beans:bean id="reaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
      <beans:property name="messageGroupStore" ref="messageStore"/>
      <beans:property name="timeout" value="30000"/>
      </beans:bean>

      <task:scheduled-tasks scheduler="scheduler">
      <task:scheduled ref="reaper" method="run" fixed-rate="5000"/>
      </task:scheduled-tasks>

      <task:scheduler id="scheduler"/>
      </code>

      - Scott



      • #4
        That config looks good, but you didn't show your aggregator (it needs a reference to the message store as well).



        • #5
          Dave,
          From memory the aggregator looked something like the one below. If I'm remembering correctly the messageStore I'm referencing here did get used, however the only registered callbacks were from the MessageStore created in the Aggregator constructor.

          <code>
          <aggregator input-channel="appChannel" output-channel="collectedChannel" discard-channel="logger"
          correlation-strategy="partyCorrelationStrategy"
          release-strategy="falseReleaseStrategy"
          messageStore="messageStore"
          send-partial-result-on-timeout="true">
          </aggregator>
          </code>

          From the CorrelatingMessageHandler constructor
          <code>
          store.registerMessageGroupExpiryCallback(new MessageGroupCallback() {
              public void execute(MessageGroup group) {
                  forceComplete(group);
              }
          });
          </code>

          Should setMessageStore() perform a similar callback registration, or is that left up to the user?

          - Scott
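The concern raised in this post can be reproduced in isolation. The following is a minimal sketch in plain Java, not the framework code: `CallbackStore`, `SketchHandler` and both setter names are hypothetical, and the second setter is only one possible remedy, not necessarily how the framework resolved it. It shows that a callback registered in the constructor stays attached to the default store, so a store swapped in later never triggers it unless the setter registers the callback again.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal store that only tracks expiry callbacks.
class CallbackStore {
    private final List<Runnable> expiryCallbacks = new ArrayList<>();

    void registerExpiryCallback(Runnable callback) {
        expiryCallbacks.add(callback);
    }

    // Simulates a reaper expiring a group: every registered callback fires.
    void expire() {
        for (Runnable callback : expiryCallbacks) {
            callback.run();
        }
    }

    int callbackCount() {
        return expiryCallbacks.size();
    }
}

// Hypothetical handler mirroring the constructor pattern quoted above.
class SketchHandler {
    private CallbackStore store = new CallbackStore();
    int forcedCompletions = 0;

    SketchHandler() {
        // The callback is registered on the default store only.
        store.registerExpiryCallback(this::forceComplete);
    }

    // Setter as described in the post: replaces the store, registers nothing.
    void setStoreWithoutCallback(CallbackStore newStore) {
        this.store = newStore;
    }

    // One possible remedy (an assumption): register on whichever store is set.
    void setStoreWithCallback(CallbackStore newStore) {
        this.store = newStore;
        newStore.registerExpiryCallback(this::forceComplete);
    }

    private void forceComplete() {
        forcedCompletions++;
    }
}
```

With the first setter, expiring the shared store never reaches the handler; with the second, it does.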



          • #6
            Good catch (https://jira.springsource.org/browse/INT-1140). Fixed it.
