rabbitmq Retry/ Reconnection policy

  • rabbitmq Retry/ Reconnection policy


    I am very new to RabbitMQ and have a question about reconnection/retry attempts to the RabbitMQ server.
    Has anyone tried to configure a retry/reconnection policy in Spring AMQP? Here is my scenario:

    The consumer should be able to reconnect, and send and receive messages from RabbitMQ, whenever the server comes back up.

    <bean id="retryTemplate" class=" late">
    <property name="retryPolicy">
    <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
    <property name="maxAttempts" value="1000"></property>

    <bean id="retryInterceptor"
    class="org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor"

    class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer"
    p:connectionFactory-ref="rabbitConnectionFactory" p:queueNames="test.client.queue"
    p:messageListener-ref="messageListener" p:adviceChain-ref="retryInterceptor"/>

    Is this correct?

  • #2
    You don't need a retry interceptor to get automatic re-connection of consumers and producers - just make sure you use the CachingConnectionFactory.

    You have configured a retry interceptor for the listener itself, which is actually slightly orthogonal to the basic re-connection features. Declarative retry gives you full control over the number of attempts at processing an individual message, and if the container has to re-connect to the broker to achieve that it will happen automatically (independent of the retry configuration).

    Note you have used a stateful interceptor which isn't really necessary when there are no acks (the default) - if there are no acks then the server will never send the same message more than once, so the interceptor doesn't have to remember them.
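
    As an illustration of the point above, here is a minimal sketch of a configuration that relies only on the CachingConnectionFactory for re-connection, with no retry interceptor in the advice chain. The bean ids, queue name and `messageListener` reference are taken from the original post; the `localhost` host and the `rabbitTemplate` bean are assumptions added for the example, and `messageListener` is assumed to be defined elsewhere.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- Assumption: broker reachable on localhost with default credentials -->
    <bean id="rabbitConnectionFactory"
          class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <constructor-arg value="localhost"/>
    </bean>

    <!-- Producer side: the template shares the same connection factory -->
    <bean id="rabbitTemplate"
          class="org.springframework.amqp.rabbit.core.RabbitTemplate"
          p:connectionFactory-ref="rabbitConnectionFactory"/>

    <!-- Consumer side: no adviceChain, so no retry interceptor is involved.
         "messageListener" is the poster's own bean, defined elsewhere. -->
    <bean id="listenerContainer"
          class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer"
          p:connectionFactory-ref="rabbitConnectionFactory"
          p:queueNames="test.client.queue"
          p:messageListener-ref="messageListener"/>
</beans>
```

    If control over the number of processing attempts per message is wanted as well, a retry interceptor can be added back through `adviceChain`; that is separate from the re-connection behaviour.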


    • #3
      Hi Dave,
      Thanks for your quick response. I tried with CachingConnectionFactory, but it looks like it's not working 100%. Here is my scenario:

      In my program, I am trying to send 100 messages in a loop (with a thread sleep after every message), and I have a listener configured for the queue. I can see in the logs that it tries to restart the consumer after I stop RabbitMQ; however, I didn't see anything similar for the producer.
      When I restart the server, there are no more log entries about restarting the consumer, which I take to mean it has connected to the server again. But according to the program, the producer should send the rest of the messages and the consumer should receive them, which is not happening. Could you please let me know if any configuration is required for the producer to reconnect?

      I am attaching my log output in the next reply.


      • #4
        INFO: Starting beans in phase 2147483647
        Jun 24, 2011 9:15:12 AM org.springframework.amqp.rabbit.connection.SingleConnectionFactory createConnection
        INFO: Established shared Rabbit Connection: Shared Rabbit Connection: org.springframework.amqp.rabbit.connection.SimpleC [email protected]
        Received message: (Body:'Catch the rabbit! Fri Jun 24 09:15:12 VET 2011'; ID:null; Content:text/plain; Headers:{}; Exchange:test.topicexchange1; RoutingKey:; Reply:null; DeliveryMode:PERSISTENT; DeliveryTag:1)
        Text: Catch the rabbit! Fri Jun 24 09:15:12 VET 2011
        Exception in thread "main" org.springframework.amqp.AmqpConnectException: com.rabbitmq.client.AlreadyClosedException: clean connection shutdown; reason: Attempt to use closed connection
        at org.springframework.amqp.rabbit.connection.RabbitUtils.convertRabbitAccessException( :111)
        at org.springframework.amqp.rabbit.connection.RabbitAccessor.convertRabbitAccessException(RabbitAccesso
        at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(
        at org.springframework.amqp.rabbit.core.RabbitAdmin.initialize(
        at org.springframework.amqp.rabbit.core.RabbitAdmin$10.onCreate(
        at org.springframework.amqp.rabbit.connection.CompositeConnectionListener.onCreate(CompositeConnectionL
        at org.springframework.amqp.rabbit.connection.SingleConnectionFactory.createConnection(SingleConnection
        at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$1.createConnection(ConnectionFacto
        at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.doGetTransactionalResourceHolder(C
        at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(Con
        at org.springframework.amqp.rabbit.connection.RabbitAccessor.getTransactionalResourceHolder(RabbitAcces
        at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(
        at org.springframework.amqp.rabbit.core.RabbitAdmin.declareQueue(
        at com.cvty.ea.rabbitmq.AppTest.main(
        Caused by: com.rabbitmq.client.AlreadyClosedException: clean connection shutdown; reason: Attempt to use closed connection
        at com.rabbitmq.client.impl.AMQConnection.ensureIsOpen(
        at com.rabbitmq.client.impl.AMQConnection.createChannel(
        at org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel(
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ eateBareChannel(
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ cess$000(
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createBareChannel(CachingConnect
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access$200(CachingConnectionFact
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(
        at $Proxy5.exchangeDeclare(Unknown Source)
        at org.springframework.amqp.rabbit.core.RabbitAdmin.declareExchanges(
        at org.springframework.amqp.rabbit.core.RabbitAdmin.access$000(
        at org.springframework.amqp.rabbit.core.RabbitAdmin$11.doInRabbit(
        at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(
        ... 11 more
        Jun 24, 2011 9:15:14 AM org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer run
        WARNING: Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: com.rabbitmq.client.ShutdownSignalException: connection error; reason: Connection reset
        Jun 24, 2011 9:15:14 AM org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer run
        INFO: Restarting Consumer: tag=[amq.ctag-9XWG7IfLFLgKJagAuOIfAQ==], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,2), acknowledgeMode=AUTO local queue size=0
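
One reading of the stack trace above is that the AmqpConnectException escaped `main`, which would end the sending loop whether or not the connection factory reconnects later. Below is a minimal sketch of guarding each send with a retry loop. `RetryingSender`, `sendWithRetry` and the attempt/delay parameters are hypothetical names chosen for illustration; the `Runnable` stands in for the real send call (for example a `RabbitTemplate` send), and a real application would probably catch `org.springframework.amqp.AmqpException` rather than the broader `RuntimeException` used here to keep the sketch free of dependencies.

```java
// Hypothetical helper, not part of Spring AMQP: retries one send until it
// succeeds or the attempt budget is exhausted.
final class RetryingSender {

    private RetryingSender() {
    }

    /**
     * Runs the action until it completes without throwing, or until
     * maxAttempts is reached. Returns the number of attempts used and
     * rethrows the last failure if every attempt fails.
     */
    static int sendWithRetry(Runnable action, int maxAttempts, long delayMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                action.run(); // stand-in for the real send call
                return attempt;
            } catch (RuntimeException e) {
                lastFailure = e;
                if (attempt < maxAttempts) {
                    pause(delayMillis); // give the broker time to come back
                }
            }
        }
        throw lastFailure;
    }

    // Sleeps between attempts; restores the interrupt flag if interrupted.
    private static void pause(long delayMillis) {
        try {
            Thread.sleep(delayMillis);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting to retry", ie);
        }
    }
}
```

In the sending loop from the earlier post, each message would then be wrapped as something like `RetryingSender.sendWithRetry(() -> send(message), 1000, 5000L)`, where `send` is whatever the program already uses to publish, so a broker outage delays the loop instead of terminating it.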