setup a header mapper for spring-amqp server configuration

  • setup a header mapper for spring-amqp server configuration

    hi gary :
    i have the following service configuration for the server with spring-amqp.

    the problem is that the client receives the reply but errors out with:

    [SimpleAsyncTaskExecutor-1] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Received message: (Body:'<?xml version="1.0" encoding="UTF-8" standalone="yes"?><Echo_PayloadResponse xmlns=""><Payload>XXXXXXXXXXXXXXXXXXXXXXXXXXXXX</Payload></Echo_PayloadResponse>'; ID:null; Content:text/plain; Headers:{}; Exchange:; RoutingKey:replies; Reply:null; DeliveryMode:PERSISTENT; DeliveryTag:1)

    [SimpleAsyncTaskExecutor-1] ERROR org.springframework.amqp.rabbit.core.RabbitTemplate - No correlation header in reply

    although on the server side i see message headers :
    Received message: (Body:'<Echo_Payload xmlns=""> <RequestPadding>1234567890</RequestPadding> <SleepMilliseconds>0</SleepMilliseconds> <PayloadSize>400</PayloadSize> </Echo_Payload>'; ID:null; Content:text/plain; Headers:{spring_reply_correlation=93863d64-52d0-4be0-a88c-c321222bf574, spring_reply_to=null};; RoutingKey:submitEchoPayload; Reply:replies; DeliveryMode:PERSISTENT; DeliveryTag:1)

    but these are not carried over to the reply on the server side because i don't have a header mapping for spring_reply_correlation.
    and of course the submit method doesn't take a spring-amqp Message object; it only takes and returns JAXB request/response objects:

    public JAXBElement<EchoPayloadResponseT> submitEchoPayload(EchoPayloadT echoPayload) {

    the createMessage() method in XMLMessageConverter does the following:
    message = super.createMessage(xml, messageProperties);

    besides using spring-integration, what's the best way to preserve message headers given the constraints where the ultimate service doesn't "know" about "Spring's" Message (header,body)?

    thanks and regards,

    spring-context client side
    ===================================================================
    <rabbit:queue name="replies" auto-delete="true" durable="false" exclusive="true"></rabbit:queue>
    <rabbit:template reply-queue="replies" id="amqpTemplate" connection-factory="rabbitConnection"
    exchange="" routing-key="submitEchoPayload" queue="amqp_echoPayload">
    ===================================================================

    spring-context on server side
    ========================================================
    <bean id="echoPayloadServer" class="com.bear.EchoPayloadService"></bean>

    <listener-container xmlns="" acknowledge="auto" concurrency="2"
    connection-factory="rabbitConnection" message-converter="xmlConverter" prefetch="1">
    <listener ref="echoPayloadServer" method="submitEchoPayload" queue-names="amqp_echoPayload" />
    <bean id="xmlConverter" class="com.bear.XMLMessageConverter">

    <!-- rabbit configuration -->
    <rabbit:connection-factory id="rabbitConnection" />
    <rabbit:admin connection-factory="rabbitConnection" auto-startup="true"/>

    <rabbit:queue name="amqp_echoPayload" auto-delete="true" durable="false" exclusive="false"></rabbit:queue>

    <rabbit:direct-exchange name="">
    <rabbit:binding queue="amqp_echoPayload" key="submitEchoPayload"/>

    ========================================================

  • #2
    There are a few ways to do it; for example, in your XMLMessageConverter, save off the 'spring_reply_correlation' header in a ThreadLocal<String> field, in fromMessage(), and add it to the reply in toMessage() (or createMessage() if your converter subclassed AbstractMessageConverter).

    Or, you can declare your message listener container using <bean/> syntax and implement a MessageListener to directly handle the spring-amqp Message, call the message converter, call your service, and restore the header to the reply while converting back to a Message. This, however, would need you to replicate the reply processing logic from the MessageListenerAdapter.
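
A rough sketch of the listener approach described above, in plain Java. `Envelope` and `HeaderPreservingListener` are hypothetical stand-ins, not spring-amqp types; a real implementation would work with the spring-amqp Message and MessageListener, and would also have to send the reply to the reply-to address, which this sketch leaves out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for a message: a body plus a header map.
final class Envelope {
    final String body;
    final Map<String, Object> headers = new HashMap<>();

    Envelope(String body) {
        this.body = body;
    }
}

// Hypothetical stand-in for a listener that sees the whole message.
final class HeaderPreservingListener {
    static final String CORRELATION = "spring_reply_correlation";

    // The service only knows about payloads, never about headers.
    private final Function<String, String> service;

    HeaderPreservingListener(Function<String, String> service) {
        this.service = service;
    }

    // Call the service with the payload, then build a reply that carries
    // the correlation header copied from the request (if there was one).
    Envelope handle(Envelope request) {
        String responseBody = service.apply(request.body);
        Envelope reply = new Envelope(responseBody);
        Object id = request.headers.get(CORRELATION);
        if (id != null) {
            reply.headers.put(CORRELATION, id);
        }
        return reply;
    }
}

class ListenerDemo {
    public static void main(String[] args) {
        HeaderPreservingListener listener =
                new HeaderPreservingListener(body -> "<Echo_PayloadResponse/>");
        Envelope request = new Envelope("<Echo_Payload/>");
        request.headers.put(HeaderPreservingListener.CORRELATION, "93863d64");

        Envelope reply = listener.handle(request);
        System.out.println(reply.body + " " + reply.headers);
    }
}
```

Because the request and reply are both in scope inside `handle`, no per-thread state is needed; the cost is the extra reply-sending logic noted above.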

    Probably the simplest would be to use the ThreadLocal.
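
A minimal sketch of the ThreadLocal idea, assuming the conversion of the request and of the reply happen on the same listener thread. `Msg` and `CorrelatingConverter` are hypothetical plain-Java stand-ins; a real converter would implement the spring-amqp converter methods (fromMessage() and toMessage() or createMessage()) instead.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a message: a body plus a header map.
final class Msg {
    final String body;
    final Map<String, Object> headers = new HashMap<>();

    Msg(String body) {
        this.body = body;
    }
}

// Hypothetical stand-in for the custom message converter.
final class CorrelatingConverter {
    static final String CORRELATION = "spring_reply_correlation";

    // Correlation id of the request currently being handled on this thread.
    private final ThreadLocal<Object> correlation = new ThreadLocal<>();

    // Inbound: remember the header if present, then return only the payload.
    String fromMessage(Msg request) {
        Object id = request.headers.get(CORRELATION);
        if (id != null) {
            correlation.set(id);
        } else {
            correlation.remove(); // do not leak an id from an earlier request
        }
        return request.body;
    }

    // Outbound: copy the remembered header onto the reply, then clear it.
    Msg toMessage(String payload) {
        Msg reply = new Msg(payload);
        Object id = correlation.get();
        if (id != null) {
            reply.headers.put(CORRELATION, id);
            correlation.remove();
        }
        return reply;
    }
}

class ThreadLocalConverterDemo {
    public static void main(String[] args) {
        CorrelatingConverter converter = new CorrelatingConverter();
        Msg request = new Msg("<Echo_Payload/>");
        request.headers.put(CorrelatingConverter.CORRELATION, "93863d64");

        String payload = converter.fromMessage(request); // service sees only this
        Msg reply = converter.toMessage(payload);        // header is restored here
        System.out.println(reply.headers);
    }
}
```

Clearing the ThreadLocal after use matters because listener threads are reused; otherwise a later message without the header could pick up a stale id.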


    • #3
      thanks much gary for the pointers!
      i did take the short cut and use a threadlocal variable w/ a check to see if the header contains spring_reply_correlation. works great!

      some clarifications though:
      spring-amqp uses spring_reply_correlation header but i also noticed spring_reply_to
      Headers:{spring_reply_correlation=93863d64-52d0-4be0-a88c-c321222bf574, spring_reply_to=null};; RoutingKey:submitEchoPayload; Reply:replies; DeliveryMode:PERSISTENT; DeliveryTag:1)

      the server does properly reply to "replies" (because Header Reply=replies) queue even when spring_reply_to == null.
      so, the questions:
      [1] when spring_reply_to and Reply are both set, what takes priority for listener container?
      [2] what is expected as server side/client side contract when using spring-amqp?
      [3] also, amqp itself specifies correlationId (also wrapped as part of spring's messageProperties) so, do servers have to be "spring aware" ? could you also elaborate why spring-amqp/integration can't use rabbit/amqp correlationId property?

      thanks for your patience!


      • #4
        The spring_reply_to header is really only needed if there was an inbound reply_to header on the message passed to the outbound template; the template restores it on the response. It's intended for situations where there is a pipeline with more than 2 servers involved (s1->s2->s3<-reply). On reflection, it probably should have been suppressed when null.

        The important header when using a rabbit template with a fixed reply queue is the spring_reply_correlation; the contract is the server must include this header in the reply.

        It's possible I could have used the built-in correlation header; it may just be an oversight on my part. I'll have to take a look when I get time.
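
To illustrate why the contract above matters with a fixed reply queue, here is a small plain-Java sketch of correlating replies on the client side. `ReplyCorrelator` is a hypothetical class written for this illustration; it is not how RabbitTemplate is actually implemented, only the general idea of matching a reply to its pending request by the spring_reply_correlation header.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical client-side bookkeeping for a shared reply queue.
final class ReplyCorrelator {
    static final String CORRELATION = "spring_reply_correlation";

    // Outstanding requests, keyed by the correlation id sent with each one.
    private final Map<String, String> pending = new ConcurrentHashMap<>();

    // Register a request and return the id to send in its correlation header.
    String register(String requestBody) {
        String id = UUID.randomUUID().toString();
        pending.put(id, requestBody);
        return id;
    }

    // Find the request a reply belongs to; fail if the header is missing.
    String match(Map<String, Object> replyHeaders) {
        Object id = replyHeaders.get(CORRELATION);
        if (id == null) {
            throw new IllegalStateException("No correlation header in reply");
        }
        String request = pending.remove(id.toString());
        if (request == null) {
            throw new IllegalStateException("Unknown correlation id: " + id);
        }
        return request;
    }
}

class ReplyCorrelatorDemo {
    public static void main(String[] args) {
        ReplyCorrelator correlator = new ReplyCorrelator();
        String id = correlator.register("<Echo_Payload/>");

        // A server honouring the contract echoes the header back.
        Map<String, Object> goodReply = new HashMap<>();
        goodReply.put(ReplyCorrelator.CORRELATION, id);
        System.out.println("matched: " + correlator.match(goodReply));

        // A reply with empty headers cannot be matched to any request.
        try {
            correlator.match(new HashMap<>());
        } catch (IllegalStateException e) {
            System.out.println("error: " + e.getMessage());
        }
    }
}
```

With many requests sharing one reply queue, the header is the only thing that tells the client which caller a reply is for, which is why a reply with `Headers:{}` is rejected.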


        • #5
          Hi cogitate,

          Just in case you are using spring-amqp:inbound-gateway, the SPRING_REPLY_CORRELATION is already handled for you by DefaultAmqpHeaderMapper.
          Check DefaultAmqpHeaderMapper.

          There is a list of whitelisted headers which the service will preserve and include in the replies; spring_reply_correlation is one such header.

          One can also write a custom header mapper in case you have some application-specific headers; we used one in our project some time back.
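
The whitelist idea can be sketched in plain Java as below. `WhitelistHeaderMapper` and the `x_tenant` header are hypothetical names used only for illustration; this is not the DefaultAmqpHeaderMapper API, just the general shape of copying an allowed set of request headers onto the reply.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical mapper that copies only whitelisted headers to the reply.
final class WhitelistHeaderMapper {
    private final Set<String> allowed;

    WhitelistHeaderMapper(Set<String> allowed) {
        this.allowed = new HashSet<>(allowed); // defensive copy
    }

    // Return the subset of request headers that should be sent back.
    Map<String, Object> toReplyHeaders(Map<String, Object> requestHeaders) {
        Map<String, Object> replyHeaders = new HashMap<>();
        for (Map.Entry<String, Object> entry : requestHeaders.entrySet()) {
            // Skip headers that are not whitelisted or carry no value.
            if (allowed.contains(entry.getKey()) && entry.getValue() != null) {
                replyHeaders.put(entry.getKey(), entry.getValue());
            }
        }
        return replyHeaders;
    }
}

class WhitelistHeaderMapperDemo {
    public static void main(String[] args) {
        WhitelistHeaderMapper mapper = new WhitelistHeaderMapper(
                new HashSet<>(Arrays.asList("spring_reply_correlation", "x_tenant")));

        Map<String, Object> requestHeaders = new HashMap<>();
        requestHeaders.put("spring_reply_correlation", "93863d64");
        requestHeaders.put("spring_reply_to", null);   // null value: dropped
        requestHeaders.put("internal_debug", "true");  // not whitelisted: dropped

        System.out.println(mapper.toReplyHeaders(requestHeaders));
    }
}
```

Application-specific headers are added by extending the set passed to the constructor.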