  • TCP client Gateway - but need to handle unsolicited TCP server responses

    Hi.

    I am designing a gateway/client application for a third-party system that exposes a TCP/IP service as its interface. Basically, I need to listen on a queue (i.e. wait for new messages), get a message from the queue, transform it, send it to the TCP server (third party, not under our control), get the response from the server, transform that, and send it to another queue. Everything from getting a message from the queue to sending to the other queue should belong to one transaction, and each request can wait until the previous request's response has returned. I have implemented the interactions with the queues (receive and send) using spring-jms and Spring transaction support (we are quite confident we will not move from JMS-based transports to other transports). What remains is the interface to the TCP service, for which I am hoping to use Spring Integration.
    The TCP server is a request/response one: if we send a request, it will reply, and I know the message delimiters.
    I have taken the "tcp-client-server" example from spring-integration-samples as a template to write a gateway for the TCP service, commenting out the TCP server related configuration from the sample. The tricky part is that the TCP server can also send unsolicited messages. After the first request, the server can send messages that are unrelated to the request, but we will eventually get the expected response as well. It is identified by a sequence number within the message content: the response's sequence number is the same as the request's. We can also identify unsolicited messages by looking at the message content. The question is how to filter this unsolicited traffic out of the response.
    So, in the following configuration:
    Code:
    <ip:tcp-outbound-gateway id="outGateway"
    		request-channel="input" reply-channel="clientBytes2StringChannel"
    		connection-factory="client" request-timeout="10000" reply-timeout="10000" />
    we need a way of filtering out any unsolicited messages (possibly sending them to a log file), continuing to listen for the correct message, and then pushing it to the reply channel. The request can wait until this happens.
    Any ideas on how I can do this?
    Thanks in advance for any replies.

  • #2
    Instead of using a gateway, see the section in the reference manual about 'Collaborating Outbound and Inbound Channel Adapters'.

    This was designed specifically for cases like this. Both adapters get a reference to the same connection factory.

    You will need a pub-sub channel; subscribe an outbound channel adapter to that channel, as well as a service activator that invokes a service that blocks the thread (keeping the transaction open). Use the 'order' attribute to ensure the service is activated after the tcp send.

    Connect the inbound channel adapter to a router which will ignore, log, or whatever, the unsolicited messages, and route the "good" responses to another method in the service that is blocking. Pass the response to the blocked thread, wake it, and it can then send the response, all within the same transaction.
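
The routing decision described above could be sketched in plain Java roughly as follows. This is only an illustration: the thread never shows the real message format, so the `UNSOL|` and `SEQ=<n>|` prefixes and the `InboundClassifier` name are hypothetical. A real router would apply whatever content test identifies unsolicited messages in the third-party protocol.

```java
import java.util.OptionalLong;

/** Sketch of the decision a router would make on each inbound TCP message. */
class InboundClassifier {

    // Hypothetical wire format, assumed for illustration only:
    //   "UNSOL|..."    -> unsolicited server message
    //   "SEQ=<n>|..."  -> reply to the request with sequence number n

    /** Returns the sequence number of a reply, or empty for unsolicited or unrecognised traffic. */
    static OptionalLong replySequence(String payload) {
        if (payload == null || !payload.startsWith("SEQ=")) {
            return OptionalLong.empty();
        }
        int bar = payload.indexOf('|');
        if (bar < 0) {
            return OptionalLong.empty();
        }
        try {
            return OptionalLong.of(Long.parseLong(payload.substring(4, bar)));
        } catch (NumberFormatException e) {
            // Malformed sequence number: treat as not-a-reply so it gets logged, not routed.
            return OptionalLong.empty();
        }
    }
}
```

Messages for which this returns empty would be logged or discarded; the rest would be passed, with their sequence number, to the reply method of the blocking service.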

    I think you need a custom service rather than an aggregator because you need to send the response on the original thread. Aggregators release on the thread that provides the message that enables the release.

    Hope that helps.



    • #3
      Thanks Gary for the reply. I will have a think about your solution and get back.



      • #4
        Gary, I have got a basic framework working that uses your solution (I have not done the router yet; to test the service, for now I treat the first response from the TCP server as a 'good' response). I have a custom service activator with two methods, request and reply. Both methods use an underlying java.util.concurrent.SynchronousQueue: the request method does a queue.take(), and the reply method, which is invoked when a message arrives from the inbound channel adapter, does a queue.put() to place the response message on the queue. The request method then returns the response all the way back to the gateway entry point.
        I am wondering whether I can make use of a rendezvous channel within my service activator class...



        • #5
          Hi, I am out of town, with limited access to email, and no access to code.

          Your solution sounds ok, but you'll need a map of queues, keyed by your correlation key.

          I don't believe an RV channel will help you; all that does is suspend the thread that submits a message to the channel until someone receives the message.
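
A minimal sketch of the "map of queues, keyed by your correlation key" idea, using only the JDK. The class and method names are hypothetical, and it departs from the SynchronousQueue approach in post #4 in one respect: it uses a one-slot ArrayBlockingQueue so the inbound adapter's thread never blocks if the reply arrives before the requester starts waiting. It also assumes the key is registered before the TCP send.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Sketch: parks the requesting thread until the reply with the same key arrives. */
class BlockingReplyService {

    // One single-slot queue per in-flight request, keyed by the correlation key
    // (the sequence number in the original question).
    private final ConcurrentMap<Long, BlockingQueue<String>> pending = new ConcurrentHashMap<>();

    /** Call on the requesting thread before the TCP send, so a fast reply is not lost. */
    void register(long sequence) {
        pending.put(sequence, new ArrayBlockingQueue<>(1));
    }

    /** Call on the requesting thread after the send; blocks until the reply or the timeout. */
    String awaitReply(long sequence, long timeoutMillis)
            throws InterruptedException, TimeoutException {
        BlockingQueue<String> queue = pending.get(sequence);
        if (queue == null) {
            throw new IllegalStateException("not registered: " + sequence);
        }
        try {
            String reply = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (reply == null) {
                throw new TimeoutException("no reply for sequence " + sequence);
            }
            return reply;
        } finally {
            pending.remove(sequence); // always clean up, even on timeout
        }
    }

    /** Call on the inbound adapter's thread; returns false if nobody is waiting for this key. */
    boolean reply(long sequence, String payload) {
        BlockingQueue<String> queue = pending.get(sequence);
        return queue != null && queue.offer(payload);
    }
}
```

Because awaitReply runs on the thread that sent the request, the response can be forwarded to the outgoing queue on that same thread, inside the original transaction. A false return from reply is a natural place to log a late or unexpected response.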

          I may not be able to respond further until Sunday but someone else may be able to help, if you get stuck.



          • #6
            Gary, I have the same issue and am trying to use the collaborating adapters. I implemented the flows using the aggregator, as in the tcp-client-server-multiplex example.

            You mentioned some kind of blocking service to mitigate the threading issue.
            In my scenario, the original request comes from the UI, so I need to ensure that the response comes back on the same thread.
            Do you have an example of using such a blocking service instead of an aggregator?

            Any help would be greatly appreciated.

            Matt



            • #7
              If your UI controller invokes the SI flow using a <gateway/>, the thread will block in the <gateway/> until the response is returned to it by the inbound adapter's thread, through the aggregator. The multiplex sample app uses that technique, and the correlated responses return to the test case on the same thread. (In the case above, he was using JMS, so he would have needed some mechanism to hold up the JMS thread until the response was received.)

              Beware that the <gateway/> timeout is infinite by default; you would probably want to set this to something reasonable so you can return an error to the UI if the response is not received in a reasonable time.



              • #8
                Thank you Gary,

                I actually got this working fine.
                This was related to the error handling, as I was trying to throw an exception and it did not come back on the same thread.
                I realised that what I need to do is to --return-- it, and not --throw-- it.
                Then it was routed correctly back to the original caller thread.
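
The return-versus-throw distinction can be illustrated outside Spring Integration with a plain-JDK analogy. This is not the framework's mechanism, and all names here are hypothetical. An exception thrown on the reply thread surfaces on that thread, whereas an exception handed over as a value can be rethrown by the waiting caller.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Sketch: a failure produced on another thread is handed to the caller as a value. */
class ReturnedErrorDemo {

    /** Waits for an outcome; rethrows it on the calling thread if it is an exception. */
    static String awaitOutcome(BlockingQueue<Object> outcomes, long timeoutMillis) {
        Object outcome;
        try {
            outcome = outcomes.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        if (outcome == null) {
            throw new IllegalStateException("timed out waiting for an outcome");
        }
        if (outcome instanceof RuntimeException) {
            throw (RuntimeException) outcome; // surfaces on the caller's thread
        }
        return (String) outcome;
    }

    public static void main(String[] args) {
        BlockingQueue<Object> outcomes = new ArrayBlockingQueue<>(1);
        // The "reply" thread returns the exception as data instead of throwing it.
        new Thread(() -> outcomes.offer(new IllegalArgumentException("bad reply"))).start();
        try {
            awaitOutcome(outcomes, 2000);
        } catch (IllegalArgumentException e) {
            System.out.println("caller thread saw: " + e.getMessage());
        }
    }
}
```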

                Saying that, I ran into another issue.
                This concerns exceptions such as java.net.ConnectException, which is thrown when the gateway loses connectivity and I try to send a message.

                Such exceptions do not seem to be routed correctly; instead, I only get a MessageMappingException that does not contain the underlying ConnectException as its cause.

                I suspect the issue is due to this code in the TcpSendingMessageHandler:

                Code:
                	protected void doWrite(Message<?> message) {
                		TcpConnection connection = null;
                		try {
                			connection = getConnection();
                			if (connection == null) {
                				throw new MessageMappingException(message, "Failed to create connection");
                			}

                getConnection() fails with a java.net.ConnectException, but the sender class only creates a MessageMappingException.

                Is there any other way I could get hold of the underlying ConnectException?

                Thanks

                Matt



                • #9
                  I wonder whether a connection interceptor could do it:

                  Code:
                  <int-ip:tcp-connection-factory id="client"
                  type="client"
                  host="localhost"
                  port="12345"
                  single-use="true"
                  so-timeout="100000"
                  using-nio="true"
                  interceptor-factory-chain="helloWorldInterceptorFactory"
                  />



                  • #10
                    This is a bug, the underlying exception should be provided as the cause in the MessagingException (it also shouldn't be a MessageMappingException). We're doing a release in the next couple of days, I'll see if I can get a fix in today or tomorrow.

                    I don't think you can work around it in an interceptor because the handler traps all exceptions.

                    https://jira.springsource.org/browse/INT-2886
                    Last edited by Gary Russell; Jan 17th, 2013, 09:49 AM.



                    • #11
                      That's excellent, thanks for your help.

                      Matt



                      • #12
                        I just found another problem related to the above mentioned connection exceptions.

                        When using a pair of collaborating adapters, I am trying to route such errors back into the aggregator, so that they result in a correct exception on the caller thread.
                        This is proving more difficult than I thought.

                        I set the error channel on the gateway, so such errors are routed to the transformer:

                        Code:
                        	<int:gateway id="barxServerGateway"
                        				 service-interface="MyServerGateway"
                        				 default-request-channel="requestObject" error-channel="errorObject"/>
                        
                        
                        	<int:transformer input-channel="errorObject" ref="myErrorManager"
                        					 method="handleException" output-channel="toAggregator"/>
                        
                        	<!-- Aggregator matches incoming responses to the blocked requests and releases them -->
                        	<int:aggregator input-channel="toAggregator" output-channel="aggregatedResponseObject"
                        					expire-groups-upon-completion="true" release-strategy="customCorrelator"
                        					correlation-strategy="customCorrelator"/>
                        
                        	<!-- Extract the second object - the response -->
                        	<int:transformer input-channel="aggregatedResponseObject" expression="payload.get(1)"/>
                        The transformer then enriches them with the correct correlation ID, like this:

                        Code:
                        	public Object handleException(Message<?> message)
                        	{
                        		Throwable throwable = (Throwable) message.getPayload();
                        
                        		log.error("Got exception", throwable);
                        
                        		if (throwable instanceof MessagingException)
                        		{
                        			if ("Failed to create connection".equals(throwable.getMessage()))
                        			{
                        				loggedIn = false;
                        			}
                        		}
                        
                        		MyCustomException exception = new MyCustomException ("Error during communication.");
                        
                        		// construct a new message, passing in headers from the original one
                        		Map<String, Object> messageHeaderMap = new HashMap<String, Object>();
                        		messageHeaderMap.putAll(message.getHeaders());
                        
                        		// copy the "messageId" header, so that the aggregator can correctly correlate
                        		MessageHeaders failedMessageHeaders =
                        				((MessagingException) throwable).getFailedMessage().getHeaders();
                        		messageHeaderMap.put("messageId", failedMessageHeaders.get("messageId"));
                        		// messageHeaderMap.put("errorChannel", failedMessageHeaders.get("errorChannel"));
                        		// messageHeaderMap.put("replyChannel", failedMessageHeaders.get("replyChannel"));
                        
                        		Message<?> result = new ErrorMessage(exception, messageHeaderMap);
                        
                        		return result;
                        	}
                        Note that the aggregator uses the "messageId" header as the correlation key.
                        If I populate only the messageId header, I get a complaint that replyChannel is not present.
                        If I also populate errorChannel and replyChannel (from the failed message), the exception is NOT routed to the caller.



                        • #13
                          What you are trying to do won't work. The problem is that once the gateway gets the exception, and sends the error message, it is no longer waiting on the "old" replyChannel - it's waiting on a new replyChannel in the ErrorMessage header.

                          Think of the error-channel flow as the "catch" part of a "try/catch" block - you can't jump back into the try from the catch.

                          You can't send the new replyChannel to the aggregator because it will drop that header (because it doesn't know which one is the right one to use).

                          So, what you need is another "try/catch" block in the flow (within the outer "try").

                          This is achieved by adding another gateway after the first, with the error channel...

                          Code:
                          <int:gateway id="barxServerGateway"
                          				 service-interface="MyServerGateway"
                          				 default-request-channel="toErrorCatchingGw" />
                          <int:service-activator input-channel="toErrorCatchingGw" ref="errorCatchingGw" />
                          <int:gateway id="errorCatchingGw" error-channel="errorObject" default-request-channel="requestObject" />
                          This way, the error is handled on this intermediate gateway and the reply is returned normally to the inbound gateway.
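
The try/catch analogy can be made concrete with a small plain-Java sketch. No Spring Integration classes are involved and all names are hypothetical. The inner method plays the role of the intermediate gateway with the error channel, and the outer method plays the role of the original gateway, which only ever sees a normal reply.

```java
/** Sketch of the try/catch analogy; no Spring Integration classes are involved. */
class NestedGatewayAnalogy {

    /** Stands in for the downstream flow (TCP send, aggregator, ...). */
    interface Downstream {
        String send(String request);
    }

    /** Plays the role of the intermediate gateway: errors become ordinary replies. */
    static String errorCatchingGateway(Downstream flow, String request) {
        try {
            return flow.send(request);
        } catch (RuntimeException e) {
            // Equivalent of the error-channel flow: produce a reply instead of propagating.
            return "ERROR: " + e.getMessage();
        }
    }

    /** Plays the role of the outer gateway: it only ever sees a normal reply. */
    static String outerGateway(Downstream flow, String request) {
        return errorCatchingGateway(flow, request);
    }
}
```

In the real flow the error handler could equally return an exception object as the reply payload; the point is only that the error is turned into a reply inside the inner scope, before the outer caller is involved.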
                          Last edited by Gary Russell; Jan 24th, 2013, 11:16 AM.



                          • #14
                            Thanks, I now got further with this.
                            Errors seem to be routing as they should.

                            With this setup, however, I am getting a number of errors during startup:
                            Code:
                            01-24-2013 17:27:43 - [DEBUG] [main] MessagingMethodInvokerHelper - Method [public final void $Proxy44.addAdvisor(int,org.springframework.aop.Advisor) throws org.springframework.aop.framework.AopConfigException] is not eligible for Message handling.
                            java.lang.IllegalArgumentException: Found more than one parameter type candidate: [int] and [org.springframework.aop.Advisor]
                            Two more such exceptions appear, related to the methods addAdvice and replaceAdvisor.

                            They don't seem to affect the setup afterwards, but I wonder what could be causing them...



                            • #15
                              That message is benign - it's because the service activator code is trying to find the appropriate method to invoke on the gateway (which, with no service-interface, is RequestReplyExchanger). It finds the method ok, but during the search, it hits a method that's not eligible for consideration and logs the condition under DEBUG.

                              We should probably change this log to TRACE level (rather than DEBUG), or at least suppress the full stack trace under DEBUG.

                              You can eliminate the message by configuring your log4j to set logs for org.springframework.integration.util.MessagingMethodInvokerHelper to INFO or WARN.
