  • 2 way tcp socket adapter?

    Hello,
    I'm pretty new to Spring and Spring integration. I'd like an app that can create 2 tcp sockets for bidirectional communication. I can't quite figure out how to set up 1 socket that can do 2 way communication. It seems that the tcp adapters are 1 way only? I want a client app to connect to my 2 sockets and be able to send/receive from both of those sockets.

    Any help would be appreciated.

    Thanks,
    Walt

  • #2
    For simple request/reply 2 way comms, use an <int-ip:tcp-outbound-gateway/> for each.

    For completely asynchronous 2-way communication, use the same client connection factory in both an outbound and inbound adapter (see section about collaborating channel adapters in the reference manual). Note, however, that the connection is not established until a message is sent. There is an open JIRA issue asking for the connection to be established independent of actually sending a message; it should be addressed soon. https://jira.springsource.org/browse/INT-1770
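
    As a rough sketch of the collaborating-adapters setup described above (not copied from the reference manual), both adapters reference the same client connection factory. The bean id, host, port, and channel names are made up for illustration, and the int-ip prefix is assumed to be bound to the Spring Integration IP namespace:

    ```xml
    <!-- Hypothetical id/host/port; one client factory shared by both adapters -->
    <int-ip:tcp-connection-factory id="client"
        type="client"
        host="localhost"
        port="12345"/>

    <!-- Messages sent to "toServer" are written to the socket -->
    <int-ip:tcp-outbound-channel-adapter channel="toServer"
        connection-factory="client"/>

    <!-- Data arriving on the same socket is published to "fromServer" -->
    <int-ip:tcp-inbound-channel-adapter channel="fromServer"
        connection-factory="client"/>
    ```

    Since both adapters point at one factory, the inbound adapter should read from the same connection that the outbound adapter opens on its first send.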


    • #3
      Thanks Gary. Here is my xml snippet to pass by you to make sure I understand...

      I'm hoping this sets up 1 socket on port 12345 that I can connect to and both send and receive messages on?

      <ip:tcp-connection-factory id="DebugEngineCommandsFactory"
      type="server"
      port="12345"
      deserializer="javaDeserializer"
      serializer="javaSerializer"
      />

      <!-- take messages from -->
      <ip:tcp-outbound-channel-adapter id="CommandsOutputAdapter"
      channel="CommandsOutputChannel"
      connection-factory="DebugEngineCommandsFactory"/>

      <ip:tcp-inbound-channel-adapter id="CommandsInputAdapter"
      channel="CommandsInputChannel"
      connection-factory="DebugEngineCommandsFactory"/>


      • #4
        Correct.

        The only thing you have to be careful about on the server side is to make sure any outgoing messages have the correct 'ip_connection_id' header - that's how we know which connection to use. If the message originates from the inbound adapter, the header will already be filled in, but if you construct a new message, it needs the header.

        This is also true if you put an error channel on the inbound adapter: the error message won't have the header, but the failed message will. Here is an example of how to handle errors and send some response to the client. It is a version of the tcp-client-server sample application, modified to use adapters instead of a gateway...

        Code:
        <!-- 	<ip:tcp-inbound-gateway id="gatewayCrLf" -->
        <!-- 		connection-factory="crLfServer" -->
        <!-- 		request-channel="serverBytes2StringChannel"  -->
        <!-- 		error-channel="errorChannel"/> -->
        
        	<ip:tcp-inbound-channel-adapter
        		connection-factory="crLfServer" error-channel="errorChannel"
        		channel="serverBytes2StringChannel"/>
        		
        	<ip:tcp-outbound-channel-adapter connection-factory="crLfServer" channel="fromSA"/>
        			
        	<channel id="toSA" />
        	<channel id="fromSA" />
        
        	<service-activator input-channel="toSA"
        					   output-channel="fromSA"
        					   ref="echoService"
        					   method="test"
        	/>
        
        	<beans:bean id="echoService" 
        		  class="org.springframework.integration.samples.tcpclientserver.EchoService" />
        		  
        	<transformer id="serverBytes2String"
        		input-channel="serverBytes2StringChannel"
        		output-channel="toSA" 
        		expression="new String(payload)"/>
        		
        	<chain input-channel="errorChannel" output-channel="fromSA">
        		<header-enricher>
        			<header name="ip_connection_id" expression="payload.failedMessage.headers.ip_connection_id"/>
        		</header-enricher>
        		<transformer
        			expression="payload.failedMessage.payload + ':' + payload.cause.message"/>
        	</chain>
        The <chain/> at the end copies the header from the failed message.

        In future, please use [ code ] ... [ /code ] tags around code and config snippets (no spaces inside the brackets).
        Last edited by Gary Russell; Oct 13th, 2011, 02:43 PM.


        • #5
          asynchronous isn't working

          Howdy,
          I sort of have this working, but when I want to send a message that is not a response to an incoming message, I get a Spring exception.

          Here is the scenario.
          The server is the Spring Integration app that creates 2 TCP server adapters, as described in our previous messages in this thread.

          The client app can send messages to both of the 2 TCP sockets, and the SI app can reply to those incoming messages via the return value of the endpoint method.

          Now I need to send a message out one of the TCP sockets asynchronously, to push something to the client. This is when I get an exception.

          Code:
          org.springframework.integration.MessageHandlingException: error occurred in message handler [org.springframework.integration.ip.tcp.TcpSendingMessageHandler#1]

          Here is my relevant context file

          Code:
             <ip:tcp-connection-factory id="DebugEngineEventsFactory"
              type="server"
              port="${debug.engine.events.port}"
              using-nio="false"
            />
          
              <ip:tcp-outbound-channel-adapter id="EventsOutputAdapter"
                 channel="EventsOutputChannel"
                 connection-factory="DebugEngineEventsFactory"/>
          
              <ip:tcp-inbound-channel-adapter id="EventsInputAdapter"
                 channel="EventsInputChannel"
                 connection-factory="DebugEngineEventsFactory"/>
          
              <gateway id="StudioEventGateway" service-interface="com.hp.oo.studio.debug.engine.gateways.StudioGateway"
          	         default-request-channel="SendEventsExecutorChannel" />
          
              <chain input-channel="SendCommandsExecutorChannel" output-channel="CommandsOutputChannel">
          		<integration:object-to-json-transformer object-mapper="customObjectMapper"/>
          	    <transformer expression="new String(payload)"/>
              </chain>
          
              <chain input-channel="EventsInputChannel" output-channel="ReceiveEventsExecutorChannel">
          	<transformer expression="new String(payload)"/>
          	<integration:json-to-object-transformer type="com.hp.oo.studio.debug.engine.shared.protocol.events.DebugEvent"
          	                object-mapper="customObjectMapper" />
                  <header-enricher>
          		<header name="replyChannel" value="SendEventsExecutorChannel"/>
                     </header-enricher>
              </chain>
          
             <service-activator input-channel="ReceiveEventsExecutorChannel"
          	                   ref="DebugExecutorService"
          	                   method="eventFromStudio" />

          Async java method

          Code:
          	
          /**
           * This method is run asynchronously as part of an event loop in the app
           * itself. We need to send an "event" back to the client. But Spring
           * fails to send the message, even though we successfully sent a
           * message out the event channel already as a reply to an incoming
           * message.
           */
          void sendDebugEvent(IDebugEvent event, boolean error) {
              if (debugMode) {
                  try {
                      // I tried this and it fails as well.
                      /*StudioEventGateway = (StudioGateway)context.getBean("StudioEventGateway");
                      StudioEventGateway.send(event);*/

                      // This doesn't work either.
                      ChannelResolver channelResolver = new BeanFactoryChannelResolver(context);
                      MessageChannel output = channelResolver.resolveChannelName("SendEventsExecutorChannel");

                      // Exception here....
                      output.send(new GenericMessage<IDebugEvent>(event));

                  } catch (Exception e) {
                      System.err.println("Error: " + e);
                      System.exit(1);
                  }
              } else if (error) {
                  System.err.println("Error: " + event);
              }
          }


          /**
           * THIS works...but it is the input endpoint from the client.
           * The reply object gets sent out the channel to the client.
           */
          public IDebugEvent eventFromStudio(DebugEvent event) {
              IDebugEvent responseEvent = new DebugEvent();
              return responseEvent;
          }



            • #7
              I've tried replying to this thread twice now and neither of those posts are showing up....what gives?

              I'll try again.....

              Ok I just sent another reply after this one and I don't see it here. It included a long description of my new problem as well as code blocks. *sigh*


              I can't send asynchronous messages out a tcp adapter that's already received a message.
              Last edited by hemna; Oct 27th, 2011, 11:07 AM.


              • #8
                I did see your message as an email notification but when I came here it was gone; not sure why - I assumed you had (somehow) deleted it.

                I haven't had a chance to take a detailed look at your configuration but I see you have no so-timeout on your connection factory. I recently added a note about this to the reference document. There is currently a default 10 second timeout when using collaborating adapters.

                You didn't show the full stack trace so I can't be sure that's what you are seeing, but I am guessing so.

                If using 2.0, try setting so-timeout to some large number (in 2.1.M3 you can set it to 0 for infinity). You can set it to some reasonable number (milliseconds), or set it to the maximum with a SpEL expression: #{T(java.lang.Integer).MAX_VALUE}.
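
                A sketch of what that suggestion might look like on the events factory posted earlier in this thread. Only the so-timeout attribute is new; everything else is copied from that post, and whether this value is appropriate for the application is an assumption:

                ```xml
                <!-- Same factory as posted above, with so-timeout added.
                     The SpEL expression evaluates to Integer.MAX_VALUE milliseconds. -->
                <ip:tcp-connection-factory id="DebugEngineEventsFactory"
                    type="server"
                    port="${debug.engine.events.port}"
                    using-nio="false"
                    so-timeout="#{T(java.lang.Integer).MAX_VALUE}"/>
                ```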

                HTH


                • #9
                  Thanks for the reply Gary. I am using 2.0.5, and the exception I'm getting doesn't provide a stack unfortunately. I can try and debug into it and see how it's getting there. I'll try setting the so-timeout to some large number and see if that helps.....


                  ok I tried to add the so-timeout and that didn't help. I believe I found the offending piece....

                  org.springframework.integration.ip.tcp.TcpSendingMessageHandler

                  the connectionId comes back as null

                  Code:
                  	public void handleMessageInternal(final Message<?> message) throws MessageRejectedException,
                  			MessageHandlingException, MessageDeliveryException {
                  		if (this.serverConnectionFactory != null) {
                  			// We don't own the connection, we are asynchronously replying
                  			Object connectionId = message.getHeaders().get(IpHeaders.CONNECTION_ID);
                  			TcpConnection connection = connections.get(connectionId);
                  			if (connection != null) {
                  				try {
                  					connection.send(message);
                  				} catch (Exception e) {
                  					logger.error("Error sending message", e);
                  					connection.close();
                  				}
                  			} else {
                  				logger.error("Unable to find incoming socket for " + message);
                  			}
                  			return;
                  		}
                  The message is...

                  Code:
                  [Payload={"@class":"com.hp.oo.studio.debug.engine.shared.protocol.events.DebugEventDEStarted","message":"vmstarted"}][Headers={timestamp=1319745621975, id=197dfd3a-dfd8-4835-8934-45b857209527}]
                  The message headers don't contain a CONNECTION_ID. I'm not sure how or where that is populated, but I'm simply putting a message on a channel that feeds a chain whose output channel is EventsOutputChannel. There is a tcp-outbound-channel-adapter attached to that channel via...

                  Code:
                  <ip:tcp-outbound-channel-adapter id="EventsOutputAdapter"
                         channel="EventsOutputChannel"
                         connection-factory="DebugEngineEventsFactory"/>
                  Last edited by hemna; Oct 27th, 2011, 03:04 PM.


                  • #10
                    I went back to your previous reply and see that you mentioned I need to add the ip_connection_id header, but I have no idea where to get the value from. your example shows getting a header value from a failed message payload... This is probably a simple issue, but being new to Spring and Spring Integration, I'm not sure how to populate that value properly. I presume I'd add the header enricher on my outbound chain, but I don't know where to get the value from...


                    Reading more of the source code makes me think that the underlying logic of the TcpSendingMessageHandler is broken. It assumes that an outbound message is always a response to an inbound message. Inbound messages have the connection id, which you can pull out and add to a reply message header.
                    But when you want to initiate a new message out the socket to the client, there is no way to tell the message which client (connection id) to send to. I'm initiating a new message from the server to the client that isn't a response to another message.

                    The this.serverConnectionFactory has a LinkedList of connections, but it's never used. I presume this is because we could have more than one client connected. In my particular app, I'll always only have 1 client connected.
                    Last edited by hemna; Oct 27th, 2011, 03:37 PM.


                    • #11
                      I don't believe anything is broken.

                      That's why they are called collaborating channel adapters. The inbound message is not sent to the TcpSendingMessageHandler (outbound adapter); it's sent to the TcpReceivingChannelAdapter (inbound adapter).

                      Regarding the header; the example I gave shows how to populate the header on an error channel.

                      I need to understand more of your message flow, and what you are trying to do with your messages.


                      • #12
                        I need both 2 way asynchronous communication and request/reply communication on a socket. I need to be able to reply to incoming messages, which I'm already doing. But I also need to initiate new messages to the client that are not replies to a client message. My app has a loop that does work until the work is complete. During the work loop, events happen that the client needs to be notified about. This is out of band from the request/reply scenario.


                        • #13
                          OK; got it.

                          What you need to do is capture the header from a good inbound message.

                          If an inbound message is good and your app replies, the reply will contain the header.

                          If the inbound message fails, you'll have to populate the header as I described in the error flow.

                          For unsolicited outbound messages, you'll need to use the previously saved header. Something like this should work.

                          Code:
                          public class SaveHeader {
                          
                          	private volatile String header;
                          
                          	public String getHeader() {
                          		return header;
                          	}
                          
                          	public void setHeader(String header) {
                          		this.header = header;
                          	}
                          
                          }
                          Code:
                          	<int-ip:tcp-connection-factory 	id="crLfServer"
                          		type="server"
                          		port="11111"/>
                          	
                          	<int-ip:tcp-inbound-channel-adapter connection-factory="crLfServer"
                          		channel="inbound"/>
                          		
                          	<int:recipient-list-router input-channel="inbound">
                          		<int:recipient channel="saveHeader"/>
                          		<int:recipient channel="doWork"/>
                          	</int:recipient-list-router>
                          	
                          	<int:channel id="saveHeader" />
                          	
                          	<bean id="headerSaver" class="foo.SaveHeader" />
                          	
                          	<int:chain input-channel="saveHeader" >
                          		<int:transformer expression="headers.ip_connection_id"/>
                          		<int:service-activator ref="headerSaver" method="setHeader"/>
                          	</int:chain>
                          	
                          	<int:channel id="doWork" /> 
                          	
                          	<!-- Your main code here including error handling as above -->
                          	
                          	<!-- ##################################################### -->
                          
                          	<!-- Your unsolicited outbound -->
                          	
                          	<int:channel id="unsolicitedMessageChannel"/>
                          	
                          	<int:header-enricher input-channel="unsolicitedMessageChannel" 
                          		output-channel="toOutboundAdapter">
                          		<int:header name="ip_connection_id" expression="@headerSaver.header"/>
                          	</int:header-enricher>
                          Because the setHeader method returns void, that part of the flow ends and we then send the message to the next recipient (your code).

                          The header enricher uses a SpEL expression to get the header value by calling the getHeader() method on the headerSaver.

                          I haven't tested it, but hopefully you get the idea.
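
                          The configuration above sends the enriched message to toOutboundAdapter but does not show what consumes that channel. A minimal, equally untested sketch of that last hop, assuming the same crLfServer factory; the gateway and its foo.UnsolicitedSender interface are hypothetical and only illustrate one way application code might push a message into unsolicitedMessageChannel:

                          ```xml
                          <!-- Consumer for the enriched messages; reuses the server factory from above -->
                          <int:channel id="toOutboundAdapter"/>

                          <int-ip:tcp-outbound-channel-adapter channel="toOutboundAdapter"
                              connection-factory="crLfServer"/>

                          <!-- Hypothetical entry point: foo.UnsolicitedSender is an assumed interface
                               with a single method such as void send(String text) -->
                          <int:gateway id="unsolicitedSender"
                              service-interface="foo.UnsolicitedSender"
                              default-request-channel="unsolicitedMessageChannel"/>
                          ```

                          Note that SaveHeader holds a single value, so this only remembers the most recent connection. That fits the single-client case described earlier in the thread; with several clients you would need some per-client lookup instead. Until the first inbound message arrives there is no saved id, so an unsolicited send before that point has no connection to go to.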



                            • #15
                              Hi Gary,

                              Thanks very much for the explanation!

                              I am following the tcp-client-server-multiplex example in spring-integration-samples, and I want to send a new message to the client that is not a reply to a client message.
                              For the tcp-client-server-multiplex example, I would like to know how to save the ip_connection_id header and use it to send a new message.

                              Best Regards,
                              Chary
