Spring Integration & JMS over ActiveMQ

  • Spring Integration & JMS over ActiveMQ

    I'm trying to use the Spring Integration RequestReplyTemplate to synchronously send and receive a message over two different ActiveMQ queues (one for the client to send requests on, one to receive the reply on).

    I've confirmed that both the request and the reply messages appear on the correct queues. However, when the server method returns (which places the reply on the reply queue), the client code throws an exception complaining that the correlation ID is null.

    I've looked through the Spring Integration code and can see it setting the correlation ID on the reply message, but the ID never makes it onto the JMS queue, so the reply the client picks up has no correlation ID.

    When I remove the channel-adapters from the config files and don't use the queues, the code works fine, so the problem lies somewhere in the JMS path, but I don't know where.

    I've been struggling with this for three days now - I originally used the JMS gateway and found other problems there, so I've gone back to the simplest configuration I can. I've attached a zip of the source code and config files, plus a text file containing the log4j log showing the exception that occurs.

    I hope you can help!

    Thanks, Neil
    Last edited by neilwilk; Aug 27th, 2008, 02:19 AM. Reason: Its still not working in M6...

  • #2

    Hi, did you have a workaround for this problem? Did you try with M6?

    I got the following error using M6:

    WARNING: exception occurred in endpoint 'internal.correlator.[email protected]1d2b01b'
    org.springframework.integration.message.MessageHandlingException: unable to handle response, message has no correlationId: [Payload=Hello toto][Headers={internal.header.timestamp=1219754684531,, spring.integration.transport.jms.JMSRedelivered=false}]
            at org.springframework.integration.handler.ReplyMessageCorrelator.handle(
            at org.springframework.integration.endpoint.DefaultEndpoint.doHandleRequestMessage(
    There is a problem with the correlationId in the response. The response contains the reply payload 'Hello toto', so the request/reply round trip itself worked, but I don't know how to get the correlation ID set.

    My setup:
    • ActiveMQ 5.1 with one broker.
    • Set of files attached in this reply.
    • Spring Integration jars installed in the local maven repository.



    • #3
      Spring Integration M6 & JMS over ActiveMQ


      Unfortunately, the problem is not fixed in M6 as there are a number of bugs that prevent it working, including:
      1. The ID assigned by the sender is not preserved in the message received. Instead a new ID gets created. On the reply, this is set as the correlation ID, so the wrong correlation ID goes back in the reply.
      2. On the sender's end, after sending the request, the sender code uses the UUID to look up the response. Even if the correct correlation ID is passed back, it won't be the same UUID and the lookup will fail.
      3. The code that maps between the Spring header and the JMS header (and vice-versa) doesn't use the right lookup values - where it should be using MessageHeaders constants, it uses JmsHeaders constants. Consequently, the code always gets null values.
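
      The third point can be illustrated with a minimal plain-Java sketch. It is not the framework's code: the two key strings and the class name are hypothetical stand-ins for the MessageHeaders and JmsHeaders constants, chosen only to show why reading with the wrong constant always yields null.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: a value stored under one header name is looked up under
// a different one. Both key strings are hypothetical, not real constants.
class HeaderKeyMismatchSketch {
    static final String SI_CORRELATION_KEY = "si.correlationId";   // hypothetical
    static final String JMS_CORRELATION_KEY = "jms.correlationId"; // hypothetical

    // Stores the correlation ID the way the framework side would: under the SI key.
    static Map<String, Object> headersWithCorrelation(String correlationId) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(SI_CORRELATION_KEY, correlationId);
        return headers;
    }

    // Buggy mapper: reads with the JMS-side key, which was never written.
    static Object buggyLookup(Map<String, Object> headers) {
        return headers.get(JMS_CORRELATION_KEY);
    }

    // Fixed mapper: reads with the same key that was used to store the value.
    static Object fixedLookup(Map<String, Object> headers) {
        return headers.get(SI_CORRELATION_KEY);
    }

    public static void main(String[] args) {
        Map<String, Object> headers = headersWithCorrelation("abc-123");
        System.out.println("buggy lookup: " + buggyLookup(headers));
        System.out.println("fixed lookup: " + fixedLookup(headers));
    }
}
```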

      I'm currently trying to get it to work, but it has been very frustrating discovering all of these bugs.
      Last edited by neilwilk; Aug 27th, 2008, 02:13 AM.


      • #4
        Spring Integration & JMS over ActiveMQ


        I've managed to get the request/reply working over JMS - with my very simple test code - so no promises!

        I've had to change the following two classes:
        1., and

        Unfortunately I've had to meddle with one of the core spring integration classes ( to make it work, but I couldn't see any way around that. My change might break something else, unfortunately, but hopefully not.

        The main change is to the DefaultJmsHeaderMapper code. In mapFromMessageHeaders(MessageHeaders headers, javax.jms.Message jmsMessage), I store the ID of the received message in a Spring Integration user-defined attribute. This is necessary because Spring Integration doesn't preserve it. Then, when the reply is sent out, I set the JMS correlation ID to this value.

        Upon receipt of this reply, the spring integration code correctly picks up the correlation ID, but the ReplyMessageCorrelator then attempts to correlate this value with the UUID of the original message. Consequently, I had to change this class so that it carries out a toString() on the UUID of the original message before attempting to correlate it with the reply.

        That seemed to do the trick.
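
        Why the toString() step matters can be shown with a small plain-Java sketch. It is not the ReplyMessageCorrelator code; the class and method names are hypothetical, and the map simply stands in for whatever structure holds the pending requests. A correlation ID that has travelled through a JMS header comes back as a String, and a String never equals a UUID object.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Sketch only: pending requests keyed by UUID cannot be found with a String key.
class UuidCorrelationSketch {

    // Pending request registered under the UUID object itself.
    static boolean foundWithUuidKey(UUID requestId, String correlationFromJms) {
        Map<Object, String> pending = new ConcurrentHashMap<>();
        pending.put(requestId, "waiting caller");
        return pending.containsKey(correlationFromJms);
    }

    // Pending request registered under requestId.toString(), as in the workaround.
    static boolean foundWithStringKey(UUID requestId, String correlationFromJms) {
        Map<Object, String> pending = new ConcurrentHashMap<>();
        pending.put(requestId.toString(), "waiting caller");
        return pending.containsKey(correlationFromJms);
    }

    public static void main(String[] args) {
        UUID requestId = UUID.randomUUID();
        String overTheWire = requestId.toString(); // what a JMS header can carry
        System.out.println("UUID key:   " + foundWithUuidKey(requestId, overTheWire));
        System.out.println("String key: " + foundWithStringKey(requestId, overTheWire));
    }
}
```

        The first lookup misses and the second one hits, which is the mismatch the toString() change works around.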
        Last edited by neilwilk; Aug 27th, 2008, 02:15 AM.


        • #5
          Thank you, I will try that. I guess the problem will be solved by the Spring developers in a future release, but for now I just need to make it work in a prototype, so modifying the core code is not a problem.


          • #6
            Thanks for providing the detailed explanation and for reporting the JIRA issue. For others reading this post the issue is here:

            I wanted to point out that the underlying problem here is the assumption that the Spring Integration 'correlationId' *should* map to/from the JMSCorrelationID, and that is actually not intended behavior (thus far). The MessageHeaders are intended for local usage within Spring Integration only, and the JmsHeaders are intended for values to be passed across the JMS transport (mapped from/to Spring Integration MessageHeaders). The header mapping strategy enables any customized mapping between the JMS properties and the Spring Integration Message.

            Clearly, this has caused confusion, and at the very least we should provide a simpler configuration option for adding this behavior to the JMS header-mapping. Perhaps it should even be the default behavior if no JMS Correlation ID has been provided in the headers explicitly.

            I'm interested in further feedback and/or any clarification in case I am missing the point. I will then add some further comments to the JIRA issue based on how we intend to handle this.



            • #7
              Spring Integration & JMS over ActiveMQ

              Hi Mark

              I accept that the MessageHeaders correlationID is an internal field, but it is also the internal field that Spring Integration uses to perform request/reply correlation, so this value does have to (somehow) be passed in the JMS reply.

              We therefore need some way to pass a correlation ID in the JMS message. We could use a JMS property for this, but doesn't it make more sense to pass the correlation ID in the JMS correlation ID, as that's what it is intended for?

              In either of the above cases, the value passed reduces to a String, rather than an object, which is why I changed the ReplyMessageCorrelator code in the workaround.

              The only way I can think of avoiding this is to pass the UUID as an object in the JMS message content, rather than in the JMS message header.




              • #8

                I am still concerned about the mixing of Spring Integration headers and JMS headers. The definition of separate header names in JmsHeaders is completely deliberate. Even though for this particular use-case (request/reply where the request originates from Spring Integration) it seems appropriate, these same changes would break other use-cases.

                For example, consider the case where Spring Integration is on the receiving side of a JMS request/reply operation. For that to work properly, the JMSCorrelationID of the reply Message sent by Spring Integration should be set to the ID of the JMS request Message. Otherwise, if we set the JMSCorrelationID to the value of a Spring Integration Message's "MessageHeaders.ID", the original sender would not be able to correlate the reply message.
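
                The replier-side case can be modelled in a few lines of plain Java. This is a sketch under stated assumptions, not framework or JMS API code: plain maps stand in for JMS messages, the class and method names are hypothetical, and the ID values are made up. Only the header names JMSMessageID and JMSCorrelationID come from JMS itself.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: maps stand in for JMS messages on the replier side.
class ReplierCorrelationSketch {
    static final String MESSAGE_ID = "JMSMessageID";
    static final String CORRELATION_ID = "JMSCorrelationID";

    // Expected behavior: echo the request's JMSMessageID as the reply's JMSCorrelationID.
    static Map<String, String> replyEchoingRequestId(Map<String, String> request) {
        Map<String, String> reply = new HashMap<>();
        reply.put(CORRELATION_ID, request.get(MESSAGE_ID));
        return reply;
    }

    // Problem case: the reply carries an ID known only inside the replier's framework.
    static Map<String, String> replyWithInternalId(String internalId) {
        Map<String, String> reply = new HashMap<>();
        reply.put(CORRELATION_ID, internalId);
        return reply;
    }

    // What an external requester does: match the reply against the ID it sent.
    static boolean requesterCanCorrelate(Map<String, String> request, Map<String, String> reply) {
        return request.get(MESSAGE_ID).equals(reply.get(CORRELATION_ID));
    }

    public static void main(String[] args) {
        Map<String, String> request = new HashMap<>();
        request.put(MESSAGE_ID, "ID:broker-1-42"); // hypothetical provider-assigned ID
        System.out.println(requesterCanCorrelate(request, replyEchoingRequestId(request)));
        System.out.println(requesterCanCorrelate(request, replyWithInternalId("si-internal-7")));
    }
}
```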

                On the Spring Integration side, this same type of "interference" can occur. Imagine a model where immediately after receiving a JMS Message, there is a series of endpoints including a Splitter and Aggregator. The Splitter sets Spring Integration's "MessageHeaders.CORRELATION_ID" field so that the aggregator can group the aggregated Messages properly. This means that any JMSCorrelationID being stored with that same Spring Integration header name will have been lost. This same idea then applies to the distinction between Spring Integration's RETURN_ADDRESS and the JMSReplyTo. As soon as you add another endpoint, that could produce an Exception, because the RETURN_ADDRESS header is only intended to hold Spring Integration MessageChannels or channel names.
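
                The Splitter scenario can be sketched in plain Java as follows. Nothing here is Spring Integration code: the header names, the class, and the split helper are hypothetical and exist only to show how a shared header name loses the JMS value while separate names keep both.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: shows a JMS correlation ID being overwritten by a splitter
// when both values share one header name. All names are hypothetical.
class HeaderCollisionSketch {
    static final String SI_CORRELATION = "si.correlationId";   // hypothetical
    static final String JMS_CORRELATION = "jms.correlationId"; // hypothetical

    // Inbound mapping that stores the JMS correlation ID under the given header name.
    static Map<String, String> inbound(String headerName, String jmsCorrelationId) {
        Map<String, String> headers = new HashMap<>();
        headers.put(headerName, jmsCorrelationId);
        return headers;
    }

    // A splitter stamps its own group ID on each part so an aggregator can regroup them.
    static Map<String, String> split(Map<String, String> headers, String groupId) {
        Map<String, String> part = new HashMap<>(headers);
        part.put(SI_CORRELATION, groupId);
        return part;
    }

    public static void main(String[] args) {
        // Shared name: the splitter's group ID replaces the JMS value.
        Map<String, String> shared = split(inbound(SI_CORRELATION, "jms-1"), "group-9");
        System.out.println("shared name keeps JMS id:    " + shared.containsValue("jms-1"));

        // Separate names: both values survive the splitter.
        Map<String, String> separate = split(inbound(JMS_CORRELATION, "jms-1"), "group-9");
        System.out.println("separate names keep JMS id:  " + separate.containsValue("jms-1"));
    }
}
```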

                In both cases, the issues arise when there is more than one component involved on either side of the interaction. Here is a very relevant discussion about the challenges of correlation within conversations by Gregor Hohpe (co-author of Enterprise Integration Patterns):

                Hopefully this clarifies why the headers have been deliberately separated. Nevertheless, we will need to develop a solution to this problem that feels intuitive to the developer while still working as expected in any of these use-cases. It seems that the current approach to SI/JMS header mapping is too generic. In reality, we need to implement a slightly different approach when SI is the requester vs. replier. In other words, we probably need to provide both inbound and outbound JmsGateway components to handle this without relying on the ReplyMessageCorrelator. Within those components the mapping can even use separate header names as long as the end user does not need to be bothered with the details. How does that sound?



                • #9
                  Spring Integration & JMS over ActiveMQ

                  Hi Mark

                  I did wonder about SI correlation ID and return address, as they seemed very similar to the JMS fields, but not having expert knowledge of the SI codebase, the internal SI use of these attributes was not apparent to me.

                  I now understand your reluctance to re-use these fields for JMS request/reply correlation. It wouldn't work in the larger context, and I can see why you are concerned about the mixing of Spring integration headers and JMS headers. Thanks for taking the time to explain it to me!

                  I don't think I'm competent to answer your question regarding the use of inbound and outbound JmsGateway components to handle the correlation - that's miles beyond my current knowledge of SI.

                  On re-reading Gregor Hohpe's article, he seems to be arguing that a "Conversation" ID needs to be carried with the message even if there are multiple intermediaries (e.g. aggregators, splitters, etc.) involved. Is that what you're proposing?

                  I really like Spring Integration and I want to use it, but that depends on SI providing JMS support for both asynchronous and synchronous (request/reply) messaging - our architecture assumes JMS support. Do you think you'll have the JMS correlation for request/reply messaging sorted in SP1?

                  I like the idea of the end user not needing to be bothered with the details...


                  Last edited by neilwilk; Aug 27th, 2008, 09:29 AM.


                  • #10
                    I like the sound of having inbound and outbound JMS gateways for handling this. I'll also put up my hand and say that the current Spring Integration is confusing when you first try to do JMS request/reply type patterns.

                    I replied to an earlier question on request/reply with my particular solution (I used interceptors to handle the correlation ID).


                    • #11
                      Spring Integration & JMS over ActiveMQ

                      Spring Integration should do the JMS correlation, rather than fobbing it off on the poor benighted user.

                      The interceptor approach works, but even in M6 you need to expose this in the user configuration, which doesn't seem right to me.


                      • #12
                        Thanks again for all of the feedback. Indeed, Spring Integration should handle the correlation behind the scenes. From my perspective, this is part of a larger design issue where we need to provide a clearer distinction between Channel Adapters and Gateways. In other words, the reason this currently places the burden on the developer is that for outbound JMS request/reply, we only have support for unidirectional Channel Adapters. If on the other hand, we provide an outbound JMS Gateway, the request/reply correlation will happen there in a single module that defines the request and reply channels. Neil is correct to say that these responsibilities should be encapsulated. For instance, the developer should not need to know about things such as ReplyMessageCorrelator. That is an internal component, while "channel-adapter" and "gateway" are part of the *public* API. So, please keep an eye on the JIRA issue, and feel free to provide feedback there or in this thread as we address the issue and you get a chance to try it out.
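
                        The idea of a single module that owns both the request and the reply side can be sketched in plain Java. This is an illustration of the encapsulation only, not how any Spring Integration gateway is implemented: the class, its methods, and the BiConsumer transport are all hypothetical, and a real gateway would send and receive through JMS rather than through a callback.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;

// Sketch only: a gateway that hides correlation from its caller.
class OutboundGatewaySketch {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    private final BiConsumer<String, String> transport; // (correlationId, payload) -> send request

    OutboundGatewaySketch(BiConsumer<String, String> transport) {
        this.transport = transport;
    }

    // Sends a request and blocks until the matching reply arrives or the timeout expires.
    String sendAndReceive(String payload, long timeoutMillis) {
        String correlationId = UUID.randomUUID().toString();
        CompletableFuture<String> reply = new CompletableFuture<>();
        pending.put(correlationId, reply); // register before sending so a fast reply is not missed
        try {
            transport.accept(correlationId, payload);
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for reply", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("no reply for " + correlationId, e);
        } finally {
            pending.remove(correlationId);
        }
    }

    // Called by the reply listener; returns false if no caller is waiting for this ID.
    boolean onReply(String correlationId, String payload) {
        CompletableFuture<String> reply = pending.get(correlationId);
        return reply != null && reply.complete(payload);
    }

    public static void main(String[] args) {
        // Loopback transport: answers every request immediately with the same correlation ID.
        OutboundGatewaySketch[] gateway = new OutboundGatewaySketch[1];
        gateway[0] = new OutboundGatewaySketch((id, body) -> gateway[0].onReply(id, "Hello " + body));
        System.out.println(gateway[0].sendAndReceive("toto", 1000));
    }
}
```

                        The caller only sees sendAndReceive; the correlation ID is generated, registered, and matched inside the gateway, which is the kind of encapsulation described above.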



                        • #13

                          My case is much simpler: I have a JMS gateway reading from a queue into a channel, and I want to use an aggregator to combine several messages. I am getting the error:
                          "AggregatingMessageHandler requires the 'correlationId' property"

                          Was this issue resolved in a certain snapshot?



                          • #14
                            Sorry to revive an old thread...

                            So how do you use the outbound gateway? Here is what I am trying to do in order to test a request/reply scenario:
                            1. I am using a MessageChannelTemplate object to send a request message to a channel, which is set as the request-channel of an outbound gateway:
                              requestChannelTemplate.send(requestMsg, requestChannel);
                              <integration:channel id="requestChannel" />
                              <jms:outbound-gateway connection-factory="singleConnectionFactory" request-channel="requestChannel"
                              		request-destination-name="queue:request" reply-destination-name="queue:reply" />
                              I want the current thread to block until a reply is received.

                            2. In a separate thread, send a JMS reply, simulating a response from an external system.

                            On the reply message, no matter what I set the JMS or Spring Integration correlation ID to, the request always times out. How is the reply message being correlated to the request? What do I need to do in order to get this to work?