  • Timeouts waiting for responses

    Hello, I have a problem getting responses back from an async gateway.

    The gateway looks like this:

    Code:
    import java.util.concurrent.Future;
    import org.springframework.integration.annotation.Gateway;

    public interface JobServiceGateway {
        @Gateway(requestChannel="xmlTestProcessingRequestChannel")
        Future<String> sendMessageAndGetReply(String message) throws Exception;
    }
    The xml configuration is pretty basic, the essential parts being:

    (The server)

    Code:
    <int:channel id="xmlProcessingRequestChannel" />
    
    <int-amqp:inbound-gateway id="amqpInboundGateway"
        request-channel="xmlProcessingRequestChannel"
        connection-factory="connectionFactory"
        queue-names="myproduct.xmlProcessingRequestQueue"
        concurrent-consumers="3"
    />
    (The client)

    Code:
    <int:gateway id="jobServiceGateway" service-interface="com.myproduct.JobServiceGateway"/>
    
    <int:channel id="xmlTestProcessingRequestChannel"/>
    <int:channel id="pdfTestDeliveryChannel"/>
    
    <int-amqp:outbound-gateway
        request-channel="xmlTestProcessingRequestChannel"
        exchange-name="xml-request-delivery"
        routing-key-expression="'xml-request.number'"
        amqp-template="amqpTemplate"
    />
    
    ...Code for the xml-request-delivery exchange omitted...
    The problem is that if I begin pumping in tasks like this:

    Code:
    List<FutureTask> futures = new ArrayList<FutureTask>();
    for (int i=0; i<50; i++) {
        System.out.println(new Date()+ ": Sending message " + i);
        Future<String> response = jobServiceGateway.sendMessageAndGetReply("Hello Message #" + i);
        futures.add(new FutureTask(String.valueOf(i),response));
    }
    And then wait for the tasks to complete:

    Code:
    for (FutureTask future: futures) {
        try {
            System.out.println(new Date() + ": Getting response for " + future);
            System.out.println(new Date() + ":     Response = " + future.getFuture().get(3000, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            System.err.println(new Date()+ "    TIMEOUT while waiting for " + future);
        }
    }
    What happens is the following:

    - The server receives all the tasks, 3 at a time (concurrent-consumers=3). I added a synthetic delay of 2 seconds for processing each task, so each batch of 3 tasks takes 2 seconds to complete. Then it picks up 3 new tasks, processes them, and so on.

    - The client receives responses to *some* tasks, while others time out. This happens even if I insert a Thread.sleep(total time to process 50 tasks, 2 seconds per task, 3 at a time) between sending the messages and starting my process-responses loop.

    What is happening to the responses that are generated, but somehow never received by the client?

    I use RabbitMQ as the message middleware, and in the web interface the queue becomes empty as soon as the last task has been processed, and before the "receive-loop" starts processing task responses. Where are the responses going when they disappear out of RabbitMQ?
    Last edited by mberg; May 21st, 2012, 06:28 AM.

  • #2
    Can you please try increasing the timeout on the "client" side adapter? The default is only 5 seconds.

    Thanks,
    Mark


    • #3
      Also, this recent thread might be helpful: http://forum.springsource.org/showth...tion-over-amqp

      It's quite possible that you are experiencing the same thing. Please read through the posts there to see if you agree.

      Hope that helps.
      -Mark


      • #4
        When you say "client side adapter", are you referring to my "int-amqp:outbound-gateway"?

        How do I set the timeout on the outbound gateway?

        The strange thing is that when I do a future.get(3000, TimeUnit.MILLISECONDS) it actually waits for 3 seconds before it tells me the request timed out. According to the server, the message was already processed. So where did the response go?

        PS... On a related note, I read somewhere else that there is a known issue with the inbound-gateway not exposing the replyTimeout property, so I'm doing this on the server:

        Code:
        AmqpInboundGateway amqpInboundGateway = spring.getBean("amqpInboundGateway",AmqpInboundGateway.class);
        amqpInboundGateway.setReplyTimeout(4000);
        For a single, non-async (i.e. non-"futurized") gateway, I had to do this whenever a message took longer than 1000 ms to process.
        Last edited by mberg; May 21st, 2012, 07:05 AM.


        • #5
          How do I set the timeout on the outbound gateway?
          Code:
          <int-amqp:outbound-gateway
              request-channel="xmlTestProcessingRequestChannel"
              exchange-name="xml-request-delivery"
              routing-key-expression="'xml-request.number'"
              amqp-template="amqpTemplate"
          />
          See post #7 in the thread Mark referred you to: the timeout is an attribute on the amqpTemplate that's referenced by the outbound gateway.


          • #6
            Hello Mark, thanks for the reference. You were right: after adding a "reply-timeout" to my rabbit:template tag and setting it to the worst-case processing time, all my requests were processed without problems. I hadn't tried that because, for some reason, Eclipse does not show reply-timeout as a valid attribute on the rabbit:template tag. Oh well.

            So what is happening is that my service is generating a reply, but my client times out before it can read it.
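The timing behind this can be checked with a small, self-contained model. This is only a sketch and not Spring code: the class and method names are hypothetical, and it assumes that all 50 requests are sent at roughly the same moment, that each request's timeout clock starts when it is sent, and that network latency is negligible. The numbers (50 tasks, 3 consumers, 2 seconds per task, 5 second default timeout) are taken from the posts above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the timing described in this thread; not Spring code. */
final class ReplyTimeoutModel {

    /** Seconds after submission at which the reply for a task (0-based index) is produced. */
    static int completionSeconds(int taskIndex, int consumers, int secondsPerTask) {
        int batch = taskIndex / consumers;       // the server takes `consumers` tasks at a time
        return (batch + 1) * secondsPerTask;     // each batch finishes one processing period later
    }

    /** Indices of tasks whose reply is produced only after the client-side reply timeout. */
    static List<Integer> timedOutTasks(int tasks, int consumers, int secondsPerTask,
                                       int replyTimeoutSeconds) {
        List<Integer> late = new ArrayList<>();
        for (int i = 0; i < tasks; i++) {
            if (completionSeconds(i, consumers, secondsPerTask) > replyTimeoutSeconds) {
                late.add(i);
            }
        }
        return late;
    }

    public static void main(String[] args) {
        int worstCase = completionSeconds(49, 3, 2);
        System.out.println("Last reply produced after " + worstCase + " s");
        System.out.println("Late with a 5 s timeout: "
                + timedOutTasks(50, 3, 2, 5).size() + " of 50");
        System.out.println("Late with a " + worstCase + " s timeout: "
                + timedOutTasks(50, 3, 2, worstCase).size() + " of 50");
    }
}
```

Under these assumptions the last reply is produced after 34 seconds, so a 5 second reply timeout only covers the first two batches (6 tasks), while a timeout equal to the worst case covers all of them. That matches the observation that some tasks were answered and the rest timed out.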

            I'm wondering about the temporary queues that are created in RabbitMQ. How long do they live? If the default time-to-live for the temporary AMQP reply queues is less than my rabbit reply-timeout value, what happens then?


            • #7
              Each temporary queue should live until a reply Message is received or until the timeout expires, whichever comes first. Then, it is deleted.

              Are you actually witnessing something different?


              • #8
                No, but I was worried that perhaps I needed to set another timeout value elsewhere :-)

                Would it be better to create a permanent reply channel and specify that on the gateway method? Creating a new queue for each message sounds costly.


                • #9
                  That's a new feature in spring-amqp 1.1.0. I mentioned it in post #4 in the same thread.


                  • #10
                    Thanks for your reply Gary. I didn't notice the reply-queue attribute.

                    I'm a bit confused now. How does this reply-queue work together with the replyChannel annotation value on my gateway method?

                    Code:
                        ....
                        @Gateway(requestChannel="xmlTestProcessingRequestChannel", replyChannel="pdfTestDeliveryChannel")
                        Future<String> sendMessageAndGetReply(String message) throws Exception;
                        ....


                    • #11
                      You don't need a replyChannel on your gateway; because you don't have a reply-channel on your outbound gateway, the framework knows it needs to send the reply to the gateway.

                      The reply queue on the rabbit template is unrelated to the framework reply channel. The reply-queue is an AMQP (rabbit) Queue to which all replies are sent. This avoids the overhead of creating a new queue for each reply.

                      It is a new feature in the 1.1.0 release that came out last week.


                      • #12
                        Thanks Gary.

                        There is a replyTo message header, a reply-channel on the outbound gateway and a reply queue on the template.

                        I know there's probably no much better way to do this, but it can be confusing to a newbie like myself. :-)

                        Which headers are relevant for message routing? I know replyTo stores the queue to which the reply is sent. But if that queue has a fixed name, how does the system know which client should pull the reply off the queue? I mean, if multiple clients are talking to my server and the reply queue has a fixed name, how are messages routed to each client?
                        Last edited by mberg; May 21st, 2012, 08:07 AM.


                        • #13
                          After adding the reply-queue attribute to my rabbit:template tag, I'm getting this error:

                          ERROR org.springframework.amqp.rabbit.core.RabbitTemplate [SimpleAsyncTaskExecutor-1] RabbitTemplate - No correlation header in reply

                          No hits on Google for this error ... perhaps you can help?


                          • #14
                            Oops; sorry; for this to work out of the box, you need Spring Integration 2.2.0.M1.

                            However, you can make it work with earlier versions by mapping the headers on the server side. If you look at debug logs on the server, you will see that some headers are being dropped; you need to add mappings for

                            spring_reply_correlation
                            spring_reply_to
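To illustrate why a dropped correlation header matters once a fixed reply queue is used, here is a rough, self-contained sketch of correlation-based reply matching. It is not the RabbitTemplate implementation: the ReplyCorrelator class and its methods are hypothetical, and the only name taken from this thread is the spring_reply_correlation header, used here as a plain map key.

```java
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical illustration of reply correlation; not the RabbitTemplate implementation. */
final class ReplyCorrelator {

    // Requests still waiting for a reply, keyed by the correlation id sent with each request.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Registers a request and returns the correlation id to send along with it. */
    String register(CompletableFuture<String> replyFuture) {
        String correlationId = UUID.randomUUID().toString();
        pending.put(correlationId, replyFuture);
        return correlationId;
    }

    /**
     * Handles a reply taken from the shared reply queue.
     * Returns false when the reply cannot be matched to a pending request,
     * for example because the server did not copy the correlation header.
     */
    boolean onReply(Map<String, String> headers, String body) {
        String correlationId = headers.get("spring_reply_correlation");
        if (correlationId == null) {
            return false; // nothing to match against: the reply is effectively lost
        }
        CompletableFuture<String> future = pending.remove(correlationId);
        return future != null && future.complete(body);
    }

    public static void main(String[] args) {
        ReplyCorrelator correlator = new ReplyCorrelator();
        CompletableFuture<String> reply = new CompletableFuture<>();
        String id = correlator.register(reply);

        // A reply whose headers were dropped on the server side cannot be matched.
        System.out.println(correlator.onReply(Collections.<String, String>emptyMap(), "orphan"));

        // A reply that carries the correlation header completes the waiting request.
        System.out.println(correlator.onReply(
                Collections.singletonMap("spring_reply_correlation", id), "Hello Message #0"));
        System.out.println(reply.getNow(null));
    }
}
```

With a temporary queue per request, the queue itself identifies the request, so no such header is needed. With one shared reply queue, the header is the only link between a reply and its request, which is consistent with the error appearing only after the reply-queue attribute was added.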


                            • #15
                              Adding the mapping solved the problem. I guess everything is working as it should now, thanks for your help Gary and Mark :-)
