Announcement Announcement Module
Collapse
No announcement yet.
DelayQueue contribution Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • #16
    Maybe we should have a separate delayer component. We do something similar inside resequencer and aggregator through a MessageBarrier. Maybe a channel is not the right place to put this logic...

    Comment


    • #17
      This is where the debate gets interesting....

      I agree that the notion of tight-coupling between interceptor+queue kind of leads to that.

      However, regardless of the ultimate decision we make here, I still stand by my comment earlier about the need for channels as a first-class, concrete component.

      Maybe the right solution is to simply add a wrapper/decorator like RedeliveredMessage that itself implements Delayed. Then provide a channel (or interceptor even) that knows how to handle that - passing to a DelayQueue.

      Comment


      • #18
        I like the idea to focus on the retry. After I had a delaying channel, I had to set error channel for a single endpoint to send failures to the retry loop. And to return the default error handling after. And to add a message header with retry count. Not very reliable. So having standard support for retries would be very helpful.

        As a workaround we can use INT-634 and an interceptor. The real solution will be designed with focus on retry, and delay will be one of the features.

        Comment


        • #19
          Channels should be first class concrete components.

          I was thinking of a <delayer/> as an endpoint, instead of a specialization of the channel. The question is perhaps if there should be any other stateful components than channel (which we do have now).

          Comment


          • #20
            Good point. I think we should consider this along with other ways in which we want to generalize the "barrier" and make it more flexible... essentially it becomes the common base class for any stateful handler.

            The one thing that is different in this case is that the best way to implement Delayer is obviously a DelayQueue, and that does seem to align with channels since they can delegate to queues for their own storage. The one thing I do really like about adding this behavior to a channel is that it would include the capability of handling messages that have delayed *at least as long as* the delay interval (but potentially longer depending on the consumer).

            Comment


            • #21
              I like the idea to make delayer an endpoint. It's very disturbing to cheat on channel and wrap messages with Delayed. Actually, it breaks the contract, as QueueChannel accepts in constructor BlockedQueue<Message> and not BlockedQueue<? extends Message> for a reason. I had to erase type in my super(new DelayQueue()) to get around this.

              Comment


              • #22
                As we did not decide on a better solution, I created a patch for the interceptor option. The patch includes simple delay queue with delay configured at channel level and the message header variant, where every message has a header with a time stamp. Please review: http://jira.springframework.org/secu...layQueue.patch

                P.S. The patch includes unit tests.

                Comment


                • #23
                  Thanks for adding the attachment. For now I've assigned the issue to 2.0-M1 since a) it's essentially a new feature and b) we need to be sure about the channel vs. endpoint implementation. I'm still not sure which is better at the moment, but I think experimenting with the options a bit will clarify that. If all goes well, we may be able to bump this to 1.0.3.

                  Comment


                  • #24
                    Andrew I have an alternate implementation that is now based on our new option for passing a TaskExecutor for a channel.

                    Basically, I implemented a DelayedExecutor that implements Spring's TaskExecutor and accepts a delay value. Internally, it delegates to a ScheduledExecutorService in its execute(Runnable) method.

                    Therefore, when you send(Message) it will delay based on the interval and then invoked the handler.

                    So, I'm wondering... does this meet your needs?

                    Let me know if you have any questions.

                    -Mark

                    Comment


                    • #25
                      Mark, I cannot find any file with "Delay" in name in trunk. Where can see your implementation?

                      Comment


                      • #26
                        Andrew,

                        Sorry for the confusion; I did not commit this code yet.

                        It's basically like this:
                        Code:
                        public class DelayedExecutor implements TaskExecutor {
                        
                            private volatile long defaultDelay;
                        
                            private volatile String delayHeaderName;
                        
                            private final ScheduledExecutorService scheduler;
                        
                        
                            public DelayedExecutor(long defaultDelay) {
                                this(defaultDelay, null);
                            }
                        
                            public DelayedExecutor(long defaultDelay, ScheduledExecutorService scheduler) {
                                this.defaultDelay = defaultDelay;
                                this.scheduler = (scheduler != null ? scheduler : Executors.newScheduledThreadPool(1));
                            }
                        
                            public void setDefaultDelay(long defaultDelay) {
                                this.defaultDelay = defaultDelay;
                            }
                        
                            public void execute(Runnable task) {
                                Long delay = null;
                                if (task instanceof MessageAwareRunnable && this.delayHeaderName != null) {
                                    Message<?> message = ((MessageAwareRunnable) task).getMessage();
                                    delay = message.getHeaders().get(this.delayHeaderName, Long.class);
                                }
                                if (delay == null) {
                                    delay = this.defaultDelay;
                                }
                                this.scheduler.schedule(task, delay, TimeUnit.MILLISECONDS);  
                            }
                        
                        }

                        Comment


                        • #27
                          I like this solution, but have one minor note about delayHeader property. IMO it does not cover all cases. What do you think about a strategy accepting a Message and returning the delay?

                          Comment


                          • #28
                            Andrew,

                            I think the role of populating the header from Message content should be handled by a Transformer that would be invoked prior to sending the Message to the queue. That is consistent with several other components in the framework where the receiver only checks for a header value without any awareness of how it may have been populated upstream (e.g. the outbound file adapters check for a filename in a header).

                            Does that make sense?
                            -Mark

                            Comment


                            • #29
                              Yes, transformer can do the job also. I thought about a strategy because I liked how the aggregator allows correlation strategy and completeness strategy to be specified on the endpoint itself without extra endpoints.

                              Are you going to extend the configuration schema to support delay executor? I suppose the schema goes to be changed at least for executor per channel configuration.

                              Comment


                              • #30
                                Andrew,

                                In 1.0.3, we'll have a "task-executor" reference option for channels. We might not introduce the delayed executor until 2.0, but I could at least commit it in the sandbox.

                                Comment

                                Working...
                                X