Announcement Announcement Module
Collapse
No announcement yet.
Splitting JDBC query results Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Splitting JDBC query results

    I have a jdbc inbound poller that returns multiple rows(records). I want to split these into individual messages for downstream processing. I have tried different SPEL expressions in the splitter, but none seem to work and I could not find any good examples in the 3 books I own nor in the various online resources. My current config . Note that the query in the jdbc inbound adapter returns two rows (records)
    Code:
    	<int-jdbc:inbound-channel-adapter channel="deadBins" 
    		query="select idbin from bin" 
    		data-source="dataSource">
     		<int:poller fixed-rate="1000"/>
    	</int-jdbc:inbound-channel-adapter>
    
    	<int:channel id="deadBins"/>
    
            <int:chain input-channel="deadBins" output-channel="moveBin">
    		<int:splitter expression="payload.idbin" />                                                         
    	</int:chain>
    I have also tried payload[idbin] which did not work. What is the correct expression?

    also will moveBin channel receive two messages if their are two rows returned by the poller?

    The SPEL payload.idbin throws an error like this:

    Code:
    16:45:59.544 DEBUG [task-scheduler-9][org.springframework.jdbc.datasource.DataSourceUtils] Returning JDBC Connection to DataSource
    16:45:59.545 DEBUG [task-scheduler-9][org.springframework.integration.endpoint.SourcePollingChannelAdapter] Poll resulted in Message: [Payload=[{idbin=1439036}, {idbin=1439037}]][Headers={timestamp=1372437959545, id=ec79e6b1-4ea1-4dba-b1ef-9cc8f5ae29cf}]
    16:45:59.545 DEBUG [task-scheduler-9][org.springframework.integration.channel.DirectChannel] preSend on channel 'deadBins', message: [Payload=[{idbin=1439036}, {idbin=1439037}]][Headers={timestamp=1372437959545, id=c5c9a9e0-4ff1-4ea1-b25b-346859e5f8c1, history=deadBins}]
    16:45:59.545 DEBUG [task-scheduler-9][org.springframework.integration.handler.MessageHandlerChain] org.springframework.integration.handler.MessageHandlerChain#0 received message: [Payload=[{idbin=1439036}, {idbin=1439037}]][Headers={timestamp=1372437959545, id=c5c9a9e0-4ff1-4ea1-b25b-346859e5f8c1, history=deadBins}]
    16:45:59.545 DEBUG [task-scheduler-9][org.springframework.integration.splitter.ExpressionEvaluatingSplitter] org.springframework.integration.splitter.ExpressionEvaluatingSplitter@33cfa965 received message: [Payload=[{idbin=1439036}, {idbin=1439037}]][Headers={timestamp=1372437959545, id=c5c9a9e0-4ff1-4ea1-b25b-346859e5f8c1, history=deadBins}]
    16:45:59.545 DEBUG [task-scheduler-9][org.springframework.integration.handler.ExpressionEvaluatingMessageProcessor] SpEL Expression evaluation failed with EvaluationException.
    org.springframework.expression.spel.SpelEvaluationException: EL1008E:(pos 8): Field or property 'idbin' cannot be found on object of type 'java.util.ArrayList'
    Last edited by ehrdoctors; Jun 28th, 2013, 11:05 PM.

  • #2
    It ends up that the SPEL needed is

    Code:
    <int:splitter expression="payload[0]" />
    It also appears that two messages are sent independently downstream to the moveBin channel. I have to learn the SPEL better as the solution doesn't really make intuitive sense to me. I have another problem downstream that is similar:

    Code:
    	<int:channel id="republishMessages"/>		
    
    	<int:chain input-channel="republishMessages" output-channel="republish">
    		<int:splitter expression="@selectMessagestoMove.exchange(#root).payload" />                                                         
    	</int:chain>
    
    
    	<int:gateway id="selectMessagestoMove" default-request-channel="selectMessages" error-channel="exceptionTransformationChannel" />                        
    
    	<int:chain input-channel="selectMessages">
    		<int-jdbc:outbound-gateway query="select field1, field2 from message where binid = :payload[binid]" data-source="dataSource" />                                                         
    	</int:chain>
    
    	
    	
    	<int:channel id="republish"/>
    	
    	<int-amqp:outbound-channel-adapter channel="republish"
    		amqp-template="amqpTemplate" exchange-name-expression="******"
    		routing-key-expression="*******"/>
    Note that the query "select field1, field2 from message" returns two rows, but only one message is sent from the splitter to the republish channel and on to rabbit.

    The SPEL @selectMessagestoMove.exchange(#root).payload must be causing only one record to return.

    I do notice this is the debug log:
    02:53:45.080 DEBUG [task-scheduler-1][org.springframework.integration.gateway.GatewayPro xyFactoryBean] Unable to attempt conversion of Message payload types. Component 'selectMessagestoMove' has no explicit ConversionService reference, and there is no 'integrationConversionService' bean within the context.


    If is use SPEL @selectMessagestoMove.exchange(#root).payload[0], then no messages get sent to rabbit.

    What would be the proper SPEL to split? It doesn't seem to follow the same pattern as the SPEL above even though these are similar operations
    Last edited by ehrdoctors; Jun 29th, 2013, 02:14 PM.

    Comment


    • #3
      It is not at all clear what you are trying to do.

      A splitter is intended to "split" a single message into multiple messages. whereas an expression like "payload.idBin" means produce a single message from some field idBin in the payload of the inbound message (unless idBin is a collection, in which case you'd get a separate message for each member of the collection). Yes, you can use any arbitrary expression, but the intent is that the expression should evaluate to a collection of some kind. For example if you had a POJO for an order that has a collection of order lines, you might use something like "payload.orderLines". The framework then busts up that collection into separate messages.

      In your first case, the input is already a collection (the jdbc polled adapter produces a List of rows).

      So, you only need a simple splitter (with no expression) <splitter />. The default splitter simply breaks apart the inbound collection.


      For your second case, I can't figure out why you want to invoke a JDBC select operation to determine how to split an existing message.

      A splitter doesn't seem to be the right component here.
      Last edited by Gary Russell; Jun 29th, 2013, 02:41 AM.

      Comment


      • #4
        Originally posted by Gary Russell;
        In your first case, the input is already a collection (the jdbc polled adapter produces a List of rows).

        So, you only need a simple splitter (with no expression) <splitter />. The default splitter simply breaks apart the inbound collection.
        OK, so if I have a splitter with no SPEL expression in the first case it will split the JDBC rows (in the message) into multiple messages . It is simpler than I thought. I will change that and make sure it still works.

        Originally posted by Gary Russel;
        For your second case, I can't figure out why you want to invoke a JDBC select operation to determine how to split an existing message.
        That was not my intent, so let me try to clearly state the case. This 2nd case is part of a larger message flow that originates with the JDBC inbound adapater in my first case. So a main message already exists and has some header and a payload an is is being passed to the republishMessages channel. Based on the binid of the message (which is in the payload), I want to query the message table and and get all the messages with that binid, and then send each Row back to AMQP as an individual message. I edited the code above to try to make it clearer.

        Comment


        • #5
          The splitter would likely work but you are mixing concerns. You don't really want to split the message, you want to use the message to generate another series of messages.

          It's probably clearer/cleaner to use a transformer (with an expression that invokes the gateway) to transform the payload.binId to a list of rows and then follow that with a simple splitter to generate a message for each row.

          Comment


          • #6
            Actually, you are over-complicating this flow.

            The <gateway/> technique you are using was suggested on other threads for doing header enrichment with JDBC data.

            You really just need to use the jdbc:outbound-gateway directly in your flow...

            channel->jdbc-outbound-gateway->splitter->amqp

            Comment


            • #7
              Originally posted by Gary Russell View Post
              Actually, you are over-complicating this flow.
              You really just need to use the jdbc:outbound-gateway directly in your flow...
              channel->jdbc-outbound-gateway->splitter->amqp
              Gary, Now that I read more about the jdbc outbound gateway I see your point, so I changed the flow.

              Code:
              	<int:channel id="republishMessages"/>
              
              	<int-jdbc:outbound-gateway query="select field1, field2 from message where binid = :payload[idbin]" 
              				      data-source="dataSource"
              				      request-channel="republishMessages"
                  				      reply-channel="messagesOutput"
                                                    max-rows-per-poll="0"
                                                />                                                         
              	
              	<int:channel id="messagesOutput"/>
              	<int:chain input-channel="messagesOutput" output-channel="republish">
              		<int:splitter />                                                         
              	</int:chain>
              	
              	<int:channel id="republish"/>
              	
              	<int-amqp:outbound-channel-adapter channel="republish"
              		amqp-template="amqpTemplate" exchange-name-expression="headers[serverName]"
              		routing-key-expression="headers[serverName]"/>
              Strangely enough, I still only get one message on my rabbit queue, even though
              select field1, field2 from message where binid = : payload[idbin] in the mysql client returns 2 rows. For some reason, as you can see in the debug trace below, only one message gets generated as a result of the query. EDIT: This code was fixed in the aove example to include max-rows-per-poll as Gary suggested below. The above code works!


              04:27:15.829 DEBUG [task-scheduler-1][org.springframework.integration.channel.DirectChan nel] preSend on channel 'republishMessages', message: [Payload={idbin=1439036}][Headers={timestamp=1372566435829, id=f169ca37-cd8a-4ae2-bfd6-65c3b9849a45, correlationId=ec9bac10-f9ac-4765-8401-ea31ccf4b30b, sequenceSize=1, updateresult=[Payload={UPDATED=1}][Headers={timestamp=1372566435817, id=33a53893-8cb6-4d41-b31d-b201b149174f, correlationId=ec9bac10-f9ac-4765-8401-ea31ccf4b30b, sequenceSize=1, sequenceNumber=1}], serverName=lm-XX.-XXX-XX-XXX, sequenceNumber=1}]
              04:27:15.829 DEBUG [task-scheduler-1][org.springframework.jdbc.core.JdbcTemplate] Executing prepared SQL query
              04:27:15.830 DEBUG [task-scheduler-1][org.springframework.jdbc.core.JdbcTemplate] Executing prepared SQL statement [select field1, field2 from message where binid = ?]
              04:27:15.830 DEBUG [task-scheduler-1][org.springframework.jdbc.datasource.DataSourceUtil s] Fetching JDBC Connection from DataSource
              04:27:15.833 DEBUG [task-scheduler-1][org.springframework.jdbc.datasource.DataSourceUtil s] Returning JDBC Connection to DataSource
              04:27:15.833 DEBUG [task-scheduler-1][org.springframework.integration.channel.DirectChan nel] preSend on channel 'messagesOutput', message: [Payload={field1=2360, field2=0}][Headers={timestamp=1372566435833, id=46fdde4b-b63b-4a0c-82dd-0b539c75db8b, correlationId=ec9bac10-f9ac-4765-8401-ea31ccf4b30b, sequenceSize=1, updateresult=[Payload={UPDATED=1}][Headers={timestamp=1372566435817, id=33a53893-8cb6-4d41-b31d-b201b149174f, correlationId=ec9bac10-f9ac-4765-8401-ea31ccf4b30b, sequenceSize=1, sequenceNumber=1}], serverName=lm-XX.-XXX-XX-XXX, sequenceNumber=1}]
              04:27:15.834 DEBUG [task-scheduler-1][org.springframework.integration.handler.MessageHan dlerChain] org.springframework.integration.handler.MessageHan dlerChain#3 received message: [Payload={field1=2360, field2=0}][Headers={timestamp=1372566435833, id=46fdde4b-b63b-4a0c-82dd-0b539c75db8b, correlationId=ec9bac10-f9ac-4765-8401-ea31ccf4b30b, sequenceSize=1, updateresult=[Payload={UPDATED=1}][Headers={timestamp=1372566435817, id=33a53893-8cb6-4d41-b31d-b201b149174f, correlationId=ec9bac10-f9ac-4765-8401-ea31ccf4b30b, sequenceSize=1, sequenceNumber=1}], serverName=lm-XX.-XXX-XX-XXX, sequenceNumber=1}]
              04:27:15.834 DEBUG [task-scheduler-1][org.springframework.integration.splitter.DefaultMe ssageSplitter] org.springframework.integration.splitter.DefaultMe ssageSplitter@1d91eb90 received message: [Payload={field1=2360, field2=0}][Headers={timestamp=1372566435833, id=46fdde4b-b63b-4a0c-82dd-0b539c75db8b, correlationId=ec9bac10-f9ac-4765-8401-ea31ccf4b30b, sequenceSize=1, updateresult=[Payload={UPDATED=1}][Headers={timestamp=1372566435817, id=33a53893-8cb6-4d41-b31d-b201b149174f, correlationId=ec9bac10-f9ac-4765-8401-ea31ccf4b30b, sequenceSize=1, sequenceNumber=1}], serverName=lm-XX.-XXX-XX-XXX, sequenceNumber=1}]
              04:27:15.834 DEBUG [task-scheduler-1][org.springframework.integration.splitter.DefaultMe ssageSplitter] handler 'org.springframework.integration.splitter.DefaultM essageSplitter@1d91eb90' sending reply Message: [Payload={field1=2360, field2=0}][Headers={timestamp=1372566435834, id=2bf6ac81-5afd-4379-9e8c-cdafd47c1281, correlationId=46fdde4b-b63b-4a0c-82dd-0b539c75db8b, sequenceSize=1, sequenceDetails=[[ec9bac10-f9ac-4765-8401-ea31ccf4b30b, 1, 1]], updateresult=[Payload={UPDATED=1}][Headers={timestamp=1372566435817, id=33a53893-8cb6-4d41-b31d-b201b149174f, correlationId=ec9bac10-f9ac-4765-8401-ea31ccf4b30b, sequenceSize=1, sequenceNumber=1}], serverName=lm-XX.-XXX-XX-XXX, sequenceNumber=1}]
              04:27:15.835 DEBUG [task-scheduler-1][org.springframework.integration.channel.DirectChan nel] preSend on channel 'republish', message: [Payload={field1=2360, field2=0}][Headers={timestamp=1372566435834, id=2bf6ac81-5afd-4379-9e8c-cdafd47c1281, correlationId=46fdde4b-b63b-4a0c-82dd-0b539c75db8b, sequenceSize=1, sequenceDetails=[[ec9bac10-f9ac-4765-8401-ea31ccf4b30b, 1, 1]], updateresult=[Payload={UPDATED=1}][Headers={timestamp=1372566435817, id=33a53893-8cb6-4d41-b31d-b201b149174f, correlationId=ec9bac10-f9ac-4765-8401-ea31ccf4b30b, sequenceSize=1, sequenceNumber=1}], serverName=lm-XX.-XXX-XX-XXX, sequenceNumber=1}]
              Last edited by ehrdoctors; Jul 2nd, 2013, 08:03 AM.

              Comment


              • #8
                I am not entirely sure why, but the outbound gateway defaults to only returning one object from the select query.

                I am guessing this was an oversight when the update query was made optional (https://jira.springsource.org/browse/INT-2289). Previously, the gateway was intended to insert/update a row and then (optionally) query it again after the update (for example to get fields auto-filled with defaults by the DBMS).

                It was realized that people might want to do on-demand queries such as yours, but the default of returning just one row was not relaxed (or documented). Please open a JIRA issue for this.

                The good news is you can alter the behavior by setting 'max-rows-per-poll' to increase the count (0 means all rows).

                While the attribute may not have the best name for your case (you are not polling), it has that name for consistency with the inbound adapter (plus the fact that the gateway could be polled).

                Comment


                • #9
                  I'll take the good news, thanks Gary. I will update the code above so it is working example

                  Comment

                  Working...
                  X