Announcement Announcement Module
Collapse
No announcement yet.
AmqpInboundEndpoint doesn't copy headers Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • AmqpInboundEndpoint doesn't copy headers

    It seems AmqpInboundEndpoint doesn't copy headers from the AMQP messages. Is this by design or has it not been implemented yet?

  • #2
    Sorry, that is just not implemented yet. I will make a quick update so that it performs a rough, brute-force copy for now.

    Comment


    • #3
      Awesome. I won't tell you how long it took me to figure that out

      Comment


      • #4
        Now that I'm about to add this functionality, I realize that it would be nice if our MessageProperties object actually extended Map<String, ?>... then I could copy it directly. Otherwise, I'll need to either iterate manually or rely on reflection.

        Anyways, out of curiosity, can you tell me which of the properties you would expect to be copied (and "all" is a valid response)? Also, do you think a prefix would be acceptable (in Spring Integration, we tend to use something like "$amqp_prop")? Prefixes act as "namespaces" so that we avoid collisions. For example, in both Spring Integration and AMQP there is a "correlationId" property.

        Comment


        • #5
          My immediate need is some custom headers I'm setting on the AMQP messages. After the inbound endpoint handles converting the message, I'm using HeaderValueRouter based on a custom header value I'm setting.

          Adding prefixes sounds like a good idea.

          Comment


          • #6
            I just committed the following change...

            BEFORE:
            Code:
            public void onMessage(Message message) {
                sendMessage(MessageBuilder.withPayload(message.getBody()).build());
            }
            AFTER:
            Code:
            public void onMessage(Message message) {
                sendMessage(MessageBuilder.withPayload(message.getBody())
                        .copyHeaders(message.getMessageProperties().getHeaders()).build());
            }
            So, as you can see, this is just an *immediate* fix (and short-term probably since there is no prefix being added yet). Basically, the "headers" from the MessageProperties will be available as-is in the resulting Spring Integration Message.

            Once we add the prefix, and the rest of the headers this will likely change. I hope you don't mind living on the edge. It is "sandbox" code only for the near term future.

            -Mark

            Comment


            • #7
              I don't mind being on the bleeding edge and I do expect hiccups like this. If anything, I question my own sanity for learning this stuffy while using sandbox/milestone code but so far it's been pretty smooth.

              I appreciate the daily feedback and code changes.

              Comment


              • #8
                There is a bright side: you can significantly influence the direction. It's early in the game, but it should be very exciting.

                Comment


                • #9
                  The bright side is bloody. It's now blowing up trying to deal with a simple String header value. What should be a string is coming through as com.rabbitmq.client.impl.LongStringHelper$ByteArra yLongString

                  Here's the stacktrace:

                  2010-07-21 16:30:35,320 DEBUG [org.springframework.amqp.rabbit.listener.SimpleMes sageListenerContainer] - <Received message from exchange [], routing-key [dbcmPlyReplyQueue]>
                  2010-07-21 16:31:21,612 DEBUG [org.springframework.integration.channel.DirectChan nel] - <preSend on channel 'amqpPolyResponseChannel', message: [Payload=[B@1a79b48][Headers={$timestamp=1279744281612, $history=[channel:amqpPolyResponseChannel], $id=3694667e-c73d-4ded-8a97-e8c80f3522ff, jobName=dbmmOas}]>
                  2010-07-21 16:31:21,612 DEBUG [org.springframework.integration.router.HeaderValue Router] - <(inner bean)#9 received message: [Payload=[B@1a79b48][Headers={$timestamp=1279744281612, $history=[channel:amqpPolyResponseChannel], $id=3694667e-c73d-4ded-8a97-e8c80f3522ff, jobName=dbmmOas}]>
                  2010-07-21 16:31:32,971 WARN [org.springframework.amqp.rabbit.listener.SimpleMes sageListenerContainer] - <Execution of Rabbit message listener failed, and no ErrorHandler has been set.>
                  org.springframework.integration.core.MessagingExce ption: unsupported return type for router [class com.rabbitmq.client.impl.LongStringHelper$ByteArra yLongString]
                  at org.springframework.integration.router.AbstractCha nnelNameResolvingMessageRouter.addToCollection(Abs tractChannelNameResolvingMessageRouter.java:146)
                  at org.springframework.integration.router.AbstractCha nnelNameResolvingMessageRouter.determineTargetChan nels(AbstractChannelNameResolvingMessageRouter.jav a:111)
                  at org.springframework.integration.router.AbstractMes sageRouter.handleMessageInternal(AbstractMessageRo uter.java:106)
                  at org.springframework.integration.handler.AbstractMe ssageHandler.handleMessage(AbstractMessageHandler. java:63)
                  at org.springframework.integration.endpoint.HandlerIn vocationChain.handleMessage(HandlerInvocationChain .java:58)
                  at org.springframework.integration.dispatcher.Unicast ingDispatcher.doDispatch(UnicastingDispatcher.java :105)
                  at org.springframework.integration.dispatcher.Unicast ingDispatcher.dispatch(UnicastingDispatcher.java:9 2)
                  at org.springframework.integration.channel.AbstractSu bscribableChannel.doSend(AbstractSubscribableChann el.java:43)
                  at org.springframework.integration.channel.AbstractMe ssageChannel.send(AbstractMessageChannel.java:171)
                  at org.springframework.integration.channel.AbstractMe ssageChannel.send(AbstractMessageChannel.java:144)
                  at org.springframework.integration.channel.MessageCha nnelTemplate.doSend(MessageChannelTemplate.java:22 2)
                  at org.springframework.integration.channel.MessageCha nnelTemplate.send(MessageChannelTemplate.java:181)
                  at org.springframework.integration.endpoint.MessagePr oducerSupport.sendMessage(MessageProducerSupport.j ava:55)
                  at org.springframework.integration.amqp.AmqpInboundEn dpoint.access$000(AmqpInboundEndpoint.java:34)
                  at org.springframework.integration.amqp.AmqpInboundEn dpoint$1.onMessage(AmqpInboundEndpoint.java:55)
                  at org.springframework.amqp.rabbit.listener.AbstractM essageListenerContainer.doInvokeListener(AbstractM essageListenerContainer.java:316)
                  at org.springframework.amqp.rabbit.listener.AbstractM essageListenerContainer.invokeListener(AbstractMes sageListenerContainer.java:253)
                  at org.springframework.amqp.rabbit.listener.AbstractM essageListenerContainer.doExecuteListener(Abstract MessageListenerContainer.java:229)
                  at org.springframework.amqp.rabbit.listener.AbstractM essageListenerContainer.executeListener(AbstractMe ssageListenerContainer.java:198)
                  at org.springframework.amqp.rabbit.listener.SimpleMes sageListenerContainer.processMessage(SimpleMessage ListenerContainer.java:229)
                  at org.springframework.amqp.rabbit.listener.SimpleMes sageListenerContainer$AsyncMessageProcessingConsum er.run(SimpleMessageListenerContainer.java:288)
                  at java.lang.Thread.run(Thread.java:619)

                  Comment


                  • #10
                    You are routing on that "jobName" header?

                    Comment


                    • #11
                      Yes. Should that normally be an issue?

                      Comment


                      • #12
                        No issue. I just wanted to make sure I understood which value was in that Rabbit-specific String class. From the log output, it appears that the result of its toString() is fine. We need to convert that somewhere (either on the AMQP side or the Spring Integration side... most likely the former).

                        Comment

                        Working...
                        X