
  • 6 messages are not being released

    Hi,

    I am working on an application that uses an aggregator. It takes its input from an input channel, aggregates the messages, and then sends the result to the destination channel.

    The aggregator's release strategy releases a group when the message count reaches 10. For example, if you send 20 messages, they are released in groups of 10, so you end up with 2 groups (10 × 2).

    My question: if I send only 6 messages instead of 20, the aggregator keeps waiting because it expects 10 messages. To handle this I have configured a MessageGroupStoreReaper in the configuration and set send-partial-result-on-expiry="true" on the aggregator. However, when I send six messages the program just keeps looping and never releases those 6 messages. Please advise where I am going wrong.
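
To make the expected behaviour concrete, here is a minimal plain-Java sketch of the arithmetic only; no Spring Integration is involved. The class and method names are hypothetical, and the group size of 10 is taken from the release strategy described above.

```java
// Hypothetical helper, not part of Spring Integration: it only models
// how many groups a "release at N messages" strategy completes.
public class GroupingSketch {

    // Number of groups that reach the release threshold.
    static int fullGroups(int messages, int groupSize) {
        return messages / groupSize;
    }

    // Messages left waiting in an incomplete group.
    static int leftover(int messages, int groupSize) {
        return messages % groupSize;
    }

    public static void main(String[] args) {
        int groupSize = 10; // threshold used by the release strategy above
        for (int sent : new int[] {20, 6}) {
            System.out.println(sent + " sent: " + fullGroups(sent, groupSize)
                    + " full group(s), " + leftover(sent, groupSize) + " waiting");
        }
    }
}
```

With 6 messages no group ever completes, so those 6 can only leave the aggregator through expiry, which is what the reaper and send-partial-result-on-expiry are meant to provide.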

    The configuration file is shown below:

    Code:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:si="http://www.springframework.org/schema/integration"
    	xmlns:stream="http://www.springframework.org/schema/integration/stream"
    	xmlns:task="http://www.springframework.org/schema/task"
    	xsi:schemaLocation="http://www.springframework.org/schema/beans
    			http://www.springframework.org/schema/beans/spring-beans.xsd
    			http://www.springframework.org/schema/task
    			http://www.springframework.org/schema/task/spring-task-3.0.xsd
    			http://www.springframework.org/schema/integration
    			http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
    			http://www.springframework.org/schema/integration/stream
    			http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">
    
    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    		<property name="brokerURL">
    			<value>tcp://localhost:61616</value>
    		</property>
    	</bean>
    	
    	
    	<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
    		<property name="maxConnections" value="8" />
    		<property name="maximumActive" value="500" />
    		<property name="connectionFactory" ref="connectionFactory" />
    	</bean>
    
    
    	<si:gateway id="gateway" service-interface="com.sample.agg.Gateway"
    		default-request-channel="aggregator-input-channel"  >
    	</si:gateway>
    	
    	<si:channel id="aggregator-input-channel">
    		<si:queue capacity="100" />
    	</si:channel>
    	
    	<si:aggregator id="exampleAggregator" input-channel="aggregator-input-channel" output-channel="aggregator-output-channel" 
    		ref="sampleAggregator" method="aggregateMessagaes"  send-partial-result-on-expiry="true"
    		correlation-strategy="exampleCorrelationBean" correlation-strategy-method="correlationStrategy" 
    		release-strategy="exampleReleaseStrategy" release-strategy-method="releaseStrategy" 
    		message-store="exampleMessageStore" >
    		
    	</si:aggregator>
    	
    	<bean id="sampleAggregator" class="com.sample.agg.ExampleAggregator"></bean>
    	<bean id="exampleCorrelationBean" class="com.sample.agg.ExampleCorrelationBean" />
    	<bean id="exampleReleaseStrategy" class="com.sample.agg.ExampleReleaseStrategy"></bean>
    	
    	<bean id="exampleMessageStore" class="org.springframework.integration.store.SimpleMessageStore" />
    		
    	<bean id="exampleReaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
    			<property name="messageGroupStore" ref="exampleMessageStore" />
    <!--		<property name="timeout" value="5000" />-->
    	</bean>
    	
    	<task:scheduler id="exampleScheduler" />
    
    	<task:scheduled-tasks scheduler="exampleScheduler">
    		<task:scheduled ref="exampleReaper" method="run" fixed-rate="1000" />
    	</task:scheduled-tasks>
    	
    	<si:channel id="aggregator-output-channel">
    		<si:queue capacity="1000" />
    	</si:channel>
    	
    	<si:service-activator input-channel="aggregator-output-channel" ref="printerService" method="printString"></si:service-activator>
    	
    	<bean id="printerService" class="com.sample.agg.PrinterService"></bean>
    	
    	<si:poller max-messages-per-poll="1" id="defaultPoller" default="true">
    		<si:interval-trigger interval="1000" />
    	</si:poller>
    
    </beans>

    The main application, from which I send the messages, is shown below:

    Code:
    package com.sample.agg;
    
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    import org.springframework.integration.Message;
    import org.springframework.integration.support.MessageBuilder;
    
    
    public class TestApp {
    	
    	public static void main(String[] args) throws InterruptedException{
    		 //ApplicationContext context = new ClassPathXmlApplicationContext("resources/config.xml");
    		 ApplicationContext context = new ClassPathXmlApplicationContext("config.xml");
    		 
    		 Gateway gate=(Gateway) context.getBean("gateway");
    		 for(int i=0;i<6;i++){
    			 Message<String> msg=MessageBuilder.withPayload("My Sample Message:"+i).build();
    			 Thread.sleep(1000);
    			 gate.send(msg);
    		 }
    		 
    	}
    
    }

    The correlation bean is shown below:

    Code:
    package com.sample.agg;
    
    import org.springframework.integration.Message;
    
    public class ExampleCorrelationBean {
    	//Not grouping based on correlation ids 
    	public String correlationStrategy(Message<String> request){
    		return "NO GROUPING";
    	}
    }

    The release strategy bean is shown below:

    Code:
    package com.sample.agg;
    
    import java.util.List;
    
    import org.springframework.integration.Message;
    
    public class ExampleReleaseStrategy {
    	//Releases when message count reaches 10
    	public boolean releaseStrategy(List<Message<String>> requests) {
    		if(requests.size()==10){
    			System.out.println("Message count reached 10, so released for aggregation");
    			return true;
    		}
    		return false;
    	  }
    }

    I am also attaching a zip of the application. Please advise why it is not releasing those 6 messages.

  • #2
    The reaper timeout is commented out; the default timeout for the reaper is infinity so it will never reap if you don't set a timeout.

    Before posting questions like this here you should really do some more work on your own. Spring Integration provides copious amounts of logging to help you understand what is going on in your application; just set the org.springframework.integration logger level to DEBUG. For example, in the reaper...

    Code:
    /**
     * Expire all message groups older than the {@link #setTimeout(long) timeout} provided. Normally this method would
     * be executed by a scheduled task.
     */
    public void run() {
    	if (timeout >= 0) {
    		if (logger.isDebugEnabled()) {
    			logger.debug("Expiring all messages older than timeout=" + timeout + " from message group store: "
    					+ messageGroupStore);
    		}
    		messageGroupStore.expireMessageGroups(timeout);
    	}
    }
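
The effect of that guard can be reproduced with a small stand-alone sketch. This is a simplified stand-in, not the real MessageGroupStoreReaper: the class name is hypothetical, and the -1 default is an assumption used to model "no timeout configured".

```java
// Simplified stand-in for the reaper's guard; NOT the real
// MessageGroupStoreReaper. The -1 default is an assumption used here to
// model "no timeout configured".
public class ReaperGuardSketch {

    private long timeout = -1; // assumed sentinel: negative means "never expire"
    private int expireCalls = 0;

    public void setTimeout(long timeout) {
        this.timeout = timeout;
    }

    // Mirrors the shape of run() above: expiry is attempted only when timeout >= 0.
    public void run() {
        if (timeout >= 0) {
            expireCalls++; // the real reaper would call expireMessageGroups(timeout) here
        }
    }

    public int getExpireCalls() {
        return expireCalls;
    }

    public static void main(String[] args) {
        ReaperGuardSketch unset = new ReaperGuardSketch();
        ReaperGuardSketch configured = new ReaperGuardSketch();
        configured.setTimeout(5000);
        // Simulate three scheduler ticks for each instance.
        for (int i = 0; i < 3; i++) {
            unset.run();
            configured.run();
        }
        System.out.println("unset: " + unset.getExpireCalls()
                + ", configured: " + configured.getExpireCalls());
    }
}
```

However often the scheduler fires, the instance without a timeout never attempts an expiry, which matches the symptom of the 6 messages staying in the group.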


    • #3
      Hi Gary,
      Thanks for the explanation. One thing I want to confirm about what you wrote:
      "The reaper timeout is commented out; the default timeout for the reaper is infinity so it will never reap if you don't set a timeout."
      So if I uncomment that reaper timeout, will it work?
      Last edited by SARAL SAXENA; Nov 13th, 2011, 11:01 AM.


      • #4
        Did you try? Is it still not working after uncommenting it?


        • #5
          Originally posted by SARAL SAXENA
          Hi Gary,
          Thanks for the explanation. One thing I want to confirm about what you wrote...

          So if I uncomment that reaper timeout, will it work?
          Hi Gary,

          Thanks a lot, the code works perfectly now. The problem was indeed the commented-out timeout. The one remaining issue is this: so far I was pushing only 6 messages, but when I send, say, 1900 messages, my program gets stuck. Do we need some sort of load balancing, since this time I am sending messages in bulk? I have increased the capacity of the queue channels in the configuration XML, but the program still gets stuck. Please advise.


          Code:
          package com.sample.agg;
          
          import org.springframework.context.ApplicationContext;
          import org.springframework.context.support.ClassPathXmlApplicationContext;
          import org.springframework.integration.Message;
          import org.springframework.integration.support.MessageBuilder;
          
          
          public class TestApp {
          	
          	public static void main(String[] args) throws InterruptedException{
          		 //ApplicationContext context = new ClassPathXmlApplicationContext("resources/config.xml");
          		 ApplicationContext context = new ClassPathXmlApplicationContext("config.xml");
          		 
          		 Gateway gate=(Gateway) context.getBean("gateway");
          		 for(int i=1;i<1900;i++){
          			 Message<String> msg=MessageBuilder.withPayload("My Sample Message:"+i).build();
          			 Thread.sleep(1000);
          			 gate.send(msg);
          		 }
          		 
          	}
          
          }
          The configuration XML file is shown below:
          Code:
          <?xml version="1.0" encoding="UTF-8"?>
          <beans xmlns="http://www.springframework.org/schema/beans"
          	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:si="http://www.springframework.org/schema/integration"
          	xmlns:stream="http://www.springframework.org/schema/integration/stream"
          	xmlns:task="http://www.springframework.org/schema/task"
          	xsi:schemaLocation="http://www.springframework.org/schema/beans
          			http://www.springframework.org/schema/beans/spring-beans.xsd
          			http://www.springframework.org/schema/task
          			http://www.springframework.org/schema/task/spring-task-3.0.xsd
          			http://www.springframework.org/schema/integration
          			http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
          			http://www.springframework.org/schema/integration/stream
          			http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">
          
          <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
          		<property name="brokerURL">
          			<value>tcp://localhost:61616</value>
          		</property>
          	</bean>
          	
          	
          	<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
          		<property name="maxConnections" value="8" />
          		<property name="maximumActive" value="500" />
          		<property name="connectionFactory" ref="connectionFactory" />
          	</bean>
          
          
          	<si:gateway id="gateway" service-interface="com.sample.agg.Gateway"
          		default-request-channel="aggregator-input-channel"  >
          	</si:gateway>
          	
          	<si:channel id="aggregator-input-channel">
          		<si:queue capacity="2050" />
          	</si:channel>
          	
          	<si:aggregator id="exampleAggregator" input-channel="aggregator-input-channel" output-channel="aggregator-output-channel" 
          		ref="sampleAggregator" method="aggregateMessagaes"  send-partial-result-on-expiry="true"
          		correlation-strategy="exampleCorrelationBean" correlation-strategy-method="correlationStrategy" 
          		release-strategy="exampleReleaseStrategy" release-strategy-method="releaseStrategy" 
          		message-store="exampleMessageStore" >
          		
          	</si:aggregator>
          	
          	<bean id="sampleAggregator" class="com.sample.agg.ExampleAggregator"></bean>
          	<bean id="exampleCorrelationBean" class="com.sample.agg.ExampleCorrelationBean" />
          	<bean id="exampleReleaseStrategy" class="com.sample.agg.ExampleReleaseStrategy"></bean>
          	
          	<bean id="exampleMessageStore" class="org.springframework.integration.store.SimpleMessageStore" />
          		
          	<bean id="exampleReaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
          			<property name="messageGroupStore" ref="exampleMessageStore" />
          		<property name="timeout" value="5000" />
          	</bean>
          	
          	<task:scheduler id="exampleScheduler" />
          
          	<task:scheduled-tasks scheduler="exampleScheduler">
          		<task:scheduled ref="exampleReaper" method="run" fixed-rate="1000" />
          	</task:scheduled-tasks>
          	
          	<si:channel id="aggregator-output-channel">
          		<si:queue capacity="2050" />
          	</si:channel>
          	
          	<si:service-activator input-channel="aggregator-output-channel" ref="printerService" method="printString"></si:service-activator>
          	
          	<bean id="printerService" class="com.sample.agg.PrinterService"></bean>
          	
          	<si:poller max-messages-per-poll="1" id="defaultPoller" default="true">
          		<si:interval-trigger interval="1000" />
          	</si:poller>
          
          </beans>
          Last edited by SARAL SAXENA; Nov 14th, 2011, 07:48 AM.


          • #6
            Originally posted by SARAL SAXENA
            Hi Gary,

            Thanks a lot, the code works perfectly now. The problem was indeed the commented-out timeout. The one remaining issue is this: so far I was pushing only 6 messages, but when I send, say, 1900 messages, my program gets stuck. Do we need some sort of load balancing, since this time I am sending messages in bulk? I have increased the capacity of the queue channels in the configuration XML, but the program still gets stuck. Please advise.




            Please advise me on this. Thanks in advance!


            • #7
              As I said before, you need to turn on DEBUG logging for org.springframework.integration (e.g. in log4j.xml) and figure out what's happening to your messages.


              • #8
                Originally posted by Gary Russell
                As I said before, you need to turn on DEBUG logging for org.springframework.integration (e.g. in log4j.xml) and figure out what's happening to your messages.
                Hi Gary,

                I have enabled the logs, and in them I can see that processing reaches the release strategy but gets stuck there. The difference is that this time there are 1900 messages; earlier there were only 6 and the flow ran smoothly. Please advise how to overcome this problem.


                • #9
                  This forum is not intended to be a tutorial on how to debug programs, but why don't you add some debug logging to your release strategy?

                  (Hint: requests.size()).

                  If you turn on TRACE level, you'll see even more detail from the Aggregator...

                  Code:
                  if (logger.isTraceEnabled()) {
                  	logger.trace("Adding message to group [ " + messageGroup + "]");
                  }
                  Together with extra debug logging in your release strategy, you should be able to see what's happening.
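
As a rough illustration of the hint, a release strategy with logging could look like the sketch below. It takes a plain List<?> instead of List<Message<String>> so that it compiles without Spring on the classpath, and the class name is hypothetical. The `>=` comparison is a deliberate change from the `==` in the original bean; it is a defensive assumption about what the logged sizes might show (a group that is never exactly 10), not a confirmed diagnosis.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical variant of ExampleReleaseStrategy that logs every call.
public class LoggingReleaseStrategySketch {

    private static final int THRESHOLD = 10; // same count as the original strategy

    public boolean releaseStrategy(List<?> requests) {
        int size = requests.size();
        // ">=" instead of "==" so a group that skips past 10 still releases
        // (an assumption, see the lead-in).
        boolean release = size >= THRESHOLD;
        System.out.println("releaseStrategy called: size=" + size + ", release=" + release);
        return release;
    }

    public static void main(String[] args) {
        LoggingReleaseStrategySketch strategy = new LoggingReleaseStrategySketch();
        strategy.releaseStrategy(Collections.nCopies(6, "msg"));
        strategy.releaseStrategy(Collections.nCopies(12, "msg"));
    }
}
```

Watching the logged size over a long run should show whether the group size ever hits the threshold, stalls below it, or grows past it.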


                  • #10
                    Hi Gary,

                    When I send 200 messages, they are enqueued properly. After that a file should be created, but in the 200-message case no file is created. For example, my release strategy releases a message group when it reaches 10 messages. If I send 16 messages, the first 10 should be released and one file containing those 10 messages created in a directory; the remaining six are discarded, and a separate file should be created for them as well. With 16 messages this scenario works perfectly; I have tested it on my end. But when I increase the number of messages to 200, my program gets stuck. I am attaching the sample code as well. Please take a look and advise why the file creation gets stuck in the case of 200 messages.

                    The structure of the main class, where I increase the number of messages to 20, is shown below:

                    Code:
                    package com.walgreens.ods.producer;
                    
                    
                    import org.springframework.context.support.ClassPathXmlApplicationContext;
                    import org.springframework.integration.MessageChannel;
                    import org.springframework.integration.support.MessageBuilder;
                    
                    public class BatchRequestFileCreator {
                    	public static void main(String[] args) {
                    
                    		ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
                    				"spring-config.xml");
                    
                    		context.start();
                    		
                    		MessageChannel input = (MessageChannel) context.getBean("input");		
                    		
                    		for (int i =0; i< 19; i++){
                    			input.send(MessageBuilder
                    					.withPayload(
                    					"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
                    					+"<RetailODS xmlns=\"http://www.walgreens.com/schema/RetailODS\""
                    					+" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\""
                    					+" xsi:schemaLocation=\"http://www.walgreens.com/schema/RetailODS RetailODS.xsd\">"
                    					+"<StoreData>"
                    					+"<Store><Number>"+i+""+i+""+i+""+i+""+"</Number></Store><Pricing><WIC>000000</WIC><UPC>00000001411009</UPC><RegularPrice><Amount>000000</Amount><Quantity>01</Quantity><Type>1</Type><UnitAmount>000299</UnitAmount><MixMatchCode>0000</MixMatchCode></RegularPrice><SalePrice><Amount>000000</Amount><Quantity>00</Quantity><Type>0</Type><UnitAmount>000000</UnitAmount><MixMatchCode>0000</MixMatchCode></SalePrice><Coupon><type>00</type><family>0000</family></Coupon><Coupon><type>00</type><family>0000</family></Coupon><Coupon><type>00</type><family>0000</family></Coupon><Coupon><type>00</type><family>0000</family></Coupon><Coupon><type>00</type><family>0000</family></Coupon><Coupon><type>00</type> <family>0000</family></Coupon><Coupon><type>00</type><family>0000</family></Coupon><Coupon><type>00</type><family>0000</family></Coupon><Reference>000</Reference></Pricing><Inventory><PLN>224405</PLN><WIC>000000</WIC><UPC>00000001411009</UPC><OnHands>00002</OnHands><AvailableOnHands>00002</AvailableOnHands><OnOrder></OnOrder><Timestamp>08-03-2011 07:00:00</Timestamp></Inventory>" 
                    					+"</StoreData>"
                    					+"</RetailODS>"							
                    				).setCorrelationId("SERVICEID3_2011-09-24 16:23:28.593")
                    					.build() );
                    			
                    		}			
                    
                    	}
                    	
                    	
                    
                    }
                    The structure of the configuration file is ...

                    Code:
                    <?xml version="1.0" encoding="UTF-8"?>
                    <beans xmlns="http://www.springframework.org/schema/beans"
                    	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:si="http://www.springframework.org/schema/integration"
                    	xmlns:stream="http://www.springframework.org/schema/integration/stream"
                    	xmlns:jms="http://www.springframework.org/schema/integration/jms"
                    	xmlns:task="http://www.springframework.org/schema/task"
                    	xmlns:file="http://www.springframework.org/schema/integration/file"
                    	xsi:schemaLocation="http://www.springframework.org/schema/beans
                    			http://www.springframework.org/schema/beans/spring-beans.xsd
                    			http://www.springframework.org/schema/task
                    			http://www.springframework.org/schema/task/spring-task-3.0.xsd
                    			http://www.springframework.org/schema/integration
                    			http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
                    			http://www.springframework.org/schema/integration/jms 
                    			http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.0.xsd 
                    			http://www.springframework.org/schema/integration/stream
                    			http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd
                    			http://www.springframework.org/schema/integration/file
                    			http://www.springframework.org/schema/integration/file/spring-integration-file-2.0.xsd">
                    			
                    			
                    			
                    	<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> 
                    		<property name="location">
                    			<value>config.properties</value>
                    		</property>
                    	</bean>			
                    			
                    		<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
                    		<property name="brokerURL">
                    			<value>tcp://localhost:61616</value>
                    		</property>
                    	</bean>
                    
                    	<si:channel id="input">
                    		
                    	</si:channel>
                    			
                    		 <jms:outbound-channel-adapter id="channel-to-mq"
                    			channel="input" destination-name="${queueName}" /> 
                    
                    
                    		<jms:message-driven-channel-adapter
                    			id="mq-message-listner" channel="aggregator-input-channel"
                    			destination-name="${queueName}" concurrent-consumers="${concurrent-consumers}"   />	
                    			
                    
                    	<!-- <si:gateway id="gateway" service-interface="com.sample.agg.Gateway"
                    		default-request-channel="aggregator-input-channel">
                    	</si:gateway> -->
                    	
                    	<si:channel id="aggregator-input-channel">
                    		<si:queue capacity="${queueCapacity}" />
                    	</si:channel>
                    	
                    	<si:aggregator id="aggregator" input-channel="aggregator-input-channel" output-channel="aggregator-output-channel"
                    	discard-channel="throwAwayChannel"		
                    		ref="sampleAggregator" method="aggregateMessagaes"  		
                    		correlation-strategy="correlationBean" correlation-strategy-method="correlationStrategy" 
                    		release-strategy="releaseStrategyBean" release-strategy-method="releaseStrategy" 
                    		message-store="messageStore"
                    		order="1" 
                    		send-partial-result-on-expiry="false"
                    		
                    		send-timeout="1000"
                    		>
                    	</si:aggregator>
                    	
                    	<bean id="sampleAggregator" class="com.walgreens.ods.producer.Aggregator"></bean> <!-- This bean clubs the List of  messages   -->
                    	
                    	<bean id="correlationBean" class="com.walgreens.ods.producer.CorrelationBean"/>
                    	
                    	<bean id="releaseStrategyBean" class="com.walgreens.ods.producer.ReleaseStrategyBean"> <!--This bean if results to true then messages are released  -->
                    	<property name="recordLength" value="${recordLength}" />	
                    	</bean>
                    	
                    	<si:channel id="throwAwayChannel"	>		
                    	<si:queue capacity="${queueCapacity}" />
                    	</si:channel>
                    	
                    	
                    		<file:outbound-channel-adapter channel="aggregator-output-channel"
                    			directory="${aggregatorOutputDirectoryPath}" filename-generator-expression="${outputFileNameExpression}"
                    			temporary-file-suffix="_swp"   />	
                    	
                    		<file:outbound-channel-adapter channel="throwAwayChannel"
                    			directory="${aggregatorOutputDirectoryPath}" filename-generator-expression="'discard-'+headers.getTimestamp()+'.xml'"
                    			temporary-file-suffix="_swp"   />	
                    	
                    	
                    	<si:channel id="aggregator-output-channel">
                    		<si:queue capacity="${queueCapacity}" />
                    	</si:channel>
                    	
                    	<task:scheduler id="scheduler" />
                    	 	
                    	<task:scheduled-tasks scheduler="scheduler">
                    		<task:scheduled ref="reaper" method="run" fixed-rate="${reaperSchedulerFixedRate}" />
                    	</task:scheduled-tasks> 
                    
                    	<bean id="reaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
                    			<property name="messageGroupStore" ref="messageStore"  />
                    		 	
                    		<property name="timeout" value="${reaperTimeOut}" />
                    		
                    	</bean>
                    	
                     <bean id="messageStore" class="org.springframework.integration.store.SimpleMessageStore" />
                    	
                    <!-- 	
                    	<si:service-activator input-channel="aggregator-output-channel" ref="printerService" method="printString"  ></si:service-activator>
                    	
                    	<bean id="printerService" class="com.sample.agg.PrinterService"></bean>
                    --> 	
                    
                    	<si:poller max-messages-per-poll="1" id="defaultPoller" default="true">
                    		<si:interval-trigger interval="${pollerInterval}" />
                    	</si:poller>
                    	
                    <!-- <si:inbound-channel-adapter id="test" ref="exampleReleaseStrategy" method="releaseStrategy" >
                    <si:poller fixed-rate="5000"/>
                    </si:inbound-channel-adapter> -->	
                    	
                    
                    </beans>
                    Last edited by SARAL SAXENA; Nov 14th, 2011, 11:50 AM.


                    • #11
                      Hi Saral,
                      I took your project and ran it (I changed the sleep and the poller delay to 10 ms, as I don't have enough patience).
                      It worked fine for me. Where are you facing issues?


                      • #12
                        Originally posted by Amol Nayak
                        Hi Saral,
                        I took your project and ran it (I changed the sleep and the poller delay to 10 ms, as I don't have enough patience).
                        It worked fine for me. Where are you facing issues?
                        Hi Amol,

                        Thanks a lot, great to hear. A few questions: were you sending 200 messages when you checked? Were the files created? Could you please send me your changes so that I can apply them here as well? Thanks in advance.


                        • #13
                          Originally posted by SARAL SAXENA
                          Hi Amol,

                          Thanks a lot, great to hear. A few questions: were you sending 200 messages when you checked? Were the files created? Could you please send me your changes so that I can apply them here as well? Thanks in advance.

                          Hi Amol,

                          Could you please send me the changes so that I can apply them on my side and have a look?


                          • #14
                            @Amol - thanks a lot for helping out here; we really appreciate and encourage it!!

                            @Saral - as I said to you on this post http://forum.springsource.org/showth...425#post387425 - you need to run your reaper at a reasonable rate.

                            Just be sure to set the reaper timeout long enough to allow the normal processing to occur - you don't want to expire the group too early.

                            Given that you are polling at 1 message a second and your reaper times out at 5 seconds (with a one-per-second task execution), you will likely never get more than 5 or 6 messages in a group.
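
The arithmetic behind that estimate can be checked with a simplified discrete-time model. This is only a sketch under stated assumptions, not a reproduction of the framework's internals: the group's age is measured from its first message, the reaper expires a group once its age reaches the timeout, and a reaper tick is processed before a message arriving in the same millisecond. All names are hypothetical.

```java
// Simplified timing model; names and tick ordering are assumptions, not framework code.
public class ReaperTimingSketch {

    // Returns the largest group size seen before a release or an expiry.
    static int largestGroup(long arrivalMs, long reaperMs, long timeoutMs,
                            int releaseAt, int total) {
        int largest = 0;
        int size = 0;       // messages in the current group
        int sent = 0;       // messages delivered so far
        long createdAt = 0; // time the current group received its first message
        long now = 0;
        while (sent < total || size > 0) {
            // Reaper tick: expire the group once its age reaches the timeout.
            if (size > 0 && now % reaperMs == 0 && now - createdAt >= timeoutMs) {
                largest = Math.max(largest, size);
                size = 0;
            }
            // Message arrival: one message per arrival interval.
            if (sent < total && now % arrivalMs == 0) {
                if (size == 0) {
                    createdAt = now;
                }
                size++;
                sent++;
                if (size == releaseAt) { // normal release by the release strategy
                    largest = Math.max(largest, size);
                    size = 0;
                }
            }
            now++;
        }
        return largest;
    }

    public static void main(String[] args) {
        // One message per second, reaper every second, 5 s timeout, release at 10.
        System.out.println(largestGroup(1000, 1000, 5000, 10, 20));
        // Same reaper settings, but one message every 10 ms.
        System.out.println(largestGroup(10, 1000, 5000, 10, 200));
    }
}
```

In this model a group fed at one message per second is expired with 5 messages and never reaches 10, while at one message every 10 ms every group fills to 10 long before the timeout. Real timings will vary by a message or so depending on how the poller and the reaper interleave.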


                            • #15
                              SARAL
                              You are clearly showing a complete lack of basic forum ethics, as well as total disrespect for the community and the people who are trying to help you.
                              If you would only spend a minute trying to understand what Amol is saying, you would see that all he changed is the delay time, to 10 ms.
