Spring Integration: Split message again after using aggregate?

  • Spring Integration: Split message again after using aggregate?

    For those who recognise it, I've copied this question I posted on StackOverflow, in the hope I get some more feedback or suggestions...

    In my Spring Integration powered project I have a splitter and router combination for sending my source object to various transformers. The new "transformed" objects are then passed back to an aggregator and processed. This all works very well, except that my aggregator had to pass my results as a single ArrayList payload back to the flow so they could be passed to the outbound-channel-adapter (which then had to accept an ArrayList payload).

    This is an example of what my flow used to look like:

    Code:
    <splitter ref="articleContentExtractor" />
    
    <payload-type-router>
        ... routing to various transformers ...
        ... results are sent to articleOutAggregateChannel ...
    </payload-type-router>
    
    <aggregator ref="articleAggregator" input-channel="articleOutAggregateChannel" />
    
    <outbound-channel-adapter id="contentSaveService" ref="contentExporter" 
                method="persist" channel="contentOutChannel" />
    Now I want to split up my aggregated results so they are processed properly, since I need to route some of the objects to a different outbound-channel-adapter. I added a new DefaultMessageSplitter after the aggregator, but it seems only the first element gets passed to the router.

    This is my current, broken, flow:

    Code:
    <splitter ref="articleContentExtractor" />
    
    <payload-type-router>
        ... routing to various transformers ...
        ... results are sent to articleOutAggregateChannel ...
    </payload-type-router>
    
    <aggregator ref="articleAggregator" input-channel="articleOutAggregateChannel" />
    
    <splitter ref="entitySaveRouter" />
    
    <payload-type-router resolution-required="true">
        <mapping type="x.y.z.AbstractContent" 
                channel="contentOutChannel" />
        <mapping type="x.y.z.Staff" 
                channel="staffOutChannel" />
    </payload-type-router>
    
    <outbound-channel-adapter id="contentSaveService" ref="contentExporter" 
        method="persist" channel="contentOutChannel" />
    
    <outbound-channel-adapter id="staffSaveService" ref="staffExporter" 
        method="persist" channel="staffOutChannel" />
    What am I doing wrong? Am I using my aggregator properly?

    Note: If I just remove the aggregator (and the second splitter) altogether the rest of the flow works perfectly, although the aggregator is required to perform some cross-referencing actions.

  • #2
    I've worked around this issue by altering my flow. I now persist the "staff" objects using a <service-activator> before passing them to the aggregator:

    Code:
    <!-- Save reference data -->
    <service-activator id="staffSaveService" ref="staffExporter" method="persist" 
    	input-channel="staffOutChannel" output-channel="articleOutAggregateChannel" />
    	
    <!-- Aggregate and persist the new content objects -->
    <aggregator ref="articleContentExtractor" input-channel="articleOutAggregateChannel"
    	output-channel="contentOutChannel" />
    	
    <outbound-channel-adapter id="contentSaveService" ref="contentExporter" 
    	method="persistAll" channel="contentOutChannel" />
    This way I can continue to pass the aggregated collection to the "contentSaveService" bean as I was before, but my new fix is not ideal, for two reasons:

    - I still have to pass all the objects to my "contentSaveService" directly from the aggregator, as a single message with payload type "List<Object>". This rather defeats the ESB-style design at that part of the flow.

    - It's more difficult to validate the data before persisting it. Before, I was ensuring everything was transformed correctly in my aggregator before passing it to my <outbound-channel-adapter>s. Now a failed Article may leave some Staff objects orphaned in the output datastore.

    If anyone has any better suggestions I'm still keen to refactor this part of my project.


    • #3
      First, let's make sure we are on the same page. An aggregator's responsibility is to aggregate several messages into a single message, so the fact that the aggregator outputs a collection of aggregated messages is expected behavior.
      The fact that you want to split them again after aggregation is fine as well and should just work with the default splitter.

      How do you know that your router only gets the first message? Or does it get a collection with a single Message?


      • #4
        Thanks for the quick response Oleg.

        Your description of the aggregator matches my understanding. In generic terms, what I want to do is aggregate all my messages, correlate the payloads (e.g. add references to "staff" inside "article"), then split them again so I can send them to different DAOs.

        With the second splitter in place, I traced the flow with a debugger, and this is what I saw:

        1. Inside aggregator method: created an ArrayList of 3 objects - {Article, Staff, Staff}
        2. Inside second splitter method: ArrayList of 3 objects - {Article, Staff, Staff} (as expected), returned the whole ArrayList to be split.
        3. Inside payload router: a single message is delivered with type Article, so the second splitter appears to have worked, but only returned the first element

        I'll come clean now and say I've tried not to touch the message headers at all at this point. Where possible I would like to avoid tightly-coupling to Spring Integration code. Although this is still an option if it's the only way.
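
For what it's worth, the aggregate, correlate, then split steps described above can be sketched as plain Java with no Spring Integration imports. Everything named here (CorrelationSketch, Article, Staff, aggregate, split) is made up for illustration and is not the poster's actual code. The sketch assumes the framework hands the aggregator method the payloads of one group as a List, and sends each element of the collection returned by the splitter method as its own message.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the poster's beans; names are assumptions.
class CorrelationSketch {

    // Hypothetical domain types standing in for x.y.z.Staff and the article content.
    static class Staff {
        final String name;
        Staff(String name) { this.name = name; }
    }

    static class Article {
        final String title;
        final List<Staff> staff = new ArrayList<>();
        Article(String title) { this.title = title; }
    }

    // Aggregator-style method: takes the payloads of one group, adds every
    // Staff to every Article in that group, and returns all objects as one list.
    static List<Object> aggregate(List<Object> payloads) {
        List<Staff> staff = new ArrayList<>();
        List<Article> articles = new ArrayList<>();
        for (Object p : payloads) {
            if (p instanceof Staff) {
                staff.add((Staff) p);
            } else if (p instanceof Article) {
                articles.add((Article) p);
            }
        }
        for (Article a : articles) {
            a.staff.addAll(staff);
        }
        return new ArrayList<>(payloads);
    }

    // Splitter-style method: returns the collection unchanged and leaves it
    // to the framework to emit one message per element.
    static List<Object> split(List<Object> aggregated) {
        return aggregated;
    }

    public static void main(String[] args) {
        List<Object> group = new ArrayList<>();
        group.add(new Article("Some article"));
        group.add(new Staff("First author"));
        group.add(new Staff("Second author"));

        // Each element printed here would become a separate message downstream.
        for (Object element : split(aggregate(group))) {
            System.out.println(element.getClass().getSimpleName());
        }
    }
}
```

Because neither method touches message headers, the beans stay decoupled from the framework. Whether the second splitter in the broken flow behaves this way still needs to be confirmed by logging what it emits.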


        • #5
          Thinking more about it, adding a service-activator before the aggregator to make it work doesn't make any sense, as the two are not related in any way.

          So you are saying that the second splitter received an array consisting of 3 messages but outputted only one? I'll try to reproduce it, but I would suggest verifying on your end by temporarily putting a logger in place of the router on the other side of the splitter and seeing how many messages get logged.


          • #6
            In fact I just tried and I see the expected result. Here is my config:
            Code:
            <int:splitter input-channel="inputChannel" output-channel="serviceChannel" ref="mySplitter"/>

            <int:transformer input-channel="serviceChannel" output-channel="aggregationChannel">
                <bean class="org.springframework.integration.test.MyTransformer"/>
            </int:transformer>

            <int:aggregator input-channel="aggregationChannel" output-channel="outputChannel"/>

            <int:splitter input-channel="outputChannel" output-channel="loggingChannel"/>

            <int:logging-channel-adapter id="loggingChannel" level="WARN" expression="'#### Result from the Splitter ' + #this"/>
            And the output:
            Code:
            LoggingHandler: #### Result from the Splitter [Payload=HELLO]...
            LoggingHandler: #### Result from the Splitter [Payload=BYE]...
            LoggingHandler: #### Result from the Splitter [Payload=WHAT'S UP]...
            As you can see from the above, the aggregator released an array with 3 messages, the default splitter split the array, and the logger printed one message at a time.
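
As a rough mental model of what the default splitter does with an aggregated payload, here is a small plain-Java sketch. It only mimics the observable behavior described above (one output per element of a collection or array, and a single output for anything else). It is not Spring Integration's implementation, and the class name DefaultSplitSketch is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Illustration only: models the splitting behavior, not the framework's code.
class DefaultSplitSketch {

    // Returns one entry per element when the payload is a collection or an
    // object array; otherwise returns the payload as a single entry.
    static List<Object> split(Object payload) {
        List<Object> out = new ArrayList<>();
        if (payload instanceof Collection) {
            out.addAll((Collection<?>) payload);
        } else if (payload instanceof Object[]) {
            out.addAll(Arrays.asList((Object[]) payload));
        } else {
            out.add(payload);
        }
        return out;
    }

    public static void main(String[] args) {
        // Stand-in for the collection released by the aggregator.
        List<Object> aggregated = Arrays.asList("HELLO", "BYE", "WHAT'S UP");
        for (Object element : split(aggregated)) {
            System.out.println("#### Result from the Splitter " + element);
        }
    }
}
```

If a custom splitter bean sits in this position instead, the number of elements it returns is the first thing worth checking against this model.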


            • #7
              Great, thanks for clarifying it should work. I'll add the logging to my original configuration and post up my findings.
