
  • DelayQueue contribution

    Hello and thanks for the amazing framework.

    In the past, I worked with ServiceMix and made a few contributions to ServiceMix, most of them for a streaming file reader. It allows reading large files which caused "out of memory exceptions" without streaming.

    In the current project we use Spring Integration, and we needed to delay failing messages before the retry. When the delay is a few minutes it's impractical to hold a thread with simple sleeping, like it's done in Apache Camel.

    Fortunately, org.springframework.integration.channel.QueueChannel has a constructor that accepts any BlockingQueue instance. I subclassed this channel and passed a java.util.concurrent.DelayQueue to the super constructor. I also overrode doSend to wrap each Message in a java.util.concurrent.Delayed.

    After this, everything else works very well; no threads are held while messages wait out the delay.
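
The wrapping idea described above can be sketched with java.util.concurrent alone. This is only an illustration under stated assumptions: DelayedItem and DelayQueueSketch are hypothetical names, a plain String payload stands in for a Spring Integration Message, and none of this is the code from the actual patch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical wrapper: pairs a payload with the moment it becomes available.
final class DelayedItem<T> implements Delayed {
    private final T payload;
    private final long readyAtNanos;

    DelayedItem(T payload, long delay, TimeUnit unit) {
        this.payload = payload;
        this.readyAtNanos = System.nanoTime() + unit.toNanos(delay);
    }

    T getPayload() {
        return payload;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Remaining delay; zero or negative means the item may be taken.
        return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

final class DelayQueueSketch {
    // Enqueues two items and returns the payloads in the order the queue releases them.
    static List<String> drainInReleaseOrder() {
        DelayQueue<DelayedItem<String>> queue = new DelayQueue<>();
        queue.put(new DelayedItem<>("slow", 200, TimeUnit.MILLISECONDS));
        queue.put(new DelayedItem<>("fast", 50, TimeUnit.MILLISECONDS));

        List<String> released = new ArrayList<>();
        try {
            // take() blocks only the consuming thread until the head item has expired.
            released.add(queue.take().getPayload());
            released.add(queue.take().getPayload());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return released;
    }

    public static void main(String[] args) {
        System.out.println(drainInReleaseOrder());
    }
}
```

The waiting items simply sit in the queue, so the number of delayed messages does not translate into a number of sleeping threads; only the consumer that calls take() waits.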

    Does this idea make sense? Do you prefer a basic patch with only the DelayChannel implementation, or should I extend the XSD and the configuration parser for the namespace support?

    Thank you again.
    Andrew.

  • #2
    Originally posted by andrewsk:
    When the delay is a few minutes it's impractical to hold a thread with simple sleeping, like it's done in Apache Camel.
    That is an interesting point. I guess it's pretty clear from the Spring Integration implementation that one of the main points taken from Enterprise Integration Patterns is that "Message Channel" is a central component. I have always been perplexed that other EIP-inspired frameworks treat it as a "logical" construct only. The fact that a channel should be a first-class component was one of the main reasons that I felt the need to create an API/framework with a different approach. I think your example of extending channel to handle the delay directly there is a nice affirmation of that model.

    I'm not familiar with the Camel delay feature that you are describing... do they literally just call Thread.sleep(delay)?
    Last edited by Mark Fisher; Apr 10th, 2009, 09:55 AM.



    • #3
      Originally posted by Mark Fisher:
      do they literally just call Thread.sleep(delay)?
      Yes, almost like that:
      http://svn.apache.org/viewvc/camel/t...24&view=markup
      Code:
      if (isFastStop()) {
          stoppedLatch.await(delay, TimeUnit.MILLISECONDS);
      } else {
          Thread.sleep(delay);
      }
      But their Delayer is a processor, not a channel. I think a queue is better suited to delaying messages, because it provides natural storage for the messages that are waiting.

      Anyway, does it make sense to prepare a patch? Would you like to add another type of channel to your inventory?



      • #4
        I think the best thing for us to do in the namespace is to open up the <queue/> element to accept a queue-class attribute. This would not require any additional java code and allow any queue customization that the user wants.

        See:
        http://groups.google.com/group/hazel...448b37035d357c
        http://forum.springsource.org/showthread.php?t=66915

        I've created http://jira.springframework.org/browse/INT-634 to track this.



        • #5
          As I have no better place to post the patch, I published the code in my blog: http://andskiba.blogspot.com/2009/04...or-spring.html

          As I mentioned there, the delay-calculating code should be separated into a strategy. This will allow the delay to be calculated from the message payload, etc. The current implementation is just the simplest I could come up with.
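
One way such a strategy could look is sketched below. Everything here is an assumption made for illustration: DelayStrategy, FixedDelay, BackoffDelay and RetryRequest are hypothetical names and do not come from the blog post or from Spring Integration.

```java
// Hypothetical strategy: decides how long a given payload should wait.
interface DelayStrategy<T> {
    long delayMillis(T payload);
}

// Simplest case: the same delay for every payload.
final class FixedDelay<T> implements DelayStrategy<T> {
    private final long millis;

    FixedDelay(long millis) {
        this.millis = millis;
    }

    @Override
    public long delayMillis(T payload) {
        return millis;
    }
}

// Hypothetical payload that records how many delivery attempts have failed.
final class RetryRequest {
    final int attempt;

    RetryRequest(int attempt) {
        this.attempt = attempt;
    }
}

// Payload-based case: the delay doubles with every failed attempt, up to a cap.
final class BackoffDelay implements DelayStrategy<RetryRequest> {
    private final long baseMillis;
    private final long maxMillis;

    BackoffDelay(long baseMillis, long maxMillis) {
        this.baseMillis = baseMillis;
        this.maxMillis = maxMillis;
    }

    @Override
    public long delayMillis(RetryRequest request) {
        // Limit the shift so that a large attempt count cannot overflow the long.
        long delay = baseMillis << Math.min(request.attempt, 20);
        return Math.min(delay, maxMillis);
    }
}
```

A delay channel could then ask the strategy for the delay at send time instead of hard-coding it, which keeps the queueing code independent of how the delay is chosen.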



          • #6
            Originally posted by iwein:
            I think the best thing for us to do in the namespace is to open up the <queue/> element to accept a queue-class attribute. This would not require any additional java code and allow any queue customization that the user wants.
            Sorry, I did not notice your reply. It's not enough to create a DelayQueue and supply it to the QueueChannel constructor. In this case the messages also have to be wrapped so that they implement the Delayed interface.

            P.S. I tried to create a DelayQueue<Delayed> to BlockingQueue<Message> adapter that converts each Message to a Delayed as it is put on the queue, but abandoned it. It grew very big and ugly because of the huge number of methods in the BlockingQueue interface.
            Last edited by andrewsk; Apr 11th, 2009, 07:17 AM.



            • #7
              Nice blog post! I added a link to it from the Spring Integration home page.

              I'm really glad to see that you've found Spring Integration easy to use, and I'm brainstorming with this delay channel idea. We might be able to apply it more generally whenever a Message has been wrapped in Delayed for redelivery.



              • #8
                Thank you, Mark.

                I like Iwein's idea of opening up the <queue> tag. It solves the problem with the configuration parser, which is not easy. So here is another solution to the same problem, using channel interceptors: http://andskiba.blogspot.com/2009/04...terceptor.html

                What I don't like about the new solution is that it is fragile. If someone passes a DelayQueue to the QueueChannel constructor and does not configure the interceptor, it fails with a cryptic message. Or, if the interceptor is not the last one in the chain, the DelayMessageAdapter might be wrapped in another Message proxy, and DelayQueue will not be happy again. I'm still thinking about a better solution.
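
The first failure mode can be reproduced without Spring at all. The sketch below is an assumption about the mechanism, not a trace from the real channel: it pushes an element that does not implement Delayed into a DelayQueue through a raw BlockingQueue reference, which is roughly what happens when nothing wraps the message first. UnwrappedElementDemo is a hypothetical name.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;

final class UnwrappedElementDemo {
    // Returns true if handing a non-Delayed element to a DelayQueue ends in a ClassCastException.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static boolean failsWithClassCastException() {
        // Raw type: the compiler can no longer stop us from adding the wrong element type.
        BlockingQueue queue = new DelayQueue();
        try {
            // Depending on the JDK version, the failure may surface on offer() or only on poll().
            queue.offer(new Object());
            queue.poll();
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("ClassCastException: " + failsWithClassCastException());
    }
}
```

The exception says nothing about a missing interceptor, which is presumably why the resulting message looks cryptic to whoever configured the channel.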



                • #9
                  There is some resemblance here to what was necessary with PriorityChannel. In fact, I seem to recall thinking about the simple "replace the queue" option earlier and realizing that it's often not sufficient.

                  For the case of "RendezvousChannel", it really is that simple:
                  Code:
                  public class RendezvousChannel extends QueueChannel {
                  
                      public RendezvousChannel() {
                          super(new SynchronousQueue<Message<?>>());
                      }
                  
                  }
                  However, if you look at the code for PriorityChannel, you will notice that it really had to be a sub-class (like your first solution), since we needed to support capacity in addition to the priority-ordering. We used a Semaphore there, and while it could have probably been accomplished with a queue + interceptor, I agree that such a solution would be too fragile.
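
To illustrate the capacity point, here is a rough sketch of a Semaphore guarding an unbounded priority queue. It is not the PriorityChannel source; BoundedPriorityBuffer is a hypothetical name, and the non-blocking offer is a simplification chosen to keep the example short.

```java
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.Semaphore;

// Hypothetical bounded buffer: PriorityBlockingQueue has no capacity limit of its own,
// so a Semaphore keeps count of the free slots.
final class BoundedPriorityBuffer<T extends Comparable<? super T>> {
    private final PriorityBlockingQueue<T> queue = new PriorityBlockingQueue<>();
    private final Semaphore freeSlots;

    BoundedPriorityBuffer(int capacity) {
        this.freeSlots = new Semaphore(capacity, true);
    }

    // Adds the item if a slot is free; returns false instead of blocking when full.
    boolean offer(T item) {
        if (!freeSlots.tryAcquire()) {
            return false;
        }
        queue.add(item);
        return true;
    }

    // Removes the smallest item, or returns null if empty; a removed item frees its slot.
    T poll() {
        T item = queue.poll();
        if (item != null) {
            freeSlots.release();
        }
        return item;
    }
}
```

The permit has to be acquired before the add and released after the remove, so the bookkeeping must live right next to the queue operations. That is hard to guarantee from an interceptor that can be reordered or left out.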

                  On the other hand, I might be missing some great idea that solves this problem in a more general way.

                  In any case, this is really interesting, and as you probably noticed I've updated the issue so that it will be included in 1.0.3:
                  http://jira.springframework.org/browse/INT-634



                  • #10
                    I don't know how to create new subclasses of QueueChannel without changing the http://www.springframework.org/schema/integration schema. But on the other hand, is that really necessary? As long as the schema stays backward-compatible, it can be updated every time a new class is added.

                    Some pain may come from the way XML is parsed in Spring Integration: PointToPointChannelParser must be updated every time as well. In ServiceMix they did a smart trick with doclets to update the schema when the source code changes, and AFAIR there was no need to write custom parsers for new classes.

                    So I can prepare a bigger patch, including code, tests, XSD and parsers, if we decide to go with the subclass variant. The only thing I still have to do is separate out the delay strategy. Can I create a JIRA issue for the delay queue so I can post patches there and discuss them?



                    • #11
                      If we deal with subtypes of QueueChannel or Queue both in the queue-class from the parser, that might work.

                      If you want to create a JIRA and attach something there that's always welcome. The blog is also great, just linking to that from the JIRA is fine too.



                      • #12
                        Originally posted by iwein:
                        If we deal with subtypes of QueueChannel or Queue both in the queue-class from the parser, that might work.
                        Iwein, can you explain what you mean? What is "both"? You support the idea of using queue-class with the interceptor for the delay queue?

                        If you want to create a JIRA and attach something there that's always welcome. The blog is also great, just linking to that from the JIRA is fine too.
                        I prefer to attach diff patches to JIRA, as the code grows too fast to publish on the blog. So this is the new feature request: http://jira.springframework.org/browse/INT-636



                        • #13
                          Originally posted by andrewsk:
                          Iwein, can you explain what you mean? What is "both"? You support the idea of using queue-class with the interceptor for the delay queue?
                          With both I meant that we could say that you can either replace the internal BlockingQueue, or the whole channel class with the namespace. But since I couldn't explain it in one sentence that was a bad idea. I think for now adding the first option to the namespace per INT-634 would go far enough, but we need to do something about the fragility concerns you mentioned. I shouldn't confuse these two things.



                          • #14
                            Originally posted by iwein:
                            With both I meant that we could say that you can either replace the internal BlockingQueue, or the whole channel class with the namespace.
                            Hmm, if you allow the whole channel class to be specified, how is that different from creating a regular Spring bean? There would have to be a way to access all the properties that exist in the class.
                            I think for now adding the first option to the namespace per INT-634 would go far enough, but we need to do something about the fragility concerns you mentioned. I shouldn't confuse these two things.
                            Maybe a custom queue factory or channel factory? It could pass the necessary things to the constructor, then somehow continue the configuration with the existing elements, and finish by adding the necessary interceptor. So the configuration would look like:

                            <channel ...usual-attrs>
                            <queue factory-bean="delayQueueFactory" ...usual-attrs/>
                            <usual-elements/>
                            </channel>

                            I'm just not sure it's possible for the factory to cooperate with parser like this.



                            • #15
                              To make the interceptor less fragile, I see only one way: coupling the BlockingQueue class with the last interceptor. Simply put, if the user chooses a specific queue (priority, delay, etc.), he must at the same time be in control of when an element is added to or taken from the queue. Therefore it's important to set the last interceptor.

                              I looked at PointToPointChannelParser and AbstractChannelParser and it's doable to allow the following:

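                              public interface QueueDecorator {
                                  // The queue the channel should store its messages in.
                                  BlockingQueue<Message<?>> getQueue();
                                  // The interceptor that must run last, right before the queue.
                                  ChannelInterceptor getLastInterceptor();
                              }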

                              and the channel would be defined like this:

                              <channel ...>
                                  <queue decorator="delayDecorator"/>
                                  <!-- transaction, polling, application specific interceptors -->
                              </channel>

                              <bean id="delayDecorator" class="DelayQueueDecorator">
                                  <!-- delay timeout, whatever -->
                              </bean>

                              I know, names are awful, but it gives the idea.

                              It will not be trivial for an end user to introduce new "decorator" classes, but it will be very easy to use third-party ones.

                              What I don't like about this solution is that it precisely targets the currently known issues; to make it more general, more complicated play with BeanDefinitionBuilder is needed.

                              What do you think?
