DelayQueue contribution

  • #31

    I've thought about this a bit more, and I am actually leaning toward a channel interceptor that simply delays and resubmits. It would register a task with a provided (or default) ScheduledExecutorService instance, using a header from the Message to determine the delay (or falling back to a default delay; if that default is 0, only Messages with the delay header would be delayed). It would also remove the delay flag to indicate that the Message has already been delayed, so that it passes through undelayed upon resubmission. The main thing that feels hacky to me is that it would need to be the outermost interceptor to avoid re-invoking the others.

    This does seem to be the most generally applicable since having this logic in the preSend() method would allow it to be used for both queue-based and dispatcher-based channels. By delegating to the scheduler backed by a thread pool, this would still fulfill the long wait requirement (without sleep).

    Of course, if this channel were a direct channel intended to propagate a transaction context, the transaction boundary would be broken by the use of a thread pool. It really should not be applied to such channels, because that would always require blocking the sender thread for the entire delay anyway.

    Let me know what you think.
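
For readers following along, here is a rough, framework-free sketch of the delay-and-resubmit idea described in this post. It is not the Spring Integration API: the `Message` class, the `preSend` signature, the `resubmit` callback and the `"delay"` header name are all hypothetical stand-ins. It also assumes one deviation from the description: instead of removing the delay flag, it overwrites the header with 0, so that a non-zero default delay is not applied a second time on resubmission.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Framework-free sketch: Message, preSend and the "delay" header are
// hypothetical stand-ins, not the Spring Integration API.
class DelayingInterceptorSketch {

    static final String DELAY_HEADER = "delay"; // assumed header name

    /** Minimal immutable message: a payload plus headers. */
    static final class Message {
        final Object payload;
        final Map<String, Object> headers;

        Message(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = Collections.unmodifiableMap(new HashMap<>(headers));
        }

        /** Returns a copy of this message with one header set or replaced. */
        Message withHeader(String name, Object value) {
            Map<String, Object> copy = new HashMap<>(headers);
            copy.put(name, value);
            return new Message(payload, copy);
        }
    }

    private final ScheduledExecutorService scheduler;
    private final long defaultDelayMillis;
    private final Consumer<Message> resubmit; // sends the message to the channel again

    DelayingInterceptorSketch(ScheduledExecutorService scheduler,
                              long defaultDelayMillis,
                              Consumer<Message> resubmit) {
        this.scheduler = scheduler;
        this.defaultDelayMillis = defaultDelayMillis;
        this.resubmit = resubmit;
    }

    /** Returns the message if it may proceed now, or null if it was scheduled. */
    Message preSend(Message message) {
        Object header = message.headers.get(DELAY_HEADER);
        long delay = (header instanceof Number)
                ? ((Number) header).longValue()
                : defaultDelayMillis;
        if (delay <= 0) {
            return message; // no delay requested, or already delayed
        }
        // Mark the message as already delayed so it passes through on resubmission.
        Message marked = message.withHeader(DELAY_HEADER, 0L);
        scheduler.schedule(() -> resubmit.accept(marked), delay, TimeUnit.MILLISECONDS);
        return null; // the sender thread is released without sleeping
    }
}
```

Returning null from `preSend` is this sketch's way of saying "do not send now"; the ordering concern from the post still applies, since any interceptor placed outside this one would run again on resubmission.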


    • #32
      Mark, what is wrong with the channel task executor solution?

      I don't like the resubmitting interceptor because the order of the interceptors is so significant in this case.

      Did you think about a dedicated endpoint, similar to a bridge, that uses scheduled execution to delay Messages between two channels?


      • #33
        I think I agree that resubmitting from an interceptor is too error-prone and hence doesn't feel like the right place. If we were to go with the endpoint approach, I guess we could simply add support for a scheduler-driven delay on the bridge itself. However, I am back to thinking about a new Queue subclass that would simply wrap another Queue instance and add a Scheduler in front. Basically it would be an equivalent of DelayQueue that can be used as a work queue. Overall, I think my main goal is to have this in the channel... it just seems like the right place.



        • #34
          The main reason I did it in the channel is that I saw how easily it can be done with a QueueChannel subclass. It also uses the queue as storage for the messages waiting out their delay. If Camel implemented their delay endpoint without blocking a thread, I would surely use it. Logically, delaying is a kind of processing, so in an ideal world I would prefer an endpoint. Also, a channel with waiting messages behaves like an empty one. This is a little weird.

          Anyway, if you decide to do it on the channel, the interceptors should be handled carefully. Those on the sending side must be called before the delay and those on the receiving side after the delay. Otherwise the logs will look funny.
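
The "behaves like an empty one" effect mentioned above can be seen with the JDK's own `java.util.concurrent.DelayQueue`, which is used here only as an analogy for a delaying channel. In this small sketch, `DelayedMessage` and `looksEmptyWhileHoldingMessages` are hypothetical names made up for illustration.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Illustrative only: shows that a DelayQueue can hold elements while
// poll() still returns nothing, which is the "looks empty" effect.
class DelayedMessageDemo {

    /** A payload that becomes available after a fixed delay. */
    static final class DelayedMessage implements Delayed {
        final String payload;
        final long readyAtNanos;

        DelayedMessage(String payload, long delayMillis) {
            this.payload = payload;
            this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    /** Returns true when the queue holds one element but poll() yields nothing. */
    static boolean looksEmptyWhileHoldingMessages(long delayMillis) {
        DelayQueue<DelayedMessage> queue = new DelayQueue<>();
        queue.add(new DelayedMessage("hello", delayMillis));
        // size() counts unexpired elements too; poll() only returns expired ones.
        return queue.size() == 1 && queue.poll() == null;
    }
}
```

With a long delay the method returns true (the element is stored but not receivable); with a delay of 0 the element has already expired, so `poll()` hands it out and the method returns false.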


          • #35
            To be honest, this use case makes me think that an "around" interceptor for a channel would be great. I've implemented a DelayChannel that takes a target channel (any instance), and that works fine. I originally chose not to include the around interception, because it seems like a channel is inherently linear and that most "around" type use-cases should be handled by an endpoint instead.

            By the way, I meant to answer your question about the TaskExecutor earlier. You'll notice in the example I posted above, it checks for a MessageAwareRunnable. It needed that in order to get the delay header from the Message. It's not difficult at all to include the Message within the Runnable when using the TaskExecutor on the channel. However, I'm reluctant to add that, because it seems like it could be abused very easily.

            That's an interesting point about the channel appearing empty when it contains delayed Messages.


            • #36
              "Logically, delaying is a kind of processing, so in an ideal world I would prefer an endpoint. Also, a channel with waiting messages behaves like an empty one. This is a little weird."
              After prototyping this as a) channel b) custom queue c) interceptor and d) endpoint, I completely agree with the statement above. Especially since I am implementing this by delegating to a TaskScheduler, it does feel more like a processor than a channel.

              How about this for the configuration?
               <delayer input-channel="in" delay-header-name="delay" default-delay="5000" output-channel="out" />
              The default delay can also be 0 in which case only Messages that have a delay header would be delayed.
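
To make the proposed semantics concrete, here is a minimal, framework-free sketch of how such a delayer could resolve the delay and hand the Message to a scheduler. It is not the actual DelayHandler: the class name `DelayerSketch`, the `handle` and `resolveDelay` methods, and the use of a plain `Consumer` as the output channel are all assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch of an endpoint that forwards payloads to an output
// after a delay taken from a header, falling back to a default delay.
class DelayerSketch {

    private final ScheduledExecutorService scheduler;
    private final String delayHeaderName;
    private final long defaultDelayMillis;
    private final Consumer<Object> outputChannel; // stand-in for the output channel

    DelayerSketch(ScheduledExecutorService scheduler,
                  String delayHeaderName,
                  long defaultDelayMillis,
                  Consumer<Object> outputChannel) {
        this.scheduler = scheduler;
        this.delayHeaderName = delayHeaderName;
        this.defaultDelayMillis = defaultDelayMillis;
        this.outputChannel = outputChannel;
    }

    /** The header value wins when it is numeric; otherwise the default applies. */
    long resolveDelay(Map<String, ?> headers) {
        Object value = headers.get(delayHeaderName);
        return (value instanceof Number) ? ((Number) value).longValue() : defaultDelayMillis;
    }

    /** Forwards immediately when the delay is 0 or less, otherwise schedules the send. */
    void handle(Object payload, Map<String, ?> headers) {
        long delay = resolveDelay(headers);
        if (delay <= 0) {
            outputChannel.accept(payload);
            return;
        }
        // No thread sleeps for the delay; the scheduler fires the send later.
        scheduler.schedule(() -> outputChannel.accept(payload), delay, TimeUnit.MILLISECONDS);
    }
}
```

With a default delay of 0, a Message without the header goes straight to the output on the caller's thread, while one carrying the header is sent later from a scheduler thread.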


              • #37
                I've created a new issue to track the implementation of a DelayHandler:

                I've marked this as superseding the queue channel/interceptor approach based on the discussion above:


                • #38
                  I committed the DelayHandler as well as namespace support. Check out the JIRA issues above and the Fisheye tab in particular. The namespace support is covered by this issue:

                  (the commit logs for that one include an XML example for the tests)

                  For 2.0, we will add support for a MessageStore so that the Messages can be persisted while waiting to be sent. I created this issue to track that:

                  Please let me know what you think about the implementation. I would like to hear more detail about the particular use case still.



                  • #39
                    Hello Mark,

                    Sorry I did not notice your message 2 days ago.

                    I'm very satisfied with how this feature was implemented in the end. Good job!

                    Thank you.