Announcement Announcement Module
No announcement yet.
Custom Correlation of Request and Replies on Client Side Page Title Module
Move Remove Collapse
Conversation Detail Module
  • Filter
  • Time
  • Show
Clear All
new posts

  • Custom Correlation of Request and Replies on Client Side

    I'm trying to work out how I can do custom message correlation on the client side of a tcp connection. Below is the listings for a simple echo test that has each component connected via tcp connections:
    1) Echo Client
    2) Interceptor (listening on port 41100)
    3) Echo Server (listening on port 41200)

    The main sends a "Help me" message to the Echo Client which then sends the message via tcp connection to the Interceptor which forwards the message onto the Echo Server using another tcp connection. The Echo Server returns the same message as its response. The Interceptor receives this message and when it tries to locate the connection to return the response to a connection cannot be found:

    Unable to find outbound socket for [Payload=[[email protected]][Headers={timestamp=1326622013300, id=8e3a17a8-2b8f-40c8-a6bb-8f0d4df8eafa, ip_address=, ip_connection_seq=1, ip_hostname=localhost, ip_tcp_remote_port=41200, ip_connection_id=localhost:41200:f5c2e15e-7c09-4802-965f-293ba7028194}]] []

    I've read the manual and understand the requirement to supply my own correlation logic on the client side but I don't see how I can achieve this. Is there a way to link the interceptors server connection factory with the interceptors client connection factory such that a connection on the server side can send and receive to one connection on the client side?

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns=""
      <!-- CLIENT -->
      <int:gateway id="echoClientService" service-interface="EchoService" default-request-channel="echoClientRequestChannel" default-reply-channel="echoClientResponseChannel"/>
      <int:channel id="echoClientRequestChannel"/>
      <int:channel id="echoClientResponseChannel"/>
      <int-ip:tcp-connection-factory id="echoClient" type="client" port="41100" host="localhost" using-nio="true" single-use="false"/>
      <int-ip:tcp-outbound-channel-adapter id="echoClientOutbound" channel="echoClientRequestChannel"  connection-factory="echoClient"/>
      <int-ip:tcp-inbound-channel-adapter  id="echoClientInbound"  channel="echoClientResponseChannel" connection-factory="echoClient"/>
      <!-- INTERCEPTOR -->
      <int-ip:tcp-connection-factory id="interceptorServer" type="server" port="41100" using-nio="true" single-use="false"/>
      <int-ip:tcp-inbound-channel-adapter  id="interceptorServerInbound"  channel="interceptorServerRequestChannel" connection-factory="interceptorServer"/>
      <int-ip:tcp-outbound-channel-adapter id="interceptorServerOutbound" channel="interceptorServerReplyChannel"   connection-factory="interceptorServer"/>
      <int:channel id="interceptorServerRequestChannel" />
      <int:channel id="interceptorServerReplyChannel" />
      <int:service-activator id="interceptorRequests" input-channel="interceptorServerRequestChannel" output-channel="interceptorClientRequestChannel">
        <bean class="TestInterceptor"/>
      <int:service-activator id="interceptorResponses" input-channel="interceptorClientResponseChannel" output-channel="interceptorServerReplyChannel">
        <bean class="TestInterceptor"/>
      <int:channel id="interceptorClientRequestChannel"/>
      <int:channel id="interceptorClientResponseChannel"/>
      <int-ip:tcp-connection-factory id="interceptorClient" type="client" port="41200" host="localhost" using-nio="true" single-use="false"/>
      <int-ip:tcp-outbound-channel-adapter id="interceptorClientOutbound" channel="interceptorClientRequestChannel"  connection-factory="interceptorClient"/>
      <int-ip:tcp-inbound-channel-adapter  id="interceptorClientInbound"  channel="interceptorClientResponseChannel" connection-factory="interceptorClient"/>
      <!-- ECHO SERVER -->
      <int-ip:tcp-connection-factory id="echoServer" type="server" port="41200" using-nio="true" single-use="false"/>
      <int-ip:tcp-inbound-channel-adapter  id="echoServerInbound"  channel="echoServerRequestChannel" connection-factory="echoServer"/>
      <int-ip:tcp-outbound-channel-adapter id="echoServerOutbound" channel="echoServerReplyChannel"   connection-factory="echoServer"/>
      <int:channel id="echoServerRequestChannel" />
      <int:channel id="echoServerReplyChannel" />
      <int:service-activator input-channel="echoServerRequestChannel" output-channel="echoServerReplyChannel">
        <bean class="TestEchoService"/>
      <int:poller id="defaultPoller" max-messages-per-poll="-1" default="true" fixed-rate="1000"/>
    Echo Service Gateway (
    public interface EchoService {
      public String echo(String request);
    Echo Service Implementation (
    import org.springframework.integration.Message;
    public class TestEchoService {
      public Message<byte[]> echo(Message<byte[]> request) {
        byte[] message = request.getPayload();
        System.out.println(new String(message));
        return request;
    Interceptor (
    import org.springframework.integration.Message;
    public class TestInterceptor {
      public Message<byte[]> intercept(Message<byte[]> request) {
        byte[] message = request.getPayload();
        System.out.println("Intercepted Message: " + new String(message));
        return request;
    Test Main (
    public class Test {
      ClassPathXmlApplicationContext applicationContext;
      public Test() {
        applicationContext = new ClassPathXmlApplicationContext(new String[] {"test.xml"});
        EchoService echoService = (EchoService)applicationContext.getBean("echoClientService");
        String echo = echoService.echo("Help me");
      public static void main(String[] args) {
        Test application = new Test();

  • #2
    The problem is that when using collaborating channel adapters, you need to somehow preserve the ip_connection_id header.

    In your echo service, SI takes care of that for you, but in your interceptor there is no way we can do that (message headers are not transferred over TCP, because there is no standard way to do that over TCP - there is a JIRA open to support that; currently scheduled for 2.2 -

    If really high performance is not required, use gateways in the interceptor instead of channel adapters. Reasonably high performance can be obtained with gateways if single-use sockets are used.

    If you wish to retain the use of channel adapters, you need to add code to handle the preservation of the ip_connection_id header - save it off before sending it and add it back in to the reply. Alternatively, use a similar technique to the one used in the intermediate sample here...

    In essence, this code sends a copy of the outbound message to an aggregator; when the reply is received, it is aggregated with the original (including all it's headers), and then we drop the outbound message, leaving us with the reply having its own headers merged with headers from the request. In this case, you will need to <header-filter/> the ip_connection_id header from the reply, before the aggregator, so we are left with the right header for the outbound adapter to figure out which connection to send the reply to.

    Hope that helps.


    • #3
      And, of course, you could keep the interceptor app stateless by somehow adding the ip_connection_id header to the payload someplace; as long as your final service app retains it you can use a <header-enricher /> to re-insert it in the reply in the interceptor app.


      • #4
        Thanks Gary. I finally got it to work by adding in the Aggregator+Transformer on the client side to marry up replies with responses but only after finally realising that my input channel on the client side was still the default point-to-point DirectChannel and when I changed it to a pub-sub channel it started to work.

        Should've an exception been thrown when I tried to use the same point-to-point direct channel for both the Gateway -> Outbound Client Adaptor and then also for Gateway -> Aggregator?

        Also I noticed in the sample you pointed me to doesn't explicitly define the channel between the aggregator and transformer. Is SI smart enough to fill in those config gaps and hence why there are no Annotations to instantiate a Channel?

        What I'm trying to achieve is to write a TCP Interceptor (for fraud checking) that intercepts messages between a Financial Switch and an Issuing Bank. i.e. Switch <-> Interceptor <-> Bank

        The connections between the Switch and Bank are typically 1 to 10 permanent tcp socket connections that have multiplexed financial transactions travelling over them (quite likely to have each connection with several outstanding requests waiting for a replies over the socket connection). The financial messages have a natural correlation Id (terminalId + merchantId + STAN + MTI) that I would like to use for this purpose. It needs to be able to process around 100 transactions per second - I assume this is on the high performance side of things so a blocking TCP Gateway that SI provides is not suitable hence why I'm trying to use async channel adaptors.

        Also, Thanks for fixing the XSD schema for the next release. I thought I was going crazy there for a while

        Regards, Shane.


        • #5
          ..Should've an exception been thrown when I tried to use the same point-to-point direct channel ...
          No; the default behavior, when multiple subscribers are subscribed to a PtP channel, is messages are sent out in round-robin fashion. It catches a lot of people out; there is an open JIRA to change, or at least add a mechanism to override, this default behavior ( Comment and/or vote for it if you want.

          ...Is SI smart enough to fill in those config gaps...
          Yes; there are two types of implicit configuration. If a consuming endpoint declares a channel that does not exist, a DirectChannel is automatically configured for you. If a channel adapter has no 'channel' attribute, a DirectChannel with the Id of the adapter is created (and the adapter gets ".adapter" appended to its Id. (However, I just found there's a bug in a few parsers (including TCP) where this does not work (

          ...I'm trying to use async channel adaptors. ...
          Yes, I believe this is the correct technology for your use case.

          Thanks for fixing ...
          My pleasure - thanks for finding it; most of the errors were due to some refactoring that was done long ago; we will probably add something to the build to detect these errors in future.