Announcement Announcement Module
No announcement yet.
Multi channel endpoint and extending AbstractPollingEndpoint Page Title Module
Move Remove Collapse
Conversation Detail Module
  • Filter
  • Time
  • Show
Clear All
new posts

  • Multi channel endpoint and extending AbstractPollingEndpoint


    I need some kind of mechanism for polling multiple channels (according to a certain strategy determined by the user). I was thinking of extending the AbstractPollingEndpoint in a way quite similar to the PollingConsumer class but instead of polling from a single channel, have it poll from multiple channels.

    I have 2 questions in this regard:
    1. Does it make sense to extend AbstractPollingEndpoint? I'm guessing this is not the kind of API that you guarantee to keep stable.
    2. If it doesn't make sense, is there an alternative?


  • #2
    Depending on the details of your use-case, I think there are a couple options that would preclude the need to extend AbstractPollingEndpoint.

    Can you describe the nature of why you need multiple pollers?

    If it's a matter of combining different poll-time settings, then perhaps it can be accomplished with a custom Trigger implementation. If it's a matter of actually polling different sources, then I would probably recommend using multiple 'inbound-channel-adapters' that share a common reference to a 'channel' for passing the resulting Messages.

    Does your use-case fall into either of those categories? If not, can you describe the difference?



    • #3
      Actually, reading it again, I see that you are referring to an internal component - presumably polling other channels that are producing output from some upstream component.

      So, I will rephrase my question: why can you not simply use the same output-channel on those upstream components? Once I understand your answer to that question, hopefully I'll be able to provide some alternatives.


      • #4
        Hi Mark,

        The idea is to be able to prioritize between channels without causing starvation on one\some of the channels. So I want to be able to poll from channels according to some predefined ratio, thus giving advantage to some channels over others.

        Actually you gave me and idea. I could use the same output-channel on all upstream components, defining different polling rate to each component. So components with higher priority will poll more frequently than components with lower priority. Will that do the trick?



        • #5
          And I could probably also use the max-messages-per-poll attribute in order to poll more messages from high-priority channels, right?

          I'm thinking of using the following configuration:

          <channel id="InternalDistributionChannel">
          		<queue capacity="500"/>
          	<bridge input-channel="InternalDistributionChannel" output-channel="DistributionChannel">
          		<poller send-timeout="-1" max-messages-per-poll="{distribution.rate}>
          			<interval-trigger interval="${distribution.interval}"/>
          	<channel id="DistributionChannel" >
          		<queue ref="DistributionQueue"/>
          The DistributionQueue is a HazelCast queue so it is distributed across multiple JVMs. The ${distribution.interval} and ${distribution.rate} will be given different values depending on the priority of the component upstreaming to the DistributionChannel. This way the DistributionChannel will hold a mix of messages which represents a prioritized distribution.

          Will appreciate any comments. Thanks,


          • #6
            Have you considered using a "priority-queue" within that channel rather than a simple "queue" sub-element?


            • #7
              Yes. A priority-queue is not good for 2 reasons:
              1. The queue has to be a distributed queue. I'm not using a simple queue but rather a HazelCast queue. HazelCast does not have a priority queue implementation quite yet.
              2. The throughput of our system is quite high making it quite possible that low priority items will be starved in a priority queue.


              • #8
                Okay. Have you considered using a JMS-backed channel with JMSPriority properties?


                • #9
                  We preferred grid-based distribution and redundancy over JMS. JMS is not easily scalable in opposed to grid-based technologies.
                  Are you proposing other options because the solution above is not good enough?


                  • #10
                    If you are using HazelCast, you can possibly implement HazelCast based MessageStore and identify it via 'message-store' attribute in the queue configuration, thus making your queue distributed.

                    <channel id="myDistributedQueue">
                         <queue capacity=100000 message-store="hazelcastStore"/>
                    <bean id="hazelcastStore" class=""/>


                    • #11
                      I was just trying to explore other options that might be simpler in terms of the configuration. At some point (once we have a GA version available, planned for Spring Integration 2.1), you might want to consider our AMQP integration.

                      In the meantime, the way you are describing it might handle it fine. Another option I would consider is simply referencing a "task-executor" within the <poller> element. Then, you could have a number of different, reusable thread pool configurations. For example, you might have "high", "medium", and "low" priority (where priority is handled by the number of threads). Then you would be able to use "highPriorityExecutor" everywhere you need that one.



                      • #12
                        Hi Oleg,

                        I'm not sure I understand your post. I don't have any trouble making the queue distributed with HazelCast. The ref to "DistributedQueue" in the configuration excerpt above is already pointing to a HazelCast distributed queue.



                        • #13
                          Good, i wasn't sure you were aware of that, my bad

                          So, now since your queue is distributed I am still not sure as two why it could not be prioritized. I understand that you have a high throughput system and that you may have so many high-priority messages that low-priority messages will remain in the queue indefinitely. But IMHO that is exactly what has to happen in the world where we have high/low priority.
                          However I am NOT advocating that your low-priority messages should remain there forever. There are two ways I see how you can handle this today.

                          1) - by tuning up your poller and task executor configuration according to the load in your system - basically making your poller poll for messages just a bit faster then the *average* frequency of messages coming in.

                          2) - PriprityChannel can be configured with custom Comparator where you can implement your own compare logic that might take timestamp of the message into consideration and UP the priority of the message if it has been sitting for a certain amount of time. So the low-priority message after sitting in the queue for a certain period of time can essentially be upgraded to a higher priority by simply re-ordering such queue.

                          what do you think?


                          • #14

                            Thanks for the task-executor idea (actually it reminded me that I forgot to assign my task-executor ).



                            • #15
                              Hi Oleg,

                              The thing is that in our system nothing is really low-priority but rather "slightly-lower" .
                              I really need a more fine tuned adjustment of the way items are prioritized. Using variable rates, variable intervals and different task-executors might do the trick.

                              I'm a bit concerned about using a sophisticated comparator. I agree that taking into account the message's timestamp might do the trick but I'm not sure if this is good enough. For now I'll stick to the current solution and see if it works for us.

                              Thanks for all the ideas. It really helped me.