<jdbc:inbound-channel-adapter..> how to detect no rows

  • <jdbc:inbound-channel-adapter..> how to detect no rows

    I am using the configuration below:

    [CODE]
    <jdbc:inbound-channel-adapter
    query="${polling.sql}"
    channel="jdbcChannel" select-sql-parameter-source="parameterSource" data-source="dataSource"
    >
    <poller fixed-rate="60000">
    <transactional/>
    </poller>
    </jdbc:inbound-channel-adapter>

    [CODE]

    This works fine, but the problem I have is that if it gets no rows, there is no way for me to detect that.
    I would like to know and log how many rows it selected, and if it did not select any rows for quite some time (a configurable period), send an alert email, etc.
    Can someone please help me build that logic around <jdbc:inbound-channel-adapter..>?
    Any sample configuration would be much appreciated.

    Thanks

  • #2
    Hi!

    For a more comprehensive answer I need the Spring Integration source code in front of me.
    So either I'll answer tomorrow, or you'll get an answer from someone else.

    But at a glance, the phrase "some time" suggests that you should have an AtomicReference<Date> bean and update its value whenever your select returns rows, in the messaging flow that runs after the <jdbc:inbound-channel-adapter>.
    On the other side, you should have a component that periodically (e.g. via cron) looks at that "lastUpdate" bean and compares its Date with the current time to decide whether an email needs to be sent.

    Take care,
    Artem
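
For readers who want the idea above in plain code first, here is a minimal JDK-only sketch. It is not Spring Integration configuration; `ActivityMonitor`, `markActivity` and `isIdle` are hypothetical names invented for illustration, and it uses `Instant` where the post suggests `Date`.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper (not part of Spring Integration): remembers when rows were last seen.
class ActivityMonitor {

    private final AtomicReference<Instant> lastUpdate;

    ActivityMonitor(Instant start) {
        this.lastUpdate = new AtomicReference<>(start);
    }

    // Call this from the flow downstream of the JDBC adapter whenever rows arrive.
    void markActivity(Instant when) {
        lastUpdate.set(when);
    }

    // Call this from a periodic check; true means no rows were seen
    // for longer than the threshold, so an alert may be due.
    boolean isIdle(Instant now, Duration threshold) {
        return Duration.between(lastUpdate.get(), now).compareTo(threshold) > 0;
    }
}
```

Passing the current time in as a parameter, rather than reading the clock inside the class, is only a design choice that keeps the check easy to test.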



    • #3
      Thanks, that is definitely a possibility: tracking when we got results rather than tracking when we did not.

      Please let me know if there is any built-in mechanism that I can configure, or other ways to achieve this. As this is a mission-critical application, I want to be on top of things and be able to log or report what happened on each polling cycle.

      Here is the configuration:

      Code:
      	<interceptors>
      			<wire-tap channel="logger" />
      		</interceptors>
      	</channel>
      
      	<logging-channel-adapter id="logger" level="DEBUG" />	
      	
      	<jdbc:inbound-channel-adapter 
      		query="${polling.sql}"
      		channel="jdbcChannel" data-source="dataSource">		
      		<poller fixed-rate="60000">
      		<transactional/>
      		</poller>
      	</jdbc:inbound-channel-adapter>
      
      	<channel id="filteredChannel">
      		<queue capacity="10" />
      	</channel>
      	
      	<chain input-channel="jdbcChannel" output-channel="filteredChannel">
      		<splitter ref="splitBean"/>
      		<filter ref="filterBean" method="filter"  discard-channel="rejectedMessagesChannel" />
      	</chain>
      
      	<!-- rejected : begin -->
      	<publish-subscribe-channel id="rejectedMessagesChannel" />
      	
      	<header-enricher input-channel="rejectedMessagesChannel" output-channel="aggregateChannel">
      		<header name="valid" value="false" />
      	</header-enricher>
      
      	<jdbc:outbound-channel-adapter
      		query="update message set status=-1 where id in (:payload[id])"
      		data-source="dataSource" channel="rejectedMessagesChannel" />
      	<!-- rejected : end -->
      
      		
      	<channel id="testChannel">
      		<queue capacity="10" />
      		<interceptors>
      			<wire-tap channel="logger" />
      		</interceptors>
      	</channel>	
      
       
      	<int-jms:outbound-channel-adapter 	
      	channel="testChannel"
      	connection-factory="connectionFactory"
      	id="jmsOut" 	
      	destination="testQueue" extract-payload="true">
      	</int-jms:outbound-channel-adapter>	
      	 
      	 <!-- read it back to test-->
      	 
       <stream:stdout-channel-adapter id="jmsOUT" charset="UTF-8" append-newline="true"/> 
       
       <int-jms:inbound-channel-adapter id="jmsinAdater" connection-factory="connectionFactory" destination="testQueue" channel="jmsOUT"/>



      • #4
        I believe Artem meant something like this...

        Code:
        <int-stream:stdin-channel-adapter channel="foo">
        	<int:poller fixed-delay="5000"/>
        </int-stream:stdin-channel-adapter>
        
        <int:publish-subscribe-channel id="foo" />
        
        <int-stream:stdout-channel-adapter channel="foo" order="2" />
        
        <bean id="lastProcessed" class="java.util.concurrent.atomic.AtomicReference">
        	<constructor-arg>
        		<bean class="java.util.Date" />
        	</constructor-arg>
        </bean>
        
        <int:service-activator input-channel="foo" expression="@lastProcessed.set(new java.util.Date())" order="1" />
        	
        <int:inbound-channel-adapter channel="noActivity" 
        		expression="@lastProcessed.get().getTime() &lt; T(System).currentTimeMillis() - 60000 ? 'NOACTIVITY' : null" >
        	<int:poller fixed-rate="30" time-unit="SECONDS" />
        </int:inbound-channel-adapter>
        
        <int:channel id="noActivity" />
        	
        <int-stream:stderr-channel-adapter channel="noActivity" append-newline="true" />
        Hopefully it's self-explanatory; obviously it's using in/out streams instead of real components. The 'lastProcessed' bean is updated before processing each "row".

        Another option might be to use JMX to expose the integration components (see the JMX chapter in the reference manual) and use an attribute polling channel adapter (or an external JMX monitor) to monitor the sent message count on the jdbcChannel to check whether it has changed recently.
        Last edited by Gary Russell; Jul 17th, 2013, 02:43 PM.
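
The "has the count changed recently" check from the JMX option can be sketched in plain Java. This is only the comparison logic, under the assumption that some monitor hands you the channel's sent-message count on each check; `SendCountWatcher` and `hasChanged` are hypothetical names, and no JMX or Spring Integration API is used here.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper: compares successive readings of a send counter.
class SendCountWatcher {

    private final AtomicLong lastSeen;

    // initialCount is the baseline reading taken when monitoring starts.
    SendCountWatcher(long initialCount) {
        this.lastSeen = new AtomicLong(initialCount);
    }

    // Returns true if the counter moved since the previous check,
    // and remembers the new reading as the baseline for the next check.
    boolean hasChanged(long currentCount) {
        long previous = lastSeen.getAndSet(currentCount);
        return previous != currentCount;
    }
}
```

A monitor would call `hasChanged` on each check and treat a run of `false` results as "no activity".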



        • #5
          Excellent, Gary! Thanks!
          The prototype is just what I meant: SI becomes a language for programmers.

          And right: the main highlights here are the <publish-subscribe-channel> with a <service-activator> that changes the Date value, and the <inbound-channel-adapter> that checks the Date to start the "no rows" message flow.



          • #6
            Thanks guys, much appreciated.
            My understanding is that I will inject the lastProcessed bean into one of my processing chain components and update it every time I process something. Let me know if this is incorrect.

            Thanks!



            • #7
              Yes, that will work too; then you don't need the pub/sub or service to update the time stamp, just the inbound channel adapter to generate a message if the condition is true.



              • #8
                Thanks guys, I have a few very basic questions, if you could please help answer them.

                1) In the configuration below, what is the significance of the queue capacity? What happens if the JDBC poller produces thousands of rows (this is possible when starting after a long downtime)? How will Spring Integration handle this case internally, and what do I need to do to make sure it does not fail and gets through quickly?
                <channel id="filteredChannel">
                <queue capacity="10" />
                </channel>

                2) I need to poll using multiple SQL queries, as the message types come from different tables. I want to poll them sequentially: poll for type A, followed by type B, followed by type C, with the cycle repeating at a set interval. Can you please let me know how I can get this polling behavior just via configuration?
                I believe I can have 3 JDBC pollers configured, all outputting to the same channel, but then it is difficult to coordinate them so that they run one by one in a set pattern. I am not sure what the best approach would be.

                Thanks



                • #9
                  1. The thread will block if the queue is full.

                  However, you can't use transactions with QueueChannels - the transaction will commit as soon as the message is successfully sent to the queue in the channel and the downstream flow won't be part of the transaction. See http://static.springsource.org/sprin...nsactions.html for more information.

                  Another way to limit the work (if there's lots to do) is to use a Pooled task executor on the <poller/>; that way you'll never have more than the pool size threads running.
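
As an analogy for the "thread will block if the queue is full" behaviour, here is a JDK-only sketch using a bounded `BlockingQueue` in place of the channel's queue. It is not Spring Integration code; `BoundedQueueDemo` and `fill` are hypothetical names, and nothing drains the queue, so the producer stops as soon as capacity is reached.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class BoundedQueueDemo {

    // Tries to enqueue `count` items into a queue of the given capacity and
    // returns how many were accepted before the queue stayed full for the whole timeout.
    static int fill(int capacity, int count, long timeoutMillis) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        try {
            for (int i = 0; i < count; i++) {
                // offer(...) waits up to the timeout for free space;
                // put(...) would wait indefinitely, i.e. the producer would block.
                if (!queue.offer("row-" + i, timeoutMillis, TimeUnit.MILLISECONDS)) {
                    break;
                }
                accepted++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return accepted;
    }
}
```

With a capacity of 10 and no consumer, only the first 10 of the thousands of rows fit; the rest wait until something reads from the queue.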

                  2. For a complex situation like that you might be better off writing a custom POJO that uses a JdbcTemplate to poll the tables you want, depending on the current state, and then using a simple <int:inbound-channel-adapter/> to poll your POJO instead.

                  Something like
                  Code:
                  public Map<?, ?> getData() {
                  
                     if (timeToPollForA) {
                         Map<?,?> foo = this.jdbcTemplate.queryForMap(...);
                     }
                     else {
                  
                  ...
                  }
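
One way to flesh out the "depending on the current state" part is a POJO that runs the next query in a fixed rotation on each call. The sketch below is JDK-only and makes several assumptions: `RotatingPoller` is a hypothetical name, each query is passed in as a `Supplier` (in a real setup each supplier would presumably wrap a JdbcTemplate call), and returning null for an empty result is taken to mean "no message this cycle", as in the expression-based example earlier in the thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical POJO: each call runs the next query in a fixed rotation (A, B, C, A, ...).
class RotatingPoller {

    private final List<Supplier<List<Map<String, Object>>>> queries;
    private int next = 0;

    RotatingPoller(List<Supplier<List<Map<String, Object>>>> queries) {
        if (queries.isEmpty()) {
            throw new IllegalArgumentException("at least one query is required");
        }
        this.queries = new ArrayList<>(queries);
    }

    // Intended as the method the inbound channel adapter polls.
    // Returns the rows of the current query, or null when it found nothing.
    public synchronized List<Map<String, Object>> getData() {
        Supplier<List<Map<String, Object>>> query = queries.get(next);
        next = (next + 1) % queries.size(); // advance even if this query was empty
        List<Map<String, Object>> rows = query.get();
        return rows.isEmpty() ? null : rows;
    }
}
```

Note that this polls one type per poller tick. If all three types must be read within a single tick, the method could instead loop over every supplier and return the combined rows.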



                  • #10
                    Thanks Gary.
                    Where can I find a sample or more details on how to write a custom POJO that can be polled using an int:inbound-channel-adapter? Is there an interface that the POJO needs to implement? I will research this further in the docs, but if you could point me to something helpful, that would be great.

                    Thanks



                    • #11
                      No; POJO means "Plain Old Java Object" - the framework will figure it out.

                      If the POJO has more than one public method you have to guide the framework by adding a method attribute to the adapter.



                      Code:
                      <int:inbound-channel-adapter method="foo" channel="input">
                          <bean class="foo.MyPojo">
                              <property name="jdbcTemplate" ref="jdbcTemplate" />
                          </bean>
                          <int:poller .../>
                      </int:inbound-channel-adapter>
                      or

                      Code:
                      <int:inbound-channel-adapter ref="myPojo" method="foo" channel="input">
                          <int:poller .../>
                      </int:inbound-channel-adapter>
                      
                      <bean id="myPojo" class="foo.MyPojo">
                          <property name="jdbcTemplate" ref="jdbcTemplate" />
                      </bean>
                      The object returned by the method becomes the payload of the message.



                      • #12
                        Ok, great.
                        Thanks!!



                        • #13
                          Hello guys,

                          I have the same question as rosadom, but slightly different.

                          The requirement is: if there are no rows in the DB to process, increase the polling interval; otherwise use the default polling interval.

                          So, as well as detecting the "no rows" condition, I also need to change the poller timing.

                          One implementation of a dynamic poller that I found in the Spring Integration samples is below:

                          Code:
                          <jdbc:embedded-database id="datasource" type="H2">
                          <jdbc:script location="classpath:setup-tables.sql"/>
                          </jdbc:embedded-database>
                          
                          
                          <int-jdbc:inbound-channel-adapter id="readDatabaseResults"
                          query="select * from person"
                          channel="databaseResults"
                          row-mapper="rowMapper"
                          data-source="datasource"
                          max-rows-per-poll="1"
                          update="delete from person where id = :personId"
                          >
                          
                          
                          
                          <int:poller trigger="dynamicTrigger"/>
                          
                          </int-jdbc:inbound-channel-adapter>
                          
                          <bean id="rowMapper" class="org.springframework.integration.samples.jdbc.PersonMapper"></bean>
                          
                          <int:channel id="databaseResults"/>
                          
                          <int:service-activator input-channel="databaseResults" ref="personService" method="process"/>
                          
                          <bean id="personService" class="org.springframework.integration.samples.jdbc.TestService"/>
                          
                          <bean id="dynamicTrigger"
                          class="org.springframework.integration.samples.jdbc.DynamicPeriodicTrigger">
                          <constructor-arg name="period" value="5000" />
                          </bean>
                          The only confusion I have is how to detect that there are "no rows" to process and then change the polling interval so that it doesn't hit the database hard.

                          Mohsin



                          • #14
                            how to determine "no rows" to process
                            <poller> has an <advice-chain> sub-element, so you can implement your own advice, e.g. an AfterReturningAdvice, and check whether the returnValue is true or false. The <advice-chain> is applied to the boolean doPoll() method of AbstractPollingEndpoint, where true means a message was received and sent, and false means otherwise.

                            You can then change the polling interval from this Advice.
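
The interval bookkeeping such an advice would need can be sketched without any framework classes. Everything here is an assumption made for illustration: `AdaptivePeriod` and `onPollResult` are hypothetical names, and doubling the period on each empty poll (up to a cap) is just one possible back-off policy. How the resulting value is pushed into the trigger depends on what your trigger class exposes.

```java
// Hypothetical helper: tracks the polling period and backs off while polls come back empty.
class AdaptivePeriod {

    private final long basePeriodMillis;
    private final long maxPeriodMillis;
    private long currentPeriodMillis;

    AdaptivePeriod(long basePeriodMillis, long maxPeriodMillis) {
        if (basePeriodMillis <= 0 || maxPeriodMillis < basePeriodMillis) {
            throw new IllegalArgumentException("need 0 < base <= max");
        }
        this.basePeriodMillis = basePeriodMillis;
        this.maxPeriodMillis = maxPeriodMillis;
        this.currentPeriodMillis = basePeriodMillis;
    }

    // messageProduced mirrors the boolean described above:
    // true when the poll produced a message, false otherwise.
    // Returns the period to use for the next poll.
    synchronized long onPollResult(boolean messageProduced) {
        if (messageProduced) {
            currentPeriodMillis = basePeriodMillis;      // rows found: back to the default
        } else if (currentPeriodMillis > maxPeriodMillis / 2) {
            currentPeriodMillis = maxPeriodMillis;       // doubling would exceed the cap
        } else {
            currentPeriodMillis = currentPeriodMillis * 2; // no rows: wait twice as long
        }
        return currentPeriodMillis;
    }
}
```

An advice would call `onPollResult` with the poll's return value and then apply the returned period to the trigger.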



                            • #15
                              Thank you for the prompt response, even on a weekend!

                              I have implemented the Advice as follows:

                              Code:
                               <int:poller trigger="dynamicTrigger">
                                      <int:advice-chain>
                                          <bean class="org.springframework.integration.samples.jdbc.Advice"/>
                                      </int:advice-chain>
                                  </int:poller>
                              
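                              import java.lang.reflect.Method;
                              import org.springframework.aop.AfterReturningAdvice;
                              
                              public class Advice implements AfterReturningAdvice {
                                  @Override
                                  public void afterReturning(Object returnValue, Method method, Object[] args, Object target) throws Throwable {
                                      // returnValue is the Boolean poll result: false means no message was produced
                                      if (Boolean.FALSE.equals(returnValue)) {
                                          // Change poller interval
                                      }
                                  }
                              }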
                              But I am confused about how to change the poller interval.

                              I tried to debug, and what comes through (where I was hoping for the trigger) is an AbstractPollingEndpoint instance.

                              Any pointers would be greatly appreciated.
