
  • Splitter with Aggregator

    Let's say I have the following flow. A request (RequestA) flows through ChannelA. I need RequestA split into subparts, so
    I use a splitter to build a list of RequestB. Each RequestB flows through the router and then to the corresponding service activator. Each
    service activator accepts a single RequestB and returns the same type of message, say ResponseA. It appears that Spring Integration automatically
    takes the return value from the splitter and calls the service activator for each item in the list. Next, I use an aggregator to "aggregate"
    all the ResponseA messages and then put the result on ChannelB. Since the message was originally split into RequestB items and the next RequestB will flow
    down this same path, how do I aggregate the responses that would be on ChannelB? I would expect to have the same number of responses to aggregate as the size of the original split list.

    ChannelA --> Splitter --> Router --> Activator1 --> Aggregator1 --> ChannelB

    Long story short, I need to be able to aggregate everything that will be on ChannelB after all the items in the list from the original split have been processed. I've tried several different things, but I always end up with only the last thing put on ChannelB being returned. In the case where a list of 2 items is being split, I need to be able to return 2 responses together to a specified channel. Please let me know if this is clear. Thanks.

  • #2
    Well, the whole point of the Splitter/Aggregator pattern is to aggregate all the messages that were split earlier back into a single message. It does so based on SEQUENCE_SIZE (how many messages the split produced) and CORRELATION_ID (the way to let the aggregator know which messages are related).
    The way I understand your problem, your aggregation doesn't correlate the messages properly. You may want to provide more details and/or a simple configuration. You can also look at some of the test cases, which have plenty of samples.
    Aggregator:
    https://src.springsource.org/svn/spr...r/integration/

    Splitter:
    https://src.springsource.org/svn/spr...tion/splitter/
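
To make the correlation idea concrete, here is a minimal sketch in plain Java of an aggregator that groups by correlation id and releases a group once it holds as many parts as the sequence size says. It is not Spring Integration code: `Part` and `SimpleAggregator` are hypothetical names invented for this illustration, and the real aggregator has more to it (timeouts, message stores, custom release strategies).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a message carrying the two headers discussed above.
final class Part {
    final String correlationId;
    final int sequenceSize;
    final String payload;

    Part(String correlationId, int sequenceSize, String payload) {
        this.correlationId = correlationId;
        this.sequenceSize = sequenceSize;
        this.payload = payload;
    }
}

// Hypothetical aggregator: groups parts by correlation id, releases complete groups.
final class SimpleAggregator {
    private final Map<String, List<String>> groups = new HashMap<>();

    // Returns the aggregated payloads once the group is complete, otherwise null.
    List<String> accept(Part part) {
        List<String> group = groups.computeIfAbsent(part.correlationId, k -> new ArrayList<>());
        group.add(part.payload);
        if (group.size() == part.sequenceSize) {
            return groups.remove(part.correlationId);
        }
        return null; // still waiting for more parts with this correlation id
    }
}
```

In this sketch, parts that carry different correlation ids never end up in the same group, so the way replies are correlated decides which of them are returned together.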



    • #3
      Ok, I've attached a simple example that depends only on SI and JUnit. Hopefully, this is clearer. It's a banking example where I want to send two deposits to a channel, route them to a service activator, and then aggregate all the receipts into a deposit summary. My assumption was that if I pass in two deposits with two different account numbers (since the account number is my aggregation correlation strategy), I would get two deposit summaries back. Currently, I'm only getting one. What am I missing? Thanks again!



      • #4
        Ok tj

        A couple of things.

        First:
        When you send a List, an Array, or a comma-delimited String as a payload without providing a custom splitter, such a message is considered already split, and the only thing AbstractMessageSplitter does is assign the correlation key, sequence size, and sequence numbers. So you were essentially sending an already split message.
        So send one message at a time in the loop.
        You can still nest the aggregators and send a list as before. It will naturally be split into individual messages, and then each individual message will be split again. This way you can treat your initial payload as a batch, which could be aggregated later on as well, but we won't go there just yet.
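
As a rough illustration of that header assignment, here is a plain Java sketch that turns a list into one message per item, with a shared correlation id, the sequence size, and a sequence number stamped on each. `SplitMessage` and `ListSplitter` are hypothetical names, not the framework's classes, and the random UUID and the 1-based numbering are assumptions made for the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical message type; field names mirror the headers mentioned above.
final class SplitMessage<T> {
    final String correlationId;
    final int sequenceNumber;
    final int sequenceSize;
    final T payload;

    SplitMessage(String correlationId, int sequenceNumber, int sequenceSize, T payload) {
        this.correlationId = correlationId;
        this.sequenceNumber = sequenceNumber;
        this.sequenceSize = sequenceSize;
        this.payload = payload;
    }
}

final class ListSplitter {
    // Emits one message per item; every message shares the same correlation id.
    static <T> List<SplitMessage<T>> split(List<T> items) {
        String correlationId = UUID.randomUUID().toString(); // assumption: any unique id will do
        List<SplitMessage<T>> out = new ArrayList<>();
        for (int i = 0; i < items.size(); i++) {
            // Sequence numbers are 1-based in this sketch.
            out.add(new SplitMessage<>(correlationId, i + 1, items.size(), items.get(i)));
        }
        return out;
    }
}
```

Because every item of one list gets the same correlation id here, a downstream aggregator that relies on these headers would treat the whole list as a single group.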

        Second:
        Based on your code, I believe your intention was to send a Deposit to two different consumers, receive the replies (Receipts) from both, and aggregate them into a single message, which itself is going to be the aggregate of 2. That is a pretty standard use case, for which you had much more code than you needed.

        So here is the modified XML configuration:
        Code:
        <si:gateway id="bankingGateway" 
        	service-interface="com.mycompany.si.BankDemo$BankingGateway"
        	default-request-channel="deposits"/>
        
        <si:channel id="deposits" />
        <si:channel id="receipts" />
        
        <si:splitter id="splitter" 
        	        input-channel="deposits" 
        	        output-channel="depositRouter" 
        	        expression="T(java.util.Collections).nCopies(2, payload)"/>
        
        <si:recipient-list-router input-channel="depositRouter">
        	<si:recipient channel="bankingServiceChannel" />
        	<si:recipient channel="bankLogChannel" />
        </si:recipient-list-router>
        
        <si:aggregator id="aggregator" input-channel="storedReceipts" />
        
        <bean name="aggregateReceipts" class="com.mycompany.si.service.ReceiptAggregate" />
        
        <si:service-activator ref="bankingService" method="deposit"
        		input-channel="bankingServiceChannel" output-channel="storedReceipts" />
        
        <si:service-activator ref="bankLogService" method="log"
        		input-channel="bankLogChannel" output-channel="storedReceipts" />
        
        <bean id="bankingService" class="com.mycompany.si.service.BankingService" />
        <bean id="bankLogService" class="com.mycompany.si.service.BankLogService" />
        Notice how much simpler the splitter has become by using the Spring Expression Language: I am simply calling the nCopies(..) method, which produces two copies of the payload (essentially splitting it by duplicating it).
        Also, the aggregation case is very standard (based on correlationId and sequenceSize), so you don't need to provide any customization.
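
For reference, this is what the `Collections.nCopies` call in that expression does when invoked from plain Java. `NCopiesDemo` and `duplicate` are hypothetical names used only for this sketch.

```java
import java.util.Collections;
import java.util.List;

final class NCopiesDemo {
    // Mirrors the SpEL expression T(java.util.Collections).nCopies(2, payload).
    static <T> List<T> duplicate(T payload) {
        return Collections.nCopies(2, payload);
    }

    public static void main(String[] args) {
        StringBuilder payload = new StringBuilder("deposit");
        List<StringBuilder> copies = duplicate(payload);
        System.out.println(copies.size());                  // prints 2
        System.out.println(copies.get(0) == copies.get(1)); // prints true
    }
}
```

Note that the returned list is immutable and both elements refer to the same object; nothing is cloned.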

        ... and Test Case

        Code:
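        package com.mycompany.si; // matches the service-interface in the XML above

        import static org.junit.Assert.assertTrue;

        import java.util.Arrays;
        import java.util.List;

        import org.junit.Test;
        import org.springframework.context.support.ClassPathXmlApplicationContext;
        // Deposit comes from the attached example; import it from wherever it lives.

        public class BankDemo {
           @Test
           public void testListOfDepositsReturnListOfSummaries() {
              ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
                    "com/mycompany/si/config/springIntegration.xml");
              BankingGateway bankingGateway = context.getBean("bankingGateway", BankingGateway.class);

              List<Deposit> deposits = Arrays.asList(
                    new Deposit(25.0, "1"),
                    new Deposit(50.0, "2"));
              // Send one deposit at a time; each call returns its own aggregated result.
              for (Deposit deposit : deposits) {
                 Object result = bankingGateway.processDeposits(deposit);
                 System.out.println("Result: " + result);
                 assertTrue(result instanceof List);
              }
              context.close();
           }

           // Plain POJO interface; the 'gateway' element creates the proxy behind it.
           public static interface BankingGateway {
              public Object processDeposits(Deposit deposit);
           }
        }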
        In your test case I noticed you used SimpleMessagingGateway and channels directly. I wanted to make sure you understand the concept of GatewayProxyFactoryBean, which is exposed via the 'gateway' element and gives you a completely type-safe POJO way of interacting with the messaging system. Your entire test case is now completely independent of the SI API. How cool is that? Use the framework but don't depend on it.
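
The general idea of putting a proxy behind a plain interface can be sketched with a JDK dynamic proxy. This is only an illustration of the concept under stated assumptions, not how GatewayProxyFactoryBean is implemented: `DepositGateway`, `GatewaySketch`, and `createGateway` are hypothetical names, and the `Function` stands in for the whole message flow.

```java
import java.lang.reflect.Proxy;
import java.util.function.Function;

// Hypothetical business interface with no framework types in its signature.
interface DepositGateway {
    Object processDeposit(String deposit);
}

final class GatewaySketch {
    // Creates a proxy that forwards the first method argument to the given flow.
    static <T> T createGateway(Class<T> serviceInterface, Function<Object, Object> flow) {
        Object proxy = Proxy.newProxyInstance(
                serviceInterface.getClassLoader(),
                new Class<?>[] {serviceInterface},
                (p, method, args) -> {
                    if (method.getDeclaringClass() == Object.class) {
                        // Delegate toString/equals/hashCode to the flow object itself.
                        return method.invoke(flow, args);
                    }
                    return flow.apply(args[0]);
                });
        return serviceInterface.cast(proxy);
    }
}
```

A caller would write something like `GatewaySketch.createGateway(DepositGateway.class, d -> "receipt:" + d)` and then work only with `DepositGateway`, which is the point made above: the calling code sees an ordinary interface.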



        • #5
          tj

          I just realized I made a minor mistake. Your config could be even simpler.

          Based on your requirement you really don't even need a splitter. You only need a splitter if you plan to send each message to a different channel and then aggregate them later. In your case, however, you are sending a single message to all recipients (channels), which is why you have the RecipientListRouter. So all you need is to send a single message directly to this router and tell the router to apply sequence numbers via the apply-sequence attribute, and that is all.

          Code:
          <si:gateway id="bankingGateway" service-interface="thread91349.BankingGateway"
          		default-request-channel="depositRouter" />
          
          <si:channel id="receipts" />
          
          <si:recipient-list-router input-channel="depositRouter" apply-sequence="true">
          	<si:recipient channel="bankingServiceChannel" />
          	<si:recipient channel="bankLogChannel" />
          </si:recipient-list-router>
          
          <si:aggregator id="aggregator" input-channel="storedReceipts" />
          
          <bean name="aggregateReceipts" class="thread91349.ReceiptAggregate" />
          
          <si:service-activator ref="bankingService" method="deposit"
          		input-channel="bankingServiceChannel" output-channel="storedReceipts" />
          
          <si:service-activator ref="bankLogService" method="log"
          		input-channel="bankLogChannel" output-channel="storedReceipts" />
          
          <bean id="bankingService" class="thread91349.BankingService" />
          <bean id="bankLogService" class="thread91349.BankLogService" />
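
In plain Java terms, the shape of this simpler flow is roughly the following sketch: one payload goes to every recipient, and one reply per recipient comes back as a single list. `RecipientListSketch` and `sendToAll` are hypothetical names, and the functions stand in for the two service activators; the real router and aggregator work with messages and headers rather than direct calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class RecipientListSketch {
    // Sends the same payload to every recipient and collects one reply per recipient.
    static <T, R> List<R> sendToAll(T payload, List<Function<T, R>> recipients) {
        // With apply-sequence, the number of recipients plays the role of the sequence size.
        List<R> replies = new ArrayList<>(recipients.size());
        for (Function<T, R> recipient : recipients) {
            replies.add(recipient.apply(payload));
        }
        return replies;
    }
}
```

No splitting step appears anywhere in the sketch, which matches the point of the configuration above: the fan-out itself defines how many replies belong together.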
