Announcement Announcement Module
Collapse
No announcement yet.
Why Synchronous reception is not blocking Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Why Synchronous reception is not blocking

    Hi All,

    As per the java doc "The simpler option is to poll for a single Message at a time with a synchronous, blocking method call"


    However the template.receive() is not synchronous blocking call and returns null instead.

    On using Rabbit MQ API it was observed as blocking call...

    Am i doing something wrong here

    Spring based code implementation
    Code:
    
                   while (true) {
    			try {
    				status = txManager.getTransaction(def);
    				Message msg = template.receive(); //This is not blocking and returns null if Q is empty
    				System.out.println(msg);
    				if (msg != null) {
    					//business logic goes here..
    				}
    				txManager.commit(status);
    			} catch (Exception e) {
    				
    				txManager.rollback(status);
    			}
    		}

    Rabbit mq code

    Code:
    	while( true) {
    			
    			QueueingConsumer.Delivery delivery = consumer.nextDelivery(); //This is blocking call until msg arrives.
    			
    			Object eventObj = ByteStreamUtil.convertToObject( delivery.getBody());
    			try {
    				//business logic goes here
    				channel.basicAck( delivery.getEnvelope().getDeliveryTag(), false);
    			}
    			catch( Exception e) {
    				e.printStackTrace();
    				channel.basicNack( delivery.getEnvelope().getDeliveryTag(), false, false);
    			}
    		}
    Any thoughts and more information explaining above behavior is appreciated.

    Thanks
    Steve

  • #2
    Any response from core spring amqp team will be appreciate. Currently we cannot use spring amqp due to difference in behavior observed as explained above.

    Thanks
    Steve

    Comment


    • #3
      Sorry, but I believe that JavaDoc was relevant for the JmsTemplate upon which the AmqpTemplate was modeled, but the notion of a blocking receive is not quite as direct as in JMS (where there is a blocking receive method in the API itself).

      We at least need to update the documentation (please open a JIRA for that), and we may consider supporting both blocking and non-blocking receive calls (returning a Message OR null after 0 timeout as done currently).

      That said, it seems that the MessageListenerContainer approach could work for any case where you want to block until a Message arrives (the container can be started/stopped as needed). Can you explain your use-case a bit and why you want the blocking receive call specifically?

      Thanks,
      Mark

      Comment


      • #4
        Thanks Mark.
        I've created JIRA to update java doc. Using receive was simplest approach to fit in current application framework which is using rabbit mq. I will try to figure out how we can leverage MessageListener approach in our application.

        Our use case is pretty standard , on receiving messages perform some business logic. I can definitely use Listener to solve our case.

        Comment


        • #5
          Transaction in message listener container

          Hi Mark & All,

          I am facing some issues with transactions in message listener container.

          Logs do indicate rollback but message is not put back into Q.

          01:08:57,140 DEBUG RabbitTransactionManager:843 - Initiating transaction rollback
          01:08:57,140 DEBUG RabbitResourceHolder:164 - Rolling back messages to channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,3)
          Here is code

          Code:
          SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
          		container.setConnectionFactory((ConnectionFactory) context.getBean("rabbitConnFactory"));
          		container.setQueueNames(getQueueName());
          		container.setMessageListener(new MsgListener(txManager));
          		container.setChannelTransacted(true);
          		container.setPrefetchCount(1);
          		container.setTxSize(1);
          		container.start();
          MsgListener

          Code:
          class MsgListener implements MessageListener {
          		PlatformTransactionManager txManager;
          		TransactionStatus status = null;
          		DefaultTransactionDefinition def;
          
          		public MsgListener(PlatformTransactionManager txManager) {
          			this.txManager = txManager;
          			def = new DefaultTransactionDefinition();
          			def.setName(getQueueName()+"TX");
          			def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
          		}
          		 
          		@Override
          		public void onMessage(Message msg) {
          			try {
          				status = txManager.getTransaction(def);
          				System.out.println(msg);
          				if (msg != null) {
          					
          					Object object = ByteStreamUtil.convertToObject(msg
          							.getBody());
          					if (object != null) {
          						processEvent((E) object);
          					}
          				}
          				txManager.commit(status);
          			} catch (Exception e) {
          				log.error(
          						"Error while consuming message - message will be put back on queue",
          						e);
          				txManager.rollback(status);
          			}
          		}
          		
          	}
          I would really appreciate if you or someone could throw some light on this.

          Thanks much
          Steve

          Comment

          Working...
          X