  • SimpleMessageListenerContainer in Tasklet start- handler not completing before -stop

    Hello,

    AMQP version 1.2.0

    Relevant thread: http://forum.springsource.org/showth...tenerContainer

    I have a tasklet that is scheduled to run every 5 minutes. It starts a SimpleMessageListenerContainer whose handler is supposed to consume the messages, either requeueing those the system isn't ready to process or moving them to another queue for processing.

    In the tasklet's execute method (which returns RepeatStatus):

    Code:
    container.afterPropertiesSet();
    container.start(); // autoStartup = false in xml
    container.stop();
    container.destroy();
    System.out.println("DbPauseConsumerTasklet triggered!");
    return RepeatStatus.FINISHED;
    My handler code (implements ChannelAwareMessageListener) is:

    Code:
    	@Override
    	public void onMessage(Message message, Channel channel) throws Exception {
    		byte[] body = message.getBody();
    		String strbody = new String(body);
    		Gson gson = new Gson();
    		dbStatusMessage = gson.fromJson(strbody, DbStatusMessage.class);
    		if (dbStatusMessage != null) {
    			K12Architecture arch = getArchDao().selectMDYByMfrDaYear(dbStatusMessage.getMfrOrgId(),
    					dbStatusMessage.getDaOrgId(), dbStatusMessage.getCalYear());
    			// db validating - put back on queue
    			if (arch != null && "validating".equals(arch.getEnv())) {
    				System.out.println("arch id: " + arch.getId() + " env: " + arch.getEnv());
    				System.out.println(dbStatusMessage.getQueue() + " being put back on queue...");
    				channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
    			} else {
    				System.out.println("acknowledging and moving message to validation.ready");
    				channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    				getDbReadyProducer().insertMessageToQueue(dbStatusMessage);
    			}
    			
    		}
    		System.out.println("getting to here....");
    	}
    The output is:

    Code:
    arch id: 2884 env: validating
    validation.inserts.TEST_10_2012 being put back on queue...
    getting to here....
    arch id: 2884 env: validating
    validation.inserts.TEST_10_2012 being put back on queue...
    getting to here....
    DbPauseConsumerTasklet triggered!
    My understanding is that the handler runs on a different thread, so it is triggered but cannot finish processing the messages before container.stop() is called (I am testing with 8 messages on this queue).
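
The race described here can be shown without a broker. The following is a minimal sketch, not Spring AMQP code: a single-thread ExecutorService stands in for the container's consumer thread, and a CountDownLatch makes the caller wait for the handler before shutting down. The names AwaitHandlerSketch and processAll are hypothetical, and the timeout value is an assumption.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in: the executor plays the role of the container's consumer thread.
class AwaitHandlerSketch {

    // Hands every message to a worker thread, then waits for all of them before "stopping".
    static List<String> processAll(List<String> messages, long timeoutSeconds) {
        List<String> handled = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(messages.size());
        ExecutorService consumer = Executors.newSingleThreadExecutor(); // comparable to start()
        try {
            for (String m : messages) {
                consumer.submit(() -> {
                    try {
                        handled.add(m); // the handler's work would happen here
                    } finally {
                        done.countDown(); // signal completion even if the handler throws
                    }
                });
            }
            // Without this wait the caller reaches the shutdown before the handler finishes.
            if (!done.await(timeoutSeconds, TimeUnit.SECONDS)) {
                throw new IllegalStateException("handler did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            throw new IllegalStateException("interrupted while waiting for handler", e);
        } finally {
            consumer.shutdown(); // comparable to stop()
        }
        return handled;
    }
}
```

Calling `AwaitHandlerSketch.processAll(Arrays.asList("a", "b", "c"), 5)` returns only after all three messages have been handled. The latch needs to know how many messages to expect, which is exactly what is hard to know up front with a live queue.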

    So I think I need a synchronous consumer, the AmqpTemplate, and I am using http://forum.springsource.org/showth...tenerContainer and http://forum.springsource.org/showth...t=amqpTemplate as my guides.

    But the '...' between the start and stop (139002) in the linked thread has me stumped. What do I put there to ensure my handler processes all messages? My attempt:

    Code:
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(getConnectionFactory());
    container.setQueueNames("validation.pause");
    container.setMessageListener(getTemplate());
    container.afterPropertiesSet();
    container.start();
    
    message = getTemplate().receive();
    byte[] body = message.getBody();
    String strbody = new String(body);
    System.out.println("message: " + strbody);
    Gson gson = new Gson();
    dbStatusMessage = gson.fromJson(strbody, DbStatusMessage.class);
    if (dbStatusMessage != null) {
    	K12Architecture arch = getArchDao().selectMDYByMfrDaYear(dbStatusMessage.getMfrOrgId(),
    	dbStatusMessage.getDaOrgId(), dbStatusMessage.getCalYear());
    	if (arch != null && "validating".equals(arch.getEnv())) {
    		System.out.println("arch id: " + arch.getId() + " env: " + arch.getEnv());
    		System.out.println(dbStatusMessage.getQueue() + " being put back on queue...");
    		getTemplate().convertAndSend(dbStatusMessage);
    	} else {
    		System.out.println("acknowledging and moving message to validation.ready");
    		getDbReadyProducer().insertMessageToQueue(dbStatusMessage);
    	}
    
    }
    
    container.stop();
    container.destroy();
    
    System.out.println("DbPauseConsumerTasklet triggered!");
    return RepeatStatus.FINISHED;
    and get:

    Code:
    message: {"daOrgId":10,"queue":"validation.inserts.TEST_10_2012","calYear":"2012","mfrOrgId":1}
    WARN : org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - Execution of Rabbit message listener failed, and no ErrorHandler has been set.
    org.springframework.amqp.rabbit.listener.ListenerExecutionFailedException: Listener threw exception
    	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:647)
    	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:586)
    	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:482)
    	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:61)
    	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:110)
    	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:611)
    	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:454)
    	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:474)
    	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:458)
    	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$300(SimpleMessageListenerContainer.java:61)
    	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:551)
    	at java.lang.Thread.run(Thread.java:722)
    Caused by: java.lang.NullPointerException
    	at java.lang.String.<init>(String.java:481)
    	at org.springframework.amqp.rabbit.core.RabbitTemplate.onMessage(RabbitTemplate.java:790)
    	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:584)
    	... 10 more
    I need the container to be temporary (I appear to have that), the AmqpTemplate to consume ALL messages (not happening), and then the container to be destroyed (I appear to have that).

    Please advise,

    Thank you.

  • #2
    When are you "done"? Is it simply when the queue is empty? If so, you can use

    Code:
    while ((int) rabbitAdmin.getQueueProperties("validation.pause").get("QUEUE_MESSAGE_COUNT") > 0) {
        Thread.sleep(1000);
    }
    container.stop();
    But bear in mind that a new message might arrive in the short window between the while loop ending and the stop().


    • #3
      Thanks Gary for the quick reply and input.

      I think my last sentence is misleading (sorry). Say I have 8 messages in 'validation.pause'. I want each one consumed and checked to see whether processing is ready for it. If not, it goes back on the 'validation.pause' queue; otherwise it is moved to the 'validation.ready' queue. Then the container stops. So one pass (every 5 minutes) might mean all 8 are consumed and the queue count is 0, or 8 are consumed and 2 are put back on 'validation.pause' to be checked at the next 5 minute interval.

      Thank you.


      • #4
        Putting it back on the queue will cause it to be immediately redelivered. You probably want to move those to another queue. Maybe use validation.pause1 and validation.pause2 - and then toggle between them.

        First run reads from 1 and puts not ready messages on 2; when 1 is empty, stop. Next run reads from 2 and puts not ready on 1; when empty, stop. Repeat.
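
The toggle between the two pause queues can be sketched with in-memory queues. This is only an illustration of the control flow under stated assumptions: the Deque fields stand in for validation.pause1, validation.pause2 and validation.ready, the isReady predicate stands in for the database check, and the class name PauseQueueToggleSketch is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;

// Hypothetical in-memory model of the pause1/pause2 toggle; no broker involved.
class PauseQueueToggleSketch {
    final Deque<String> pause1 = new ArrayDeque<>();
    final Deque<String> pause2 = new ArrayDeque<>();
    final Deque<String> ready = new ArrayDeque<>();
    private boolean readFromPause1 = true; // which queue the next run drains

    // One scheduled run: drain the current source queue, then swap roles.
    // Returns how many messages were moved to the ready queue.
    int runOnce(Predicate<String> isReady) {
        Deque<String> source = readFromPause1 ? pause1 : pause2;
        Deque<String> parked = readFromPause1 ? pause2 : pause1;
        int moved = 0;
        String message;
        while ((message = source.poll()) != null) { // stop when the source is empty
            if (isReady.test(message)) {
                ready.add(message);
                moved++;
            } else {
                parked.add(message); // not ready: park it on the other pause queue
            }
        }
        readFromPause1 = !readFromPause1; // next run reads from the other queue
        return moved;
    }
}
```

Because not-ready messages go to the other queue, the run always terminates once the source queue is empty. In a real tasklet the toggle flag would have to survive between scheduled runs, for example as a field on a singleton bean.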


        • #5
          Okay - will do the pause1, pause2 back and forth.

          Obviously still doing something wrong and being a bit slow....

          Code:
          	@Override
          	public RepeatStatus execute(StepContribution arg0, ChunkContext arg1) throws Exception {
          		container = new SimpleMessageListenerContainer(getConnectionFactory());
          		container.setQueueNames("validation.pause1");
          		container.setMessageListener(getPause1Template());
          		container.afterPropertiesSet();
          		container.start();
          
          		message = getPause1Template().receive();
          		byte[] body = message.getBody();
          		String strbody = new String(body);
          		System.out.println("message: " + strbody);
          		Gson gson = new Gson();
          		dbStatusMessage = gson.fromJson(strbody, DbStatusMessage.class);
          		if (dbStatusMessage != null) {
          			K12Architecture arch = getArchDao().selectMDYByMfrDaYear(dbStatusMessage.getMfrOrgId(),
          					dbStatusMessage.getDaOrgId(), dbStatusMessage.getCalYear());
          			if (arch != null && "validating".equals(arch.getEnv())) {
          				System.out.println("arch id: " + arch.getId() + " env: " + arch.getEnv());
          				System.out.println(dbStatusMessage.getQueue() + " being put on pause2...");
          				getPause2Template().convertAndSend(dbStatusMessage);
          			} else {
          				System.out.println("acknowledging and moving message to validation.ready");
          				getDbReadyProducer().insertMessageToQueue(dbStatusMessage);
          			}
          
          		}
          
          		while ((int) getAdmin().getQueueProperties("validation.pause1").get("QUEUE_MESSAGE_COUNT") > 0) {
          			Thread.sleep(1000);
          		}
          		container.stop();
          		container.destroy();
          
          		System.out.println("DbPauseConsumerTasklet triggered!");
          		return RepeatStatus.FINISHED;
          	}
          gives me:

          Code:
          message: {"daOrgId":10,"queue":"validation.inserts.TEST_10_2012","calYear":"2012","mfrOrgId":1}
          WARN : org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - Execution of Rabbit message listener failed, and no ErrorHandler has been set.
          org.springframework.amqp.rabbit.listener.ListenerExecutionFailedException: Listener threw exception
          	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:647)
          	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:586)
          	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:482)
          	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:61)
          	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:110)
          	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:611)
          	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:454)
          	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:474)
          	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:458)
          	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$300(SimpleMessageListenerContainer.java:61)
          	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:551)
          	at java.lang.Thread.run(Thread.java:722)
          Caused by: java.lang.NullPointerException
          	at java.lang.String.<init>(String.java:481)
          	at org.springframework.amqp.rabbit.core.RabbitTemplate.onMessage(RabbitTemplate.java:790)
          	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:584)
          	... 10 more
          which appears to be because RabbitTemplate does not have an 'onMessage()' method....?

          Please advise,


          • #6
            I stand corrected: RabbitTemplate does have an onMessage() method.


            • #7
              Oh, wow - I didn't notice that.

              The Rabbit template's onMessage method is used for async replies to ...sendAndReceive... calls when using an explicit reply queue - it is a special use case.

              You shouldn't be using the RabbitTemplate for arbitrary message reception like that. If you just want to use the receive() method, you don't need a listener container at all. Either that, or you need your own MessageListener; you can't use the RabbitTemplate this way.

              Looks like we should add some code to ensure you don't wire the template as a listener if you are not using it in that mode.
              Last edited by Gary Russell; Aug 30th, 2013, 12:58 PM.
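
The "no listener container at all" option amounts to a plain polling loop. The sketch below is a broker-free illustration: the Supplier stands in for a call such as the template's receive() on the pause queue, and it assumes that call returns null once the queue is empty. The class name SynchronousDrainSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical helper: drains a queue synchronously on the calling thread.
class SynchronousDrainSketch {

    // Calls receive until it returns null (assumed to mean "queue is empty")
    // and collects the message bodies in arrival order.
    static List<String> drain(Supplier<String> receive) {
        List<String> bodies = new ArrayList<>();
        String body;
        while ((body = receive.get()) != null) {
            bodies.add(body); // per-message processing would go here
        }
        return bodies;
    }
}
```

With an in-memory queue this can be tried as `SynchronousDrainSketch.drain(queue::poll)`. Everything runs on the tasklet's own thread, so there is nothing to start, stop, or wait for.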


              • #8
                You might have been led astray by that second thread you cited - that was specifically all about how to programmatically configure the RabbitTemplate as the listener for async replies.


                • #9
                  Easily led astray ;-)

                  I think I have got it now, hopefully not famous last words... Greatly appreciate the assistance and AMQP work.

                  Best wishes and a good long weekend.
