  • Automatic reconnects

    I'm new to AMQP and RabbitMQ, and I have a simple project set up using Spring AMQP to consume messages from a queue. As far as I can tell, there's nothing built in to attempt reconnects in the event of a lost connection. Is that right? I saw this thread:
    http://forum.springsource.org/showthread.php?t=93955

    Is that still a recommended way to go? Given the high level of similarity between Spring AMQP and Spring JMS, I'd expect that there would eventually be an AMQP DefaultMessageListenerContainer that will handle things like this. If I want to get automatic reconnects, which direction should I look?

    On a related note, might it be possible to generify the JMS DefaultMessageListenerContainer (and other JMS classes) to work for JMS, AMQP, and message protocol X by doing some refactoring and creating a new module like "spring-messaging"? The spring-messaging module would be the API-ish thing with providers (JMS, AMQP, X) plugged into the back end, very much like spring-amqp is intended to support multiple AMQP providers. It seems this would add a lot of value by making important functionality--like auto-reconnect--immediately available to spring-amqp and any other message protocols that might come along, assuming they conform to the same basic idiom.

  • #2
    I believe you are asking for this feature: https://jira.springsource.org/browse/AMQP-93

    It has been resolved and will be available in the upcoming M3 release.


    • #3
      I don't think that's what I'm looking for. That would take care of the case where a connection can't be made at startup, but what about when the RabbitMQ server restarts or shuts down or the connection is temporarily lost for some other reason? If I'm reading this right, the SimpleMessageListenerContainer only gets a connection from the factory at startup. If the server shuts down or the connection goes bad, then it would generally result in an exception's being thrown, and it seems that those exceptions are caught by the AsyncMessageProcessingConsumer in the SimpleMessageListenerContainer. When that gets an exception, the receive loop exits and no further invocations are scheduled, so it is no longer trying to receive messages.

      More of what I'm looking for is what happens in the org.springframework.jms.listener.DefaultMessageListenerContainer. In its receive loop, in the inner AsyncMessageListenerInvoker class, if any exception is thrown while attempting to receive a message, it calls recoverAfterListenerSetupFailure(), which calls refreshConnectionUntilSuccessful(), which, as the name implies, attempts forever to reconnect until the container is shut down. After establishing a new connection, it begins scheduling receive() invocations again so that message processing resumes.

      It seems to me that the connection factory is at too low of a level to be able to intelligently handle this kind of thing, so any changes there would only get you part of the way to being able to fully recover from a disconnect. This is the kind of mechanism I was talking about in my first post that would be nice to share between JMS and AMQP.
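
      To make the recover-and-resume idea described above concrete, here is a minimal sketch of a receive loop that keeps retrying until it is stopped. It is not the Spring JMS or Spring AMQP code: the class ReconnectingReceiveLoop, the Session interface, and the fixed retry interval are all hypothetical names and choices made up for illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch; none of these names come from Spring JMS or Spring AMQP. */
class ReconnectingReceiveLoop {

    /** Stand-in for "open a connection and consume until something breaks". */
    interface Session {
        void consumeUntilFailure() throws Exception;
    }

    private final AtomicBoolean running = new AtomicBoolean(true);
    private final long retryIntervalMillis;
    private int failures;

    ReconnectingReceiveLoop(long retryIntervalMillis) {
        this.retryIntervalMillis = retryIntervalMillis;
    }

    /** Ends the loop after the current attempt, like shutting the container down. */
    void stop() {
        running.set(false);
    }

    int failures() {
        return failures;
    }

    /** Keeps (re)starting the session until stop() is called or the thread is interrupted. */
    void run(Session session) {
        while (running.get()) {
            try {
                session.consumeUntilFailure();
            } catch (Exception e) {
                // Any failure is treated as a lost connection: wait, then try again.
                failures++;
                try {
                    Thread.sleep(retryIntervalMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    public static void main(String[] args) {
        ReconnectingReceiveLoop loop = new ReconnectingReceiveLoop(10);
        int[] calls = {0};
        loop.run(() -> {
            if (++calls[0] < 3) {
                throw new java.io.IOException("simulated broker outage");
            }
            loop.stop(); // third attempt "succeeds"; stop so the demo terminates
        });
        System.out.println("attempts=" + calls[0] + ", failures=" + loop.failures());
    }
}
```

      The point of the sketch is only that the retry decision lives in the consumer loop rather than in the connection factory, which is the distinction this post is drawing.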


      • #4
        Check out the last comments here: https://jira.springsource.org/browse/AMQP-44

        Also, if you check out the latest code (HEAD), you can see the changes that have been made.


        • #5
          Ah, never mind. I didn't see everything there. It does go beyond the connection factory. I think that will be exactly what I'm looking for. Thanks a lot.


          • #6
            Great! If there's any way you can take it for a spin in the current state, it would be great to get some feedback. Otherwise, we'll have M3 out soon (hopefully next week).

            Thanks,
            Mark


            • #7
              Originally posted by Mark Fisher:
                  Great! If there's any way you can take it for a spin in the current state, it would be great to get some feedback. Otherwise, we'll have M3 out soon (hopefully next week).

                  Thanks,
                  Mark

              Hey, I got pulled into something else for a while, but I came back to this today. Here's what I've found so far.

              1) I see that M3 was released, but I didn't get a tag from the repo. It should be on 83f0f48db2471cfcf9035dca4849029924d4f27c.

              2) The M3 release doesn't appear to handle simple reconnects, where the RabbitMQ server just restarts, but as of f1d783cf5c09b16c06263b2a1475ef12658aa3ef (latest as of this morning), it does reconnect when I restart RabbitMQ.

              3) Even with reconnects working, if the server comes up and the queue I'm trying to receive from doesn't exist--e.g. it wasn't durable and hasn't yet been recreated--the reconnect attempts stop. The trace-level logs and stacktrace for when this happens are below.

              I'm going to keep looking and poking at this for a while.

              Code:
              12:58:29,694 INFO  Restarting Consumer: tag=[null], channel=null, acknowledgeMode=AUTO local queue size=0                                                                 [SimpleAsyncTaskExecutor-3] SimpleMessageListenerContainer
              12:58:29,695 DEBUG Closing Rabbit Channel: null                                                                                                                           [SimpleAsyncTaskExecutor-3] BlockingQueueConsumer
              12:58:29,698 DEBUG Detected closed connection. Opening a new one before creating Channel.                                                                                 [SimpleAsyncTaskExecutor-4] CachingConnectionFactory
              12:58:29,721 ERROR Consumer received fatal exception on startup                                                                                                           [SimpleAsyncTaskExecutor-4] SimpleMessageListenerContainer
              org.springframework.amqp.rabbit.listener.ListenerStartupFatalException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
              	at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:156)
              	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:471)
              	at java.lang.Thread.run(Thread.java:619)
              Caused by: java.io.IOException
              	at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:107)
              	at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:131)
              	at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:624)
              	at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:59)
              	at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:153)
              	... 2 more
              Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404,reply-text=NOT_FOUND - no queue 'hello' in vhost '/',class-id=50,method-id=10),null,""}
              	at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
              	at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
              	at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:328)
              	at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:201)
              	at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:125)
              	... 5 more
              Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404,reply-text=NOT_FOUND - no queue 'hello' in vhost '/',class-id=50,method-id=10),null,""}
              	at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:365)
              	at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:235)
              	at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:151)
              	at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:96)
              	at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:446)
              12:58:29,729 DEBUG Cancelling Consumer: tag=[null], channel=AMQChannel(amqp://guest@dev-tomcat:5672/,1), acknowledgeMode=AUTO local queue size=0                          [SimpleAsyncTaskExecutor-4] SimpleMessageListenerContainer
              12:58:29,729 DEBUG Closing Rabbit Channel: AMQChannel(amqp://guest@dev-tomcat:5672/,1)                                                                                    [SimpleAsyncTaskExecutor-4] BlockingQueueConsumer
              12:58:29,730 DEBUG Shutting down Rabbit listener container                                                                                                                [SimpleAsyncTaskExecutor-4] SimpleMessageListenerContainer
              12:58:29,731 DEBUG Waiting for workers to finish.                                                                                                                         [SimpleAsyncTaskExecutor-4] SimpleMessageListenerContainer
              12:58:29,731 INFO  Successfully waited for workers to finish.                                                                                                             [SimpleAsyncTaskExecutor-4] SimpleMessageListenerContainer


              • #8
                Oh, and I also have to note that in glancing through the repo, I see several commits that mix large code reformats with real changes, and picking the changes out of the reformats is really difficult. If your IDE is reformatting on the fly, then "git add -p" is your friend for picking out the relevant changes to stage for a commit, and then you can commit the reformatting changes separately.


                • #9
                  Originally posted by zzantozz:
                      if the server comes up and the queue I'm trying to receive from doesn't exist--e.g. it wasn't durable and hasn't yet been recreated--the reconnect attempts stop.

                  That is the expected behaviour. Would you like to change it?

                  The reasoning is that if the queue doesn't exist on startup then there is a configuration problem and you don't want the context to start at all. If you intend to configure the queue yourself programmatically at startup, then you can set auto-startup="false" on the container and only start it when the queue exists.
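
                  As a rough illustration of the "only start it when the queue exists" approach, the sketch below polls an existence check and runs a start action on the first success. Everything in it is hypothetical: DeferredListenerStart, startWhenQueueExists, and the polling parameters are invented for this example. In a real application the check might be a passive queue declare and the start action might be the container's start() method, but that wiring is an assumption and is not shown here.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical sketch; not part of Spring AMQP. */
class DeferredListenerStart {

    /**
     * Polls queueExists up to maxAttempts times and runs startContainer
     * on the first successful check. Returns true if the container was started.
     */
    static boolean startWhenQueueExists(BooleanSupplier queueExists,
                                        Runnable startContainer,
                                        int maxAttempts,
                                        long pollMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (queueExists.getAsBoolean()) {
                startContainer.run();
                return true;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(pollMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        // The queue never appeared; leave the container stopped.
        return false;
    }

    public static void main(String[] args) {
        int[] checks = {0};
        boolean started = startWhenQueueExists(
                () -> ++checks[0] >= 3,                       // queue "appears" on the third check
                () -> System.out.println("container started"),
                5,
                10);
        System.out.println("started=" + started + ", checks=" + checks[0]);
    }
}
```

                  Bounding the number of attempts keeps a genuinely misconfigured queue name from being retried forever, which matches the reasoning above that a missing queue usually signals a configuration problem.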

                  If you use RabbitAdmin autostartup to declare the queue then it should happen strictly before the listener container starts up so it should be fine. If this is not what you are seeing please describe in more detail what you do see and what you would expect.

                  Thanks for the git tips. There is a tag on M3 as far as I can see (f45dea87bb0389aa714e08c03e1be3cd2dd4f463).


                  • #10
                    Originally posted by Dave Syer:
                        That is the expected behaviour. Would you like to change it?

                        The reasoning is that if the queue doesn't exist on startup then there is a configuration problem and you don't want the context to start at all. If you intend to configure the queue yourself programmatically at startup, then you can set auto-startup="false" on the container and only start it when the queue exists.

                    That sounds fine. I'm new to RabbitMQ, and I'm not really familiar with conventional ways of using it. I have a feeling it should be okay the way it is. I just wanted to be sure you guys were aware that's how it was working.

                    Originally posted by Dave Syer:
                        If you use RabbitAdmin autostartup to declare the queue then it should happen strictly before the listener container starts up so it should be fine. If this is not what you are seeing please describe in more detail what you do see and what you would expect.

                    I'm not using RabbitAdmin.

                    Originally posted by Dave Syer:
                        Thanks for the git tips. There is a tag on M3 as far as I can see (f45dea87bb0389aa714e08c03e1be3cd2dd4f463).

                    I just double checked and fetched the latest commits, and I have no M3 tag. I don't have commit f45dea, either. Commit 83f0f48db2471cfcf9035dca4849029924d4f27c with message "[maven-release-plugin] prepare release 1.0.0.M3" certainly looks like the one created by the maven release plugin. If you check the poms, that's the revision where they were changed to 1.0.0.M3. It has your name on it, in fact. I'm fetching changes from https://github.com/SpringSource/spring-amqp.git. Perhaps some commits and/or the tag haven't been pushed to that repo? Did you use the release plugin or did you do the release manually? If you manually create a tag, you have to use "git push <remote> tag <tag name>" or "git push --tags". Only branches are pushed if you don't specify tag or --tags.


                    • #11
                      If you're not using RabbitAdmin how do you know the queues and bindings are declared? It's fine if you want to go that way (and want to fail if the queue is missing), but I'm just pointing out that it's there and has very low overhead and few side effects (beyond declaring stuff that you know you are going to use anyway). One of the main features of AMQP is supposed to be that the clients control the broker.

                      Originally posted by zzantozz:
                          If you manually create a tag, you have to use "git push <remote> tag <tag name>" or "git push --tags". Only branches are pushed if you don't specify tag or --tags.

                      I know. Maybe you have to fetch the tags (git fetch --tags) as well, or if you haven't even got that commit then you probably haven't fetched it either. There's no branch, so maybe git doesn't fetch it by default? I see it on github, so I know it's there. If we need to create a branch we can do that.


                      • #12
                        Originally posted by Dave Syer:
                            If you're not using RabbitAdmin how do you know the queues and bindings are declared? It's fine if you want to go that way (and want to fail if the queue is missing), but I'm just pointing out that it's there and has very low overhead and few side effects (beyond declaring stuff that you know you are going to use anyway). One of the main features of AMQP is supposed to be that the clients control the broker.

                        Okay, I'll look into that. Thanks for the tip. I've only been doing some testing against a local RabbitMQ server that I'm running, and I've been declaring and removing queues myself with the RabbitMQ Java client to figure out how things work. I don't know what the real, production instance is going to look like yet or what I'm supposed to or have permission to do on it.

                        Originally posted by Dave Syer:
                            I know. Maybe you have to fetch the tags (git fetch --tags) as well, or if you haven't even got that commit then you probably haven't fetched it either. There's no branch, so maybe git doesn't fetch it by default? I see it on github, so I know it's there. If we need to create a branch we can do that.

                        You're right. I was only fetching branches. Is there a particular reason that 1.0.0.M3 isn't merged into master? I wasn't expecting a tag that wasn't merged into mainline development. Before we go there, though, there's a bigger problem: your tag doesn't match your artifacts. I assume that you did the maven release and moved the tag manually afterward, since the maven release happened at 83f0f48db2471cfcf9035dca4849029924d4f27c, and the artifacts look like they match that commit. Why isn't the tag there? If it was moved to include some critical fixes, which seems likely from the apparent rebase of several commits, then those fixes didn't make it into the artifacts, and the tag is just misleading. If you need to get more fixes out, make a 1.0.0.M4. Whatever the case, I strongly recommend moving the tag back to its correct location--where the artifacts were actually released--to avoid more confusion in the future.


                        • #13
                          The tag was moved after the artifacts were first deployed but before the release was announced. If you don't have artifacts that match that commit then you must have downloaded them before the announcement. I know it's not perfect, and we wouldn't have done it if it wasn't a milestone.


                          • #14
                            Originally posted by Dave Syer:
                                The tag was moved after the artifacts were first deployed but before the release was announced. If you don't have artifacts that match that commit then you must have downloaded them before the announcement. I know it's not perfect, and we wouldn't have done it if it wasn't a milestone.

                            I see. That means it's cached in our Nexus repo as well... Is M4 or RC1 coming soon? I'd as soon just move on as have to go clean up. Besides, the auto-reconnect isn't working in M3, and it'd be nice to have a release to point at for my own upcoming release instead of working from a snapshot.


                            • #15
                              RC1 will be out as soon as we get the issues cleared away (all contributions gratefully accepted). We aim to do it by end of March at the latest.
