
  • rabbitmq Retry/ Reconnection policy

    Hi,

    I am very new to RabbitMQ and have a question about reconnection/retry attempts against the RabbitMQ server.
    Has anyone tried to configure a retry/reconnection policy in Spring AMQP? Here is my scenario:

    The consumer should be able to reconnect, and then send and receive messages again, whenever the RabbitMQ server comes back up.

    <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
      <property name="retryPolicy">
        <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
          <property name="maxAttempts" value="1000"></property>
        </bean>
      </property>
    </bean>

    <bean id="retryInterceptor"
      class="org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor"
      p:retryOperations-ref="retryTemplate">
    </bean>

    <bean
      class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer"
      p:connectionFactory-ref="rabbitConnectionFactory" p:queueNames="test.client.queue"
      p:messageListener-ref="messageListener" p:adviceChain-ref="retryInterceptor"/>

    Is this correct?

  • #2
    You don't need a retry interceptor to get automatic re-connection of consumers and producers - just make sure you use the CachingConnectionFactory.
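
    For reference, a minimal sketch of such a connection factory definition, using the `rabbitConnectionFactory` bean name that the listener container in the question refers to. The host name and credentials are assumptions (a local broker with the default guest account), not values taken from this thread:

    ```xml
    <!-- Sketch only: host, username and password are assumed values -->
    <bean id="rabbitConnectionFactory"
      class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
      <constructor-arg value="localhost"/>
      <property name="username" value="guest"/>
      <property name="password" value="guest"/>
    </bean>
    ```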

    You have configured a retry interceptor for the listener itself, which is actually slightly orthogonal to the basic re-connection features. Declarative retry gives you full control over the number of attempts at processing an individual message, and if the container has to re-connect to the broker to achieve that it will happen automatically (independent of the retry configuration).

    Note you have used a stateful interceptor which isn't really necessary when there are no acks (the default) - if there are no acks then the server will never send the same message more than once, so the interceptor doesn't have to remember them.
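
    If a retry interceptor is wanted at all in that case, the stateless variant could be declared roughly as follows. This is a sketch that reuses the `retryTemplate` bean from the original post; it only swaps the interceptor class and is not a tested configuration:

    ```xml
    <!-- Stateless retry: retries happen inside the same listener invocation,
         so nothing has to be remembered between deliveries -->
    <bean id="retryInterceptor"
      class="org.springframework.retry.interceptor.RetryOperationsInterceptor">
      <property name="retryOperations" ref="retryTemplate"/>
    </bean>
    ```

    The listener container definition stays the same, since it refers to the interceptor only by its bean id.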



    • #3
      Hi Dave,
      Thanks for your quick response. I tried CachingConnectionFactory, but it does not seem to be working 100%. Here is my scenario:

      In my program I am trying to send 100 messages in a loop (with a thread sleep after every message), and I have a listener configured for them. In the logs I can see it trying to restart the consumer after I stop RabbitMQ; however, I don't see anything similar for the producer.
      When I restart the server, there are no more log entries about restarting the consumer, which means it is connected to the server again. But according to the program, the producer should send the rest of the messages and the consumer should receive them, and that is not happening. Could you please let me know whether any configuration is required for the producer to reconnect?

      I am attaching my log output in the next reply.



      • #4
        INFO: Starting beans in phase 2147483647
        Jun 24, 2011 9:15:12 AM org.springframework.amqp.rabbit.connection.SingleConnectionFactory createConnection
        INFO: Established shared Rabbit Connection: Shared Rabbit Connection: org.springframework.amqp.rabbit.connection.SimpleConnection@1236cd5
        Received message: (Body:'Catch the rabbit! Fri Jun 24 09:15:12 VET 2011'; ID:null; Content:text/plain; Headers:{}; Exchange:test.topicexchange1; RoutingKey:; Reply:null; DeliveryMode:PERSISTENT; DeliveryTag:1)
        Text: Catch the rabbit! Fri Jun 24 09:15:12 VET 2011
        Exception in thread "main" org.springframework.amqp.AmqpConnectException: com.rabbitmq.client.AlreadyClosedException: clean connection shutdown; reason: Attempt to use closed connection
        at org.springframework.amqp.rabbit.connection.RabbitUtils.convertRabbitAccessException(RabbitUtils.java:111)
        at org.springframework.amqp.rabbit.connection.RabbitAccessor.convertRabbitAccessException(RabbitAccessor.java:106)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:314)
        at org.springframework.amqp.rabbit.core.RabbitAdmin.initialize(RabbitAdmin.java:293)
        at org.springframework.amqp.rabbit.core.RabbitAdmin$10.onCreate(RabbitAdmin.java:227)
        at org.springframework.amqp.rabbit.connection.CompositeConnectionListener.onCreate(CompositeConnectionListener.java:31)
        at org.springframework.amqp.rabbit.connection.SingleConnectionFactory.createConnection(SingleConnectionFactory.java:136)
        at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$1.createConnection(ConnectionFactoryUtils.java:93)
        at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.doGetTransactionalResourceHolder(ConnectionFactoryUtils.java:140)
        at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactoryUtils.java:83)
        at org.springframework.amqp.rabbit.connection.RabbitAccessor.getTransactionalResourceHolder(RabbitAccessor.java:100)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:303)
        at org.springframework.amqp.rabbit.core.RabbitAdmin.declareQueue(RabbitAdmin.java:109)
        at com.cvty.ea.rabbitmq.AppTest.main(AppTest.java:26)
        Caused by: com.rabbitmq.client.AlreadyClosedException: clean connection shutdown; reason: Attempt to use closed connection
        at com.rabbitmq.client.impl.AMQConnection.ensureIsOpen(AMQConnection.java:133)
        at com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:392)
        at org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel(SimpleConnection.java:29)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createBareChannel(CachingConnectionFactory.java:286)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.access$000(CachingConnectionFactory.java:277)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createBareChannel(CachingConnectionFactory.java:131)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access$200(CachingConnectionFactory.java:52)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:227)
        at $Proxy5.exchangeDeclare(Unknown Source)
        at org.springframework.amqp.rabbit.core.RabbitAdmin.declareExchanges(RabbitAdmin.java:312)
        at org.springframework.amqp.rabbit.core.RabbitAdmin.access$000(RabbitAdmin.java:45)
        at org.springframework.amqp.rabbit.core.RabbitAdmin$11.doInRabbit(RabbitAdmin.java:295)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:309)
        ... 11 more
        Jun 24, 2011 9:15:14 AM org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer run
        WARNING: Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: com.rabbitmq.client.ShutdownSignalException: connection error; reason: java.net.SocketException: Connection reset
        Jun 24, 2011 9:15:14 AM org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer run
        INFO: Restarting Consumer: tag=[amq.ctag-9XWG7IfLFLgKJagAuOIfAQ==], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,2), acknowledgeMode=AUTO local queue size=0
