
  • Simple TCP Client - Spring Integration ( Not connecting)

    I am trying to reproduce the following simple TCP client, which connects to a socket, reads data when it is available, and prints it.
    Code:
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.Socket;
    import java.net.UnknownHostException;
    
    public class TestAdapter {
    	public static void main(String[] args) throws IOException {
    		Socket socket = null;
    		BufferedReader in = null;
    
    		try {
    			// Connect to the server listening on localhost:11111.
    			socket = new Socket("localhost", 11111);
    			if (socket.isConnected())
    				System.out.println("Successfully connected");
    			in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
    		} catch (UnknownHostException e) {
    			System.err.println("Don't know about host: localhost.");
    			System.exit(1);
    		} catch (IOException e) {
    			System.err.println("Couldn't get I/O for the connection to: localhost.");
    			System.exit(1);
    		}
    
    		String userInput;
    
    		// readLine() blocks until a full line arrives; null means the peer closed the socket.
    		while ((userInput = in.readLine()) != null) {
    			System.out.println("echo: " + userInput);
    		}
    
    		in.close();
    		socket.close();
    	}
    
    }
    With the following configuration, the test method on PrintService is not invoked when bytes are available.
    Can anyone help me correct these settings if anything is wrong?

    Code:
    <int:channel id="inputChannel"/> 
    <ip:tcp-inbound-channel-adapter id="tcpInboundAdapter" 
    	                               	channel="inputChannel" 
    	                               	connection-factory="client" 
    	                           	client-mode="true"  /> 
    <ip:tcp-connection-factory id="client" 
    				type="client"
    				host="localhost"
    				port="11111" 
    				single-use="false"
    				using-nio="true"
    				using-direct-buffers="false"
    				pool-size="2"
    				so-keep-alive="true"
    				so-timeout="0"/>
    <int:service-activator id="activator"
                   		   input-channel="inputChannel"
    	   		   ref="printService"
    	   		   method="test"/>
    <bean id="printService" class="com.springintegration.print.PrintService" />
    Code:
    public class PrintService {
    	public void test(byte[] bytes) {
    		System.out.println("Received:" + new String(bytes) );
    	}
    }
    From the debug log, the connection seems to be established, but the service activator is not invoking the test method on PrintService.

    Code:
    01/19 15:25:00,486 DEBUG [main] DefaultLifecycleProcessor} - Successfully started bean 'tcpInboundAdapter'
    01/19 15:25:00,586 DEBUG [ThreadPoolTaskScheduler-1] TcpNioClientConnectionFactory} - Opening new socket channel connection to localhost:11111
    01/19 15:25:00,622 DEBUG [ThreadPoolTaskScheduler-1] ClientModeConnectionManager} - Connection dell271:11111:79d7e264-d1bb-4b93-8d1b-ffa5c13e9b94 established
    01/19 15:25:15,374 DEBUG [pool-1-thread-2] TcpNioConnection} - dell271:11111:79d7e264-d1bb-4b93-8d1b-ffa5c13e9b94 Reading...
    01/19 15:25:15,374 DEBUG [pool-1-thread-2] TcpNioConnection} - Read 3251 into raw buffer
    Last edited by pkbhavani; Jan 19th, 2012, 04:48 PM.

  • #2
    You need some structure to the message to tell the adapter when a complete message has been received.

    TCP is a streaming protocol, whereas Spring Integration (SI) is a messaging system, so we need some way to tell SI when a complete message has been received.

    I see you are using BufferedReader.readLine() in your example, which returns as soon as it sees any of '\n', '\r', or '\r\n'. Depending on the sending side, you may only be sending, say, '\n'.
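    To make the point about readLine() concrete, here is a minimal plain-Java sketch (no Spring involved; the class and method names are made up for illustration). It feeds one string containing all three terminators through BufferedReader and collects the lines.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

public class ReadLineTerminators {

    // Reads the input the way the plain-socket client does: one readLine() call per line.
    static List<String> readAllLines(String data) {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new StringReader(data))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }

    public static void main(String[] args) {
        // '\n', '\r' and "\r\n" are all accepted as line terminators.
        System.out.println(readAllLines("one\ntwo\rthree\r\n")); // prints [one, two, three]
    }
}
```

    A deserializer that insists on '\r\n' would not complete a message for the first two lines, which is why the plain client can print data that the adapter never turns into a message.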

    The connection factory has an attribute 'deserializer' which tells us how to interpret the data, and produce a Message object. The default deserializer expects a carriage return + linefeed ('\r\n') at the end of the data; but other deserializers are available - see the reference documentation...

    http://static.springsource.org/sprin...tion-factories

    The provided classes implement both serializer and deserializer functionality. We don't currently have an implementation that only uses '\n'.

    For very simple applications, where the client sends a message and then closes the socket, you can use a ByteArrayRawSerializer which uses the socket closure to signal the end of the message.
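    The "socket closure ends the message" idea can be sketched in plain Java as below. This is only an illustration of the framing rule, not the code of ByteArrayRawSerializer; the class and method names are hypothetical, and a ByteArrayInputStream stands in for the socket.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class ReadUntilClose {

    // Accumulates everything until read() reports end of stream (the peer closed the socket).
    static byte[] readUntilEndOfStream(InputStream in) throws IOException {
        ByteArrayOutputStream message = new ByteArrayOutputStream();
        byte[] chunk = new byte[1024];
        int n;
        while ((n = in.read(chunk)) != -1) {
            message.write(chunk, 0, n);
        }
        return message.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        InputStream fakeSocket =
                new ByteArrayInputStream("no terminator needed".getBytes(StandardCharsets.US_ASCII));
        byte[] payload = readUntilEndOfStream(fakeSocket);
        System.out.println(new String(payload, StandardCharsets.US_ASCII));
    }
}
```

    The trade-off is that only one message fits on each connection, because the end of the stream is the only delimiter.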

    For applications that send more than one message on the same socket, you need some mechanism to demarcate the messages. If none of the standard (de)serializers is sufficient, you can write your own and set it in the deserializer attribute. If this is the case, you might want to look at one of the standard (de)serializers and, perhaps, make yours a subclass of AbstractByteArraySerializer.
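    As a rough sketch of what a custom '\n'-only deserializer would have to do, the plain-Java method below reads one LF-terminated message from a stream. The class and method names are hypothetical; to plug such logic into the connection factory it would, as far as I know, need to implement Spring's `Deserializer<byte[]>` interface (or extend AbstractByteArraySerializer as suggested above), which is left out here to keep the example free of dependencies.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class LfFraming {

    // Returns the next message without its trailing '\n',
    // or null if the stream ended cleanly between messages.
    static byte[] readMessage(InputStream in) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        int b;
        while ((b = in.read()) != -1) {
            if (b == '\n') {
                return buffer.toByteArray();
            }
            buffer.write(b);
        }
        if (buffer.size() == 0) {
            return null;
        }
        throw new EOFException("stream closed in the middle of a message");
    }

    public static void main(String[] args) throws IOException {
        InputStream fakeSocket =
                new ByteArrayInputStream("first\nsecond\n".getBytes(StandardCharsets.US_ASCII));
        byte[] message;
        while ((message = readMessage(fakeSocket)) != null) {
            System.out.println("message: " + new String(message, StandardCharsets.US_ASCII));
        }
    }
}
```

    Because each call stops at the delimiter, several messages can share one connection, which is what the end-of-stream approach cannot do.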

    Hope that helps.
    Last edited by Gary Russell; Jan 19th, 2012, 05:19 PM.



    • #3
      Thank you so much for your reply. I am trying to figure out how to plug in the serializer/deserializer for this use case.
      I changed the configuration to use a serializer, but the deserialize and serialize methods in my custom ByteArrayCrLfSerializer are being invoked. I am still seeing the same output, with no change.

      Can you please check whether this configuration could in fact address the plain old Java TCP client sample above?

      Code:
      <bean id="serializer" class="com.springintegration.print.ByteArrayCrLfSerializer" />
      
      <int:channel id="inputChannel"/>
       
      <ip:tcp-inbound-channel-adapter id="tcpInboundAdapter" 
      	                               	channel="inputChannel" 
      	                               	connection-factory="client" 
      	                           	client-mode="true"  /> 
      
      <ip:tcp-connection-factory id="client" 
      				type="client"
      				host="localhost"
      				port="11111" 
      				single-use="false"
      				using-nio="true"
      				using-direct-buffers="false"
      				pool-size="2"
      				so-keep-alive="true" 
                                      deserializer="serializer"
                                      serializer="serializer"
      				so-timeout="0"/>
      
      <int:service-activator id="activator"
                     		   input-channel="inputChannel"
      	   		   ref="printService"
      	   		   method="test"/>
      
      <bean id="printService" class="com.springintegration.print.PrintService" />



      • #4
        I changed the setting to using-nio="false" and it now seems to be working. Here is the complete configuration for anyone looking for this kind of example. I have no idea whether the rest of the tcp-connection-factory settings are needed.

        Code:
        	<bean id="serializer" class="org.springframework.integration.ip.tcp.serializer.ByteArrayCrLfSerializer" />
          	
        	<int:channel id="inputChannel"/> 
        	<int:channel id="outputChannel"/>
        	
        	<ip:tcp-inbound-channel-adapter id="tcpInboundAdapter" 
        		                           	channel="inputChannel" 
        		                           	connection-factory="client"  
        		                           	client-mode="true"  /> 
        		                           	
        	<ip:tcp-connection-factory id="client" 
        					type="client"
        					host="localhost"
        					port="11111" 
        					single-use="false"
        					using-nio="false"
        					using-direct-buffers="false"
        					pool-size="2"
        					so-keep-alive="true" deserializer="serializer" serializer="serializer"
        					so-timeout="0"/>
        					
        	<int:service-activator id="activator"
        	               input-channel="inputChannel"
        		   		   ref="printService"/>
        		   		   
        	<bean id="printService" class="com.springintegration.print.TcpListener" />
        TcpListener class
        Code:
        @Component
        public class TcpListener {
        	@ServiceActivator
        	public void handleTcp(Message<?> message) {
        		System.out.println("*** TCP Message: " + message);
        		System.out.println("*** TCP Message Payload: " + new String((byte[]) message.getPayload()));
        	}
        }



        • #5
          Do you mean "is NOT being invoked"?

          Can you run with a TRACE level log and attach it?

          I just modified the tcp-client-server sample like this...

          Code:
          	<ip:tcp-connection-factory id="client"
          		type="client"
          		host="localhost"
          		port="11111"
          		so-timeout="10000"
          		using-nio="true"
                  />
          	
          	<ip:tcp-inbound-channel-adapter id="inbound"
          		connection-factory="client"
          		client-mode="true"
          		channel="serverBytes2StringChannel" />
          	
          	<transformer id="serverBytes2String"
          		input-channel="serverBytes2StringChannel"
          		output-channel="toSA" 
          		expression="new String(payload)"/>
          
          	<channel id="toSA" />
          
          	<service-activator input-channel="toSA"
          					   output-channel="stdout"
          					   ref="echoService"
          					   method="test" />
          
          	<int-stream:stdout-channel-adapter id="stdout" append-newline="true" />
          
          	<beans:bean id="echoService" 
          		  class="org.springframework.integration.samples.tcpclientserver.EchoService" />
          Code:
          @ContextConfiguration("/META-INF/spring/integration/tcpClientServerDemoX-context.xml")
          @RunWith(SpringJUnit4ClassRunner.class)
          public class TcpClientServerDemoClientModeTest {
          
          	@Test
          	public void testHappyDay() throws Exception {
          		ServerSocket server = ServerSocketFactory.getDefault().createServerSocket(11111);
          		Socket sock = server.accept();
          		sock.getOutputStream().write("Hello, world!\r\n".getBytes());
          		Thread.sleep(1000);
          		sock.close();
          		server.close();
          	}
          }
          And everything worked as expected...

          Code:
          echo:Hello, world!



          • #6
            As you can see 'using-nio="true"' worked fine for me. I'd like to see a TRACE log if possible; I'd like to understand why you are seeing this problem.

            In the past, we have found some subtle differences in operating systems. What OS are you running on?

            Thanks.



            • #7
              I think the problem is that your pool-size is too small; I would suggest a minimum of 3. I think what's happening is you have data to assemble into a message, but there are no threads available to consume it and the thread that's reading from the port is blocked waiting for some of the data to be consumed before it can continue. One thread is always reserved to handle socket events.

              We should probably change the documentation to suggest a minimum pool-size of 3.
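              The starvation described above can be modelled with a plain java.util.concurrent pool. This is a deliberately simplified model, not Spring Integration's actual NIO code: one task stands in for the thread reserved for socket events, one for the reader that blocks until its data is taken, and one for the consumer that assembles the message. All names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class PoolStarvationDemo {

    // Returns true if the consumer task got a thread and received the data in time.
    static boolean messageDelivered(int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        CountDownLatch never = new CountDownLatch(1);      // never counted down
        SynchronousQueue<String> handoff = new SynchronousQueue<>();
        CountDownLatch delivered = new CountDownLatch(1);
        try {
            // Task 1: permanently occupies a thread, like the socket-event handler.
            pool.execute(() -> {
                try { never.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            });
            // Task 2: the reader; put() blocks until a consumer takes the data.
            pool.execute(() -> {
                try { handoff.put("data"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            });
            // Task 3: the consumer; it can only run if a third thread is available.
            pool.execute(() -> {
                try { handoff.take(); delivered.countDown(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            });
            return delivered.await(500, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow(); // interrupts the blocked tasks so the JVM can exit
        }
    }

    public static void main(String[] args) {
        System.out.println("pool-size 2 delivered: " + messageDelivered(2));
        System.out.println("pool-size 3 delivered: " + messageDelivered(3));
    }
}
```

              In this model a pool of 2 is expected to report false, because the consumer stays queued behind two blocked tasks, while a pool of 3 is expected to report true.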



              • #8
                Here is the debug log for my working settings.

                01/25 09:27:39,507 TRACE [main] CachedIntrospectionResults} - Getting BeanInfo for class [org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler]
                01/25 09:27:39,513 TRACE [main] CachedIntrospectionResults} - Caching PropertyDescriptors for class [org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler]
                01/25 09:27:39,514 TRACE [main] CachedIntrospectionResults} - Found bean property 'beanName' of type [java.lang.String]
                01/25 09:27:39,514 TRACE [main] CachedIntrospectionResults} - Found bean property 'class' of type [java.lang.Class]
                01/25 09:27:39,514 TRACE [main] CachedIntrospectionResults} - Found bean property 'daemon' of type [boolean]
                01/25 09:27:39,514 TRACE [main] CachedIntrospectionResults} - Found bean property 'errorHandler' of type [org.springframework.util.ErrorHandler]
                01/25 09:27:39,514 TRACE [main] CachedIntrospectionResults} - Found bean property 'poolSize' of type [int]
                01/25 09:27:39,514 TRACE [main] CachedIntrospectionResults} - Found bean property 'rejectedExecutionHandler' of type [java.util.concurrent.RejectedExecutionHandler]
                01/25 09:27:39,514 TRACE [main] CachedIntrospectionResults} - Found bean property 'scheduledExecutor' of type [java.util.concurrent.ScheduledExecutorService]
                01/25 09:27:39,514 TRACE [main] CachedIntrospectionResults} - Found bean property 'threadFactory' of type [java.util.concurrent.ThreadFactory]
                01/25 09:27:39,515 TRACE [main] CachedIntrospectionResults} - Found bean property 'threadGroup' of type [java.lang.ThreadGroup]
                01/25 09:27:39,515 TRACE [main] CachedIntrospectionResults} - Found bean property 'threadGroupName' of type [java.lang.String]
                01/25 09:27:39,515 TRACE [main] CachedIntrospectionResults} - Found bean property 'threadNamePrefix' of type [java.lang.String]
                01/25 09:27:39,515 TRACE [main] CachedIntrospectionResults} - Found bean property 'threadPriority' of type [int]
                01/25 09:27:39,515 TRACE [main] CachedIntrospectionResults} - Found bean property 'waitForTasksToCompleteOnShutdown' of type [boolean]
                01/25 09:27:39,516 DEBUG [main] DefaultListableBeanFactory} - Creating instance of bean '(inner bean)#1'
                01/25 09:27:39,517 DEBUG [main] DefaultListableBeanFactory} - Returning cached instance of singleton bean 'errorChannel'
                01/25 09:27:39,517 TRACE [main] CachedIntrospectionResults} - Getting BeanInfo for class [org.springframework.integration.channel.MessagePublishingErrorHandler]
                01/25 09:27:39,518 TRACE [main] CachedIntrospectionResults} - Caching PropertyDescriptors for class [org.springframework.integration.channel.MessagePublishingErrorHandler]
                01/25 09:27:39,518 TRACE [main] CachedIntrospectionResults} - Found bean property 'beanFactory' of type [org.springframework.beans.factory.BeanFactory]
                01/25 09:27:39,519 TRACE [main] CachedIntrospectionResults} - Found bean property 'class' of type [java.lang.Class]
                01/25 09:27:39,519 TRACE [main] CachedIntrospectionResults} - Found bean property 'defaultErrorChannel' of type [org.springframework.integration.MessageChannel]
                01/25 09:27:39,519 TRACE [main] CachedIntrospectionResults} - Found bean property 'sendTimeout' of type [long]
                01/25 09:27:39,519 DEBUG [main] DefaultListableBeanFactory} - Finished creating instance of bean '(inner bean)#1'
                01/25 09:27:39,519 DEBUG [main] DefaultListableBeanFactory} - Invoking afterPropertiesSet() on bean with name 'taskScheduler'
                01/25 09:27:39,519 INFO [main] ThreadPoolTaskScheduler} - Initializing ExecutorService 'taskScheduler'
                01/25 09:27:39,522 DEBUG [main] DefaultListableBeanFactory} - Finished creating instance of bean 'taskScheduler'
                01/25 09:27:39,523 DEBUG [main] DefaultListableBeanFactory} - Creating shared instance of singleton bean 'org.springframework.integration.config.IdGeneratorConfigurer#0'
                01/25 09:27:39,523 DEBUG [main] DefaultListableBeanFactory} - Creating instance of bean 'org.springframework.integration.config.IdGeneratorConfigurer#0'
                01/25 09:27:39,523 DEBUG [main] DefaultListableBeanFactory} - Eagerly caching bean 'org.springframework.integration.config.IdGeneratorConfigurer#0' to allow for resolving potential circular references
                01/25 09:27:39,523 DEBUG [main] DefaultListableBeanFactory} - Finished creating instance of bean 'org.springframework.integration.config.IdGeneratorConfigurer#0'
                01/25 09:27:39,524 DEBUG [main] ClassPathXmlApplicationContext} - Unable to locate LifecycleProcessor with name 'lifecycleProcessor': using default [org.springframework.context.support.DefaultLifecycleProcessor@722d95b3]
                01/25 09:27:39,525 DEBUG [main] DefaultListableBeanFactory} - Returning cached instance of singleton bean 'tcpInboundAdapter'
                01/25 09:27:39,525 DEBUG [main] DefaultListableBeanFactory} - Returning cached instance of singleton bean 'client'
                01/25 09:27:39,525 DEBUG [main] DefaultListableBeanFactory} - Returning cached instance of singleton bean 'activator'
                01/25 09:27:39,525 DEBUG [main] DefaultListableBeanFactory} - Returning cached instance of singleton bean '_org.springframework.integration.errorLogger'
                01/25 09:27:39,525 DEBUG [main] DefaultListableBeanFactory} - Returning cached instance of singleton bean 'lifecycleProcessor'
                01/25 09:27:39,526 DEBUG [main] DefaultListableBeanFactory} - Returning cached instance of singleton bean 'tcpInboundAdapter'
                01/25 09:27:39,526 DEBUG [main] DefaultListableBeanFactory} - Returning cached instance of singleton bean 'client'
                01/25 09:27:39,526 DEBUG [main] DefaultListableBeanFactory} - Returning cached instance of singleton bean 'activator'
                01/25 09:27:39,526 DEBUG [main] DefaultListableBeanFactory} - Returning cached instance of singleton bean '_org.springframework.integration.errorLogger'
                01/25 09:27:39,526 DEBUG [main] DefaultListableBeanFactory} - Returning cached instance of singleton bean 'lifecycleProcessor'
                01/25 09:27:39,527 DEBUG [main] DefaultListableBeanFactory} - Returning cached instance of singleton bean 'tcpInboundAdapter'
                01/25 09:27:39,527 DEBUG [main] DefaultListableBeanFactory} - Returning cached instance of singleton bean 'client'
                01/25 09:27:39,527 DEBUG [main] DefaultListableBeanFactory} - Returning cached instance of singleton bean 'activator'
                01/25 09:27:39,527 DEBUG [main] DefaultListableBeanFactory} - Returning cached instance of singleton bean '_org.springframework.integration.errorLogger'
                01/25 09:27:39,527 DEBUG [main] DefaultListableBeanFactory} - Returning cached instance of singleton bean 'lifecycleProcessor'
                01/25 09:27:39,528 INFO [main] DefaultLifecycleProcessor} - Starting beans in phase -2147483648
                01/25 09:27:39,528 DEBUG [main] DefaultLifecycleProcessor} - Starting bean 'activator' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean]
                01/25 09:27:39,528 INFO [main] EventDrivenConsumer} - Adding {service-activator:activator} as a subscriber to the 'inputChannel' channel
                01/25 09:27:39,528 INFO [main] DirectChannel} - Channel 'inputChannel' has 1 subscriber(s).
                01/25 09:27:39,528 INFO [main] EventDrivenConsumer} - started activator
                01/25 09:27:39,528 DEBUG [main] DefaultLifecycleProcessor} - Successfully started bean 'activator'
                01/25 09:27:39,528 DEBUG [main] DefaultLifecycleProcessor} - Starting bean '_org.springframework.integration.errorLogger' of type [class org.springframework.integration.endpoint.EventDrivenConsumer]
                01/25 09:27:39,528 INFO [main] EventDrivenConsumer} - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
                01/25 09:27:39,529 INFO [main] PublishSubscribeChannel} - Channel 'errorChannel' has 1 subscriber(s).
                01/25 09:27:39,529 INFO [main] EventDrivenConsumer} - started _org.springframework.integration.errorLogger
                01/25 09:27:39,529 DEBUG [main] DefaultLifecycleProcessor} - Successfully started bean '_org.springframework.integration.errorLogger'
                01/25 09:27:39,529 INFO [main] DefaultLifecycleProcessor} - Starting beans in phase 0
                01/25 09:27:39,529 DEBUG [main] DefaultLifecycleProcessor} - Starting bean 'tcpInboundAdapter' of type [class org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter]
                01/25 09:27:39,530 INFO [main] TcpNetClientConnectionFactory} - started client
                01/25 09:27:39,530 INFO [main] ThreadPoolTaskScheduler} - Initializing ExecutorService
                01/25 09:27:39,532 INFO [main] TcpReceivingChannelAdapter} - started tcpInboundAdapter
                01/25 09:27:39,532 DEBUG [ThreadPoolTaskScheduler-1] TcpNetClientConnectionFactory} - Opening new socket connection to 127.0.0.1:11111
                01/25 09:27:39,532 DEBUG [main] DefaultLifecycleProcessor} - Successfully started bean 'tcpInboundAdapter'
                01/25 09:27:39,533 TRACE [main] ClassPathXmlApplicationContext} - Publishing event in org.springframework.context.support.ClassPathXmlApplicationContext@630045eb: org.springframework.context.event.ContextRefreshedEvent[source=org.springframework.context.support.ClassPathXmlApplicationContext@630045eb: startup date [Wed Jan 25 09:27:38 CST 2012]; root of context hierarchy]
                01/25 09:27:39,534 DEBUG [main] DefaultListableBeanFactory} - Returning cached instance of singleton bean 'org.springframework.integration.config.IdGeneratorConfigurer#0'
                Press Enter/Return in the console to exit
                01/25 09:27:39,565 DEBUG [ThreadPoolTaskScheduler-1] ClientModeConnectionManager} - Connection dell271.otcgh:2055:f19a202a-e5e6-407d-ad8a-03352ff73fd4 established
                01/25 09:27:39,565 DEBUG [pool-1-thread-2] TcpNetConnection} - Reading...
                01/25 09:27:39,566 DEBUG [pool-1-thread-2] ByteArrayCrLfSerializer} - Available to read:0



                • #9
                  Thanks, I figured out the root cause was your pool-size setting being 2; it needs to be a minimum of 3 when using NIO.

                  See https://jira.springsource.org/browse/INT-2418 and https://jira.springsource.org/browse/INT-2419.



                  • #10
                    The machine I am testing this on runs Windows 7.
                    java version "1.6.0_29"
                    Java(TM) SE Runtime Environment (build 1.6.0_29-b11)
                    Java HotSpot(TM) 64-Bit Server VM (build 20.4-b02, mixed mode)



                    • #11
                      Thank you so much. With using-nio=false, as in my configuration, my current pool-size=2 setting works. I will try with pool-size=3.



                      • #12
                        Right; the pool-size only needs to be increased when using-nio="true".

                        Note that, as mentioned in the reference manual, using-nio is usually only needed for high volume and/or handling many connections.
