  • Using publish-subscribe channels and http outbound gateway

    Hi everyone,

    I am trying to build a simple app that will do the following:

    1. Retrieve messages from the channel 'sourceChannel'.
    2. The channel is defined as an int:publish-subscribe-channel.
    3. Subscriber 1 is an int-jdbc:outbound-channel-adapter that stores some of the info in a DB.
    4. Subscriber 2 is an int-http:outbound-gateway which uses sourceChannel as the request-channel and logChannel as the reply-channel.
    5. Essentially, I would like to take the message from sourceChannel and send a request containing the payload, while the response should be logged to standard output.

    Adding the gateway configuration results in the following error:
    Duplicate entry 'xyz' for key 'PRIMARY';
    It seems Spring is trying to insert the same entry into the DB a second time.

    For me this does not make sense, considering I am using a different reply-channel. Any ideas on that?

    Best regards and thanks in advance.
    Last edited by kpolychr; Apr 25th, 2013, 08:44 AM.

  • #2
    Please show your message-flow config.
    What happens if you replace int-http:outbound-gateway with some simple endpoint, such as a service-activator or just a bridge?



    • #3
      Hi Artem,

      Here is the message flow in XML:

          <!-- ============================== RECEIVE FROM SOURCE ============================== -->
          <rabbit:connection-factory id="rabbitConnectionFactory" />
          <int-amqp:inbound-channel-adapter id="inboundAdapter" channel="sourceChannel" queue-names="queue.orders" connection-factory="rabbitConnectionFactory"
              mapped-request-headers="tkid,endpoint" />
          <int:publish-subscribe-channel id="sourceChannel" datatype="byte[]">
              <int:interceptors>
                  <int:wire-tap channel="sourceLoggingChannel" />
              </int:interceptors>
          </int:publish-subscribe-channel>
          <int:logging-channel-adapter id="sourceLoggingChannel" level="INFO"
              expression="'Coming from Source *** TKID='.concat(headers.tkid).concat('***ENDPOINT=').concat(headers.endpoint)" />
          <!-- ================================= INSERT DATA INTO DB ================================= -->
          <int-jdbc:outbound-channel-adapter data-source="my_datasource" sql-parameter-source-factory="newEntrySource" keys-generated="true"
              query="INSERT INTO formdata (TKID,DATA,ENDPOINT,STATUS,RETRY_COUNT) VALUES (:tkid,:data,:endpoint,:status,:retry_count)" channel="sourceChannel" />
          <bean id="newEntrySource" class="org.springframework.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
              <property name="parameterExpressions">
                  <map>
                      <entry key="tkid" value="headers.tkid" />
                      <entry key="data" value="payload" />
                      <entry key="endpoint" value="headers.endpoint" />
                      <entry key="status" value="1" />
                      <entry key="retry_count" value="0" />
                  </map>
              </property>
          </bean>
          <!-- ================================= MAKE CALL TO ENDPOINT ================================= -->
          <int:publish-subscribe-channel id="fromEndpointChannel" datatype="java.lang.String">
              <int:interceptors>
                  <int:wire-tap channel="fromEndpointLoggingChannel" />
              </int:interceptors>
          </int:publish-subscribe-channel>
          <int:logging-channel-adapter id="fromEndpointLoggingChannel" level="INFO" expression="'Coming from endpoint *** TKID='.concat(headers.tkid).concat('***REQID=')" />
          <int-http:outbound-gateway reply-channel="fromEndpointChannel" request-channel="sourceChannel" url="{endpoint}?data={data}" http-method="GET">
              <int-http:uri-variable name="endpoint" expression="headers.endpoint" />
              <int-http:uri-variable name="data" expression="payload" />
          </int-http:outbound-gateway>
          <int-stream:stdout-channel-adapter channel="fromEndpointChannel" append-newline="true" />
      Best regards


      • #4
        There's nothing in that configuration I can see that would cause the framework to call the JDBC adapter twice.

        I suggest you turn on DEBUG logging and follow the messages through the flow. It should become obvious what's happening.


        • #5
          "turn on DEBUG logging"
          In addition, I suggest switching this on:
          	<wire-tap channel="logger"/>
          	<logging-channel-adapter id="logger" level="WARN" log-full-message="true"/>
          to see how your messages travel through the flow.
          On the other hand, maybe you have duplicates in 'queue.orders' on RabbitMQ?


          • #6
            Hi Artem,

            I enabled message history and DEBUG logging, and the following output appears just before every exception (an SQL error due to the duplicate PK):

            15:54:31.825 WARN  [SimpleAsyncTaskExecutor-1][org.springframework.integration.handler.LoggingHandler] [Payload=[[email protected]][Headers={timestamp=1366898071825, id=8662a5b6-c688-4c36-be2e-c95cbc6185b2, history=inboundAdapter,sourceChannel,sourceLoggingChannel,historyLogger,historyLogger.adapter, tkid=tk00016, endpoint=http://localhost:8182/bin/servlets/json/test.json}]
            15:54:31.825 WARN  [SimpleAsyncTaskExecutor-1][org.springframework.integration.handler.LoggingHandler] [Payload=[[email protected]][Headers={timestamp=1366898071825, id=89423d48-fbcc-4887-8292-3ab0f8adc406, history=inboundAdapter,sourceChannel,historyLogger,historyLogger.adapter, tkid=tk00016, endpoint=http://localhost:8182/bin/servlets/json/test.json}]
            RabbitMQ contains NO duplicate messages.

            Best regards


            • #7
              "following output shows"
              But there is not enough info in the history...
              Where is the call to the web service?

              Please comment out your inline wire-taps and try again.
              I want to see how the message goes through 'sourceChannel' and 'fromEndpointChannel'.
              Everything looks good, except for 'fromEndpointChannel', because it has only one subscriber: <int-stream:stdout-channel-adapter>


              • #8
                I had a look at the logs, and managed to find the cause.

                All in all, it was a message format conversion issue, not a general workflow issue.

                When the gateway makes the HTTP request, the payload has to be converted to a String. However, the message arrives from RabbitMQ as a byte[].

                As such, I was getting the following error at the start of the execution :

                Caused by: org.springframework.integration.MessageHandlingException: HTTP request execution failed for URI [{endpoint}?data={data}]
                Caused by: org.springframework.core.convert.ConverterNotFoundException: No converter found capable of converting from type byte[] to type java.lang.String
                The message was then retried, causing the SQL error.
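
The retry can be pictured with a rough simulation in plain Java rather than Spring Integration. It assumes the subscribers of the publish-subscribe channel run one after another on the delivering thread, and that a failure in any of them makes the broker deliver the same message again. The class name, both subscriber lambdas, and the delivery loop are hypothetical stand-ins, not framework code.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

final class RedeliveryDemo {

    // Delivers the same message up to maxDeliveries times and records what happens.
    static List<String> run(int maxDeliveries) {
        List<String> log = new ArrayList<>();
        Set<String> primaryKeys = new HashSet<>(); // stands in for the formdata table

        // Hypothetical subscriber 1: the insert fails if the key is already stored.
        Consumer<String> jdbcSubscriber = tkid -> {
            if (!primaryKeys.add(tkid)) {
                throw new IllegalStateException("Duplicate entry '" + tkid + "' for key 'PRIMARY'");
            }
            log.add("inserted " + tkid);
        };
        // Hypothetical subscriber 2: always fails, like the gateway's byte[] conversion.
        Consumer<String> httpSubscriber = tkid -> {
            throw new IllegalStateException("No converter from byte[] to String");
        };

        List<Consumer<String>> subscribers = List.of(jdbcSubscriber, httpSubscriber);
        for (int delivery = 1; delivery <= maxDeliveries; delivery++) {
            try {
                for (Consumer<String> subscriber : subscribers) {
                    subscriber.accept("tk00016");
                }
                break; // every subscriber succeeded, no redelivery needed
            } catch (IllegalStateException e) {
                log.add("delivery " + delivery + " failed: " + e.getMessage());
            }
        }
        return log;
    }

    public static void main(String[] args) {
        run(2).forEach(System.out::println);
    }
}
```

Under those assumptions the first delivery inserts the row and then fails in the second subscriber, so the second delivery fails earlier, at the insert, with the duplicate-key error.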

                However, upon trying to use the int:payload-deserializer I now get the following error:
                Caused by: Failed to deserialize payload. Is the byte array a result of corresponding serialization for DefaultDeserializer?; nested exception is invalid stream header: 7B224944
                The original message is a JSON String converted to byte[] in order to send it via the RabbitMQ client API.
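
The stream header in that error can be read directly. The sketch below decodes the four reported bytes as ASCII and compares them with the prefix that Java object serialization writes (magic 0xACED, version 0x0005). The class and method names are made up for illustration.

```java
import java.nio.charset.StandardCharsets;

final class StreamHeader {

    // True if the data starts with the Java serialization magic and version bytes.
    static boolean looksJavaSerialized(byte[] data) {
        return data.length >= 4
                && (data[0] & 0xFF) == 0xAC && (data[1] & 0xFF) == 0xED
                && (data[2] & 0xFF) == 0x00 && (data[3] & 0xFF) == 0x05;
    }

    // Converts a hex string such as "7B224944" into the bytes it spells out.
    static byte[] fromHex(String hex) {
        byte[] out = new byte[hex.length() / 2];
        for (int i = 0; i < out.length; i++) {
            out[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] header = fromHex("7B224944");
        System.out.println(new String(header, StandardCharsets.US_ASCII)); // prints {"ID
        System.out.println(looksJavaSerialized(header));                   // prints false
    }
}
```

The header is simply the start of the JSON text, which suggests the payload is plain text bytes rather than a serialized Java object, so a deserializer has nothing to work with here.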

                Are there any suggested guidelines in order to make sure a consistent message format is maintained (as much as possible) throughout the flow?

                Best regards


                • #9
                  Hello again,

                  I just found out that all that was needed was to call payload.toString() in the http gateway uri-variable. So the final result should be as follows:

                  <int-http:outbound-gateway reply-channel="fromEndpointChannel" request-channel="sourceChannel" url="{endpoint}?data={data}" http-method="GET">
                          <int-http:uri-variable name="endpoint" expression="headers.endpoint" />
                          <!-- HERE we get the string value of the payload -->
                          <int-http:uri-variable name="data" expression="payload.toString()" />
                  </int-http:outbound-gateway>
                  This way the payload is transformed into the string required to populate the placeholders in the url attribute.

                  Best regards and thank you for your time.

                  PS: If there are any guidelines on keeping the message format consistent throughout the flow, please let me know.


                  • #10
                    payload.toString() won't do what you expect with a byte array; you need "new String(payload)".

                    If the sending system can set the content-type header to "text/x-json" (or anything containing text), the amqp adapter will emit a String payload.

                    Or add a transformer right after the adapter: <transformer ... expression="new String(payload)"/>.
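
The difference between the two calls is easy to check in plain Java. In this small sketch the class name and the sample JSON are made up, and UTF-8 is an assumed encoding. Note that the single-argument new String(payload) decodes with the JVM's default charset instead.

```java
import java.nio.charset.StandardCharsets;

final class PayloadText {

    // Decodes the raw message bytes as text; UTF-8 is an assumption about the sender.
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = "{\"ID\":1}".getBytes(StandardCharsets.UTF_8);

        // Arrays inherit Object.toString(): a type tag plus an identity hash,
        // something of the form [B@<hex>, not the content.
        System.out.println(payload.toString());

        // Decoding the bytes gives back the JSON text.
        System.out.println(decode(payload));
    }
}
```

Only the second call produces a value that is useful in the URL's data placeholder.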


                    • #11
                      Thanks a ton Gary, this worked like a charm!

                      Best regards & thanks again!