  • Tcp response problem

    Hi,

    I am having a problem with a two-way socket adapter. This app opens a socket and waits for a client to connect. The socket is persistent and stays connected. Once the client connects, it sends an HTTP message to the server that contains a "heartbeat" message:

    ESTP/1.0
    trans-id: ca860014-238e-11e2-c000-000ffe432105
    content-length: 0
    content-type: application/bems-heartbeat
    date: Wed, 31 Oct 2012 19:11:36 GMT

    When this message arrives a custom deserializer reads the message off the socket, reads the http headers, and reads the rest of the http message from the socket based on the "content-length" header. The headers and body are then put in a serializable class and converted to a byte stream that is returned from the deserializer.
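The framing step described above (read the headers, then read exactly "content-length" bytes of body) could be sketched in plain Java as follows. This is only an illustration under stated assumptions: `EstpFrameReader` and `Frame` are hypothetical names, lines are assumed to end in LF or CRLF, and a blank line is assumed to end the header block. In a Spring application this logic would typically sit inside a `Deserializer` implementation; the poster's actual `ByteArrayEsmiSerializer` is not shown in the thread.

```java
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

/** Hypothetical helper: reads exactly one message (start line, headers, body) from a stream. */
final class EstpFrameReader {

    /** Simple holder for one parsed message. */
    static final class Frame {
        final String startLine;
        final Map<String, String> headers; // keys are lower-cased
        final byte[] body;

        Frame(String startLine, Map<String, String> headers, byte[] body) {
            this.startLine = startLine;
            this.headers = headers;
            this.body = body;
        }
    }

    static Frame read(InputStream in) throws IOException {
        String startLine = readLine(in);
        Map<String, String> headers = new LinkedHashMap<>();
        String line;
        // Assumption: a blank line marks the end of the header block.
        while (!(line = readLine(in)).isEmpty()) {
            int colon = line.indexOf(':');
            if (colon < 0) {
                throw new IOException("Malformed header: " + line);
            }
            headers.put(line.substring(0, colon).trim().toLowerCase(Locale.ROOT),
                    line.substring(colon + 1).trim());
        }
        // Read exactly content-length bytes so the next message is left untouched.
        int length = Integer.parseInt(headers.getOrDefault("content-length", "0"));
        byte[] body = new byte[length];
        int off = 0;
        while (off < length) {
            int n = in.read(body, off, length - off);
            if (n < 0) {
                throw new EOFException("Stream closed inside body");
            }
            off += n;
        }
        return new Frame(startLine, headers, body);
    }

    /** Reads bytes up to the next LF, dropping any CR, without reading past the line. */
    private static String readLine(InputStream in) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        int b;
        while ((b = in.read()) != '\n') {
            if (b < 0) {
                throw new EOFException("Stream closed inside a line");
            }
            if (b != '\r') {
                buf.write(b);
            }
        }
        return new String(buf.toByteArray(), StandardCharsets.US_ASCII);
    }
}
```

Reading one byte at a time in the header phase is slow but keeps the reader from consuming bytes that belong to the next message on a persistent connection.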



    A Spring message processor then puts the message on a JMS queue, where it is picked up by a message-driven bean. Once the MDB gets the message, it streams the bytes back into an object so I can get the HTTP headers and body.

    At this point I extract the "trans-id" header and create an HTTP response:
    ESTP/1.0 100 ACK
    content-length: 0
    date: Wed, 31 Oct 2012 19:11:36 GMT
    trans-id: ca860014-238e-11e2-c000-000ffe432105
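For illustration, a response like the one above could be assembled with a small helper. This is a sketch, not the poster's code: `EstpAck` is a hypothetical name, CRLF line endings are an assumption, and any trailing delimiter is left to whatever serializer writes the payload to the socket.

```java
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper that formats the "100 ACK" response for a given trans-id. */
final class EstpAck {

    private static final String CRLF = "\r\n"; // assumption: HTTP-style line endings

    /** Builds the ACK text, echoing the request's trans-id. */
    static String build(String transId, ZonedDateTime now) {
        // RFC_1123_DATE_TIME prints "GMT" for a zero offset.
        // Note: it does not zero-pad single-digit days of the month.
        String date = DateTimeFormatter.RFC_1123_DATE_TIME
                .format(now.withZoneSameInstant(ZoneOffset.UTC));
        return "ESTP/1.0 100 ACK" + CRLF
                + "content-length: 0" + CRLF
                + "date: " + date + CRLF
                + "trans-id: " + transId;
    }
}
```

Passing the clock value in as a parameter keeps the helper easy to test with a fixed timestamp.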

    I now try to send the response back to the client as follows:

    Message<String> responseMessage = MessageBuilder.withPayload(response).setHeaderIfAbsent("ip_connection_id", saveHeader.getHeader()).build();
    output.send(responseMessage);

    Where "output" is the outgoing message channel (tcpSend) used by "tcp-outbound-channel-adapter"

    Here is my Spring configuration:

    Code:
    	<!-- Client side -->
    	
    	<int:channel id="tcpSend" >
    		<int:interceptors>
    			<int:wire-tap channel="logger"/>
    		</int:interceptors>
    	</int:channel>
    	
    	<int-ip:tcp-outbound-channel-adapter id="tcpOutbound" 
    		channel="tcpSend"
    		connection-factory="cfServer"
    	/>
    	
    	<int:logging-channel-adapter id="logger"
    		log-full-message="true" 
    		level="INFO"/>
    	
    	<int:message-history/>
    	
    	 
    	<int:recipient-list-router input-channel="tcpReceive">
    		<int:recipient channel="saveHeader"/>
    	</int:recipient-list-router>
    	
    	<int:channel id="saveHeader" />
     
    	<int:chain input-channel="saveHeader" >
    		<int:transformer expression="headers.ip_connection_id"/>
    		<int:service-activator ref="headerSaver" method="setHeader"/>
    	</int:chain> 
    
    	<!-- Server side -->
    	
    	<int-ip:tcp-connection-factory id="cfServer"
    		type="server" 
    		host="localhost"
    		port="10754"  
    		deserializer="connectionSerializeDeserialize" 
    	/>
    	
    	<int:channel id="tcpReceive" >
    		<int:interceptors>
    			<int:wire-tap channel="logger"/>
    		</int:interceptors>
    	</int:channel>
    	
    	<int-ip:tcp-inbound-channel-adapter id="tcpInbound"
    		channel="tcpReceive"
    		connection-factory="cfServer" 
    	/>
    	
    	<int:service-activator id="tcpHandler" 
    		input-channel="tcpReceive"
    		ref="esmiMessageProcessor" 
    		method="processMessage" 
    	/>	
    	
    	<bean id="headerSaver" class="com.intrado.isg2.bl.SaveHeader" />
    
    	<bean id="connectionSerializeDeserialize"
    		class="com.intrado.isg2.bl.ByteArrayEsmiSerializer" />	
    
    	<bean id="esmiMessageProcessor" class="com.intrado.isg2.bl.EsmiMessageProcessor">
    		<property name="jmsSender" ref="jmsSender" />
    	</bean>



    Now the problem seems to be that I do not send a response for every message received. Here is what the logger is showing:

    13:30:53,532 INFO [LoggingHandler] [Payload=[B@139ce14b][Headers={timestamp=1351711853529, id=8b80d245-105a-4dd9-a4cf-3a4ffc7ee692, history=tcpInbound,tcpReceive,logger,logger.adapter, ip_address=10.100.211.80, ip_connection_seq=1, ip_hostname=10.100.211.80, ip_tcp_remote_port=48885, ip_connection_id=10.100.211.80:48885:3695f03b-c83a-4f6f-b185-ad572e0074b5}]

    13:31:18,516 INFO [LoggingHandler] [Payload=[B@7849d631][Headers={timestamp=1351711878516, id=707d9793-200c-408d-8710-47cd3a7866b5, history=tcpInbound,tcpReceive,logger,logger.adapter, ip_address=10.100.211.80, ip_connection_seq=2, ip_hostname=10.100.211.80, ip_tcp_remote_port=48885, ip_connection_id=10.100.211.80:48885:3695f03b-c83a-4f6f-b185-ad572e0074b5}]

    13:31:18,895 INFO [LoggingHandler] [Payload=ESTP/1.0 100 ACK
    content-length: 0
    date: Wed, 31 Oct 2012 19:31:18 GMT
    trans-id: 717ace16-2391-11e2-c000-000ffe461222][Headers={timestamp=1351711878895, id=d2c7e90f-b43d-4fb0-b46c-251a865969a9, history=tcpSend,logger,logger.adapter, ip_connection_id=10.100.211.80:48885:3695f03b-c83a-4f6f-b185-ad572e0074b5}]


    This clearly shows that two messages had to arrive before a response was sent. I have no clue why it takes two incoming messages to get a single response.

    Also the reply never seems to get back to the client. The log shows it going out but the client never seems to get the response.

    I have lost an entire day chasing this down. Please help, or suggest a way I can turn on more logging so I can figure out what is going on.

  • #2
    Your configuration is a little confusing - for example, the saveHeader chain is under the "client side" config whereas it's actually used on the server side.

    In any case, that chain is completely replacing your payload with the ip_connection_id.

    The root cause of your problem is you have two subscribers on the tcpReceive channel - the chain and the service; the default dispatcher will round-robin the requests; the first goes to the chain; the second goes to your esmiMessageProcessor service.

    You need to make tcpReceive a <publish-subscribe/> channel with the chain being the first sub (enforce with order="1") and the service being the second sub (order="2").
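The difference between the two dispatch styles can be illustrated with a toy model in plain Java. This is not Spring Integration's dispatcher code; `DispatchDemo`, `RoundRobinChannel` and `PubSubChannel` are hypothetical names used only to show why two heartbeats were needed before the service saw one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of the two dispatch styles; not Spring Integration code. */
final class DispatchDemo {

    /** Point-to-point model: each message goes to exactly one subscriber, in rotation. */
    static final class RoundRobinChannel {
        private final List<Consumer<String>> subscribers = new ArrayList<>();
        private int next = 0;

        void subscribe(Consumer<String> s) { subscribers.add(s); }

        void send(String message) {
            subscribers.get(next).accept(message);
            next = (next + 1) % subscribers.size();
        }
    }

    /** Publish-subscribe model: every subscriber sees every message, in subscription order. */
    static final class PubSubChannel {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void subscribe(Consumer<String> s) { subscribers.add(s); }

        void send(String message) {
            for (Consumer<String> s : subscribers) {
                s.accept(message);
            }
        }
    }

    static List<String> runRoundRobin() {
        List<String> log = new ArrayList<>();
        RoundRobinChannel ch = new RoundRobinChannel();
        ch.subscribe(m -> log.add("chain:" + m));
        ch.subscribe(m -> log.add("service:" + m));
        ch.send("heartbeat-1");
        ch.send("heartbeat-2");
        return log;
    }

    static List<String> runPubSub() {
        List<String> log = new ArrayList<>();
        PubSubChannel ch = new PubSubChannel();
        ch.subscribe(m -> log.add("chain:" + m));   // plays the role of order="1"
        ch.subscribe(m -> log.add("service:" + m)); // plays the role of order="2"
        ch.send("heartbeat-1");
        ch.send("heartbeat-2");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runRoundRobin());
        System.out.println(runPubSub());
    }
}
```

In the round-robin run the service only sees heartbeat-2, which matches the log in the first post; in the publish-subscribe run both subscribers see both heartbeats.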

    Also, I don't see an inbound adapter on the client to receive any replies.

    If this is always a request/reply scenario, you can use gateways instead of channel adapters; that way you don't need to bother with correlation of ip_connection_ids.

    Hope that helps.



    • #3
      Originally posted by Gary Russell
      Your configuration is a little confusing - for example, the saveHeader chain is under the "client side" config whereas it's actually used on the server side.

      In any case, that chain is completely replacing your payload with the ip_connection_id.

      The root cause of your problem is you have two subscribers on the tcpReceive channel - the chain and the service; the default dispatcher will round-robin the requests; the first goes to the chain; the second goes to your esmiMessageProcessor service.

      You need to make tcpReceive a <publish-subscribe/> channel with the chain being the first sub (enforce with order="1") and the service being the second sub (order="2").

      Also, I don't see an inbound adapter on the client to receive any replies.

      If this is always a request/reply scenario, you can use gateways instead of channel adapters; that way you don't need to bother with correlation of ip_connection_ids.

      Hope that helps.

      Gary,

      First, thanks for your help!

      I did as you suggested. I am now seeing 1 message received and 1 message sent.

      I am correlating the connection IDs because I will be sending the received message out to another process, and it will respond when it is ready.

      I am receiving a message on the "tcpReceive" channel and responding on the "tcpSend" channel, which has an outbound-channel-adapter that sends the message.

      I can see the response in Wireshark and I can connect with telnet, send a message, and get a response. My only issue now is the response stays in the receive queue of the socket instead of being picked up by my process.

      Please let me know if you think this configuration is incorrect.

      Code:
      	<!-- Client side -->
      	
      	<int:channel id="tcpSend" >
      		<int:interceptors>
      			<int:wire-tap channel="logger"/>
      		</int:interceptors>
      	</int:channel>
      
      <!-- 	
      	<int-ip:tcp-connection-factory id="cfClient"
      		type="client"
      		host="localhost"
      		port="10754"
      		single-use="false"
      		so-timeout="10000"
      	/>
      	 -->	
      	<int-ip:tcp-outbound-channel-adapter id="tcpOutbound" 
      		channel="tcpSend"
      		connection-factory="cfServer"
      	/>
      	
      	<int:logging-channel-adapter id="logger"
      		log-full-message="true" 
      		level="INFO"/>
      	
      	<int:message-history/>
      	
      <!-- 
      	<bean id="sslServerConnectionFactory"
      		class="org.springframework.integration.ip.addons.TcpSSLServerConnectionFactory">
      		<constructor-arg value="11111" />
      		<property name="singleUse" value="false" />
      	</bean>
       -->		
       
      	<!-- Server side -->
      	
      	<int-ip:tcp-connection-factory id="cfServer"
      		type="server" 
      		host="localhost"
      		port="10754"  
      		deserializer="connectionSerializeDeserialize" 
      	/>
      	
      	<int:recipient-list-router input-channel="tcpReceive" order="1">
      		<int:recipient channel="saveHeader"/>
      	</int:recipient-list-router>
      	
      	<int:channel id="saveHeader" />
       
      	<int:chain input-channel="saveHeader" >
      		<int:transformer expression="headers.ip_connection_id"/>
      		<int:service-activator ref="headerSaver" method="setHeader"/>
      	</int:chain> 
      	
      	<int:publish-subscribe-channel id="tcpReceive" >
      		<int:interceptors>
      			<int:wire-tap channel="logger"/>
      		</int:interceptors>
      	</int:publish-subscribe-channel>
      	
      	<int-ip:tcp-inbound-channel-adapter id="tcpInbound"
      		channel="tcpReceive"
      		connection-factory="cfServer" 
      	/>
      	
      	<int:service-activator id="tcpHandler" 
      		input-channel="tcpReceive"
      		ref="esmiMessageProcessor" 
      		method="processMessage" 
      		order="2"
      	/>	
      	
      	<bean id="headerSaver" class="com.intrado.isg2.bl.SaveHeader" />
      
      	<bean id="connectionSerializeDeserialize"
      		class="com.intrado.isg2.bl.ByteArrayEsmiSerializer" />


      I have an outbound adapter on the tcpSend channel.



      • #4
        But, in order for the client side to receive the reply, you need an <inbound-channel-adapter/> that uses the same connection factory as the outbound adapter. Otherwise the connection has no idea what to do with any data. You should see a log message under DEBUG...

        Code:
        logger.debug("TcpListener exiting - no listener and not single use");
        or

        Code:
        logger.warn("Unexpected message - no inbound adapter registered with connection " + message);
        if the factory creates one-shot (single-use) connections (which is not the case for you).

        On reflection, we probably should have logged something in the non-single-use side too, if unexpected data is received.



        • #5
          I cannot do that; I get an exception thrown when the WAR file is deployed.

          org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.integration.ip.tcp.TcpSendingMessageHandler#1': Error setting property values; nested exception is org.springframework.beans.PropertyBatchUpdateException; nested PropertyAccessExceptions (1) are:
          PropertyAccessException 1: org.springframework.beans.MethodInvocationException: Property 'connectionFactory' threw exception; nested exception is java.lang.IllegalArgumentException: org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory may only be used by one outbound adapter


          Which makes sense, as I already have an outbound adapter for that connection factory.

          Code:
          	<int:channel id="tcpSend" >
          		<int:interceptors>
          			<int:wire-tap channel="logger"/>
          		</int:interceptors>
          	</int:channel>

          	<int-ip:tcp-outbound-channel-adapter id="tcpOutbound"
          		channel="tcpSend"
          		connection-factory="cfServer"
          	/>

          	<!-- Server side -->

          	<int-ip:tcp-connection-factory id="cfServer"
          		type="server"
          		host="localhost"
          		port="10754"
          		deserializer="connectionSerializeDeserialize"
          	/>

          	<int-ip:tcp-inbound-channel-adapter id="tcpInbound"
          		channel="tcpReceive"
          		connection-factory="cfServer"
          	/>



          • #6
            Right, you can't have another outbound adapter (only one allowed per factory), but you need an inbound adapter to receive the replies.

            It's simpler if you use gateways (which are inherently two-way) instead of adapters (which are one-way), but there are some performance considerations there, as described in the reference docs. You can use collaborating channel adapters (an outbound and inbound sharing a connection factory), as long as you can handle your own correlation of the replies to the requests.

            There is a sample here https://github.com/SpringSource/spri...rver-multiplex that is an example of using collaborating adapters.
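As a rough idea of what "handling your own correlation" could look like when replies are produced later by another process, here is a minimal plain-Java sketch. `ConnectionCorrelator` is a hypothetical class, and keying on the message's trans-id is an assumption drawn from the headers shown earlier in the thread, not something the linked sample is known to do.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical registry: remembers which TCP connection each request arrived on. */
final class ConnectionCorrelator {

    // trans-id of the request -> ip_connection_id header of the connection it came in on
    private final Map<String, String> connectionByTransId = new ConcurrentHashMap<>();

    /** Call when a request arrives. */
    void remember(String transId, String connectionId) {
        connectionByTransId.put(transId, connectionId);
    }

    /**
     * Call when the reply is ready. Returns the connection id to send the reply on,
     * or null if the trans-id is unknown; the entry is removed so the map cannot grow forever.
     */
    String claim(String transId) {
        return connectionByTransId.remove(transId);
    }
}
```

The value returned by `claim` would then be set as the `ip_connection_id` header on the reply message, as in the `MessageBuilder` snippet in the first post. Compared with a bean that stores a single connection id, a per-request map should keep replies apart if more than one client is connected at once.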
