Announcement Announcement Module
Collapse
No announcement yet.
Replacing Spring AMQP Request-Reply With Spring Integration Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Replacing Spring AMQP Request-Reply With Spring Integration

    All, I have a working application that currently uses Spring AMQP. When handling requests from the web tier, we use AMQP Template's sendAndReceive method which takes care of creating a temporary response queue and blocking for the response message to come back from RabbitMQ. I tried to replace the consumer side with Spring Integration using an InboundAdaper and InboundGateway (I tried both because it wasn't working properly for me and was experimenting). In both cases, the message was being properly pulled off the queue and handled. The issue was that Spring Integration would complain that no return channel was being provided -- the method processing the message returned a string. When I examined the incoming headers, I see that the AMQP reply-to property contains a valid value -- the temporary queue that the AMQP Template set up. Is this a supported scenario? Is Spring Integration supposed see the AMQP reply-to value can create some sort of dynamic channel? I tried configuring the AMQP Template so that instead of creating a temporary queue, it used a specific one and I bound my outbound channel to that queue. No dice. Still got the complaint about not having a reply channel. I'd love to let Spring Integration do some of the heavy lifting for me but I can't switch until I get request-reply to work properly. Any insight is appreciated.

    Thanks,
    Ron

  • #2
    I believe the "no reply channel" error you are seeing is not related to the AMQP "replyTo" property but rather to the local replyChannel header on the Message when it reaches the end of a pipeline (where a response has been produced). Could you provide an excerpt of your configuration on the consuming ("inbound") side?

    Comment


    • #3
      The fact you are saying return "channel" and not "reply queue" tends to make me think you have a Spring Integration configuration issue. The inbound gateway does use the replyTo from the AMQP headers when sending it's reply.

      If you can show your SI config and/or a debug log/stack trace, we might have more insight.

      Comment


      • #4
        Thanks for the quick replies. I've attached the relevant files for your examination. I'm hoping it is something silly that I'm doing in my configuration.

        Thanks,
        Ron

        Comment


        • #5
          Change

          Code:
          <int-amqp:channel id="pingChannel" queue-name="ping-queue" encoding="UTF-8" connection-factory="connectionFactory" error-handler="errorHandler"/>
          to

          Code:
          <int:channel id="pingChannel" />
          The way you have it now, you are sending the inbound gateway request back out to AMQP (an amqp:channel is a channel backed by an amqp queue).

          Aside from the fact you really don't want to send it back out to AMQP, this process causes the replyChannel header to be lost.

          With an in-memory <channel/>, the service activator will be invoked directly on the gateway listener's thread.

          Hope that helps.

          Comment


          • #6
            Thank you for your reply. The configuration I had to use was this one, due to the fact that our AMQP connection factory bean is named "connectionFactory" and not "rabbitConnectionFactory":

            <int-amqp:channel id="pingChannel" connection-factory="connectionFactory"/>

            Unfortunately, it does not seemed to have helped. I still get errors like this one:

            amqp_replyTo = amq.gen-wVaqyFT4Qf7r0JqkL_vDVr
            amqp_contentType = UNKNOWN
            amqp_receivedExchange = ping-exchange
            amqp_contentEncoding = UTF-8
            amqp_redelivered = false
            amqp_messageId = 3D675262
            id = fb42346b-5a65-4708-9501-fc165341fa00
            timestamp = 1348233123838
            amqp_receivedRoutingKey = service.ping
            amqp_deliveryMode = NON_PERSISTENT
            amqp_timestamp = Fri Sep 21 09:12:03 EDT 2012
            amqp_deliveryTag = 1
            spring_return_correlation = a6aa4e23-5caa-4b2f-948d-83bb01053fab
            message-type = NORMAL
            W SimpleMessageListenerContainer Execution of Rabbit message listener failed, and no ErrorHandler has been set. org.springframework.amqp.rabbit.listener.ListenerE xecutionFailedException: Listener threw exception
            at org.springframework.amqp.rabbit.listener.AbstractM essageListenerContainer.wrapToListenerExecutionFai ledExceptionIfNeeded(AbstractMessageListenerContai ner.java:638)
            at org.springframework.amqp.rabbit.listener.AbstractM essageListenerContainer.doInvokeListener(AbstractM essageListenerContainer.java:577)
            Caused by: org.springframework.integration.MessagingException : Failure occured in AMQP listener while attempting to convert and dispatch Message.
            at org.springframework.integration.amqp.channel.Abstr actSubscribableAmqpChannel$DispatchingMessageListe ner.onMessage(AbstractSubscribableAmqpChannel.java :157)
            at org.springframework.amqp.rabbit.listener.AbstractM essageListenerContainer.doInvokeListener(AbstractM essageListenerContainer.java:575)
            Caused by: org.springframework.integration.support.channel.Ch annelResolutionException: no output-channel or replyChannel header available

            I dump the incoming message headers and it appears that the reply-to queue from Spring AMQP is getting sent but not converted to an outbound channel that can send the response to the temporary queue that the client is waiting on. Forgive my ignorance, but I am curious about your statement "you really don't want to send it back out to AMQP". Ultimately, the caller is waiting on the temporary response queue that the Spring AMQP template created as part of the sendAndReceive call so don't I want to send the response message back out via AMQP? I noticed that when I hovered over the int-amqp:inbound-gateway tag in IntelliJ, it popped up "Configures a gateway that will receive AMQP Messages sent to a given queue and then forward those messages to a Message Channel. If a reply Message is returned, it will also send that to the 'replyTo' provide by the AMQP request Message." This got me to thinking that my simple service interface should return a Message<String> instead of String. Unfortunately, this didn't seem to make a difference. Any other suggestions you have are appreciated.

            Thanks,
            Ron

            Comment


            • #7
              I think you missed the crux of my point; you need to change

              Code:
              <int-amqp:channel .../>
              to
              Code:
              <int:channel/>
              An <int-amqp:channel/> is a <channel/> that sends and receives from amqp.

              Comment


              • #8
                In other words...

                Code:
                    <int-amqp:inbound-gateway id="inboundGateway"
                                              request-channel="pingChannel"
                                              queue-names="ping-queue"
                                              connection-factory="connectionFactory"
                                              mapped-request-headers="*"
                                              mapped-reply-headers="*"
                                              error-handler="errorHandler"/>
                
                
                    <int:channel id="pingChannel" />
                
                    <int:service-activator id="pingServiceActivator"
                                           input-channel="pingChannel"
                                           requires-reply="true">
                        <bean class="com.transparent.symphone.adapter.inbound.amqp.ping.SimplePingService"/>
                    </int:service-activator>

                Comment


                • #9
                  Thanks! That seems to have done the trick. I obviously do not understand the nuances between the two channel types and would love a quick explanation. I had a minor error in my XML that prevented me from trying the solution out of the box (I was missing some location information). For those of you who may be interested, here is the final working configuration:

                  Code:
                  <beans xmlns="http://www.springframework.org/schema/beans"
                         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                         xmlns:int="http://www.springframework.org/schema/integration"
                         xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
                         xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                                             http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
                                             http://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd">
                  
                  
                      <int:channel id="pingChannel"/>
                  
                  
                      <int-amqp:inbound-gateway id="inboundGateway"
                                                request-channel="pingChannel"
                                                queue-names="ping-queue"
                                                connection-factory="connectionFactory"
                                                mapped-request-headers="*"
                                                mapped-reply-headers="*"
                                                error-handler="errorHandler"/>
                  
                  
                      <int:service-activator id="pingServiceActivator"
                                             input-channel="pingChannel"
                                             requires-reply="true">
                          <bean class="com.transparent.symphone.adapter.inbound.amqp.ping.SimplePingService"/>
                      </int:service-activator>
                  </beans>
                  Thank you for all the help.

                  Ron

                  Comment


                  • #10
                    would love a quick explanation
                    <int:channel/> is simply a "connector" between the amqp inbound gateway and the service activator.

                    In this case, the connector is a DirectChannel, which means the service method is called directly on the thread that receives the amqp message.

                    You could declare it thus:
                    Code:
                    <int:channel id="pingChannel">
                        <queue/>
                    <int:channel>
                    which would use a QueueChannel (and you'd need a poller on the service activator to "poll" the queue for messages).

                    A third option is
                    Code:
                    <int:channel id="pingChannel">
                        <dispatcher task-executor="myTaskExecutor" />
                    <int:channel>
                    In this case the inbound adapter thread hands off the work to another thread (supplied by the task-executor).


                    In all three cases, this is all done entirely in memory.


                    Now, let's say you have an application that gets a request, sends it to a QueueChannel, and it's consumed by some service.

                    Code:
                    <int-http:inbound-adapter ...>
                    
                    <int:channel id="foo">
                        <queue />
                    </int:channel>
                    
                    <int:service ... />
                    The problem with this is you could have messages in the QueueChannel which would be lost if the server crashes.

                    In this case, you might decide to replace the channel with an <int-amqp:channel .../> because then, the messages will be persisted to an amqp queue, and therefore won't be lost with a server crash.

                    Hope that helps.

                    Comment

                    Working...
                    X