Announcement Announcement Module
Collapse
No announcement yet.
Rabbit MQ Management Not showing Acks to messages being consumed Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Rabbit MQ Management Not showing Acks to messages being consumed

    I have configured my spring SimpleMessageListener to automatically ack a message that is picked up from a queue.

    <bean id="messageListenerContainer" class="org.springframework.amqp.rabbit.listener.Si mpleMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="queueName" value="cbs.push.queue" />
    <property name="concurrentConsumers" value="1" />
    <property name="messageListener" ref="messageListenerAdapter" />
    <property name="autoAck" value="true" />
    </bean>

    The listener does pick up messages from the queue but I see zero acks being reported in the RabbitMQ Management Utility.

    Do i have the bean configured incorrectly to ack a message?
    Last edited by catDeveloper; Dec 21st, 2010, 03:36 PM.

  • #2
    If you upgrade to the 1.0.0.M2 release and use channelTransacted=true (no autoAck flag in M2) it should work as you expect, I think.

    Comment


    • #3
      Thank you so much for responding. I have not been able to get past this issue for a couple of weeks now.

      I will try upgrading and let you know how it turns out.

      Comment


      • #4
        I upgraded to version 1.0.0.M2 and discovered the SimpleMessageListenerContainer now does not have a method call setAutoAck(boolean).

        How do you set the ack flag in version 1.0.0.M2?

        Comment


        • #5
          From my previous answer:
          Originally posted by Dave Syer View Post
          use channelTransacted=true

          Comment


          • #6
            Hello David,

            I have set the value of channelTransacted to true but I am still not seeing acked messages being committed and displayed in the rabbit management utility.

            Code:
            <bean id="messageListenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer" scope="session">
            		<property name="connectionFactory" ref="connectionFactory" />
            		 <property name="queueName" value="cbs.push.queue"/>  		
            	    <property name="concurrentConsumers" value="1" />
            	    <property name="messageListener" ref="messageListenerAdapter" />
            	    <property name="channelTransacted" value="true" />
            	    <property name="autoStartup" value="false" />
            	    
            	</bean>
            I am expecting to see a count of ack'd messages just like the screen shot shows on the rabbitmq.com website describing the functionality of the management tool. http://www.rabbitmq.com/img/management/queue.png

            I have also implemented a consumer class which uses the Channel that calls the ack method.

            Code:
            while (runInfinite) {
            				QueueingConsumer.Delivery delivery;
            				try {
            					delivery = consumer.nextDelivery();
            					channel.txCommit();
            				} catch (InterruptedException ie) {
            					continue;
            				}
            				System.out.println("Message received"+ new String(delivery.getBody()));
            				channel.basicAck(delivery.getEnvelope().getDeliveryTag(), true);
            I wasn't sure if the transaction manager was required logic. I was glad to find in the forum your posting stating it wasn't required.

            http://forum.springsource.org/showthread.php?p=336703
            "transaction manager is optional, and auto-ack should still be the default"

            I really don't know what else I need to do to get the ACKS to show up.

            Comment


            • #7
              Ack'd Messages

              The reason you aren't seeing any ack'd messages is becuase they are auto ack'd in your config.

              In order to see the messages you are wanting, you set autoAck to false. Setting channelTransacted=true is only half of that. The other is that you have to register your transactionManager with spring-amqp for that connection.

              Code:
              <bean id="listenerBean" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
              
              ...
              
              <property name="transactionManager" ref="txManager"/>
              <property name="channelTransacted" value="true"/>
              ...
              </bean>
              That will set the listener to not auto-ack your messages, and the mgmt console will start giving you a runnig count of the number of total, ready, and unack'd messages you have. In that scenario, unack'd messages represent those put into a listener's internal BlockingCache to be worked through, but have not yet been processed and ack'd. You can control the size of that list by setting the prefetch size of your listener.

              Comment


              • #8
                Thank you so much for responding with your advice on setting the transactionManager and the channelTransacted attribute on the container.

                I was finally able to get the implementation for the transaction manager implemented,
                Code:
                <bean id="messageListenerContainer"		class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer"
                		scope="session">
                		<property name="connectionFactory" ref="connectionFactory" />
                		<property name="queueName" value="cbs.push.queue" />
                		<property name="concurrentConsumers" value="1" />
                		<property name="messageListener" ref="messageListenerAdapter" />
                		<property name="channelTransacted" value="true" />
                		<property name="autoStartup" value="false" />
                		<property name="transactionManager" ref="txnManager" />
                	</bean>
                
                <bean name="txnManager"
                		class="org.springframework.transaction.jta.JtaTransactionManager">
                		<property name="userTransaction">
                			<ref local="jotm" />
                		</property>
                	</bean>
                
                	<bean id="jotm" class="org.springframework.transaction.jta.JotmFactoryBean" />
                Tomcat does not support the standard spring JTA implementation and for this reason i had to use the jotm library.

                In the process I also learned that Rabbit only shows the ack'd messages per second. it does not capture the total number of ack'd messages that are picked up by a listener :-)

                Comment


                • #9
                  That's good. Just be aware that JOTM is buying you nothing here over any other transaction manager (Rabbit is not aware of it). And once again: the transaction manager is optional, it is only needed if your listener needs it (e.g. does something in a database).

                  Comment

                  Working...
                  X