
  • Query regarding poller ....!!

    Hi,

    I have a question about pollers. As I understand it, a poller is a sort of daemon thread that keeps polling the source and grabs messages from it at a defined interval, and if no poller is defined for an endpoint, a single default poller must be registered.

    I am developing an application that is configured in an XML file. That file contains a tag defining a global-level poller, which I haven't completely understood. Please explain what it does. The tag is:


    Code:
    <si:poller max-messages-per-poll="1" id="defaultPoller" default="true">
    		<si:interval-trigger interval="${pollerInterval}" />
    	</si:poller>

    The complete XML configuration is:
    Code:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:si="http://www.springframework.org/schema/integration"
    	xmlns:stream="http://www.springframework.org/schema/integration/stream"
    	xmlns:jms="http://www.springframework.org/schema/integration/jms"
    	xmlns:task="http://www.springframework.org/schema/task"
    	xmlns:file="http://www.springframework.org/schema/integration/file"
    	xsi:schemaLocation="http://www.springframework.org/schema/beans
    			http://www.springframework.org/schema/beans/spring-beans.xsd
    			http://www.springframework.org/schema/task
    			http://www.springframework.org/schema/task/spring-task-3.0.xsd
    			http://www.springframework.org/schema/integration
    			http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
    			http://www.springframework.org/schema/integration/jms 
    			http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.0.xsd 
    			http://www.springframework.org/schema/integration/stream
    			http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd
    			http://www.springframework.org/schema/integration/file
    			http://www.springframework.org/schema/integration/file/spring-integration-file-2.0.xsd">
    			
    			
    			
    	<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> 
    		<property name="location">
    			<value>config.properties</value>
    		</property>
    	</bean>			
    			
    		<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    		<property name="brokerURL">
    			<value>tcp://localhost:61616</value>
    		</property>
    	</bean>
    
    	<si:channel id="input">
    		
    	</si:channel>
    			
    		 <jms:outbound-channel-adapter id="channel-to-mq"
    			channel="input" destination-name="${queueName}" /> 
    
    
    		<jms:message-driven-channel-adapter
    			id="mq-message-listner" channel="aggregator-input-channel"
    			destination-name="${queueName}" concurrent-consumers="${concurrent-consumers}"   />	
    			
    
    	<!-- <si:gateway id="gateway" service-interface="com.sample.agg.Gateway"
    		default-request-channel="aggregator-input-channel">
    	</si:gateway> -->
    	
    	<si:channel id="aggregator-input-channel">
    		<si:queue capacity="${queueCapacity}" />
    	</si:channel>
    	
    	<si:aggregator id="aggregator" input-channel="aggregator-input-channel" output-channel="aggregator-output-channel"
    		
    		ref="sampleAggregator" method="aggregateMessagaes"  		
    		correlation-strategy="correlationBean" correlation-strategy-method="correlationStrategy" 
    		release-strategy="releaseStrategyBean" release-strategy-method="releaseStrategy" 
    		message-store="messageStore"
    		order="1" 
    		send-partial-result-on-expiry="true"
    		
    		send-timeout="1000"
    		>
    	</si:aggregator>
    	
    	<bean id="sampleAggregator" class="com.walgreens.ods.producer.Aggregator"></bean> <!-- This bean clubs the List of  messages   -->
    	
    	<bean id="correlationBean" class="com.walgreens.ods.producer.CorrelationBean"/>
    	
    	<bean id="releaseStrategyBean" class="com.walgreens.ods.producer.ReleaseStrategyBean"> <!--This bean if results to true then messages are released  -->
    	<property name="recordLength" value="${recordLength}" />	
    	</bean>
    	
    	<!-- <si:channel id="throwAwayChannel"	>		
    	<si:queue capacity="${queueCapacity}" />
    	</si:channel> -->
    	
    	
    		<file:outbound-channel-adapter channel="aggregator-output-channel"
    			directory="${aggregatorOutputDirectoryPath}" filename-generator-expression="${outputFileNameExpression}"
    			temporary-file-suffix="_swp"   />	
    	
    		<!-- <file:outbound-channel-adapter channel="throwAwayChannel"
    			directory="${aggregatorOutputDirectoryPath}" filename-generator-expression="'discard-'+headers.getTimestamp()+'.xml'"
    			temporary-file-suffix="_swp"   />	 -->
    	
    	
    	<si:channel id="aggregator-output-channel">
    		<si:queue capacity="${queueCapacity}" />
    	</si:channel>
    	
    	<task:scheduler id="scheduler" />
    	 	
    	<task:scheduled-tasks scheduler="scheduler">
    		<task:scheduled ref="reaper" method="run" fixed-rate="${reaperSchedulerFixedRate}" />
    	</task:scheduled-tasks> 
    
    	<bean id="reaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
    			<property name="messageGroupStore" ref="messageStore"  />
    		 	
    		<property name="timeout" value="${reaperTimeOut}" />
    		
    	</bean>
    	
     <bean id="messageStore" class="org.springframework.integration.store.SimpleMessageStore" />
    	
    <!-- 	
    	<si:service-activator input-channel="aggregator-output-channel" ref="printerService" method="printString"  ></si:service-activator>
    	
    	<bean id="printerService" class="com.sample.agg.PrinterService"></bean>
    --> 	
    
    	<si:poller max-messages-per-poll="1" id="defaultPoller" default="true">
    		<si:interval-trigger interval="${pollerInterval}" />
    	</si:poller>	
    <!-- <si:inbound-channel-adapter id="test" ref="exampleReleaseStrategy" method="releaseStrategy" >
    <si:poller fixed-rate="5000"/>
    </si:inbound-channel-adapter> -->	
    	
    
    </beans>
    The properties file is:


    Code:
    # Spring bean values
    
    queueName=RODS.APP.DATA.IN2
    concurrent-consumers=10
    queueCapacity=100
    recordLength=10
    aggregatorOutputDirectoryPath=C:/ODS_DEV_DIR/temp
    outputFileNameExpression='output-'+headers.getTimestamp()+'.xml'
    reaperSchedulerFixedRate=10000
    #reaperTimeOut=30000
    #reaperTimeOut=3000000
    reaperTimeOut=30
    #pollerInterval=1500
    pollerInterval=10
    Please help me understand the global-level poller configuration used here.

  • #2
    Here it is:
    http://static.springsource.org/sprin...hannel-adapter
    http://static.springsource.org/sprin...ollingconsumer



    • #3
      Hi oleg zhurakousky,
      Sorry to say, but I had already gone through the references. They are great, but I couldn't grasp much from them. Could you please explain what is going on in the code below?
      Code:
      <si:poller max-messages-per-poll="1" id="defaultPoller" default="true">
      		<si:interval-trigger interval="${pollerInterval}" />
      	</si:poller>
      It will help make my understanding clearer. Thanks in advance!



      • #4
        Why don't you just remove it (comment it out) and see what happens?

        Hint: You'll get an error message that will explain all (if you really need a default poller).
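
        For context, here is a rough sketch of why the posted configuration needs a poller at all. In that XML, aggregator-input-channel and aggregator-output-channel are queue channels, so the aggregator and the file outbound adapter that read from them have to poll. The alternative to a default poller is to give each of those consumers its own. This assumes the nested <si:poller> element is accepted on both endpoints in the 2.0 schemas, which is worth checking against your XSD version; the fixed-rate values are illustrative and not from the original post, and the other aggregator attributes are omitted for brevity.

        ```xml
        <!-- Sketch only: per-endpoint pollers instead of one default poller. -->
        <si:aggregator id="aggregator" input-channel="aggregator-input-channel"
        		output-channel="aggregator-output-channel"
        		ref="sampleAggregator" method="aggregateMessagaes">
        	<!-- Poll the input queue channel every 1000 ms (illustrative value) -->
        	<si:poller max-messages-per-poll="1" fixed-rate="1000" />
        </si:aggregator>

        <file:outbound-channel-adapter channel="aggregator-output-channel"
        		directory="${aggregatorOutputDirectoryPath}">
        	<!-- Poll the output queue channel every 1000 ms (illustrative value) -->
        	<si:poller max-messages-per-poll="1" fixed-rate="1000" />
        </file:outbound-channel-adapter>
        ```

        With pollers declared like this, the top-level default poller would no longer be required by these two endpoints; without either, the context should fail to start with the error mentioned in the hint.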



        • #5
          Have a look at this in org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean

          Code:
          if (this.pollerMetadata == null) {
          	this.pollerMetadata = IntegrationContextUtils.getDefaultPollerMetadata(this.beanFactory);
          	Assert.notNull(this.pollerMetadata, "No poller has been defined for channel-adapter '"
          	+ this.beanName + "', and no default poller is available within the context.");
          }
          and

          Code:
          PollingConsumer pollingConsumer = new PollingConsumer((PollableChannel) channel, this.handler);
          if (this.pollerMetadata == null) {
          	this.pollerMetadata = IntegrationContextUtils.getDefaultPollerMetadata(this.beanFactory);
          	Assert.notNull(this.pollerMetadata, "No poller has been defined for endpoint '" + this.beanName
          		+ "', and no default poller is available within the context.");
          }
          in org.springframework.integration.config.ConsumerEndpointFactoryBean (the above piece of code is relevant only for PollableChannels).

          Also have a look at org.springframework.integration.config.xml.PollerParser, which parses the top-level poller definitions.

          The resolveId method of this class is particularly interesting: it assigns an id depending on whether the default attribute is present.


          See how these pieces relate to each other; perhaps that answers your question.
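
          To tie those classes back to the original question, here is an annotated copy of the poller from the posted configuration. The comments reflect my reading of the Spring Integration 2.0 behaviour and are a sketch rather than a definitive description; in particular, the millisecond unit assumes the trigger's default time unit has not been overridden.

          ```xml
          <!--
            default="true": registers this poller as the context-wide default,
            which is what getDefaultPollerMetadata(...) looks up when an endpoint
            has no poller of its own.
            max-messages-per-poll="1": each poll receives at most one message
            from the channel being polled.
          -->
          <si:poller max-messages-per-poll="1" id="defaultPoller" default="true">
          	<!-- Poll at a fixed interval; with pollerInterval=10 and the assumed
          	     default unit of milliseconds, that is roughly every 10 ms -->
          	<si:interval-trigger interval="${pollerInterval}" />
          </si:poller>
          ```

          So the tag does not poll anything by itself. It only supplies the polling settings that any polling endpoint without its own poller will pick up.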
