Announcement Announcement Module
Collapse
No announcement yet.
Queue Workers pool Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Queue Workers pool

    Hi to everybody,
    I'd like to implement a pool of workers to read messages from a queue, like in the RabbitMQ tutorial page.

    Looking at the rabbit stocks example I tried to use the following:

    Code:
    <listener-container concurrency="5" connection-factory="connectionFactory" message-converter="jsonMessageConverter"	xmlns="http://www.springframework.org/schema/rabbit">
         <listener ref="myWorker" method="handleMessage" queue-names="#{myqueue.name}" />
    </listener-container>
    There should be 5 threads reading messages from the queue. However, I tried to count how many instances of my classes were created (using a static counter incremented in the constructor) but I saw that only one instance of my class were created, although looking at the log it seems that the SimpleAsyncTaskExecutor are created correctly (I see SimpleAsyncTaskExecutor-1, SimpleAsyncTaskExecutor-2, etc..).

    Could I encounter synchronization problem? Where can I found more details about how to design correctly pool of queue workers and manage concurrency problems? For example, how can I disable auto acknowledge via spring config file?

    Best,
    Flavio

  • #2
    I am not sure what you mean; the listener container creates a BlockingQueueConsumer instance for each Consumer and runs a thread within it; there are no synchronization issues within the container, but your MessageListener (and dependent classes) must be thread-safe because there is only one listener instance per container.

    It's best to make your code thread-safe. If it is not, you will need a container for each consumer (with concurrency=1) and make sure your listener bean (and its dependencies) are prototype beans (so each container gets a new instance).

    Comment


    • #3
      Thank you for the quick reply Gary!
      Yeah..probably it's better if first I explain what I need..I don't know if what I implemented is correct..

      I would like to buffer in a queue some json that I need to index in Lucene. Queue could potentially grow very fast (many producers) and indexing in lucene could take a while. Thus, from what I understood looking at the RabbitMQ tutorial, I need many workers (i.e. consumers) instances waiting for a message and do the indexing stuff. I need to be able to choose the right number of workers and maybe create a pool for them...am I right? Does the listener-container do what I need setting concurrency parameter? Am I doing something wrong..?

      Best,
      Flavio

      Comment


      • #4
        Yes; the container will take of it with the concurrency setting. It currently doesn't support dynamically growing and shrinking the pool, though.

        Comment


        • #5
          Great! So the last question is: is there a way to concretely monitor active consumers? I just noticed that looking at the management UI there are 5 open channels..is there some other way to monitor what the listener container is doing?

          Comment


          • #6
            Not really but I would recommend using acknowledge=auto; you can play around with the prefetch and txSize (to reduce the ack chatter) and monitor the unacked message count. If you use acknowledge=NONE and you don't have enough consumers, you will likely run out of memory.

            Comment


            • #7
              So Spring integration as usual solved almost all my issues raised during the RabbitMQ tutorial
              Dynamically growing and shrinking of the pool was also on my "wish-list" but probably is a not a so-requested feature if not implemented I imagine..so my last question about this topic (I promise!) is how do I design my queue to scale?
              I mean, what's the best strategy if I discover that my queue continue to grow (also is there a way to discover it automatically like e-mailing or similar)? I think that my first try will be to try to increase the number of workers in the listener-container up to my number of processors (or more??) and than to move to a clustered solution..the problem is that I have just one queue and from what I understood queues are not shared between brokers..maybe there's the possibility to make a round-robin policy also on the delivery of messages in the queues? What could it be the best evolution of my system?

              Thank you for the great support,
              Flavio

              Comment


              • #8
                was also on my "wish-list"
                It has been requested; we have an open JIRA issue but it needs some restructuring of the container so it won't be in the next release (1.2 due shortly - the release candidate went out yesterday). It might make it into the next release.

                It's hard to judge the concurrency needed; it depends on what your application is doing.

                RabbitMQ does have HA (mirrored) queues; you should really ask questions about RabbitMQ fundamentals over on the RabbitMQ mailing list where the Rabbit experts hang out. This forum is just about Spring-AMQP and how it interfaces with RabbitMQ

                Comment

                Working...
                X