
  • Messages not being auto-acked

    I have yet another simple problem. For various reasons I had to change an existing, working direct exchange to a fanout going to multiple servers. With trial and error I got the fanout working and I receive the messages just fine, but apparently the sender (or RabbitMQ) thinks I have not acked them and resends, so I get a blizzard of messages that did not occur with the direct exchange.

    The code is slightly more complex than the original direct setup because I need the data to go to multiple durable queues, one per server, so that when a machine goes down its data stays intact on the queue until the machine is up again (hence the hostname tacked on to the queue name).

    One other tidbit: I thought I could send to a fanout exchange without specifying a queue name, but when I leave the queue name null on the producer it complains, even though it is going to a fanout exchange. I am wondering if this name is causing the problem, because none of the queues are named exactly that. The data _does_ get there; it just keeps getting resent.

    I tried setting acknowledge-mode to AUTO, but still no joy.

    The following is my code and firehose trace output:


    Code:
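    /* producer side */
            connection = connectionFactory.newConnection();
            channel    = connection.createChannel();
            // durable fanout exchange: durable=true, autoDelete=false, no extra arguments
            channel.exchangeDeclare(exchangeName, "fanout", true, false, null);
    ...
            // the second argument is the routing key; a fanout exchange ignores it when routing
            channel.basicPublish(exchangeName, queueName, null, payload.getBytes("UTF-8"));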
    
    
    /* consumer side */
        <int:channel id="report.status.channel"/>
    
        <rabbit:queue id="statusQueue" name="cw.status.report.queue.#{T(java.net.InetAddress).getLocalHost().getHostName()}" durable="true"/>
    
        <rabbit:fanout-exchange name="${activity.status.exchange.name}" durable="true">
            <rabbit:bindings>
                <rabbit:binding queue="statusQueue"/>
            </rabbit:bindings>
        </rabbit:fanout-exchange>
     
        <amqp:inbound-channel-adapter channel="report.status.channel" acknowledge-mode="AUTO"
                                      queue-names="#{statusQueue.getName()}"
                                      connection-factory="rabbitConnectionFactory"/>
    
       <int:service-activator id="reportStatus" input-channel="report.status.channel"
                               ref="reportStatusHandler" method="handle">
       </int:service-activator>
    
    
    /* rabbitmq firehose output */   
    ================================================================================
    2012-1-30 12:58:59: Message received
    
    Node:         rabbit@gnuphie
    Exchange:     activity.status.exchange
    Queue:        cw.status.report.queue.gnuphie
    Routing keys: [<<"cw.status.report.queue">>]
    Properties:   []
    Payload: 
    {"id":"replace_with_jsonrpc_id","result":{"report_status":"REPSTATUS_CREATED","report_ticket":"72489c0a-a129-4061-bc8e-793220c4438f","userUUID":"2f75358c-6489-4038-9737-9a1572dfc898","response":{"report_progress":{"step":3,"totalSteps":9}}},"generate_report":{"report_ticket":"72489c0a-a129-4061-bc8e-793220c4438f","jsonrpcID":"cf6764d8-3608-494f-be37-59bbb250f93c","reportTarget":"web"}}
    
    ================================================================================
    2012-1-30 12:58:59: Message received
    
    Node:         rabbit@gnuphie
    Exchange:     activity.status.exchange
    Queue:        cw.status.report.queue.gnuphie
    Routing keys: [<<"cw.status.report.queue">>]
    Properties:   []
    Payload: 
    {"id":"replace_with_jsonrpc_id","result":{"report_status":"REPSTATUS_MISSINGORG","report_ticket":"72489c0a-a129-4061-bc8e-793220c4438f","userUUID":"2f75358c-6489-4038-9737-9a1572dfc898","response":{"report_progress":{"step":5,"totalSteps":9}}},"generate_report":{"report_ticket":"72489c0a-a129-4061-bc8e-793220c4438f","jsonrpcID":"cf6764d8-3608-494f-be37-59bbb250f93c","reportTarget":"web"}}
    
    ....
    zillions of these...
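
    On the null queue name: the second argument to basicPublish is the routing key, and a fanout exchange ignores it when routing, so an empty string is the usual value to pass. The following is a minimal in-memory sketch of that behavior, not the RabbitMQ client API. The class FanoutSketch and its methods are hypothetical names invented for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of fanout routing (hypothetical; not the RabbitMQ client). */
public class FanoutSketch {

    // One list per bound queue; each queue receives its own copy of every message.
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    /** Bind a queue to this fanout exchange. */
    public void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    /** Publish a payload. The routing key must be non-null but is otherwise ignored. */
    public void publish(String routingKey, String payload) {
        if (routingKey == null) {
            throw new IllegalArgumentException("routing key must be non-null; use \"\"");
        }
        for (List<String> queue : queues.values()) {
            queue.add(payload);
        }
    }

    /** Messages currently held by the named queue (empty if the queue is unknown). */
    public List<String> messagesIn(String queueName) {
        return queues.getOrDefault(queueName, Collections.<String>emptyList());
    }
}
```

    In this model, publishing with "" or with "cw.status.report.queue" puts the same copy on every bound queue, which suggests the routing key is unlikely to be what causes the resends.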

  • #2
    A clue for me (and hopefully enough for someone else to give me a hint): I find loads of these in the log:

    Code:
    Execution of Rabbit message listener failed, and no ErrorHandler has been set: class org.springframework.amqp.rabbit.listener.ListenerExecutionFailedException: Listener threw exception
    I am not sure what the underlying error actually is; there are no other exceptions in the log. I am assuming this is a configuration error on my part and that the exception is thrown before the message ever gets to my code.

    I have not really touched my listener configuration (it worked with the direct exchange):


    Code:
        <rabbit:listener-container connection-factory="rabbitConnectionFactory" acknowledge="auto"
                                   advice-chain="#{rabbitConfig.adviceChain}" concurrency="#{rabbitConfig.concurrent}"/>
    I am looking at the error-handler attribute for the listener right now. Will get back if I find something. My understanding was that Spring Integration creates a default ErrorHandler, but maybe I can plug in something else that will shed a little more light.
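
    The "Listener threw exception" entries would explain the resends: with AUTO acknowledgement a message is acked only when the listener returns normally, and by default a message whose listener throws is rejected and requeued, so it comes straight back. The following is a minimal in-memory sketch of that loop, not Spring AMQP itself. The class RedeliverySketch, the drain method, and the maxDeliveries cap (added so the example terminates) are assumptions for illustration.

```java
import java.util.Deque;
import java.util.function.Consumer;

/** In-memory model of AUTO acknowledgement with requeue (hypothetical sketch). */
public class RedeliverySketch {

    /**
     * Deliver messages until the queue is empty or maxDeliveries is reached.
     * A normal return acks the message (it stays removed); an exception
     * requeues it at the head of the queue. Returns the number of deliveries.
     */
    public static int drain(Deque<String> queue, Consumer<String> listener, int maxDeliveries) {
        int deliveries = 0;
        while (!queue.isEmpty() && deliveries < maxDeliveries) {
            String message = queue.pollFirst();
            deliveries++;
            try {
                listener.accept(message);   // ack: nothing more to do
            } catch (RuntimeException e) {
                queue.addFirst(message);    // reject with requeue: delivered again next pass
            }
        }
        return deliveries;
    }
}
```

    A listener that always throws uses up every allowed delivery on a single message and leaves it on the queue, which matches the blizzard in the firehose trace.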



    • #3
      Well I created some highly complex code:


      Code:
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      import org.springframework.stereotype.Component;
      import org.springframework.util.ErrorHandler;

      /**
       * Handle amqp errors in the amqp listener.
       *
       * @author         richardd
       */
      @Component
      public class CwAmqpErrorHandler implements ErrorHandler {

          private static final Logger logger = LoggerFactory.getLogger(CwAmqpErrorHandler.class);

          /**
           * Log the given error with its full stack trace. Nothing is rethrown.
           *
           * @param t the error thrown while handling a message
           */
          @Override
          public void handleError(Throwable t) {
              logger.error("unexpected messaging error", t);
          }
      }
      and added this in my config of the inbound adapter:

      Code:
          <amqp:inbound-channel-adapter channel="report.status.channel" acknowledge-mode="AUTO"
                                        queue-names="#{statusQueue.getName()}" error-handler="cwAmqpErrorHandler"
                                        connection-factory="rabbitConnectionFactory"/>
      This dumped out some extremely verbose stack traces that told me I had a Hibernate constraint violation. Thank you for a very public forum to showcase my stupidity.
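
      Since the real failure was buried under the listener wrapper exception, it can help to log the innermost cause on its own line. A minimal sketch using only the JDK follows. The class name RootCause is hypothetical, and it assumes the wrappers chain their causes through getCause().

```java
/** Finds the innermost cause of a wrapped exception (hypothetical helper). */
public final class RootCause {

    private RootCause() {
    }

    /** Follow getCause() links until there is no further cause. */
    public static Throwable of(Throwable t) {
        Throwable current = t;
        // The second check guards against a throwable that reports itself as its cause.
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }
}
```

      Inside handleError this could be used as logger.error("root cause: {}", RootCause.of(t).toString()) before logging the full trace.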



      • #4
        One other note on this, in case someone else runs into a similar problem: it worked with the direct exchange and not the fanout because in the direct case only one system was updating a particular record in the shared database, whereas the fanout sends to several systems, which were all attempting to update simultaneously. The actual record of interest was not the same on the different systems, but a related record in the database schema was the same across them, and the update attempts conflicted.
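
        One way to picture the conflict, assuming it was a unique-key collision on the shared related record (the post does not say exactly which constraint failed): every server receives every fanout message, so each one tries to write the same related row. The sketch below is an in-memory stand-in for that table, not Hibernate code. The class SharedRecordSketch and its method names are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** In-memory stand-in for a table with a unique key (hypothetical sketch). */
public class SharedRecordSketch {

    private final Map<String, String> relatedRecords = new ConcurrentHashMap<>();

    /** Strict insert: throws if the key already exists, like a unique constraint. */
    public void insert(String key, String value) {
        if (relatedRecords.putIfAbsent(key, value) != null) {
            throw new IllegalStateException("unique constraint violated for " + key);
        }
    }

    /** Tolerant insert: returns false instead of throwing when the row already exists. */
    public boolean insertIfAbsent(String key, String value) {
        return relatedRecords.putIfAbsent(key, value) == null;
    }
}
```

        With the strict insert, the second server to handle the message throws, and that exception is what sends the message back onto its queue. A handler that tolerates the existing row returns normally and the message is acked.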



        • #5
          Could you please provide your reportStatusHandler code?

          In reply to the original post by gnuphie.
