Announcement Announcement Module
Collapse
No announcement yet.
want to achieve best throughput while generating files..!! Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • want to achieve best throughput while generating files..!!

    Hi ,

    I have developed an application in which I put 10,000 messages into a queue. That queue is configured internally so that those 10,000 messages are stored in two queues of 5,000 messages each, so in the end there are two queues of 5,000 messages each.

    Now my target is to read all the messages from the queues and write them to files, with each file containing 1,000 messages. I have achieved that in a time period of 2 minutes.


    For this I have two message listeners that grab the messages from both queues and put them into input channels: let's say queue 1 feeds inputchannel1 and queue 2 feeds inputchannel2. An aggregator is attached to each of these input channels: aggregator 1 to inputchannel1 and aggregator 2 to inputchannel2. The aggregators contain the logic that groups the messages into groups of 1,000 messages each; after grouping, they release each group of 1,000 messages to its output channel, as there are two separate output channels, one per aggregator. Finally, a file adapter is attached to each of the two separate output channels and generates the files separately, so in the end 10 files, each containing 1,000 messages, are written.

    Now please guide me: I want to complete the writing of those files in less than 2 minutes. What steps do I need to take to achieve this throughput?


  • #2
    Here is my xml configuration file .....

    Code:
    			
    			
    			
    	<!-- Loads config.properties so that placeholder expressions in this file can be resolved. -->
    	<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    		<property name="location" value="config.properties" />
    	</bean>
    			
    		
    	<!-- WebSphere MQ connection factory for the "write" side: queue manager and host
    	     come from the MQ.*Write properties; port, channel and transport type are the
    	     shared MQ.* properties. -->
    	<bean id="jmsQueueWriteConnectionFactory" class="com.ibm.mq.jms.MQQueueConnectionFactory">
    		<property name="queueManager" value="${MQ.QueueManagerWrite}" />
    		<property name="hostName" value="${MQ.HostNameWrite}" />
    		<property name="port" value="${MQ.Port}" />
    		<property name="channel" value="${MQ.Channel}" />
    		<property name="transportType" value="${MQ.TransportType}" />
    	</bean>
    	
    	
    	<!-- WebSphere MQ connection factory for the "read" side: queue manager and host
    	     come from the MQ.*Read properties; port, channel and transport type are the
    	     shared MQ.* properties. -->
    	<bean id="jmsQueueReadConnectionFactory" class="com.ibm.mq.jms.MQQueueConnectionFactory">
    		<property name="queueManager" value="${MQ.QueueManagerRead}" />
    		<property name="hostName" value="${MQ.HostNameRead}" />
    		<property name="port" value="${MQ.Port}" />
    		<property name="channel" value="${MQ.Channel}" />
    		<property name="transportType" value="${MQ.TransportType}" />
    	</bean>
    	<!-- SingleConnectionFactory wrapper around jmsQueueWriteConnectionFactory. In the
    	     visible configuration it is referenced only by the commented-out outbound adapter. -->
    	<bean id="mqConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    		<property name="targetConnectionFactory" ref="jmsQueueWriteConnectionFactory" />
    	</bean>
    	
    	
    	
    	<!-- WebSphere MQ queue definition, intended for use only when JNDI is not available.
    	     All three settings are taken from the MQ.* properties. -->
    	<bean id="odsRequestQueue_dev1"
    		class="com.ibm.mq.jms.MQQueue">
    		<property name="baseQueueManagerName"
    			value="${MQ.BaseQueueManagerName}" />
    		<property name="baseQueueName"
    			value="${MQ.BaseQueueName}" />
    		<property name="targetClient"
    			value="${MQ.TargetClient}" />
    	</bean>
    			
    	
    		<!-- putting data into the queue commented by saral 
    		
    				
    
       <si:channel id="input">
    		
    	</si:channel>
    			
    		 <jms:outbound-channel-adapter id="channel-to-mq"
    			channel="input" destination-name="${queueNameWrite}" connection-factory="mqConnectionFactory"  />  -->
    			
    		
    			
    			<!-- Active listener (not commented out): receives from the queue named by the
    			     queueNameRead property through jmsQueueReadConnectionFactory and forwards
    			     each message to aggregator-input-channel_1. -->
    			<jms:message-driven-channel-adapter
    				id="mq-message-listner_QManager_4"
    				connection-factory="jmsQueueReadConnectionFactory"
    				destination-name="${queueNameRead}"
    				concurrent-consumers="${concurrent-consumers}"
    				channel="aggregator-input-channel_1" />
    		
    		<!-- Second listener: receives from the same destination name (queueNameRead) but
    		     through jmsQueueWriteConnectionFactory, i.e. the other queue manager, and
    		     forwards each message to aggregator-input-channel_2. -->
    		<jms:message-driven-channel-adapter
    			id="mq-message-listner_QManager_3"
    			connection-factory="jmsQueueWriteConnectionFactory"
    			destination-name="${queueNameRead}"
    			concurrent-consumers="${concurrent-consumers}"
    			channel="aggregator-input-channel_2" />
    
    		
    		
    		
    
    	<!--   <si:gateway id="gateway" service-interface="com.sample.agg.Gateway"
    		default-request-channel="aggregator-input-channel">
    	</si:gateway> -->
    	
    	<!-- Queue-backed input channels, one per JMS listener; both are bounded by the
    	     queueCapacity property. -->
    	<si:channel id="aggregator-input-channel_1">
    		<si:queue capacity="${queueCapacity}"/>
    	</si:channel>
    	<si:channel id="aggregator-input-channel_2">
    		<si:queue capacity="${queueCapacity}"/>
    	</si:channel>
    	
    	<!-- Aggregator for the first pipeline: consumes aggregator-input-channel_1, keeps its
    	     groups in messageStore1 and sends results to aggregator-output-channel_1.
    	     Aggregation, correlation and release decisions are delegated to the referenced
    	     beans; partial groups are sent when a group expires. -->
    	<si:aggregator id="aggregator_1"
    		input-channel="aggregator-input-channel_1"
    		output-channel="aggregator-output-channel_1"
    		message-store="messageStore1"
    		ref="sampleAggregator"
    		method="aggregateMessagaes"
    		correlation-strategy="correlationBean"
    		correlation-strategy-method="correlationStrategy"
    		release-strategy="releaseStrategyBean"
    		release-strategy-method="releaseStrategy"
    		send-partial-result-on-expiry="true"
    		send-timeout="5000"
    		order="1" />
    	  
    	<!-- Aggregator for the second pipeline: consumes aggregator-input-channel_2, keeps its
    	     groups in messageStore2 and sends results to aggregator-output-channel_2.
    	     Aggregation, correlation and release decisions are delegated to the referenced
    	     beans; partial groups are sent when a group expires. -->
    	<si:aggregator id="aggregator_2"
    		input-channel="aggregator-input-channel_2"
    		output-channel="aggregator-output-channel_2"
    		message-store="messageStore2"
    		ref="sampleAggregator"
    		method="aggregateMessagaes"
    		correlation-strategy="correlationBean"
    		correlation-strategy-method="correlationStrategy"
    		release-strategy="releaseStrategyBean"
    		release-strategy-method="releaseStrategy"
    		send-partial-result-on-expiry="true"
    		send-timeout="5000"
    		order="1" />
    	
    	
    	<!-- Beans referenced by both aggregators. All three are prototype-scoped. -->

    	<!-- Supplies the aggregation method. -->
    	<bean id="sampleAggregator"
    		class="com.walgreens.ods.producer.Aggregator"
    		scope="prototype" />

    	<!-- Supplies the correlation strategy method. -->
    	<bean id="correlationBean"
    		class="com.walgreens.ods.producer.CorrelationBean"
    		scope="prototype" />

    	<!-- Supplies the release strategy method; recordLength comes from the properties file. -->
    	<bean id="releaseStrategyBean"
    		class="com.walgreens.ods.producer.ReleaseStrategyBean"
    		scope="prototype">
    		<property name="recordLength" value="${recordLength}" />
    	</bean>
    	
    	<!--   <si:channel id="throwAwayChannel"	>		
    	<si:queue capacity="${queueCapacity}" />
    	</si:channel> -->
    	
    
    		<!-- Writes each message from aggregator-output-channel_1 to a file in the configured
    		     output directory, using the "_swp" suffix while the file is being written.
    		     NOTE(review): the other file adapter uses the same directory and the same
    		     file name expression, so two files could presumably collide on name; confirm. -->
    		<file:outbound-channel-adapter
    			channel="aggregator-output-channel_1"
    			directory="${aggregatorOutputDirectoryPath}"
    			filename-generator-expression="${outputFileNameExpression}"
    			temporary-file-suffix="_swp" />
    		<!-- Writes each message from aggregator-output-channel_2 to a file in the configured
    		     output directory, using the "_swp" suffix while the file is being written.
    		     NOTE(review): the other file adapter uses the same directory and the same
    		     file name expression, so two files could presumably collide on name; confirm. -->
    		<file:outbound-channel-adapter
    			channel="aggregator-output-channel_2"
    			directory="${aggregatorOutputDirectoryPath}"
    			filename-generator-expression="${outputFileNameExpression}"
    			temporary-file-suffix="_swp" />
    
    		<!--  <file:outbound-channel-adapter channel="throwAwayChannel"
    			directory="${aggregatorOutputDirectoryPath}" filename-generator-expression="'discard-'+headers.getTimestamp()+'.xml'"
    			temporary-file-suffix="_swp"   />	-->  
    	
    	
    	<!-- Queue-backed output channels, one per aggregator; each is consumed by a file
    	     outbound adapter and bounded by the queueCapacity property. -->
    	<si:channel id="aggregator-output-channel_1">
    		<si:queue capacity="${queueCapacity}"/>
    	</si:channel>
    	<si:channel id="aggregator-output-channel_2">
    		<si:queue capacity="${queueCapacity}"/>
    	</si:channel>
    	
    	
    	<!-- Scheduler that invokes run() on both message group store reapers at the fixed
    	     rate given by the reaperSchedulerFixedRate property. -->
    	<task:scheduler id="scheduler"/>

    	<task:scheduled-tasks scheduler="scheduler">
    		<task:scheduled ref="reaper1"
    			method="run"
    			fixed-rate="${reaperSchedulerFixedRate}"/>
    		<task:scheduled ref="reaper2"
    			method="run"
    			fixed-rate="${reaperSchedulerFixedRate}"/>
    	</task:scheduled-tasks>
    	
            		
    
    	 <!-- Reaper for messageStore1 (the store used by aggregator_1); its timeout is taken
    	      from the reaperTimeOut property. -->
    	 <bean id="reaper1"
    	 	class="org.springframework.integration.store.MessageGroupStoreReaper">
    	 	<property name="messageGroupStore" ref="messageStore1" />
    	 	<property name="timeout" value="${reaperTimeOut}" />
    	 </bean>
    
    	 <!-- Reaper for messageStore2 (the store used by aggregator_2); its timeout is taken
    	      from the reaperTimeOut property. -->
    	 <bean id="reaper2"
    	 	class="org.springframework.integration.store.MessageGroupStoreReaper">
    	 	<property name="messageGroupStore" ref="messageStore2" />
    	 	<property name="timeout" value="${reaperTimeOut}" />
    	 </bean>
    	
    	
     <!-- One SimpleMessageStore per aggregator/reaper pair. -->
     <bean id="messageStore1"
     	class="org.springframework.integration.store.SimpleMessageStore"/>
     <bean id="messageStore2"
     	class="org.springframework.integration.store.SimpleMessageStore"/>
    	
     	
    	<!--<si:service-activator input-channel="aggregator-output-channel" ref="printerService" method="printString"  ></si:service-activator>-->
    	
    	<!--  <bean id="printerService" class="com.sample.agg.PrinterService"></bean>-->
     	
    
    	<!-- Default poller (default="true"); its interval comes from the pollerInterval
    	     property. None of the endpoints visible in this file declares a poller of its
    	     own, so the aggregators and file adapters reading from the queue channels
    	     presumably all use this one.
    	     NOTE(review): max-messages-per-poll="1" means one message per poll, so each
    	     polled endpoint's throughput is bounded by the poll interval. This looks like
    	     the main limit on file-generation speed; confirm by raising the value. -->
    	<si:poller max-messages-per-poll="1" id="defaultPoller" default="true">
    		<si:interval-trigger interval="${pollerInterval}" />
    	</si:poller>
    	
    <!-- <si:inbound-channel-adapter id="test" ref="exampleReleaseStrategy" method="releaseStrategy" >
    <si:poller fixed-rate="5000"/>
    </si:inbound-channel-adapter> --> 	
    	
    
    </beans>


    here my properties file...

    Code:
    # Values for the placeholders used in the Spring bean configuration.
    
    # Queue names are set explicitly here; the read queue name was chosen after observing
    # which queue the messages actually arrive in (note by saral).
    # queueNameWrite is used only by the commented-out outbound adapter;
    # queueNameRead is the destination of both message-driven listeners.
    queueNameWrite=RODS.DATA.OUT
    queueNameRead=RODS.DATA.IN
    #queueNameRead=RODS.DATA.OUT
    
    
    # Queue manager for each of the two MQ connection factories.
    MQ.QueueManagerWrite=DCC2_QM_3
    MQ.QueueManagerRead=DCC2_QM_4
    
    # Host name for each of the two MQ connection factories.
    MQ.HostNameWrite=dinfmq03
    MQ.HostNameRead=dinfmq04
    
    # Port, channel and transport type are shared by both connection factories.
    MQ.Port=11421
    MQ.Channel=RODS.APP.SVRCONN
    MQ.TransportType=1
    # Not referenced by the bean configuration shown with this file.
    MQ.DefaultDestinationName=RODS.DATA.OUT
    #MQ.DefaultDestinationName=RODS.APP.DATA.IN
    # Settings for the odsRequestQueue_dev1 MQQueue bean.
    # NOTE(review): MQ.BaseQueueManagerName has the same value as MQ.Channel, which looks
    # like a channel name rather than a queue manager name; confirm.
    MQ.BaseQueueManagerName=RODS.APP.SVRCONN
    MQ.BaseQueueName=RODS.DATA.OUT
    MQ.TargetClient=1
    
    # concurrent-consumers: applied to both JMS listeners.
    # queueCapacity: capacity of every aggregator input and output queue channel.
    # recordLength: injected into releaseStrategyBean.
    # aggregatorOutputDirectoryPath: directory used by both file outbound adapters.
    concurrent-consumers=10
    queueCapacity=100
    recordLength=1000
    aggregatorOutputDirectoryPath=C:\\ODS_DEV_DIR\\temp\\
    #aggregatorOutputDirectoryPathtemp1=C:\\ODS_DEV_DIR\\temp1\\
    #aggregatorOutputDirectoryPathtemp2=C:\\ODS_DEV_DIR\\temp2\\
    
    
    
    
    # File name expression shared by both file outbound adapters.
    outputFileNameExpression='output-'+headers.getTimestamp()+'.xml'
    # Fixed rate at which the scheduler runs both reapers.
    reaperSchedulerFixedRate=10000
    
    # Timeout set on both reapers. NOTE(review): presumably milliseconds; confirm.
    #reaperTimeOut=43000
    reaperTimeOut=42400
    # Interval of the default poller.
    pollerInterval=10
    
    
    
    
    # Header keys and values; not referenced by the bean configuration shown with this file.
    Header.Source.Key=eiaHdrSource
    Header.Destination.Key=eiaHdrDest
    Header.DataId.Key=eiaHdrDataId
    Header.Route.Key=eiaHdrRoute
    	
    Header.Source.Value=RODS
    Header.Destination.Value=RODS
    Header.DataId.Value=DATA
    Header.Route.Value=LOCAL
    This works perfectly: 10 files are created, each containing 1,000 messages, but it takes 2 minutes. Please guide me on how this file creation can be achieved within 1 minute.

    Comment


    • #3
      Hi Guys,

      Please advise on this. I am doing R&D on how to reduce the file creation time to achieve the best throughput. I have tried a few experiments but couldn't bring down the file creation time.
      Last edited by SARAL SAXENA; Nov 25th, 2011, 11:04 AM.

      Comment


      • #4
        Hi ,

        Has anybody done R&D on this?

        Comment


        • #5
          Hi guys,

          I have done it: I brought the file creation time down to 1 minute by increasing the record length attribute and the message reaper timeout to 90000 seconds.

          Comment

          Working...
          X