Announcement Announcement Module
Collapse
No announcement yet.
Asynchronous request reply correlation (again) Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Asynchronous request reply correlation (again)

    Hi, I'm having the same issue described here:

    http://forum.springsource.org/showth...ly-correlation

    Apparently there is a workaround for JMS but in my case I'm using TCP Sockets.

    I'm thinking in using an Aggregator in order to implement the domain specific correlation and use the "aggregated message" as a reply, but I'm not sure.

    The thread also comments on the UUID/ID header that could be used to mark the async reply as the reply to some previous message, but I can't find it on the guide.

    Is there a better way to do that with SI 2.0?

    regards,

  • #2
    You can use a combination of headers 'ip_connection_id' and 'ip_connection_seq' to perform correlation on messages originating from an inbound tcp endpoint.

    Each message arriving on a connection is allocated an incrementing sequence number.

    Comment


    • #3
      Thanks, but I think my requirement was not clearly specified.

      I have an outbound tcp endpoint; the replies (if they come at all) will not be ordered, depending on the peer systems. I will create the messages from a servlet, so the response must be routed to the same original thread.

      regards,

      Comment


      • #4
        Currently, the TCP adapters only send/receive the payload; the main reason being the adapters were envisioned to be used for interoperating with legacy systems, rather than other SI applications. This means that any reply correlation (outside of normal gateway correlation), such as what you describe, needs to be via data carried in the payload.

        There is an open JIRA to support transporting the entire message (including headers, which can then be used for correlation). https://jira.springsource.org/browse/INT-1807. This is planned to be a 2.1 feature, and the initial implementation should be in HEAD relatively soon,

        The linked forum thread (in the JIRA) provides a couple of techniques/suggestions about how to enhance the payload with correlation data. Of course, this requires the peer system to understand that payload enhancement.

        Comment


        • #5
          Well, I'm not thinking about transmiting headers on the TCP channels (many legacy systems will not accept these at all.) I'm looking for a standard way to match (using my domain specific logic) a reply from the network (from a legacy app) and enroute it as the reply for my original request message, using its pending thread. Currently I do that (without SI) with a map of latches (and request messages) where the threads are awaiting (also I have a "cleaner" thread that tears down the latches on timeout with empty responses.) Please note that the requirement is not related to the TCP adapter at all.

          Thank you in advance for any ideas...

          Comment


          • #6
            You can use either a recipient list router, or an ordered pub-sub channel to route the request to a custom aggregator after sending it out over TCP; the aggregator would use custom correlation/release strategies to use logic to match the response with the request; the response from the aggregator can be the response, enhanced with info from the request as necessary; the original request can be discarded. If you provide the aggregator with a message group store (a SimpleMessageStore would be sufficient if you don't care about persistence), the equivalent of your 'cleaner' thread would be provided by a MessageGroupStoreReaper which can expire requests.

            HTH

            Comment


            • #7
              Hi again,

              Following your advise I'm trying to implement it by using an <aggregator>. As far I can see the response is being correlated with the request and the ReleaseStrategy is correctly being applied, but apparently the aggregator is trying to use the "replyChannel" from the last message (the network reply) in order to respond. The problem is that such reply doesn't have a replyChannel header since it is being created "from scratch" from the network, so I get an exception (shown below.)

              Any other idea? Do I have to code a totaly new aggregator-like class?

              Thanks!

              Code:
              May 20, 2011 12:58:31 PM org.springframework.integration.gateway.MessagingGatewaySupport doSendAndReceive
              WARNING: failure occurred in gateway sendAndReceive
              org.springframework.integration.MessageHandlingException: java.lang.IllegalArgumentException: no outputChannel or replyChannel header available
              	at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:76)
              	at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:64)
              	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:98)
              	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
              	at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:105)
              	at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:96)
              	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:44)
              	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
              	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:128)
              	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
              	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
              	at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:220)
              	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
              	at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:105)
              	at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:96)
              	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:44)
              	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
              	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:128)
              	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
              	at org.springframework.integration.core.MessagingTemplate.doSendAndReceive(MessagingTemplate.java:318)
              	at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:239)
              	at org.springframework.integration.core.MessagingTemplate.convertSendAndReceive(MessagingTemplate.java:274)
              	at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:224)
              	at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceive(MessagingGatewaySupport.java:203)
              	at org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:300)
              	at org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke(GatewayProxyFactoryBean.java:269)
              	at org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke(GatewayProxyFactoryBean.java:260)
              	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
              	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:202)
              	at $Proxy0.enviar(Unknown Source)
              	at org.springframework.integration.samples.feed.ElProductor$Interna.run(ElProductor.java:31)
              	at java.lang.Thread.run(Thread.java:662)

              Comment


              • #8
                Does your aggregator defines an 'output-channel'? If one is defined it will use that one over the one in headers.

                Comment


                • #9
                  No, it doesn't. I want the aggregator to use the header present just in the request message (since the response doesn't have it.)

                  Code:
                  	public boolean canRelease(List<Message<?>> list) {
                  		Iterator<Message<?>> it =  list.iterator();
                  		Object rc = null;
                  		while(it.hasNext()) {
                  			System.out.println("Reply Channel Header: " + msg.getHeaders().get("replyChannel"));
                  		}
                                  System.out.println();
                  		return list.size() == 2;
                  	 }
                  The output is:
                  Code:
                  Reply Channel Header: org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@10efd7c
                  
                  Reply Channel Header: org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@10efd7c
                  Reply Channel Header: null
                  I suspect the problem is the aggregator trying to get the replyChannel header from the second message.

                  Comment


                  • #10
                    Seems I was (a little) mistaken. I was thinking that something this...

                    Code:
                    	public Message<String> aggregate(List<Message<String>> in) {
                    		return MessageBuilder.fromMessage(in.get(1))
                    			.setReplyChannel((MessageChannel) in.get(0).getHeaders().getReplyChannel())
                    			.build();
                    	}
                    ...would work with the default aggregator (CorrelatingMessageHandler) but the method completeGroup() therein, clearly only uses the replyChannel from the 'last' message in the group.

                    It seems to me that the aggregator should be able to influence this behavior, since it is coalescing a number of messages into one; but I will defer to Mark on that.

                    In any case, you can, of course, implement a custom aggregator by extending AbstractAggregatingMessageGroupProcessor, and provide it to the <aggregator/> via the ref attribute. (See reference guide).

                    Comment


                    • #11
                      Deleted: See May21 11:23pm post for a better work-around.
                      Last edited by Gary Russell; May 21st, 2011, 11:29 PM. Reason: Remove kludgy workaround

                      Comment


                      • #12
                        Yep, anyway tried the first suggestion with the same exception. Yet I didn't try extending the AbstractAggregatingMessageGroupProcessor.

                        For the other suggestion, I believe that the header-enricher will need access to the aggregator's MessageStore in order to do the correlation by itself and find the request message. My correlation/release beans are stateless so I can't see how they will help me, or I'm missing something?

                        Thanks for your help!
                        Last edited by Diego Bravo; May 21st, 2011, 10:32 PM.

                        Comment


                        • #13
                          Here's a cleaner work-around; I just tested this and it works fine...

                          Code:
                          public Message<String> aggregate(List<Message<String>> in) {
                          	return MessageBuilder.fromMessage(in.get(1))
                          		.setHeader("requestReplyChannel", in.get(0).getHeaders().getReplyChannel())
                          		.build();
                          }
                          Code:
                          <int:aggregator input-channel="in" ref="agg" output-channel="route.reply"/>
                          
                          <int:header-enricher input-channel="route.reply">
                          	<int:header name="replyChannel" expression="headers.requestReplyChannel"/>
                          </int:header-enricher>
                          	
                          <bean id="agg" class="org.myco.aggregator.gpr.GprAgg"/>
                          i.e. Copy the replyChannel to a temporary header; route the aggregator output to a header-enricher, and voila.

                          Comment


                          • #14
                            it's working for me too... Awesome!

                            Thanks a lot for your help and patience Gary!

                            Comment


                            • #15
                              Glad to help.

                              I opened an improvement JIRA issue https://jira.springsource.org/browse/INT-1913, if you want to watch it.

                              Comment

                              Working...
                              X