JMS Outbound Gateway and a custom JMSCorrelationID

  • JMS Outbound Gateway and a custom JMSCorrelationID

    Hello,

    I'm building something where I need to correlate two messages based on their ID. I may not have the original requesting message, which means I won't have the reply destination or the jmsMessageID, so I have no way of getting org.springframework.integration.jms.JmsOutboundGateway to see the reply and correlate it with my request. The reason is that I'm using the JMS pings mainly as a way to remind a secondary system to refresh its view of a dataset; it doesn't operate on or care about the JMS request then and there. Invariably, the ping will prompt a response, which will be sent back to the response destination, where it'll ideally be found by the waiting JMS gateway.

    It seems like one of the simplest ways to handle this would be to reuse a correlation ID header passed through the request message in
    private javax.jms.Message sendAndReceive(Message<?> requestMessage); if the header is present on the request message arriving at the endpoint, then it should be used when sending the message and setting up the consumer's selector.

    Thank you.

  • #2
    Thinking through this a little more - I found http://www.theserverside.com/discuss...hread_id=36929 wherein James Strachan explains that he did the multiplexing of requests and responses manually and avoided using JMS selectors when implementing Lingo. He explains that this is because JMS wasn't built with the idea of many dynamic consumers - it was more geared towards static consumers. Basically, it was for performance. I'm not sure of this one way or another - I can't imagine why they'd expose a feature like message filtering with a selector as part of the userland API if it wasn't meant to be used a lot. Otherwise, it seems like creating destinations might be relegated to an implementation detail and the JMS spec limited to interfacing with pre-defined destinations. Certainly, I can't imagine implementors are oblivious to this use case - in the same way that prepared statement creation and other non-canonical use cases are still tuned in JDBC drivers...

    So, is his point valid? Would it be worth reworking the gateway here? The other thing I wondered was why not implement the send/receive logic on top of the JmsTemplate's functionality?

    Following up to my own issue - I ended up using the JmsTemplate and hacking something (specific to my use case, of course) together.

    Code:
     
        public Message<?> sendAndReceive(final Message<?> requestMessage, String requestDestination, String responseDestination) throws JMSException {
            // Converts between Spring Integration messages and JMS messages, mapping headers across.
            final HeaderMappingMessageConverter headerMappingMessageConverter = new HeaderMappingMessageConverter();
            headerMappingMessageConverter.setExtractJmsMessageBody(true);
            headerMappingMessageConverter.setExtractIntegrationMessagePayload(true);
            // Carries the correlation ID out of the MessageCreator callback below.
            final ThreadLocal<String> threadLocal = new ThreadLocal<String>();

            jmsTemplate.send(requestDestination, new MessageCreator() {
                public javax.jms.Message createMessage(Session session) throws JMSException {

                    javax.jms.Message jmsMessage = headerMappingMessageConverter.toMessage(requestMessage, session);

                    // Prefer a correlation ID already present on the request message.
                    String correlationId = (String) requestMessage.getHeaders().get(MessageHeaders.CORRELATION_ID);

                    // Otherwise fall back to the JMS message ID. Note that providers normally
                    // assign this only when the message is sent, so it may still be null here.
                    if (StringUtils.isEmpty(correlationId))
                        correlationId = jmsMessage.getJMSMessageID();

                    if (!StringUtils.isEmpty(correlationId)) {
                        jmsMessage.setJMSCorrelationID(correlationId);
                        threadLocal.set(correlationId);
                    }

                    return jmsMessage;
                }
            });

            // Wait on the shared response destination for the reply carrying the same correlation ID.
            javax.jms.Message response = jmsTemplate.receiveSelected(responseDestination, "JMSCorrelationID = '" + encodeCorrelationId(threadLocal.get()) + "'");

            return (Message<?>) headerMappingMessageConverter.fromMessage(response);
        }
    Thanks, and Happy new year!
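
The encodeCorrelationId helper called in the snippet above is not shown in the post. One thing such a helper would plausibly need to do is escape single quotes, because JMS selector string literals are enclosed in single quotes and an embedded quote is written by doubling it. The following is only a minimal sketch of that idea; the class name CorrelationSelectors and the selectorFor method are hypothetical, and the original helper may have done something different.

```java
/** Hypothetical helper: builds a JMSCorrelationID selector from a raw correlation ID. */
final class CorrelationSelectors {

    private CorrelationSelectors() {
    }

    /** Doubles single quotes so the value is safe inside a selector string literal. */
    static String encodeCorrelationId(String correlationId) {
        if (correlationId == null) {
            // A null ID would otherwise end up as the literal text 'null' in the selector.
            throw new IllegalArgumentException("correlationId must not be null");
        }
        return correlationId.replace("'", "''");
    }

    /** Builds the selector expression used to pick out the matching reply. */
    static String selectorFor(String correlationId) {
        return "JMSCorrelationID = '" + encodeCorrelationId(correlationId) + "'";
    }

    public static void main(String[] args) {
        System.out.println(selectorFor("order-42"));
        System.out.println(selectorFor("o'brien"));
    }
}
```

Rejecting null up front also makes the case where no correlation ID could be determined fail loudly instead of waiting on a selector that can never match.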


    • #3
      I do think the most important point from the referenced thread is that MessageSelectors are not ideal for selection criteria that change at runtime (as in your case, with a new CorrelationID per request). That's different from something like "client ID", where two different clients X and Y each use a MessageSelector for clientID=x and clientID=y respectively.

      As one example, if you want to take advantage of cached MessageConsumers with Spring's CachingConnectionFactory, the consumers in the cache are "keyed" by Destination + Session + *selector string*.
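
To make the caching point concrete, here is a toy model of a consumer cache. It is not Spring's CachingConnectionFactory; the ConsumerCacheModel class, its key format, and the selector strings are all made up for illustration, and the Session part of the key is left out by assuming a single session. It only shows that a cache keyed on the selector string creates one entry per distinct selector.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model (hypothetical) of a consumer cache keyed by destination + selector string. */
final class ConsumerCacheModel {

    private final Map<String, Object> consumers = new HashMap<>();
    private int created = 0;

    /** Returns the cached stand-in consumer for this key, creating one on a cache miss. */
    Object consumerFor(String destination, String selector) {
        return consumers.computeIfAbsent(destination + "|" + selector, key -> {
            created++;
            return new Object(); // stand-in for a real MessageConsumer
        });
    }

    int createdCount() {
        return created;
    }

    /** Simulates a number of requests and reports how many consumers had to be created. */
    static int consumersCreated(int requests, boolean perRequestSelector) {
        ConsumerCacheModel cache = new ConsumerCacheModel();
        for (int i = 0; i < requests; i++) {
            String selector = perRequestSelector
                    ? "JMSCorrelationID = 'req-" + i + "'" // changes on every request
                    : "clientID = 'x'";                    // fixed per client
            cache.consumerFor("responses", selector);
        }
        return cache.createdCount();
    }

    public static void main(String[] args) {
        System.out.println("fixed selector:       " + consumersCreated(100, false));
        System.out.println("per-request selector: " + consumersCreated(100, true));
    }
}
```

With a fixed selector the model creates a single consumer for all 100 requests; with a per-request selector it creates 100, so the cache never gets a hit.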


      • #4
        So, I agree with what you're saying. Perhaps we could change the implementation of the gateway to be more sensitive about creating dynamic selectors...

        In the meantime, since it is already creating dynamic selectors for each outgoing message by using the request's messageID, could we enhance it to support my first post's use case - where I want to specify the selector criteria, since I have no way of knowing what the request message ID may be?

        Thanks,


        • #5
          Well, the outbound gateway supports two different options:

          1) TemporaryQueue per request
          2) shared response Destination with CorrelationID

          In case #2, if a particular client (gateway) is only going to send one request at a time, then even the CorrelationID could be shared. However, that seems rare.

          I think what we need to do is add support for a Map (with CorrelationID keys and "callback" values). That can also get a bit messy... but in any case, I am thinking that this all might need to be considered for the following issue (concerning a "sendAndReceive" operation on the JmsTemplate itself): http://jira.springframework.org/browse/SPR-3332
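
As a rough illustration of the Map idea, the sketch below keeps pending requests in a map from correlation ID to a "callback", here a CompletableFuture. A single listener on the shared response destination would call onReply for every incoming reply, so no per-request selector is needed. The ReplyMultiplexer class and its method names are hypothetical, payloads are plain strings standing in for JMS messages, and this is not how the gateway is implemented.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch: dispatches replies from one shared destination by correlation ID. */
final class ReplyMultiplexer {

    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Called by the sender before the request goes out; returns the handle to wait on. */
    CompletableFuture<String> register(String correlationId) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        if (pending.putIfAbsent(correlationId, reply) != null) {
            throw new IllegalStateException("Already waiting on " + correlationId);
        }
        return reply;
    }

    /** Called by the single reply listener; returns false if nobody is waiting for this ID. */
    boolean onReply(String correlationId, String payload) {
        CompletableFuture<String> reply = pending.remove(correlationId);
        if (reply == null) {
            return false; // late, duplicate, or unrelated reply
        }
        reply.complete(payload);
        return true;
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        ReplyMultiplexer mux = new ReplyMultiplexer();
        CompletableFuture<String> a = mux.register("a");
        CompletableFuture<String> b = mux.register("b");
        mux.onReply("b", "reply for b"); // replies may arrive in any order
        mux.onReply("a", "reply for a");
        System.out.println(a.join() + ", " + b.join());
    }
}
```

The sketch leaves out timeouts and the cleanup of entries whose reply never arrives, which is likely part of what makes this approach "a bit messy" in practice.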

          All that aside... and getting back to your specific issue, is it possible to use a "header-enricher" to set the CorrelationID on the Message immediately before sending it to the outbound-gateway's request-channel, or am I missing something?


          • #6
            I should probably look at my own code before jumping to conclusions.

            Let me rephrase my last question: IF we added support to check for a JMSCorrelationID header on the Message that is sent to the outbound-gateway and prefer that over the JMS Message ID (of the sent JMS Message as it is now), would that solve your problem?

            The only issue I can see with that is that a "stale" JMSCorrelationID could be present on the SI Message (e.g. when the initial entry point was JMS).

            Thoughts?


            • #7
              Similar issue in Apache Camel

              Hi All,

              I just wanted to point out that we are currently discussing a similar issue in Apache Camel jira: https://issues.apache.org/activemq/browse/CAMEL-2249

              Another project where this issue was touched is Apache CXF. There currently the correlation id for the reply is determined in the following way:

              public String determineCorrelationID(javax.jms.Message request) throws JMSException {
                  String correlationID = request.getJMSCorrelationID();
                  if (correlationID == null || "".equals(correlationID)) {
                      correlationID = request.getJMSMessageID();
                  }
                  return correlationID;
              }

              Greetings

              Christian


              • #8
                Thanks Christian. That's the same basic approach we're using for the Correlation ID header in Spring Integration Messages as well (within the base MessageHandler implementation), so I was thinking that first checking for an existing JMS CorrelationID in the outbound-gateway would be consistent with that approach.

                @Josh, let me know if that would then work for your case (with a header-enricher providing the JmsHeaders.CORRELATION_ID header upstream).

                Thanks,
                Mark


                • #9
                  Sorry for the delay - must've glazed over the first email ping informing me of updates on this and I just remembered to check back...

                  @mark the proposed solution sounds like it would solve my use case.

                  Since we are already aware that there'd be a limitation in shadowing/blocking an existing, proper JMS correlation ID, could we surface an option on the gateway element to preserve it? Or, rather, surface an option on the gateway to specify which header key to use as the correlation ID? (I know, it seems like API urban sprawl, but how else do you avoid shadowing AND preserve the functionality of correlating requests/responses?) Besides, it wouldn't be something people would have to dance around - it's just there if they need it.

                  Thus the following pseudocode:

                  if correlationHeaderSpecified:
                      // then set up a selector that honors 'specifiedCorrelationHeader = $(inbound specified correlation header)'
                  else if JMSCorrelationID:
                      // then set up a selector that honors JMSCorrelationID = $(inbound JMSCorrelationID)
                  else:
                      // set up a selector that honors JMSCorrelationID = messageId # as it does now
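
The precedence in that pseudocode could be sketched in plain Java over a map of message headers, as below. Everything here is hypothetical: the CorrelationResolver class, the selectorFor signature, and the use of a simple Map<String, String> in place of real message headers. It also assumes that a configured custom header which is missing from the message falls through to the next rule, which the pseudocode does not specify.

```java
import java.util.Map;

/** Hypothetical sketch of the proposed precedence for choosing the reply selector. */
final class CorrelationResolver {

    static final String JMS_CORRELATION_ID = "JMSCorrelationID";

    private CorrelationResolver() {
    }

    /**
     * @param headers          headers of the message arriving at the gateway
     * @param customHeaderName configured correlation header, or null if none is configured
     * @param sentMessageId    JMS message ID of the sent request (the current fallback)
     */
    static String selectorFor(Map<String, String> headers, String customHeaderName, String sentMessageId) {
        // 1) A configured custom header wins if the message actually carries it (assumption).
        if (customHeaderName != null && hasText(headers.get(customHeaderName))) {
            return customHeaderName + " = " + quote(headers.get(customHeaderName));
        }
        // 2) Otherwise honor an inbound JMSCorrelationID header.
        if (hasText(headers.get(JMS_CORRELATION_ID))) {
            return JMS_CORRELATION_ID + " = " + quote(headers.get(JMS_CORRELATION_ID));
        }
        // 3) Otherwise correlate on the sent message's ID, as the gateway does now.
        return JMS_CORRELATION_ID + " = " + quote(sentMessageId);
    }

    private static boolean hasText(String value) {
        return value != null && !value.isEmpty();
    }

    /** Wraps a value as a selector string literal, doubling embedded single quotes. */
    private static String quote(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    public static void main(String[] args) {
        System.out.println(selectorFor(Map.of("pingId", "p7", JMS_CORRELATION_ID, "abc"), "pingId", "ID:1"));
        System.out.println(selectorFor(Map.of(JMS_CORRELATION_ID, "abc"), null, "ID:1"));
        System.out.println(selectorFor(Map.of(), null, "ID:1"));
    }
}
```

Keeping the custom header as a separate, optional first step means an existing JMSCorrelationID on the message is left untouched, which is the shadowing concern raised above.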

                  Thank you(...! ),


                  • #10
                    Looking at your earlier comment and at the mentioned JIRA issue (http://jira.springframework.org/browse/SPR-3332), I think that's all a step in the right direction, and it gets back to my original question of why the gateway's support isn't implemented on top of the JmsTemplate's sendAndReceive support.

                    Stating the obvious... in the interest of timelines, I might suggest the support be added in SI asap (for people like me who want to have a 100% SI messaging solution instead of 95% with the 5% handled on top of the more 'raw' JmsTemplate :-) ) and then a more general CorrelationStrategy (for which perhaps a temporary queue implementation, a shared queue + correlation selector, and a multiplexing Map implementation might be shipped by default) for JmsTemplate be added. Once that's there, throw away the 20 lines in the SI implementation and reuse the JmsTemplate functionality. Ultimately, the JmsTemplate, the JMS service invoker, and the SI implementation could all reuse the JmsTemplate support.
