Announcement Announcement Module
Collapse
No announcement yet.
Reordering messages in a time window rather than by known group size Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Reordering messages in a time window rather than by known group size

    The Problem
    I've looked through a lot of SI stuff including the refguide and the SI Samples on GitHub, and I can't seem to find anyone trying to do what I need to do. I have messages coming in from various sources, and I need to resequence them to go out "in order" based on some definition of "in order". The problem is that the messages aren't necessarily related in any way other than this ordering, so I can't define "message groups" to be aggregated and released, like the standard Resequencer does.

    Suggested Algorithm for Solution
    The current thought on how to solve it is to:
    1. delay all messages for some period of time, and
    2. release, in order, any messages that:
      1. have served their full delay and
      2. wouldn't be out of order with any other messages that have come in, *including* any that haven't served the full delay yet
    The Question
    Is there a clean way to do this in SI that I'm just not seeing? Is there a different or better way to solve this that I'm not thinking of?

    Example of the Problem and Suggested Solution
    Let me try to give an example to help clarify the problem. Assume messages "A", "B", "C", and "D" should be sent out in their lexicographical order. Further, assume that the messages are all received 1 second apart in the order "B", "C", "A", "D".

    Using the suggested solution with a delay period of 1.5 seconds should cause this sequence:
    • 0s: Receive B
    • 1s: Receive C
    • 1.5s: B has served its delay, so it can be sent: Send B
    • 2s: Receive A
    • 2.5s: C has served its delay, but can't be sent because A has arrived and needs to be sent before C, but A hasn't served its delay
    • 3s: Receive D
    • 3.5s: A has served its delay, so A and C can now be sent: Send A and C, in order
    • 4.5s: D has served its delay: Send D
    So a 1.5s delay would yield the messages in order B, A, C, D. Increasing the delay to 2.5s would cause everything to be correctly ordered because:
    • 0s: Receive B
    • 1s: Receive C
    • 2s: Receive A
    • 2.5s: B has served its delay, but can't be sent because A has arrived
    • 3s: Receive D
    • 3.5s: C has served its delay, but can't be sent because A still isn't out of its delay perioud
    • 4.5s: A has served its delay, so A, B, and C can be sent in order
    • 5.5s: D has served its delay: Send D
    Thus correctness of order is a function of how much delay you're willing to accept in message flow.

  • #2
    Hi!

    Maybe I don't understand fully your task, but what I see:
    1. You need here: <dalayer>, <<aggregator>, MessageGroupStore, custom ReleaseStrategy, custom MessageGroupProcessor
    2.
    delay all messages for some period of time
    Just to use <dalayer>: http://static.springsource.org/sprin...r.html#delayer
    But with external message-store:
    HTML Code:
     <int:delayer id="delayer" 
            default-delay="1500"
            message-store="delayerMessageStore">
    3. Aggregator after dalayer: http://static.springsource.org/sprin...ml#aggregator:
    HTML Code:
    <aggregator correlation-strategy-expression="1"
    				release-strategy=""
    				ref=""/>
    where:
    - correlation-strategy-expression="1" it is some workaround, if you don't have any ability to group you incoming Messages. You should think here more: maybe you can find some external corellation. From other side there is an expire-groups-upon-completion="true"
    4. Your custom ReleaseStrategy#canRelease should look in the delayerMessageStore on group delayer and compare messages via desired logic, e.g.:
    C has served its delay, but can't be sent because A still isn't out of its delay perioud
    For this sample it will be enough to check that there is no 'A' message in the delayer.
    5. custom MessageGroupProcessor may just sort released messages in the appropriate order. And return a LinkedList<Message>.
    6. In the end will work
    Code:
    for (Object next : (Iterable<?>) processorResult) {
    				this.sendReplyMessage(next, replyChannel);
    			}
    Where your messages will be send to the outbound-channel one by one.

    Hope I made some right point to start...

    Take care,
    Artem

    P.S. Delayer MessageGroup became available from Spring Integration 2.2

    Comment


    • #3
      Yes, this sounds like it makes sense. I'll try to find time to test it out some time this week. The MessageStore on the Delayer is key. I didn't know that existed. Fortunately, I'm already running 2.2.0.RC2 because I need some other things introduced there. Thanks so much for the detailed answer!

      Comment

      Working...
      X