Problem with applySequence on Router and two aggregators - is it a bug or a feature?

  • Problem with applySequence on Router and two aggregators - is it a bug or a feature?

    I have a problem with how the "applySequence" attribute is processed.

    My scenario is described in the attachment.
    There is one recipient list router that distributes the original request message to one or more processors. The router has applySequence=true so that the reply messages from several processors can be aggregated later. Between the router and the corresponding reply aggregator I have one additional aggregator (in my scenario it is used as a blocking point for reply messages: they are forwarded through the aggregator only when a release signal is received).

    And here is the problem. The sequence header added by the router is automatically processed in this intermediate aggregator. CorrelatingMessageHandler.handleMessageInternal() calls SimpleMessageGroup.isComplete(), which checks the sequence header, and the MessageGroup is then removed from the MessageStore regardless of my configured ReleaseStrategy. So the message is dropped here and is not forwarded to the corresponding reply aggregator, which is the place where I need the sequence processing, not sooner.

    And in my opinion the stack-based realization of applySequence header is not good for all scenarios. A better way could be a "named" sequence: the sequence would be stored in the header under the unique name of the router/splitter, etc., and the corresponding aggregator and its SequenceSizeReleaseStrategy would be configured with the name of the sequence/router that this aggregator should process from the header. Then this solution would be independent of the number of aggregation levels between the point where the sequence is created and the point where it is processed.
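
To make the stack-based behaviour concrete, here is a minimal model in plain Java. It is not Spring Integration code: `SequenceDetails`, `SequenceStack`, and the router name are hypothetical names used only for illustration, and the model assumes (as described above) that the nearest downstream aggregator reads whatever sequence entry was written last.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for the sequence information a router or splitter writes.
final class SequenceDetails {
    final String correlationId;
    final int sequenceNumber;
    final int sequenceSize;

    SequenceDetails(String correlationId, int sequenceNumber, int sequenceSize) {
        this.correlationId = correlationId;
        this.sequenceNumber = sequenceNumber;
        this.sequenceSize = sequenceSize;
    }
}

// Simplified model of a stack-based sequence header: producers push, aggregators pop.
final class SequenceStack {
    private final Deque<SequenceDetails> stack = new ArrayDeque<>();

    void push(SequenceDetails details) { stack.push(details); }

    // Whichever aggregator comes next sees the most recently pushed entry.
    SequenceDetails peek() { return stack.peek(); }

    SequenceDetails pop() { return stack.pop(); }

    int depth() { return stack.size(); }
}

final class StackSequenceDemo {
    public static void main(String[] args) {
        SequenceStack header = new SequenceStack();
        // The recipient list router writes its sequence (one recipient, so size 1).
        header.push(new SequenceDetails("recipient-list-router", 1, 1));

        // The blocking aggregator is the first aggregator downstream, so the
        // entry it finds on top of the stack is the router's, not its own.
        SequenceDetails seen = header.peek();
        System.out.println("blocking aggregator sees sequence from "
                + seen.correlationId + " with size " + seen.sequenceSize);
    }
}
```

In this model nothing tells the intermediate aggregator that the top entry was meant for a later stage, which is the mismatch described in this post.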

    Is this a bug or a feature? Should I post an issue to Jira?
    Thanks for any help or opinion.

  • #2
    Are you saying that in your example not all the split messages (regardless of the stage) can come back?
    I am attaching a simple sample that shows basically what you have (two layers of aggregation) and it works as expected.

    As far as "stack-based realization of applySequence header is not good for all scenarios" goes: well, of course it's not good for all scenarios, that is why we expose custom strategies where such behavior could be customized.



    • #3
      Also, looking at your diagram, I don't think using an aggregator to signal the release of a Message is the most appropriate way to handle your scenario. I see that you are simply sending a message to an aggregator and waiting until some signal comes from some queue. So I assume the component pulling from that queue knows about the message somehow. If so, why not just store the Message via ClaimCheck-in and send the ClaimCheck to the 'other process' (the one that deals with the signal queue)? When that process is ready to signal the release of the Message, it simply sends a Message with the ClaimCheck so the original can be retrieved with ClaimCheck-out. It's much simpler IMHO.
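
A minimal sketch of the check-in/check-out idea, in plain Java rather than the Spring Integration claim check components. `ClaimCheckStore` and its method names are hypothetical, and the store is in-memory only.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory claim check store (illustration only).
final class ClaimCheckStore<T> {
    private final Map<UUID, T> parked = new ConcurrentHashMap<>();

    // Check-in: park the message and hand back a ticket that can travel
    // to the other process instead of the message itself.
    // The message must not be null (ConcurrentHashMap rejects null values).
    UUID checkIn(T message) {
        UUID ticket = UUID.randomUUID();
        parked.put(ticket, message);
        return ticket;
    }

    // Check-out: return the parked message and remove it from the store.
    // Returns null if the ticket is unknown or was already redeemed.
    T checkOut(UUID ticket) {
        return parked.remove(ticket);
    }

    int size() {
        return parked.size();
    }
}
```

The process that owns the signal queue only needs to hold the ticket; sending it back is the release signal, and a second check-out with the same ticket returns nothing.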



      • #4
        I changed your sample to match my schema more precisely; it is attached here. The main change is the use of a RecipientListRouter, which generates the sequence, and the ReleaseStrategy on the two aggregators is changed to MessageCount instead of the default SequenceStrategy.

        The problem occurs when sequenceSize=1 (as the recipient list size is 1 too). The message should be stored in the aggregator and wait for a second message (as defined by the configured MessageCountReleaseStrategy). Instead of this behaviour, CorrelatingMessageHandler executes lines 186-196, which contain the following code:
        Code:
        } else if (group.isComplete()) {
            try {
                // If not releasing any messages the group might still be complete
                for (Message<?> discard : group.getUnmarked()) {
                    discardChannel.send(discard);
                }
            } finally {
                remove(group);
            }
        So with sequenceSize=1 and 1 message in the group, this condition is true, the message is discarded, and the group is removed from the MessageStore. This is bad for me, because the sequence on the message should not be used by this aggregator; here I have a different release strategy (MessageCountStrategy), and the SequenceStrategy comes later, on the reply aggregator.
        It looks like a bug, but I'm not sure.
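
The interaction can be reproduced in a small stand-alone model. This is not the CorrelatingMessageHandler source: `SeqMessage`, `ModelGroup`, and `ModelHandler` are hypothetical names, and `isComplete()` is assumed to compare the group size with the sequence size carried by the first message, as described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical message carrying only a payload and a sequence size.
final class SeqMessage {
    final String payload;
    final int sequenceSize; // 0 means "no sequence header"

    SeqMessage(String payload, int sequenceSize) {
        this.payload = payload;
        this.sequenceSize = sequenceSize;
    }
}

final class ModelGroup {
    final List<SeqMessage> messages = new ArrayList<>();

    // Assumed behaviour: complete once the group holds as many messages
    // as the sequence size found on its first message.
    boolean isComplete() {
        if (messages.isEmpty()) {
            return false;
        }
        int sequenceSize = messages.get(0).sequenceSize;
        return sequenceSize > 0 && messages.size() >= sequenceSize;
    }
}

final class ModelHandler {
    enum Outcome { RELEASED, DISCARDED, HELD }

    private final Predicate<ModelGroup> releaseStrategy;
    private ModelGroup group = new ModelGroup();

    ModelHandler(Predicate<ModelGroup> releaseStrategy) {
        this.releaseStrategy = releaseStrategy;
    }

    // Mirrors the branch order in the quoted excerpt: the release strategy
    // is asked first, then the sequence-based completeness check runs.
    Outcome handle(SeqMessage message) {
        group.messages.add(message);
        if (releaseStrategy.test(group)) {
            group = new ModelGroup();
            return Outcome.RELEASED;
        } else if (group.isComplete()) {
            group = new ModelGroup(); // group removed, message discarded
            return Outcome.DISCARDED;
        }
        return Outcome.HELD;
    }
}
```

With a count-based strategy such as `g -> g.messages.size() >= 2`, a single message with sequenceSize=1 ends up in the DISCARDED branch in this model, while a message without a sequence is held until the second one arrives.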


        "that is why we expose custom strategies where such behavior could be customized." - great, could you please give me some information on how I can use my own SequenceStrategy on a router? I cannot find any pluggable attribute for this, only applySequence=true. I need to put my own sequence into the header on the RecipientListRouter, but looking at the implementation, there is no support for any kind of SequenceStrategy in routers. I use 2.0.3.RELEASE; is there better support in other versions?

        Thank you for any idea.



        • #5
          Could you please explain the use case for blocking aggregators? Why not just let the messages flow to the final aggregator and have it do what it does? As for SequenceStrategy, there is a release-strategy attribute where you can plug in your own implementation of:
          Code:
          public interface ReleaseStrategy {
          
          	boolean canRelease(MessageGroup group);
          
          }
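
For illustration, a count-based implementation might look like the sketch below. To keep it self-contained, `MessageGroup` is reduced to a one-method stand-in and `ReleaseStrategy` is redeclared locally; `ThresholdReleaseStrategy` is a hypothetical name, not a class shipped with the framework.

```java
// Minimal stand-ins so the sketch compiles without the framework on the classpath.
interface MessageGroup {
    int size();
}

interface ReleaseStrategy {
    boolean canRelease(MessageGroup group);
}

// Hypothetical strategy: release once the group holds at least `threshold` messages.
final class ThresholdReleaseStrategy implements ReleaseStrategy {
    private final int threshold;

    ThresholdReleaseStrategy(int threshold) {
        if (threshold < 1) {
            throw new IllegalArgumentException("threshold must be at least 1");
        }
        this.threshold = threshold;
    }

    @Override
    public boolean canRelease(MessageGroup group) {
        return group.size() >= threshold;
    }
}
```

In an application the bean would implement the framework's own interface and be referenced from the aggregator's release-strategy attribute.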



          • #6
            I know how to create a ReleaseStrategy for an aggregator, but I need to create and use my own SequenceStrategy on the router. I don't want to use the stack-based sequence header; I want to implement my own Map-based sequence header. My question was about a SequenceStrategy for the router, and your answer is about the ReleaseStrategy in the aggregator.

            In my opinion the problem is not the blocking aggregator; an aggregator may wait for all the messages that are needed to produce the result message. If I have a slow producer of messages to the outA or outB channel in our sample, then the same problem occurs.

            But OK, here are my scenarios, currently I have two:

            1) Acknowledgement of remote polling (see attachment)
            - Here I have a client that wants to POLL a message from a queue over a WebService interface. The message is then processed, and the client must send an Acknowledge, Reply, or Error response. Because I cannot use TX handling, I use an explicit acknowledgement of message processing. It is similar to Amazon SQS or JMS client ACK. The solution is based on an AckAggregator: a copy of the polled message is placed in the aggregator (and its message store), then the message is processed and the ACK message is sent with the correlation-msg-id to the same aggregator. When the aggregator pairs both messages, the reply (ACK) can be forwarded for further processing. Otherwise, when a timeout occurs on the message store, the message is not acknowledged and the copy of the request message is forwarded back to the request channel, so it can be polled by another client. The AckAggregator corresponds to "outA-aggregator" or "outB-aggregator" in my forum-config.xml. I have the other handlers, like the RecipientListRouter and the ReplyAggregator, in my application too, so this is the scenario my problem comes from.

            2) Poll of one specific message (see attachment)
            - This is similar to the ACK case, but it is about polling the reply for a specific message. The reply messages are forwarded to an aggregator, where they are stored in the MessageStore. When the client is ready to pick up the reply for a specific request message, it creates a temporary channel, sets this channel as the ReplyChannel on a pickup message, and also sets the correlation-message-id of the request message for which it needs the reply. Then the client sends the pickup message to the aggregator, the aggregator finds the matching REPLY message in the message store, and this message is forwarded to the temporary channel from the pickup message's replyChannel header. So the client gets the reply for its specific request message. This scenario is only about the blocking aggregator, not about the sequence.
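
The pairing logic of scenario 1 can be sketched in plain Java as follows. `AckTracker` and its methods are hypothetical, messages are reduced to strings, and time is passed in explicitly so the behaviour is deterministic; this only models the pair-or-redeliver rule, not an aggregator configuration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the AckAggregator: park a request copy, pair it with
// an ACK by correlation id, or hand it back for redelivery after a timeout.
final class AckTracker {
    private static final class Pending {
        final String requestCopy;
        final long deadlineMillis;

        Pending(String requestCopy, long deadlineMillis) {
            this.requestCopy = requestCopy;
            this.deadlineMillis = deadlineMillis;
        }
    }

    private final Map<String, Pending> pending = new LinkedHashMap<>();
    private final long timeoutMillis;

    AckTracker(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Called when a client polls a message: keep a copy until it is acknowledged.
    void park(String correlationId, String requestCopy, long nowMillis) {
        pending.put(correlationId, new Pending(requestCopy, nowMillis + timeoutMillis));
    }

    // Called when an ACK arrives: true if it matched a parked request.
    boolean acknowledge(String correlationId) {
        return pending.remove(correlationId) != null;
    }

    // Called periodically: returns the request copies whose deadline has passed,
    // so they can be sent back to the request channel for another client.
    List<String> expire(long nowMillis) {
        List<String> redeliver = new ArrayList<>();
        Iterator<Map.Entry<String, Pending>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Pending p = it.next().getValue();
            if (p.deadlineMillis <= nowMillis) {
                redeliver.add(p.requestCopy);
                it.remove();
            }
        }
        return redeliver;
    }
}
```

Nothing in this rule depends on a sequence header, which matches the point that the intermediate aggregator should release on its own criteria.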

            These are not nice solutions, but there is no support for client ACK on channels, and a MessageSelector on a channel, which would make it possible to poll a message with a specific ID, is not supported either. I found some notes in this forum and in Jira about these scenarios, but no solution.

            I need to be able to pair the Router (Splitter) sequence with the corresponding AggregatorHandler. This is not possible with stack-based sequence storage in the header, because the flow is Router (stage 1) -> AckAggregator (stage 1) -> ReplyAggregator (stage 2), so the sequence from the Router is dropped by the AckAggregator, whereas I need it to be ignored there and processed on the ReplyAggregator. So I want to implement my solution as a Map-based SequenceStorageStrategy: instead of stages, the RouterName will be used as the key under which the SequenceStructure is stored in the header, and the SequenceReleaseStrategy on the ReplyAggregator will then be configured to read this header with the corresponding RouterName. But for this I need help with implementing my own sequence strategy on the Router. Otherwise I must replace the SI router implementation with my own.
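
A rough sketch of what such a Map-based header and the matching release check could look like, in plain Java. All names here (`NamedSequence`, `NamedSequenceHeader`, `NamedSequenceReleaseStrategy`, the router names) are hypothetical; none of this is existing framework API, and a group is modelled simply as the list of headers of its messages.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sequence entry written by one producer (router or splitter).
final class NamedSequence {
    final int number;
    final int size;

    NamedSequence(int number, int size) {
        this.number = number;
        this.size = size;
    }
}

// Hypothetical header value: sequences keyed by the name of the producer,
// so several producers can coexist without any ordering between them.
final class NamedSequenceHeader {
    private final Map<String, NamedSequence> byProducer = new HashMap<>();

    NamedSequenceHeader with(String producerName, int number, int size) {
        byProducer.put(producerName, new NamedSequence(number, size));
        return this;
    }

    NamedSequence get(String producerName) {
        return byProducer.get(producerName);
    }
}

// Hypothetical release check: only looks at the sequence of the configured producer.
final class NamedSequenceReleaseStrategy {
    private final String producerName;

    NamedSequenceReleaseStrategy(String producerName) {
        this.producerName = producerName;
    }

    boolean canRelease(List<NamedSequenceHeader> groupHeaders) {
        if (groupHeaders.isEmpty()) {
            return false;
        }
        NamedSequence sequence = groupHeaders.get(0).get(producerName);
        // An aggregator configured for a different producer never releases
        // (or discards) based on this sequence.
        return sequence != null && groupHeaders.size() >= sequence.size;
    }
}
```

In this sketch an intermediate aggregator that is not configured with the router's name simply finds no sequence to act on, so the number of aggregation stages in between no longer matters.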

