Announcement Announcement Module
Collapse
No announcement yet.
Seeing request instead of the response on outbound-gateway / inbound-gateway call Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Seeing request instead of the response on outbound-gateway / inbound-gateway call

    I have what I thought would be a mundane task where I make a synchronous call from a client app via gateway->amqp outbound-gateway, receive on a server app via amqp inbound-gateway, return the response via a durable reply queue defined on the client outbound-gateway.

    Everything works except the reply on the client side is the original request, not the response from the server.

    The client request makes it to the server service-activator:

    2013-06-03 13:02:36,633 INFO com.mycompany.ghoti.cwrep.handlers.ReportGenerator - api report request [{errorChannel=org.springframework.integration.core .MessagingTemplate$TemporaryReplyChannel@f420ca, content-type=text/plain, replyChannel=org.springframework.integration.core. MessagingTemplate$TemporaryReplyChannel@f420ca, amqp_replyTo=cw.cwrep.apireport.reply.queue, amqp_springReplyCorrelation=6f6b80ea-b4ea-43c1-bec3-fdfcc4c37f7c, amqp_receivedExchange=cwrepapi.exchange, amqp_contentEncoding=UTF-8, amqp_redelivered=false, history=apireport.request.channel, id=ad123a03-973a-45f5-9913-f080d54a0380, timestamp=1370286156633, amqp_deliveryMode=PERSISTENT, amqp_deliveryTag=42}]: {"id":"-6944708661944251444","method":"report:getReport"," params":["2633e96f-285f-476d-8703-b60f4b3f9e1b","f6a09844-0ba1-4868-a444-2720e2598d93"]}
    I can see the server response in the wire-tap on the server side:

    2013-06-03 13:02:36,633 DEBUG com.mycompany.foobar.cwrep.util.MessageDumper - MessageDumper: [{errorChannel=org.springframework.integration.core .MessagingTemplate$TemporaryReplyChannel@f420ca, content-type=text/plain, replyChannel=org.springframework.integration.core. MessagingTemplate$TemporaryReplyChannel@f420ca, amqp_replyTo=cw.cwrep.apireport.reply.queue, amqp_springReplyCorrelation=6f6b80ea-b4ea-43c1-bec3-fdfcc4c37f7c, amqp_receivedExchange=cwrepapi.exchange, amqp_contentEncoding=UTF-8, amqp_redelivered=false, history=apireport.request.channel,apireport.reply. channel,dumper.channel, id=3cdcc7c1-74fd-48aa-8589-51bf5be49ef5, timestamp=1370286156633, amqp_deliveryMode=PERSISTENT, amqp_deliveryTag=42}] {"id":"-6944708661944251444","result":{"data here"}}
    This response never makes it back to the client calling function, instead, I see returned the original request:
    2013-06-03 15:10:05,770 DEBUG com.mycompany.ghoti.cwapi.handlers.MessageDumper - MessageDumper.dumpNoReply: [{timestamp=1370293805770, id=67b2e735-54c7-49ab-aea1-b19238c8b2bc, history=redisReportApiRequestorGateway,cwrep.apire port.request.channel,amqpCwrepApiRequestGateway,cw rep.apireport.reply.channel,dumper.channel, errorChannel=org.springframework.integration.core. MessagingTemplate$TemporaryReplyChannel@1f4261fe, content-type=text/plain, amqp_deliveryMode=PERSISTENT, replyChannel=org.springframework.integration.core. MessagingTemplate$TemporaryReplyChannel@1f4261fe, amqp_replyTo=cw.cwrep.apireport.reply.queue, amqp_receivedExchange=cwrepapi.exchange, amqp_contentEncoding=UTF-8, amqp_deliveryTag=22, amqp_redelivered=false}] {"id":"-2772915484581351814","method":"report:getReport"," params":["2e383eb1-0fdb-4d8c-a187-6536c01d2c0b","f6a09844-0ba1-4868-a444-2720e2598d93"]}
    I am obviously missing some little piece that returns the server response

    client-side Java call:

    Code:
        Message<String> response = redisApiRequestorIntfc.requestApiReport(payload);
    client-side configuration:

    Code:
        <int:channel id="cwrep.apireport.reply.channel">
            <int:interceptors>
                <int:wire-tap channel="dumper.channel"/>
            </int:interceptors>
        </int:channel>
        <int:channel id="cwrep.apireport.request.channel"/>
    
    	
    	<rabbit:queue name="cw.cwrep.apireport.request.queue" durable="true"/>
        <rabbit:queue name="cw.cwrep.apireport.reply.queue" durable="true"/>
    
    	<rabbit:direct-exchange name="cwrepapi.exchange" durable="true">
            <rabbit:bindings>
                <rabbit:binding queue="cw.cwrep.apireport.request.queue"/>
                <rabbit:binding queue="cw.cwrep.apireport.reply.queue"/>
            </rabbit:bindings>
        </rabbit:direct-exchange>
    
        <rabbit:template id="amqpTemplateCwrepApiRequest" connection-factory="rabbitConnectionFactory"
                         queue="cw.cwrep.apireport.request.queue" reply-queue="cw.cwrep.apireport.reply.queue"
                         exchange="cwrepapi.exchange">
            <rabbit:reply-listener acknowledge="auto" concurrency="5" error-handler="cwAmqpErrorHandler"
                                   requeue-rejected="false"/>
        </rabbit:template>
    
        <int:gateway id="redisReportApiRequestorGateway" default-request-channel="cwrep.apireport.request.channel"
                     error-channel="logger.channel" default-reply-channel="cwrep.apireport.reply.channel"
                     default-reply-timeout="3000" default-request-timeout="3000"
                     service-interface="com.mycompany.foobar.cwapi.handlers.RedisApiRequestorIntfc"/>
    
    
        <amqp:outbound-gateway id="amqpCwrepApiRequestGateway"
                               exchange-name="cwrepapi.exchange" request-channel="cwrep.apireport.request.channel"
                               reply-channel="cwrep.apireport.reply.channel" mapped-reply-headers="cwapiApiReply"
                               amqp-template="amqpTemplateCwrepApiRequest"/>
    server-side configuration:

    Code:
        <int:channel id="apireport.request.channel"/>
        <int:channel id="apireport.reply.channel">
            <int:interceptors>
                <int:wire-tap channel="dumper.channel"/>
            </int:interceptors>
        </int:channel>
    
        <rabbit:queue name="cw.cwrep.apireport.request.queue" durable="true"/>
        <rabbit:queue name="cw.cwrep.apireport.reply.queue" durable="true"/>
    
        <rabbit:direct-exchange name="cwrepapi.exchange" durable="true">
            <rabbit:bindings>
                <rabbit:binding queue="cw.cwrep.apireport.request.queue"/>
                <rabbit:binding queue="cw.cwrep.apireport.reply.queue"/>
            </rabbit:bindings>
        </rabbit:direct-exchange>
       
        <amqp:inbound-gateway request-channel="apireport.request.channel" error-channel="dumper.channel"
                              reply-channel="apireport.reply.channel"
                              connection-factory="rabbitConnectionFactory" queue-names="cw.cwrep.apireport.request.queue"/>
    
        <int:service-activator input-channel="apireport.request.channel" requires-reply="true"
                               output-channel="apireport.reply.channel"
                               ref="reportGenerator" method="handleApi"/>
    server-side Java service-activator ReportGenerator.handleApi():

    Code:
        return MessageBuilder.withPayload(String.format("{\"id\":\"%s\",\"result\":{\"data here\"}}",
                        id)).copyHeaders(headers).build();
    What am I missing in this setup that prevents the returning response from getting back to the client?

    In particular, what I am trying to do is to have a synchronous call via amqp gateways which returns the reply on a durable queue so the reply queue does not have to be constantly created and destroyed for every call.
    Last edited by Gary Russell; Jun 3rd, 2013, 05:54 PM.

  • #2
    I just saw a thread that might address this a few before this one. I will try the suggestions there first. Sorry for the noise (unless I still can't get it working).

    Comment


    • #3
      That doesn't make sense. Are you testing both client and server in the same application context? If so, is dumper.channel a pub-sub channel? (both dumpers would get the same message if it was).

      That said, your third log entry is a couple of hours later so it looks like these logs aren't from the same test.

      Rather than using WireTaps, can you simply run with DEBUG logging for org.springframework.integration? (You'll see PreSend and PostSend logs on all channels and it will be much easier to figure out what's going on).

      Comment


      • #4
        I removed the WireTaps and turned on the DEBUG logging in SI.

        The client and server are separate web apps, but are running in development on the same Tomcat instance, which I assume would put them into separate application contexts. I did run the tests multiple times and just grabbed example log output, hence the time discrepancies.

        To make things more consistent, I made the above changes and ran the test pulling from corresponding log output for the client and server webapps.

        Client log output:

        2013-06-03 17:25:45,067 DEBUG org.springframework.integration.amqp.outbound.Amqp OutboundEndpoint - org.springframework.integration.amqp.outbound.Amqp Out
        boundEndpoint#9 received message: [Payload={"id":"8692946984224037181","method":"repo rt:getReport","params":["859edc34-f7d7-499d-ad26-96ae7a73b2ff","f6a
        09844-0ba1-4868-a444-2720e2598d93"]}][Headers={timestamp=1370301945067, id=005aed54-03d6-46a2-8c6b-55db40593bb4, history=redisReportApiRequestorGateway,
        cwrep.apireport.request.channel, errorChannel=org.springframework.integration.core. MessagingTemplate$TemporaryReplyChannel@43696df3, replyChannel=org.sp
        ringframework.integration.core.MessagingTemplate$T emporaryReplyChannel@43696df3}]
        .... truncated for submission limit ....
        'org.springframework.integration.amqp.outboun
        d.AmqpOutboundEndpoint#9' sending reply Message: [Payload={"id":"8692946984224037181","method":"repo rt:getReport","params":["859edc34-f7d7-499d-ad26-96a
        e7a73b2ff","f6a09844-0ba1-4868-a444-2720e2598d93"]}][Headers={timestamp=1370301945068, id=d2222273-fcf6-44b4-9703-0450cdbd47a2, history=redisReportApiRe
        questorGateway,cwrep.apireport.request.channel,amq pCwrepApiRequestGateway, errorChannel=org.springframework.integration.core. MessagingTemplate$Temporary
        ReplyChannel@43696df3, replyChannel=org.springframework.integration.core. MessagingTemplate$TemporaryReplyChannel@43696df3}]
        2013-06-03 17:25:45,068 DEBUG org.springframework.integration.channel.DirectChan nel - preSend on channel 'cwrep.apireport.reply.channel', message: [Payl
        oad={"id":"8692946984224037181","method":"report:g etReport","params":["859edc34-f7d7-499d-ad26-96ae7a73b2ff","f6a09844-0ba1-4868-a444-2720e2598d93"]}][H
        eaders={timestamp=1370301945068, id=de6c9dbe-e6c8-46ff-9b26-d6c8c925527f, errorChannel=org.springframework.integration.core. MessagingTemplate$TemporaryR
        eplyChannel@43696df3, history=redisReportApiRequestorGateway,cwrep.apire port.request.channel,amqpCwrepApiRequestGateway,cw rep.apireport.reply.channel, r
        eplyChannel=org.springframework.integration.core.M essagingTemplate$TemporaryReplyChannel@43696df3}]
        .... truncated for submission limit ....
        2013-06-03 17:25:45,068 DEBUG org.springframework.integration.channel.DirectChan nel - postSend (sent=true) on channel 'cwrep.apireport.reply.channel', m
        essage: [Payload={"id":"8692946984224037181","method":"repo rt:getReport","params":["859edc34-f7d7-499d-ad26-96ae7a73b2ff","f6a09844-0ba1-4868-a444-2720e2598d93"]}][Headers={timestamp=1370301945068, id=de6c9dbe-e6c8-46ff-9b26-d6c8c925527f, errorChannel=org.springframework.integration.core. MessagingTemplate$TemporaryReplyChannel@43696df3, history=redisReportApiRequestorGateway,cwrep.apire port.request.channel,amqpCwrepApiRequestGateway,cw rep.apireport.reply.channel, replyChannel=org.springframework.integration.core. MessagingTemplate$TemporaryReplyChannel@43696df3}]
        2013-06-03 17:25:45,068 DEBUG org.springframework.integration.channel.DirectChan nel - postSend (sent=true) on channel 'cwrep.apireport.request.channel', message: [Payload={"id":"8692946984224037181","method":"repo rt:getReport","params":["859edc34-f7d7-499d-ad26-96ae7a73b2ff","f6a09844-0ba1-4868-a444-2720e2598d93"]}][Headers={timestamp=1370301945067, id=005aed54-03d6-46a2-8c6b-55db40593bb4, history=redisReportApiRequestorGateway,cwrep.apire port.request.channel, errorChannel=org.springframework.integration.core. MessagingTemplate$TemporaryReplyChannel@43696df3, replyChannel=org.springframework.integration.core. MessagingTemplate$TemporaryReplyChannel@43696df3}]
        Corresponding server log output:

        2013-06-03 17:25:45,067 DEBUG org.springframework.integration.amqp.support.Defau ltAmqpHeaderMapper - headerName=[content-type] WILL be mapped, matched pattern=content-type
        .... truncating for submission limit....
        2013-06-03 17:25:45,068 DEBUG org.springframework.integration.channel.DirectChan nel - preSend on channel 'apireport.request.channel', message: [Payload={"id":"8692946984224037181","method":"repo rt:getReport","params":["859edc34-f7d7-499d-ad26-96ae7a73b2ff","f6a09844-0ba1-4868-a444-2720e2598d93"]}][Headers={errorChannel=org.springframework.integrat ion.core.MessagingTemplate$TemporaryReplyChannel@3 eb18f18, content-type=text/plain, replyChannel=org.springframework.integration.core. MessagingTemplate$TemporaryReplyChannel@3eb18f18, amqp_replyTo=cw.cwrep.apireport.reply.queue, amqp_correlationId=[B@7b2990cf, amqp_receivedExchange=cwrepapi.exchange, amqp_contentEncoding=UTF-8, amqp_redelivered=false, history=apireport.request.channel, id=e81d769d-9f77-4895-8b02-d457d211524f, timestamp=1370301945068, amqp_deliveryMode=PERSISTENT, amqp_deliveryTag=3}]
        .... truncated for submission limit ....
        2013-06-03 17:25:45,068 INFO com.mycompany.cwrep.handlers.ReportGenerator - api report request [{errorChannel=org.springframework.integration.core .MessagingTemplate$TemporaryReplyChannel@3eb18f18, content-type=text/plain, replyChannel=org.springframework.integration.core. MessagingTemplate$TemporaryReplyChannel@3eb18f18, amqp_replyTo=cw.cwrep.apireport.reply.queue, amqp_correlationId=[B@7b2990cf, amqp_receivedExchange=cwrepapi.exchange, amqp_contentEncoding=UTF-8, amqp_redelivered=false, history=apireport.request.channel, id=e81d769d-9f77-4895-8b02-d457d211524f, timestamp=1370301945068, amqp_deliveryMode=PERSISTENT, amqp_deliveryTag=3}]: {"id":"8692946984224037181","method":"report:getRe port","params":["859edc34-f7d7-499d-ad26-96ae7a73b2ff","f6a09844-0ba1-4868-a444-2720e2598d93"]}
        2013-06-03 17:25:45,069 DEBUG org.springframework.integration.handler.ServiceAct ivatingHandler - handler 'ServiceActivator for [org.springframework.integration.handler.MethodInvo kingMessageProcessor@b4b5885]' sending reply Message: [Payload={"id":"8692946984224037181","result":{"dat a here"}}][Headers={errorChannel=org.springframework.integrat ion.core.MessagingTemplate$TemporaryReplyChannel@3 eb18f18, content-type=text/plain, replyChannel=org.springframework.integration.core. MessagingTemplate$TemporaryReplyChannel@3eb18f18, amqp_replyTo=cw.cwrep.apireport.reply.queue, amqp_correlationId=[B@7b2990cf, amqp_receivedExchange=cwrepapi.exchange, amqp_contentEncoding=UTF-8, amqp_redelivered=false, history=apireport.request.channel, id=0de622ea-f3a9-498e-8576-3457b8d35413, timestamp=1370301945068, amqp_deliveryMode=PERSISTENT, amqp_deliveryTag=3}]
        2013-06-03 17:25:45,069 DEBUG org.springframework.integration.channel.DirectChan nel - preSend on channel 'apireport.reply.channel', message: [Payload={"id":"8692946984224037181","result":{"dat a here"}}][Headers={errorChannel=org.springframework.integrat ion.core.MessagingTemplate$TemporaryReplyChannel@3 eb18f18, content-type=text/plain, replyChannel=org.springframework.integration.core. MessagingTemplate$TemporaryReplyChannel@3eb18f18, amqp_replyTo=cw.cwrep.apireport.reply.queue, amqp_correlationId=[B@7b2990cf, amqp_receivedExchange=cwrepapi.exchange, amqp_contentEncoding=UTF-8, amqp_redelivered=false, history=apireport.request.channel,apireport.reply. channel, id=51bed25d-8604-413d-9738-d3b6fe83fa5b, timestamp=1370301945069, amqp_deliveryMode=PERSISTENT, amqp_deliveryTag=3}]
        2013-06-03 17:25:45,069 DEBUG org.springframework.integration.channel.DirectChan nel - postSend (sent=true) on channel 'apireport.reply.channel', message: [Payload={"id":"8692946984224037181","result":{"dat a here"}}][Headers={errorChannel=org.springframework.integrat ion.core.MessagingTemplate$TemporaryReplyChannel@3 eb18f18, content-type=text/plain, replyChannel=org.springframework.integration.core. MessagingTemplate$TemporaryReplyChannel@3eb18f18, amqp_replyTo=cw.cwrep.apireport.reply.queue, amqp_correlationId=[B@7b2990cf, amqp_receivedExchange=cwrepapi.exchange, amqp_contentEncoding=UTF-8, amqp_redelivered=false, history=apireport.request.channel,apireport.reply. channel, id=51bed25d-8604-413d-9738-d3b6fe83fa5b, timestamp=1370301945069, amqp_deliveryMode=PERSISTENT, amqp_deliveryTag=3}]
        2013-06-03 17:25:45,069 DEBUG org.springframework.integration.channel.DirectChan nel - postSend (sent=true) on channel 'apireport.request.channel', message: [Payload={"id":"8692946984224037181","method":"repo rt:getReport","params":["859edc34-f7d7-499d-ad26-96ae7a73b2ff","f6a09844-0ba1-4868-a444-2720e2598d93"]}][Headers={errorChannel=org.springframework.integrat ion.core.MessagingTemplate$TemporaryReplyChannel@3 eb18f18, content-type=text/plain, replyChannel=org.springframework.integration.core. MessagingTemplate$TemporaryReplyChannel@3eb18f18, amqp_replyTo=cw.cwrep.apireport.reply.queue, amqp_correlationId=[B@7b2990cf, amqp_receivedExchange=cwrepapi.exchange, amqp_contentEncoding=UTF-8, amqp_redelivered=false, history=apireport.request.channel, id=e81d769d-9f77-4895-8b02-d457d211524f, timestamp=1370301945068, amqp_deliveryMode=PERSISTENT, amqp_deliveryTag=3}]
        2013-06-03 17:25:45,069 DEBUG org.springframework.integration.amqp.support.Defau ltAmqpHeaderMapper - headerName=[history] WILL NOT be mapped

        Comment


        • #5
          Sorry for the long output, I trimmed some for the submission limit, hopefully did not remove critical sections.

          The thing I find interesting is that it appears the client amqp outbound-gateway prepares the reply as it sends the message to the server, not even waiting for the return from the server first. I assume that is the crux of the problem, and would be in my configuration. Am I reading that right?

          Comment


          • #6
            Yes; definitely looks like a Rabbit configuration issue - it looks like the outbound gateway is sending the message to itself.

            I need to get my head around your rabbit config, and why it would cause this, but at first glance, given you are using a direct exchange, it looks like you are missing the routing-key (= queue name) on the outbound gateway.

            Comment


            • #7
              I will check the routing-key out, however I reference a template (amqpTemplateCwrepApiRequest) and in the template I set the queue and the reply-queue, so that seems strange I would have to set it again. Seems to go against DRY.

              Comment


              • #8
                Hi!
                Please, read the JavaDocs of RabbitTemplate, to understand the difference between queue and routingKey properties.
                And that's why Gary suggests you to add routing-key attribute to the <amqp:outbound-gateway>.

                -Artem

                Comment


                • #9
                  The queue on the template is only used by the receive() methods. When sending, AMQP clients send to an exchange, not a queue. It just happens with DirectExchange that the routing key = the queue name.

                  Comment


                  • #10
                    Thank you, Gary. I think part of the problem is I took a working configuration (with temporary reply queue) and tried to modify the setup to use a durable reply queue. Most messaging in our setup uses asynchronous processing (to avoid delays waiting for replies) and during the benchmarking we found significant slowdowns using temporary queues, so we only use durable queues in these cases. For certain sections we are having to do synchronous calls, but we wanted to eliminate the temporary reply queues for this same reason.

                    Comment


                    • #11
                      Everything has been working great. Just thought I would update working configuration in case others have similar problem. I don't know if there are still questionable things about this, but something to be said for something that works.


                      Client config:

                      Code:
                          <int:channel id="cwrep.apireport.reply.channel"/>
                          <int:channel id="cwrep.apireport.request.channel"/>
                      
                          <rabbit:queue name="cw.cwrep.apireport.request.queue" durable="true"/>
                          <rabbit:queue name="cw.cwrep.apireport.reply.queue" durable="true"/>
                      
                          <rabbit:direct-exchange name="cwrepapi.exchange" durable="true">
                              <rabbit:bindings>
                                  <rabbit:binding queue="cw.cwrep.apireport.request.queue" key="cw.cwrep.apireport.request.queue"/>
                                  <rabbit:binding queue="cw.cwrep.apireport.reply.queue" key="cw.cwrep.apireport.reply.queue"/>
                              </rabbit:bindings>
                          </rabbit:direct-exchange>
                      
                          <rabbit:template id="amqpTemplateCwrepApiRequest" connection-factory="rabbitConnectionFactory"
                                           reply-queue="cw.cwrep.apireport.reply.queue"
                                           exchange="cwrepapi.exchange">
                              <rabbit:reply-listener/>
                          </rabbit:template>
                      
                          <int:gateway id="redisReportApiRequestorGateway" default-request-channel="cwrep.apireport.request.channel"
                                       error-channel="logger.channel" default-reply-channel="cwrep.apireport.reply.channel"
                                       default-reply-timeout="3000" default-request-timeout="3000"
                                       service-interface="com.mycompany.foobar.cwapi.handlers.RedisApiRequestorIntfc"/>
                      
                          <amqp:outbound-gateway id="amqpCwrepApiRequestGateway" routing-key="cw.cwrep.apireport.request.queue"
                                                 exchange-name="cwrepapi.exchange" request-channel="cwrep.apireport.request.channel"
                                                 reply-channel="cwrep.apireport.reply.channel"
                                                 amqp-template="amqpTemplateCwrepApiRequest"/>
                      Server config:

                      Code:
                          <int:channel id="apireport.request.channel"/>
                          <int:channel id="apireport.reply.channel">
                      
                          <rabbit:queue name="cw.cwrep.apireport.request.queue" durable="true"/>
                          <rabbit:queue name="cw.cwrep.apireport.reply.queue" durable="true"/>
                      
                          <rabbit:direct-exchange name="cwrepapi.exchange" durable="true">
                              <rabbit:bindings>
                                  <rabbit:binding queue="cw.cwrep.apireport.request.queue" key="cw.cwrep.apireport.request.queue"/>
                                  <rabbit:binding queue="cw.cwrep.apireport.reply.queue" key="cw.cwrep.apireport.reply.queue"/>
                              </rabbit:bindings>
                          </rabbit:direct-exchange>
                      
                          <int:service-activator input-channel="apireport.request.channel" requires-reply="true"
                                                 output-channel="apireport.reply.channel"
                                                 ref="apiReportGenerator" method="handleApi"/>

                      Comment


                      • #12
                        And thank you Gary for your great help.

                        Comment

                        Working...
                        X