Application hangs if more than 1 thread tries to receive from tcp adapter

  • Application hangs if more than 1 thread tries to receive from tcp adapter

    Hi,

    I've narrowed down an issue that I posted about yesterday (under what now seems like a very irrelevant title, hence the new post). I now realize that when more than one thread tries to send/receive through the outbound/inbound tcp channel adapters, the application hangs and cannot progress past that point. My question is, if we are using inbound and outbound adapters to connect over tcp/ip:
    1. Should we be making multiple connections, or should all the threads go through a single connection?
    2. If a single connection, how do we configure it so that all threads reuse that one connection?

    Currently we have a task executor on the front end which spawns workmanager threads, then all those threads end up at
    Code:
    got connection to server:port connection_id
    Then the application just dies. We also have the tcp connection factory using workmanager threads.

    I just noticed another thing in this morning's log: we have a statement that prints the time taken in the correlation strategy, and it was printing before the "got connection" logging statements. Also, the correlation strategy is for some reason using its own thread manager (the task scheduler) and, oddly, not the work manager.

    The detailed logging shows that the application is stuck when trying to do a socket read from the tcp connection.

    Any ideas?

  • #2
    I saw your post yesterday, but had trouble parsing it, so I was going to try again tomorrow when I have more time.

    As long as you set single-use to "false" (the default), all threads will share the same connection (single-use means the socket is used for one request or request/reply).

    I am confused by your comment about a thread sending to and 'receiving from' the connection - the calling thread doesn't receive (with channel adapters), the message asynchronously shows up on another thread (channel adapters are for one-way integration). In a WAS environment, you should make sure that the connection factory uses the work manager TE, via its task-executor attribute, regardless of the kind of endpoint you are using.
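
A minimal sketch of the two settings mentioned above, written with the `ip:` and `beans:` prefixes that appear in the configuration posted later in this thread. The host, port, the `wasTaskExecutor` bean id, and the `wm/default` JNDI name are placeholders, not values from the original poster's environment. `WorkManagerTaskExecutor` is Spring's CommonJ adapter; whether it is the right class depends on the Spring and WAS versions in use.

```xml
<!-- Hypothetical executor bean that delegates to a container-managed work manager.
     The JNDI name below is an assumption. -->
<beans:bean id="wasTaskExecutor"
            class="org.springframework.scheduling.commonj.WorkManagerTaskExecutor">
    <beans:property name="workManagerName" value="wm/default"/>
</beans:bean>

<!-- single-use="false" keeps one shared socket for all sending threads;
     task-executor hands the connection's reader work to the work manager.
     host and port are placeholders. -->
<ip:tcp-connection-factory id="local_client_1"
                           type="client"
                           host="localhost"
                           port="5678"
                           single-use="false"
                           task-executor="wasTaskExecutor"/>
```

With channel adapters sharing this factory, replies arrive on a thread supplied by the factory's executor, not on the thread that sent the request.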


    • #3
      Let me take one step back and state the problem -
      We need to be able to process 50 TPS through the [gateway|channel adapter|whatever] over a persistent tcp/ip connection.

      Now onto how we thought we'd do that...
      I have single-use=false, so-keep-alive=true and so-timeout=0 for the connection factory.

      Regarding a thread sending to and receiving from the connection, I think I see why it looks this way. The code is using a gateway to send the message, and the gateway by nature waits for the response on the same thread, right? (Looking closer at this now, it might be this wrong setup that is causing the issue.) Here is the context file that has the adapters and gateway configured:

      Code:
      <?xml version="1.0" encoding="UTF-8"?>
      <beans:beans ...>
          <poller id="defaultPoller" default="true" max-messages-per-poll="500" fixed-rate="40"/>

          <!--
          Program will first send the message through local gateway if it fails it will send the message to remote gateway
          -->
          <gateway id="localGateWay" service-interface="com.test.TestGateway"
                   default-request-channel="localRequestChannel" />

          <gateway id="remoteGateWay" service-interface="com.test.TestGateway"
                   default-request-channel="remoteRequestChannel" />

          <!--  channel declaration for sending message from gateways -->
          <publish-subscribe-channel id="localRequestChannel"/> <!--   task-executor="myExecutor" -->
          <publish-subscribe-channel id="remoteRequestChannel"/> <!--   task-executor="myExecutor" -->

          <!--
          channel declaration for sending messages to outbound adapters
          these channels will take care of roundrobin and failover
          -->
          <channel id="localRequestChannelForOutbound">
              <dispatcher failover="true" load-balancer="round-robin"/>
          </channel>

          <channel id="remoteRequestChannelForOutbound">
              <dispatcher failover="true" load-balancer="round-robin"/>
          </channel>

          <!-- declaration of channels used for storing the message received from servers  -->
          <channel id="localReplyChannelFromServer">
              <queue capacity="500"/>
          </channel>
          <channel id="remoteReplyChannelFromServer">
              <queue capacity="500"/>
          </channel>

          <!--
          when request message is kept in request channel from gateway
          one copy of it will be sent replychannelfromserver which will be used in aggregator to match with the
          reply message
          one more copy will be send to requestChannelForOutbound which will be used by outbound adapter to process
          -->
          <bridge input-channel="localRequestChannel" output-channel="localReplyChannelFromServer" order="1"/>
          <bridge input-channel="localRequestChannel" output-channel="localRequestChannelForOutbound" order="2"/>

          <bridge input-channel="remoteRequestChannel" output-channel="remoteReplyChannelFromServer" order="1"/>
          <bridge input-channel="remoteRequestChannel" output-channel="remoteRequestChannelForOutbound" order="2"/>

          <!--  out bound /in bound adapter declaration for local instance -->
          <ip:tcp-outbound-channel-adapter id="adapter_outbound_local_client_1"
                                           channel="localRequestChannelForOutbound"
                                           order="1"
                                           connection-factory="local_client_1"/>

          <ip:tcp-inbound-channel-adapter id="adapter_inbound_local_client_1"
                                          channel="localReplyChannelFromServer"
                                          connection-factory="local_client_1"/>

          <ip:tcp-outbound-channel-adapter id="adapter_outbound_local_client_2"
                                           channel="localRequestChannelForOutbound"
                                           order="2"
                                           connection-factory="local_client_2"/>

          <ip:tcp-inbound-channel-adapter id="adapter_inbound_local_client_2"
                                          channel="localReplyChannelFromServer"
                                          connection-factory="local_client_2"/>

          <!--  out bound /in bound adapter declaration for remote instance -->
          <ip:tcp-outbound-channel-adapter id="adapter_outbound_remote_client_1"
                                           channel="remoteRequestChannelForOutbound"
                                           order="1"
                                           connection-factory="remote_client_1"/>

          <ip:tcp-inbound-channel-adapter id="adapter_inbound_remote_client_1"
                                          channel="remoteReplyChannelFromServer"
                                          connection-factory="remote_client_1"/>

          <ip:tcp-outbound-channel-adapter id="adapter_outbound_remote_client_2"
                                           channel="remoteRequestChannelForOutbound"
                                           order="2"
                                           connection-factory="remote_client_2"/>

          <ip:tcp-inbound-channel-adapter id="adapter_inbound_remote_client_2"
                                          channel="remoteReplyChannelFromServer"
                                          connection-factory="remote_client_2"/>

          <!--  following aggregator will aggregate  message received from server with corresponding message from client
          while request is send , the same is saved in ReplyChannelFromServer, and when response comes it will also come to
          ReplyChannelFromServer then the correlation strategy will check whether the echo of both request and response are same
          then it considers as match and there are 2 such messages (release strategy) it will send them to output channel
          -->

          <aggregator id="localAggregator" input-channel="localReplyChannelFromServer"
                      output-channel="clientBytes2StringForSecondPayLoadChn"
                      correlation-strategy="correlationStrategyBean"
                      send-timeout="4000"
                      release-strategy-expression="size() == 2" />

          <aggregator id="remoteAggregator" input-channel="remoteReplyChannelFromServer"
                      output-channel="clientBytes2StringForSecondPayLoadChnRemote"
                      correlation-strategy="correlationStrategyBean"
                      send-timeout="4000"
                      release-strategy-expression="size() == 2" />

          <!--  always second message will be the message received from the server -->
          <transformer id="clientBytes2StringForSecondPayLoadLocal" input-channel="clientBytes2StringForSecondPayLoadChn"
                       expression="new String(payload.get(1))" />
          <transformer id="clientBytes2StringForSecondPayLoadRemote" input-channel="clientBytes2StringForSecondPayLoadChnRemote"
                       expression="new String(payload.get(1))" />

          <beans:bean id="correlationStrategyBean" class="com.test.CorrelationStrategy"/>

      </beans:beans>
      I guess what you're saying about using the work manager thread for all endpoints applies to the aggregator too, so I should probably add that to the config for those. I think the weird thing about the way we are using channel adapters in this case is that it is supposed to be 1-way communication, but we actually need the reply in the same thread that we are sending the request.
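
One way the idea of moving the aggregator onto work manager threads could be sketched: the aggregators in the configuration above read from queue channels, so they are polling consumers and pick up the default poller, whose polls run on the task scheduler unless a `task-executor` is given. That may be why the correlation strategy shows up on task scheduler threads. The bean id `wasTaskExecutor` is hypothetical and is assumed to be a task executor backed by the work manager and defined elsewhere; the poller values simply repeat those of the default poller.

```xml
<!-- Same aggregator as in the posted config, with its own poller.
     wasTaskExecutor is a hypothetical bean id for a work-manager-backed executor. -->
<aggregator id="localAggregator" input-channel="localReplyChannelFromServer"
            output-channel="clientBytes2StringForSecondPayLoadChn"
            correlation-strategy="correlationStrategyBean"
            send-timeout="4000"
            release-strategy-expression="size() == 2">
    <!-- The scheduler still triggers each poll, but the poll itself
         (including the correlation strategy) runs on the executor's threads. -->
    <poller fixed-rate="40" max-messages-per-poll="500" task-executor="wasTaskExecutor"/>
</aggregator>
```

An alternative with the same effect for every polling endpoint would be to put the `task-executor` attribute on the default poller instead.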

      The code is roughly like so:

      Code:
      receive request from outside application
      use request data to form different request to another system over tcp/ip
      get response from other system
      use response to create reply to original request from outside of application
      send reply to original requestor
      So we actually need the reply from that inbound channel before we can move on in the code logic. Originally, we were using a simple gateway because it naturally sends the request and then waits for the response before moving on. But because of speed requirements (we need to process up to 50 transactions per second), we changed to using the channel adapters. The other reason we switched to adapters is that even with single-use set to false on the gateway, it would not keep a persistent connection: the connection would be closed after every request, sometimes even before the response returned. Being unable to get the gateway to keep a persistent connection, we moved over to the channel adapters.

      But looking at our current configuration, it seems like we are still in effect using a gateway here. And whatever we are doing is causing the application to hang at this point when multiple threads arrive at the same time. If we send one or two single requests, no issues. But when we start the load... it chokes... and dies.


      • #4
        I will look at your analysis when I have more time tomorrow.

        I assume you have looked at the intermediate tcp/ip sample because your example exhibits some of the techniques it uses.

        I just ran that sample, and it ran 500 TPS (100 requests/responses in 200ms) over a single socket.

        Perhaps you have some issues with your correlation code?

        I suggest you run with debug logging turned on; you should be able to figure out what's going on.


        • #5
          Ok thanks for looking into this.

          I just compared the example to our configuration, and it seems we pretty much followed it without divergence. The only difference I can see is that our correlation strategy is not running on a WAS workmanager thread and is instead using the default task executor. I'll make that change and test again in the meantime.

          Regarding logging, I'm currently running org.springframework.integration.ip at the TRACE level. Maybe I can change that to org.springframework.integration just in case I'm missing something above the ip package.


          • #6
            Problem solved by configuring separate work managers, split roughly by component. We were originally using a single work manager configured with 1000 threads; now we are using 5 different work managers (an IBM WAS config change).
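
For readers who want to try the same approach, here is a rough sketch of what the Spring side of such a split might look like. The thread does not say how the 5 work managers were divided, so the two executors, their bean ids, and the JNDI names `wm/tcp` and `wm/channels` are all assumptions, as are the host and port.

```xml
<!-- Hypothetical executor for socket reader threads -->
<beans:bean id="tcpExecutor"
            class="org.springframework.scheduling.commonj.WorkManagerTaskExecutor">
    <beans:property name="workManagerName" value="wm/tcp"/>
</beans:bean>

<!-- Hypothetical executor for channel dispatch -->
<beans:bean id="channelExecutor"
            class="org.springframework.scheduling.commonj.WorkManagerTaskExecutor">
    <beans:property name="workManagerName" value="wm/channels"/>
</beans:bean>

<!-- Socket reads draw from their own pool, so they cannot be starved
     by threads elsewhere in the flow that are waiting for replies. -->
<ip:tcp-connection-factory id="local_client_1" type="client"
                           host="localhost" port="5678"
                           single-use="false"
                           task-executor="tcpExecutor"/>

<!-- Dispatch from the gateway's request channel uses a different pool. -->
<publish-subscribe-channel id="localRequestChannel" task-executor="channelExecutor"/>
```

Note that giving a publish-subscribe channel an executor makes dispatch asynchronous, so the relative ordering of the two bridges subscribed to it should not be relied on in that case.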

            Thanks for all input!
