  • Peeking at messages in a QueueChannel without removing them from the queue

    Hi folks,

    My apologies in advance if this is already in the docs but I have been reading them and I can't yet find the answer.

    Question: are Filters able to peek at the messages present in an end-point without actually removing them from the channel?

    For example, say I have a QueueChannel and I want to use a filter to see whether a message is waiting. I want to apply some business logic so that the filter either removes the message from the queue and passes it along, or leaves it in the queue for some other process to consume. Is this possible? From what I can tell from the docs, by the time a filter looks at a message, the message has already been removed from the channel; the filter can then drop it, throw an error, or redirect it to some other channel. If I do the latter and place the message back in the FIFO queue, the message has lost its spot.
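
The "lost its spot" concern can be illustrated with a plain `java.util.concurrent` queue. This is only a sketch: `LinkedBlockingQueue` stands in for the QueueChannel, and the class and method names (`RequeueVersusPeek`, `takeAndRequeue`, `peekOnly`) are made up for the illustration and are not Spring Integration API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Plain JDK stand-in for a QueueChannel; not Spring Integration code.
class RequeueVersusPeek {

    // Remove the head, then put it back: the message re-enters at the tail.
    static List<String> takeAndRequeue(BlockingQueue<String> queue) {
        String head = queue.poll();
        if (head != null) {
            queue.offer(head);
        }
        return new ArrayList<>(queue);
    }

    // Look at the head without removing it: the queue order is untouched.
    static List<String> peekOnly(BlockingQueue<String> queue) {
        queue.peek();
        return new ArrayList<>(queue);
    }

    // Builds a fresh three-message queue in FIFO order A, B, C.
    static BlockingQueue<String> sample() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("A");
        queue.add("B");
        queue.add("C");
        return queue;
    }

    public static void main(String[] args) {
        System.out.println(takeAndRequeue(sample())); // [B, C, A]
        System.out.println(peekOnly(sample()));       // [A, B, C]
    }
}
```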

    Thank you in advance for your time.

  • #2
    Your description of the Filter behavior is correct: you could use the Filter's 'discard-channel' to put it back onto the queue but would lose its spot.

    Another option is to allow the Filter to throw the Exception. If you have more than one handler subscribed to the channel (and the failover flag is set to true, which it is by default), the rest of the handlers will be tried - one at a time.

    If you get a chance to try that out, please let us know how it works for you.

    We have considered a couple of other options (I'm not a big fan of relying on Exception-throwing for control flow). One is to extend the SubscribableChannel interface with a subscribe(handler, selector) method, in addition to the current subscribe(handler) method. The other is to define an explicit "Selective Consumer" component that would simply combine a MessageHandler with a MessageSelector and allow the MessageDispatcher to ask first whether the Message would be accept()-ed. The tradeoff is that option #1 would require extending the interface while option #2 would most likely lead to instanceof checks. Any other ideas are welcome!
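
A rough sketch of the "ask first, then hand over" idea is below. It is not the framework's design: `SelectiveDispatchSketch`, `SelectiveConsumer`, and `Dispatcher` are hypothetical names, and `java.util.function.Predicate` and `Consumer` stand in for MessageSelector and MessageHandler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical sketch; none of these types are Spring Integration classes.
class SelectiveDispatchSketch {

    // Pairs a handler with the selector that decides whether it wants a message.
    static final class SelectiveConsumer<T> {
        final Predicate<T> selector;
        final Consumer<T> handler;

        SelectiveConsumer(Predicate<T> selector, Consumer<T> handler) {
            this.selector = selector;
            this.handler = handler;
        }
    }

    static final class Dispatcher<T> {
        private final List<SelectiveConsumer<T>> consumers = new ArrayList<>();

        void subscribe(Predicate<T> selector, Consumer<T> handler) {
            consumers.add(new SelectiveConsumer<>(selector, handler));
        }

        // Asks each consumer in subscription order; the first one that accepts handles the message.
        // Returns false if nobody accepted, so the caller can leave the message where it is.
        boolean dispatch(T message) {
            for (SelectiveConsumer<T> consumer : consumers) {
                if (consumer.selector.test(message)) {
                    consumer.handler.accept(message);
                    return true;
                }
            }
            return false;
        }
    }

    static String demo() {
        StringBuilder log = new StringBuilder();
        Dispatcher<Integer> dispatcher = new Dispatcher<>();
        dispatcher.subscribe(n -> n % 2 == 0, n -> log.append("even:").append(n).append(' '));
        dispatcher.subscribe(n -> n > 10, n -> log.append("big:").append(n).append(' '));
        dispatcher.dispatch(4);
        dispatcher.dispatch(11);
        boolean handled = dispatcher.dispatch(3); // no selector accepts 3
        return log.toString().trim() + " handled3=" + handled;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // even:4 big:11 handled3=false
    }
}
```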

    Thanks,
    Mark

    • #3
      Thanks for the reply Mark.

      The Filter in question would be the only one in the endpoint. There would be nothing after it. So, unless I'm missing something, throwing an exception unfortunately does not seem to apply to this particular situation.

      But how about turning the problem on its head? What I'm really aiming for here is the ability to periodically execute some business logic and, if that logic deems it necessary, then poll a channel to see if a message is available in a queue. I'm essentially striving for message pulling instead of message pushing.

      So, instead of having the endpoint be called when the message is there and having the filter decide if it will accept and remove the message from the queue, why not have a PollingConsumer that triggers a channel.receive() call when it's ready? Looking at the PollingConsumer and the types of triggers it accepts, it seems that there could be a trigger that augments the CronTrigger or IntervalTrigger by running some additional business logic before it decides whether to make the channel.receive() call.
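
The "run business logic first, then decide whether to receive" idea can be sketched without any framework classes. Everything here is an assumption made for illustration: `GatedPoller` and `pollIfReady` are hypothetical names, a `ConcurrentLinkedQueue` stands in for the channel, and a `BooleanSupplier` stands in for the business logic.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.BooleanSupplier;

// Hypothetical sketch of pull-style consumption gated by business logic.
class GatedPoller<T> {
    private final Queue<T> queue;
    private final BooleanSupplier readyCheck;

    GatedPoller(Queue<T> queue, BooleanSupplier readyCheck) {
        this.queue = queue;
        this.readyCheck = readyCheck;
    }

    // Receives the next message only when the business logic says so;
    // otherwise returns null and leaves the queue untouched.
    T pollIfReady() {
        return readyCheck.getAsBoolean() ? queue.poll() : null;
    }

    static String demo() {
        Queue<String> queue = new ConcurrentLinkedQueue<>();
        queue.add("m1");
        queue.add("m2");
        boolean[] ready = {false};
        GatedPoller<String> poller = new GatedPoller<>(queue, () -> ready[0]);
        String first = poller.pollIfReady();   // not ready yet: nothing is consumed
        ready[0] = true;
        String second = poller.pollIfReady();  // ready: head of the queue is consumed
        return first + "," + second + "," + queue.size();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // null,m1,1
    }
}
```

To run the check periodically, `pollIfReady()` could be handed to a `ScheduledExecutorService` via `scheduleWithFixedDelay`; that scheduling choice is likewise an assumption and not something the thread prescribes.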

      Do you see any merit in the above approach or is there some other feature that I'm missing that can achieve message pulling?

      Thank you again for your time.

      • #4
        I think these two issues might address those same ideas:
        https://jira.springsource.org/browse/INT-1002
        https://jira.springsource.org/browse/INT-650

        Please let me know if you agree, and if so, feel free to add some comments via JIRA (also, you can vote and add a watch to the issues).

        Thanks,
        Mark

        • #5
          https://jira.springsource.org/browse/INT-1002 is exactly it. I'll start following it. So, for the time being, it sounds like I need to create a custom TaskExecutor that periodically runs some checks and then invokes a QueueChannel.receive() when ready.

          • #6
            Hi Mark,

            Taking another look at this, it feels like a custom Trigger might still not do the trick. The reason is that Triggers work on the assumption that they know what the next execution time should be. A custom trigger may indeed allow for business logic to be executed and possibly know when the next trigger time should be. But this leaves out the cases where not even the business logic can foresee when it should poll next.

            After doing some local tests, I was able to get the following to work:

            - Created a PeekingQueueChannel subclass of AbstractPollableChannel that adds a few more features:
            -- Exposes the BlockingQueue peek method
            -- Wraps the BlockingQueue peek method in a peek(long timeout) method that causes the current thread to wait for a given timeout unless there is something in the queue
            -- Overrides the doSend method to notify any blocking threads when the internal BlockingQueue is updated

            - Created a PeekingPollingStrategy interface with a method such as: public abstract boolean peek(Message<?> message);

            - Created a PeekingPollingConsumer subclass of PollingConsumer that introduces the following:
            -- Expects a PeekingQueueChannel in the constructor (throws an exception otherwise)
            -- Expects a PeekingPollingStrategy implementation in the constructor (throws an exception otherwise)
            -- Overrides the doPoll() method to:
            --- first call inputChannel.peek(this.receiveTimeout) to get the head message if there is one; the call can block for up to the timeout until one arrives
            --- take the message returned from the above call and pass it to the PeekingPollingStrategy implementation to run custom business logic that will decide whether the message should be consumed
            --- use the result of the PeekingPollingStrategy to either call super.doPoll() or return false.
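
The channel part of the list above (a timed peek plus a wake-up on send) can be sketched in plain Java as follows. This is not the poster's PeekingQueueChannel and it does not extend AbstractPollableChannel: `PeekableQueueSketch` is a hypothetical standalone class, and the use of `ReentrantLock` with a `Condition` is a choice made for this sketch.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical standalone sketch of a queue with a timed, non-destructive peek.
class PeekableQueueSketch<T> {
    private final Deque<T> items = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();

    // Adds a message and wakes any thread blocked in peek(timeout).
    void send(T item) {
        lock.lock();
        try {
            items.addLast(item);
            notEmpty.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Returns the head without removing it, waiting up to timeoutMillis for one to arrive.
    // The loop re-checks the queue after every wake-up, so spurious wake-ups are harmless.
    T peek(long timeoutMillis) throws InterruptedException {
        long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            while (items.isEmpty()) {
                if (remainingNanos <= 0) {
                    return null;
                }
                remainingNanos = notEmpty.awaitNanos(remainingNanos);
            }
            return items.peekFirst();
        } finally {
            lock.unlock();
        }
    }

    // Removes and returns the head, or null if the queue is empty.
    T receive() {
        lock.lock();
        try {
            return items.pollFirst();
        } finally {
            lock.unlock();
        }
    }

    static String demo() {
        try {
            PeekableQueueSketch<String> queue = new PeekableQueueSketch<>();
            String empty = queue.peek(10);              // times out on an empty queue
            Thread producer = new Thread(() -> queue.send("a"));
            producer.start();
            String head = queue.peek(5000);             // returns once the producer has sent
            producer.join();
            String again = queue.peek(0);               // peeking does not consume
            String taken = queue.receive();
            return empty + "," + head + "," + again + "," + taken + "," + queue.peek(0);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // null,a,a,a,null
    }
}
```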

            The result was a Queue that allows a specialized end-point to look at a message before de-queuing it and run custom logic to decide whether the message should be consumed from the queue. In addition, the channel is able to notify blocking threads that the queue has been updated. The current QueueChannel implementation does not seem to do that in its doSend method. I found that out while trying to configure the PeekingPollingConsumer above to emulate event-driven behavior via configuration (see http://static.springsource.org/sprin...oint-namespace, last two paragraphs of section 4.4 Namespace Support).

            I will be working on testing this configuration further on the system that we are currently building; I will update this thread if I find any issues. In the meantime, I hope this gives folks some ideas on how to address the message pulling vs. pushing challenge.

            Best Regards.

            • #7
              It sounds like you might have a concurrency issue between the peek() and super.doPoll() calls within your doPoll() method... or are you synchronizing access at that level?

              • #8
                peek() synchronizes internally around the queue in PeekingQueueChannel:

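                public class PeekingQueueChannel extends AbstractPollableChannel {
                    [...]
                    // Returns the head of the queue without removing it,
                    // waiting up to 'timeout' ms if the queue is empty.
                    protected Message<?> peek(long timeout) {
                        if (timeout > 0) {
                            synchronized (queue) {
                                if (queue.isEmpty()) {
                                    try {
                                        queue.wait(timeout);
                                    }
                                    catch (InterruptedException e) {
                                        // restore the interrupt flag instead of swallowing it
                                        Thread.currentThread().interrupt();
                                    }
                                }
                            }
                        }
                        return queue.peek();
                    }
                    [...]
                }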

                then in PeekingPollingConsumer:

                public class PeekingPollingConsumer extends PollingConsumer {
                    [...]

                    @Override
                    protected boolean doPoll() {
                        Message<?> msg = this.queueChannel.peek(this.receiveTimeout);
                        boolean doPoll = strategy.peek(msg);
                        if (doPoll) {
                            return super.doPoll();
                        }
                        else {
                            return false;
                        }
                    }
                    [...]
                }

                Do you see any red flags?

                • #9
                  What I was worried about is this part:
                  Code:
                      boolean doPoll = strategy.peek(msg);
                      if(doPoll) {
                          return super.doPoll();
                      }
                  When you call doPoll there, how do you know you're going to get the same Message that you peek()-ed?
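
The interleaving in question can be replayed deterministically on a plain JDK queue. This sketch only simulates two consumers in a single thread; `PeekThenPollRace` is a hypothetical name and `LinkedBlockingQueue` stands in for the channel's internal queue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Single-threaded replay of a peek-then-poll race between two consumers.
class PeekThenPollRace {

    static String demo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("first");
        queue.add("second");

        String peeked = queue.peek();   // consumer A inspects "first" and decides to consume it
        String stolen = queue.poll();   // consumer B removes "first" before A gets to poll
        String received = queue.poll(); // consumer A polls and actually gets "second"

        return peeked + "," + stolen + "," + received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // first,first,second
    }
}
```

Consumer A made its decision about "first" but ended up consuming "second", which is the mismatch the question points at.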

                  • #10
                    I see. Is it wise to synchronize the entire method then or will it just kill performance?

                    Code:
                    @Override
                    protected synchronized boolean doPoll() {
                        Message<?> msg = this.queueChannel.peek(this.receiveTimeout);
                        boolean doPoll = strategy.peek(msg);
                        if(doPoll)
                        {
                            return super.doPoll();
                        }
                        else
                        {
                            return false;
                        }
                    }
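
One thing to keep in mind is that a synchronized method locks on the consumer instance, so it only serializes polls that go through this one consumer; any other code that calls receive() on the same channel is not held back by that lock. An alternative worth considering is to make the check and the removal a single operation guarded by the queue's own lock. The sketch below assumes a hypothetical standalone class, `ConditionalReceiveQueue`, with a made-up `receiveIf` method; it is not part of Spring Integration and not the poster's code.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;

// Hypothetical sketch: peek and removal happen under one lock, so the message
// that was inspected is guaranteed to be the message that is removed.
class ConditionalReceiveQueue<T> {
    private final Deque<T> items = new ArrayDeque<>();

    synchronized void send(T item) {
        items.addLast(item);
    }

    // Removes and returns the head only if the predicate accepts it; otherwise
    // returns null and leaves the head in place. The predicate runs while the
    // lock is held, so it should be quick.
    synchronized T receiveIf(Predicate<? super T> accept) {
        T head = items.peekFirst();
        if (head != null && accept.test(head)) {
            return items.pollFirst();
        }
        return null;
    }

    synchronized int size() {
        return items.size();
    }

    static String demo() {
        ConditionalReceiveQueue<String> queue = new ConditionalReceiveQueue<>();
        queue.send("keep");
        queue.send("take");
        String rejected = queue.receiveIf(m -> m.equals("take"));   // head is "keep": left in place
        String accepted = queue.receiveIf(m -> m.startsWith("k"));  // head is "keep": consumed
        return rejected + "," + accepted + "," + queue.size();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // null,keep,1
    }
}
```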