Unacked messages left in rabbitmq channel when using amqp outbound gateway

  • Unacked messages left in rabbitmq channel when using amqp outbound gateway

    I have an application which is using several amqp outbound gateways.

    It all works perfectly, except that after every invocation of a gateway method an unacked message is left in the RabbitMQ channel that the app uses.

    I'm not sure whether this is a bug with the spring gateway implementation, or with rabbit or whether I am doing something wrong.

    As I said, there is nothing wrong with the operation of the gateway other than these unacked messages. However, I'm worried that after the app has been running for a while the rabbit node will eventually fail because of the number of unacked messages.

    I am using Spring 3.1.2 with Spring Integration 2.1.3 and spring-amqp 1.1.1; the rabbit client and server are both 2.8.4.

    I think these are all the latest versions.

    Any suggestions would be appreciated.

    Here is an example of one of the gateways (package names changed and interface simplified).

    Code:
        <rabbit:connection-factory id="connectionFactory" 
                                   username="${rabbitmq.username}" 
                                   password="${rabbitmq.passwd}" 
                                   addresses="${rabbitmq.addresses}"/>
        <rabbit:admin id="rabbitAdmin" connection-factory="connectionFactory"/>
        <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" reply-timeout="120000"/>
        <int:gateway id="customersService" service-interface="mypackage.services.ICustomers"
                     default-request-channel="customersChannel" default-reply-channel="customersReplyChannel"
                     default-request-timeout="120000" default-reply-timeout="120000"/>
        <int:channel id="customersChannel"/>
        <int-amqp:outbound-gateway id="customersRabbitGateway" 
                                   request-channel="customersChannel"
                                   reply-channel="customersEncodedReplyChannel"
                                   amqp-template="rabbitTemplate"
                                   exchange-name="my.exchange"
                                   routing-key="customers.request"
        />
        <int:channel id="customersEncodedReplyChannel"/>
        <int:json-to-object-transformer input-channel="customersEncodedReplyChannel" 
                                        output-channel="customersReplyChannel" 
                                        type="mypackage.entities.Customers"
        />
        <int:channel id="customersReplyChannel"/>
    My service interface:

    Code:
    public interface ICustomers {

        // Invoked through the customersService gateway; the reply is converted to a Customers entity.
        Customers getCustomers(String pattern);

    }

  • #2
    I am confused; the acks are the responsibility of the consumer, not the sender.

    Is the service (the one that receives the pattern and returns 'Customers') a Spring Integration application? It is that application that is responsible for acking the messages.



    • #3
      I'm not sure at the moment whether the unacked message relates to the outbound message or the incoming reply, both of which I assume travel in and out on the same channel. I had assumed it must be the reply that wasn't getting acked, but it could well be the other end.

      So, in answer to your question: yes, the other end is a Spring Integration application, and it uses a reciprocal inbound-gateway, which is configured as follows.

      Code:
       <rabbit:connection-factory id="connectionFactory" 
                                     username="${rabbitmq.username}" 
                                     password="${rabbitmq.passwd}" 
                                     addresses="${rabbitmq.addresses}" />
          <rabbit:admin id="rabbitAdmin" connection-factory="connectionFactory"/>
          <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" reply-timeout="60000" />
      
          <rabbit:topic-exchange id="myexchange" name="my.exchange">
              <rabbit:exchange-arguments>
                  <entry key="x-ha-policy" value="all"/>
              </rabbit:exchange-arguments>
              <rabbit:bindings>
                  <rabbit:binding queue="rabbitInboundQueue" pattern="#" />
              </rabbit:bindings>
          </rabbit:topic-exchange>
          <rabbit:queue id="rabbitInboundQueue"/>
          
          <int-amqp:inbound-gateway id="rabbitInboundGateway"
                                    queue-names="#{rabbitInboundQueue.getName()}"
                                    request-channel="rabbitInboundChannel"
                                    connection-factory="connectionFactory"
                                    error-channel="errorChannel"
                                    />
                                    
          <int:chain input-channel="rabbitInboundChannel">
              <int:transformer expression="new String(payload)" />
              <int:header-value-router header-name="amqp_receivedRoutingKey" default-output-channel="nullChannel">
                  <int:mapping value="customers.request" channel="customersInboundChannel" />
                  <int:mapping value="engineers.request" channel="engineersInboundChannel" />                       
                  <int:mapping value="tasks.request" channel="tasksInboundChannel" />                       
              </int:header-value-router>
          </int:chain>
                                                                                          
          <int:chain input-channel="tasksInboundChannel">
              <int:service-activator ref="engineerTasksServiceActivator"/>
              <int:object-to-json-transformer/>
          </int:chain>
          
          <int:chain input-channel="engineersInboundChannel">
              <int:service-activator ref="engineersServiceActivator"/>
              <int:object-to-json-transformer/>
          </int:chain>
          
          <int:chain input-channel="customersInboundChannel">
              <int:service-activator ref="customersServiceActivator"/>
              <int:object-to-json-transformer />
          </int:chain>

      The customersServiceActivator is an implementation of the ICustomers interface, so it returns a Customers object to the reply channel.

      There is no unacked message left in the rabbitmq channel that the server uses; only the client's channel has one.

      The server uses the same versions of Spring, spring-amqp and amqp-client as the client app.

      I hope that clarifies things.



      • #4
        Well, with your configuration, a new (temporary, autodelete) reply queue is created for each request; I have run some tests and noticed we don't ack the reply before canceling the consumer (which removes the queue).

        It does seem to be "just" an accounting issue, though, because the temporary queue itself is (auto)deleted when the consumer is canceled. The actual message no longer exists.
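        To illustrate the accounting described above, here is a minimal toy model in plain Java. It is not the RabbitMQ client or spring-amqp API: ToyChannel, deliver, ack, cancelConsumer and simulate are all hypothetical names, and the model only assumes that a channel counts each delivery as unacked until it is acked, and that canceling the consumer does not ack anything.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of per-channel delivery accounting; NOT the RabbitMQ client API. */
public class UnackedAccountingSketch {

    /** Hypothetical stand-in for a channel that tracks unacknowledged deliveries. */
    static class ToyChannel {
        private final Map<Long, String> unacked = new HashMap<>();
        private long nextTag = 1;

        /** A reply is delivered to a consumer on this channel (manual-ack mode). */
        long deliver(String body) {
            long tag = nextTag++;
            unacked.put(tag, body);
            return tag;
        }

        /** The consumer acknowledges a delivery, so the channel stops tracking it. */
        void ack(long tag) {
            unacked.remove(tag);
        }

        /** Assumption of this model: canceling the consumer acks nothing. */
        void cancelConsumer() {
            // Intentionally empty: outstanding deliveries stay counted.
        }

        int unackedCount() {
            return unacked.size();
        }
    }

    /** Runs a number of request/reply cycles on one reused channel and returns the unacked count. */
    public static int simulate(int requests, boolean ackReplies) {
        ToyChannel channel = new ToyChannel();
        for (int i = 0; i < requests; i++) {
            long tag = channel.deliver("reply-" + i); // reply arrives from the temporary queue
            if (ackReplies) {
                channel.ack(tag);                     // what the fix would add
            }
            channel.cancelConsumer();                 // temporary queue goes away here
        }
        return channel.unackedCount();
    }

    public static void main(String[] args) {
        System.out.println("without ack: " + simulate(3, false)); // prints "without ack: 3"
        System.out.println("with ack: " + simulate(3, true));     // prints "with ack: 0"
    }
}
```

        In this model the count grows by one per request only because the same channel is reused and nothing ever acks the reply. The messages themselves are gone along with the temporary queue.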

        Clearly, we should be acking (or auto-acking) these replies. I have opened a JIRA issue: https://jira.springsource.org/browse/AMQP-249


        However, since you are using spring-amqp 1.1.1, you might want to consider using a 'reply-queue'...

        http://static.springsource.org/sprin...#request-reply

        ...this is more efficient, and does not suffer from the unacked problem because it doesn't use the temporary queues, and the listener container for the replies acks the replies.



        • #5
          OK, thanks for clearing that up.

          I have tried the "reply-queue" method but have problems getting it to work.

          I changed my rabbit template definition to the following:

          Code:
              <rabbit:queue id="replyQueue"/>
              <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" reply-timeout="120000" reply-queue="replyQueue">
                  <rabbit:reply-listener />
              </rabbit:template>
          but it fails on startup with:

          Code:
          Jul 13, 2012 9:28:36 AM org.springframework.amqp.rabbit.core.RabbitAdmin initialize
          WARNING: Auto-declaring a non-durable Queue (ee1dae78-ec5b-4357-a45d-ffe58265d704). It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
          Jul 13, 2012 9:28:36 AM org.springframework.amqp.rabbit.core.RabbitAdmin initialize
          WARNING: Auto-declaring an auto-delete Queue (ee1dae78-ec5b-4357-a45d-ffe58265d704). It will be deleted by the broker if not in use, and all messages will be lost.  Redeclared when the connection is closed and reopened.
          Jul 13, 2012 9:28:36 AM org.springframework.amqp.rabbit.core.RabbitAdmin initialize
          WARNING: Auto-declaring an exclusive Queue (ee1dae78-ec5b-4357-a45d-ffe58265d704). It cannot be accessed by consumers on another connection, and will be redeclared if the connection is reopened.
          Jul 13, 2012 9:28:37 AM org.springframework.amqp.rabbit.listener.BlockingQueueConsumer start
          WARNING: Reconnect failed; retries left=2
          java.io.IOException
                  at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
                  at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
                  at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
                  at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:755)
                  at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:61)
                  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
                  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
                  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
                  at java.lang.reflect.Method.invoke(Method.java:616)
                  at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:348)
                  at $Proxy690.queueDeclarePassive(Unknown Source)
                  at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:213)
                  at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:516)
                  at java.lang.Thread.run(Thread.java:636)
          Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'replyQueue' in vhost '/', class-id=50, method-id=10), null, ""}
          As you can see, a queue called "ee1dae78-ec5b-4357-a45d-ffe58265d704" is declared, but the listener uses the bean name ("replyQueue") as the queue name when it tries to consume.

          Have I got the wrong end of the stick here, or is it a bug? The documentation is unclear as to how the reply queue itself should be configured.

          Regards,
          AJ.



          • #6
            Hmmm - I am not sure why it doesn't work with an anonymous queue; I'll look into it.

            Giving the queue an explicit name works for me...

            Code:
            <rabbit:queue name="replyQueue" />
            (Note name= vs. id=.)



            • #7
              I deliberately didn't try using an explicit name because I am using the same configuration for multiple clients, and I am assuming that multiple clients consuming from the same reply queue would cause problems.

              I could use my own code to generate a uniquely named queue for each client, but I'd much rather use an anonymous queue and keep it simple.
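              For reference, generating a per-client name could be as small as the sketch below. The class name, the method name and the "customers.reply" prefix are made up for illustration. How the generated name would be fed into the queue declaration is left open here.

```java
import java.util.UUID;

/** Sketch: builds a reply queue name that differs for each client instance. */
public class ReplyQueueNames {

    /** Returns the given prefix followed by a dot and a random UUID. */
    public static String uniqueReplyQueueName(String prefix) {
        return prefix + "." + UUID.randomUUID();
    }

    public static void main(String[] args) {
        // "customers.reply" is a hypothetical prefix, not a name from the real configuration.
        System.out.println(uniqueReplyQueueName("customers.reply"));
    }
}
```

              Each client would call this once at startup, so that no two clients consume from the same reply queue.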

              For now, I will live with the original problem. You have reassured me that it isn't a major issue, so I will wait until 1.1.2 is out and give it another try.

              Thanks for your help,

              Regards,
              AJ.
