Trigger some callback function after queue empty/destroy event

  • Trigger some callback function after queue empty/destroy event

    Hi everybody,
    in my use case I need to create a queue to buffer tasks that are executed by worker threads and then, once the queue is empty, start another operation.
    Basically, I need a way to tell my AMQP broker (RabbitMQ) to set up a queue, later destroy it and, once it is destroyed, trigger a callback function.

    Is it possible to achieve this? Any advice?


  • #2
    You can use a RabbitAdmin to declare/delete queues, exchanges, bindings.


    • #3
      OK, with that I can set things up or tear them down, but I can't detect when the queue becomes empty. Or, if I can, could you give me a snippet please?


      • #4
        RabbitAdmin in 1.2 has a new method, getQueueProperties(), which returns the message count and the consumer count for the queue.

        1.2 is at 1.2.0.RC1 (release candidate) right now, available from the milestones repo; it should be released in a few days.
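A minimal sketch of the polling idea, assuming the message count can be read on demand. It uses only the JDK: the `IntSupplier` is a hypothetical stand-in for whatever reads the count (in Spring AMQP it would presumably wrap the value returned by getQueueProperties()), and `QueueDrainWatcher`, `awaitEmpty`, `pollMillis` and `maxPolls` are illustrative names, not part of any library.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.IntSupplier;

/** Polls a message-count source and runs a callback once it reports zero. */
class QueueDrainWatcher {

    /**
     * Checks the count up to maxPolls times, sleeping pollMillis between checks.
     * Returns true if the count reached 0 and the callback was run,
     * false if polling gave up or the thread was interrupted.
     */
    static boolean awaitEmpty(IntSupplier messageCount, long pollMillis,
                              int maxPolls, Runnable onEmpty) {
        for (int i = 0; i < maxPolls; i++) {
            if (messageCount.getAsInt() == 0) {
                onEmpty.run();          // queue looks empty: fire the callback
                return true;
            }
            try {
                TimeUnit.MILLISECONDS.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();  // preserve interrupt status
                return false;
            }
        }
        return false;                   // gave up before the queue drained
    }
}
```

Note that a message count of 0 only says the queue holds no waiting messages; consumers may still be processing messages they have already received.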


        • #5
          Thank you very much Gary! That's exactly what I need!


          • #6
            Sorry Gary, but I discovered that I have another problem: when the queue becomes empty, there are workers/consumers that still need to finish their work. How can I check when the consumers (whose number depends on the concurrency parameter) of a specific queue (i.e. myQueue2 in the example below) within a listener-container have finished their work? In my config file I have:

            <rabbit:listener-container id="myContainer" concurrency="5" connection-factory="cf">
                 <rabbit:listener ref="myWorker1" method="handleMessage" queue-names="#{}" />
                 <rabbit:listener ref="myWorker2" method="handleMessage" queue-names="#{}" />
            </rabbit:listener-container>


            • #7
              This might work...

              If the consumer somehow "knows" it has completed its work (e.g. because there's something in the "last" message), it can call stop() on its container (but it would have to be on another thread). Any work-in-process on other consumers will complete.

              You can then wait for the consumer count to go to 0 before deleting the queue.


              • #8
                When I declare a rabbit:listener-container I'm actually instantiating a new SimpleMessageListenerContainer, right?
                Now, I just want to be sure of what's happening under the hood:
                1) How can I declare more than one listener per listener-container if the SimpleMessageListenerContainer handles only one listener?
                2) Looking at the code, the listener should be an instance of MessageListener, but in my case they are simple Objects and they still work. How is that possible (I don't use any MessageListenerAdapter)?
                3) I need a reference to the listener-container, but I don't know how to get one. Is it possible to get it from the message listener? Is it aware of its container?
                4) From what I understood, I need to intercept an "end" message (not complicated) and then call stop()?
                Actually, I don't want to stop listening, I just want to know whether the consumers have finished their work. Is that possible?
                Maybe I just have to call stop(), do my stuff once it has finished, and then call start() or restart() on the listener container to re-enable message processing?



                • #9
                  First of all, I am confused about your use case - first you said you want to create and delete queues dynamically, but now you say you don't want to stop listening. You can't change the queues a container is listening on (unless you stop it, then change the queues, then start it again).

                  I thought you were somehow dynamically creating a SMLC (programmatically rather than with XML) for the new queue, in which case you'd already have a reference to the SMLC to call stop().

                  To explain how the namespace works...

                  It is really just a convenience.

                  In fact, for each <listener/> child element, you get a new SMLC; the namespace is just a convenient way of setting shared attributes (concurrency etc) in the <listener-container/> parent element.

                  Also, the namespace automatically wires-in a MessageListenerAdapter - hence your listener can be a POJO.

                  If you define the SMLC using normal <bean/> syntax (or programmatically), you'd be responsible for wiring in the adapter.

                  No, the listener doesn't know which container it's running in.

                  If you defined them with <bean/> syntax, you could give them an ID and then @Autowired them (or use context.getBean()) to get a reference to stop/start etc.

                  I hope that explains everything.


                  • #10
                    Sorry Gary, you're absolutely right. I changed my mind during development:
                    at first I planned to create and delete queues dynamically, but then I saw that the best solution for my use case was to keep them and instead intercept an "end" message to trigger the code I need to execute at the end of the queue processing.

                    Now I have a better understanding of all the hidden stuff, thanks.
                    I think that what I need is to:
                    1) set up a listener-container for each queue listener, giving each container a different id (so I can get a reference to it through the ApplicationContext)
                    2) once I receive an "end" message in the listener, tell the listener-container to stop()
                    3) once the stop method returns, trigger my external processing task (external thread)
                    4) restart the listener container via start()

                    Do you think this could be a valid solution?


                    • #11
                      Yes, but be careful with the stop(); the listener thread can't stop the container itself - you have to do it on a new thread - maybe use executor.execute().
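The stop / process / restart sequence discussed above, with the hand-off to another thread, can be sketched roughly as follows. This is JDK-only and makes several assumptions: `StoppableContainer` is a hypothetical stand-in for the listener container's stop()/start() methods, `EndMessageHandler` and `afterDrain` are illustrative names, and the "END" body is an invented marker for the "end" message.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

/** Hypothetical stand-in for the listener container's lifecycle methods. */
interface StoppableContainer {
    void stop();
    void start();
}

/** POJO listener that reacts to an "end" marker message. */
class EndMessageHandler {
    private final StoppableContainer container;
    private final ExecutorService executor;
    private final Runnable afterDrain;   // the task to run once the container has stopped

    EndMessageHandler(StoppableContainer container, ExecutorService executor,
                      Runnable afterDrain) {
        this.container = container;
        this.executor = executor;
        this.afterDrain = afterDrain;
    }

    /**
     * Called on the listener thread for each message. Returns a Future for the
     * stop/process/restart cycle when the marker arrives, otherwise null.
     */
    Future<?> handleMessage(String body) {
        if (!"END".equals(body)) {       // "END" is an assumed marker value
            // ... normal message processing would go here ...
            return null;
        }
        // The listener thread must not stop its own container,
        // so the whole cycle is handed to a different thread.
        return executor.submit(() -> {
            container.stop();            // returns after in-flight work completes
            afterDrain.run();            // the external processing task
            container.start();           // resume message processing
        });
    }
}
```

The executor passed in should not be the one the container uses for its own consumer threads, otherwise the stop() call could end up waiting on itself.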


                      • #12
                        Thank you very much, Gary, for this tip (and for all the great support as well, obviously).
                        I hadn't thought of this problem!