RabbitTemplate.send() not throwing Exception on non-existing exchange and routing key

  • RabbitTemplate.send() not throwing Exception on non-existing exchange and routing key

    Hi.

    I am trying to publish some messages to a given exchange and channel using RabbitTemplate. The class and beans are listed below. The problem is, even though I never created the exchange or routing key that I am using in rabbitTemplate.send(), the code does not complain! I was hoping to see AmqpException or some other exception.

    The following logging clearly states that it is trying to publish to the desired exchange and routing key:

    Code:
    Fri Aug 23 20:42:25.627 PDT 2013 | "http-bio-8080"-exec-1 | MessagePublisher | INFO | Publishing message: <my.message>
    Fri Aug 23 20:42:25.639 PDT 2013 | "http-bio-8080"-exec-1 | MessagePublisher | DEBUG | AMQP message body: [B@4e326f04
    Fri Aug 23 20:42:25.639 PDT 2013 | "http-bio-8080"-exec-1 | MessagePublisher | DEBUG | AMQP message: (Body:'<my.message>'; ID:null; Content:application/json; Headers:{__TypeId__=java.lang.String}; Exchange:null; RoutingKey:null; Reply:null; DeliveryMode:PERSISTENT; DeliveryTag:0)
    Fri Aug 23 20:42:25.639 PDT 2013 | "http-bio-8080"-exec-1 | MessagePublisher | DEBUG | RabbitTemplate: org.springframework.amqp.rabbit.core.RabbitTemplate@511697b7
    Fri Aug 23 20:42:25.672 PDT 2013 | "http-bio-8080"-exec-1 | org.springframework.amqp.rabbit.connection.CachingConnectionFactory | DEBUG | Creating cached Rabbit Channel from AMQChannel(amqp://[email protected]:5672/,1)
    Fri Aug 23 20:42:25.688 PDT 2013 | "http-bio-8080"-exec-1 | org.springframework.amqp.rabbit.core.RabbitTemplate | DEBUG | Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1)
    Fri Aug 23 20:42:25.688 PDT 2013 | "http-bio-8080"-exec-1 | org.springframework.amqp.rabbit.core.RabbitTemplate | DEBUG | Publishing message on exchange [my.exchange], routingKey = [my.routingKey]
    Fri Aug 23 20:42:25.691 PDT 2013 | "http-bio-8080"-exec-1 | org.springframework.amqp.rabbit.connection.CachingConnectionFactory | DEBUG | Returning cached Channel: AMQChannel(amqp://[email protected]:5672/,1)
    Fri Aug 23 20:42:25.695 PDT 2013 | "http-bio-8080"-exec-1 | SmsListenerController | INFO | Sms message: SmsMessage@655b395e, status: SUCCESS

    Code:
       public void publish(Object messageObj) 
          throws PublisherException {
          
          final MessageProperties messageProperties = getMessageProperties();
          
          try {
             logger.infoP("Publishing message: {}", messageObj);
             
             final Message amqpMessage = 
                   rabbitTemplate.getMessageConverter().toMessage(messageObj, messageProperties);
             
             logger.debugP("AMQP message body: {}", amqpMessage.getBody());
             logger.debugP("AMQP message: {}", amqpMessage.toString());
             logger.debugP("RabbitTemplate: {}", rabbitTemplate.toString());
             
             rabbitTemplate.send(amqpMessage);
             
             statusReporter.report(messageObj, PublisherStatus.SUCCESS);
             
          } catch (MessageConversionException mce) {
             
             statusReporter.report(messageObj, PublisherStatus.FAILURE);
             
             logger.warnP("Exception converting message {} to JSON! {}", messageObj, mce);
             logger.notice(mce, mce);
             
             throw new PublisherException(mce);
             
          } catch (AmqpException ae) {
             
             statusReporter.report(messageObj, PublisherStatus.FAILURE);
             
             logger.errorP("Exception sending message {} to Rabbit Template {}! {}", 
                   messageObj, rabbitTemplate, ae);
             logger.warn(ae, ae);
             
             throw new PublisherException(ae);
             
          } catch (Exception e) {
             
             statusReporter.report(messageObj, PublisherStatus.FAILURE);
             
             logger.errorP("Exception while publishing message: {}! {}", messageObj, e);
             logger.warn(e, e);
             
             throw new PublisherException(e);
          }
       }
       
       protected MessageProperties getMessageProperties() {
          return new MessageProperties();
       }
    Below is what my bean config looks like:

    Code:
    <bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate"
             p:connectionFactory-ref="springAmqpConnectionFactory"
             p:exchange="${spring.amqp.exchange.name}"
             p:routingKey="${spring.amqp.routingKey.name}"
             p:messageConverter-ref="messageConverter" />
             
    <bean id="messageConverter" 
             class="org.springframework.amqp.support.converter.JsonMessageConverter" />
    Code:
    spring.amqp.exchange.name=my.exchange
    spring.amqp.routingKey.name=my.routingKey

  • #2
    That's just the way AMQP works; by default, messages are silently dropped if they can't be routed. One of the ways RabbitMQ gets its amazing performance is that the sending thread is not blocked waiting for an acknowledgment.

    RabbitMQ does have an extension whereby you can get confirmation (ack or nack, on a different thread) as to whether a particular message was processed by the broker or not - see http://www.rabbitmq.com/confirms.html.

    You can also enable publisher returns, which provide a mechanism to get the message returned (on another thread) if it couldn't be routed; this requires you to set the mandatory flag - see http://www.rabbitmq.com/amqp-0-9-1-reference.html and search for mandatory.

    bit mandatory
    This flag tells the server how to react if the message cannot be routed to a queue. If this flag is set, the server will return an unroutable message with a Return method. If this flag is zero, the server silently drops the message.
    The Spring AMQP RabbitTemplate supports both these features; see http://static.springsource.org/sprin...html/amqp.html

    See Section 3.2.2 for configuring the connection factory and Section 3.3.1 for the template.
    Last edited by Gary Russell; Aug 27th, 2013, 10:10 PM.


    • #3
      You can also use a RabbitAdmin to automatically declare the exchange, queue, and binding.


      • #4
        Thanks, Gary. I tried using publisherReturns. But I don't see it doing anything!

        Code:
        <bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate"
                 p:connectionFactory-ref="springAmqpConnectionFactory"
                 p:exchange="${spring.amqp.exchange.name}"
                 p:routingKey="${spring.amqp.routingKey.name}"
                 p:mandatory="${spring.amqp.connection.publisherReturns}"
                 p:messageConverter-ref="messageConverter" 
                 p:returnCallback-ref="smsMessageReturnCallbackHandler" />
                 
        <bean id="messageConverter" 
                 class="org.springframework.amqp.support.converter.JsonMessageConverter" />
                 
        <bean id="smsMessageReturnCallbackHandler"
                 class="com...SmsMessageReturnCallbackHandler" />
        Code:
        spring.amqp.connection.publisherReturns=true
        Code:
        import org.springframework.amqp.core.Message;
        import org.springframework.amqp.rabbit.core.RabbitTemplate;
        import org.springframework.amqp.support.converter.MessageConverter;
        import org.springframework.beans.factory.annotation.Autowired;
        
        public class SmsMessageReturnCallbackHandler 
           implements RabbitTemplate.ReturnCallback {
           
           private static final Logger logger = Logger.getLogger(SmsMessageReturnCallbackHandler.class);
           
           @Autowired
           private MessageConverter messageConverter;
           
           @Override
           public void returnedMessage(Message message, 
                                       int replyCode, 
                                       String replyText,
                                       String exchange, 
                                       String routingKey) {
              
              String convertedMessage = messageConverter.fromMessage(message).toString();
              logger.debugP("Could not publish message: {} with routing key: {} to exchange: {}", 
                    convertedMessage, routingKey, exchange);
              
           }
        }
        Of course, the exchange doesn't exist yet. That's the whole point of this exercise. I still don't see anything being logged! The Spring config does pick up the bean though, as seen from the debug log below.

        Code:
        Tue Aug 27 17:33:00.782 PDT 2013 | Thread-4 | org.springframework.beans.CachedIntrospectionResults | DEBUG | Getting BeanInfo for class [com.locationlabs.smslistener.springamqp.rabbit.SmsMessageReturnCallbackHandler]
        Tue Aug 27 17:33:00.784 PDT 2013 | Thread-4 | org.springframework.beans.CachedIntrospectionResults | DEBUG | Caching PropertyDescriptors for class [com.locationlabs.smslistener.springamqp.rabbit.SmsMessageReturnCallbackHandler]

        Further, I am not sure how to use Publisher Confirms in my particular scenario - I have no easy way of correlating back to the messages I attempted to publish. In my case, CorrelationData has to be the message attempted itself; that boils down to Publisher Returns.
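        One possible way around the correlation problem described above is to keep the attempted messages in a small map keyed by a generated id, and send only the id as correlation data. The sketch below is plain Java and does not touch Spring; the class name PendingPublishRegistry and its methods are hypothetical. It assumes the id would travel in Spring AMQP's CorrelationData and be handed back to the template's confirm callback, which would then call onConfirm().

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper: remembers each published payload under a generated id. */
final class PendingPublishRegistry<T> {

    // id -> payload still waiting for a broker confirm
    private final Map<String, T> pending = new ConcurrentHashMap<>();

    /** Stores the payload and returns the id to send along as correlation data. */
    String register(T payload) {
        String id = UUID.randomUUID().toString();
        pending.put(id, payload);
        return id;
    }

    /**
     * Intended to be called from the confirm callback (an assumption about the wiring).
     * Always removes the entry; returns the payload only on a nack so the caller
     * can alert or retry.
     */
    Optional<T> onConfirm(String id, boolean ack) {
        T payload = pending.remove(id);
        return ack ? Optional.empty() : Optional.ofNullable(payload);
    }

    /** Number of publishes that have not been confirmed yet. */
    int outstanding() {
        return pending.size();
    }
}
```

        Typical use would be: call register(message) just before sending, pass the returned id as the correlation id, and call onConfirm(id, ack) when the confirm arrives. Entries that stay outstanding for a long time could also be reported as suspicious.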


        • #5
          It seems I was mistaken; I just ran a test and it seems you only get a return if the exchange exists, but the routing key doesn't result in the message being routed to a queue.

          You will have to take that up with the RabbitMQ guys on their mailing list - https://lists.rabbitmq.com/cgi-bin/m...bbitmq-discuss. Spring-AMQP provides the familiar Spring programming style on top of the core Rabbit API; it can't do more than the API itself.

          Similarly, publisher confirms really only confirm that the broker processed the message, not that it was routed anywhere.

          Like I said, you can automatically declare the exchange with a RabbitAdmin; then you'll get a return if there are no queue bindings.
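          The outcomes discussed in this post can be summarized with a toy in-memory model. This is only a sketch of the routing rules, not the RabbitMQ client API; ToyBroker and its Outcome names are invented for illustration, and it models direct-exchange matching only. The "channel closed with 404 NOT_FOUND" outcome for a missing exchange is my understanding of AMQP 0-9-1 and RabbitMQ rather than something stated in this thread; since that error arrives asynchronously, it would be consistent with send() returning normally.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy in-memory model of broker routing outcomes; not the RabbitMQ client API. */
final class ToyBroker {

    enum Outcome { ROUTED, DROPPED_SILENTLY, RETURNED_NO_ROUTE, CHANNEL_CLOSED_NOT_FOUND }

    // exchange name -> routing keys with at least one queue bound (direct-exchange semantics)
    private final Map<String, Set<String>> bindings = new HashMap<>();

    void declareExchange(String exchange) {
        bindings.computeIfAbsent(exchange, k -> new HashSet<>());
    }

    void bind(String exchange, String routingKey) {
        declareExchange(exchange);
        bindings.get(exchange).add(routingKey);
    }

    Outcome publish(String exchange, String routingKey, boolean mandatory) {
        Set<String> keys = bindings.get(exchange);
        if (keys == null) {
            // Unknown exchange: modelled as a channel-level error, never a return.
            return Outcome.CHANNEL_CLOSED_NOT_FOUND;
        }
        if (keys.contains(routingKey)) {
            return Outcome.ROUTED;
        }
        // Exchange exists but nothing is bound for this key:
        // returned only when the mandatory flag is set, otherwise dropped.
        return mandatory ? Outcome.RETURNED_NO_ROUTE : Outcome.DROPPED_SILENTLY;
    }
}
```

          In this model, a ReturnCallback would only ever fire for RETURNED_NO_ROUTE, which is why declaring the exchange first (for example with a RabbitAdmin) makes missing bindings detectable through returns.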


          • #6
            Thank you, Gary. Will follow up with the RabbitMQ guys.

            RabbitAdmin is still a different tool. We normally run scripts to set up the RabbitMQ entities we need. The reason for what I'm trying to do is if someone forgets to run the script, we should be able to detect that and raise alerts. That's not happening right now. Is there a method to check if a specified exchange and routing key/queue binding exist?


            • #7
              I am not aware of an API to check a binding (although the management plugin has a rest API you can use to get that info).

              For the exchange, you could put something like this in your startup code...

              Code:
              @Test
              public void testExchangePresent() {
              	final String name = "junk";
              	try {
              		template.execute(new ChannelCallback<Object>() {
              
              			@Override
              			public Object doInRabbit(Channel channel) throws Exception {
              				channel.exchangeDeclarePassive(name);
              				return null;
              			}
              		});
              	}
              	catch (Exception e) {
              		fail("Exchange " + name + " is not present");
              	}
              }
              ... and use returns for the missing binding(s).

              exchangeDeclarePassive throws an exception if the exchange doesn't exist.
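              For the binding check via the management plugin mentioned at the top of this post, a rough sketch using only the JDK's HTTP client (Java 11+) might look like the following. Assumptions: the management plugin is enabled and reachable over plain HTTP (port 15672 is the usual default on recent RabbitMQ versions), the endpoint /api/exchanges/{vhost}/{exchange}/bindings/source lists the bindings whose source is that exchange, and the JSON is compact enough for a crude string match. BindingCheck and its method names are hypothetical; a real check should parse the JSON properly.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Hypothetical startup check against the RabbitMQ management REST API. */
final class BindingCheck {

    /** Builds the URI that lists bindings whose source is the given exchange. */
    static URI bindingsUri(String host, int port, String vhost, String exchange) {
        return URI.create("http://" + host + ":" + port + "/api/exchanges/"
                + encode(vhost) + "/" + encode(exchange) + "/bindings/source");
    }

    // URLEncoder does form encoding, so turn its "+" back into "%20" for path segments.
    private static String encode(String segment) {
        return URLEncoder.encode(segment, StandardCharsets.UTF_8).replace("+", "%20");
    }

    /** True if the response mentions the routing key (crude match, no JSON parsing). */
    static boolean hasBinding(String host, int port, String vhost, String exchange,
                              String routingKey, String user, String password) throws Exception {
        String auth = Base64.getEncoder()
                .encodeToString((user + ":" + password).getBytes(StandardCharsets.UTF_8));
        HttpRequest request = HttpRequest.newBuilder(bindingsUri(host, port, vhost, exchange))
                .header("Authorization", "Basic " + auth)
                .GET()
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // Assumes compact JSON such as {"routing_key":"my.routingKey", ...}.
        return response.statusCode() == 200
                && response.body().contains("\"routing_key\":\"" + routingKey + "\"");
    }
}
```

              Note that the default vhost "/" has to be percent-encoded as %2F in the path, which bindingsUri() takes care of.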


              • #8
                This seems more complicated than needed! Going back to the RabbitAdmin approach, how do I create one using Spring config? I don't see methods to set the binding/queue/exchange that can be used by the declare*() methods. Do I have to write a class that makes a RabbitAdmin? That seems counterintuitive.


                • #9
                  Just add the queues, exchanges, bindings to the context; add a RabbitAdmin, and you're done...

                  Code:
                  	<rabbit:connection-factory id="connectionFactory" />
                  
                  	<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
                  
                  	<rabbit:admin connection-factory="connectionFactory" />
                  
                  	<rabbit:queue name="si.test.queue" />
                  
                  	<rabbit:direct-exchange name="si.test.exchange">
                  		<rabbit:bindings>
                  			<rabbit:binding queue="si.test.queue" key="si.test.binding" />
                  		</rabbit:bindings>
                  	</rabbit:direct-exchange>
                  http://static.springsource.org/sprin...-configuration


                  • #10
                    Okay, the RabbitAdmin sets up the default broker definitions now.

                    But I have another problem:

                    Code:
                    Wed Aug 28 15:31:59.759 PDT 2013 | AMQP Connection 127.0.0.1:5672 | org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl | ERROR | No listener for seq:1
                    Wed Aug 28 15:31:59.759 PDT 2013 | AMQP Connection 127.0.0.1:5672 | org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl | ERROR | No listener for seq:2
                    "When these options are set, Channels created by the factory are wrapped in an PublisherCallbackChannel which is used to facilitate the callbacks. When such a channel is obtained, the client can register a PublisherCallbackChannel.Listener with the Channel. The PublisherCallbackChannel implementation contains logic to route a confirm/return to the appropriate listener."

                    Does this mean that I HAVE TO register a PublisherCallbackChannel.Listener? Right now, my re-handle logic is in SmsMessageReturnCallbackHandler, as mentioned in post #4. What's wrong with that?

                    Also, I hope it's okay to opt-in for only one of Publisher Confirms or Returns. I have only opted-in for Returns due to reasons mentioned in post #4.


                    • #11
                      Yes, you can opt in to confirms and returns separately, but if confirms are enabled then you need to register a listener. It looks like you have set up the connection factory for confirms but don't have a listener.

                      Returns are controlled by the mandatory flag on the template, and they also need to be enabled on the factory.


                      • #12
                        I just ran a test and I only get those messages if I have publisher-confirms="true" on the connection factory and no confirm-callback.


                        • #13
                          I have publisher-returns=true (and apparently no Listener callback).

                          Code:
                          <bean id="springAmqpConnectionFactory"
                                   class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory"
                                   p:channelCacheSize="${spring.amqp.connection.channelCacheSize}"
                                   p:publisherReturns="${spring.amqp.connection.publisherReturns}">
                                   <property name="connectionListeners">
                                      <list>
                                         <ref bean="springAmqpLoggingConnectionListener" />
                                      </list>
                                   </property>
                                   <constructor-arg value="${rabbitmq.host}" />
                          </bean>
                          
                          
                          <bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate"
                                   p:connectionFactory-ref="springAmqpConnectionFactory"
                                   p:exchange="${spring.amqp.exchange.name}"
                                   p:routingKey="${spring.amqp.routingKey.name}"
                                   p:mandatory="${spring.amqp.connection.publisherReturns}"
                                   p:messageConverter-ref="messageConverter" 
                                   p:returnCallback-ref="smsMessageReturnCallbackHandler" />
                          
                          
                          <bean id="smsMessageReturnCallbackHandler"
                                   class="com...SmsMessageReturnCallbackHandler" />
                          What I don't understand is who takes PublisherCallbackChannel.Listener. RabbitTemplate doesn't seem to have a setter for it!


                          • #14
                            "What I don't understand is who takes PublisherCallbackChannel.Listener. RabbitTemplate doesn't seem to have a setter for it!"
                            It's dealt with internally; the RabbitTemplate is, itself, the Listener and propagates any data to its callbacks.

                            Code:
                            public class RabbitTemplate extends RabbitAccessor implements RabbitOperations, MessageListener,
                            	PublisherCallbackChannel.Listener {
                            ...
                            	public void handleConfirm(PendingConfirm pendingConfirm, boolean ack) {
                            ...
                            You should only be getting acks from RabbitMQ if the channel has confirms selected; and that is invoked in the connection factory only if publisherConfirms is set ...

                            Code:
                            		if (this.publisherConfirms) {
                            			try {
                            				channel.confirmSelect();
                            			} catch (IOException e) {
                            				logger.error("Could not configure the channel to receive publisher confirms", e);
                            			}
                            		}


                            • #15
                              Thanks, Gary. I had a misconfiguration in one of my CachingConnectionFactory beans such that it had publisherConfirms instead of publisherReturns! I think that's because I was going back and forth between the two. That caused the problem mentioned in post #10. Now I am using only publisherConfirms and things work.

                              Also, I decided to use Rabbit Admin to declare the broker definitions to avoid the possibility of misconfiguration while declaring things externally.

                              I have one question though - under what circumstances would a message be undelivered and the ReturnCallback strategy kicked in? I know absence of a routing key is one. What are some other scenarios? Didn't find much documentation on that.
