Split-filter-aggregate pattern

  • Split-filter-aggregate pattern

    Hi all,

    I'm using Spring Integration to build a high-volume customer email merge program for my organization. I need the splitter-filter-aggregator pattern mentioned in the title.

    Specifically, when the system is performing email previews, I want to aggregate the output of the previous split, which may have had some results filtered out (due to ineligibility, opt-out, system error, etc.), and select a small random sample of the message group to send to a preview recipient.
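The sampling step described above can be sketched in plain Java. This is only an illustration under assumptions: `PreviewSampler` and `sample` are hypothetical names, and the group is modelled as an ordinary list rather than a Spring Integration message group.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical helper, not part of Spring Integration.
final class PreviewSampler {

    // Returns up to sampleSize elements chosen at random, without replacement.
    static <T> List<T> sample(List<T> group, int sampleSize, Random random) {
        List<T> shuffled = new ArrayList<>(group); // copy so the caller's list is untouched
        Collections.shuffle(shuffled, random);
        int n = Math.min(Math.max(sampleSize, 0), shuffled.size());
        return new ArrayList<>(shuffled.subList(0, n));
    }
}
```

Passing the `Random` in keeps the choice reproducible in tests. A group smaller than the requested sample is returned whole.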

    Because the default behavior for the aggregator is to wait until the sequence size has been reached before releasing, I have taken the sub-optimal solution of timing out the message store after 10 seconds. This is sub-optimal because it reduces the apparent responsiveness of the system, and it risks not aggregating the complete message group, which results in the preview samples doubling, tripling, etc. in size.

    My initial line of thinking is to use a shared AtomicInteger in the message header, and intercept the filtered-out messages to decrement this value. However, I'm concerned that this could cause a race condition with the aggregator. Further, it significantly complicates marshalling/unmarshalling across JMS-backed channels.

    Is there a way to accomplish this simply in Spring Integration?

    Thanks!

  • #2
    Yes, that is the default aggregator behavior, but you can change it by providing your own ReleaseStrategy and implementing its canRelease(..) method. There are many ways to solve your problem, and they all depend on how much information is known at the time a particular message hits the aggregator. So...
    When you look at the group of messages, do you know (assuming you had access to the entire group) when the messages can be released?



    • #3
      Originally posted by oleg.zhurakousky
      When you look at the group of messages do you know (if you had access to the entire group) when messages can be released?
      If I had access to the entire group (before being split), compared to the current group in the aggregator, and I could identify which ones were filtered out, then yes, I could tell as soon as all the non-filtered messages arrived and release them.
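The release condition described here can be written down as a small plain-Java check. It is a sketch that assumes the full set of split IDs and the set of filtered-out IDs are both known, which is exactly the information that is hard to obtain in practice. `ReleaseCheck` and its parameters are hypothetical names, not Spring Integration API.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper illustrating the release condition only.
final class ReleaseCheck {

    // True once every item that was NOT filtered out has arrived at the aggregator.
    static boolean canRelease(Set<String> allIds, Set<String> filteredIds, Set<String> arrivedIds) {
        Set<String> expected = new HashSet<>(allIds);
        expected.removeAll(filteredIds); // items still owed to the aggregator
        return arrivedIds.containsAll(expected);
    }
}
```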



      • #4
        Yes, and if I was an astronaut...

        Of course, knowing all this information would be ideal, but I think there is a simpler solution.

        What if the filtered messages were to reach the aggregator as well, but augmented with a header telling you that they were filtered? For example:
        Code:
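        <int:splitter . . . output-channel="filterChannel"/>

        <int:filter input-channel="filterChannel" output-channel="aggregatorChannel" discard-channel="transformingChannel" . . ./>

        <int:header-enricher input-channel="transformingChannel" output-channel="aggregatorChannel">
            <int:header name="filtered" value="true"/>
        </int:header-enricher>

        <int:aggregator input-channel="aggregatorChannel" . . .>
            <bean class="foo.bar.MyAggregator"/>
        </int:aggregator>

        import java.util.ArrayList;
        import java.util.List;
        // In Spring Integration 2.x the type is org.springframework.integration.Message.
        import org.springframework.messaging.Message;

        public class MyAggregator {

            // The input list contains all messages of the group; all you need to do
            // is drop the ones that were marked as filtered.
            public List<Message<?>> aggregate(List<Message<?>> messages) {
                List<Message<?>> kept = new ArrayList<Message<?>>();
                for (Message<?> message : messages) {
                    if (!message.getHeaders().containsKey("filtered")) {
                        kept.add(message);
                    }
                }
                return kept;
            }
        }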
        The main points are:
        1. All messages reach the aggregator. This way you can rely on its default release and correlation strategies (less code for you).
        2. Messages that pass the initial filter go on to some process that may be long and complex, while discarded messages go straight to the aggregator and wait for the others to arrive, thus completing the sequence.
        3. The only work you need to do is implement MyAggregator, which will probably be only a few lines of code that remove the messages marked as 'filtered' from the group.
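The three points above can be tried out without Spring on the classpath. The following is a minimal plain-Java sketch under assumptions: `MergeItem` and `MarkerAggregation` are hypothetical stand-ins for a message and for the aggregator's behavior, and the `sequenceSize` and `filtered` fields stand in for the corresponding headers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a message; not the Spring Integration Message type.
final class MergeItem {
    final String recipient;
    final int sequenceSize;  // how many items the split produced
    final boolean filtered;  // marker set on the discard path

    MergeItem(String recipient, int sequenceSize, boolean filtered) {
        this.recipient = recipient;
        this.sequenceSize = sequenceSize;
        this.filtered = filtered;
    }
}

final class MarkerAggregation {

    // The group is complete once every split item, kept or discarded, has arrived.
    static boolean isComplete(List<MergeItem> group) {
        return !group.isEmpty() && group.size() == group.get(0).sequenceSize;
    }

    // Drops the items that were marked as filtered on the discard path.
    static List<MergeItem> aggregate(List<MergeItem> group) {
        List<MergeItem> kept = new ArrayList<>();
        for (MergeItem item : group) {
            if (!item.filtered) {
                kept.add(item);
            }
        }
        return kept;
    }
}
```

Because the discarded items still count toward the sequence size, completion is a simple size comparison and no timeout is needed. The filtering happens only once the whole group is present.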



        • #5
          That seems to be the best solution, and it is one I was tending towards. Thanks for the confirmation.
