  • TCP outbound gateway connection pool issue

    Hi,

    I'm experiencing an issue with the TCP outbound gateway. It is used to integrate my application with a third-party application service running on the same server. The third party responds with an XML reply to my XML request. Below is the gateway configuration.

    Code:
    	
    
             <int-ip:tcp-connection-factory id="connectionFactory"
    		deserializer="deserializer" host="${socket.host}" port="${socket.port}"
    		pool-size="100" single-use="false"
    		so-keep-alive="true" type="client"
    		so-timeout="20000" />
    
    	<int-ip:tcp-outbound-gateway id="gateway"
    		request-channel="sendStringMessageChannel" reply-channel="recieveStringMessageChannel"
    		connection-factory="connectionFactory" request-timeout="10000"
    		reply-timeout="10000" />
    The deserializer code executed on the reply is as follows. It checks for the end tag of the XML response, then closes the stream and returns the response for further processing.

    Code:
    public String deserialize(InputStream inputStream) throws IOException {

        BufferedReader br = null;
        StringBuilder sb = null;
        String str = null;

        try {
            br = new BufferedReader(new InputStreamReader(inputStream));
            sb = new StringBuilder(br.readLine());

            while (br.ready()) {
                str = br.readLine();
                sb.append(str);
                if (str != null && str.contains("</msg>")) {
                    break;
                }
            }

            return sb.toString();

        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            if (br != null) {
                br.close();
            }
        }
    }
    The issue is that under load, once the pool limit is reached, it begins to throw the following errors.

    Code:
    [ERROR] Failed to send or receive
    org.springframework.integration.MessagingException: Failed to send or receive
    at org.springframework.integration.ip.tcp.TcpOutboundGateway.handleRequestMessage(TcpOutboundGateway.java:120)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:97)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
    Code:
    2012-10-19 14:20:55.989 TcpOutboundGateway [ERROR] Tcp Gateway exception
    java.net.SocketException: Socket closed
             at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:116)
             at java.net.SocketOutputStream.write(SocketOutputStream.java:132)
    When I check the threads using VisualVM, I see that this number of threads was created, but they never get killed.

    I'm really stuck at this point, unable to tweak it any further to fix the issue. Is there a flaw in the configuration? Is an important setting missing, or is a contradictory setting used?

    SI version is 2.1.0.

    Thanks in advance for any help or insight into issues related to this.

    Sameera

  • #2
    You should not be closing the input stream in the deserializer.

    If you only want to use a socket once for each request, set single-use to "true".

    That said, we should be cleaning up the threads in this case; the pool-size is not really relevant for a client factory with single-use="false".

    I'll open a JIRA ticket.
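
    On the first point (not closing the input stream), here is a minimal sketch of a deserializer that reads up to the closing tag and leaves the stream open. The class name `EndTagDeserializer`, the `split` demo helper, and the UTF-8 charset are assumptions for illustration; in the application the class would implement `org.springframework.core.serializer.Deserializer<String>`. It reads one byte at a time instead of wrapping the stream in a `BufferedReader`, because a buffered reader may consume bytes that belong to the next reply on a shared connection, and `ready()` can return false simply because the rest of the reply has not arrived yet.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical class name. Plain Java here so it runs without Spring on the classpath.
public class EndTagDeserializer {

    // Assumption: replies are UTF-8 text and always end with this tag.
    private static final byte[] END_TAG = "</msg>".getBytes(StandardCharsets.UTF_8);

    public String deserialize(InputStream inputStream) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        int matched = 0; // number of END_TAG bytes matched so far

        while (matched < END_TAG.length) {
            int b = inputStream.read(); // blocks until a byte arrives, EOF, or a socket timeout
            if (b < 0) {
                throw new EOFException("Stream ended before the end tag was received");
            }
            buffer.write(b);
            if (b == END_TAG[matched]) {
                matched++;
            } else {
                // '<' occurs only at the start of the tag, so restarting the match here is enough.
                matched = (b == END_TAG[0]) ? 1 : 0;
            }
        }
        // Deliberately no close(): the stream belongs to the connection, not to the deserializer.
        return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
    }

    // Demo helper (not from the thread): splits back-to-back replies read from one stream.
    public static List<String> split(String wire) {
        InputStream in = new ByteArrayInputStream(wire.getBytes(StandardCharsets.UTF_8));
        EndTagDeserializer deserializer = new EndTagDeserializer();
        List<String> messages = new ArrayList<>();
        try {
            while (in.available() > 0) {
                messages.add(deserializer.deserialize(in));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return messages;
    }
}
```

    Calling `EndTagDeserializer.split("<msg>a</msg><msg>b</msg>")` yields the two messages separately, which is the behaviour a shared (single-use="false") connection relies on. Byte-at-a-time reads are slow on a raw socket stream; that trade-off is accepted here to avoid over-reading.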


    • #3
      Thanks, Gary, for the quick response. It's late at night here. I'll update the deserializer code and run the test again. I hope you'll attach the JIRA link here.

      Thanks
      Sameera


      • #4
        I have run lots of tests with the deserializer improperly closing the connection and everything gets cleaned up properly; so I won't open a JIRA without more evidence that there is a problem on our end.

        By the way, 2.1.0 is very old. 2.1.4 is the latest release.


        • #5
          I tried the suggestions and the upgrade as well, but the issue still does not seem to be solved. I will put more effort into isolating the issue and report back. As we had to meet a deadline, I had to move on: I implemented a class, registered as a service activator, that opens a socket connection and closes it after processing.

          I'll update this thread if I find an issue with the gateway.


          • #6
            Hi,

            Regarding the previous thread: since the outbound gateway should only be used for relatively low-volume use, I thought of using outbound and inbound channel adapters instead. But I am getting this exception:

            org.springframework.integration.support.channel.ChannelResolutionException: no output-channel or replyChannel header available.

            Do I have to use a header-enricher?

            Looking forward to your reply on this.

            Thanks in advance


            • #7
              The intermediate TCP sample shows one technique for retaining/reinstating the replyChannel for a message received from a channel adapter...

              https://github.com/SpringSource/spri...rver-multiplex


              • #8
                Thanks a lot. I'll try this.


                • #9
                  Hi,

                  I tried with the example https://github.com/SpringSource/spri...rver-multiplex.

                  But was unable to get the response. I am also not clear on the correlation logic used in that example. Could you please explain to me on what this means?

                  Code:
                  correlation-strategy-expression="payload.substring(0,3)"
                    
                  release-strategy-expression="size() == 2" />
                  
                  <transformer input-channel="toTransformer.client"
                  expression="payload.get(1)"/>


                  I tried the same thing without using the aggregator. Since it gave me the org.springframework.integration.support.channel.ChannelResolutionException: no output-channel or replyChannel header available exception, I used a header-enricher. Here is my config file.


                  Code:
                          <int:channel id="sendMessageChannel" />
                  	<int:channel id="sendXMLMessageChannel" />
                  	<int:channel id="sendStringMessageChannel" />
                  	<int:channel id="recieveStringMessageChannelIn" />
                  
                  		
                  	<int:gateway id="jposGateway"
                  		service-interface="com.gateway.infrastructure.JPOSGateway">
                  		<int:method name="sendMessage" request-channel="sendMessageChannel" />
                  	</int:gateway>
                  	
                  	
                  	<int-xml:marshalling-transformer
                  		marshaller="marshaller" input-channel="sendMessageChannel"
                  		output-channel="sendXMLMessageChannel" result-type="StringResult" />
                  
                  	<int:object-to-string-transformer id="transformerBytes2String"
                  		input-channel="sendXMLMessageChannel" output-channel="sendStringMessageChannel" />
                  
                  	<int-ip:tcp-connection-factory id="client"
                  		deserializer="deserializer" type="client" host="${socket.host}"
                  		port="${socket.port}" single-use="false" so-timeout="20000"/>
                  		
                  		<int-ip:tcp-outbound-channel-adapter id="outAdapter.client"
                  		channel="sendStringMessageChannel"
                  		connection-factory="client" /> 
                  
                  	<int-ip:tcp-inbound-channel-adapter id="inAdapter.client"
                  		channel="recieveStringMessageChannelIn"
                  		connection-factory="client"/>
                  
                  	
                  	<bean id="deserializer" class="com.infrastructure.jpos.SocketInputStreamDeserializer" />
                  		
                  	<int-xml:unmarshalling-transformer
                  		unmarshaller="marshaller" input-channel="recieveStringMessageChannel" />
                  		
                  	<bean id="receive" class="org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel">
                  		<constructor-arg type="long" value="1000"/>
                  		</bean>
                  
                  		
                      <int:header-enricher input-channel="recieveStringMessageChannelIn" output-channel="recieveStringMessageChannel">
                  	<int:error-channel ref="receive"/>
                       	<int:reply-channel ref="receive"/>
                      </int:header-enricher>

                  This is my gateway method.

                  Code:
                  Isomsg sendMessage(Isomsg msg,
                     @Header("Reference") String referenceId,
                     @Header("requestType") String requestType);

                  But I was still unable to get a response. Could you please guide me on this?
                  Last edited by yogya; Nov 19th, 2012, 03:19 AM. Reason: Code Formatting


                  • #10
                    When you use one-way adapters (rather than a gateway) you lose the replyChannel header (it's a live object on which your gateway is waiting for a reply).

                    The aggregator in the sample shows one technique for retaining the header: send the original message (which contains the header) to the aggregator; send the reply as well; the aggregated message contains a list with both the request and the reply (and the header). In the sample, we drop the original request with the transformer.

                    In order for this to work, the aggregator needs some correlation data. With tcp, this has to be carried in the payload somewhere.

                    If you use an outbound-gateway instead, you don't need to worry about this because the framework can do the correlation for you.
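
                    To make the correlation logic from the sample concrete, here is a minimal plain-Java sketch of what the three expressions quoted earlier in the thread do. It is not the Spring Integration aggregator; the class name `CorrelationSketch` and the method `offer` are hypothetical, and it assumes that the request reaches the aggregator before its reply and that the first three characters of every payload carry the correlation id.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the aggregator settings discussed above, without any framework.
public class CorrelationSketch {

    // One pending group per correlation key.
    private final Map<String, List<String>> groups = new HashMap<>();

    /**
     * Adds a payload to its group. Returns the reply once the group is complete,
     * or null while the group is still waiting for its second message.
     */
    public String offer(String payload) {
        // correlation-strategy-expression="payload.substring(0,3)"
        String key = payload.substring(0, 3);
        List<String> group = groups.computeIfAbsent(key, k -> new ArrayList<>());
        group.add(payload);

        // release-strategy-expression="size() == 2": request plus reply have both arrived
        if (group.size() == 2) {
            groups.remove(key);
            // transformer expression="payload.get(1)": keep the reply, drop the original request
            // (assumes the request was added first, so the reply is at index 1)
            return group.get(1);
        }
        return null;
    }
}
```

                    For example, offering "001 request", then "002 request", then "001 reply" returns null twice and then "001 reply". In the real sample the aggregated message also carries the request's headers, including replyChannel, which is how the reply finds its way back to the waiting gateway.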


                    • #11
                      Thank you very much for your quick response.

                      First I tried the TCP outbound gateway, but the issue was that under load, once the pool limit is reached, it begins to throw the following errors.


                      Code:
                      TcpOutboundGateway [ERROR] Tcp Gateway exception
                      java.net.SocketException: Socket closed
                               at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:116)
                               at java.net.SocketOutputStream.write(SocketOutputStream.java:132)


                      Since the outbound gateway should only be used for relatively low-volume use, I thought of using outbound and inbound channel adapters instead.

                      When I changed the config file as below, I was able to get the response by using a service activator.


                      Code:
                      <int:channel id="sendMessageChannel" />
                      	<int:channel id="sendXMLMessageChannel" />
                      	<int:channel id="sendStringMessageChannel" />
                      	<int:channel id="recieveStringMessageChannelIn" />
                      
                      	<int:publish-subscribe-channel id="recieveStringMessageChannel2" />
                      		
                      		
                      	<int:gateway id="jposGateway"
                      		service-interface="com.gateway.infrastructure.JPOSGateway">
                      		<int:method name="sendMessage" request-channel="sendMessageChannel" />
                      	</int:gateway>
                      	
                      	
                      	<int-xml:marshalling-transformer
                      		marshaller="marshaller" input-channel="sendMessageChannel"
                      		output-channel="sendXMLMessageChannel" result-type="StringResult" />
                      
                      	<int:object-to-string-transformer id="transformerBytes2String"
                      		input-channel="sendXMLMessageChannel" output-channel="sendStringMessageChannel" />
                      
                      	<int-ip:tcp-connection-factory id="client"
                      		deserializer="deserializer" type="client" host="${socket.host}"
                      		port="${socket.port}" single-use="false" so-timeout="20000"/>
                      		
                      	<int-ip:tcp-outbound-channel-adapter id="outAdapter.client"
                      		channel="sendStringMessageChannel"
                      		connection-factory="client" /> 
                      
                      	
                      	<int-ip:tcp-inbound-channel-adapter id="inAdapter.client"
                      		channel="recieveStringMessageChannelIn"
                      		connection-factory="client"/>
                      
                      
                      	 <int:header-enricher input-channel="recieveStringMessageChannelIn" output-channel="recieveStringMessageChannel">
                          		<int:error-channel ref="receive"/> 
                           	 <int:reply-channel ref="recieveStringMessageChannel2" />
                      	 </int:header-enricher>
                      	 
                      	
                      	<bean id="deserializer" class="com.gateway.infrastructure.SocketInputStreamDeserializer" />
                      		
                      		
                      		
                      	<int-xml:unmarshalling-transformer
                      		unmarshaller="marshaller" input-channel="recieveStringMessageChannel" output-channel="recieveStringMessageChannel2"
                      		 />
                      		
                      
                      	<bean id="receive" class="org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel" >
                      		<constructor-arg type="long" value="1000"/>
                      		</bean>
                      
                      	
                      	
                       <bean id="test" class="com.gateway.infrastructure.ServiceActivator" />
                      		
                      	
                       
                      	<int:service-activator method="runthis" input-channel="recieveStringMessageChannel2"
                      		ref="test" />


                      But even with this approach, under load I am still getting the exception.

                      Code:
                      java.net.SocketException: Socket is closed
                      	at java.net.Socket.getOutputStream(Socket.java:808)
                      	at org.springframework.integration.ip.tcp.connection.TcpNetConnection.send(TcpNetConnection.java:69)
                      	at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.doWrite(TcpSendingMessageHandler.java:146)
                      	... 50 more

                      Really appreciate your help.

                      Thanks.


                      • #12
                        Are you sure the server can cope with the volume?


                        • #13
                          As far as I know it can. But I'll try to get the vendor's confirmation on this.
