Announcement Announcement Module
Collapse
No announcement yet.
Messages not Being Sent? Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Messages not Being Sent?

    Hi All,

    I'm fairly certain that I'm doing something simple - wrongly! I am also violating one of my cardinal rules about coding late into a Friday night.

    In any event, I have this demo app that establishes a producer thread and a consumer thread.

    The main thread init code creates a topic exchange and a queue like this:

    Code:
    		ConnectionFactory connectionFactory = getConnectionFactory();
    		rabbitTemplate = new RabbitTemplate(connectionFactory);                  // create Spring's implementation of the AmqpTemplate interface
    
    		rabbitAdmin = new RabbitAdmin(rabbitTemplate.getConnectionFactory());    // get a RabbitAdmin providing required connectionFactory ctor arg
    		
    		Exchange topicExchange = new TopicExchange("pbTopicExchange", false, false);    // non-durable, and no auto-delete
    		rabbitAdmin.declareExchange(topicExchange);
    		
    		Queue queue = new Queue("pbJobMgmtRqst", false, false, false);           // non-durable, non-exclusive, no auto-delete
    		rabbitAdmin.declareQueue(queue);                                         // NB: the RabbitAdmin object links the exchange and the queue
    The producer's run method looks like this:

    Code:
    public void run()
    	{
    		System.out.println("ENTERED PRODUCER THREAD");
    
    		ConnectionFactory connectionFactory = getConnectionFactory();
    		rabbitTemplate = new RabbitTemplate(connectionFactory);              
    
    		rabbitAdmin = new RabbitAdmin(rabbitTemplate.getConnectionFactory());    // get a RabbitAdmin providing required connectionFactory ctor arg
    		
    		Queue queue = new Queue("pbJobMgmtRqst", false, false, false);           // non-durable, non-exclusive, no auto-delete
    		rabbitAdmin.declareQueue(queue);                                         // NB: the RabbitAdmin object links the exchange and the queue
    		
    		String strMsg = "This is a test...";
    		
    		byte[] msgBody = null;
    		
    		try
    		{
    			msgBody = strMsg.getBytes("UTF-8");
    		}
    		catch(Exception e)
    		{
    			System.out.println("CAUGHT EXCEPTION");
    		}
    		
    		MessageProperties msgProperties = new MessageProperties();
    		Message msg = new Message(msgBody, msgProperties);
    		
    		System.out.println("SENDING MSG to exchange " + exchangeName);
    		
    		for ( int n=0; n < 100; n++)
    		{
    			rabbitTemplate.send(exchangeName, "mgmt.clientID.1234", msg);
    		}
    	}
    After running this (let's forget about consumer thread for the moment), I see no evidence of any messages queued against queue "pbJobMgmtRqst." That is, the RabbitMQ admin plugin shows 0 Ready, 0 Unacknowledged, 0 Total.

    The consumer has not run.

    The admin console also shows the following under Queues -> Bindings:

    Incoming to pbJobMgmtRqst
    From: (AMQP default)
    Routing key: pbJobMgmtRqst

    I found this odd because I thought that with statement

    Code:
    rabbitTemplate.send(exchangeName, "mgmt.clientID.1234", msg);
    I had specified an Exchange name of "pbTopicExchange." This is set in the producer thread's constructor.

    So before I get to the consumer issues (another post to this thread), some questions:

    a. shouldn't I be seeing in the admin console some evidence of the 100 messages I sent ?
    b. why does the admin console show "AMQP default" as the exchange?

    On a side note: I am finding discrepancies between the online documentation at

    http://static.springsource.org/sprin...1.0.x/apidocs/

    and the Spring AMQP - Reference Documentation 1.0.0.RELEASE PDF. For example, the PDF shows

    Code:
    Binding b = BindingBuilder.bind(someQueue).to(someTopicExchange).with("foo.*");
    But the online docs give no instance of BindBuilder.bind returning a Binding. Instead, .bind returns something called a BindingBuilder.DestinationConfigurer. In fact, my Eclipse IDE changed my use of Binding to GenericArgumentsConfigurer.

    The consumer thread is the next issue but, again, I will save that for later.

    I would be grateful for any help on this.

    Thanks.

    -Paul

  • #2
    Paul,

    The following would send a message to the Exchange named by the value in 'exchangeName' with a routing key of 'mgmt.clientID.1234':

    Code:
    rabbitTemplate.send(exchangeName, "mgmt.clientID.1234", msg);
    But from what you mentioned above, it sounds like you want to send to your 'pbJobMgmtRqst' queue, and that is only bound to the default no-name Exchange.

    So to send to it would require something like this:

    Code:
    rabbitTemplate.send("pbJobMgmtRqst", msg);
    That 2-arg version of the send method expects a 'routing-key' and the Message instance. When a Queue is bound only to the default exchange, you must send to that Exchange with the Queue's name as routing-key if you want the message to be sent to that Queue.

    Hope that helps.
    -Mark

    Comment


    • #3
      Hi Mark,

      Thank you for your reply.

      Yes, I do want to send the message to queue "pbJobMgmtRqst." So, I think that my question boils down to why isn't this queue bound to exchange "pbTopicExchange?"

      I'm a bit confused here. My producer does this:

      Code:
      rabbitTemplate.send(exchangeName, "mgmt.clientID.1234", msg);
      with variable exchangeName set to "pbTopicExchange." So why isn't the message sent to that exchange?

      From a different perspective: my understanding is that the act of binding devolves on the consumer only. That is, my "main" thread creates the exchange and the queue. My producer idempotently declares the same queue and, in sending to exchange "pbTopicExchange," specifies the aforementioned routing key. But only the consumer needs to actually bind this queue to the exchange - right?

      I feel like I am missing something very obvious....

      Thanks for your help.

      -Paul

      Comment


      • #4
        You are free to bind queues to exchanges wherever you like, but until they are bound you can't expect messages sent to the exchange to show up on the queue. That seems to be the gap here. Your producer is undoubtedly sending the messages successfully but the broker has no route for them so they will be discarded.

        Comment


        • #5
          Dave, thanks.

          I think that your analysis is dispositive.

          Does this mean that the binding entity, the consumer in my case, must be started first?

          I have been under the impression that order didn't matter in this regard, i.e., producer could start first and its messages would be held at the exchange.

          If this is not so, how does one handle a scenario in which the consumer bounces? While it's down messages sent by the producer will simply be lost.

          Perhaps tomorrow afternoon I will post here the difficulties I am having with my consumer. As noted, I am having some difficulty squaring the online DOCS against the spring AMQP 1.0.0 reference PDF. At least in part because of these difficulties, I haven't gotten the consumers bind code to work.

          This is my first experience with a topic exchange. Fanout was easy!

          Thanks for your help.

          -paul*

          Comment


          • #6
            Paul, I'd recommend a quick look at these tutorials, especially #5: http://www.rabbitmq.com/getstarted.html

            That should clarify how Queues bind to a Topic Exchange and the role of the routing key. Then, on the Spring AMQP side - OR directly in the broker config or admin UI, you would just bind your Queue to the TopicExchange with a pattern that determines which messages should be routed to that Queue. At that point, sending with RabbitTemplate is a matter of passing the exchangeName, the routingKey, and the Message.

            Hope that helps get you on the right track.
            -Mark

            Comment


            • #7
              Thanks Mark.

              I have indeed looked at that tutorial and I think I have an
              OK understanding of topic exchange, binding key and routing
              key.

              My difficulty or, rather, a difficulty concerns the binding code
              itself. I think I'm doin' it wrong. I will post it tmrw when. I
              get back from Boston.

              While I have your attention (and Dave's), which i appreciate,
              can you look at my earlier post about sharing an AmqpTemplate?

              Thanks.

              -paul

              Comment


              • #8
                Hi again,

                As promised (or threatened!) here is the consumer code. This is the constructor for my class AmqpDemoConsumer. I know that there is something wrong here.

                Code:
                public AmqpDemoConsumer(String qName, String bindKey, int concurrConsumers) throws Exception 
                {
                  queueName = qName; 
                  bindingKey = bindKey; 
                  Queue queue = new Queue(queueName, false, false, false); 
                  Exchange topicExchange = new TopicExchange("pbTopicExchange", false, false); // non-durable, and no auto-delete 
                
                //Binding b = new Binding(someQueue, someTopicExchange, "foo.*"); 
                  
                  GenericArgumentsConfigurer b = BindingBuilder.bind(queue).to(topicExchange).with(bindingKey); 
                  numberConcurrentConsumers = concurrConsumers; 
                  ConsumerSimpleMessageListenerContainer container =  new ConsumerSimpleMessageListenerContainer(); 
                  container.setConnectionFactory(getConnectionFactory());     // connect to broker node 
                  container.setQueueNames(queueName);                             // set name of Q whence receive messages 
                  container.setConcurrentConsumers(numberConcurrentConsumers); 
                  container.setMessageListener(new MessageListenerAdapter(new ConsumerHandler(), new SimpleMessageConverter())); 
                  Queue[] qArray = new Queue[1]; 
                  qArray[0] = queue;
                  container.setQueues(qArray);
                  container.startConsumers();
                }
                When I tried to use the BindingBuilder.bind call, I thought (based on the Spring AMQP 1.0.0 reference PDF) that I would get back a Binding object. When, per that documentation I first coded:

                Code:
                Binding b = BindingBuilder.bind(queue).to(topicExchange).with(bindingKey);
                my Eclipse IDE complained "type mismatch: cannot convert from BindingBuilder.GenericArgumentsConfigurer to Binding"

                This hearkens back to my question about the seeming inconsistencies between the reference PDF and the online Spring AMQP docs. Page 7 of that document shows this example:

                Code:
                Binding b = BindingBuilder.bind(someQueue).to(someTopicExchange).with("foo.*");
                What am I doing wrong? Put otherwise, can someone show me how, using Spring AMQP, I should establish the binding to the topic exchange?

                The next issue is that, because I couldn't establish a Binding object, I never bound the pbJobMgmtRqst to the topic exchange. Based on what Mark and Dave have said, I assume that this failure is why no messages ever show up: the broker sees no outbound mapping for the messages my producer is sending - so he discards them. Is this analysis correct?

                Finally, a propos of this broker behavior, there remains the question that I asked Dave a few days ago; to wit, if the broker behavior is to discard messages for which he has no outbound mapping (i.e., no one having bound the queue to the exchange with a binding key that matches routing key), then doesn't this mean the binding entity (consumer in my case) must always be up before the producer starts?

                If so, this seems problematic to me, especially in so robust a system as Rabbit. If producing and consuming entities are significantly decoupled, to the point where they need know nothing about each other, then don't we in fact have a re-coupling here; i.e., why should producer have to think about whether or not the consumer is up??

                Thanks for your help.

                -Paul

                Comment


                • #9
                  The problem is the compiler doesn't know what the Exchange type is...

                  Code:
                  Exchange topicExchange = new TopicExchange(...
                  You need to declare topicExchange as a TopicExchange for the fluent API to work properly - take a look at the various to() methods on the DestinationConfigurer (which is returned by the bind() method) - the to() return type depends on the parameter type.

                  Comment


                  • #10
                    Originally posted by PMBell;404960if the broker behavior is to discard messages for which he has no outbound mapping (i.e., no one having bound the queue to the exchange with a binding key that matches routing key), then doesn't this mean the binding entity (consumer in my case) must always be up [B
                    before[/B] the producer starts?
                    That's the AMQP protocol - it's that way by design whether we like it or not. Many users of AMQP actually don't care much about consumers missing messages. For those that do, what it means in practice is that if you can't rely on your consumers starting before your producers, then you have to create the bindings somewhere other than your consumer. Remember bindings can be durable, so once you have them set up you don't have to have this problem ever again with the same broker instance.

                    Comment


                    • #11
                      Dave and Gary,

                      Thanks again. The news about durable bindings is very good. And Gary's suggestion re defining a TopicExchange rather than a mere Exchange did the trick.

                      So I am very close to getting this demo to work.

                      However, when I run the consumer (now a distinct Java application rather than a thread created by the AmqpDemo class), I am getting this error (actually, I am getting thousands of them):

                      Failed to invoke target method 'handleMessage' with argument type = [class org.springframework.amqp.core.Message], value = [{(Body:'This is a test, ...'; ID:null; Content:text/plain; Headers:{}; ExchangebTopicExchange; RoutingKey:mgmt.clientID.1234; Reply:null; DeliveryMode:PERSISTENT; DeliveryTag:701)}

                      I think that ultimately (or originally) this error has something to do with the type of message I am sending. Rather, what each end thinks about the type of message. Here is the producing code:

                      Code:
                      String strMsg = "This is a test, ...";
                      		
                      		byte[] msgBody = null;
                      		
                      		try
                      		{
                      			msgBody = strMsg.getBytes("UTF-8");
                      		}
                      		catch(Exception e)
                      		{
                      			System.out.println("CAUGHT EXCEPTION");
                      		}
                      		
                      		MessageProperties msgProperties = new MessageProperties();
                      		msgProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
                      		Message msg = new Message(msgBody, msgProperties);
                      		rabbitTemplate.send(exchangeName, "mgmt.clientID.1234", msg);
                      And here is how I set up for receiving messages in the consumer Java application:

                      Code:
                      		Queue queue = new Queue(queueName, false, false, false); 
                      		TopicExchange topicExchange = new TopicExchange("pbTopicExchange", false, false);    // non-durable, and no auto-delete
                      
                      		Binding b = BindingBuilder.bind(queue).to(topicExchange).with(bindingKey);
                      		rabbitAdmin.declareBinding(b);
                      		
                      		numberConcurrentConsumers = 1;
                      		ConsumerSimpleMessageListenerContainer container = new ConsumerSimpleMessageListenerContainer();
                      		container.setConnectionFactory(getConnectionFactory());                  // connect to broker node
                      		container.setQueueNames(queueName);                                      // set name of Q whence receive messages
                      		container.setConcurrentConsumers(numberConcurrentConsumers);
                      		//container.setMessageListener(new MessageListenerAdapter(new ConsumerHandler(), new SimpleMessageConverter()));
                      		container.setMessageListener(new MessageListenerAdapter(new ConsumerHandler(), null));
                      		container.startConsumers();
                      Please note the commented out line that calls setMessageListener. I was experimenting with a SimpleMessageConverter and, in the next line, tried setting that converter to null. In each case I continue to get the error, but the phenomena varies some; specifically, the text of the error message is a bit different (if I recall correctly).

                      The ConsumerSimpleMessageListenerContainer is simply this:

                      Code:
                      public class ConsumerSimpleMessageListenerContainer extends SimpleMessageListenerContainer
                      {
                      	public void startConsumers() throws Exception
                      	{
                      		super.doStart();
                      	}
                      }
                      And the handler itself is simply this:

                      Code:
                      public class ConsumerHandler  
                      {
                      	public void handleMessage(String text) throws Exception
                      	{
                      			System.out.println("Received--------------------------: " + text);
                      	}
                      }
                      Again, I suspect this is a simple mistake.

                      Can someone point out where my misunderstanding is?

                      Thanks.

                      -Paul

                      Comment


                      • #12
                        Well, explicitly setting the adapter's message converter to null won't work - that will try to find a method handleMessage(Message m) (i.e. perform no conversion).

                        The SimpleMessageConverter should work - can you share the exact text of the message you are getting in that configuration?

                        Have you looked at using Spring Integration; it provides a much higher-level abstraction on top of all this? There's a sample here https://github.com/SpringSource/spri...ter/basic/amqp.

                        Comment


                        • #13
                          Gary (and others),

                          Your post arrived just before I was going to post news of some progress.

                          I decided to try using one of the (slightly) higher-level APIs and, rather than call .send() of a Message, I tried .convertAndSend as follows:

                          Code:
                          byte[] theMsg = { 0x61, 0x62, 0x63 };
                          rabbitTemplate.convertAndSend("pbTopicExchange", "mgmt.clientID.1234", theMsg);
                          I also restored in the consumer's "set up" code the instruction:

                          Code:
                          container.setMessageListener(new MessageListenerAdapter(new ConsumerHandler(), new SimpleMessageConverter()));
                          These changes worked and moved me to a happier place. I can now produce and consume both String and byte[] objects.

                          But I remain interested in understanding what I did wrong when trying to send a "raw" Message. I note that in getting String and byte[] to work, I needed to change the handleMessage signature accordingly. In the case of a raw Message, is it a matter of changing this signature to

                          Code:
                          public void handleMessage(Message msg)
                          ?

                          Also, I suspect that in handling such messages I must somehow inform Spring AMQP that I don't need the services of a MessageConverter. Is this correct?

                          Oh, in re Spring Integration: I am keenly interested in its capabilities. Also, I have noticed with interest that a few heavy hitters from Spring Integration weigh in frequently about AMQP. Learning something about Spring Integration is on my "to do" list. I forsee its use in a project I am working on.

                          Thanks for your help.

                          -Paul

                          Comment


                          • #14
                            Well, again, I'd need to see the actual error you got when using a SimpleMessageConverter and handleMessage(String s).

                            With no message converter you would need the parameter type to be Message like you found. Your test case is very similar to one of the message listener adapter unit tests MessageListenerAdapterTests.testDefaultListenerMet hod(). Which works just fine with a plain text message, a SimpleMessageConverter and a handleMessage(String data) method. So I am intrigued as to why you saw an error with that combination.

                            ...I must somehow inform Spring AMQP that I don't need the services of a MessageConverter. Is this correct?
                            You always need a MessageConverter if your listener takes anything other than a Message as its parameter. If you want to get the whole message, you can simply implement MessageListener and do away with the adapter altogether. Even if you use an adapter, if the listener implements MessageListener (or ChannelAwareMessageListener), it is invoked without conversion.

                            HTH

                            Comment


                            • #15
                              Dave and Gary,

                              By way of follow-up....

                              It was relatively easy to get the "raw" message approach working. Per Gary's instruction, I set the container's MessageListener to an instance of a class (ConsumerListener) that implements MessageListener, and did away with the adapter:

                              Code:
                              		
                              ConsumerSimpleMessageListenerContainer container = new ConsumerSimpleMessageListenerContainer();
                              container.setConnectionFactory(getConnectionFactory());   
                              container.setQueueNames(queueName);                                      		
                              container.setMessageListener(new ConsumerListener());
                              The ConsumerListener does simply this:
                              Code:
                              public void onMessage(Message msg)
                              	{
                              		byte[] b = msg.getBody();
                              		System.out.println("RECEIVED MSG=" + new String(b));
                              	}
                              So, via this approach I am able to handle (should the need arise) "raw" messages. And using an adapter and message converter, I can handle messages in a not-so-raw (cooked?) manner.

                              I have one question about the order of component (consumer, producer) startup and the notion of a "durable binding" that Dave raised. Specifically, am I right that a "durable binding" comes to be when one binds a durable queue to a durable exchange; put otherwise, that there is no Binding constructor or setter that marks a binding "durable?"

                              Thank you both for your help. This has been an enjoyable and instructive forum experience.

                              Cordially,

                              Paul

                              Comment

                              Working...
                              X