
  • Listeners do not receive messages after a bit of inactivity.

    Hi, all!

    I set up my listener in what I think is a fairly standard way. What I am noticing is that the listeners receive messages at first; then there is a period when I stop sending anything for a while (10-20 minutes). After I resume sending messages, the listener hears nothing of it (my logs do not show that BlockingQueueConsumer picked anything up). What could be the reason for this?

    Here is my setup for the listener:

    <bean id="cc.listener.container" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
        <property name="channelTransacted" value="false" />
        <property name="concurrentConsumers" value="2" />
        <property name="prefetchCount" value="1" />
        <property name="connectionFactory" ref="rabbitConnectionFactory" />
        <property name="acknowledgeMode" value="MANUAL" />
        <property name="queues" value="cc.${instance.name}.queue" />
        <property name="autoStartup" value="true" />
        <property name="messageConverter" ref="rabbitmq.messageConverter" />
        <property name="delegateListener" ref="rabbitmq.ccMessageListener" />
    </bean>

    Does anyone have any ideas about what might be wrong with this, or how to debug it? In RabbitMQ's web UI I see that messages are unacknowledged whenever this happens. RabbitMQ's logs do not indicate any problems.

    The client-side logs show nothing of interest when this happens, just the usual polling message:
    2012-10-12 15:15:35,251 DEBUG [org.springframework.amqp.rabbit.listener.BlockingQueueConsumer] - Retrieving delivery for Consumer: tag=[amq.ctag-A0B0-MDfTy5QiLUpgqXWNT], channel=Cached Rabbit Channel: AMQChannel(amqp://alex@localhost:5672/alex,3), acknowledgeMode=MANUAL local queue size=0

    I am using Spring AMQP/Rabbit 1.1.2.RELEASE and AMQP client 2.8.4; my server is running RabbitMQ 2.8.6.

    Any help is greatly appreciated.

    Cheers,
    Alex.
    Last edited by ashneyderman; Oct 12th, 2012, 09:17 AM.

  • #2
    Just a bit of additional information. BTW, I can consistently reproduce this problem (after about 3-4 minutes of inactivity, the listener stops pumping messages). I ran rabbitmqctl report on the server, and this is what I see for the queue whose message is not being consumed:

    <'rabbit@ip-10-32-14-234'.1.23181.13> cc.alex.queue false false [{"x-dead-letter-exchange","Error Messages"},{"x-dead-letter-routing-key","cc.error"}] 0 35024 [{q1,0}, {q2,0}, {delta,{delta,undefined,0,undefined}}, {q3,0}, {q4,0}, {len,0}, {pending_acks,1}, {target_ram_count,infinity}, {ram_msg_count,0}, {ram_ack_count,1}, {next_seq_id,27}, {persistent_count,0}, {avg_ingress_rate,0.003245172815983037}, {avg_egress_rate,0.003245172815983037}, {avg_ack_ingress_rate,0.003245172815983037}, {avg_ack_egress_rate,0.0}]

    So it seems the message is not being acknowledged, hence "{pending_acks,1}". Can anyone think of a reason why this might be the case?
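The {pending_acks,1} entry is the key figure in that report. If you want to check it repeatedly instead of reading it by eye, a small helper can extract the integer-valued {key,value} pairs from the backing-queue status. This is a minimal sketch that assumes the report format shown above (RabbitMQ 2.8.x). The class and method names are hypothetical, and nested or non-integer entries such as {delta,...} and {avg_ingress_rate,...} are deliberately skipped.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class QueueStatus {
    // Matches flat {key,integer} pairs such as {pending_acks,1}.
    // Nested tuples like {delta,{...}} and float values like {avg_ingress_rate,0.003} do not match.
    private static final Pattern PAIR = Pattern.compile("\\{(\\w+),(\\d+)\\}");

    // Hypothetical helper: pulls the integer fields out of one backing_queue_status fragment.
    static Map<String, Long> integerFields(String reportFragment) {
        Map<String, Long> fields = new LinkedHashMap<>();
        Matcher m = PAIR.matcher(reportFragment);
        while (m.find()) {
            fields.put(m.group(1), Long.parseLong(m.group(2)));
        }
        return fields;
    }

    public static void main(String[] args) {
        String status = "[{q1,0}, {q2,0}, {delta,{delta,undefined,0,undefined}}, {q3,0}, {q4,0}, "
                + "{len,0}, {pending_acks,1}, {ram_ack_count,1}, {next_seq_id,27}]";
        Map<String, Long> f = integerFields(status);
        // A message that was delivered to a consumer but never acked is counted here.
        if (f.getOrDefault("pending_acks", 0L) > 0) {
            System.out.println("unacknowledged deliveries: " + f.get("pending_acks"));
        }
    }
}
```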

    Cheers,
    Alex.



    • #3
      My best guess is that the listener is holding up the container threads.

      Can you take a thread dump (using jstack or VisualVM) when it is "stuck"?

      A debug level log would likely be helpful too.
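jstack and VisualVM are the usual tools. If attaching to the process is awkward (on a remote host, for example), the JVM can also dump its own threads through the standard java.lang.management API. This is a minimal sketch with hypothetical class and method names; in a real application you might trigger it from a JMX operation or an admin endpoint. When the container is stuck, the frames worth reading are those of the container's consumer threads.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

public class ThreadDump {
    // Builds a jstack-like text dump of every live thread in the current JVM.
    static String dump() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        StringBuilder out = new StringBuilder();
        // Request lock information only where this JVM supports it.
        ThreadInfo[] infos = mx.dumpAllThreads(
                mx.isObjectMonitorUsageSupported(), mx.isSynchronizerUsageSupported());
        for (ThreadInfo info : infos) {
            out.append('"').append(info.getThreadName()).append("\" ")
               .append(info.getThreadState()).append('\n');
            // Blocked or waiting threads report the monitor or lock they are waiting on.
            if (info.getLockName() != null) {
                out.append("    waiting on ").append(info.getLockName()).append('\n');
            }
            for (StackTraceElement frame : info.getStackTrace()) {
                out.append("    at ").append(frame).append('\n');
            }
            out.append('\n');
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.print(dump());
    }
}
```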



      • #4
        Here is the thread dump: http://pastebin.com/Dy8h70HZ

        I see nothing suspicious in there, but then again, I am not sure what to look for.

        BTW, the AMQP client is now version 2.8.6. I updated it just to make sure there is no problem with version incompatibilities.



        • #5
          Yes, looks ok.

          I just noticed you have
          <property name="acknowledgeMode" value="MANUAL" />
          This means you are responsible for the ack. Is there any reason you are not using the default, AUTO (which is different from Rabbit's autoAck)? AUTO means the container will ack or nack the message after processing: ack on normal completion, reject (with requeue by default) when an exception is thrown. NONE means Rabbit autoAck.
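To make the three modes concrete, here is a small plain-Java model of what happens to a single delivery once the listener returns or throws, following the description above. It is a simplified sketch, not Spring AMQP's implementation. The enum mirrors the container's acknowledgeMode values, but the method and outcome strings are hypothetical.

```java
public class AckModeModel {
    enum AckMode { NONE, MANUAL, AUTO }

    // Simplified model (not Spring AMQP's code) of what becomes of one delivery
    // after the listener has run, per the description of the three modes.
    static String outcome(AckMode mode, boolean listenerThrew, boolean requeueOnReject) {
        switch (mode) {
            case NONE:
                // Rabbit autoAck: the broker treated the message as acknowledged when it sent it,
                // so a listener exception cannot bring it back.
                return "auto-acked by broker at delivery";
            case MANUAL:
                // The container sends nothing; the listener itself must ack or reject the delivery.
                return "unacked until the listener acks it";
            case AUTO:
            default:
                if (!listenerThrew) {
                    return "ack";
                }
                // Requeue is the default; without it RabbitMQ discards the message,
                // or dead-letters it if the queue has a dead-letter exchange.
                return requeueOnReject ? "reject, requeue" : "reject without requeue";
        }
    }

    public static void main(String[] args) {
        for (AckMode mode : AckMode.values()) {
            System.out.println(mode + ": ok=" + outcome(mode, false, true)
                    + " | exception=" + outcome(mode, true, true));
        }
    }
}
```

Under MANUAL, a listener that never acks leaves the message in the state reported in post #2: delivered, but still counted under pending_acks.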



          • #6
            Originally posted by Gary Russell
            Yes, looks ok.

            I just noticed you have
            <property name="acknowledgeMode" value="MANUAL" />
            This means you are responsible for the ack. Any reason you are not using the default - AUTO (which is different to rabbit autoAck). AUTO means the container with ack or nack the message after processing - ACK for normal, reject (with requeue by default) when an exception is thrown. NONE means Rabbit autoack.

            Nah, that was the first version. When I noticed the problem, I figured I had messed something up, so I changed it to let Spring manage the acks for me, just to see if it really was me. Changing that value to AUTO made no difference to the end result. So when the thread dump was taken, the setting was AUTO (and I had made all the other adjustments needed to make that work).

            Cheers,
            Alex.



            • #7
              Next step would be a DEBUG (or preferably TRACE) level log, covering the period from when it was working to when it stopped.



              • #8
                OK. I did a little experiment and stripped everything, code and config, down to the bare minimum. The log of the run can be found here:

                http://pastebin.com/Nx0gD7RA

                The Java sources and configs are here:

                http://pastebin.com/whNfemxY

                I published the messages from the web UI. The first one was sent right after the program started (see line 234 of the log). The next one was sent a little under 4 minutes later (see line 633 of the log), and that one worked. Then I published another message a little over 5 minutes after that, and it never came through (5 minutes sounds suspiciously like a setting somewhere). The console showed one unacknowledged message.

                Cheers,
                Alex.
                Last edited by ashneyderman; Oct 12th, 2012, 04:28 PM.



                • #9
                  Very strange; it does appear to be behaving as if the ackMode were MANUAL.

                  Does it always fail on the 3rd message? (This would make sense because you have 2 consumers).
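The reasoning behind that question: with concurrentConsumers=2 and prefetchCount=1, each consumer may hold at most one unacknowledged delivery. If acks never reach the broker, the first two messages go out and the third has nowhere to go. A toy simulation of that broker-side rule (hypothetical names; real RabbitMQ dispatch is more involved) shows the pattern.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class PrefetchModel {
    // Toy broker: round-robins messages to consumers, but never lets a consumer
    // hold more than `prefetch` unacknowledged deliveries. No acks ever arrive here.
    static List<String> deliver(int consumers, int prefetch, int messages) {
        int[] unacked = new int[consumers];
        Deque<Integer> ready = new ArrayDeque<>();
        for (int i = 1; i <= messages; i++) {
            ready.add(i);
        }
        List<String> log = new ArrayList<>();
        int next = 0;
        while (!ready.isEmpty()) {
            boolean delivered = false;
            // Try each consumer once, starting after the last one served.
            for (int tries = 0; tries < consumers; tries++) {
                int c = (next + tries) % consumers;
                if (unacked[c] < prefetch) {
                    int msg = ready.poll();
                    unacked[c]++; // stays outstanding because nobody acks it
                    log.add("msg" + msg + "->consumer" + c);
                    next = (c + 1) % consumers;
                    delivered = true;
                    break;
                }
            }
            if (!delivered) {
                log.add("msg" + ready.peek() + " stuck: every consumer is at its prefetch limit");
                break;
            }
        }
        return log;
    }

    public static void main(String[] args) {
        deliver(2, 1, 3).forEach(System.out::println);
    }
}
```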

                  Can you run it in a debugger and set a breakpoint in BlockingQueueConsumer.commitIfNecessary()? You should see it step into this code:

                  Code:
                  if (!deliveryTags.isEmpty()) {
                  	long deliveryTag = new ArrayList<Long>(deliveryTags).get(deliveryTags.size() - 1);
                  	channel.basicAck(deliveryTag, true);
                  }
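That snippet acks only the last delivery tag, but with multiple=true. In AMQP 0-9-1, basic.ack with the multiple flag set acknowledges every outstanding delivery on that channel up to and including the given tag, so reaching this code should bring pending_acks back to zero. Here is a plain-Java model of that bookkeeping. The class is hypothetical and stands in for the broker's per-channel state, not for the RabbitMQ client API.

```java
import java.util.NavigableSet;
import java.util.TreeSet;

public class MultipleAckModel {
    // Broker-side view of one channel: delivery tags that are still unacknowledged.
    private final NavigableSet<Long> unacked = new TreeSet<>();

    void delivered(long tag) {
        unacked.add(tag);
    }

    // multiple=true acknowledges every outstanding tag <= tag on this channel;
    // multiple=false acknowledges only that single tag.
    void basicAck(long tag, boolean multiple) {
        if (multiple) {
            unacked.headSet(tag, true).clear(); // clearing the view removes from the set itself
        } else {
            unacked.remove(tag);
        }
    }

    int pending() {
        return unacked.size();
    }

    public static void main(String[] args) {
        MultipleAckModel channel = new MultipleAckModel();
        for (long t = 1; t <= 3; t++) {
            channel.delivered(t);
        }
        channel.basicAck(3, true); // one ack for the last tag covers tags 1-3
        System.out.println(channel.pending()); // 0
    }
}
```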
                  BTW, this is not related, but the MissingMessageIdAdvice needs a reference to the RetryTemplate's cache; your config gives it its own instance. It needs the template's cache because messages without an ID are never retried, and the advice has to clean up the template's cache for them.

                  See the test case, where we share the cache...

                  Code:
                  		// use an external template so we can share his cache
                  		RetryTemplate retryTemplate = new RetryTemplate();
                  		RetryContextCache cache = new MapRetryContextCache();
                  		retryTemplate.setRetryContextCache(cache);
                  		fb.setRetryOperations(retryTemplate);
                  
                  		// give him a reference to the retry cache so he can clean it up
                  		MissingMessageIdAdvice missingIdAdvice = new MissingMessageIdAdvice(cache);
                  This advice is a workaround for cases where an occasional message is missing an ID, because the interceptor can't handle a message without one. If no messages ever have an ID, the retry interceptor is worthless and can be removed.



                  • #10
                    FYI, I just pasted your config, pretty much verbatim, into the spring-integration AMQP sample app (replacing the inbound adapter with your container/test listener), and everything works fine. I even waited 5 minutes...

                    Code:
                    diff --git a/basic/amqp/pom.xml b/basic/amqp/pom.xml
                    index 3edaf6c..e619cd3 100644
                    --- a/basic/amqp/pom.xml
                    +++ b/basic/amqp/pom.xml
                    @@ -12,7 +12,7 @@
                     
                     	<properties>
                     		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
                    -		<spring.integration.version>2.1.0.RELEASE</spring.integration.version>
                    +		<spring.integration.version>2.2.0.RC1</spring.integration.version>
                     		<log4j.version>1.2.16</log4j.version>
                     		<junit.version>4.10</junit.version>
                     	</properties>
                    diff --git a/basic/amqp/src/main/resources/META-INF/spring/integration/spring-integration-context.xml b/basic/amqp/src/main/resources/META-INF/spring/integration/spring-integration-context.xml
                    index 125457f..50e057e 100644
                    --- a/basic/amqp/src/main/resources/META-INF/spring/integration/spring-integration-context.xml
                    +++ b/basic/amqp/src/main/resources/META-INF/spring/integration/spring-integration-context.xml
                    @@ -5,11 +5,13 @@
                     	xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
                     	xmlns:rabbit="http://www.springframework.org/schema/rabbit"
                     	xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
                    +	xmlns:util="http://www.springframework.org/schema/util"
                     	xsi:schemaLocation="http://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
                     		http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
                     		http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd
                     		http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
                    -		http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
                    +		http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                    +		http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd">
                             
                         <!-- From STDIN To RabbitMQ -->
                     
                    @@ -25,9 +27,49 @@
                     
                         <!-- From RabbitMQ To STDOUT -->
                     
                    -    <int-amqp:inbound-channel-adapter channel="fromRabbit" 
                    -                                      queue-names="si.test.queue"
                    -                                      connection-factory="connectionFactory" />
                    +<!--     <int-amqp:inbound-channel-adapter channel="fromRabbit"  -->
                    +<!--                                       queue-names="si.test.queue" -->
                    +<!--                                       connection-factory="connectionFactory" /> -->
                    +
                    +    <bean id="listener.container" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
                    +        <property name="channelTransacted" value="false" />
                    +        <property name="concurrentConsumers" value="2" />
                    +        <property name="prefetchCount" value="1" />
                    +        <property name="connectionFactory" ref="connectionFactory" />
                    +        <property name="acknowledgeMode" value="AUTO" />
                    +        <property name="queues" value="si.test.queue" />
                    +        <property name="autoStartup" value="true" />
                    +        <property name="messageListener">
                    +            <bean class="org.springframework.integration.samples.amqp.SimpleMsgListener" />
                    +        </property>
                    +        <property name="adviceChain" ref="retryChain" />
                    +    </bean>
                    +
                    +    <bean id="retryOperationsInterceptor" class="org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean">
                    +        <property name="retryOperations">
                    +            <bean class="org.springframework.retry.support.RetryTemplate">
                    +                <property name="retryPolicy">
                    +                    <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
                    +                        <property name="maxAttempts" value="2" />
                    +                    </bean>
                    +                </property>
                    +                <property name="backOffPolicy">
                    +                    <bean class="org.springframework.retry.backoff.FixedBackOffPolicy">
                    +                        <property name="backOffPeriod" value="1000" />
                    +                    </bean>
                    +                </property>
                    +            </bean>
                    +        </property>
                    +    </bean>
                    +
                    +    <util:list id="retryChain">
                    +        <bean class="org.springframework.amqp.rabbit.retry.MissingMessageIdAdvice">
                    +            <constructor-arg>
                    +                <bean class="org.springframework.retry.policy.MapRetryContextCache" />
                    +            </constructor-arg>
                    +        </bean>
                    +        <ref bean="retryOperationsInterceptor" />
                    +    </util:list>
                     
                         <int:channel id="fromRabbit">
                             <int:interceptors>
                    Code:
                    18:24:02.478 INFO  [main][org.springframework.integration.samples.amqp.Main] 
                    =========================================================
                                                                             
                              Welcome to Spring Integration!                 
                                                                             
                        For more information please visit:                   
                        http://www.springsource.org/spring-integration       
                                                                             
                    =========================================================
                    18:24:03.828 WARN  [main][org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer] CachingConnectionFactory's channelCacheSize can not be less than the number of concurrentConsumers so it was reset to match: 2
                    18:24:04.156 INFO  [main][org.springframework.integration.samples.amqp.Main] 
                    =========================================================
                                                                              
                        This is the AMQP Sample -                             
                                                                              
                        Please enter some text and press return. The entered  
                        Message will be sent to the configured RabbitMQ Queue,
                        then again immediately retrieved from the Message     
                        Broker and ultimately printed to the command line.    
                                                                              
                    =========================================================
                    ddd
                    18:24:09.117 INFO  [SimpleAsyncTaskExecutor-1][org.springframework.integration.samples.amqp.SimpleMsgListener] Message received: ddd
                    fff
                    18:24:11.996 INFO  [SimpleAsyncTaskExecutor-2][org.springframework.integration.samples.amqp.SimpleMsgListener] Message received: fff
                    ggg
                    18:24:14.997 INFO  [SimpleAsyncTaskExecutor-1][org.springframework.integration.samples.amqp.SimpleMsgListener] Message received: ggg
                    hhh
                    18:24:17.998 INFO  [SimpleAsyncTaskExecutor-2][org.springframework.integration.samples.amqp.SimpleMsgListener] Message received: hhh
                    hhh
                    18:25:00.009 INFO  [SimpleAsyncTaskExecutor-1][org.springframework.integration.samples.amqp.SimpleMsgListener] Message received: hhh
                    123
                    18:25:08.011 INFO  [SimpleAsyncTaskExecutor-2][org.springframework.integration.samples.amqp.SimpleMsgListener] Message received: 123
                    wait for 5 mins
                    18:30:55.089 INFO  [SimpleAsyncTaskExecutor-1][org.springframework.integration.samples.amqp.SimpleMsgListener] Message received: wait for 5 mins



                    • #11
                      I am starting to think something is going on with the server. It looks as if it does something to the channel while I am not looking.

                      I just tried a local RabbitMQ, and this does NOT happen with the local setup.

                      We host on Amazon, so the localhost scenario is not really what will happen in production :-(

                      If anything jumps out at you given this info, let me know. Meanwhile, I am going to play with the debugger as you suggested.

                      Cheers,
                      Alex.
                      Last edited by ashneyderman; Oct 12th, 2012, 11:55 PM.



                      • #12
                        I just did a little experiment with connection heartbeats (set to 60 seconds), and that worked wonders for me.
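The thread never pins down what was dropping the quiet connection. One plausible explanation, assumed here and not confirmed in the thread, is something on the network path (a NAT device or firewall, for example) that silently discards TCP connections that stay idle longer than some cutoff. Heartbeats would help in that case because they keep traffic flowing even when no application messages do. The toy model below illustrates the idea. It uses a hypothetical 5-minute cutoff and simplified heartbeat timing (one frame every 60 seconds).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class IdleTimeoutModel {
    // True if no silent gap, counted from connection open at t=0, exceeds idleTimeoutSec.
    static boolean survives(List<Long> frameTimesSec, long idleTimeoutSec) {
        long previous = 0;
        for (long t : frameTimesSec) {
            if (t - previous > idleTimeoutSec) {
                return false; // the hypothetical intermediary would have dropped the connection here
            }
            previous = t;
        }
        return true;
    }

    // Adds one heartbeat frame every heartbeatSec seconds up to horizonSec (simplified timing).
    static List<Long> withHeartbeats(List<Long> appFramesSec, long horizonSec, long heartbeatSec) {
        List<Long> all = new ArrayList<>(appFramesSec);
        for (long t = heartbeatSec; t <= horizonSec; t += heartbeatSec) {
            all.add(t);
        }
        Collections.sort(all);
        return all;
    }

    public static void main(String[] args) {
        long idleTimeout = 300; // hypothetical 5-minute cutoff, not a value from the thread
        List<Long> traffic = Arrays.asList(5L, 600L); // one message, then 10 quiet minutes
        System.out.println("no heartbeats: " + survives(traffic, idleTimeout)); // false
        System.out.println("60 s heartbeats: "
                + survives(withHeartbeats(traffic, 600, 60), idleTimeout)); // true
    }
}
```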

                        The only problem now is that I cannot use auto-delete queues, since the server can kill the connection while the instance is still running. Auto-delete would be handy because many of our queues make sense on a per-instance basis (per consuming instance, that is), so the queue needs to be removed on each restart of the instance. Anyhow, I should probably find a way to purge the queue before I start listening, but that is a totally different question.

                        Gary, thanks for all your help!

                        Cheers,
                        Alex.



                        • #13
                          Also, it is not clear what version of the amqp client and spring's rabbit jars did you use in your run? they are probably coming as transitive dependencies from spring integration which I do not use.
                          Yeah, it's pulling in 1.1.2 and the 2.8.4 client. I used the SI app because I had it handy and just dropped in your config.

                          just tried local rabbit and this does NOT happen with the local setup.
                          Cool. Strange that it stops on Amazon; if the connection is actually broken, the consumer will reconnect, so it sounds like something else is going on.

