Announcement Announcement Module
Collapse
No announcement yet.
Amqp -> jdbc Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Amqp -> jdbc

    Hi,

    I am trying to implement something relative simple in spring integration. My use case is to read from a message queue with amqp and update a database with a heartbeat. I coded the app like this:

    Code:
    	<int:channel id="fromHeartbeat"/>
    	<int-amqp:inbound-channel-adapter id="heartbeatQueue"
    				      	      channel="fromHeartbeat" 
                                             queue-names="heartbeat"
                                             connection-factory="connectionFactory"/>
    
    	
    
    	<int:channel id="updateheartbeat"/>
    	<int-jdbc:outbound-channel-adapter channel="fromHeartbeat" query="update server set CPU = :payload[cpu],dt_last_heartbeat = now() where name = :payload[server]"
    	data-source="dataSource" />
    	
    	<!-- Infrastructure -->
    	<rabbit:connection-factory id="connectionFactory" host="*******" port="5672" virtual-host="/" username="guest" password="guest"/> 
    	<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
    
    	<bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    	<property name="driverClassName" value="com.mysql.jdbc.Driver" />
    	<property name="url" value="jdbc:mysql://********/schema_name" />
    	<property name="username" value="********" />
    	<property name="password" value="*******" />
    	</bean>
    When I put a message on the queue (sample below) and run the program, The output is like this:
    Code:
    23:01:45.316 DEBUG [com.xxxxxx.XXXXXXX.clustermanager.Main.main()][org.springframework.amqp.rabbit.connection.CachingConnectionFactory] Using hostname [rabbitserver1.com] for hostname.
    23:01:46.842 DEBUG [SimpleAsyncTaskExecutor-1][org.springframework.amqp.rabbit.connection.CachingConnectionFactory] Creating cached Rabbit Channel from AMQChannel(amqp://guest@[email protected]:5672/,1)
    23:01:46.877 INFO  [com.xxxxxx.XXXXXXX.clustermanager.Main.main()][org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter] started heartbeatQueue
    23:01:46.947 DEBUG [SimpleAsyncTaskExecutor-1][org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper] headerName=[amqp_receivedRoutingKey] WILL be mapped, matched pattern=amqp_receivedRoutingKey
    23:01:46.947 DEBUG [SimpleAsyncTaskExecutor-1][org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper] headerName=[amqp_deliveryMode] WILL be mapped, matched pattern=amqp_deliveryMode
    23:01:46.947 DEBUG [SimpleAsyncTaskExecutor-1][org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper] headerName=[amqp_redelivered] WILL be mapped, matched pattern=amqp_redelivered
    23:01:46.948 DEBUG [SimpleAsyncTaskExecutor-1][org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper] headerName=[amqp_deliveryTag] WILL be mapped, matched pattern=amqp_deliveryTag
    23:01:46.953 DEBUG [SimpleAsyncTaskExecutor-1][org.springframework.integration.jdbc.JdbcMessageHandler] org.springframework.integration.jdbc.JdbcMessageHandler#0 received message: [Payload=[B@3de3940a][Headers={timestamp=1369177306953, id=dec2ef11-3e9e-48f2-8074-286e896d4770, amqp_receivedRoutingKey=heartbeat, amqp_deliveryMode=NON_PERSISTENT, amqp_redelivered=true, amqp_deliveryTag=1}]
    Nothing seems to happen, the message remains on the queue and the database is not updated

    As you will notice, the JDBC message handler sees this is the payload:

    B@3de3940a,

    which looks like the results of an Object.ToString() where it just prints out the object ID.

    That's strange, because Rabbit shows the message as:

    Routing Key heartbeat
    Properties
    delivery_mode: 1
    headers:
    Payload: cpu=75,server=xxx.xxxxxx.com (30 bytes)
    Encoding: string

    As usual, there seems to be some transformation behavior that is strange. Is it something to do with the mime type of message on the Rabbit queue or a bug?

  • #2
    The inbound adapter has a message converter; by default it's a SimpleMessageConverter. It needs some clues as to how to interpret the bytes that comprise the payload of the AMQP message. The SimpleMessageConverter only handles String (if the contentType message property contains 'text' - e.g. 'text/plain') or it can also handle a Java serialized object (which is not recommended for AMQP) if the contentType message property is "application/x-java-serialized-object".

    In all other cases, it emits a message with a byte[] payload (which is what you are seeing in the LOG of the payload - it hasn't been converted - it's still a byte[]).

    If you control the sending system, the simplest solution is to have it set the contentType property of the message to "text/plain". A Spring Integration application on the sending side will do this automatically (if the outbound message contained a 'String' payload).

    Or, you can simply add a transformer between the inbound adapter and jdbc outbound adapter...

    Code:
    <int:transformer input-channel="updateheartbeat" output-channel="toJdbc" expression="new String(payload)" />
    or

    Code:
    <int:transformer input-channel="updateheartbeat" output-channel="toJdbc" expression="new String(payload, 'UTF-8')" />
    (or any other charset, depending on the needs of the sending system).

    Hope that helps.

    Comment


    • #3
      Originally posted by Gary Russell View Post

      If you control the sending system, the simplest solution is to have it set the contentType property of the message to "text/plain". A Spring Integration application on the sending side will do this automatically (if the outbound message contained a 'String' payload).
      Thanks Gary, I started down that route thinking it was a contentType problem prior to checking back on the thread. I made my own client and that did progress the error. Now I get this:

      Code:
      Caused by: org.springframework.beans.InvalidPropertyException: Invalid property 'payload[cpu]' of bean class [org.springframework.integration.message.GenericMessage]: Property referenced in indexed property path 'payload[cpu]' is neither an array nor a List nor a Set nor a Map; returned value was [cpu=75,server=XXXX.XXXXXX.com]
      I have a feeling my string is not in the right format for automatic parsing.
      I build the message like this:
      Code:
      channel.send(MessageBuilder.withPayload("cpu=75,server=XXXX.XXXXX.com").setHeader("server", "XXXX.XXXX.com").build());
      I realize I have "server" in both the header and the payload, but that's still a design consideration

      Comment


      • #4
        Your SpEL expression is not valid for a String payload "payload[cpu]", "payload[server]" implies the payload is a Map (which is what the error message is telling you).

        You can't just pull some contents out of a String payload with syntax like that.

        One technique would be to send a JSON representation of a Map object and use JSON Message Converters on both sides. Or, you can construct the JSON manually on the sending side and just use the converter on the receiving side...

        Code:
        {"cpu": 75, "server": "XXXX.XXXXX.com"}
        Or, just use

        Code:
        channel.send(MessageBuilder.withPayload("75").setHeader("server", "XXXX.XXXX.com").build());
        
        ...
        
        update server set CPU = :payload,dt_last_heartbeat = now() where name = :headers[server]
        You will need to add 'server' to mapped-headers.

        But the JSON route is probably more flexible.

        The contentType has to contain "json" for the inbound JsonMessageConverter to work (e.g. application/json or text/x-json).
        Last edited by Gary Russell; May 22nd, 2013, 07:50 AM.

        Comment


        • #5
          Thanks Gary, I was trying multiple variations of code on the sender side to experiment with different message types. I ended up using a LinkedHashMap to construct the payload, and converting that to a byte array which is what got put on the queue. On the receiver side context xml I ended up using payload deserializing transformer, which took the byte array and put the message back into a map. That caused the rest of my code to work. I realize there are different ways to make this work, but this was the smallest change.

          Code:
          <int:payload-deserializing-transformer input-channel="fromHeartbeat" output-channel="updateheartbeat"/>

          Comment

          Working...
          X