TCP receive message to different port than sent on

  • TCP receive message to different port than sent on

    I have an application that needs to get a person's credit report by sending demographic information to a remote server and receiving the report from that server on a different port. For example, I'd send a message containing the demographic info to port 9300 on the remote server and receive the credit report message on port 9305 of my local machine.

    I tried this configuration:

    Code:
    <!-- CLIENT SIDE -->
    	
    	<bean id="rawSerializer" class="org.springframework.integration.ip.tcp.serializer.ByteArrayRawSerializer" />
    	<bean id="isRequestUtils" class="com.util.ISRequestUtils"/>
    	
    	<int:channel 
    		id="initialRequest"
    		datatype="java.lang.String"/>
    		
    	<int:channel 
    		id="txAdded"
    		datatype="java.lang.String"/>
    		
    	<int:channel 
    		id="finalRequest"
    		datatype="java.lang.String"/>	
    		
    	<int:channel
    		id="creditReport"
    		datatype="java.lang.String"/>
    		
    	<int:gateway
    		id="requestHandler"
    		service-interface="com.util.ISRequestHandler2"
    		default-request-channel="initialRequest"
    		default-reply-channel="creditReport"/>
    		
    	<int:transformer
    		id="addTXcontrol"
    		input-channel="initialRequest"
    		output-channel="txAdded"
    		ref="isRequestUtils"
    		method="addTX"/>
    		
    	<int:transformer
    		id="addPrefix"
    		input-channel="txAdded"
    		output-channel="finalRequest"
    		ref="isRequestUtils"
    		method="addPrefix"/>
    	
     	<int-ip:tcp-outbound-channel-adapter
    		id="message"
    		channel="finalRequest"
    		connection-factory="requestCF"/>
    		
    	<int-ip:tcp-connection-factory
    		id="requestCF"
    		type="client" 
    		host="${infoserver.address}" 
    		port="${infoserver.request.port}"
    		so-keep-alive="true"
    		single-use="false"
    		serializer="rawSerializer"/>
    
    
    	<!-- SERVER SIDE -->
    	
    	<bean id="byteArrayStxEtxSerializer" class="org.springframework.integration.ip.tcp.serializer.ByteArrayStxEtxSerializer">
    		<property name="maxMessageSize" value="102400"/>
    	</bean>
    	<bean id="isResponseUtils" class="com.util.ISResponseUtils"/>
    	
    	<int:channel
    		id="byteResponse"/>
    		
    	<int:channel
    		id="stringResponse"
    		datatype="java.lang.String"/>
    		
    	<int:channel
    		id="finalResponse"
    		datatype="java.lang.String"/>		
    				
    	<int-ip:tcp-connection-factory
    		id="isListener"
    		type="server"
    		host="localhost"
    		port="${infoserver.response.port}"
    		so-keep-alive="true"
    		single-use="false"
    		deserializer="byteArrayStxEtxSerializer"/>
    		
    	<int-ip:tcp-inbound-channel-adapter
    		id="inboundResponse"
    		channel="byteResponse"
    		connection-factory="isListener"/>
    
    	<int:transformer
    		id="bytes2String"
    		input-channel="byteResponse"
    		output-channel="stringResponse"
    		expression="new String(payload)"/>
    				
    	<int:service-activator
    		input-channel="stringResponse"
    		output-channel="creditReport"
    		ref="isResponseUtils"
    		method="showResponse"/>
    My main:

    Code:
    ApplicationContext context = new ClassPathXmlApplicationContext("/spring-config/si-config.xml");
    		
    		final ISRequestHandler2 requestHandler = context.getBean(ISRequestHandler2.class);
    		//final AbstractServerConnectionFactory isListener = context.getBean(AbstractServerConnectionFactory.class);
    		
    		String response = requestHandler.send("DEMOGRAPHIC_INFO");
    		System.out.println(response);

  • #2
    This is the error I get:

    Code:
    [02-07-2013 11:28:09,760] ERROR TcpNetConnection:155 Exception sending meeeage: [Payload=[B@69d02b][Headers={timestamp=1360254485026, id=3933c780-3969-4030-8856-8eaf143cf25a, ip_tcp_remotePort=2381, ip_address=10.127.211.92, ip_hostname=VMCAP2ISDEVMED, ip_connectionId=VMCAP2ISDEVMED:2381:ba447e3a-2a1b-4fe0-a732-c93323be6232}]
    org.springframework.integration.support.channel.ChannelResolutionException: no output-channel or replyChannel header available
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:206)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:165)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:159)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:141)
    	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:115)
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:102)
    	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
    	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:128)
    	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
    	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:216)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:200)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:165)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:159)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:141)
    	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:115)
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:102)
    	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
    	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:128)
    	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
    	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:216)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:200)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:165)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:159)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:141)
    	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:115)
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:102)
    	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
    	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:128)
    	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
    	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
    	at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:92)
    	at org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter.onMessage(TcpReceivingChannelAdapter.java:74)
    	at org.springframework.integration.ip.tcp.connection.TcpNetConnection.run(TcpNetConnection.java:144)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    	at java.lang.Thread.run(Unknown Source)



    • #3
      Right - if you want to use a gateway, with asynchronous TCP (or any) adapters, you need to preserve/restore the replyChannel header that the gateway saves in the outbound message headers. The sending thread is waiting for a response on this channel (which is a temporary channel created by the gateway).

      You can either do this with custom code (invoke a service to capture the header on the outbound side and use a <header-enricher /> to restore it on the inbound side).

      The tcp-client-server-multiplex sample (https://github.com/SpringSource/spri...rver-multiplex) uses another technique - sends a copy of the outbound message to an aggregator and sends the reply to the aggregator; the headers from each are merged (thus restoring the replyChannel), and then the request message is dropped from the aggregated result.

      Or, instead of using a request/reply gateway, you can do it all yourself - with the sending bean sending the request to a gateway method that returns void and exposing a method on that bean that is invoked using a <service-activator/>.
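
      To make the first option (capture the header outbound, restore it inbound) more concrete, here is a minimal plain-Java sketch. It is not Spring Integration code: the ReplyHeaderStore class, its method names, and the idea of keying the saved header by a correlation string taken from the payload are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: remembers the replyChannel header of each request
// so it can be put back on the matching reply. Not a framework class.
final class ReplyHeaderStore {

    private final Map<String, Object> saved = new ConcurrentHashMap<>();

    // Outbound leg: save the request's replyChannel header under a key
    // that can later be derived from the reply as well.
    void capture(String correlationKey, Map<String, Object> requestHeaders) {
        Object replyChannel = requestHeaders.get("replyChannel");
        if (replyChannel != null) {
            saved.put(correlationKey, replyChannel);
        }
    }

    // Inbound leg: return a copy of the reply headers with the saved
    // replyChannel restored (if one was captured for this key).
    Map<String, Object> restore(String correlationKey, Map<String, Object> replyHeaders) {
        Map<String, Object> enriched = new HashMap<>(replyHeaders);
        Object replyChannel = saved.remove(correlationKey);
        if (replyChannel != null) {
            enriched.put("replyChannel", replyChannel);
        }
        return enriched;
    }
}
```

      In a real flow, something like capture would be invoked from a service on the request path, and something like restore would supply the value for a <header-enricher /> on the reply path.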

      Hope that helps.



      • #4
        But if I specify the default-reply-channel of the gateway to be the output-channel of the service-activator, shouldn't the framework do that for me?



        • #5
          No; it doesn't work that way - the gateway needs some way to correlate responses to requests - there might be many threads waiting for a response and the gateway needs to know which thread gets a particular response. The way it's done is to create a TemporaryChannel for each request and store it in the replyChannel header.

          The reply-channel on the gateway is simply a mechanism that lets you explicitly send there from some component. It is also useful if you want it to be a special channel type (e.g. a <publish-subscribe-channel/>, so the reply can go to more than one destination), or if you want to, say, <wire-tap/> the reply-channel.

          Internally, the gateway bridges messages arriving on the reply-channel to the replyChannel header.

          Since the replyChannel is a LIVE java object - it can't be exchanged with external systems - the thread is waiting to receive the reply on it.
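
          A rough plain-Java model of that mechanism may help. It assumes nothing about the framework's real classes: GatewaySketch, Msg and sendReply are hypothetical names, and a BlockingQueue stands in for the temporary channel. It shows why a reply that has lost the replyChannel header cannot find its way back to the waiting thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Plain-Java model of a request/reply gateway. Hypothetical names; not Spring Integration code.
final class GatewaySketch {

    // A message is a payload plus headers.
    static final class Msg {
        final String payload;
        final Map<String, Object> headers;

        Msg(String payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    private final Consumer<Msg> requestFlow;

    GatewaySketch(Consumer<Msg> requestFlow) {
        this.requestFlow = requestFlow;
    }

    // Creates a temporary per-request queue, stores it in the headers, then waits on it.
    String sendAndReceive(String payload, long timeoutMillis) {
        BlockingQueue<String> temporaryChannel = new ArrayBlockingQueue<>(1);
        Map<String, Object> headers = new HashMap<>();
        headers.put("replyChannel", temporaryChannel);
        requestFlow.accept(new Msg(payload, headers));
        try {
            return temporaryChannel.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Models the last endpoint of the reply flow: deliver to the queue in the header, or fail.
    @SuppressWarnings("unchecked")
    static void sendReply(Msg reply) {
        Object channel = reply.headers.get("replyChannel");
        if (channel == null) {
            throw new IllegalStateException("no output-channel or replyChannel header available");
        }
        ((BlockingQueue<String>) channel).offer(reply.payload);
    }
}
```

          A message arriving from a TCP inbound adapter is built from the bytes on the socket, so it starts with no replyChannel header at all. In this model that is the case where sendReply throws.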



          • #6
            I think I understand.

            Using my config file, to accomplish what I want, do I need to copy the replyChannel header from the message sent to the finalRequest channel onto the message sent to the creditReport channel, and then have messages on the creditReport channel go to a tcp-inbound-channel-adapter connected to the requestCF connection factory?

            Can this be done in just the config file?



            • #7
              You need something in the data to correlate the response to the request - you may be able to do it without any code - the tcp-client-server-multiplex sample I mentioned in post #3 uses an aggregator - but it uses a simple correlation scheme. It depends on how easy it is to extract the correlation data.

              For your case, you might find it easier to do the correlation in your code that handles the reply - set up another method and invoke it as a <service-activator/>



              • #8
                I did look at the tcp multiplex example. But if the correlation data is in different places in the request and response payloads, I don't know if that will work. Also, the method I use to send the request is synchronized; would there still be a need to correlate the response to the request?

                I'm using (or want to use) something like:
                Code:
                public synchronized String getCreditReport(String personID) {
                			String creditReport = null;
                			String demoInfo = lookupDemoInfo(personID);
                			
                			SIContextManager.getInstance();
                			RequestHandler requestHandler = (RequestHandler) SIContextManager.getBean("requestHandler");
                			creditReport = requestHandler.send(demoInfo);
                			
                			return creditReport;
                		}



                • #9
                  You could do it that way, but you'd need a transformer/header-enricher on each leg to populate a header with the correlation data.

                  "...would there be a need to correlate the response to the request?"
                  Synchronizing won't help; the framework needs some way to figure out where to send the reply.
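
                  On the point about the correlation data sitting in different places in the two payloads: each leg can use its own extraction logic, as long as both produce the same key. The sketch below is purely illustrative. The CorrelationKeys class and both fixed-width layouts are assumptions, since the real request and response formats are not shown in this thread.

```java
// Hypothetical fixed-width layouts; the real message formats are not shown in the thread.
final class CorrelationKeys {

    private static final int ID_LENGTH = 9;          // assumed length of the person ID
    private static final int RESPONSE_ID_OFFSET = 4; // assumed: response starts with a 4-character record type

    private CorrelationKeys() {
    }

    // Assumed request layout: the person ID occupies the first 9 characters.
    static String fromRequest(String request) {
        return request.substring(0, ID_LENGTH);
    }

    // Assumed response layout: 4-character record type, then the same 9-character person ID.
    static String fromResponse(String response) {
        return response.substring(RESPONSE_ID_OFFSET, RESPONSE_ID_OFFSET + ID_LENGTH);
    }
}
```

                  Methods of this kind are what a transformer or header-enricher on each leg would call to populate the correlation header.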

                  For your application, I would recommend doing something like this...

                  Code:
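                  package foo;

                  import java.util.Map;
                  import java.util.concurrent.BlockingQueue;
                  import java.util.concurrent.ConcurrentHashMap;
                  import java.util.concurrent.LinkedBlockingQueue;
                  import java.util.concurrent.TimeUnit;

                  import org.apache.commons.logging.Log;
                  import org.apache.commons.logging.LogFactory;
                  import org.springframework.beans.factory.annotation.Autowired;

                  public class Foo {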
                  
                  	private final Log logger = LogFactory.getLog(this.getClass().getName());
                  
                  	private final Map<String, BlockingQueue<String>> pendingReplies = new ConcurrentHashMap<String, BlockingQueue<String>>();
                  
                  	@Autowired
                  	private RequestHandler requestHandler;
                  
                  	public String getCreditReport(String personID) {
                  		BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
                  		this.pendingReplies.put(personID, queue);
                  		this.requestHandler.send(personID);
                  		String reply = null;
                  		try {
                  			reply = queue.poll(10, TimeUnit.SECONDS);
                  		}
                  		catch (InterruptedException e) {
                  			Thread.currentThread().interrupt();
                  		}
                  		this.pendingReplies.remove(personID);
                  		if (reply == null) {
                  			throw new RuntimeException("No reply from credit service");
                  		}
                  		return reply;
                  	}
                  
                  	public void handleReply(String reply) {
                  		String personID = lookup(reply);
                  		BlockingQueue<String> queue = this.pendingReplies.get(personID);
                  		if (queue == null) {
                  			logger.error("Reply too late:" + reply);
                  		}
                  		else {
                  			try {
                  				queue.put(reply);
                  			}
                  			catch (InterruptedException e) {
                  				Thread.currentThread().interrupt();
                  			}
                  			this.pendingReplies.remove(personID);
                  		}
                  	}
                  
                  	private String lookup(String reply) {
                  		// find personID in reply
                  		return reply;
                  	}
                  
                  	public interface RequestHandler {
                  		void send(String personID);
                  	}
                  }
                  Code:
                  	<context:annotation-config />
                  
                  	<bean id="client" class="foo.Foo" />
                  
                  	<int:gateway id="requestHandler" service-interface="foo.Foo$RequestHandler"
                  		default-request-channel="foo"/>	
                  
                  	<int:channel id="foo" />
                  ...
                  
                  
                  
                  <!-- INBOUND -->
                  
                  ...
                  	<int:service-activator input-channel="bar" 
                  		ref="client" method="handleReply"/>
                  Last edited by Gary Russell; Feb 11th, 2013, 04:21 PM.



                  • #10
                    Thanks for the suggestion. I tried doing something like that, but it wasn't working.

                    So I went back to the tcp-multiplex example. I'm able to set the correlation_id in the headers for both the request and response, but am unsure how to tell the aggregator to match by that.

                    I have this config:
                    Code:
                    <!-- CLIENT SIDE -->
                    ...
                    <int:header-enricher
                    		id="addRequestCorrID"
                    		input-channel="prefixAdded"
                    		output-channel="finalRequest">
                    		<int:correlation-id ref="isRequestUtils" method="getCorrelationID"/>
                    	</int:header-enricher>
                    	
                    	<int-ip:tcp-outbound-channel-adapter
                     		order="2"
                    		id="message"
                    		channel="finalRequest"
                    		connection-factory="requestCF"/>
                    	
                    	<int-ip:tcp-connection-factory
                    		id="requestCF"
                    		type="client" 
                    		host="${infoserver.address}" 
                    		port="${infoserver.request.port}"
                    		so-keep-alive="true"
                    		single-use="false"
                    		serializer="rawSerializer"/>
                    		
                    	<int:bridge
                    		order="1"
                    		input-channel="finalRequest"
                    		output-channel="toAggregator"/>
                    	
                     	<int-ip:tcp-inbound-channel-adapter
                     		id="fromServerSide"
                     		channel="toAggregator"
                     		connection-factory="requestCF"/>
                    
                    	<int:channel
                    		id="toAggregator"
                    		datatype="java.lang.String"/>
                    		
                    	<int:aggregator
                    		input-channel="toAggregator"
                    		correlation-strategy-expression="header.correlationid"
                    		release-strategy-expression="size() == 2"/>
                    
                    <!-- SERVER SIDE -->
                    ...
                    <int:transformer
                    		id="bytes2String"
                    		input-channel="byteResponse"
                    		output-channel="stringResponse"
                    		expression="new String(payload)"/>
                    
                    	<int:header-enricher
                    		id="addResponseCorrID"
                    		input-channel="stringResponse"
                    		output-channel="creditReport">
                    		<int:correlation-id ref="isResponseUtils" method="getCorrelationID"/>
                    	</int:header-enricher>
                    	
                    	<int:bridge
                    		order="1"
                    		input-channel="creditReport"
                    		output-channel="toAggregator"/>
                    	
                    	<int:service-activator
                    		order="2"
                    		input-channel="creditReport"
                    		ref="isResponseUtils"
                    		method="persistResponse"/>
                    		
                    	<int:channel
                    		id="creditReport"
                    		datatype="java.lang.String"/>



                    • #11
                      When you use the standard correlation id header (correlationId) you can just eliminate the correlation strategy from the aggregator - it will use that header by default.
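
                      For readers following along, here is a plain-Java model of what the aggregator step is doing in this flow: group by the correlationId header, release when the group holds two messages (request copy plus reply), merge the headers, and then keep only the second payload. PairAggregator and its methods are hypothetical names, not framework classes. Dropping headers whose values differ between the two messages is my understanding of the framework's default merge behaviour, so treat that detail as an assumption.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Plain-Java model of the request/reply aggregation. Hypothetical names; not Spring Integration code.
final class PairAggregator {

    static final class Msg {
        final Object payload;
        final Map<String, Object> headers;

        Msg(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    private final Map<Object, List<Msg>> groups = new HashMap<>();

    // Groups by the "correlationId" header; returns the aggregate once two
    // messages have arrived for a key, otherwise null.
    synchronized Msg add(Msg message) {
        Object key = message.headers.get("correlationId");
        List<Msg> group = groups.computeIfAbsent(key, k -> new ArrayList<>());
        group.add(message);
        if (group.size() < 2) {
            return null;
        }
        groups.remove(key);

        List<Object> payloads = new ArrayList<>();
        Map<String, Object> merged = new HashMap<>();
        Set<String> conflicts = new HashSet<>();
        for (Msg m : group) {
            payloads.add(m.payload);
            for (Map.Entry<String, Object> h : m.headers.entrySet()) {
                Object previous = merged.putIfAbsent(h.getKey(), h.getValue());
                if (previous != null && !previous.equals(h.getValue())) {
                    conflicts.add(h.getKey()); // assumed: conflicting headers are dropped
                }
            }
        }
        merged.keySet().removeAll(conflicts);
        return new Msg(payloads, merged);
    }

    // Counterpart of a payload.get(1) transformer: keep the second payload, keep the merged headers.
    static Msg secondPayloadOnly(Msg aggregate) {
        List<?> payloads = (List<?>) aggregate.payload;
        return new Msg(payloads.get(1), aggregate.headers);
    }
}
```

                      Because the request copy reaches the aggregator first, index 1 is the reply, and the merged headers carry the replyChannel that only the request had.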
                      Last edited by Gary Russell; Feb 13th, 2013, 11:05 AM.



                      • #12
                        That worked.

                        From the logs, I can see the messages being correlated, and the transformer outputting the message that contains the credit report. But for some reason, it doesn't get sent to the client-side tcp-inbound adapter.

                        This config:
                        Code:
                        <int:bridge
                        		order="1"
                        		input-channel="finalRequest"
                        		output-channel="toAggregator"/>
                        	
                         	<int-ip:tcp-inbound-channel-adapter
                         		id="fromServerSide"
                         		channel="fromTransformer"
                         		connection-factory="requestCF"/>
                        
                        	<int:channel
                        		id="toAggregator"
                        		datatype="java.lang.String"/>
                        		
                        	<int:channel
                        		id="toTransformer"/>
                        		
                        	<int:channel
                        		id="fromTransformer"/>
                        		
                        	<int:aggregator
                        		input-channel="toAggregator"
                        		output-channel="toTransformer"
                        		release-strategy-expression="size() == 2"/>
                        
                        	<int:transformer
                        		input-channel="toTransformer"
                        		output-channel="fromTransformer"
                        		expression="payload.get(1)"/>
                        results in this error:
                        Code:
                        [02-13-2013 14:11:55,507] ERROR TcpNetConnection:155 Exception sending meeeage: [Payload=[B@1445748][Headers={timestamp=1360782715460, id=862e2275-103e-4453-8ad2-345348896316, ip_tcp_remotePort=3411, ip_address=10.127.211.92, ip_hostname=VMCAP2ISDEVMED, ip_connectionId=VMCAP2ISDEVMED:3411:c8eff8bc-fb84-4e86-951b-90b712078091}]
                        org.springframework.integration.MessageDeliveryException: Dispatcher has no subscribers for channel fromTransformer.
                        Doesn't this config make the tcp-inbound-channel-adapter a subscriber to the fromTransformer channel?



                        • #13
                          I figured it out.

                          I forgot to add the default-reply-channel to my gateway.

                          I'm now able to send a request and get a response back.
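
                          For anyone who hits the same "Dispatcher has no subscribers" error: as far as I understand it, the channel attribute of a tcp-inbound-channel-adapter names the channel the adapter sends to, so the adapter is a producer and never a subscriber. Something else has to subscribe, and a gateway with a default-reply-channel does that by bridging the channel to the waiting caller. The plain-Java sketch below models only that idea. DirectChannelSketch is a hypothetical class, and delivering to the first subscriber is a simplification.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Plain-Java model of a point-to-point channel. Hypothetical; not Spring Integration code.
final class DirectChannelSketch {

    private final String name;
    private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();

    DirectChannelSketch(String name) {
        this.name = name;
    }

    // Consumers (service activators, bridges, a gateway's reply bridge) register here.
    void subscribe(Consumer<String> handler) {
        subscribers.add(handler);
    }

    // Producers (such as an inbound adapter) call send; it fails when nobody has subscribed.
    void send(String payload) {
        if (subscribers.isEmpty()) {
            throw new IllegalStateException("Dispatcher has no subscribers for channel " + name);
        }
        subscribers.get(0).accept(payload); // simplification: always the first subscriber
    }
}
```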

                          Thanks for the help.



                          • #14
                            Cool!

                            Glad to help.
