
  • TCP Inbound Gateway with Splitter/Aggregator multi-threaded.

    Here is my problem:

    I've implemented a socket server like this (using Spring Integration):

    Code:
    <task:executor id="socketExecutor" pool-size="${integration.socket.pool.max.size}"/>
    	
    <ip:tcp-connection-factory id="server" type="server" port="${integration.server.port}" using-nio="${integration.server.using-nio}"
    		single-use="${integration.server.single-use}" so-timeout="${integration.server.so-timeout}"
    		deserializer="deserializer" task-executor="socketExecutor" so-linger="${integration.server.so-linger}" 
    		so-send-buffer-size="${integration.server.so-send-buffer}" so-receive-buffer-size="${integration.server.so-receive-buffer}" />
    
    <ip:tcp-inbound-channel-adapter channel="tcpInputChannel" connection-factory="server" error-channel="errorChannel"/>
    <ip:tcp-outbound-channel-adapter channel="tcpOutpuChannel" connection-factory="server" />
    So each incoming TCP call is handled by a different thread from the 'socketExecutor' thread pool.

    It reads the socket and sends the input to the splitter/aggregator for further processing:

    Code:
    <task:executor id="asmTxExecutor" pool-size= "${integration.asmv.pool.max.size}" />
    
    <channel id="parallelTxChannel">
    	<dispatcher task-executor="asmTxExecutor" />
    </channel>
    
    <chain input-channel="tcpInputChannel" output-channel="parallelTxChannel">
    	<splitter ref="splitter" />
    </chain>
    
    <chain input-channel="parallelTxChannel" output-channel= "tcpOutpuChannel">
    	<service-activator ref="transactionExecutorService" />
    	<aggregator ref="aggregator" />
    </chain>
    Here is the intended flow (it's all direct channels, so it's like one big transaction):
    The input gets split (multi-threaded processing), then aggregated, and the reply is sent.
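
As a rough illustration of that intended flow outside Spring Integration, here is a minimal plain-Java sketch. It assumes a request is just a list of strings and uses only java.util.concurrent; the names SplitAggregateSketch and handleRequest are made up for illustration and are not part of the poster's code or of any Spring API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for the splitter -> service activator -> aggregator flow.
class SplitAggregateSketch {

    // Submits one task per part to the pool, then collects the results
    // in the original order and returns them as one aggregated list.
    static List<String> handleRequest(List<String> parts, ExecutorService pool) {
        List<Future<String>> pending = new ArrayList<>();
        for (String part : parts) {
            // Each part is processed on a pool thread (the "service activator" step).
            pending.add(pool.submit(() -> "processed:" + part));
        }
        List<String> aggregated = new ArrayList<>();
        for (Future<String> future : pending) {
            try {
                // The calling thread waits here until this part is done.
                aggregated.add(future.get());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            } catch (ExecutionException e) {
                throw new IllegalStateException("processing failed", e.getCause());
            }
        }
        return aggregated;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        try {
            System.out.println(handleRequest(Arrays.asList("a", "b", "c"), pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

The point of the sketch is only the shape of the flow: the thread that received the request stays tied to it while the parts are processed elsewhere.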

    The problem is that once the input message reaches the splitter and the multi-threading starts, my TCP server loses track of the connection, thinks there is a new one, and tries to read (actually re-read). That read blocks until the multi-threaded processing has completed and the reply has been sent on the output channel. At that point the socket is closed, and the extra 'blocked' read throws a SocketException.

    Is it clear?
    Is there a way to prevent this extra read from happening?
    It seems like the use of two different thread pools is causing this problem.
    The extra read is eliminated when no second thread pool is used, but that eliminates the multi-threaded processing at the same time.

  • #2
    Are you setting using-nio to true? Collaborating channel adapters (that use the same connection factory) are really intended for use with NIO. I would expect the behavior you are seeing with using-nio="false". When using NIO, threads from the pool are only used when data is available to be read from the socket; otherwise, they are returned to the pool. When the result is sent asynchronously, the channel (socket) will be closed (assuming single-use=true), and no error will appear on the read side (because there is no blocked thread).
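
To illustrate the non-NIO case in isolation, here is a minimal plain-Java sketch (no Spring Integration involved; the class and method names are made up for illustration). One thread sits blocked in read() on a socket, another thread closes that socket, and the blocked read fails with a SocketException. It only mirrors the mechanism described above under that assumption and is not the framework's actual code.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: a blocking read on a socket that another thread closes.
class BlockedReadSketch {

    // Returns the exception seen by the reader thread, or null if read() returned normally.
    static Exception closeWhileReading() {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(server.getInetAddress(), server.getLocalPort());
             Socket accepted = server.accept()) {

            CompletableFuture<Exception> seen = new CompletableFuture<>();
            Thread reader = new Thread(() -> {
                try {
                    // The client never writes, so this read blocks.
                    accepted.getInputStream().read();
                    seen.complete(null);
                } catch (IOException e) {
                    seen.complete(e);
                }
            });
            reader.start();

            Thread.sleep(200);  // give the reader time to block in read()
            accepted.close();   // plays the role of closing the socket after the reply
            return seen.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("sketch could not run", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(closeWhileReading());
    }
}
```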

    If I have misunderstood your situation, please attach a zip with a debug log illustrating the problem.

    Thanks



    • #3
      I thought I had solved my problem by using the scope attribute on the channel that uses the 'second' thread pool. Here's how it looks:

      Code:
      <channel id="scopedTarget.parallelTxChannel" scope="thread">
         <dispatcher task-executor="asmTxExecutor" />
      </channel>
      
      <beans:bean class= "org.springframework.beans.factory.config.CustomScopeConfigurer">
         <beans:property name="scopes">
            <beans:map>
      	  <beans:entry key="thread" value="org.springframework.context.support.SimpleThreadScope" />
            </beans:map>
         </beans:property>
      </beans:bean>
      But this setting actually only eliminated the multi-threading.

      Here are my settings for the server:

      Code:
      integration.asmv.pool.max.size=10
      integration.socket.pool.max.size=10
      
      integration.server.port=14550
      integration.server.using-nio=true
      integration.server.single-use=true
      integration.server.so-timeout=60000
      integration.server.so-linger=60
      integration.server.so-send-buffer=25350
      integration.server.so-receive-buffer=43720
      Also, when in debug mode, it doesn't seem to use the TcpNioServerConnectionFactory but the TcpNetServerConnectionFactory, even though the 'using-nio' flag is on. Any reason for this?

      Thanks, as always, for the quick response.
      Last edited by Olivier Quirion; May 5th, 2011, 10:46 AM.



      • #4
        Hi Gary,
        As you can read from my previous post, I was misled and going in the wrong direction. I've re-read the reference on collaborating channel adapters and your post and, by luck, tried this:

        Code:
        <ip:tcp-connection-factory id="server" type="server" port="${integration.server.port}" using-nio="true" single-use="true" 
        		so-timeout="${integration.server.so-timeout}" deserializer="deserializer" task-executor="socketExecutor" 
        		so-linger="${integration.server.so-linger}" so-send-buffer-size="${integration.server.so-send-buffer}" 
        		so-receive-buffer-size="${integration.server.so-receive-buffer}" />
        And it all started to work as expected. The problem was:

        Code:
        integration.server.using-nio=true
        integration.server.single-use=true
        ...
        
        using-nio="${integration.server.using-nio}"
        single-use="${integration.server.single-use}"
        It seems the 'property-placeholder' didn't work on the boolean field. By hard-coding the value in the context, it started to use the right TcpNioServerConnectionFactory.

        Can you corroborate this behavior?
        Thanks



        • #5
          Thanks, I can confirm that using a placeholder (or SpEL expression) for the 'using-nio' attribute does not work; it causes a TcpNet* connection factory to be defined unconditionally.

          Would you mind entering a JIRA issue?

          I may not be able to get the fix done in time for the upcoming 2.0.4 release, however.



          • #6
            Gary, my Java application will be running in production on z/OS, but I've run into some problems when using TcpNioServerConnectionFactory on z/OS (I run into a sort of deadlock after a few calls to the server). Therefore, I've returned to the old TcpNet* and the extra read on the socket reappeared. From reading your post, this seems to be the normal behavior? Is this true, and why? Should I return to an inbound gateway instead of using collaborating channel adapters? Would that solve the problem of the extra read? Are collaborating channel adapters purely for NIO? I've read the documentation a few times and that didn't jump out at me.

            Regards,



            • #7
              @Olivier,

              Collaborating channel adapters were introduced to alleviate a performance issue with gateways, particularly when one shared connection is being used for all messages.

              The reference guide says collaborating adapters are an alternative to using single-use connections with gateways, when the overhead of opening/closing sockets is too much.

              Another use case for collaborating adapters is where you want entirely asynchronous messaging (not request/reply) between two systems (again, usually using a single persistent connection).

              Given that you are using single-use connections, and request/response, there is no benefit to using collaborating adapters.

              I am not saying we couldn't eliminate the read-error log message you are seeing; it is just that I did not anticipate they would be used for single-use connections.

              For your use case, using an inbound gateway on the server would be the best practice.

              Further, unless you anticipate a very large number of (concurrent) connections, or your server process (time to handle a request) is very long, I would not expect you to see much, if any, difference between using NIO or not.

              Hope that helps...



              • #8
                Thanks Gary, for this clear and concise answer.
                I've tried the inbound gateway, but I get odd behavior. The different messages seem to lose their way back to the gateway (they create their own TemporaryReturnChannel). I've pinpointed the problem to my use of a task executor to perform parallel processing.

                Here is the flow:

                message --> gateway (in) --> splitter --> service activator --> aggregator --> gateway (out)

                I used collaborating adapters because the inbound gateway just does not work when I use the task executor.

                Code:
                <task:executor id="socketExecutor" pool-size="${integration.socket.pool.max.size}"/>
                
                <ip:tcp-connection-factory id="server" type="server" port="${integration.server.port}" using-nio="false" single-use="true"  deserializer="deserializer" task-executor="socketExecutor" />
                
                <ip:tcp-inbound-gateway request-channel="tcpInputChannel" reply-channel="tcpOutputChannel" connection-factory="server" error-channel="errorChannel"/>
                
                <chain input-channel="tcpInputChannel" output-channel="parallelTxChannel">
                	<transformer ref="unmarshallingTransformer" />
                	<splitter ref="splitter" />
                </chain>
                
                <chain input-channel="parallelTxChannel" output-channel="tcpOutputChannel" >
                	<service-activator ref="transactionExecutorService">
                		<task:executor id="asmTxExecutor"/>
                	</service-activator>
                	<aggregator ref="aggregator" />
                	<transformer ref="marshallingTransformer" />
                </chain>
                	
                <chain input-channel="errorChannel" output-channel="tcpOutputChannel">
                	<service-activator ref="exceptionHandlingService" method="handleException"  />
                	<transformer ref="marshallingTransformer" />
                </chain>
                
                <channel id="parallelTxChannel">
                	<dispatcher task-executor="asmTxExecutor" />
                </channel>
                
                <task:executor id="asmTxExecutor" pool-size="${integration.asmv.pool.max.size}" />
                I've tried without the executor and everything works fine (but single-threaded). When I add the task executor, the transaction flow (direct channel flow) seems to be broken by the use of different threads. Is there a way to set up this flow so that all the direct channels stay connected and the final message gets routed back through the original gateway?

                Should I use a scoped channel, and how? Should I give up and tell myself that the collaborating adapters are the way to go (they work, but with a mild problem: the extra read)? How can I link the different threads used by the splitter/service/aggregator to the original thread used by the gateway?



                • #9
                  Going async shouldn't make a difference, especially because you are putting an explicit reply channel on the second chain (this can really be omitted because SI will route to the temporary reply channel in the header if there is no output channel). You only really need to explicitly route to the reply channel if you want to do something else (such as wire tap, pub-sub, etc. for the reply).

                  What does your aggregator look like?

                  When an aggregator just returns a payload, SI will take care of making sure the headers are correct; merging the headers from inbound messages (however, it will remove the reply channel if two inbound messages have different reply channels).

                  Can you attach a zip with a DEBUG level log (for org.springframework.integration classes) with the task executor in place?
                  Last edited by Gary Russell; May 10th, 2011, 09:33 AM.



                  • #10
                    Here is my splitter/aggregator. I've removed all MessageBuilder-type stuff from my code so I don't screw up the headers, so it's all POJOs.

                    Code:
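                    import java.util.List;

                    // POJO splitter: returns the individual requests; each one becomes its own message.
                    public class SplitterImpl {

                        public Object splitMessage(AtsRequest atsRequest) {
                            return atsRequest.getRequests();
                        }
                    }

                    // POJO aggregator: collects the individual responses into a single AtsResponse.
                    public class AggregatorImpl {

                        public AtsResponse aggregate(List<Response> responses) {
                            AtsResponse atsResponse = new AtsResponse();

                            for (Response response : responses) {
                                atsResponse.getResponses().add(response);
                            }
                            return atsResponse;
                        }
                    }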
                    Attachment



                    • #11
                      Interesting; at timeline 2011-05-10 09:25:55,312, we can see the message being bridged from the configured reply channel to the temporary channel that the TCP thread is (should be) waiting on, but for some reason it's not picked up.

                      The next reply (at timeline 2011-05-10 09:26:54,230) is sent ok.

                      Can you reproduce with the thread name in the log (%t) and, after a failure, get a thread dump (jstack <pid> on *nix, Ctrl-Break on Windows, or use VisualVM)?

                      I will attempt to reproduce, but I won't be able to get to it until tonight.

                      Thanks



                      • #12
                        Here is a log with the thread info in it.
                        I've also added the thread dump.

                        Attachment
                        Attachment

                        I know it has to do with losing the link to the initial thread or something like that.
                        If you need anything else let me know.

                        Regards,



                        • #13
                          Here's the smoking gun...

                          Code:
                          2011-05-10 13:37:37,108 [socketExecutor-2] | DEBUG | postSend 
                          ...
                          2011-05-10 13:37:38,123 [socketExecutor-2] | DEBUG | null reply received 
                          ...
                          2011-05-10 13:37:39,776 [asmTxExecutor-3] | DEBUG | handler 'org.springframework.integration.handler.BridgeHandler@57ee57ee' sending reply Message:
                          The gateway is timing out after 1 second and your reply is taking 2.6 seconds.

                          The default timeout for a gateway is 1 second; this can be increased by using the reply-timeout attribute on the inbound gateway.

                          As you have seen, this won't happen on a single thread, obviously, because the thread is doing work rather than waiting for a response.
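
The timing in that log can be reproduced with a small plain-Java sketch. It is only an analogy under stated assumptions: a BlockingQueue stands in for the temporary reply channel, the class and method names are hypothetical, and the durations are scaled down from the 1 second and 2.6 seconds seen in the log. It does not show how the gateway is implemented internally.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of a caller that waits a limited time for an asynchronous reply.
class ReplyTimeoutSketch {

    // Hands the work to another thread, then waits up to timeoutMillis for the reply.
    // Returns null if the reply does not arrive in time.
    static String sendAndReceive(long timeoutMillis, long workMillis) {
        BlockingQueue<String> replyChannel = new LinkedBlockingQueue<>();
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(workMillis);     // simulated processing time
                replyChannel.offer("reply");  // sent even if nobody is waiting any more
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true);
        worker.start();
        try {
            return replyChannel.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        // Scaled-down timings: 100 ms timeout against 260 ms of work, then a generous timeout.
        System.out.println(sendAndReceive(100, 260));
        System.out.println(sendAndReceive(6000, 260));
    }
}
```

In the first call the waiting thread gives up before the worker finishes, which corresponds to the "null reply received" line; the late reply is still produced but nobody consumes it.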

                          Hope that helps.



                          • #14
                            X-File Classified.
                            I've set the timeout to 60000 (millis) and boom, it all started to work like a charm.
                            If you want, Gary, as a sign of my gratitude for your help, I'll simplify my workflow a bit, think of some sort of nice, fun project (Cafe Sample V2?), and give it to the Spring community.
                            A sample of a TCP client/server with multi-threaded processing per request. You'll be able to put a link from here to the sample.
                            Would that be of any use?

                            Best regards,
                            Olivier Quirion @ IBM Canada
                            Last edited by Olivier Quirion; May 10th, 2011, 02:45 PM.



                            • #15
                              That would be great!

                              You might want to consider getting an account on git.springsource.org.

                              The samples are here ...

                              http://git.springsource.org/+spring-...ration/samples

                              You could then make a clone up there and push your changes, and then send a merge request to Mark and Oleg.

                              We really would like to see folks contributing to the samples; thanks for the offer!

                              BTW, I have raised a documentation JIRA issue, because I didn't see this default documented.

                              https://jira.springsource.org/browse/INT-1898

