Start/stop consuming messages from queues using a single listener-container

  • Start/stop consuming messages from queues using a single listener-container

    My web application consumes messages sent by thousands of publishing applications. Until now I have used a single-queue design: all publishing applications send messages to one queue, and my application consumes from it. I now see the need for a multi-queue design: if a misbehaving application sends data extremely fast, messages from other applications can get stuck behind its messages.

    I am trying to implement one queue per publishing application, with a fixed number of inbound adapters consuming messages from all of these queues. The challenge is that publishing applications are created and destroyed while my web application is consuming messages. I want to divide the queues available at start-up among the inbound adapters, and then add queues to and remove queues from these adapters while they are consuming messages. What's the best way to accomplish this?
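
    The bookkeeping side of this design (which queue belongs to which adapter) can be sketched independently of the messaging library. The class below is only an illustration under stated assumptions: `QueueAssigner`, its method names, and the "fewest queues wins" rule are hypothetical and not part of Spring AMQP or Spring Integration. It only tracks assignments; applying a changed queue list to a real adapter is a separate step.

    ```java
    import java.util.ArrayList;
    import java.util.LinkedHashSet;
    import java.util.List;
    import java.util.Set;

    /** Hypothetical helper: tracks which queue names belong to which of a fixed number of adapters. */
    final class QueueAssigner {
        // One set of queue names per adapter; the list index is the adapter index.
        private final List<Set<String>> slots = new ArrayList<>();

        QueueAssigner(int adapterCount) {
            if (adapterCount < 1) {
                throw new IllegalArgumentException("need at least one adapter");
            }
            for (int i = 0; i < adapterCount; i++) {
                slots.add(new LinkedHashSet<>());
            }
        }

        /** Assigns the queue to the adapter holding the fewest queues; returns that adapter's index. */
        synchronized int add(String queue) {
            int existing = indexOf(queue);
            if (existing >= 0) {
                return existing; // already assigned, nothing changes
            }
            int best = 0;
            for (int i = 1; i < slots.size(); i++) {
                if (slots.get(i).size() < slots.get(best).size()) {
                    best = i; // ties keep the lowest index
                }
            }
            slots.get(best).add(queue);
            return best;
        }

        /** Removes the queue; returns the index of the adapter that owned it, or -1 if unknown. */
        synchronized int remove(String queue) {
            int owner = indexOf(queue);
            if (owner >= 0) {
                slots.get(owner).remove(queue);
            }
            return owner;
        }

        /** Current queue names for one adapter, in insertion order. */
        synchronized String[] queuesFor(int adapter) {
            return slots.get(adapter).toArray(new String[0]);
        }

        private int indexOf(String queue) {
            for (int i = 0; i < slots.size(); i++) {
                if (slots.get(i).contains(queue)) {
                    return i;
                }
            }
            return -1;
        }

        public static void main(String[] args) {
            QueueAssigner assigner = new QueueAssigner(2);
            for (String q : new String[] {"app-a", "app-b", "app-c"}) {
                System.out.println(q + " -> adapter " + assigner.add(q));
            }
        }
    }
    ```

    The index returned by `add` and `remove` identifies the one adapter whose queue list changed, so only that adapter would need to be touched when a publishing application appears or disappears.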

  • #2
    add/remove queues from these adapters while they are consuming messages.
    That is not supported.

    The only way to change the queues and/or the concurrency of each adapter is to stop it, change the properties and start it again.
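
    As a rough sketch of that stop / change / start sequence: the `RestartableConsumer` interface below is a hypothetical stand-in so the example compiles without any Spring dependency. In a real application the three calls would go to the actual adapter and listener container, which may be two separate objects. `QueueUpdater` and the lock are likewise assumptions, added so that two concurrent updates cannot interleave their stop and start calls.

    ```java
    import java.util.concurrent.locks.ReentrantLock;

    /** Hypothetical stand-in for the adapter/container calls; not a Spring interface. */
    interface RestartableConsumer {
        void stop();
        void setQueueNames(String... queueNames);
        void start();
    }

    /** Hypothetical helper that serializes queue-list changes on one consumer. */
    final class QueueUpdater {
        private final ReentrantLock lock = new ReentrantLock();
        private final RestartableConsumer consumer;

        QueueUpdater(RestartableConsumer consumer) {
            this.consumer = consumer;
        }

        /** Stops the consumer, swaps its queue list, and starts it again. */
        void update(String... queueNames) {
            lock.lock();
            try {
                consumer.stop();
                try {
                    consumer.setQueueNames(queueNames);
                } finally {
                    // Restart even if the update threw, so consumption
                    // resumes (with the previous queue list) rather than staying stopped.
                    consumer.start();
                }
            } finally {
                lock.unlock();
            }
        }

        public static void main(String[] args) {
            RestartableConsumer logging = new RestartableConsumer() {
                public void stop() { System.out.println("stop"); }
                public void setQueueNames(String... q) { System.out.println("queues: " + String.join(",", q)); }
                public void start() { System.out.println("start"); }
            };
            new QueueUpdater(logging).update("app-a", "app-b");
        }
    }
    ```

    Nothing is consumed between `stop()` and `start()`, so keeping the work inside that window as small as possible limits the pause.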


    • #3
      Is there a plan to add this support, and what's the timeline? When consuming with the plain Java client, I think it's easier to add a new queue to a consumer: just call channel.basicConsume with the new queue name and the consumer as parameters. Right? Is there any reason Spring doesn't provide similar functionality?


      • #4
        It simply hasn't been asked for before.

        Feel free to open a New Feature JIRA issue; there is already an open issue to dynamically change the number of consumers https://jira.springsource.org/browse/AMQP-306

        I don't have a timeline, though; sorry.


        • #5
          If I were to employ the workaround you suggested, I'd do something like this:

          Code:
          inboundAdapter.stop();
          listenerContainer.setQueueNames(updatedQueues);
          inboundAdapter.start();
          Is this sufficient? Any side-effects apart from the fact that there will be no consuming of messages between stop() and start(), e.g. message loss?


          • #6
            It depends on your acknowledge mode. With NONE (autoAck in RabbitMQ terms) you will lose messages; with the default mode (AUTO), messages are requeued by default.

            That said, it will be fine as long as you don't have 'defaultRequeueRejected' set to false on the container (it defaults to true).

            I have an open pull request for an issue where in-flight messages are rejected instead of being requeued during a container stop in that case...

            https://github.com/SpringSource/spri...ted&state=open
            Last edited by Gary Russell; May 9th, 2013, 09:20 PM.


            • #7
              Thanks Gary. I have filed a new-feature request at https://jira.springsource.org/browse/AMQP-312.


              • #8
                Hi goyalk! Which workaround did you employ to get this functionality? I have the exact same problem and was wondering which workarounds you tried, which one you chose in the end, and why...


                • #9
                  Originally posted by DennisLaumen
                  Hi goyalk! Which workaround did you employ to get this functionality? I have the exact same problem and was wondering which workarounds you tried, which one you chose in the end, and why...
                  1. Stop listener
                  2. Update queues in listener
                  3. Start listener

                  This seems to be the only option. I haven't tested for message loss yet.


                  • #10
                    Thanks goyalk! I had implemented that workaround and it seemed to work fine. I haven't thoroughly tested it for message loss etc. though.

                    In the meantime, I found a workaround in the framework I'm using (Axon) that circumvents this issue altogether (for my specific use case).
