Announcement Announcement Module
Collapse
No announcement yet.
Aggregator release with subscribing to Publish/Subscribe channel Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Aggregator release with subscribing to Publish/Subscribe channel

    Hi All,

    I am looking for a sample/example configuration. I want to release aggregator by subscribing to a Publish/Subscribe channel.

    Thanks,

    Srinivas

  • #2
    Hi!

    Sorry, but you provide not enough info.
    Maybe you mean Scatter-Gather.
    Then take a look into my example: https://github.com/artembilan/spring...be7cf68d92c920

    Cheers,
    Artem Bilan

    Comment


    • #3
      Thanks Artem. My configuration is as below.

      <channel id="aggregator-input-channel"/>

      <publish-subscribe-channel id="aggregator-release-channel" topic="endOfTheDayNotification"/>

      <aggregator input-channel="aggregator-input-channel" output-channel="aggregator-output-channel"
      release-strategy="size() == 2" expression=" payload.custId"/>

      <service-activator input-channel="aggregator-output-channel" ref="clientProcessor" method="process"/>
      I want to release the aggregator with a notification on "aggregator-release-channel" instead of release-strategy="size() == 2"

      Comment


      • #4
        No, it's still confused.
        Provide your use-case, please.
        Describe it in human words.
        There is no suitable abilities for your configuration, but maybe I can help when I get business requriments.

        Comment


        • #5
          Cleric Hi,

          My use case is - I want to aggregate all client in bound requests and correlate with the client id in payload and I want to release the aggregated messages after our end of the day process. The end of the day process is running in an another system which broadcasts a message via JMS topic.

          Thanks
          Last edited by csrinivasrao; Dec 8th, 2012, 12:28 PM.

          Comment


          • #6
            H-m-m.
            Ok, now it's look clear and, of course, as valid business task.

            I suggest you a bit different solution:
            HTML Code:
            <si:channel id="aggregator-input-channel">
            		<si:queue/>
            	</si:channel>
            
            <si:aggregator id="clientRequestsAggregator" input-channel="aggregator-input-channel"
            				   output-channel="aggregator-output-channel"
            				   correlation-expression="payload.custId"
            				   auto-startup="false"/>
            
            	<si:service-activator input-channel="aggregator-output-channel" ref="clientProcessor" method="process"/>
            
            	<jms:message-driven-channel-adapter channel="startClientRequestsAggregatorCommandChannel"
            										destination-name="endOfTheDayNotification"/>
            
            	<si:transformer input-channel="startClientRequestsAggregatorCommandChannel"
            					output-channel="controlBus"
            					expression="'@clientRequestsAggregator.start()'"/>
            	
            	<si:control-bus input-channel="controlBus"/>		
            So, in this case:
            1. All client requests will collect on the 'queue'
            2. By default aggregator doesn't do anything, because it's switched off via auto-startup="false"
            3. <jms:message-driven-channel-adapter> is listening 'endOfTheDayNotification' topic.
            4. The message from topic will travel to the <transformer> to create the 'start' command for aggragator
            5. This command will send to the <control-bus> and the last one initiates the work of aggregator.
            6. The 'clientRequestsAggregator' receives all messages from it's input channel using default release-strategy and correlates them with suggested by you 'payload.custId'

            But here you should think when to 'stop' 'clientRequestsAggregator', if your application works 24/7. Maybe you have 'startOfTheDayNotification' too ?


            From other side you can use MessageGroupStoreReaper as well:
            HTML Code:
            <bean id="clientRequestsStore" class="org.springframework.integration.store.SimpleMessageStore"/>
            	
            <si:aggregator input-channel="aggregator-input-channel"
            		   output-channel="aggregator-output-channel"
            		   correlation-expression=" payload.custId"
            		   release-strategy-expression="false"
            		   send-partial-result-on-expiry="true"
            		   message-store="clientRequestsStore"/>
            
            <si:service-activator input-channel="aggregator-output-channel" ref="clientProcessor" method="process"/>
            
            <jms:message-driven-channel-adapter channel="aggregateClientRequestsChannel" destination-name="endOfTheDayNotification"/>
                 
            <si:outbound-channel-adapter id="aggregateClientRequestsChannel" expression="@clientRequestsStoreReaper.run()"/>
            
            <bean id="clientRequestsStoreReaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
            	<constructor-arg ref="clientRequestsStore"/>
            </bean>
            So, HTH and you'll get the point to start the right solution.

            For more info about Aggregator, Control Bus and MessageGroupStoreReaper:
            http://static.springsource.org/sprin...tml#aggregator
            http://static.springsource.org/sprin...ml#control-bus

            Comment

            Working...
            X