
  • Messages not ACK'd with AcknowledgeMode.AUTO without setting SimpleMessageListenerCon

    I've noticed an issue when writing unit tests that run both Publisher and Consumer threads.

    In this scenario (but not if I run the consumer and producer as separate processes), when I set the AcknowledgeMode to AUTO on a SimpleMessageListenerContainer, the broker does not appear to receive an ACK for the received message, and after the test shuts down the message is requeued.

    But if I set the scope of the SimpleMessageListenerContainer to prototype, I get the correct behaviour.

    This is not done in the samples in the docs<http://static.springsource.org/spring-amqp/reference/html/> so I'm wondering if this is some kind of Channel thread-safety issue, or if I'm just doing something wrong?

    If it is simply the case that SimpleMessageListenerContainer should be scoped as prototype, can this be documented? Also, if using the <rabbit:listener-container> XML configuration, how would I set the scope?

    This was with environment:
    spring-amqp -1.0.0-RELEASE
    RabbitMQ-2.6.0
    Java-1.6.0_26

    See sample code below:


    Code:
    package example;
    
    import static org.junit.Assert.*;
    
    import java.util.concurrent.ArrayBlockingQueue;
    
    import org.apache.log4j.Logger;
    import org.junit.Before;
    import org.junit.Test;
    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.core.AmqpAdmin;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Exchange;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageListener;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Scope;
    
    public class ExampleTest
    {
    	private static final Logger log = Logger.getLogger(ExampleTest.class);
    	
    	@Autowired
    	private AmqpTemplate tmpl;
    	
    	@Autowired
    	private SimpleMessageListenerContainer container;
    	
    	@Autowired
    	private ExampleMessageHandler msgHandler;	
    	
    	@Before
    	public void setup()
    	{
    		ApplicationContext context = new AnnotationConfigApplicationContext(ExampleConfiguration.class);
    		context.getAutowireCapableBeanFactory().autowireBean(this);
    	}
    	
    	@Test(timeout=5000)
    	public void testPublishAndConsume() throws Exception
    	{		
    		// Create and start a Consumer thread.
    		container.start();		
    			
    		// Publish a job.
    		String sent = "msg1";
    		log.debug("Publishing message - " + sent);
    		tmpl.send(new Message(sent.getBytes(), new MessageProperties()));
    		log.debug("Message published - " + sent);
    		
    		// Wait for job to be received by consumer.
    		String received = msgHandler.waitForMessage();
    		log.debug("Received message = " + String.valueOf(received));
    		
    		assertNotNull(received);
    		assertEquals(sent, received);
    	}
    }
    
    class ExampleMessageHandler implements MessageListener
    {
    	private final static Logger log = Logger.getLogger(ExampleMessageHandler.class);
    	
    	private ArrayBlockingQueue<String> receivedStack = new ArrayBlockingQueue<String>(100);
    	
    	public void onMessage(Message msg)
    	{
    		// Store the body in the queue; put() only blocks if the queue is full.
    		String body = new String(msg.getBody());
    		log.debug("Received job - " + String.valueOf(body));
    		try {
    			receivedStack.put(body);
    		} 
    		catch (InterruptedException e) {
    			System.err.println("Thread interrupted before passed job could be stored in stack.");
    		}
    	}
    	
    	public String waitForMessage() throws InterruptedException
    	{
    		// Call take() which blocks until a message is in the stack.
    		return receivedStack.take();
    	}
    }
    
    @Configuration
    class ExampleConfiguration
    {
    	private String host = "msrabbitmqqa.morningstar.com";
    	private int port = 5672;
    	private String username = "test";
    	private String password = "test";
    	private String vhost = "test";
    	
    	private String queueName = "test-queue";
    	private String exchangeName = "test-exchange";
    		
    	@Bean
    	public ConnectionFactory connectionFactory()
    	{
    		CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host);
    		connectionFactory.setPort(port);
    		connectionFactory.setUsername(username);
    		connectionFactory.setPassword(password);
    		connectionFactory.setVirtualHost(vhost);
    		return connectionFactory;
    	}
    
    	@Bean
    	public AmqpAdmin amqpAdmin()
    	{
    		return new RabbitAdmin(connectionFactory());
    	}
    
    	@Bean
    	public AmqpTemplate amqpTemplate()
    	{
    		RabbitTemplate template = new RabbitTemplate(connectionFactory());
    		template.setRoutingKey("");
    		template.setQueue(this.queueName);
    		template.setExchange(exchangeName);
    		return template;
    	}
    
    	@Bean
    	public Queue queue()
    	{
    		return new Queue(this.queueName, true, false, false);
    	}
    
    	@Bean
    	public Exchange exchange()
    	{
    		return new DirectExchange(exchangeName, true, false);
    	}
    
    	@Bean
    	public Binding binding()
    	{
    		return BindingBuilder.bind(queue()).to((DirectExchange) exchange()).with("");
    	}
    	
    	
    	@Bean ExampleMessageHandler messageListener()
    	{
    		return new ExampleMessageHandler();
    	}
    	
    	// NOTE: If uncomment below Scope line, then example works. As-is, the broker 
    	// received no ack from the Container even though have set mode to AUTO.
    	@Scope(value = "prototype")
    	@Bean
    	public SimpleMessageListenerContainer listenerContainer() 
    	{
    		SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    		container.setConnectionFactory(connectionFactory());
    		container.setQueueNames(this.queueName);
    		container.setAcknowledgeMode(AcknowledgeMode.AUTO);		
    		container.setMessageListener(messageListener());
    		
    		return container;
    	}
    }
    thanks
    Mike

  • #2
    Your sample test works for me, so there must be something else going on. (Thanks for the code snippet, by the way.) My guess is you have mouldy exchanges/queues etc. in your broker that are in some way conflicting with the declarations here. I can't really propose an exact scenario where that would make prototype scope make any difference, but since it works for me I can't really offer much else at this stage. Can you try again with a broker reset before you run the test ($ rabbitmqctl reset)?


    • #3
      Hi Dave. I've tried after a broker reset but still get the same result. However, I've just noticed that I had forgotten to comment out the @Scope(value = "prototype") line in the code snippet I posted, i.e. the SimpleMessageListenerContainer bean should have been:
      Code:
      // NOTE: If uncomment below Scope line, then example works. As-is, the broker 
      // received no ack from the Container even though have set mode to AUTO.
      //@Scope(value = "prototype")
      @Bean
      public SimpleMessageListenerContainer listenerContainer() 
      {
      	SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
      	container.setConnectionFactory(connectionFactory());
      	container.setQueueNames(this.queueName);
      	container.setAcknowledgeMode(AcknowledgeMode.AUTO);		
      	container.setMessageListener(messageListener());
      
      	return container;
      }
      Also note that the JUnit test does actually pass, but the broker status reports a message remaining Ready in the queue after the test has completed.


      • #4
        OK, sorry, I thought the test was supposed to fail. I actually cannot work out how to make this test fail, but I do see the extra message in the queue after the test exits using rabbitmqctl. I think the problem is that you have called container.start() twice (once implicitly when the context starts, and once explicitly in your test method). If you remove that call to container.start(), or change the container config to not start automatically, I think the normal singleton bean is fine. It *is* a bug, but not one that most users will encounter, so not very severe I think. If you can raise a JIRA ticket that would be great.
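
For readers following along, here is a minimal sketch of the second option mentioned above: configuring the container so that it does not start with the context, leaving the explicit container.start() in the test as the only start. It assumes that setAutoStartup(boolean) is available on SimpleMessageListenerContainer in the Spring AMQP version in use (check this for 1.0.0), and that a ConnectionFactory and a MessageListener bean are defined elsewhere in the context. The class name ManualStartConfiguration is hypothetical, and "test-queue" is the queue name from the original post.

```java
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; the other beans (connection factory,
// listener, queue, exchange, binding) are assumed to be declared elsewhere.
@Configuration
class ManualStartConfiguration
{
	@Autowired
	private ConnectionFactory connectionFactory;

	@Autowired
	private MessageListener messageListener;

	@Bean
	public SimpleMessageListenerContainer listenerContainer()
	{
		SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
		container.setConnectionFactory(connectionFactory);
		container.setQueueNames("test-queue");
		container.setAcknowledgeMode(AcknowledgeMode.AUTO);
		container.setMessageListener(messageListener);

		// Assumption: with auto-startup disabled the context does not start
		// the container, so the test's own container.start() is the only start.
		container.setAutoStartup(false);

		return container;
	}
}
```

The other option from the post is simply to delete the container.start() call from the test and let the context start the singleton container.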


        • #5
          Actually I think someone already logged this issue: https://jira.springsource.org/browse/AMQP-192.


          • #6
            This issue might be relevant (preventing starting the container more than once):
            https://jira.springsource.org/browse/AMQP-192

            However, my first attempt to resolve that led to the question whether being able to invoke start() more than once is desirable based on this comment on SMLC's doStart() method:
            "Re-initializes this container's Rabbit message consumers, if not initialized already."

            That's a bit of a paradox. If it's "RE"-something that implies that something has already happened, but here it says "if not ... already".

            Dave, do you know off the top of your head if we can just return without doing anything if 'isRunning' returns TRUE already?
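
To make the question concrete, here is a rough sketch of what "return without doing anything if already running" could look like. This is not the SimpleMessageListenerContainer source; GuardedContainer, its doStart() and its consumer counter are hypothetical stand-ins written in plain Java, purely to illustrate an idempotent start().

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a listener container; NOT the Spring AMQP class.
class GuardedContainer
{
	private final AtomicBoolean running = new AtomicBoolean(false);

	// Counts how many times consumers were initialised, for illustration only.
	private final AtomicInteger consumersCreated = new AtomicInteger();

	public boolean isRunning()
	{
		return running.get();
	}

	public void start()
	{
		// Only the caller that flips running from false to true proceeds;
		// any later call while running returns without doing anything.
		if (!running.compareAndSet(false, true)) {
			return;
		}
		doStart();
	}

	public void stop()
	{
		running.set(false);
	}

	private void doStart()
	{
		// A real container would create its broker consumers here.
		consumersCreated.incrementAndGet();
	}

	public int getConsumersCreated()
	{
		return consumersCreated.get();
	}
}

class IdempotentStartDemo
{
	public static void main(String[] args)
	{
		GuardedContainer container = new GuardedContainer();
		container.start(); // e.g. the implicit start by the context
		container.start(); // e.g. the explicit start in the test
		System.out.println("consumers created: " + container.getConsumersCreated());
	}
}
```

With a guard like this the second start() is a no-op, so only one set of consumers is ever created. Whether a silent no-op is preferable to re-initialising or throwing is exactly the design question raised in the post.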


            • #7
              Yes, removing the explicit call to container.start() has fixed it - I'd missed that it was being started automatically. Thanks for the help!