  • Amqp -> jdbc


    I am trying to implement something relatively simple in Spring Integration. My use case is to read from a message queue with AMQP and update a database with a heartbeat. I coded the app like this:

    	<int:channel id="fromHeartbeat"/>
    	<int-amqp:inbound-channel-adapter id="heartbeatQueue"
    	<int:channel id="updateheartbeat"/>
    	<int-jdbc:outbound-channel-adapter channel="fromHeartbeat" query="update server set CPU = :payload[cpu],dt_last_heartbeat = now() where name = :payload[server]"
    	data-source="dataSource" />
    	<!-- Infrastructure -->
    	<rabbit:connection-factory id="connectionFactory" host="*******" port="5672" virtual-host="/" username="guest" password="guest"/> 
    	<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
    	<bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    	<property name="driverClassName" value="com.mysql.jdbc.Driver" />
    	<property name="url" value="jdbc:mysql://********/schema_name" />
    	<property name="username" value="********" />
    	<property name="password" value="*******" />
    	</bean>
    When I put a message on the queue (sample below) and run the program, the output is like this:
    23:01:45.316 DEBUG [com.xxxxxx.XXXXXXX.clustermanager.Main.main()][org.springframework.amqp.rabbit.connection.CachingConnectionFactory] Using hostname [] for hostname.
    23:01:46.842 DEBUG [SimpleAsyncTaskExecutor-1][org.springframework.amqp.rabbit.connection.CachingConnectionFactory] Creating cached Rabbit Channel from AMQChannel(amqp://[email protected]:5672/,1)
    23:01:46.877 INFO  [com.xxxxxx.XXXXXXX.clustermanager.Main.main()][org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter] started heartbeatQueue
    23:01:46.947 DEBUG [SimpleAsyncTaskExecutor-1][] headerName=[amqp_receivedRoutingKey] WILL be mapped, matched pattern=amqp_receivedRoutingKey
    23:01:46.947 DEBUG [SimpleAsyncTaskExecutor-1][] headerName=[amqp_deliveryMode] WILL be mapped, matched pattern=amqp_deliveryMode
    23:01:46.947 DEBUG [SimpleAsyncTaskExecutor-1][] headerName=[amqp_redelivered] WILL be mapped, matched pattern=amqp_redelivered
    23:01:46.948 DEBUG [SimpleAsyncTaskExecutor-1][] headerName=[amqp_deliveryTag] WILL be mapped, matched pattern=amqp_deliveryTag
    23:01:46.953 DEBUG [SimpleAsyncTaskExecutor-1][org.springframework.integration.jdbc.JdbcMessageHandler] org.springframework.integration.jdbc.JdbcMessageHandler#0 received message: [Payload=[[email protected]][Headers={timestamp=1369177306953, id=dec2ef11-3e9e-48f2-8074-286e896d4770, amqp_receivedRoutingKey=heartbeat, amqp_deliveryMode=NON_PERSISTENT, amqp_redelivered=true, amqp_deliveryTag=1}]
    Nothing seems to happen: the message remains on the queue and the database is not updated.

    As you will notice, the JDBC message handler sees this is the payload:

    [email protected],

    which looks like the result of an Object.toString() call that just prints the object ID.

    That's strange, because Rabbit shows the message as:

    Routing Key heartbeat
    delivery_mode: 1
    Payload: cpu=75, (30 bytes)
    Encoding: string

    As usual, there seems to be some transformation behavior that is strange. Is it something to do with the mime type of message on the Rabbit queue or a bug?

  • #2
    The inbound adapter has a message converter; by default it's a SimpleMessageConverter. It needs some clues as to how to interpret the bytes that comprise the payload of the AMQP message. The SimpleMessageConverter only handles String (if the contentType message property contains 'text' - e.g. 'text/plain') or it can also handle a Java serialized object (which is not recommended for AMQP) if the contentType message property is "application/x-java-serialized-object".

    In all other cases, it emits a message with a byte[] payload, which is what you are seeing where the log prints the payload: it hasn't been converted, so it's still a byte[].
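The rule described above can be modelled in a few lines of plain Java. This is a simplified stand-in for illustration, not the Spring AMQP source: `ContentTypeRuleSketch` and `convert` are hypothetical names, the Java-serialization branch is left out, and the sample body `cpu=75,` is just the string used later in this thread. It shows why an unconverted payload is logged as an opaque array reference while a text payload is logged as a readable String.

```java
import java.nio.charset.StandardCharsets;

// Simplified model of the content-type rule described in the post above.
// Hypothetical class; NOT the actual SimpleMessageConverter implementation.
final class ContentTypeRuleSketch {

    // Returns a String when the content type mentions "text",
    // otherwise hands back the raw bytes unchanged.
    static Object convert(byte[] body, String contentType) {
        if (contentType != null && contentType.contains("text")) {
            return new String(body, StandardCharsets.UTF_8);
        }
        return body;
    }

    public static void main(String[] args) {
        byte[] body = "cpu=75,".getBytes(StandardCharsets.UTF_8);

        // No content type: the payload stays a byte[], whose toString()
        // is "[B@" followed by a hash code rather than the text.
        System.out.println(convert(body, null));

        // Text content type: the payload becomes a readable String.
        System.out.println(convert(body, "text/plain"));
    }
}
```

The first line printed varies from run to run because it contains a hash code; the second is the original text.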

    If you control the sending system, the simplest solution is to have it set the contentType property of the message to "text/plain". A Spring Integration application on the sending side will do this automatically (if the outbound message contained a 'String' payload).

    Or, you can simply add a transformer between the inbound adapter and jdbc outbound adapter...

    <int:transformer input-channel="updateheartbeat" output-channel="toJdbc" expression="new String(payload)" />

    or

    <int:transformer input-channel="updateheartbeat" output-channel="toJdbc" expression="new String(payload, 'UTF-8')" />
    (or any other charset, depending on the needs of the sending system).

    Hope that helps.


    • #3
      Originally posted by Gary Russell

      If you control the sending system, the simplest solution is to have it set the contentType property of the message to "text/plain". A Spring Integration application on the sending side will do this automatically (if the outbound message contained a 'String' payload).
      Thanks Gary, I had started down that route, thinking it was a contentType problem, before checking back on the thread. I wrote my own client, and that got me past the original problem. Now I get this:

      Caused by: org.springframework.beans.InvalidPropertyException: Invalid property 'payload[cpu]' of bean class [org.springframework.integration.message.GenericMessage]: Property referenced in indexed property path 'payload[cpu]' is neither an array nor a List nor a Set nor a Map; returned value was [cpu=75,]
      I have a feeling my string is not in the right format for automatic parsing.
      I build the message like this:
      channel.send(MessageBuilder.withPayload("cpu=75,").setHeader("server", "").build());
      I realize I have "server" in both the header and the payload, but that's still a design consideration.


      • #4
        Your SpEL expression is not valid for a String payload. "payload[cpu]" and "payload[server]" imply that the payload is a Map (which is what the error message is telling you).

        You can't just pull some contents out of a String payload with syntax like that.
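To make the difference concrete: a Map supports lookup by key, while a String such as "cpu=75," is just characters. If the sender's format could not be changed, one option would be to parse the text into a Map before it reaches the JDBC adapter. The plain-Java sketch below assumes comma-separated key=value pairs (a guess based on the "cpu=75," sample); `KeyValuePayloadSketch`, `parse`, and the server name `web01` are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: turns "cpu=75,server=web01" into a Map so that
// keyed lookups such as map.get("cpu") become possible.
final class KeyValuePayloadSketch {

    static Map<String, String> parse(String payload) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String pair : payload.split(",")) {
            String trimmed = pair.trim();
            int eq = trimmed.indexOf('=');
            if (eq <= 0) {
                continue; // skip empty fragments and fragments without a key
            }
            String key = trimmed.substring(0, eq).trim();
            String value = trimmed.substring(eq + 1).trim();
            result.put(key, value);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> heartbeat = parse("cpu=75,server=web01");
        System.out.println(heartbeat.get("cpu"));    // 75
        System.out.println(heartbeat.get("server")); // web01
    }
}
```

Logic like this could sit in a transformer between the inbound adapter and the JDBC adapter, although the JSON approach described below avoids hand-written parsing.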

        One technique would be to send a JSON representation of a Map object and use JSON Message Converters on both sides. Or, you can construct the JSON manually on the sending side and just use the converter on the receiving side...

        {"cpu": 75, "server": ""}
        Or, just use

        channel.send(MessageBuilder.withPayload("75").setHeader("server", "").build());
        update server set CPU = :payload,dt_last_heartbeat = now() where name = :headers[server]
        You will need to add 'server' to mapped-headers.

        But the JSON route is probably more flexible.

        The contentType has to contain "json" for the inbound JsonMessageConverter to work (e.g. application/json or text/x-json).
        Last edited by Gary Russell; May 22nd, 2013, 07:50 AM.


        • #5
          Thanks Gary, I was trying multiple variations of code on the sender side to experiment with different message types. I ended up using a LinkedHashMap to construct the payload and converting that to a byte array, which is what got put on the queue. In the receiver-side context XML I ended up using a payload-deserializing transformer, which took the byte array and turned the message back into a Map. That made the rest of my code work. I realize there are different ways to make this work, but this was the smallest change.

          <int:payload-deserializing-transformer input-channel="fromHeartbeat" output-channel="updateheartbeat"/>
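For readers following along, the round trip described in this post can be sketched in plain Java. This assumes the sender used standard Java serialization to turn the LinkedHashMap into bytes, which is what the payload-deserializing transformer expects when no custom deserializer is configured. `MapSerializationSketch`, its method names, and the server name `web01` are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the sender-side serialization and the
// receiver-side deserialization of a Map payload.
final class MapSerializationSketch {

    // Sender side: Map -> byte[] using Java serialization.
    static byte[] toBytes(Map<String, Object> map) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(map);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Receiver side: byte[] -> Map, the step the transformer performs.
    @SuppressWarnings("unchecked")
    static Map<String, Object> fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (Map<String, Object>) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> heartbeat = new LinkedHashMap<>();
        heartbeat.put("cpu", 75);
        heartbeat.put("server", "web01");

        Map<String, Object> restored = fromBytes(toBytes(heartbeat));
        // With a Map payload, keyed lookups like payload[cpu] can resolve.
        System.out.println(restored.get("cpu"));
        System.out.println(restored.get("server"));
    }
}
```

As noted earlier in the thread, Java serialization is not the recommended wire format for AMQP, so this works best when both ends are Java applications under the same control.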