Asynchronous request reply correlation
  • Asynchronous request reply correlation

    Does Spring Integration support asynchronous request-reply correlation?

    Given the following sequence:

    - Send Request on channel 1
    - Do some asynchronous processing
    - Received Response on channel 2

    Does Spring Integration have a facility to correlate the response with the request, or do I have to cache my original request manually until I get the response back and do the correlation by hand?

    If it does:
    How do we specify that the message received on the response channel is a response and not a request?
    How does it handle errors (no reply)? Do we have to do some sort of garbage collection of pending requests that never receive a response?

    Any code samples or examples? (Always appreciated :)

    Thanks,

    Simon
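
    For reference, the by-hand approach described above (cache the pending request, match the reply by ID, and clean up when no reply arrives) could be sketched roughly as follows. This is plain Java with no Spring Integration involved; the class `PendingReplies` and its method names are made up for illustration, and `orTimeout` requires Java 9 or later.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: correlates replies with pending requests by ID.
public class PendingReplies {

    private final Map<UUID, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Registers a request; the future fails with a TimeoutException if no reply arrives in time. */
    public CompletableFuture<String> register(UUID requestId, long timeoutMillis) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        pending.put(requestId, reply);
        return reply
                .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                // Runs on reply and on timeout, so stale entries never pile up.
                .whenComplete((value, error) -> pending.remove(requestId));
    }

    /** Called when a message shows up on the reply channel; returns false for unknown IDs. */
    public boolean onReply(UUID correlationId, String payload) {
        CompletableFuture<String> reply = pending.remove(correlationId);
        return reply != null && reply.complete(payload);
    }

    public int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingReplies replies = new PendingReplies();

        // Normal case: the reply carries the ID of the request.
        UUID id = UUID.randomUUID();
        CompletableFuture<String> answered = replies.register(id, 5000);
        replies.onReply(id, "pong");
        System.out.println(answered.join());

        // Error case: nothing ever answers, so the future times out and is cleaned up.
        CompletableFuture<String> unanswered = replies.register(UUID.randomUUID(), 100);
        try {
            unanswered.join();
        } catch (CompletionException e) {
            System.out.println("no reply: " + e.getCause().getClass().getSimpleName());
        }
        System.out.println("still pending: " + replies.pendingCount());
    }
}
```

    The timeout doubles as the garbage collection: every entry is removed either by its reply or by its deadline.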

  • #2
    Yes, SI (Spring Integration) can do the heavy lifting for async correlation, but, in my limited experience with SI, there's work to do on your part to get it working. I'm using SI 1.0 M5, so this might differ from the version you are using (I'm also assuming you are using JMS).

    I created a producer:

    Code:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans:beans ...>
    
        <context:annotation-config/>
    
        <annotation-driven/>
    
        <message-bus/>
        
        <channel id="eventsChannel">
            <interceptor ref="outgoingInterceptor"/>
        </channel>
        
        <channel id="replyChannel">
            <interceptor ref="incomingInterceptor"/>
        </channel>
        
        <beans:bean id="outgoingInterceptor" class="org.springframework.integration.transformer.MessageTransformingChannelInterceptor">
            <beans:constructor-arg ref="eventToXmlTransformer"/>
        </beans:bean>
        
        <beans:bean id="incomingInterceptor" class="com.xnife.event.common.example.interceptor.HandleCorrelationId"/>
    
        <jms-target id="jmsTarget" destination-name="com.xnife.events"/>
        
        <beans:bean id="eventToXmlTransformer" class="com.xnife.event.common.example.transformer.EventToXml"/>
        
        <jms-gateway 
            request-channel="replyChannel" 
            connection-factory="connectionFactory" 
            destination-name="com.xnife.events.reply"
            expect-reply="false"/>
         
        <channel-adapter channel="eventsChannel" target="jmsTarget"/> 
    
        <beans:bean id="doSomethingService" class="com.xnife.event.producer.example.internal.service.DoSomethingServiceImpl"/>
    
        <gateway id="publishEventServiceService"
             service-interface="com.xnife.event.producer.example.service.PublishEventService"
             request-channel="eventsChannel"
             reply-channel="replyChannel"/>
    
        <beans:bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <!-- we assume the ActiveMQ broker is running on localhost at the default port -->
            <beans:property name="brokerURL" value="tcp://localhost:61616"/>
        </beans:bean>
        
        <beans:bean id="producer" class="com.xnife.event.producer.example.Producer"/>
     
    </beans:beans>
    and a consumer:

    Code:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans:beans ...>
    
        <context:annotation-config/>
    
        <annotation-driven/>
    
        <message-bus/>
        
        <channel id="eventsChannel">
            <interceptor ref="incomingInterceptor"/>
        </channel>
        
        <channel id="replyChannel"/>
        
        <service-activator input-channel="eventsChannel" ref="eventProcessorService" method="processEvent" output-channel="replyChannel"/>
    
        <beans:bean id="eventProcessorService" class="com.xnife.event.consumer.example.internal.service.EventProcessorServiceImpl"/>
            
        <beans:bean id="incomingInterceptor" class="org.springframework.integration.transformer.MessageTransformingChannelInterceptor">
            <beans:constructor-arg ref="xmlToEventTransformer"/>
        </beans:bean>
        
        <beans:bean id="xmlToEventTransformer" class="com.xnife.event.common.example.transformer.XmlToEvent"/>
        
        <jms-source id="jmsSource" connection-factory="connectionFactory" destination-name="com.xnife.events"/>
        
        <channel-adapter source="jmsSource" channel="eventsChannel"/> 
        
        <jms-target id="jmsTarget" connection-factory="connectionFactory" destination-name="com.xnife.events.reply"/>
        
        <channel-adapter channel="replyChannel" target="jmsTarget"/>
        
        <jms-gateway 
          id="jmsInbound" 
          connection-factory="connectionFactory"
          destination-name="com.xnife.events" 
          request-channel="eventsChannel"/>
         
        <beans:bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <!-- we assume the ActiveMQ broker is running on localhost at the default port -->
            <beans:property name="brokerURL" value="tcp://localhost:61616"/>
        </beans:bean>
        
    </beans:beans>
    From a correlation point-of-view, it's the interceptors that are interesting.

    I'm using objects within the code, but XML for the JMS payload. The producer's outgoing interceptor adds the SI message ID to the XML, and the consumer copies it onto the reply message it sends. This is needed because SI uses its internal message ID as the expected correlation ID.

    SI does not propagate the IDs it uses internally to the JMS message ID or correlation ID.

    The producer has another interceptor on the incoming channel so that the ID from the XML can be used to set the message's correlation ID. Watch out: the correlation ID must be a UUID object, not a string. That cost me plenty of time in debugging!
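
    To illustrate the UUID pitfall, here is a minimal sketch of the conversion step on its own, in plain Java without any SI classes. The `<correlationId>` element name and the `CorrelationIdParser` class are assumptions for the example; they are not taken from the actual interceptor or payload.

```java
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: pulls the ID out of the XML payload as a UUID object.
public class CorrelationIdParser {

    // Assumed element name; adjust to whatever the outgoing interceptor writes.
    private static final Pattern ID_PATTERN =
            Pattern.compile("<correlationId>([^<]+)</correlationId>");

    /** Extracts the ID text from the payload and converts it to a UUID. */
    public static UUID extractCorrelationId(String xml) {
        Matcher matcher = ID_PATTERN.matcher(xml);
        if (!matcher.find()) {
            throw new IllegalArgumentException("no correlationId element in payload");
        }
        // UUID.fromString throws IllegalArgumentException on malformed input.
        return UUID.fromString(matcher.group(1).trim());
    }

    public static void main(String[] args) {
        UUID original = UUID.randomUUID();
        String xml = "<event><correlationId>" + original + "</correlationId></event>";

        UUID parsed = extractCorrelationId(xml);
        System.out.println(parsed.equals(original));              // true
        // A String never equals a UUID, even with identical text.
        System.out.println(original.equals(original.toString())); // false
    }
}
```

    The second comparison is the trap: a string with the right text still fails an `equals` check against a UUID, so the reply is never matched.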



    • #3
      First of all, thanks for providing this useful post. It also reveals to me that we are not quite making things simple enough yet.

      There is one improvement in M6 that might be relevant depending on how you are using the correlationId. Now, the JMSCorrelationID is copied to/from the Spring Integration MessageHeaders when receiving/sending with JmsSource/JmsTarget. Its key in the MessageHeaders is JmsHeaders.CORRELATION_ID. However, the JMS Message id/correlationId are kept separate from the Spring Integration Message id/correlationId. That is intentional, but if you want to copy the Spring Integration Message id to a JMSCorrelationID, something like the following should do the trick (in M6):
      Code:
      // Copy the Spring Integration message ID into the JMS correlation ID header.
      Message<?> outboundMessage = MessageBuilder.fromMessage(inboundMessage)
              .setHeader(JmsHeaders.CORRELATION_ID, inboundMessage.getHeaders().getId())
              .build();



      • #4
        The M6 change helps a little, but I feel there's something off in the abstraction around handling the correlation stuff.

        I see why you'd want to keep the message and JMS correlation separate, so that's not the issue.

        I'm thinking there needs to be some mapper or strategy interface for handling the correlation because having to copy the SI message ID is, in my opinion, breaking encapsulation.
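
        As a rough sketch of what such a strategy could look like: the `CorrelationMapper` interface and `JmsCorrelationMapper` class below are purely hypothetical and do not exist in Spring Integration. A plain `Map` stands in for the transport headers so the example runs without SI or JMS on the classpath; only the `JMSCorrelationID` header name comes from JMS itself.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class CorrelationStrategyDemo {

    /** Hypothetical strategy: maps an internal message ID to and from transport headers. */
    public interface CorrelationMapper {
        void toTransport(UUID messageId, Map<String, Object> transportHeaders);

        /** Returns null when the transport headers carry no correlation ID. */
        UUID fromTransport(Map<String, Object> transportHeaders);
    }

    /** One possible implementation: carries the ID as text in the JMS correlation header. */
    public static class JmsCorrelationMapper implements CorrelationMapper {
        static final String HEADER = "JMSCorrelationID";

        @Override
        public void toTransport(UUID messageId, Map<String, Object> transportHeaders) {
            transportHeaders.put(HEADER, messageId.toString());
        }

        @Override
        public UUID fromTransport(Map<String, Object> transportHeaders) {
            Object value = transportHeaders.get(HEADER);
            return value == null ? null : UUID.fromString(value.toString());
        }
    }

    public static void main(String[] args) {
        CorrelationMapper mapper = new JmsCorrelationMapper();
        UUID requestId = UUID.randomUUID();

        // Outbound: the adapter, not application code, writes the header.
        Map<String, Object> headers = new HashMap<>();
        mapper.toTransport(requestId, headers);

        // Inbound: the same strategy restores the UUID for correlation.
        System.out.println(requestId.equals(mapper.fromTransport(headers))); // true
    }
}
```

        With something along these lines plugged into the adapters, the application code would not need to copy the SI message ID around itself.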

