Announcement Announcement Module
Collapse
No announcement yet.
Weird behavior of messageListener(); Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Weird behavior of messageListener();

    Hi,
    I have question regarding a weird behavior of the messageListener();. As the reference manual refers, the messageListener is asynchronous. I defined to different queues and routing keys, which properties look as follows:

    Code:
    amqp.schedule.routingKeyName=service.schedule
    amqp.schedule.routingKey=service.schedule
    amqp.schedule.queue=service.schedule
    amqp.schedule.prefetchCount=10
    
    amqp.compose.routingKeyName=service.compose
    amqp.compose.routingKey=service.compose
    amqp.compose.queue=service.compose
    amqp.compose.prefetchCount=2
    For each of them I defined a separate configuration looking like this:

    Code:
    @Value("${amqp.compose.routingKeyName}")
    private String ROUTING_KEY_NAME;
    	
    @Value("${amqp.compose.routingKey}")
    private String ROUTING_KEY;
    	
    @Value("${amqp.compose.queue}")
    private String QUEUE;
    	
    @Value("${amqp.compose.prefetchCount}")
    private int prefetchCount;
    	
    @Override
    public void configureRabbitTemplate(RabbitTemplate rabbitTemplate) {
    	rabbitTemplate.setRoutingKey(ROUTING_KEY_NAME);		
    }
    		
    @Bean
    public SimpleMessageListenerContainer compositionListenerContainer() {
    	SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
    	container.setQueues(compositionQueue());
    	container.setMessageListener(compositionMessageListenerAdapter());
    	container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    	container.setPrefetchCount(prefetchCount);
    	container.setErrorHandler(errorHandler());
    	return container;
    }
    
    @Bean 
    public MessageListenerAdapter compositionMessageListenerAdapter() {
    	return new MessageListenerAdapter(amqpListener, jsonMessageConverter());		
    }
    The problem is, although the compositionMessageListenerAdapter refers to amqpListener, which implements ChannelAwareMessageListener and should be asynchronous, it does not care about the prefetchCounts in the properties file.

    Perhaps I have a lack of understanding, but what I was expecting from the prefetchCount was that for example, if the prefetchCount was set to 10, then onMessage(..) would be executed ten times..is this not the case?

    Any help appreciated!

  • #2
    In the excerpt you posted, the 'amqp.compose.prefetchCount' property has a value of 2, not 10.

    Comment


    • #3
      Hi Mark,
      yes. And I think I found the solution to my problem. The thing was I mixed up concurrentConsumers with prefetchCount. What I initially tried to achieve was having 2 composer consumers and 10 schedule consumers.

      Thanks for your help. :-)

      Comment


      • #4
        Hi Mark,
        coming back, I have some issues with my two consumers. Essentially both consumers are receiving the same message. Taking a look here: http://www.rabbitmq.com/tutorials/tu...-two-java.html this should not happen by default. In my version the output looks like this:

        Sample (with 2 consumers)
        Code:
        5972ccd9-2cbe-42d0-9545-ee1b2fb0be34
        executing COMPOSE: TIMEOUT 10 seconds
        5972ccd9-2cbe-42d0-9545-ee1b2fb0be34
        executing COMPOSE: TIMEOUT 10 seconds
        
        73e0ae06-23ad-456b-ab75-8acc05eff378
        executing COMPOSE: TIMEOUT 10 seconds
        73e0ae06-23ad-456b-ab75-8acc05eff378
        executing COMPOSE: TIMEOUT 10 seconds
        
        48efb80e-c55b-4bde-b8f7-4c052667c111
        executing COMPOSE: TIMEOUT 10 seconds
        45cceb20-2286-43b4-ad1a-d37723fe1354
        executing COMPOSE: TIMEOUT 10 seconds
        In the third iteration you can see the messages are not the same for the first time.

        Any idea why this happens?

        Configuration of SimpleMessageListenerContainer looks like this:


        Code:
        prefetchCount = 2;
        concurrentConsumers = 2;
        
        @Bean
        	public SimpleMessageListenerContainer compositionListenerContainer() {
        		SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
        		container.setQueues(compositionQueue());
        		container.setMessageListener(compositionMessageListenerAdapter());
        		container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        		container.setPrefetchCount(prefetchCount);
        		container.setErrorHandler(errorHandler());
        		container.setConcurrentConsumers(concurrentConsumers);
        		return container;
        	}
        Last edited by johanneshiemer; Jan 9th, 2012, 09:07 AM.

        Comment


        • #5
          Hi,
          I kept on working on the stuff. I figured out some quite interesting things. First of all the behavior is completely right if I remove the following piece of code:

          Code:
          restClient.prepare(host, userName, password, useSSL);
          restClient.connect();
          			
          configurationType = configuration.get(job.getParentId());
          from my onMessage() method there are no duplicate messages across the consumer (separation through the UUID):

          Code:
          e83147e2-e641-4346-9165-efee015f73b3 executing COMPOSE Mon Jan 09 19:19:29 CET 2012
          124e8c95-71b9-4adb-ae20-ba27a9dece8c executing COMPOSE Mon Jan 09 19:19:29 CET 2012
          46810668-57ee-4d3e-babf-b68ca5c1d2c0 executing COMPOSE Mon Jan 09 19:19:29 CET 2012
          
          f8b6d0e1-d0b7-4bf6-99df-d90454fc6c06 executing COMPOSE Mon Jan 09 19:19:39 CET 2012
          0dc14720-e9cf-447f-b9f3-836a0b6d3adb executing COMPOSE Mon Jan 09 19:19:39 CET 2012
          4ce08229-e408-4f6f-8dac-e3a5078e8631 executing COMPOSE Mon Jan 09 19:19:39 CET 2012
          Adding the code again, create the duplicates:

          Code:
          834f06dc-7028-475d-a482-e9ac1631123a executing COMPOSE Mon Jan 09 19:17:15 CET 2012
          834f06dc-7028-475d-a482-e9ac1631123a executing COMPOSE Mon Jan 09 19:17:15 CET 2012
          834f06dc-7028-475d-a482-e9ac1631123a executing COMPOSE Mon Jan 09 19:17:15 CET 2012
          
          34e0cd91-d652-4db6-b1f2-e6279888658d executing COMPOSE Mon Jan 09 19:17:27 CET 2012
          34e0cd91-d652-4db6-b1f2-e6279888658d executing COMPOSE Mon Jan 09 19:17:27 CET 2012
          34e0cd91-d652-4db6-b1f2-e6279888658d executing COMPOSE Mon Jan 09 19:17:27 CET 2012
          Being honest, I do not know why. The restClient is attached to the implementation of ChannelAwareListener like this:

          Code:
          @Autowired
          private IRestClient restClient;
          Please help me, this is driving me nuts..!

          Comment


          • #6
            Hi,
            me again. I am beginning to think that this is a bug. It is quite easy to reproduce the issue again and again.

            Could anyone of the team please take a look at it?

            Comment


            • #7
              It sounds like maybe your message listener is probably failing and causing the messages to rollback. That wouldn't be a bug. But there isn't enough information about your client to really see what is happening. Can you post the implementation of your amqpListener? Look at the logs from your consumer to see if it is failing?

              Comment


              • #8
                Hi Dave,
                thanks a lot for your response. First of all here is my implementation of the amqpListener:

                Code:
                @Component
                public class ComposeListener implements IComposeListener {
                
                	private Exception serviceException;
                
                	private Job job;
                
                	@Autowired(required = true)
                	private IAMQPListenerHelper listenerHelper;
                	
                	@Autowired(required = true)
                	private volatile RabbitTemplate rabbitTemplate;
                	
                	@Value("${amqp.status.routingKeyName}")
                	protected String STATUS_RK;
                	
                	@Value("${amqp.baseExchange}")
                	protected String EXCHANGE_NAME;
                	
                	@Autowired
                	private IRestClient restClient;
                	
                	@Value("${host.address}")
                	private String host;
                	
                	@Value("${host.username}")
                	private String userName;
                
                	@Value("${host.password}")
                	private String password;
                	
                	@Value("${host.useSSL}")
                	private boolean useSSL;
                	
                	/**
                	 * @return the serviceException
                	 */
                	public Exception getServiceException() {
                		return serviceException;
                	}
                
                	/**
                	 * @param serviceException the serviceException to set
                	 */
                	public void setServiceException(Exception serviceException) {
                		this.serviceException = serviceException;
                	}
                
                	@Override
                	public void onMessage(Message message, Channel channel) throws Exception {
                		
                		/**
                		 * TODO Check JOB for its validity
                		 */
                		try {
                			job = listenerHelper.convertMessage(message);
                			
                			if (!job.getCommand().equals(Type.ACKNOWLEDGE.toString())) {
                				restClient.prepare(host, userName, password, useSSL);
                				restClient.connect();
                			}
                		} catch (Exception ex) {
                			setServiceException(ex);
                		}
                                if (job.getCommand().equals(Type.COMPOSE.toString())) {
                			System.out.println(job.getId() + " executing COMPOSE " + new Date().toString());
                //			try {				
                //				compose.execute(platformType, configurationType);
                //			} catch (Exception ex) {
                //				setServiceException(ex);
                //			}
                			Random rand = new Random();
                			//Thread.sleep(rand.nextInt(10000));
                			Thread.sleep(10000);
                		}
                
                	}
                I attached the output to log.txt as it otherwise would disrupt the whole thread.

                Thanks a lot for your help!

                UPDATE:
                Another bit of information. It seems to be, that the behavior is not related to the restClient. If I replace the restClient with Thread.Sleep(10000); then the same output with duplicate message occurs as well:
                Code:
                af646cfc-c879-4757-a6dc-ddf6f18230a8 executing COMPOSE Tue Jan 10 10:20:02 CET 2012
                af646cfc-c879-4757-a6dc-ddf6f18230a8 executing COMPOSE Tue Jan 10 10:20:02 CET 2012
                af646cfc-c879-4757-a6dc-ddf6f18230a8 executing COMPOSE Tue Jan 10 10:20:02 CET 2012
                Last edited by johanneshiemer; Jan 10th, 2012, 04:21 AM.

                Comment


                • #9
                  Hi,
                  as I got not reply yet from Dave, I kept on digging. An interesting thing I found is, that each Consumer gets a separate message, but when converting the message from JSON to object the error occurs:

                  Code:
                  {"id":"6e9b12ab-a405-41cd-85eb-22dfb507f7da","command":"COMPOSE","parentId":"3c9ace8c-3fdf-4e11-bf1c-00dc02dcc364","parentEntity":"CONFIGURATION","startDate":null,"endDate":null,"status":0,"statusMessage":null,"user":null,"parent":null}
                  {"id":"13092f44-8947-4e2a-8725-6780ae360f0f","command":"COMPOSE","parentId":"3c9ace8c-3fdf-4e11-bf1c-00dc02dcc364","parentEntity":"CONFIGURATION","startDate":null,"endDate":null,"status":0,"statusMessage":null,"user":null,"parent":null}
                  {"id":"f269d809-0dda-4219-9827-b80538bfff39","command":"COMPOSE","parentId":"3c9ace8c-3fdf-4e11-bf1c-00dc02dcc364","parentEntity":"CONFIGURATION","startDate":null,"endDate":null,"status":0,"statusMessage":null,"user":null,"parent":null}
                  6e9b12ab-a405-41cd-85eb-22dfb507f7da executing COMPOSE Tue Jan 10 13:10:53 CET 2012
                  6e9b12ab-a405-41cd-85eb-22dfb507f7da executing COMPOSE Tue Jan 10 13:10:53 CET 2012
                  6e9b12ab-a405-41cd-85eb-22dfb507f7da executing COMPOSE Tue Jan 10 13:10:53 CET 2012
                  As you can see, the first message was taken...

                  The stub to test this looks like this:

                  Code:
                  @Override
                  	public void onMessage(Message message, Channel channel) throws Exception {
                  		
                  		/**
                  		 * TODO Check JOB for its validity
                  		 */
                  		try {
                  			String value = new String(message.getBody());
                  			System.out.println(value);
                  			JsonMessageConverter jasonMsgConverter = new JsonMessageConverter();
                  			job = (Job)jasonMsgConverter.fromMessage(message);
                  Dave/Mark, am I doing something wrong here?

                  Comment


                  • #10
                    Have you tried just these two lines of code in isolation passing in a Message instance with the same content as in the app?:

                    Code:
                    JsonMessageConverter jasonMsgConverter = new JsonMessageConverter();
                    job = (Job)jasonMsgConverter.fromMessage(message);

                    Comment


                    • #11
                      Hi Mark,
                      yes and an interesting fact is, that if you take a look at my ComposeListener above you can see the second var: private Job job;. If I put the variable into the onMessage(Message message...) method, then the messages are correct. So it seems that the issue is related to the Job var. But I don't have any idea why...do you have a clue?

                      Below the output mit local var in the method:

                      Code:
                      {"id":"af646cfc-c879-4757-a6dc-ddf6f18230a8","command":"COMPOSE","parentId":"3c9ace8c-3fdf-4e11-bf1c-00dc02dcc364","parentEntity":"CONFIGURATION","startDate":null,"endDate":null,"status":0,"statusMessage":null,"user":null,"parent":null}
                      {"id":"bd8cc583-acbf-4075-8bdc-1fd94d2a50a3","command":"COMPOSE","parentId":"3c9ace8c-3fdf-4e11-bf1c-00dc02dcc364","parentEntity":"CONFIGURATION","startDate":null,"endDate":null,"status":0,"statusMessage":null,"user":null,"parent":null}
                      {"id":"13092f44-8947-4e2a-8725-6780ae360f0f","command":"COMPOSE","parentId":"3c9ace8c-3fdf-4e11-bf1c-00dc02dcc364","parentEntity":"CONFIGURATION","startDate":null,"endDate":null,"status":0,"statusMessage":null,"user":null,"parent":null}
                      
                      af646cfc-c879-4757-a6dc-ddf6f18230a8 executing COMPOSE Tue Jan 10 13:53:38 CET 2012
                      13092f44-8947-4e2a-8725-6780ae360f0f executing COMPOSE Tue Jan 10 13:53:38 CET 2012
                      bd8cc583-acbf-4075-8bdc-1fd94d2a50a3 executing COMPOSE Tue Jan 10 13:53:38 CET 2012

                      Comment


                      • #12
                        Yes, that's exactly what the problem is; since you are using an instance var there to store state passed in with each message, the code is not thread safe.

                        Comment


                        • #13
                          ClassCastException? The logs from the client should tell you (I think it might be DEBUG level, not sure).

                          Comment

                          Working...
                          X