
  • MessageHeader in Aggregator

    Hello,
    I'm currently trying to use the aggregator component.
    According to the docs, there are two ways to do this. I can:
    1. implement the MessageGroupProcessor
    2. define my own bean and indicate the aggregation method in my context.

    I don't want to use the first one (it is too coupled to the Spring Integration framework, and I prefer manipulating a List<Message<?>> to a MessageGroup).

    But I am having trouble setting my aggregated message's headers.
    Code:
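    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Package names as of Spring Integration 2.0
    import org.springframework.integration.Message;
    import org.springframework.integration.support.MessageBuilder;

    public class PayloadAggregator {

    	// Concatenates the payloads of all messages in the group.
    	public Message<?> aggregate(List<Message<?>> messages) {
    		StringBuilder buf = new StringBuilder();
    		for (Message<?> msg : messages) {
    			buf.append(msg.getPayload());
    		}

    		return MessageBuilder.withPayload(buf.toString()).copyHeaders(aggregatedHeaders(messages)).build();
    	}

    	// Headers intended for the aggregated message (only the custom one).
    	private Map<String, ?> aggregatedHeaders(List<Message<?>> messages) {
    		Map<String, Object> headers = new HashMap<String, Object>();
    		headers.put("MY_HEADER", "hi");
    		return headers;
    	}
    }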
    The point is that the final message contains not only my custom header, but also other headers coming from the List<Message<?>> being aggregated. Using breakpoints, I saw that the AbstractAggregatingMessageGroupProcessor is called (the CorrelatingMessageHandler uses it before calling the MethodInvokingMessageGroupProcessor).

    I don't want to use that MessageGroupProcessor because it does not fit my needs. Is there any way to deactivate it while using a custom POJO aggregator?
    If I want to fully manage my message headers, am I forced to implement my own MessageGroupProcessor?

    Thanks for your replies

  • #2
    I think in 2.1 we should refactor this so that you could also provide your own header-aggregator strategy (something we have already discussed a bit). The default strategy implementation could still be the same (what's currently in the protected method of the abstract base class implementation of MessageGroupProcessor).

    That said, I'm wondering why the default behavior for header aggregation is not ok for your case? I see that you are just experimenting with "hi" above, but can you describe the actual use-case?

    Thanks,
    Mark

    • #3
      Hi Mark and thank you for your answer.
      Here are my reasons why I don't want to use the abstract message group processor:
      1. The messages to be aggregated contain a lot of headers that I no longer need in the aggregated message (I could complicate my flow with header-filters, but I would have to explicitly name each header to filter). As I am developing an aggregator API that will be used by many flows, I would like to filter out all headers automatically and add some defined by the API.
      2. Technically, I could use this message group processor (because even if the headers are present, I won't use them), but I don't like keeping data that I no longer need and that is not related to the aggregated message. It is quite confusing, and it is also better for performance to transport only useful data.

      Functionally, here is my use case: I have n messages to be aggregated. Each one contains a header ("TRN", the transaction number) and has already been stored in the DB in the state "Processing".
      After the aggregation, I need to send the aggregated message to our partner and then update the rows to "Processed". To avoid sending n messages and doing n updates, I intend to add a "TRN_LIST" header to the aggregated message. My audit API (whose purpose is to manage the message statuses) could then update all the rows in a single transaction.
      As my audit API only needs the TRN_LIST header, I don't want to use the AbstractAggregatingMessageGroupProcessor (which copies the messages' headers into my final message), and would rather use only my own.
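
The TRN_LIST idea above can be sketched in plain Java. This is a minimal illustration, not Spring Integration code: each map stands in for the headers of one message, and the names TrnListHeaderSketch and aggregatedHeaders are hypothetical. It only shows how to build a header map that carries the collected TRN values and nothing else; as described in this thread, the framework may still merge other headers into the final message.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch: each map stands in for the headers of one message in the group.
final class TrnListHeaderSketch {

    static final String TRN = "TRN";
    static final String TRN_LIST = "TRN_LIST";

    // Builds the headers for the aggregated message: only TRN_LIST, nothing copied from the inputs.
    static Map<String, Object> aggregatedHeaders(List<Map<String, Object>> headersPerMessage) {
        List<Object> trns = new ArrayList<Object>();
        for (Map<String, Object> headers : headersPerMessage) {
            Object trn = headers.get(TRN);
            if (trn != null) { // skip messages that carry no transaction number
                trns.add(trn);
            }
        }
        Map<String, Object> result = new HashMap<String, Object>();
        result.put(TRN_LIST, Collections.unmodifiableList(trns));
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> first = new HashMap<String, Object>();
        first.put(TRN, "T-1");
        first.put("routingHint", "x"); // unrelated header that should not survive
        Map<String, Object> second = new HashMap<String, Object>();
        second.put(TRN, "T-2");

        List<Map<String, Object>> group = new ArrayList<Map<String, Object>>();
        group.add(first);
        group.add(second);
        System.out.println(aggregatedHeaders(group));
    }
}
```

With a header map like this, the audit step would only need to read one header to update every row in a single transaction.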

      • #4
        There is another thing you can do which you may find interesting.
        So let's say you have an aggregator now but, as you said, instead of updating the status once for each message as it comes into the aggregator, you want to aggregate all transactions into a list.
        Now, picture this.
        The input-channel of your aggregator is a pub-sub channel that has another subscriber: a Transformer that extracts the TRN from the Message header into the payload of the Message and sends it to another aggregator.
        Code:
                                         Transformer -> trnAggregationChannel -> Aggregator -> (returns a Message with the List of TRN) -> do what you want with it
                                      /
        aggregationChannel(pub-sub) 
                                      \ Aggregator (your current aggr)
        Basically, what I am saying is that you have two concerns:
        1. Aggregating transactions
        2. Aggregating transaction numbers
        ...and they could therefore be handled as two separate sub-flows.

        Also, depending on your requirements, 'trnAggregationChannel' could be an async channel, thus aggregating the TRNs in parallel with the actual transactions.
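
The two sub-flows can be modelled in plain Java to make the split concrete. This is only a sketch under stated assumptions: SimpleMessage is a hypothetical stand-in for a framework message, the method names are invented for illustration, and the pub-sub channel is modelled simply by handing the same group to both branches.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the two sub-flows; SimpleMessage is a hypothetical stand-in, not a framework type.
final class TwoBranchSketch {

    static final class SimpleMessage {
        final Object payload;
        final Map<String, Object> headers;

        SimpleMessage(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    // Branch 1, transformer step: the TRN header becomes the payload.
    static Object extractTrn(SimpleMessage message) {
        return message.headers.get("TRN");
    }

    // Branch 1, aggregator step: collect the extracted TRNs into one list.
    static List<Object> aggregateTrns(List<SimpleMessage> group) {
        List<Object> trns = new ArrayList<Object>();
        for (SimpleMessage message : group) {
            trns.add(extractTrn(message));
        }
        return trns;
    }

    // Branch 2, the existing aggregator: concatenate payloads, as in the first post.
    static String aggregatePayloads(List<SimpleMessage> group) {
        StringBuilder buf = new StringBuilder();
        for (SimpleMessage message : group) {
            buf.append(message.payload);
        }
        return buf.toString();
    }

    // Convenience factory for a message that carries a TRN header.
    static SimpleMessage message(Object payload, String trn) {
        Map<String, Object> headers = new HashMap<String, Object>();
        headers.put("TRN", trn);
        return new SimpleMessage(payload, headers);
    }

    public static void main(String[] args) {
        // The pub-sub channel delivers every message to both branches.
        List<SimpleMessage> group = new ArrayList<SimpleMessage>();
        group.add(message("a", "T-1"));
        group.add(message("b", "T-2"));
        System.out.println(aggregateTrns(group));
        System.out.println(aggregatePayloads(group));
    }
}
```

Because each branch reads only what it needs, the TRN list never has to travel as a header on the aggregated business message.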

        • #5
          Hi Oleg,
          Your solution is quite interesting since the separation of concerns would make the flow cleaner, and my problem 1 is solved.
          However, concerning point 2, I currently can't define my own header-aggregator strategy. To clean my aggregated message's headers, I have to insert a transformer before my aggregator. Do you think there is another (and better) way?
          Code:
                                           Transformer -> trnAggregationChannel -> Aggregator -> (returns a Message with the List of TRN) -> do what you want with it
                                        /
          aggregationChannel(pub-sub) 
                                        \ Transformer (remove all headers) -> Aggregator
          Thank you !

          • #6
            I guess I am not following this one:
            "I currently can't define my own header-aggregator strategy."
            Do you still need to worry about it? I thought the whole point of aggregating the TRN headers into a list was so that you can send a single status update, but since that will now be done by a separate branch in the flow, do you still need to worry about it?

            If you just need to clean up some headers from the aggregated message, then you can put a header-filter after the aggregator, but you already know that.

            • #7
              You're right: the whole point was the transaction management, so it is solved with your solution.
              The other point is a nice-to-have. And I can't simply define a header-filter because I have absolutely no idea which headers my messages will contain (I'm developing an API that will be used by a lot of flows). If I want to remove my headers, I have to define a transformer.
              So don't worry about that point; I'll go for a transformer.

              • #8
                I think we really need to add wildcard support for the header-filter. In other words, each of these should be legal:
                Code:
                <header-filter ... header-names="foo, bar"/>
                
                <header-filter ... header-names="foo*, bar"/>
                
                <header-filter ... header-names="foo*, bar*"/>
                
                <header-filter ... header-names="*"/>
                If you agree - and I bet you do - could you open a feature request in JIRA?

                Thanks,
                Mark
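
The wildcard semantics proposed above can be illustrated with a small plain-Java sketch. It is not the framework's implementation: the class WildcardHeaderFilterSketch and its methods are hypothetical, and it assumes that '*' matches any run of characters and that header-names is a comma-separated list, as in the examples.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical illustration of wildcard header filtering; not Spring Integration code.
final class WildcardHeaderFilterSketch {

    // Converts a pattern such as "foo*" into a regex; '*' matches any run of characters.
    static Pattern toRegex(String pattern) {
        String[] literals = pattern.split("\\*", -1);
        StringBuilder regex = new StringBuilder();
        for (int i = 0; i < literals.length; i++) {
            if (i > 0) {
                regex.append(".*");
            }
            regex.append(Pattern.quote(literals[i])); // everything except '*' is matched literally
        }
        return Pattern.compile(regex.toString());
    }

    // Returns a copy of the headers without the entries whose name matches any pattern.
    static Map<String, Object> removeMatching(Map<String, Object> headers, String headerNames) {
        List<Pattern> patterns = new ArrayList<Pattern>();
        for (String name : headerNames.split(",")) {
            patterns.add(toRegex(name.trim()));
        }
        Map<String, Object> kept = new LinkedHashMap<String, Object>();
        for (Map.Entry<String, Object> entry : headers.entrySet()) {
            boolean remove = false;
            for (Pattern pattern : patterns) {
                if (pattern.matcher(entry.getKey()).matches()) {
                    remove = true;
                    break;
                }
            }
            if (!remove) {
                kept.put(entry.getKey(), entry.getValue());
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new LinkedHashMap<String, Object>();
        headers.put("foo1", 1);
        headers.put("foobar", 2);
        headers.put("bar", 3);
        headers.put("baz", 4);
        System.out.println(removeMatching(headers, "foo*, bar"));
        System.out.println(removeMatching(headers, "*"));
    }
}
```

Under these assumptions, header-names="*" would cover the case raised earlier in the thread, where the header names are not known in advance.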

                • #9
                  "If I want to remove my headers, I have to define a transformer"
                  - No, there is the Header Filter: http://static.springsource.org/sprin...#header-filter

                  • #10
                    I mean, the Header Filter is a variation of the transformer, but before you venture into writing your own, I wanted to make sure you know that we have one out of the box that will remove headers for you.

                    • #11
                      Sorry, this seems like a lot of similar and somewhat redundant responses. Mark and I seem to have hit a race condition.

                      • #12
                        JIRA created: INT-1919
                        Thanks to both of you for your availability.
