
  • Listen on multiple queues in Consumer

    Hi,

    I have a windows application (written in Java) that will run 24/7.
    This application will act as a Consumer for RabbitMQ server.

    Different producers (other applications) will publish messages to a topic exchange using different routing keys. My application/consumer has to create different queues (each bound with a binding key that matches the routing keys of the producers) and listen for messages from those queues indefinitely.

    My question is: can the MessageListener.onMessage() method be invoked separately for each of the queues created in the consumer, so that all of them are listened to indefinitely?

    If yes, what is the approach to do this?

  • #2
    Can you please clarify what you mean by this?: "so that they listen for messages indefinitely".

    Thanks,
    Mark



    • #3
      Thanks Mark for looking into this.

      What I meant by "so that they listen for messages indefinitely" was that, once a queue is created, the consumer has to keep listening for incoming messages on that queue all the time.
      The following code snippet from a non-Spring RabbitMQ consumer uses "while(true)" to listen for messages all the time. Can we have the same functionality in Spring AMQP as well?

      In Non-Spring RabbitMQ, the consumer code will have something like,

      QueueingConsumer consumer = new QueueingConsumer(chan);
      chan.basicConsume(queueName, false, consumer);
      while (true) {
          QueueingConsumer.Delivery delivery;
          try {
              delivery = consumer.nextDelivery();
          } catch (InterruptedException ie) {
              continue;
          }
      }



      • #4
        That loop is basically the core of SimpleMessageListenerContainer. Did you try using that? It allows you to listen to multiple queues with one or more listeners. There are many integration tests and a couple of samples in Spring AMQP.
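
The idea in the reply above, one receive loop per queue feeding a single listener, can be sketched in plain Java. This is only an analogy built on in-memory `BlockingQueue`s, not Spring AMQP's actual implementation; `MultiQueueListenerSketch`, its `Listener` interface, and the `start`/`stop` methods are hypothetical names invented for this illustration.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class MultiQueueListenerSketch {

    /** Hypothetical stand-in for a message listener callback. */
    interface Listener {
        void onMessage(String queueName, String body);
    }

    private final ExecutorService pool;

    private MultiQueueListenerSketch(ExecutorService pool) {
        this.pool = pool;
    }

    /** Starts one blocking receive loop per queue; every loop calls the same listener. */
    static MultiQueueListenerSketch start(Map<String, BlockingQueue<String>> queues,
                                          Listener listener) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, queues.size()));
        for (Map.Entry<String, BlockingQueue<String>> entry : queues.entrySet()) {
            final String name = entry.getKey();
            final BlockingQueue<String> queue = entry.getValue();
            pool.execute(() -> {
                try {
                    // Same shape as the while(true) loop: block until a message arrives.
                    while (true) {
                        listener.onMessage(name, queue.take());
                    }
                } catch (InterruptedException stop) {
                    // stop() interrupts the worker; restore the flag and leave the loop.
                    Thread.currentThread().interrupt();
                }
            });
        }
        return new MultiQueueListenerSketch(pool);
    }

    /** Interrupts all receive loops. */
    void stop() {
        pool.shutdownNow();
    }
}
```

With Spring AMQP itself, the corresponding wiring is a `SimpleMessageListenerContainer` configured with several queues and one `MessageListener`, as in the consumer code posted further down this thread.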



        • #5
          Thanks Dave for your response.

          I did try using 'SimpleMessageListenerContainer' in the application code as shown below.

          But it seems the "new Binding()" is not working with the "RoutingKey" pattern specified in the consumer code.
          When the producer sends a message to the exchange 'Exchange1', the message is always delivered to the onMessage() listener, irrespective of the routing key.

          Ideally, the message should only be delivered if the binding key pattern matches the routing key specified by the producer when publishing the message.

          Can you please help me to resolve this issue?

          Consumer Code:
          ===========

          public class Consumer {
              public static void main(String[] args) {
                  AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ConsumerConfiguration.class);
                  RabbitAdmin amqpAdmin = (RabbitAdmin) context.getBean("amqpAdmin");

                  Binding binding = new Binding(new Queue("TestQueue1"), new TopicExchange("Exchange1"), "TEST.ROUTING.#");
                  amqpAdmin.declareBinding(binding);

                  Binding binding1 = new Binding(new Queue("TestQueue2"), new TopicExchange("Exchange1"), "ROUTING.*");
                  amqpAdmin.declareBinding(binding1);

                  ConnectionFactory connectionFactory = (ConnectionFactory) context.getBean("connectionFactory");
                  List<String> queueList = new ArrayList<String>();
                  queueList.add("TestQueue1");
                  queueList.add("TestQueue2");

                  Queue[] queues = new Queue[queueList.size()];
                  for (int q = 0; q < queueList.size(); q++) {
                      queues[q] = new Queue(queueList.get(q));
                  }

                  SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
                  container.setConnectionFactory(connectionFactory);
                  container.setQueues(queues);
                  ConsumerLookup lookup = new ConsumerLookup();
                  container.setMessageListener(lookup);
                  container.start();
              }
          }


          ===========================

          public class ConsumerLookup implements MessageListener {

              @Override
              public void onMessage(Message msg) {
                  System.out.println("RECEIVED TEXT:" + new String(msg.getBody()));
              }
          }


          =============================================

          @Configuration
          public class ConsumerConfiguration extends AbstractRabbitConfiguration {

              @Bean
              public ConnectionFactory connectionFactory() {
                  CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
                  connectionFactory.setUsername("guest");
                  connectionFactory.setPassword("guest");
                  return connectionFactory;
              }

              @Override
              public RabbitTemplate rabbitTemplate() {
                  RabbitTemplate template = new RabbitTemplate(connectionFactory());
                  return template;
              }
          }

          ===================================
          Last edited by ponsu; Mar 28th, 2011, 08:30 AM.



          • #6
            The only problem I see there is that you don't declare the queues and exchanges. So I suspect that maybe you have some old durable bindings still declared. What is the result of running `rabbitmqctl list_bindings`?



            • #7
              Thanks Dave for your response.

              As you rightly pointed out, because of the presence of a durable binding on the RabbitMQ server, the consumer always reads the message irrespective of the routing key pattern.

              I have provided below the list of queues, exchanges and bindings retrieved with the "rabbitmqctl" command.

              For example, consider a scenario where the producer sends a message to exchange 'Exchange1' with routing key "TEST.ROUTING.one.two.three", and the consumer is listening for messages from that exchange using queue 'TestQueue1' and binding key "TEST.ROUTING.*".
              Ideally, this routing key and binding key don't match, so the consumer should not read the message. But the consumer reads the message from this queue irrespective of the binding key. This is because we already have a durable binding with routing key "TEST.ROUTING.#".
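
The mismatch described above follows from the topic exchange wildcard rules: `*` stands for exactly one word and `#` for zero or more words, with words separated by dots. A minimal sketch of that rule in plain Java follows; `TopicMatcher` is a hypothetical helper written for this illustration, not part of RabbitMQ or Spring AMQP, and the broker's real implementation differs.

```java
final class TopicMatcher {

    /** Returns true if the routing key satisfies the binding key under topic wildcard rules. */
    static boolean matches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    private static boolean match(String[] pattern, int i, String[] words, int j) {
        if (i == pattern.length) {
            // Pattern exhausted: it is a match only if every word was consumed too.
            return j == words.length;
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more words, so try every possible split point.
            for (int k = j; k <= words.length; k++) {
                if (match(pattern, i + 1, words, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == words.length) {
            // Pattern still expects a word but the routing key has none left.
            return false;
        }
        if (pattern[i].equals("*") || pattern[i].equals(words[j])) {
            // '*' consumes exactly one word; a literal must be equal to the word.
            return match(pattern, i + 1, words, j + 1);
        }
        return false;
    }
}
```

For the keys in this thread, `TopicMatcher.matches("TEST.ROUTING.*", "TEST.ROUTING.one.two.three")` is false while `TopicMatcher.matches("TEST.ROUTING.#", "TEST.ROUTING.one.two.three")` is true, so a queue that still carries the older `#` binding keeps receiving the message.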

              C:\Program Files\RabbitMQ\rabbitmq_server-2.0.0\sbin>rabbitmqctl list_queues
              Listing queues ...
              TestQueue2 0
              TestQueue1 0

              C:\Program Files\RabbitMQ\rabbitmq_server-2.0.0\sbin>rabbitmqctl list_exchanges
              Listing exchanges ...
              amq.rabbitmq.log topic
              amq.match headers
              amq.headers headers
              amq.topic topic
              amq.direct direct
              amq.fanout fanout
              direct
              Exchange1 topic

              C:\Program Files\RabbitMQ\rabbitmq_server-2.0.0\sbin>rabbitmqctl list_bindings
              Listing bindings ...
              TestQueue1 TestQueue1 []
              TestQueue2 TestQueue2 []
              Exchange1 TestQueue2 ROUTING.# []
              Exchange1 TestQueue2 ROUTING.* []
              Exchange1 TestQueue1 TEST.ROUTING.# []
              Exchange1 TestQueue1 TEST.ROUTING.* []
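
The listing above shows two bindings per queue on `Exchange1`, so the older keys remain active alongside the newly declared ones. As a rough sketch, assuming the whitespace-separated `exchange queue routing_key arguments` layout shown in this post, the leftover keys can be picked out programmatically. `StaleBindingFinder` and its methods are hypothetical names, not part of any RabbitMQ tooling.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

final class StaleBindingFinder {

    /** Groups the binding keys of one exchange by queue name. */
    static Map<String, Set<String>> keysByQueue(String listing, String exchange) {
        Map<String, Set<String>> result = new TreeMap<>();
        for (String line : listing.split("\\R")) {
            String[] cols = line.trim().split("\\s+");
            // Assumed columns: exchange, queue, routing key, arguments.
            // Header lines and default-exchange rows have a different shape and are skipped.
            if (cols.length != 4 || !cols[0].equals(exchange)) {
                continue;
            }
            result.computeIfAbsent(cols[1], q -> new TreeSet<>()).add(cols[2]);
        }
        return result;
    }

    /** Returns the keys bound to the queue that are not in the wanted set. */
    static Set<String> staleKeys(Map<String, Set<String>> actual, String queue, Set<String> wanted) {
        Set<String> stale = new TreeSet<>(actual.getOrDefault(queue, Collections.<String>emptySet()));
        stale.removeAll(wanted);
        return stale;
    }
}
```

Applied to the listing in this post with `TEST.ROUTING.*` as the wanted key for `TestQueue1`, the sketch reports `TEST.ROUTING.#` as the leftover binding. Such a binding stays in effect on the broker until it is removed or the queue is deleted and redeclared.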


              ===================================

              The modified consumer code (using Spring AMQP) and the producer code (using the RabbitMQ API) are given below.


              CONSUMER CODE:
              =============

              public class Consumer {
                  public static void main(String[] args) {
                      AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ConsumerConfiguration.class);
                      TopicExchange exchange = new TopicExchange("Exchange1");
                      Queue queue1 = new Queue("TestQueue1");
                      Queue queue2 = new Queue("TestQueue2");

                      queue1.setDurable(true);
                      queue2.setDurable(true);

                      AmqpAdmin amqpAdmin = (AmqpAdmin) context.getBean("amqpAdmin");

                      // Queue parameters: Durable - True ; AutoDelete - False; Exclusive - False
                      amqpAdmin.declareQueue(queue1);
                      amqpAdmin.declareQueue(queue2);


                      // Exchange parameters: Durable - False ; AutoDelete - False;
                      amqpAdmin.declareExchange(exchange);

                      Binding binding1 = new Binding(queue1, exchange, "TEST.ROUTING.*");
                      amqpAdmin.declareBinding(binding1);
                      Binding binding2 = new Binding(queue2, exchange, "ROUTING.#");
                      amqpAdmin.declareBinding(binding2);

                      ConnectionFactory connectionFactory = (ConnectionFactory) context.getBean("connectionFactory");
                      List<String> subscriptions = new ArrayList<String>();
                      subscriptions.add("TestQueue1");
                      subscriptions.add("TestQueue2");

                      Queue[] queues = new Queue[subscriptions.size()];
                      for (int q = 0; q < subscriptions.size(); q++) {
                          queues[q] = new Queue(subscriptions.get(q));
                      }

                      SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
                      container.setConnectionFactory(connectionFactory);
                      container.setQueues(queues);
                      ConsumerLookup lookup = new ConsumerLookup();
                      container.setMessageListener(lookup);
                      container.afterPropertiesSet();
                      container.start();
                  }
              }


              =================================
              PRODUCER CODE:
              =============


              public class Producer {
                  public static void main(String[] argv) throws java.io.IOException {
                      Connection conn = null;
                      ConnectionFactory factory = new ConnectionFactory();
                      factory.setHost("localhost");
                      factory.setUsername("guest");
                      factory.setPassword("guest");
                      conn = factory.newConnection();
                      Channel chan = conn.createChannel();
                      String queueName = "TestQueue1";
                      String routingKey = "TEST.ROUTING.one.two.three";
                      String exchangeName = "Exchange1";
                      String msg = "MESSAGE FROM TEST QUEUE 1";
                      chan.queueDeclare(queueName, true, false, false, null);

                      chan.basicPublish(exchangeName, routingKey, null, msg.getBytes());
                      chan.close();
                      conn.close();
                  }
              }


              =======================

              As per the requirement, the Consumer application has to read the list of queue names and corresponding binding keys at runtime and listen for messages from the Producer applications. Multiple Producer applications will send messages to the topic exchange with different routing keys. A message should be read from a particular queue only if the binding key and the routing key match.


              Can you please confirm whether the values set for the Durable, AutoDelete and Exclusive properties are correct for my requirement?
