PassThroughMessageGroupProcessor

  • PassThroughMessageGroupProcessor

    We want to use the PassThroughMessageGroupProcessor. How is it used in conjunction with the aggregator? We currently use 1.x and want to migrate to 2.x, but we seem to be missing something. A little guidance would be appreciated.

    Thanks.

    Our current aggregator looks like this:

    Code:
    	<int:aggregator id="aggregator" correlation-strategy="correlStrategy"
    		release-strategy="releaseStrategy" message-store="messageStore"
    		send-partial-result-on-expiry="true" ref="messageAggregator" method="aggregate"
    		send-timeout="10000" input-channel="inJmsChannel" output-channel="outHandlerChannel" />

  • #2
    OK, a little more detail. We simply want to pull X messages off a JMS queue, batch them up, and send a single message as the POST body of an HTTP request. Our current full config is below. We would expect every X messages to be aggregated and sent to our service activator, which performs the HTTP POST. We seem to be losing messages, though.


    Code:
    	<int-jms:channel id="aggregatorJmsChannel" connection-factory="integrationConnFactory" 
    		queue="queueV1" cache="none" message-driven="true" concurrency="15" destination-resolver="destinationResolver"
    		>
    		<int-jms:interceptors>
    			<bean class="org.springframework.integration.transformer.MessageTransformingChannelInterceptor">
    		        <constructor-arg>
    		        	<ref bean="transformer"/>
    		        </constructor-arg>
    			</bean>	 
    		</int-jms:interceptors>
    	</int-jms:channel>
    	
    	<int-jms:outbound-channel-adapter id="InvalidV1Adapter" connection-factory="integrationConnFactory" 
    		destination="invalidQueue"/>
    	
    	<int:aggregator id="aggregator"
    		release-strategy="messageCountReleaseStrategy" message-store="MessageStore"
    		send-partial-result-on-expiry="true" ref="stringMessageAggregator"
    		input-channel="aggregatorJmsChannel" output-channel="HandlerChannel">
    	</int:aggregator>	
    	
    	<bean id="MessageStore" class="org.springframework.integration.store.SimpleMessageStore"/>
    	
    	<bean id="MessageStoreReaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
    		<property name="timeout" value="30000"/>
    		<property name="expireOnDestroy" value="true"/>	
    		<property name="messageGroupStore" ref="MessageStore"/>	
    	</bean>
    
    	<task:scheduled-tasks scheduler="Scheduler">
    		<task:scheduled ref="MessageStoreReaper" method="run" fixed-rate="5000"/>
    	</task:scheduled-tasks>
    
    	<task:scheduler id="Scheduler"/>
    	
    	<int:service-activator ref="Handler" method="handleMessage" input-channel="HandlerChannel"/>
    
    	<bean id="messageCountReleaseStrategy" class="org.springframework.integration.aggregator.MessageCountReleaseStrategy">
    		<constructor-arg index="0" value="5"/>
    	</bean>
    	
    	<bean id="stringMessageAggregator" class="com.integration.aggregator.StringMessageAggregator"/>
    	
    	<bean id="transformer" class="com.integration.messages.transformers.MessageV1Transformer"/>
    	
    	<bean id="Handler" class="com.integration.handlers.MessageHandler">
    		<property name="host" value="dev.server.net"/>
    		<property name="path" value="/post/url"/>
    		<property name="httpClientConnnectionTimeout" value="3000"/>
    		<property name="httpClientContentCharSet" value="UTF-8"/>
    		<property name="httpClientHttpVersion" value="HTTP/1.0"/>
    		<property name="httpClientSocketTimeout" value="5000"/>
    		<property name="httpRetryHandler" ref="httpMethodRetryHandler"/>
    	</bean>	
    	
    	<bean id="httpMethodRetryHandler" class="com.integration.handlers.HttpMethodRetryHandler"/>


    • #3
      The PassThroughMessageGroupProcessor is related to the CorrelatingMessageBarrier, which is a solution unrelated to your problem.

      Maybe you could stub JMS and HTTP out of your config and write a test case around it, so we can see how messages are lost?
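
      A rough sketch of what such a stubbed config might look like, assuming the aggregator, message store, reaper, and release strategy beans stay exactly as posted above. The only changes are that the JMS-backed channel becomes a plain direct channel and the HTTP service activator is dropped in favor of a pollable queue channel. The channel ids are taken from the posted config; everything else here is illustrative.

      ```xml
      <!-- Stand-in for the <int-jms:channel>: a plain in-memory direct channel.
           A test can send messages to it without a broker. -->
      <int:channel id="aggregatorJmsChannel"/>

      <!-- Stand-in for the HTTP service activator: aggregated messages queue up
           here so a test can receive them and count what arrived. -->
      <int:channel id="HandlerChannel">
      	<int:queue/>
      </int:channel>
      ```

      A test could then send a known number of messages to aggregatorJmsChannel, receive from HandlerChannel with a timeout, and compare the total number of aggregated payloads against the number sent.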


      • #4
        Thanks, Iwein. We ended up refactoring a lot of our application. It was originally a mixture of home-grown code sprinkled with SI.

        Biting the bullet and refactoring everything to use SI has simplified the application, and everything is working as expected now.

        The only open issue is that we want to aggregate without correlating. The PassThroughMessageGroupProcessor seems to imply that it could or should be used for this in some manner.

        We have recently implemented a custom CorrelationStrategy that always returns the same value. This seems to give us the desired outcome.
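
        For anyone with the same need, here is a minimal sketch of the constant-key idea expressed in config rather than as a custom class. It assumes a Spring Integration 2.x release in which the aggregator element accepts the SpEL-based correlation-strategy-expression and release-strategy-expression attributes. The key 'batch' and the group size of 5 are illustrative values, and the bean and channel names are borrowed from the config posted earlier in this thread.

        ```xml
        <!-- Every message evaluates to the same correlation key ('batch'),
             so all incoming messages fall into a single group. -->
        <int:aggregator id="aggregator"
        	correlation-strategy-expression="'batch'"
        	release-strategy-expression="size() >= 5"
        	message-store="MessageStore"
        	send-partial-result-on-expiry="true"
        	ref="stringMessageAggregator"
        	input-channel="aggregatorJmsChannel"
        	output-channel="HandlerChannel"/>
        ```

        One thing to check when every batch shares one key: on later 2.x releases, as far as I recall, an expire-groups-upon-completion attribute controls whether a fresh group can start under the same key after a release, so it may need to be set to true there.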


        • #5
          PassThroughMessageGroupProcessor Usage

          I am also looking for documentation on how I can use the PassThroughMessageGroupProcessor in conjunction with the aggregator. Can you point me to any URL?


          • #6
            Perhaps you should explain what you are trying to do, and why you think you need this.
