Consumer does not reconnect when there is no connection failure

  • Consumer does not reconnect when there is no connection failure

    Using spring-amqp-1.0.0.RELEASE...

    I've found an issue (which may be operator error, if there is a setting I am missing) in which a consumer is not recreated when it is disconnected in the absence of a connection failure. This happens infrequently when a cluster node goes down, roughly 1 time in 6. My guess is that it is related to a cluster node going down and the client reconnecting before the cluster realizes the client is gone, which then prevents the consumer from consuming from an exclusive queue.

    When using HA queues though, this issue is very easy to recreate deterministically.

    Assume you have a cluster with at least two nodes. A non-exclusive HA queue is declared, with its master on Node A. Assume client X is connected to Node B and consuming from this queue (a frequent situation when a TCP load balancer is used to connect to the cluster). When Node A, which holds the master queue, is taken down with "rabbitmqctl stop_app", the mirror on Node B is promoted to master and all consumers on the queue are cancelled and must re-subscribe. This cluster behavior is documented in the RabbitMQ documentation; see Mirrored Queue Behaviour. At this point, unless there is some setting I am missing, the consumer is kicked off and Spring AMQP is supposed to reconnect it, but it does not.

    Is there something I need to handle to catch this and reconnect the consumer? I am reasonably familiar with the basic Java and .NET AMQP APIs, as I implemented this reconnect-and-reconsume behavior in my custom .NET abstraction. We are using Spring AMQP for some Java/Scala components, though, and I can't figure out how to handle it there.
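
    For reference, the setup is essentially the following. This is a sketch of the setup rather than my exact code; the host name and listener body are illustrative, and the queue is declared with the x-ha-policy argument to make it an HA queue mirrored to all nodes.

    import java.util.Collections;

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageListener;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;

    public class ReconnectingConsumerTest {
        public static void main(String[] args) {
            // Connection factory pointing at the cluster (via the TCP load balancer in our case).
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory("rabbit-lb.example.com");

            // Non-exclusive HA queue, mirrored to all nodes via the x-ha-policy argument.
            Queue queue = new Queue("reconnecting.consumer.testqueue", true, false, false,
                    Collections.<String, Object> singletonMap("x-ha-policy", "all"));
            new RabbitAdmin(connectionFactory).declareQueue(queue);

            // Listener container that I expected to survive a master failover.
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
            container.setQueueNames(queue.getName());
            container.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    System.out.println("Received: " + new String(message.getBody()));
                }
            });
            container.afterPropertiesSet(); // we are outside a Spring context here
            container.start();
        }
    }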

    To be fair, Akka 1.3 (possibly later) exhibits the same failure.
    Last edited by DRStevens; Jan 17th, 2012, 12:14 PM. Reason: "When Node A containing master" instead of "When Node B containing master"

  • #2
    The following is from my experience with the .NET AMQP client; however, the Java client is very similar. When the slave queue is promoted to master, either handleCancelOk or handleCancel (the .NET client has only OnCancel) is called on every consumer (implementation of com.rabbitmq.client.Consumer) consuming from the HA queue. The channel and connection are NOT shut down, though. The abstraction layer must then create a new consumer and call basicConsume again with it. It may be possible to reuse the same consumer; I haven't tried it.

    From looking at the spring-amqp-1.0.0.RELEASE source, you need to recreate the consumer and call basicConsume again when BlockingQueueConsumer.InternalConsumer.handleCancel/handleCancelOk is called. This is not happening, which is why the consumer dies when the slave is promoted to master.
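
    Against the raw Java client, the pattern I am describing looks roughly like this. It is only a sketch (the host and queue name are illustrative, and in a real application you would probably hand the re-subscribe off to another thread instead of doing it inside the callback):

    import java.io.IOException;

    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;

    public class ResubscribingConsumer {

        private static final String QUEUE = "reconnecting.consumer.testqueue";

        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("rabbit-lb.example.com"); // illustrative host
            Connection connection = factory.newConnection();
            subscribe(connection.createChannel());
        }

        private static void subscribe(final Channel channel) throws IOException {
            channel.basicConsume(QUEUE, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                        AMQP.BasicProperties properties, byte[] body) {
                    System.out.println("Received: " + new String(body));
                }

                @Override
                public void handleCancel(String consumerTag) throws IOException {
                    // Broker-initiated cancel (e.g. slave promoted to master). The channel
                    // and connection are still open, so register a fresh consumer.
                    subscribe(channel);
                }
            });
        }
    }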
    Last edited by DRStevens; Jan 17th, 2012, 02:37 PM. Reason: "consuming from this HA queue" instead of "consuming from this channel"

    • #3
      Here are logs and rabbitmqctl output from recreating the issue I'm raising. We have a three-node cluster; hostnames and IPs have been changed/masked to protect the innocent. Regarding my previous message, handleCancelOk is not the callback that must be handled, otherwise "Received cancellation notice for ..." would have shown up in the test app log. I believe it is handleCancel.

      At this point there is an HA queue named "reconnecting.consumer.testqueue" declared and mirrored across all nodes in the cluster. You can see from the rabbitmqctl output below that the master is on rabbitnode2. It is mirrored on rabbitnode1 and rabbitnode3. Make note of the consumer count and the consumer tag.
      # rabbitmqctl list_queues name arguments pid consumers slave_pids
      Listing queues ...
      reconnecting.consumer.testqueue [{"x-ha-policy","all"}] <rabbit@rabbitnode2.…> 1 [<rabbit@rabbitnode1.…>, <rabbit@rabbitnode3.…>]
      ...done.

      # rabbitmqctl list_consumers
      Listing consumers ...
      reconnecting.consumer.testqueue <…> amq.ctag-Asm0iXLsDi1--nlpRaKU0b true
      ...done.


      Now I stop rabbitnode2, the node containing the master:
      # rabbitmqctl stop_app
      Stopping node rabbit@rabbitnode2 ...
      ...done.


      Tail of /var/log/rabbitmq/rabbit@rabbitnode2.log showing the node going offline:
      =INFO REPORT==== 17-Jan-2012::16:01:31 ===
      closing TCP connection <0.16379.289> from xxx.xxx.xxx.xxx:xxxxx

      =INFO REPORT==== 17-Jan-2012::16:01:31 ===
      application: rabbit
      exited: stopped
      type: temporary

      =INFO REPORT==== 17-Jan-2012::16:01:31 ===
      application: os_mon
      exited: stopped
      type: temporary

      =INFO REPORT==== 17-Jan-2012::16:01:31 ===
      application: mnesia
      exited: stopped
      type: temporary

      Now the rabbitmqctl output after taking down rabbitnode2. You can see that rabbitnode1 was promoted to master. Note the consumer count of 0.
      # rabbitmqctl list_queues name arguments pid consumers slave_pids
      Listing queues ...
      reconnecting.consumer.testqueue [{"x-ha-policy","all"}] <rabbit@rabbitnode1.3.27093.347> 0 [<rabbit@rabbitnode3.…>]
      ...done.

      # rabbitmqctl list_consumers
      Listing consumers ...
      no consumers listed for "reconnecting.consumer.testqueue"
      ...done.

      Excerpt from the test app's log. Note the consumer tag and the timestamps. As you can see, spring-amqp thinks the consumer is still consuming; neither the connection nor the channel was shut down. These are just logs of the consumer polling, and it keeps going.

      16:01:29.922 [SimpleAsyncTaskExecutor-1] TRACE o.s.a.r.l.SimpleMessageListenerContainer - Waiting for message from consumer.
      16:01:29.922 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tag=[amq.ctag-Asm0iXLsDi1--nlpRaKU0b], channel=Cached Rabbit Channel: AMQChannel(amqp://…:yyyy/,1), acknowledgeMode=AUTO local queue size=0
      16:01:30.923 [SimpleAsyncTaskExecutor-1] TRACE o.s.a.r.l.SimpleMessageListenerContainer - Waiting for message from consumer.
      16:01:30.923 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tag=[amq.ctag-Asm0iXLsDi1--nlpRaKU0b], channel=Cached Rabbit Channel: AMQChannel(amqp://…:yyyy/,1), acknowledgeMode=AUTO local queue size=0
      16:01:31.923 [SimpleAsyncTaskExecutor-1] TRACE o.s.a.r.l.SimpleMessageListenerContainer - Waiting for message from consumer.
      16:01:31.923 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tag=[amq.ctag-Asm0iXLsDi1--nlpRaKU0b], channel=Cached Rabbit Channel: AMQChannel(amqp://…:yyyy/,1), acknowledgeMode=AUTO local queue size=0
      16:01:32.924 [SimpleAsyncTaskExecutor-1] TRACE o.s.a.r.l.SimpleMessageListenerContainer - Waiting for message from consumer.
      16:01:32.924 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tag=[amq.ctag-Asm0iXLsDi1--nlpRaKU0b], channel=Cached Rabbit Channel: AMQChannel(amqp://…:yyyy/,1), acknowledgeMode=AUTO local queue size=0
      16:01:33.924 [SimpleAsyncTaskExecutor-1] TRACE o.s.a.r.l.SimpleMessageListenerContainer - Waiting for message from consumer.
      16:01:33.924 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tag=[amq.ctag-Asm0iXLsDi1--nlpRaKU0b], channel=Cached Rabbit Channel: AMQChannel(amqp://…:yyyy/,1), acknowledgeMode=AUTO local queue size=0
      16:01:34.925 [SimpleAsyncTaskExecutor-1] TRACE o.s.a.r.l.SimpleMessageListenerContainer - Waiting for message from consumer.
      16:01:34.925 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tag=[amq.ctag-Asm0iXLsDi1--nlpRaKU0b], channel=Cached Rabbit Channel: AMQChannel(amqp://…:yyyy/,1), acknowledgeMode=AUTO local queue size=0
      16:01:35.925 [SimpleAsyncTaskExecutor-1] TRACE o.s.a.r.l.SimpleMessageListenerContainer - Waiting for message from consumer.
      16:01:35.925 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tag=[amq.ctag-Asm0iXLsDi1--nlpRaKU0b], channel=Cached Rabbit Channel: AMQChannel(amqp://…:yyyy/,1), acknowledgeMode=AUTO local queue size=0
      16:01:36.926 [SimpleAsyncTaskExecutor-1] TRACE o.s.a.r.l.SimpleMessageListenerContainer - Waiting for message from consumer.
      16:01:36.926 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tag=[amq.ctag-Asm0iXLsDi1--nlpRaKU0b], channel=Cached Rabbit Channel: AMQChannel(amqp://…:yyyy/,1), acknowledgeMode=AUTO local queue size=0
      Last edited by DRStevens; Jan 17th, 2012, 03:43 PM.

      • #4
        The same thing happens if you delete, through the Rabbit management web interface, any queue that a spring-amqp consumer is consuming from. Deleting a queue that is in use obviously should not happen in normal operation, but when it does, the consumer still thinks that it is consuming.

        • #5
          Did you ever find a solution to this problem? I'm using spring-rabbit. Pulling the queue away generates three (re)connect attempts and then it fails indefinitely. Silently failing consumers is something that would keep me up at night.

          • #6
            https://jira.springsource.org/browse/AMQP-209

            It was supposedly fixed in 1.1.0. There are all sorts of other complexities that arise with HA queues and with deciding when to redeclare queues/bindings on the broker. Unfortunately, if you programmatically declare/remove things with the Java client that already exist/do not exist, you will get disconnected. This makes redeclaring things very difficult because of all the different configurations of queues/bindings/exchanges, and HA adds a lot of additional complexity on top of that. I wasted way too much of my life trying to account for all of these situations. 6-8 months ago I could have told you exactly what was causing the behavior you are seeing, but it's been a while. I have a feeling the failure lies in the inability to either consume from a queue or declare something.
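
            To make the "you will get disconnected" part concrete: with the plain Java client, even checking whether a queue exists is done with a passive declare, and if the queue is missing the broker closes the channel on you, so you end up juggling channels just to discover broker state. Roughly like this (only a sketch; the queue name is illustrative):

            import java.io.IOException;

            import com.rabbitmq.client.Channel;
            import com.rabbitmq.client.Connection;
            import com.rabbitmq.client.ConnectionFactory;

            public class DeclareCheck {
                public static void main(String[] args) throws Exception {
                    Connection connection = new ConnectionFactory().newConnection();
                    Channel channel = connection.createChannel();
                    try {
                        // Passive declare: succeeds only if the queue already exists.
                        channel.queueDeclarePassive("reconnecting.consumer.testqueue");
                    } catch (IOException e) {
                        // The broker answered with a channel error (404), which closes the
                        // channel, so a new one is needed before declaring the real queue.
                        channel = connection.createChannel();
                        channel.queueDeclare("reconnecting.consumer.testqueue", true, false, false, null);
                    }
                    connection.close();
                }
            }
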
            I wrote my own fix for this specific issue before 1.1.0 was released. Pretty much every commit here is related to it - https://github.com/drstevens/spring-amqp/commits/master. This code got merged into https://github.com/BostonTechnologies/spring-amqp, which has some extra hooks for redeclaring things that you will most likely need to use. We found that unless you embrace Spring completely, spring-amqp does not do everything you would like it to. If I remember correctly, all of your queues and bindings must be beans for them to be redeclared unless you use these hooks yourself.
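
            For comparison, the stock spring-amqp route is to define the declarables as beans next to a RabbitAdmin, which (if I remember right) redeclares the Queue/Exchange/Binding beans it finds whenever a new connection is opened. A rough sketch; the host, exchange name, and routing key are made up:

            import org.springframework.amqp.core.Binding;
            import org.springframework.amqp.core.BindingBuilder;
            import org.springframework.amqp.core.Queue;
            import org.springframework.amqp.core.TopicExchange;
            import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
            import org.springframework.amqp.rabbit.connection.ConnectionFactory;
            import org.springframework.amqp.rabbit.core.RabbitAdmin;
            import org.springframework.context.annotation.Bean;
            import org.springframework.context.annotation.Configuration;

            @Configuration
            public class AmqpDeclarations {

                @Bean
                public ConnectionFactory connectionFactory() {
                    return new CachingConnectionFactory("rabbit-lb.example.com"); // illustrative host
                }

                // RabbitAdmin re-declares the Queue/Exchange/Binding beans in the context
                // whenever a new connection is opened, which is what gets things back
                // after a reconnect.
                @Bean
                public RabbitAdmin rabbitAdmin() {
                    return new RabbitAdmin(connectionFactory());
                }

                @Bean
                public Queue testQueue() {
                    return new Queue("reconnecting.consumer.testqueue");
                }

                @Bean
                public TopicExchange testExchange() {
                    return new TopicExchange("test.exchange");
                }

                @Bean
                public Binding testBinding() {
                    return BindingBuilder.bind(testQueue()).to(testExchange()).with("test.#");
                }
            }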

            See this implementation of AmqpAdmin, which handles the redeclaration of non-bean queues/bindings/exchanges: https://gist.github.com/1665118. I can't remember whether it includes everything necessary to redeclare when a consumer is cancelled and reconsumes.

            Basically, our conclusion was that the extra complexity surrounding HA queues, the added performance hit, and the lack of a good abstraction library to let us redeclare things properly led us to ditch HA queues.

            This cancel issue is still a problem even when not using HA queues, though. Eventually we ditched spring-amqp completely and I wrote my own Scala abstraction on top of the Java client. It requires Akka 2.0 (and the use of Scala, obviously) - https://github.com/BostonTechnologies/scala-amqp. As far as I know it is still used, but I no longer work for that company, so I'm not sure it will be maintained. I do know that they are working to replace RabbitMQ with a non-brokered queueing system that scales much better. I have no plans to maintain it anymore, as I now firmly believe that brokered queueing systems do not scale.

            The Akka 1.3.x AMQP plugin also does not properly handle cancel, causing it to fail silently as well. Between spring-amqp, the old Scala Akka 1.3.x AMQP plugin, and my code, the only one that consistently came back and redeclared all of the appropriate queues and bindings after extended network partitions was my project.
            Last edited by DRStevens; Nov 14th, 2012, 10:19 AM.

            • #7
              Silently failing consumers is something that would keep me up at night.
              This should only be a problem if the client doesn't receive a TCP FIN/RST when the broker goes down.

              The only ways (that I know of) for this to happen are a network problem such as a switch going bad, an Ethernet cable being yanked downstream of the locally connected switch, or a hard power-off of the broker server (so the TCP stack can't issue the RST).

              To avoid these cases, you can enable heartbeats on the underlying Rabbit Connection Factory.
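
              For example, something like this; it is only a sketch (the host and heartbeat interval are illustrative, and depending on your Spring AMQP version you may be able to set the heartbeat on the Spring factory directly instead of the underlying client factory):

              import com.rabbitmq.client.ConnectionFactory;

              import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;

              public class HeartbeatConfig {
                  public static CachingConnectionFactory connectionFactory() {
                      // Heartbeats on the underlying client factory so dead TCP connections are
                      // detected even when no FIN/RST ever arrives from the broker side.
                      ConnectionFactory rabbitFactory = new ConnectionFactory();
                      rabbitFactory.setHost("rabbit-lb.example.com"); // illustrative host
                      rabbitFactory.setRequestedHeartbeat(10);        // seconds; illustrative interval

                      // Hand it to the Spring connection factory used by the listener container.
                      return new CachingConnectionFactory(rabbitFactory);
                  }
              }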
