  • Reply queue with onMessage()....

    Hi,
    again it's me. I have a question regarding the implementation of this scenario: http://rdoc.info/github/ruby-amqp/am..._Reply_pattern

    My producer looks like this:

    Code:
    rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY_NAME, job, new MessagePostProcessor() {

        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            message.getMessageProperties().setReplyTo(RESPONSE_QUEUE);
            try {
                message.getMessageProperties().setCorrelationId(UUID.randomUUID().toString().getBytes("UTF-8"));
            } catch (UnsupportedEncodingException ex) {
                throw new AmqpException(ex);
            }
            return message;
        }
    });
    Now the question is, how to implement the opposite on the consumer side:

    Code:
    /* (non-Javadoc)
     * @see org.springframework.amqp.rabbit.core.ChannelAwareMessageListener#onMessage(org.springframework.amqp.core.Message, com.rabbitmq.client.Channel)
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        Job job = listenerHelper.convertMessage(message);

        Thread.sleep(2000);
        System.out.println("received: " + job.getCommand());

        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    }
    I couldn't find anything about that in the documentation. Do I need to do that via rabbitTemplate?

  • #2
    Hi,
    I dug further, and after rereading the documentation I extended my producer to:

    Code:
    resultJob = (Job) rabbitTemplate.convertSendAndReceive(EXCHANGE_NAME, ROUTING_KEY_NAME, job, new MessagePostProcessor() {

        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            //message.getMessageProperties().setReplyTo(RESPONSE_QUEUE);
            try {
                message.getMessageProperties().setCorrelationId(UUID.randomUUID().toString().getBytes("UTF-8"));
            } catch (UnsupportedEncodingException ex) {
                throw new AmqpException(ex);
            }
            return message;
        }
    });
    But the question remains: how do I reply to that in my onMessage(...) method? Could anyone help me out with that?

    Side note: it would probably be helpful to extend the tutorial section of Spring AMQP with Rabbit along the lines of the examples provided on the RabbitMQ website itself.
    Last edited by johanneshiemer; Oct 28th, 2011, 07:41 AM.

    • #3
      Originally posted by johanneshiemer
      Do I need to do that via rabbitTemplate?
      That seems sensible to me. You want to send a message in your listener, right, responding to the replyTo header in the incoming message? Note you can use acknowledgeMode=AUTO to have Spring AMQP do the ack for you.

      • #4
        Hi Dave,
        thank you for your reply. Yes, I am aware of AcknowledgeMode.AUTO, but as I am using prefetchCount(5) and need to regulate the number of concurrent onMessage() threads, I send the ACK manually after execution, once the specific slot has been freed.

        Coming back to the request/reply problem: as you can see, with rabbitTemplate.convertSendAndReceive() Spring creates a non-durable queue for the reply. In my onMessage() method on the consumer side, I am not sure how to send "the answer" to this anonymous queue.
        It would be great if you could give me a hint...

        Thanks a lot for your answers,

        Johannes

        • #5
          I think all you need to do is look in the replyTo header and send the response back to that.
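
To make that concrete, here is a minimal, broker-free sketch of the request/reply flow using only the JDK. It is not Spring AMQP code: ReplyToSketch, Msg, QUEUES, queue(), and sendAndReceive() are hypothetical stand-ins for the broker, the message, and the template. The part that matters is onMessage(): read the replyTo header, send the result to that queue, and copy the correlation id so the producer can match the reply. In Spring AMQP the same step would presumably go through message.getMessageProperties() and rabbitTemplate, but check the exact signatures for your version.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Broker-free model of request/reply. Every name here is hypothetical. */
public class ReplyToSketch {

    /** Stand-in for an AMQP message: a body plus the two headers that matter. */
    public static final class Msg {
        public final String body;
        public final String replyTo;        // queue the reply should go to, may be null
        public final String correlationId;  // lets the producer match reply to request

        public Msg(String body, String replyTo, String correlationId) {
            this.body = body;
            this.replyTo = replyTo;
            this.correlationId = correlationId;
        }
    }

    /** Stand-in for the broker: queue name -> queue. */
    public static final Map<String, BlockingQueue<Msg>> QUEUES = new ConcurrentHashMap<>();

    public static BlockingQueue<Msg> queue(String name) {
        return QUEUES.computeIfAbsent(name, n -> new LinkedBlockingQueue<>());
    }

    /** Consumer side: process the job, then answer to whatever replyTo names. */
    public static void onMessage(Msg request) {
        String result = "done: " + request.body;
        if (request.replyTo == null) {
            return; // plain one-way message, nobody is waiting for a reply
        }
        // Send to the reply queue and echo the correlation id unchanged.
        queue(request.replyTo).add(new Msg(result, null, request.correlationId));
    }

    /** Producer side: send with replyTo set, then wait for the matching reply. */
    public static String sendAndReceive(String requestQueue, String body)
            throws InterruptedException {
        String replyQueue = "reply-" + UUID.randomUUID(); // models the anonymous queue
        String correlationId = UUID.randomUUID().toString();
        queue(requestQueue).add(new Msg(body, replyQueue, correlationId));
        try {
            Msg reply = queue(replyQueue).poll(5, TimeUnit.SECONDS);
            if (reply == null || !correlationId.equals(reply.correlationId)) {
                return null; // timed out, or the reply belongs to another request
            }
            return reply.body;
        } finally {
            QUEUES.remove(replyQueue); // the temporary reply queue goes away
        }
    }

    public static void main(String[] args) throws Exception {
        Thread consumer = new Thread(() -> {
            try {
                onMessage(queue("jobs").take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        System.out.println(sendAndReceive("jobs", "resize-image"));
        consumer.join();
    }
}
```

The consumer never needs to know the name of the anonymous queue in advance; it only echoes back what the request carries. That is why the same listener works whether the producer sets a fixed response queue or lets the template create a temporary one.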

          I'm not sure I understand your acking strategy, and I would hope that the container can do it for you if it's not too unusual. If it can't, maybe we need a new feature request. But that's another question (we can come back to it later if you want).
