  • MessageRejectedWhileStoppingException while stopping

    Hi,
    I'm using spring-amqp with the RabbitMQ message broker; my application receives messages from a queue and processes them. I also need to manage the incoming traffic from the message consumers that read from the queue and pass messages to the core of the application, so I created an InputTrafficMonitor bean that decides whether to stop or start the incoming traffic by calling the stop()/start() methods of the injected SimpleMessageListenerContainer bean (I don't know if that's the right way to do it or not).
    I then ran a stress test and found that some messages are lost when the MessageListenerContainer is stopped! I checked the logs and found a bunch of MessageRejectedWhileStoppingExceptions, as follows:

    Code:
    2012-10-29 14:55:19,004 WARN  [amqp.rabbit.listener.SimpleMessageListenerContainer:557] - Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.rabbit.listener.MessageRejectedWhileStoppingException: Message listener container was stopping when a message was received

    I. Any clue how to handle/solve this problem?
    II. Is there a better way to pause the incoming traffic from the consumers than stopping and starting the MessageListenerContainer?

    Thanks in advance.
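
    A minimal, framework-free sketch of the traffic-monitor pattern described above. The names here (InputTrafficMonitor, Pausable, the watermark parameters) are hypothetical; in the real application the Pausable role would be played by Spring AMQP's SimpleMessageListenerContainer, whose stop()/start() methods the monitor calls:

    ```java
    /** Minimal stand-in for the lifecycle methods the monitor needs. */
    interface Pausable {
        void start();
        void stop();
        boolean isRunning();
    }

    /** Pauses/resumes consumption based on a backlog measurement. */
    class InputTrafficMonitor {
        private final Pausable container;
        private final int highWaterMark;
        private final int lowWaterMark;

        InputTrafficMonitor(Pausable container, int highWaterMark, int lowWaterMark) {
            this.container = container;
            this.highWaterMark = highWaterMark;
            this.lowWaterMark = lowWaterMark;
        }

        /** Called periodically with the current backlog of in-flight work. */
        void onBacklogSize(int backlog) {
            if (backlog >= highWaterMark && container.isRunning()) {
                container.stop();   // pause consumption until the backlog drains
            } else if (backlog <= lowWaterMark && !container.isRunning()) {
                container.start();  // resume consumption
            }
        }
    }
    ```

    Using separate high and low watermarks (rather than a single threshold) avoids rapidly flapping the container on and off when the backlog hovers around one value.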
    Last edited by nima; Oct 29th, 2012, 03:27 AM.

  • #2
    Did you change the container's acknowledgeMode from its default (AUTO)?

    When the exception is thrown the container, by default, tells the broker to requeue the message.

    • #3
      Hi Gary,
      Nope! The acknowledgeMode is set to AUTO.
      Thanks again.

      • #4
        Hmmm... interesting. I just ran some tests and the message isn't "lost", it's sitting in Rabbit in an un-ackd state and is not redelivered after restarting the container with container.start(). When I kill the JVM, the message goes back to 'ready' and is delivered when I restart the JVM.

        I'll take a look to see what's happening; please open a JIRA ticket...

        https://jira.springsource.org/browse/AMQP

        • #5
          I have figured out what's going on; we have a race condition where the reject is done before the consumer is actually canceled, so messages can be delivered to the "old" (stopped) consumer and remain in an un-ackd state.

          The solution is to immediately cancel the consumer when the stop() is received so when we reject any in-flight messages because the container is stopped, we don't get any unintended redeliveries.

          I should be able to get the fix into the 1.1.3 release which is currently due this week.

          • #6
            Hi Gary, and thanks for your reply.
            I'm glad to hear that you found the problem, but...

            so messages can be delivered to the "old" (stopped) consumer and remain in an un-ackd state
            Well, I ran a simple test with 300,000 incoming messages and randomly stopped/started the MessageListenerContainer (this happened 40 times). I found that 299,519 messages were received correctly and just 18 were in the unacked state (based on the info in the web-based management console), so 463 of them were lost. I found exactly 463 MessageRejectedWhileStoppingExceptions in my log. The 18 unacked messages were added to the final count of received messages after restarting my application, so the lost messages were not in the unacked state.

            thanks again for your help, buddy

            • #7
              Hmmm... I don't see any code that would cause that behavior - I'll run some more tests. Can you share all your container properties? (concurrentConsumers, prefetchCount etc).

              Also, what does your test listener do with the delivery - any time delays etc to simulate processing, or just increment a counter?

              • #8
                Code:
                <bean id="incomingMessageListenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
                    <property name="connectionFactory" ref="rabbitmqConnectionFactory" />
                    <property name="queueNames" value="${incoming.queue.name}" />
                    <property name="messageListener" ref="incomingMessageListener" />
                    <property name="errorHandler" ref="errorHandler" />
                    <property name="acknowledgeMode" value="AUTO" />
                    <property name="defaultRequeueRejected" value="false" />
                    <property name="concurrentConsumers" value="${incoming.consumer.count}" />
                </bean>
                Oops! I'm so sorry, I think I found it! I set the defaultRequeueRejected property to false (days ago) in order to prevent an endless loop in the case of an exception (I can't remember exactly why, but I think it was for the case of an invalid object). I haven't tested it yet, but the name implies it!

                again, I deeply apologize

                P.S. May I ask your opinion on my second question, the one about "traffic management"? I mean, is there a better way to handle the incoming traffic from the message consumers (which read from the queue) than having a manager bean that stops/starts the MessageListenerContainer based on a condition?

                thank you for your patience and great help

                • #9
                  That's good to know; thanks.

                  Actually, I am glad you pressed me because I did find another condition that could cause messages to remain un-ack'd.

                  Even after I corrected that, I still, very rarely, saw unack'd (but never lost) messages; I suspect a race condition in the rabbit client itself but I need to do some research there.

                  In the meantime, I have found that if we physically close the channel used by the consumer, rather than returning it to the cache, we never end up with unack'd messages and the stop/start scenario works fine.

                  I would really appreciate it if you could run your test against my branch. I'd also be interested to hear how your test differs from mine. My test case is in StopStartIntegrationTests.java.

                  If you are able to do that....

                  Code:
                  git clone https://github.com/garyrussell/spring-amqp.git
                  cd spring-amqp
                  git checkout AMQP-275a
                  ./gradlew install
                  Then, update your POM to use version 1.2.0.BUILD-SNAPSHOT of spring-rabbit.
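
                  For reference, the corresponding POM change would look something like this (the groupId/artifactId are the published coordinates of spring-rabbit; the snapshot version is the one produced by the ./gradlew install above):

                  ```xml
                  <dependency>
                      <groupId>org.springframework.amqp</groupId>
                      <artifactId>spring-rabbit</artifactId>
                      <version>1.2.0.BUILD-SNAPSHOT</version>
                  </dependency>
                  ```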

                  You should see zero unack'd messages via the web admin; there should be no need to kill the JVM.


                  Regarding avoiding the "infinite loop" problem, consider using a StatelessRetryOperationsInterceptorFactoryBean (or the stateful variant if your messages have messageIds) to insert a retry interceptor into the listener's advice chain. By configuring the interceptor to use a RejectAndDontRequeueRecoverer, the message will be permanently rejected after N (default = 3) attempts. You can configure the queue to send such rejections to a Dead Letter Exchange for later investigation.
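
                  A sketch of that wiring, in the same XML configuration style used earlier in this thread (the factory-bean and recoverer classes are from Spring AMQP's org.springframework.amqp.rabbit.config and .retry packages; the bean names are placeholders):

                  ```xml
                  <bean id="retryInterceptor"
                        class="org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean">
                      <!-- permanently reject (no requeue) after the default 3 attempts -->
                      <property name="messageRecoverer">
                          <bean class="org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer" />
                      </property>
                  </bean>

                  <bean id="incomingMessageListenerContainer"
                        class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
                      <!-- connectionFactory, queueNames, messageListener etc. as before -->
                      <property name="adviceChain">
                          <array>
                              <ref bean="retryInterceptor" />
                          </array>
                      </property>
                  </bean>
                  ```

                  If the queue is declared with an x-dead-letter-exchange argument, the permanently rejected messages then land in the DLX for later investigation.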


                  Stopping/starting the container is a reasonable approach to achieve what you are doing. Another alternative might be to add an Advice to the listener's advice chain that simply pauses the container's thread. Personally, I'd prefer to stop the container (once my fix is applied).
                  Last edited by Gary Russell; Oct 31st, 2012, 02:53 PM.

                  • #10
                    Sure. I'm on a trip right now; I'll do that as soon as I get back home (or to work).
                    Best wishes

                    • #11
                      Hi Gary
                      I've just run some tests and it appears to be OK.

                      Code:
                      No.  spring-rabbit version  #messages  #stop/starts  #final result  #unacked  #failed
                      A1   1.1.0.RELEASE          300000     73            300000         99        0
                      B1   1.2.0.BUILD-SNAPSHOT   300000     71            300000         0         0
                      B2   1.2.0.BUILD-SNAPSHOT   300000     63            300000         0         0
                      can't wait for the final release of 1.2.0
                      thanks buddy
                      Last edited by nima; Nov 4th, 2012, 03:26 AM.

                      • #12
                        Thanks for letting us know; we will be back-porting this fix to the 1.1.x stream; 1.1.3 should be available later this week.

                        • #13
                          spring-amqp 1.1.3.RELEASE, including this fix, is now available:

                          http://www.springsource.org/spring-amqp

                          • #14
                            Thanks, buddy, I really appreciate it.

                            • #15
                              Hello,
                              I have read through this post and something is still not clear to me. We have the same situation as the original poster. We send 300K test messages in and we always get 300K out, unless we stop a consumer; then we often get messages in the dead letter queue.

                              Is it normal behavior to reject unacknowledged messages when a consumer shuts down via SimpleMessageListenerContainer.stop()? I would expect unacknowledged messages to go back to the queue. We do not want rejected messages to be requeued, we want them to go to the dead letter queue. Why are messages rejected on a clean stop request?

                              TIA,
                              Tracy
