Announcement Announcement Module
No announcement yet.
message correlation Page Title Module
Move Remove Collapse
Conversation Detail Module
  • Filter
  • Time
  • Show
Clear All
new posts

  • message correlation

    I have a Gateway interface method that looks like this:
    Future<Response> sendAndReceive(Request request);
    My application sends a request object, and then waits for the response. Now the send completes in one thread, and the reply comes back in another thread, on a different set of channels - I need to correlate the response with the original request so that the 'replyChannel' header is correctly set, and the gateway response is received. There is data in the objects available to correlate them (Response.getRequestId and Request.getRequestId)

    Is there an easy way to correlate the Response to the Request or do I have to write some custom code? I have done this successfully with an Aggregator, as this has the correlation functionality with SPEL, but I end up with a List being returned rather than one object, and it seems an overkill.

    What I am looking for is the correlation functionality of the Aggregator, but without the aggregation of messages.

    Thanks in advance.

  • #2

    What I can propose quickly it is implementation of some custom aggregation method:
    public class CustomAggregator {
      Response aggregate(List<? extends Message> messages) { 
    And use it as reference in the aggragator config:
    HTML Code:
    <aggregator input-channel="input" 
    				release-strategy-expression="size() == 2"
    However you can put your requestId correlation key into standart Message Header - correlationId:
    HTML Code:
    <header-enricher default-overwrite="true">
      <correlation-id expression="payload.requestId"/>
    And now you shouldn't have any correlation-strategy on the aggregator. It will be a default one.

    Is it appropriate?

    Artem Bilan


    • #3
      Thanks for the speedy reply, this is almost exactly as I have it, except with the CustomAggregator, which I think will solve my problem of a List being returned. I was kind of hoping that there would be a simpler way of correlating the 2 messages without the need for an aggregator though, and then subsequent finding of Response from within the List, built in to Spring Integration.


      • #4
        Let's step back a minute. The Gateway does add a replyChannel header to any message and connects it to an anonymous exclusive channel for the ultimate reply. That works even if the reply actually takes place on a different thread. So, can you explain what was happening in your use of Gateway that prevented that default behavior from working?



        • #5
          OK, here's a bit more about my set-up:

          I have the Request/Response Gateway, which my application interacts with to send the request, which goes through a Transformer, then to a ServiceActivator (which integrates with Quickfix/J to send a FIX protocol message).

          A FIX protocol reply is received (in the same bean object as the ServiceActivator, but independant to the request and in a seperate thread), its sends to a ResponseGateway, which then goes through a Transformer to become a Response object. This Response now needs to be correlated to the original Request - when I put this response object onto the reply channel of the first gateway, I get "no output-channel of replyChannel header available" exception. Hence the use of Aggregator to correlate using object properties, which locates the replyChannel and a successful reply appears.

          Maybe I'm going about this the wrong way, but with my limited knowlegde of SI it seemed like the best option to use ServiceActivators and Gateways. It does feel a bit like I need a FIX based inbound/outbound channel adapter, similar to the built in SI functionality.



          • #6
            You are going about it the right way (short of writing custom adapters). What is the method signature of your FIX service?

            It sounds like you are building a new reply message with MessageBuilder() - if you pass in the whole message, you can use the copyHeaders() method on the builder.


            • #7
              Martin, without looking at what you say about 'limited knowledge of SI', you have wonderful message flow!

              I can guess you lost reply-channel header inside your FIX service.
              However you've found some grate workaround - request/reply correlator.
              It's OK, that you use here aggregator.

              But, please, think about logic of your FIX service: if you use here entire Message<?> as return, you should manualy copy headers from received Message<?>.

              O-h-h! Gary, sorry for plagiarism ;-)


              • #8
                At present my FIX Service is not dependant on Spring Integration, its a POJO that has a public send() method that does quickfix.Session.send(), has a reference to a Gateway interface that has a handleResponse() method, called when a fix message is received. So I don't use MessageBuilder/Message SI objects directly.

                With FIX, there may be a request/response conversation, there may be multiple responses to a single request, and also responses without a request (the following day, for instance).

                So I don't have access or want to store the request Message in my Service on the way out, I'm hoping SI can help with correlation (of timely replies) and routing of messages correctly.


                • #9
                  OK; that makes sense; we generally encourage using POJOs instead of exposing application code to SI; so good.

                  One technique that has been used for this, is to use a custom bean that stores replyChannel headers in a map. It would be invoked as a service before your fix service and as a <header--enricher/> for the reply.

                  There's an example in this post...

                  However, your comment "... multiple responses for a single request..." concerns me; the originating gateway can only handle a single response to a request; you would need to use a void-returning gateway method, and a <int:outbound-channel-adapter/> (which calls a void method) for the replies. You do your own correlation there. If that's what you really need, then there's no need to worry about preserving the replyChannel header.


                  • #10
                    My plan for mulitple responses is that I'm only going to be interested in the first response in this Gateway - there's a web front end to the gateway, and the first, timely reply is the one the user is interested in. Subsequence responses will be handled by a different chain.

                    As it stands, I'm pretty happy with the Aggregator approach - I only have a simple custom Aggregator class that picks the second message from a List<Message<?>>, storing the replyChannels in a Map seems to be a little more complex.