programmatic RabbitTemplate configuration

  • programmatic RabbitTemplate configuration

    hi gary:
    what would be the java programmatic way to do the following spring-context:
    <rabbit:template id="amqpTemplate"
            connection-factory="connectionFactory" reply-queue="replies">
        <rabbit:reply-listener />
    </rabbit:template>

    is it enough to do the following:
    //Queue queueX
    RabbitTemplate template = new RabbitTemplate(connFactory);
    //queueX is declared using rabbit admin?
    template.setReplyQueue(queueX);

    is there anything to be done for setting a reply-listener as in the spring-context config?

    regards,
    -cogitate

  • #2
    It's quite a lot more involved than that; you need to create and configure a SimpleMessageListenerContainer, and set the RabbitTemplate as its MessageListener.

    Probably the best place to start is to look at the TemplateParser.



    • #3
      thanks gary! will do.
      in the meantime, if i were to define the template as a prototype bean, or maybe use @Configuration, is there a way i can do the same thing?
      1. would like to configure template with a reply-queue
      2. would like to use connections configured with "N" channels (max)
      3. and of course, use a reply-listener...



      • #4
        Hi Monish,

        Here's how to programmatically configure a RabbitTemplate to use a fixed reply queue with a reply listener container...

        Code:
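        import org.springframework.amqp.core.Queue;
        import org.springframework.amqp.rabbit.core.RabbitTemplate;
        import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;

        // Container that consumes replies from the fixed reply queue
        SimpleMessageListenerContainer replyContainer = new SimpleMessageListenerContainer(connectionFactory);
        replyContainer.setQueueNames("reply.queue");

        // Template that sends requests and expects replies on the same queue
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setReplyQueue(new Queue("reply.queue"));

        // The template itself is the listener for the reply queue
        replyContainer.setMessageListener(template);
        replyContainer.afterPropertiesSet();
        replyContainer.start();

        Object reply = template.convertSendAndReceive("test.exchange", "test.binding", "Hello, world!");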
        Hope that helps.

        There is currently no way to limit the number of channels; the number of cached channels can be controlled on the CachingConnectionFactory.
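For the @Configuration route asked about earlier in the thread, the same wiring could be expressed as bean definitions. This is only a minimal sketch, assuming the Spring AMQP 1.x API used in the snippet above (later releases replaced setReplyQueue with setReplyAddress, so adjust for your version). The class name ReplyQueueConfig, the host "localhost" and the cache size of 6 are illustrative assumptions, not values from the thread.

```java
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ReplyQueueConfig { // hypothetical class name

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory cf = new CachingConnectionFactory("localhost"); // assumed host
        // Controls how many idle channels are kept; it does not cap how many are opened.
        cf.setChannelCacheSize(6); // assumed value
        return cf;
    }

    @Bean
    public Queue replyQueue() {
        return new Queue("reply.queue");
    }

    @Bean
    public RabbitAdmin rabbitAdmin() {
        // Declares the Queue beans in this context whenever a connection is established.
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public RabbitTemplate amqpTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setReplyQueue(replyQueue());
        return template;
    }

    @Bean
    public SimpleMessageListenerContainer replyContainer() {
        SimpleMessageListenerContainer container =
                new SimpleMessageListenerContainer(connectionFactory());
        container.setQueues(replyQueue());
        // The template is the listener for its own reply queue.
        container.setMessageListener(amqpTemplate());
        return container;
    }
}
```

When the container is a bean, the application context takes care of calling afterPropertiesSet() and start(), so those explicit calls from the snippet above are not repeated here.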



        • #5
          thanks Gary!
          this goes a long way in improving what i have today.
          many many thanks,
          -monish



          • #6
            Hi Gary:
            sorry to bother you, but i have one more question. if i wanted to control the number of connections, how would i do that?
            let's say the configuration params from the user are:
            [1] # of connections = 2
            [2] # of channels (threads/reply_queues) = 6 (executorservice threads per connection)
            for a total concurrency of 12. is it possible to specify this?

            kind regards,
            -cogitate



            • #7
              No; each connection factory uses a single connection for all channels.

              If you want to use a separate connection for different components, specify a connection factory for each.

              There is no way to limit channels per connection; that is entirely controlled by threading. Channels are cached so the actual number of channels is often (much) less than the number of threads.

              For the listener container, one channel is used by each consumer (controlled by the concurrentConsumers property). For the RabbitTemplate, channels are retrieved from the cache or created as necessary. If three threads are calling the template concurrently, three channels will be used. By default, only one channel is cached; this is controlled by the channelCacheSize property. When a thread "closes" a channel, and the cache isn't full, the channel is returned to the cache. You might have 50 threads using the template, but if only 3 of them are actually using the template at the same time, only 3 channels will be used.

              I hope that explains.
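The caching behaviour described above can be illustrated with a small stand-alone model. This is a toy sketch in plain Java, not the CachingConnectionFactory implementation; the class ChannelCacheModel and its methods are hypothetical names, and channels are represented by plain integers.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of a channel cache; NOT Spring AMQP's CachingConnectionFactory code. */
class ChannelCacheModel {
    private final int cacheSize;
    private final Deque<Integer> idle = new ArrayDeque<>();
    private int created = 0;
    private int physicallyClosed = 0;

    ChannelCacheModel(int cacheSize) {
        this.cacheSize = cacheSize;
    }

    /** Reuse an idle channel if one is cached, otherwise open a new one. */
    synchronized int acquire() {
        Integer cached = idle.poll();
        if (cached != null) {
            return cached;
        }
        return ++created;
    }

    /** A caller "closes" the channel: park it if the cache has room, else really close it. */
    synchronized void release(int channel) {
        if (idle.size() < cacheSize) {
            idle.push(channel);
        } else {
            physicallyClosed++;
        }
    }

    synchronized int created() { return created; }
    synchronized int cached() { return idle.size(); }
    synchronized int physicallyClosed() { return physicallyClosed; }

    public static void main(String[] args) {
        ChannelCacheModel cache = new ChannelCacheModel(1); // default described above: one cached channel
        // Three callers use the template at the same time, so three channels are needed.
        int a = cache.acquire();
        int b = cache.acquire();
        int c = cache.acquire();
        cache.release(a); // cache has room: kept
        cache.release(b); // cache full: really closed
        cache.release(c); // cache full: really closed
        int d = cache.acquire(); // a later caller reuses the cached channel
        cache.release(d);
        System.out.println("created=" + cache.created()
                + " cached=" + cache.cached()
                + " closed=" + cache.physicallyClosed());
        // prints "created=3 cached=1 closed=2"
    }
}
```

The number of channels opened follows the number of simultaneous callers, while the cache size only decides how many idle channels survive for reuse.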



              • #8
                Thank you much for this explanation.
                -- as feedback, i tested several scenarios:
                [1] multithreaded test with different timeouts to see if there are any cross-correlation mismatches (passes great)
                [2] disconnect tests [client is up, restart broker] (yet to verify all tests, but from what i can see it works!)
                -- one thing which might be useful is to copy the "spring_reply_correlation" key back to the message returned from rabbitTemplate.sendAndReceive(). it seems the DefaultMessagePropertiesConverter currently doesn't copy this over.
                it might be argued that it's transparent to the client, but when i see the SimpleAsyncTaskExecutor return with a spring_reply_correlation# it's difficult to know which message to correlate that with (e.g. in timeout scenarios)

                much thanks,
                -monish



                • #9
                  Hi Monish,

                  We do have an open JIRA to support using the standard Rabbit CorrelationId property instead of this custom Spring property (https://jira.springsource.org/browse/AMQP-271). That is mapped by the default converter.

                  This will likely become the standard in 1.2 (while still supporting the Spring property for backward compatibility).
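To make the timeout concern concrete, here is a toy model of request/reply correlation over one shared reply queue. It is a plain-Java sketch and not the RabbitTemplate implementation; ReplyCorrelatorModel and its method names are hypothetical. It shows why a reply that arrives after its caller timed out can only be identified by its correlation id.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Toy model of reply correlation on a shared reply queue; not Spring AMQP code. */
class ReplyCorrelatorModel {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Register a request and return the correlation id that would travel with it. */
    String register() {
        String correlationId = UUID.randomUUID().toString();
        pending.put(correlationId, new CompletableFuture<>());
        return correlationId;
    }

    /** Wait for the reply; returns null on timeout and forgets the request either way. */
    String await(String correlationId, long timeoutMillis) {
        CompletableFuture<String> future = pending.get(correlationId);
        if (future == null) {
            throw new IllegalArgumentException("unknown correlation id: " + correlationId);
        }
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pending.remove(correlationId);
        }
    }

    /** Called by the reply listener; false means nobody is waiting any more (late reply). */
    boolean onReply(String correlationId, String body) {
        CompletableFuture<String> future = pending.get(correlationId);
        if (future == null) {
            return false;
        }
        future.complete(body);
        return true;
    }

    public static void main(String[] args) {
        ReplyCorrelatorModel model = new ReplyCorrelatorModel();

        String first = model.register();
        model.onReply(first, "pong");
        System.out.println(model.await(first, 100)); // prints "pong"

        String second = model.register();
        System.out.println(model.await(second, 10));      // prints "null" (timed out)
        System.out.println(model.onReply(second, "late")); // prints "false"
    }
}
```

In this model the caller holds the correlation id, so a late reply can at least be logged against the request that timed out; that is the kind of visibility the feedback above asks for.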



                  • #10
                    thanks Gary! i do see rabbit correlationid gets mapped by the default converter.

                    VIP testing using two brokers:
                    i use rabbitTemplate and simpleMessageListenerContainer (listener) like this:
                    * create a pool of rabbitTemplate clients, each aggregating a listener and pre-configured to use a particular reply-queue (to save on creation/deletion of reply queues).
                    * when a particular broker behind the VIP is shut down, the listener manages to connect to the other broker through the VIP. however, since the amqp (rabbitmq) queue is created outside the listener, and since the idea is to re-use the queue, there's no feedback telling me to re-create the queue on the new connection.

                    is there any api in the listener container i can make use of to get notified that i need to re-declare the queue?

                    regards,
                    -monish



                    • #11
                      Have you looked at using HA/mirrored queues? Rabbit takes care of the mirroring and queue declaration for you. One of the drivers (aside from the reduction in the use of temporary queues) for implementing a fixed reply-queue was to enable it to be used with HA arguments.



                      • #12
                        Thanks Gary!
                        will look at HA/mirrored queues. however, at this point it might be too much (apologies in advance for the non-quantitative adjective) to implement from an operations perspective.

                        i was hoping for maybe a listener in rabbitadmin that i can use to re-init the queues when the listener re-inits.
                        if that's at all possible.
                        regards,
                        -monish



                        • #13
                          How are you declaring your queues? The RabbitAdmin (in afterPropertiesSet) sets itself up as a ConnectionListener and, when a new connection is established (after failover), it will re-declare the queues.
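If the queue is declared by hand rather than through the RabbitAdmin's automatic declaration, the same ConnectionListener hook can be registered directly. A minimal sketch, assuming the ConnectionListener interface and CachingConnectionFactory.addConnectionListener from Spring AMQP; the class RedeclareOnReconnect and its register method are hypothetical names for illustration.

```java
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionListener;
import org.springframework.amqp.rabbit.core.RabbitAdmin;

public class RedeclareOnReconnect { // hypothetical helper class

    /** Re-declare the given queue every time the factory establishes a connection. */
    public static void register(CachingConnectionFactory connectionFactory,
                                final RabbitAdmin admin,
                                final Queue replyQueue) {
        connectionFactory.addConnectionListener(new ConnectionListener() {
            @Override
            public void onCreate(Connection connection) {
                // Runs for the first connection and for each one opened after a failover.
                admin.declareQueue(replyQueue);
            }

            @Override
            public void onClose(Connection connection) {
                // Nothing to clean up in this sketch.
            }
        });
    }
}
```

This only covers the notification side; whether re-declaring is enough depends on how the queue was declared (for example exclusive or auto-delete) and on the broker setup.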



                          • #14
                            // this is the client that's pre-configured and pooled
                            import..
                            class SpringClient
                            {
                                ...
                                ...
                                private Queue replyQueue = null;

                                public SpringClient(CachingConnectionFactory connectionFactory, RabbitAdmin admin)
                                {
                                    this.connectionFactory = connectionFactory;
                                    this.admin = admin;
                                    amqpTemplate = new RabbitTemplate(connectionFactory);
                                    replyContainer = new SimpleMessageListenerContainer(connectionFactory);
                                    replyQueueName = "REPLY_QUEUE." + UUID.randomUUID().toString();
                                }

                                public Message callService(String exchange, String serviceName, Message payload) throws Exception
                                {
                                    // create reply queue just in time
                                    if (replyQueue == null)
                                    {
                                        try
                                        {
                                            createReplyQueue();
                                        }
                                        catch(Exception e) {}
                                    }
                                    amqpTemplate.setExchange(exchange);
                                    amqpTemplate.setRoutingKey(serviceName);
                                    amqpTemplate.setReplyQueue(replyQueue);
                                    Message replyMessage = null;
                                    try
                                    {
                                        replyMessage = amqpTemplate.sendAndReceive(payload);
                                    }
                                    catch(Exception e) { .. }
                                    ....
                                    return replyMessage;
                                }

                                private void createReplyQueue() throws IOException
                                {
                                    declareQueue();
                                    replyContainer.setQueueNames(replyQueueName);
                                    replyContainer.setMessageListener(amqpTemplate);
                                    replyContainer.afterPropertiesSet();
                                    replyContainer.start();
                                }

                                private void declareQueue() throws IOException
                                {
                                    admin = new RabbitAdmin(connectionFactory);
                                    boolean durable = false;
                                    boolean autoDelete = true;
                                    boolean exclusive = true;
                                    replyQueue = new Queue(replyQueueName, durable, exclusive, autoDelete);
                                    admin.declareQueue(replyQueue);
                                }
                            }

                            so if i could check
                            if (replyQueue == null && replyContainer.reset() )// or something equivalent then i can redeclare the queue

                            regards,
                            -monish



                            • #15
                              Please use [ code ] ... [ /code ] tags around your code/config (no spaces inside brackets); it makes it easier to read.

                              If you simply let Spring take care of everything, it will just work automatically - the RabbitAdmin is a ConnectionListener and will declare everything for you when the connection is established. For example, from the Spring Integration amqp sample....

                              Code:
                                  <rabbit:connection-factory id="connectionFactory" />
                              
                                  <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
                              
                                  <rabbit:admin connection-factory="connectionFactory" />
                              
                                  <rabbit:queue name="si.test.queue" />
                              
                                  <rabbit:direct-exchange name="si.test.exchange">
                                      <rabbit:bindings>
                                          <rabbit:binding queue="si.test.queue" key="si.test.binding" />
                                      </rabbit:bindings>
                                  </rabbit:direct-exchange>
                              Whenever a connection is established, all the declarations are performed.
