Announcement Announcement Module
Collapse
No announcement yet.
Flow not passing to confirm-ack-channel on successful posting of message to queue Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Flow not passing to confirm-ack-channel on successful posting of message to queue

    I wanted to send response to a service based on whether the message is posted successfully to queue or not. Am using amqp:outbound-channel-adapter to sent to queue and also gave parameters 'confirm-ack-channel' and 'confirm-nack-channel' each mapped to a separate channel and both of them call different service activators for sending different response back.

    Below is the configuration.

    <amqp:outbound-channel-adapter id="updateOutboundAdapter" channel="updateChannel"
    amqp-template="rabbitTemplate" exchange-name="testExchange"
    routing-key="routingKey" confirm-ack-channel="successRespChannel" confirm-nack-channel="failRespChannel"/>


    <int:service-activator id="successResp" input-channel="successRespChannel" ref="responseToService" output-channel="responseIntermediateChannel">
    </int:service-activator>


    I have enabled publisher confirms on the connection factory as shown below.
    ​<bean id="rabbitConnectionFactory" class="org.springframework.amqp.rabbit.connection. CachingConnectionFactory" publisherConfirms = "true">
    <constructor-arg ref="rabbitConnectionSSLFactory" />
    </bean>


    Could you please help me understand how can i get this to working.

  • #2
    To enable publisher confirms, you need a "confirm-correlation-expression" so that you can correlate the confirmation to a specific message you sent. This would usually be something from the message, like "payload.invoiceNumber".

    We don't get anything back from RabbitMQ besides the confirmation so the adapter puts the value from the correlation expression into the payload of the confirmation message.

    I will create a JIRA issue - we should have given an error when you set a confirmation channel without an expression.

    Comment


    • Gary Russell
      Gary Russell commented
      Editing a comment
      Or you can use just "payload" if you want the complete payload that was sent.

  • #3
    Thanks a lot Gary. I will check this and let you know if i still have any issues.

    Comment


    • #4
      Hi Gary, Thanks for your inputs. Am able to invoke confirm-ack-channel up on successful submission to queue. however when the RabbitMQ is down or designated queue is not present, i was expecting 'amqp:outbound-channel-adapter' to invoke 'confirm-nack-channel', but it is not invoking confirm-nack-channel and it doesn't post any exception in the logs either. Please advice.

      Comment


      • #5
        Also, Am using spring spring ws for soap requests as shown below.

        <int-ws:inbound-gateway id="inboundWebserviceGateway" request-channel="requestChannel" reply-channel="responsechannel"
        marshaller="marshaller" unmarshaller="marshaller" reply-timeout="5000"/>

        ​Actual complete scenario is to accept soap request, take the request object and modify/add new params and put it in queue. In the success scenario, when I am able to successfully put to queue it is invoking confirm-ack-channel configured for 'amqp:outbound-channel-adapter', however even if i map the 'output-channel' to responsechannel for ws:inbound gateway. It is not sending the response back.

        Here is the configuration

        //WS payload rounter to accept soap request's

        <intayload-type-router id="payloadRouter" input-channel="requestChannel" default-output-channel="nullChannel" >
        <int:mapping type="com.vmware.domain.UpdateRequest" channel="updateTriggerChannel" />
        </intayload-type-router>

        <int:channel id="updateTriggerChannel"/>

        //Service Activator to update/add request object and prepare message to put in queue

        <int:service-activator input-channel="updateTriggerChannel" output-channel="updateChannel" ref="updateTriggerService" ></int:service-activator>

        <intublish-subscribe-channel id="updateChannel" >
        <int:interceptors><int:wire-tap channel="loggerChannel"/></int:interceptors>
        </intublish-subscribe-channel>

        //actual outbound to send to Queue
        <amqp:outbound-channel-adapter id="updateOutboundAdapter" channel="updateChannel"
        amqp-template="rabbitTemplate" exchange-name="testExchange"
        routing-key="routingKey" confirm-ack-channel="successRespChannel" confirm-nack-channel="failRespChannel"/>


        <int:channel id="successRespChannel"/>
        <int:channel id="failRespChannel"/>

        //Respective activators based on where the message is successfully sent or not

        <int:service-activator id="successUpdateTriggerActivator" input-channel="successRespChannel" ref="successRespActivator" output-channel="responseIntermediateChannel"></int:service-activator>
        <int:service-activator id="failUpdateTriggerFailureActivator" input-channel="failRespChannel" ref="failureRespActivator" output-channel="responseIntermediateChannel"></int:service-activator>

        //Send the response from service-activators to responsechannel of WS
        <int:recipient-list-router id="customResponseRouter" input-channel="responseIntermediateChannel"
        ignore-send-failures="true" timeout="500">
        <int:recipient channel="loggingResponseChannel"/>
        <int:recipient channel="responsechannel"/>
        </int:recipient-list-router>

        <int:channel id="responseIntermediateChannel"/>


        Please help me resolve these issues.

        Comment


        • #6
          I still don't see a confirm-correlation-expression in your outbound-channel-adapter configuration.

          After you fix that, I suggest you turn on DEBUG logging to trace the message flow.

          Comment


          • #7
            Hi Gary,

            Thanks a lot for your time and help. I had indeed gave the confirm-correlation-expression, however i copied the old one here. Apologies for the same.

            Correct Configured is given below
            <amqp:outbound-channel-adapter id="updateOutboundAdapter" channel="updateChannel"
            amqp-template="rabbitTemplate" confirm-correlation-expression="payload" exchange-name="testExchange"
            routing-key="routingKey" confirm-ack-channel="successRespChannel" confirm-nack-channel="failRespChannel" return-channel="failRespChannel"/>

            I have enabled the debug and found below things, Can you please help me how to get this corrected.

            Exception found is : org.springframework.integration.support.channel.Ch annelResolutionException: no output-channel or replyChannel header available

            I think after the outbound-channel-adapter is called, headers are overwritten with the new ones. hence when the response is put in reply-channel of int-ws:inbound-gateway it is not able to find replyChannel header.


            Headers before invoking amqp:outbound-channel-adapter

            [Headers={timestamp=1402123049571, id=d4dafbc2-e491-4554-96d1-58a96ca615a3, errorChannel=org.springframework.integration.core. [email protected], replyChannel=org.springframework.integration.core. MessagingTemplate$[email protected], msgtype=hourlyUpdate}]

            Headers after posting successfully to queue:

            [Headers={timestamp=1402123057247, id=57114b81-9c11-45e6-b99b-d894d3caf21f, amqp_publishConfirm=true}]

            Because of this when the response is sent to reply-channel , it is not able to fine the output channel and exception is thrown.

            Can you please let me know how can i retain these headers even after invoking amqp:outbound-channel-adapter.

            Comment


            • #8
              The message headers are not maintained automatically - the confirmation process was not intended to create a "reply" message for a gateway - just so your application can get a confirmation that a message was sent.

              That said, you can do what you want with a little more configuration...

              Use confirm-correlation-expression="#root" (this means save the whole messge, which will be the payload of the ack message).

              Change confirm-ack-channel to "successRespTransformChannel"

              Then add the following...

              Code:
              <int:chain input-channel="successRespTransformChannel" output-channel="successRespChannel">
                  <int:header-enricher>
                       <int:reply-reply-channel expression="payload.headers['replyChannel']" />
                  </int:header-enricher>
                  <int:transformer expression="payload.payload" />
              </int:chain>

              Comment


              • #9
                Hi Gary,

                I have made the requested changes and it works fine in success scenario. However when the rabbitmq is down, i was expecting it to call 'confirm-nack-channel'. However i am getting the error response back ws and it doesn't invoke this 'confirm-nack-channel' channel.

                Here is the configuration:

                outbound-channel-adapter configured:

                <amqp:outbound-channel-adapter id="updateOutboundAdapter" channel="updateChannel"
                amqp-template="rabbitTemplate" exchange-name="testExchange"
                routing-key="routingKey" confirm-correlation-expression="#root"
                confirm-ack-channel="successRespTransformChannel" confirm-nack-channel="failureRespTransformChannel"
                return-channel="failureRespTransformChannel" />


                <int:channel id="successRespTransformChannel">
                <int:interceptors><int:wire-tap channel="loggerChannel"/></int:interceptors>
                </int:channel>

                <int:chain input-channel="successRespTransformChannel" output-channel="successRespChannel">
                <int:header-enricher>
                <int:reply-channel expression="payload.headers['replyChannel']" />
                </int:header-enricher>
                <int:transformer expression="payload.payload" />
                </int:chain>

                <int:channel id="failureRespTransformChannel">
                <int:interceptors><int:wire-tap channel="loggerChannel"/></int:interceptors>
                </int:channel>

                <int:chain input-channel="failureRespTransformChannel" output-channel="failRespChannel">
                <int:header-enricher>
                <int:reply-channel expression="payload.headers['replyChannel']" />
                </int:header-enricher>
                <int:transformer expression="payload.payload" />
                </int:chain>

                In case when the rabbitmq is up and running, I get the success response back.

                However when the rabbitmq is down, i get error : java.net.ConnectException: Connection refused: connect

                Error from Logs :
                2014-06-09 12:44:07,912 WARN [http-8080-1] [org.springframework.integration.ws.MarshallingWebS erviceInboundGateway [240]] - failure occurred in gateway sendAndReceive
                org.springframework.integration.MessageHandlingExc eption: error occurred in message handler [org.springframework.integration.amqp.outbound.Amqp OutboundEndpoint#2]
                Caused by: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect

                RabbitMQ Configuration:

                <bean id="rabbitConnectionFactory" class="org.springframework.amqp.rabbit.connection. CachingConnectionFactory"
                p:username="localhost" password="password" port="5672"
                p:virtualHost="/" publisherConfirms = "true" publisherReturns="true">
                <constructor-arg value="localhost" />
                </bean>

                <!-- A template for sending messages and performing other commands to RabbitMQ -->
                <bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.Rabbit Template"
                p:connectionFactory-ref="rabbitConnectionFactory" p:mandatory="true"/>

                Please suggest.

                Comment


                • #10
                  No; publisher confirms are a function of the RabbitMQ server (they are returned to the gateway by the broker). Nacks mean the broker couldn't deliver the messages to the queue (e.g. missing exchange).

                  If rabbitMQ is completely down, you have to handle the error by adding an error-channel to inbound gateway - the payload will be a MessagingException, with two properties (failedMessage and cause). The error flow needs some indication of the error the caller - e.g. 'rabbit is not available' (or throw an appropriate exception).

                  Comment


                  • #11
                    Hi Gary,

                    We are using amqp:outbound-channel-adapter to put in queue, if rabbitmq is down i want to call a different service activator which send a failure response to ws. And amqp:outbound-channel-adapter doesn't have a error-channel attribute that can be configured.

                    Please let me know in which context i can use inbound-gateway for this functionality.

                    Thanks,
                    Srikanth

                    Comment


                    • #12
                      The error channel goes on whatever starts your flow (upstream of the outbound channel adapter). You need to show your upstream config.

                      Comment


                      • #13
                        Thanks a lot Gary for your timely help. I will try this and let you know if i still have any issues.

                        Comment

                        Working...
                        X