losing amqp messages when db connection is lost

  • losing amqp messages when db connection is lost

    I'm using Spring Integration 2.1.4, Hibernate 3.6, and RabbitMQ. If my consumer starts with a valid connection to the Rabbit server but fails to build a DB connection pool for Hibernate, the service activator (implementing org.springframework.integration.core.MessageHandler) continues to pull messages off the queue and then discards them all. The Hibernate exception never reaches the handleMessage method, so I cannot catch it; it is thrown from a proxy.

    Ideally, I'd like to catch a specific Hibernate exception and then call System.exit(1). How can I know that I've lost my connection or failed to build the pool, and then kill the JVM? Imagine a scenario where I start my MQ consumer with valid credentials for the Rabbit server but bad credentials for the database. Currently my consumer discards all the messages faster than I can blink. I cannot allow that on my production servers.

    What am I doing wrong? My handleMessage method is annotated with @Transactional and I'm using a HibernateTransactionManager. My AMQP inbound channel adapter is transacted, acknowledge-mode is auto, and tx-size is 1. -Ben

  • #2
    You need to show your configuration but, in essence, you need to synchronize your AMQP transaction with the DB transaction by configuring the listener container with the JDBC transaction manager. That way, if the DB transaction fails to commit, so will the AMQP transaction.
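
    As a rough sketch of that idea, a transacted listener container that shares the Hibernate transaction manager might be declared as below. The bean ids, the queue name `some.queue`, and the channel `fromRabbit` are placeholders, and it assumes `sessionFactory` and `connectionFactory` beans are defined elsewhere in the context and that the adapter's `listener-container` attribute is available in your version.

    ```xml
    <!-- Assumed: "sessionFactory" and "connectionFactory" beans exist elsewhere. -->
    <bean id="transactionManager"
          class="org.springframework.orm.hibernate3.HibernateTransactionManager">
        <property name="sessionFactory" ref="sessionFactory" />
    </bean>

    <!-- The container starts the DB transaction and ties the channel's
         transaction to it, so a failed commit rolls back the delivery. -->
    <bean id="listenerContainer"
          class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="queueNames" value="some.queue" />
        <property name="channelTransacted" value="true" />
        <property name="transactionManager" ref="transactionManager" />
    </bean>

    <int-amqp:inbound-channel-adapter channel="fromRabbit"
                                      listener-container="listenerContainer" />
    ```

    When an external container is referenced this way, container-related settings (queue names, transaction manager, and so on) belong on the container bean rather than on the adapter.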


    • #3

      Is your config like this:

          <amqp:inbound-channel-adapter queue-names="some.queue"
                                        transaction-manager="transactionManager" />

      where 'transactionManager' is your 'HibernateTransactionManager'?

      Take care,
      Last edited by Artem Bilan; Oct 25th, 2012, 02:41 AM. Reason: reformat sample


      • #4
        That was it! Thank you Gary and Cleric. As soon as I set the transaction manager on the amqp inbound channel adapter, it worked. I was missing a namespace too. But now the messages go back to the primary queue and get reprocessed over and over again. Which is no big deal. I'll deal with that later. At least I don't lose the messages. Cheers! -Benjamin


        • #5
          You can use a retry interceptor that takes some action (via a Recoverer) when retries are exhausted.

              <!-- advice-chain wires the retry interceptor into the adapter's listener container -->
              <int-amqp:inbound-channel-adapter channel="fromRabbit"
                                                advice-chain="retryOperationsInterceptor"
                                                connection-factory="connectionFactory" />

              <bean id="retryOperationsInterceptor" class="org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean">
                  <property name="retryOperations">
                      <bean class="org.springframework.retry.support.RetryTemplate">
                          <property name="retryPolicy">
                              <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
                                  <property name="maxAttempts" value="2" />
                              </bean>
                          </property>
                      </bean>
                  </property>
                  <!-- invoked once the retry policy is exhausted -->
                  <property name="messageRecoverer">
                      <bean class="org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer" />
                  </property>
              </bean>

          If you leave out the recoverer, the message will simply be logged at WARN level and dropped once retries are exhausted. The RejectAndDontRequeueRecoverer allows the container to reject the message and not requeue it; this allows, for example, the bad message to go to a DeadLetterExchange/Queue.
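
          For the dead-letter routing to have somewhere to go, the source queue needs an `x-dead-letter-exchange` argument. A minimal sketch using the `rabbit` namespace follows; the names `some.queue`, `some.dlx`, and `some.dead.queue` are hypothetical, and the declarations are only sent to the broker if a `<rabbit:admin/>` is present in the context.

          ```xml
          <!-- Hypothetical names throughout; adjust to your broker setup. -->
          <rabbit:queue name="some.queue">
              <rabbit:queue-arguments>
                  <!-- The broker routes rejected, non-requeued messages to this exchange. -->
                  <entry key="x-dead-letter-exchange" value="some.dlx" />
              </rabbit:queue-arguments>
          </rabbit:queue>

          <rabbit:fanout-exchange name="some.dlx">
              <rabbit:bindings>
                  <rabbit:binding queue="some.dead.queue" />
              </rabbit:bindings>
          </rabbit:fanout-exchange>

          <rabbit:queue name="some.dead.queue" />
          ```

          If the queue already exists on the broker with different arguments, redeclaring it with new ones fails, so an existing queue may need to be deleted and recreated.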

          You can omit the RetryTemplate if you want to go with the default retry policy (SimpleRetryPolicy with maxAttempts=3).
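
          As a sketch of that shorter form, the interceptor below has no retryOperations property, so the factory bean falls back to its default RetryTemplate. The bean id is the same placeholder used in the adapter's advice-chain.

          ```xml
          <!-- No retryOperations property: the default retry policy applies. -->
          <bean id="retryOperationsInterceptor"
                class="org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean">
              <property name="messageRecoverer">
                  <bean class="org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer" />
              </property>
          </bean>
          ```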

          The stateful retry interceptor needs messages to have a messageId property (so it can maintain state). There is also a Stateless interceptor that does the retries in memory, but stateful is usually best when using transactions.
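
          For comparison, a stateless interceptor could be declared roughly as follows. The bean id `statelessRetryInterceptor` is a placeholder, and the retry policy shown is only an illustration.

          ```xml
          <!-- Retries happen inside a single delivery, so no messageId is needed. -->
          <bean id="statelessRetryInterceptor"
                class="org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean">
              <property name="retryOperations">
                  <bean class="org.springframework.retry.support.RetryTemplate">
                      <property name="retryPolicy">
                          <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
                              <property name="maxAttempts" value="3" />
                          </bean>
                      </property>
                  </bean>
              </property>
          </bean>
          ```

          The trade-off is that all attempts run within one delivery and one transaction, whereas the stateful variant rethrows the exception so the message is rolled back and redelivered between attempts.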


          • #6
            Mr. Russell, thank you for your time and assistance. I added the advice-chain to my int-amqp:inbound-channel-adapter, after the transaction-manager attribute. My source queue has a DLX defined. I also added the retryOperationsInterceptor bean to my Spring context file. My code compiles and runs, but it still just reprocesses the same message over and over again at 7.8 times per second. Based on your previous post in this thread, I expected the message to go to the DLX after 3 failed attempts. Was that expectation wrong?

            I have inspected my MQ message and it has a UUID value for the "ID" header. Is that what you mean by messageId property? I also tried to set a "messageId" property with a value of the same UUID. That had no effect. I read this [link] but I'm still not sure what you mean by "messageId property". Can you please elaborate or point me to documentation or an article? -Benjamin


            • #7
              If your inbound AMQP messages didn't have a messageId, you'd get an error:

                  throw new FatalListenerExecutionException(
                          "Illegal null id in message. Failed to manage retry for message: " + message);

              If you are not getting that exception, you are fine.

              As you can see from the cited example, it works.

              Take a look at the reference documentation...


              If you don't see similar DEBUG logs, please attach your configuration and a DEBUG level log showing the repeated failures.