handling basic.cancel(consumer-tag consumer-tag, no-wait no-wait) sent from Server

  • handling basic.cancel(consumer-tag consumer-tag, no-wait no-wait) sent from Server

    I have created JIRA ticket AMQP-209. Currently, consumers behave inconsistently when a cluster node is taken down.

    We are currently prevented from going to production with the existing SimpleMessageListenerContainer/BlockingQueueConsumer, which means I'm going to be given bandwidth to fix this. Either I write my own one-off implementation of AbstractMessageListenerContainer and DefaultConsumer, or I try to work with you guys and commit to your project.

    From what I can tell, the ideal way to handle this is to provide a ConsumerCancelledListener similar to ConnectionListener. RabbitAdmin (or whatever other implementation of AmqpAdmin you happen to be using) would then redeclare queues and bindings appropriately. It would be nice to be able to redeclare only the queues and bindings that were lost. This would, however, be inconsistent with the automatic reconnect and would require substantial rethinking of how consumers relate to queues, exchanges and bindings in your client.
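    A minimal sketch of the proposed callback, in plain Java. Everything here is hypothetical: ConsumerCancelledListener, QueueDeclaration, RedeclaringAdmin and CancellationNotifier are illustrative names, not Spring AMQP types, and the admin only records the declare/bind commands it would replay instead of talking to a broker. Because the server's basic.cancel carries only the consumer tag, the sketch assumes the container can map that tag back to the queue it was consuming from.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical callback modelled on the ConnectionListener idea; not a Spring AMQP type.
interface ConsumerCancelledListener {
    void onConsumerCancelled(String consumerTag, String queueName);
}

// Hypothetical record of a queue and its bindings (bindings simplified to strings).
final class QueueDeclaration {
    final String queue;
    final List<String> bindings;

    QueueDeclaration(String queue, List<String> bindings) {
        this.queue = queue;
        this.bindings = new ArrayList<>(bindings);
    }
}

// Hypothetical admin: remembers what it declared and replays only the lost queue.
final class RedeclaringAdmin implements ConsumerCancelledListener {
    private final Map<String, QueueDeclaration> declared = new LinkedHashMap<>();
    // Commands that would be sent to the broker; recorded here instead of executed.
    final List<String> replayed = new ArrayList<>();

    void declare(QueueDeclaration d) {
        declared.put(d.queue, d);
    }

    @Override
    public void onConsumerCancelled(String consumerTag, String queueName) {
        QueueDeclaration d = declared.get(queueName);
        if (d == null) {
            return; // not declared by this admin, so leave it alone
        }
        replayed.add("declare " + d.queue);
        for (String binding : d.bindings) {
            replayed.add("bind " + d.queue + " " + binding);
        }
    }
}

// Hypothetical registry owned by the listener container.
final class CancellationNotifier {
    private final List<ConsumerCancelledListener> listeners = new CopyOnWriteArrayList<>();

    void addListener(ConsumerCancelledListener listener) {
        listeners.add(listener);
    }

    // Called after the container maps the cancelled consumer tag back to its queue.
    void fireCancelled(String consumerTag, String queueName) {
        for (ConsumerCancelledListener l : listeners) {
            l.onConsumerCancelled(consumerTag, queueName);
        }
    }
}
```

    Tracking its own declarations is what lets the admin replay only the queue that was lost, which is exactly the part that diverges from the replay-everything behaviour of the automatic reconnect.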

    I'm sorry for the duplicate thread, but I didn't like the original thread title, which I assume was causing it to be ignored.

  • #2
    The lack of basic.cancel handling can be reproduced without a cluster by deleting the queue through the RabbitMQ web management GUI.
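    For scripted reproduction, the same deletion can be issued against the management plugin's HTTP API (DELETE /api/queues/{vhost}/{name}) instead of clicking through the GUI. This is a sketch using the JDK's java.net.http client (Java 11+); QueueDeletion is a hypothetical helper, and the base URL, port and guest credentials are assumptions (the management port is 55672 on the RabbitMQ 2.x releases discussed here and 15672 from 3.0 on). It only builds the request.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

final class QueueDeletion {
    // Builds DELETE {baseUrl}/api/queues/{vhost}/{queue} with HTTP basic auth.
    // baseUrl and credentials are assumptions supplied by the caller.
    static HttpRequest deleteQueueRequest(String baseUrl, String vhost, String queue,
                                          String user, String password) {
        String path = "/api/queues/" + encodeSegment(vhost) + "/" + encodeSegment(queue);
        String token = Base64.getEncoder()
                .encodeToString((user + ":" + password).getBytes(StandardCharsets.UTF_8));
        return HttpRequest.newBuilder(URI.create(baseUrl + path))
                .header("Authorization", "Basic " + token)
                .DELETE()
                .build();
    }

    // Percent-encodes one path segment; the default vhost "/" becomes "%2F".
    // URLEncoder does form encoding, so '+' (an encoded space) is rewritten as %20.
    private static String encodeSegment(String s) {
        return URLEncoder.encode(s, StandardCharsets.UTF_8).replace("+", "%20");
    }
}
```

    Sending it is one more call, e.g. HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.discarding()). Note that the default vhost "/" must be percent-encoded as %2F in the path.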



    • #3
      I believe I have successfully made the consumer re-consume. I would appreciate it if someone could review my code to make sure I'm not making things worse. The fix is basically in these two commits.

      https://github.com/drstevens/spring-...553d2ac95b2e92 - prove that handleCancel is being called
      https://github.com/drstevens/spring-...8cfa99fe6ec0e9 - do something with this

      I would probably consider leveraging the ShutdownSignalException in this way to be a bit of a hack, but I'm trying to change as little as possible in the SimpleMessageListenerContainer.
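      A plain-JDK model of the hand-off being discussed, using a dedicated exception type instead of reusing ShutdownSignalException. The class names are hypothetical and this is not the actual BlockingQueueConsumer code; the exception is a local stand-in named after the ConsumerCancelledException that appears later in this thread. handleCancel runs on the client's connection thread and only sets a flag, while the listener thread drains deliveries that already arrived and then throws, giving the container a distinct signal to restart the consumer.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Local stand-in; not the Spring AMQP class of the same name.
class ConsumerCancelledException extends RuntimeException {
    ConsumerCancelledException(String consumerTag) {
        super("Consumer " + consumerTag + " was cancelled by the broker");
    }
}

// Hypothetical simplified consumer: deliveries and cancellation arrive on the
// connection thread, messages are taken on the listener thread.
final class CancellableConsumer {
    private final BlockingQueue<String> deliveries = new LinkedBlockingQueue<>();
    private final String consumerTag;
    private volatile boolean cancelledByBroker;

    CancellableConsumer(String consumerTag) {
        this.consumerTag = consumerTag;
    }

    // Connection thread: a message was delivered.
    void handleDelivery(String body) {
        deliveries.add(body);
    }

    // Connection thread: the broker sent basic.cancel. Only record it; never block here.
    void handleCancel() {
        cancelledByBroker = true;
    }

    // Listener thread: returns the next message, null on timeout, or throws once the
    // consumer has been cancelled and everything already delivered has been drained.
    String nextMessage(long timeoutMillis) {
        String body;
        try {
            body = deliveries.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
            return null;
        }
        if (body == null && cancelledByBroker) {
            throw new ConsumerCancelledException(consumerTag);
        }
        return body;
    }
}
```

      One consequence of this polling design is that a cancel is noticed at most one poll timeout late, which keeps the connection thread free of any restart work.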

      Unfortunately, I now need to add a hook to allow redeclaration of everything that needs to be redeclared on the channel for this consumer. I had hoped to leverage a ChannelListener instead of a ConnectionListener in my custom AmqpAdmin, but as I just found out, calls to channel.close() are intercepted and basically dropped on the floor. Because new channels are not created, these ChannelListeners aren't called when I expected them to be.
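      A simplified model of why those ChannelListeners stay silent: the caching factory hands out JDK dynamic proxies (compare the CachedChannelInvocationHandler frames in the stack traces in this thread), and the proxy turns close() into "return to cache". A creation callback therefore fires only when a new physical channel is made, not when a cached one is reused. SimpleChannel, ChannelCreatedListener and CachingChannelPool are hypothetical names; the real CachingConnectionFactory is considerably more involved.

```java
import java.lang.reflect.Proxy;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical minimal channel type; not com.rabbitmq.client.Channel.
interface SimpleChannel {
    int number();
    void close();
}

// Hypothetical callback fired only when a physical channel is created.
interface ChannelCreatedListener {
    void onCreate(SimpleChannel channel);
}

// Simplified caching pool: close() on a handed-out channel returns it to the cache.
final class CachingChannelPool {
    private final Deque<SimpleChannel> idle = new ArrayDeque<>();
    private final AtomicInteger nextNumber = new AtomicInteger(1);
    private final ChannelCreatedListener listener;

    CachingChannelPool(ChannelCreatedListener listener) {
        this.listener = listener;
    }

    synchronized SimpleChannel getChannel() {
        SimpleChannel cached = idle.poll();
        if (cached != null) {
            return cached; // reused channel: the listener is NOT called
        }
        SimpleChannel target = createPhysical();
        listener.onCreate(target);
        return wrap(target);
    }

    private SimpleChannel createPhysical() {
        int n = nextNumber.getAndIncrement();
        return new SimpleChannel() {
            public int number() { return n; }
            public void close() { /* would close the real channel */ }
        };
    }

    // Every call goes through the proxy; close() is intercepted and never reaches the target.
    private SimpleChannel wrap(SimpleChannel target) {
        return (SimpleChannel) Proxy.newProxyInstance(
                SimpleChannel.class.getClassLoader(),
                new Class<?>[] {SimpleChannel.class},
                (proxy, method, args) -> {
                    if (method.getName().equals("close")) {
                        synchronized (this) {
                            idle.push((SimpleChannel) proxy); // logical close: back to the cache
                        }
                        return null;
                    }
                    return method.invoke(target, args);
                });
    }
}
```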

      Right now, the only thing I can think of is to add a new type of listener (a ConsumerReconsumeListener, for instance) that is called for this special case inside SimpleMessageListenerContainer.restart(consumer: Consumer). This smells bad to me, though. There would have to be some way for RabbitAdmin to add its own listener to this...

      Thoughts from the spring-amqp team on this would be greatly appreciated as you guys know this code much better than me.

      Here is debug-level output from my change, which is checked in to GitHub. It is the result of taking down the cluster node that hosts the queue while the consumer's connection is to a different Rabbit node that remains up.

      19:04:51.223 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tag=[amq.ctag-gV0Uy3Lr1OmpjOc_K9h9Kr], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0
      The broker has notified the consumer that it is canceled
      19:04:51.653 [AMQP Connection xxx.xxx.xxx.xxx:5672] ERROR o.s.a.r.l.BlockingQueueConsumer - Received unexpected cancellation notice for Consumer: tag=[amq.ctag-gV0Uy3Lr1OmpjOc_K9h9Kr], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0
      19:04:52.231 [SimpleAsyncTaskExecutor-1] WARN o.s.a.r.l.SimpleMessageListenerContainer - Consumer raised exception, processing can restart if the connection factory supports it
      com.rabbitmq.client.ShutdownSignalException: channel error; reason: amq.ctag-gV0Uy3Lr1OmpjOc_K9h9Kr
      at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$InternalConsumer.handleCancel(BlockingQueueConsumer.java:246) ~[spring-rabbit-1.0.1.BUILD-BT-1.1-SNAPSHOT.jar:na]
      at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:351) ~[amqp-client-2.5.0.jar:na]
      at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:151) ~[amqp-client-2.5.0.jar:na]
      at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:96) ~[amqp-client-2.5.0.jar:na]
      at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:441) ~[amqp-client-2.5.0.jar:na]
      19:04:52.232 [SimpleAsyncTaskExecutor-1] INFO o.s.a.r.l.SimpleMessageListenerContainer - Restarting Consumer: tag=[amq.ctag-gV0Uy3Lr1OmpjOc_K9h9Kr], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0
      19:04:52.232 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Closing Rabbit Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1)
      19:04:52.232 [SimpleAsyncTaskExecutor-2] DEBUG o.s.a.r.l.BlockingQueueConsumer - Starting consumer Consumer: tag=[null], channel=null, acknowledgeMode=AUTO local queue size=0
      19:04:52.238 [SimpleAsyncTaskExecutor-2] DEBUG o.s.a.r.c.CachingConnectionFactory - Detected closed channel on exception. Re-initializing: null
      Basically, queueDeclarePassive fails here because the queue no longer exists on the broker. It was on the cluster node that went down.
      19:04:52.244 [SimpleAsyncTaskExecutor-2] ERROR o.s.a.r.l.SimpleMessageListenerContainer - Consumer received fatal exception on startup
      org.springframework.amqp.rabbit.listener.FatalListenerStartupException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
      at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:192) ~[spring-rabbit-1.0.1.BUILD-BT-1.1-SNAPSHOT.jar:na]
      at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:489) ~[spring-rabbit-1.0.1.BUILD-BT-1.1-SNAPSHOT.jar:na]
      at java.lang.Thread.run(Thread.java:662) [na:1.6.0_22]
      Caused by: java.io.IOException: null
      at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:107) ~[amqp-client-2.5.0.jar:na]
      at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:131) ~[amqp-client-2.5.0.jar:na]
      at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:643) ~[amqp-client-2.5.0.jar:na]
      at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:59) ~[amqp-client-2.5.0.jar:na]
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.6.0_22]
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) ~[na:1.6.0_22]
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) ~[na:1.6.0_22]
      at java.lang.reflect.Method.invoke(Method.java:597) ~[na:1.6.0_22]
      at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:298) ~[spring-rabbit-1.0.1.BUILD-BT-1.1-SNAPSHOT.jar:na]
      at $Proxy11.queueDeclarePassive(Unknown Source) ~[na:na]
      at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:188) ~[spring-rabbit-1.0.1.BUILD-BT-1.1-SNAPSHOT.jar:na]
      ... 2 common frames omitted
      Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'bt.cs.testClient.public' in vhost '/', class-id=50, method-id=10),null,""}
      at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67) ~[amqp-client-2.5.0.jar:na]
      at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33) ~[amqp-client-2.5.0.jar:na]
      at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:328) ~[amqp-client-2.5.0.jar:na]
      at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:201) ~[amqp-client-2.5.0.jar:na]
      at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:125) ~[amqp-client-2.5.0.jar:na]
      ... 11 common frames omitted
      com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'bt.cs.testClient.public' in vhost '/', class-id=50, method-id=10),null,""}
      at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:384) ~[amqp-client-2.5.0.jar:na]
      at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:235) ~[amqp-client-2.5.0.jar:na]
      at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:151) ~[amqp-client-2.5.0.jar:na]
      at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:96) ~[amqp-client-2.5.0.jar:na]
      at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:441) ~[amqp-client-2.5.0.jar:na]
      19:04:52.244 [SimpleAsyncTaskExecutor-2] DEBUG o.s.a.r.l.SimpleMessageListenerContainer - Cancelling Consumer: tag=[null], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0
      19:04:52.245 [SimpleAsyncTaskExecutor-2] DEBUG o.s.a.r.l.BlockingQueueConsumer - Closing Rabbit Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1)
      19:04:52.245 [SimpleAsyncTaskExecutor-2] INFO o.s.a.r.l.SimpleMessageListenerContainer - Stopping container from aborted consumer
      19:04:52.245 [SimpleAsyncTaskExecutor-2] DEBUG o.s.a.r.l.SimpleMessageListenerContainer - Shutting down Rabbit listener container
      19:04:52.245 [SimpleAsyncTaskExecutor-2] INFO o.s.a.r.l.SimpleMessageListenerContainer - Waiting for workers to finish.
      19:04:52.245 [SimpleAsyncTaskExecutor-2] INFO o.s.a.r.l.SimpleMessageListenerContainer - Successfully waited for workers to finish.

      Last edited by DRStevens; Jan 26th, 2012, 07:11 PM. Reason: Add some context to log output



      • #4
        @DRStevens; I have been doing some work with this today; I took a slightly different approach to using the shutdown signal, but I am still working on it.

        Before we can look at your code, we need you to sign the CLA https://support.springsource.com/spr...mmitter_signup

        Thanks



        • #5
          I signed it earlier today. My confirmation number is 19620120126050603. I may have also fixed AMQP-197, or perhaps small additions to my commit would fix it. I received the same NPE on receiving basic.CancelOk from the broker during restart of the consumer. After implementing handleCancel (consumer cancelled by the broker) and making the run method call restart, I found that the restart method then sends a basicCancel back to the broker with the consumer tag. I prevented this basicCancel from being sent when the broker originally cancelled the consumer. The same state could likely be caused by many other things, though.
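          A minimal sketch of that guard, with hypothetical names (CancelSender stands in for the channel, GuardedConsumer for the container's consumer wrapper); it is not the actual patch. Once the broker has cancelled the consumer, closeMessageConsumer skips basicCancel instead of sending a tag the broker has already cancelled.

```java
// Hypothetical stand-in for the channel method the container calls; not the RabbitMQ client API.
interface CancelSender {
    void basicCancel(String consumerTag);
}

// Hypothetical consumer wrapper showing only the guard around basicCancel.
final class GuardedConsumer {
    private final String consumerTag;
    private final CancelSender channel;
    private volatile boolean cancelledByBroker;

    GuardedConsumer(String consumerTag, CancelSender channel) {
        this.consumerTag = consumerTag;
        this.channel = channel;
    }

    // Connection thread: the broker sent basic.cancel for this consumer.
    void handleCancel() {
        cancelledByBroker = true;
    }

    // Listener thread, during restart or shutdown.
    void closeMessageConsumer() {
        if (cancelledByBroker) {
            return; // the broker already cancelled this tag; cancelling it again led to the NPE
        }
        channel.basicCancel(consumerTag);
    }
}
```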
          Last edited by DRStevens; Jan 26th, 2012, 10:21 PM.



          • #6
            Great, thanks!

            Here is my version of your patch...

            https://github.com/garyrussell/sprin...05b9990613240a

            Looks like we were going down the same path concurrently. BTW, I only see the queue (re)declaration problem sporadically.

            I think the failure to declare the queue is because it is still in the process of being moved to the new master and we reconnected while it was not yet fully baked.

            So, I added some retry logic. Of course, I have run many tests since, and I haven't seen the retry hit yet.
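            A sketch of bounded retry around the passive declare, assuming a fixed pause between attempts; RetryingStarter and withRetries are hypothetical names, not the code in the linked commit. As the next post points out, retrying only helps while a queue is still being moved to a new master; if the queue is gone for good, every attempt fails and the caller still has to give up or redeclare.

```java
import java.util.concurrent.Callable;

final class RetryingStarter {
    // Hypothetical helper: runs action up to 'attempts' times, pausing between failures.
    // Throws IllegalStateException carrying the last failure if no attempt succeeds.
    static <T> T withRetries(Callable<T> action, int attempts, long pauseMillis) {
        if (attempts < 1) {
            throw new IllegalArgumentException("attempts must be >= 1");
        }
        Exception last = null;
        for (int i = 1; i <= attempts; i++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                int left = attempts - i;
                if (left == 0) {
                    break;
                }
                System.err.println("Passive declare failed; retries left=" + left);
                try {
                    Thread.sleep(pauseMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // stop retrying if we are being shut down
                    break;
                }
            }
        }
        throw new IllegalStateException("Could not start consumer; giving up", last);
    }
}
```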



            • #7
              It depends on how you are connected to a cluster and the types of queues you are consuming from. There is at least one situation in which you will always need to redeclare your queue and bindings. The simplest way to reproduce this 100% of the time is to be connected to a single, non-clustered broker and delete the queue through the management console. Obviously, you will never be able to re-consume from the queue because it is gone. This sounds silly, but it mimics the situation below, which is slightly more time-consuming to execute.

              1. Create cluster with at least two nodes.
              2. Connect to node A and declare a non-durable, non-exclusive, non-auto-delete queue. Close this connection. If it is still connected when you execute step 4, there is a chance that this client will automatically reconnect and redeclare the queue before the consumer in step 3 re-consumes, which results in non-deterministic failures when re-consuming.
              3. Connect to node B, re-declare this queue (declaration will be ignored) and consume from it.
              4. Take down node A with 'rabbitmqctl stop_app'

              This will result in the consumer connected to node B failing to re-consume 100% of the time, no matter how many times it retries. Durable queues add a further kink: the broker will not allow the queue to be redeclared until node A comes back up. In my .NET client, I allow the redeclaration to fail, requiring manual intervention.
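              The recovery decision described above can be summarised as a small policy function. This is a hypothetical sketch (RecoveryAction and RecoveryPolicy are illustrative names): it takes the reply code from the failed passive declare (200 is AMQP reply-success, 404 is the NOT_FOUND seen in the logs in this thread) plus whether the queue is durable, and mirrors the manual-intervention choice made in the .NET client.

```java
// Hypothetical outcomes for a consumer whose queue may have disappeared.
enum RecoveryAction { RECONSUME, REDECLARE_THEN_RECONSUME, MANUAL_INTERVENTION }

final class RecoveryPolicy {
    static final int REPLY_SUCCESS = 200; // AMQP 0-9-1 reply-success
    static final int NOT_FOUND = 404;     // reply code in the channel.close seen in these logs

    // passiveDeclareReplyCode: REPLY_SUCCESS if the passive queue.declare succeeded,
    // otherwise the reply code from the channel.close that followed it.
    static RecoveryAction decide(int passiveDeclareReplyCode, boolean durable) {
        if (passiveDeclareReplyCode == REPLY_SUCCESS) {
            return RecoveryAction.RECONSUME; // queue still exists somewhere in the cluster
        }
        if (passiveDeclareReplyCode == NOT_FOUND && !durable) {
            return RecoveryAction.REDECLARE_THEN_RECONSUME; // non-durable queue died with its node
        }
        // Durable queue whose node is down (redeclaration refused) or any other error.
        return RecoveryAction.MANUAL_INTERVENTION;
    }
}
```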



              • #8
                I had another post describing how to cause your code to retry, but it requires moderator approval for some reason.

                I like your use of ConsumerCancelledException better than my hijacking of the ShutdownSignalException. I originally went this route as well, even naming it the same, but changed back to ShutdownSignalException because it required far fewer lines of code to be changed.

                I'm not positive, but I still think that closeMessageConsumer should not be called if we were cancelled by the broker.
                Last edited by DRStevens; Jan 27th, 2012, 10:33 AM. Reason: add comment about moderator approval



                • #9
                  Yes, you must not call channel.basicCancel(String consumerTag) after the consumer has already been cancelled by the broker. channel.basicCancel is called by closeMessageConsumer, and calling it for a consumer the broker has already cancelled causes an NPE, which then causes the connection to reconnect. You can see this in the RabbitMQ management console. Killing the connection causes an unnecessary interruption for other consumers using it. I have verified this with both versions 2.5.0 and 2.7.1 of the Java AMQP client. RabbitAdmin is likely redeclaring everything as a result of the reconnect, which is why you have been unable to hit your retry code. I took your code and prevented the call to closeMessageConsumer after being cancelled, in the same way I did in my fork. I ended up with the following.

                  I had to trim parts of the log below because of the post length limit.


                  Force cancellation of consumer by deleting queue through management console
                  17:39:32.472 [AMQP Connection xxx.xxx.xxx.xxx:5672] WARN o.s.a.r.l.BlockingQueueConsumer - Cancel received
                  17:39:33.136 [SimpleAsyncTaskExecutor-1] WARN o.s.a.r.l.SimpleMessageListenerContainer - Consumer raised exception, processing can restart if the connection factory supports it
                  org.springframework.amqp.rabbit.listener.ConsumerCancelledException: null
                  at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:174) ~[spring-rabbit-1.0.1.BUILD-SNAPSHOT.jar:na]
                  at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:431) ~[spring-rabbit-1.0.1.BUILD-SNAPSHOT.jar:na]
                  at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:420) ~[spring-rabbit-1.0.1.BUILD-SNAPSHOT.jar:na]
                  at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$200(SimpleMessageListenerContainer.java:56) ~[spring-rabbit-1.0.1.BUILD-SNAPSHOT.jar:na]
                  at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:505) ~[spring-rabbit-1.0.1.BUILD-SNAPSHOT.jar:na]
                  at java.lang.Thread.run(Thread.java:662) [na:1.6.0_22]
                  17:39:33.137 [SimpleAsyncTaskExecutor-1] INFO o.s.a.r.l.SimpleMessageListenerContainer - Restarting Consumer: tag=[amq.ctag-QUnb2lce7LyTBXUNtQv9MS], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0
                  17:39:33.137 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Closing Rabbit Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1)
                  17:39:33.138 [SimpleAsyncTaskExecutor-2] DEBUG o.s.a.r.l.BlockingQueueConsumer - Starting consumer Consumer: tag=[null], channel=null, acknowledgeMode=AUTO local queue size=0
                  17:39:33.144 [SimpleAsyncTaskExecutor-2] DEBUG o.s.a.r.c.CachingConnectionFactory - Detected closed channel on exception. Re-initializing: null
                  17:39:33.148 [SimpleAsyncTaskExecutor-2] WARN o.s.a.r.l.BlockingQueueConsumer - Reconnect failed; retries left=3
                  ...
                  17:39:35.167 [SimpleAsyncTaskExecutor-2] WARN o.s.a.r.l.BlockingQueueConsumer - Reconnect failed; retries left=1

                  java.io.IOException: null
                  at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106) ~[amqp-client-2.7.1.jar:na]
                  at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102) ~[amqp-client-2.7.1.jar:na]
                  at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124) ~[amqp-client-2.7.1.jar:na]
                  at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:716) ~[amqp-client-2.7.1.jar:na]
                  at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:61) ~[amqp-client-2.7.1.jar:na]
                  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.6.0_22]
                  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) ~[na:1.6.0_22]
                  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) ~[na:1.6.0_22]
                  at java.lang.reflect.Method.invoke(Method.java:597) ~[na:1.6.0_22]
                  at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:298) ~[spring-rabbit-1.0.1.BUILD-SNAPSHOT.jar:na]
                  at $Proxy11.queueDeclarePassive(Unknown Source) ~[na:na]
                  at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:197) ~[spring-rabbit-1.0.1.BUILD-SNAPSHOT.jar:na]
                  at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:489) [spring-rabbit-1.0.1.BUILD-SNAPSHOT.jar:na]
                  at java.lang.Thread.run(Thread.java:662) [na:1.6.0_22]
                  Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'bt.cs.testClient.public' in vhost '/', class-id=50, method-id=10), null, "[B@4df8b14"}
                  ...
                  com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'bt.cs.testClient.public' in vhost '/', class-id=50, method-id=10), null, "[B@4df8b14"}
                  ...
                  17:39:36.172 [SimpleAsyncTaskExecutor-2] DEBUG o.s.a.r.c.CachingConnectionFactory - Detected closed channel on exception. Re-initializing: null
                  17:39:36.177 [SimpleAsyncTaskExecutor-2] ERROR o.s.a.r.l.SimpleMessageListenerContainer - Consumer received fatal exception on startup
                  org.springframework.amqp.rabbit.listener.FatalListenerStartupException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
                  at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:212) ~[spring-rabbit-1.0.1.BUILD-SNAPSHOT.jar:na]
                  at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:489) ~[spring-rabbit-1.0.1.BUILD-SNAPSHOT.jar:na]
                  at java.lang.Thread.run(Thread.java:662) [na:1.6.0_22]
                  ...
                  Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'bt.cs.testClient.public' in vhost '/', class-id=50, method-id=10), null, "[B@4df8b14"}
                  ...
                  17:39:36.178 [SimpleAsyncTaskExecutor-2] DEBUG o.s.a.r.l.BlockingQueueConsumer - Closing Rabbit Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1)
                  17:39:36.178 [SimpleAsyncTaskExecutor-2] INFO o.s.a.r.l.SimpleMessageListenerContainer - Stopping container from aborted consumer
                  17:39:36.178 [SimpleAsyncTaskExecutor-2] DEBUG o.s.a.r.l.SimpleMessageListenerContainer - Shutting down Rabbit listener container
                  17:39:36.178 [SimpleAsyncTaskExecutor-2] INFO o.s.a.r.l.SimpleMessageListenerContainer - Waiting for workers to finish.
                  17:39:36.178 [SimpleAsyncTaskExecutor-2] INFO o.s.a.r.l.SimpleMessageListenerContainer - Successfully waited for workers to finish.
                  Last edited by DRStevens; Jan 27th, 2012, 04:50 PM. Reason: add comment about shortened log

