Timeout in Aggregator?!?

  • Timeout in Aggregator?!?

    Hi,

    I have attached a simple split-aggregate example. Basically the splitter is sending 40,000 messages to the aggregator.

    Unfortunately, the aggregation never completes. If I process only 10,000 messages it is fine, but not with 40,000 (you might need to increase that number in the example, since it seems to depend on the CPU).

    It seems that there is a workaround by defining a timeout in the aggregator:
    Code:
    <aggregator id="aggregator" input-channel="logChannel"
    		ref="logAll" method="log"
    		timeout="420000"/>
    But this workaround is not really an option, since I do not know how many messages are sent to the aggregator.

    Can anybody tell me where exactly the problem is? I am using SI 1.0.0.

    Thanks Tai
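
    For reference, a minimal split-aggregate configuration along the lines described above might look like this (a hedged sketch: the channel names and the splitter bean/method are made up for illustration; the aggregator element is the one from my example, just without the timeout):

    ```xml
    <!-- Hypothetical sketch (SI 1.x schema): channel names and the
         splitter bean/method are invented for illustration -->
    <channel id="inputChannel"/>
    <channel id="logChannel"/>

    <splitter input-channel="inputChannel" output-channel="logChannel"
    		ref="mySplitter" method="split"/>

    <aggregator id="aggregator" input-channel="logChannel"
    		ref="logAll" method="log"/>
    ```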

  • #2
    Hey Tai,

    Can't say without looking into your sample (which I am doing now), so stay tuned.

    Marius



    • #3
      Just a comment aside: I did some profiling where 200,000 rows are processed.
      The biggest bottleneck is in the AbstractMessageAggregator:

      protected void processBarrier(MessageBarrier<Map<Object, Message<?>>, Object> barrier) {
          ArrayList<Message<?>> messageList = new ArrayList<Message<?>>(barrier.getMessages().values());
      Every time a message is sent to the aggregator, a new ArrayList is created!
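
      To see why this hurts: copying the barrier's contents on every arrival makes the total work quadratic in the number of messages. A small standalone simulation (plain Java, not SI code; it just counts copied elements) illustrates the difference between the copy-per-message behavior and simple appending:

      ```java
      import java.util.ArrayList;
      import java.util.List;

      public class AggregatorCopyCost {

          // Simulates the 1.0.0 behavior: a fresh ArrayList copy of the
          // barrier's contents is made every time a message arrives.
          static long copyPerMessage(int n) {
              List<Integer> barrier = new ArrayList<>();
              long copied = 0;
              for (int i = 0; i < n; i++) {
                  barrier.add(i);
                  List<Integer> snapshot = new ArrayList<>(barrier); // full copy
                  copied += snapshot.size();
              }
              return copied; // 1 + 2 + ... + n = n*(n+1)/2 elements copied
          }

          // Simulates the pre-1.0.0 behavior: one list, messages appended in place.
          static long appendOnly(int n) {
              List<Integer> messages = new ArrayList<>();
              for (int i = 0; i < n; i++) {
                  messages.add(i);
              }
              return messages.size(); // n elements touched in total
          }

          public static void main(String[] args) {
              int n = 10_000;
              System.out.println(copyPerMessage(n)); // 50005000
              System.out.println(appendOnly(n));     // 10000
          }
      }
      ```

      For 40,000 messages the copy-per-message variant touches roughly 800 million elements in total, which fits the slowdown reported above.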



      • #4
        Having a brief look at an old SI snapshot (20081017), I see there is quite a difference in how messages were processed. Previously, the ArrayList was created once in the AbstractMessageBarrier and messages were then added to it. That is why my problem did not occur before SI 1.0.0.



        • #5
          Hey Tai,

          Thanks for doing the load test. I can see what you're pointing at.

          Now, normally, for your use case (and for a set of items of that magnitude), it would be better to use a batch process.

          Follow INT-330, which has already been created and deals with how the aggregator message store is implemented. The change for that will bring an improvement in this area as well.



          • #6
            Timeout and Batch Process

            Good morning Marius,

            Changing the message storage in the aggregator will solve the performance problem, but the timeout issue is still there. If I do not define a timeout in the aggregator, I would expect the aggregator to keep collecting (for an unlimited time) until all messages are received.

            However, a fixed timeout will not be an option for me, since it takes about 60 minutes until all 200,000 messages have been passed. Is there a solution for 1.0.0? You mentioned batch processing. How should I do that?

            Thanks Tai
            Last edited by ttruong; Feb 10th, 2009, 06:40 AM.



            • #7
              Workaround

              I did a cheap workaround by adjusting the AbstractMessageAggregator class. The aggregator now holds an ArrayList itself:

              ...
              public abstract class AbstractMessageAggregator extends
                      AbstractMessageBarrierHandler<Map<Object, Message<?>>, Object> {
              ...
                  private ArrayList<Message<?>> messageList = new ArrayList<Message<?>>();
              ...
                  @Override
                  protected void doAddMessage(Message<?> message, MessageBarrier<Map<Object, Message<?>>, Object> barrier) {
                      barrier.getMessages().put(message.getHeaders().getId(), message);
                      messageList.add(message);
                  }
              ...
                  @Override
                  protected void processBarrier(MessageBarrier<Map<Object, Message<?>>, Object> barrier) {
                      // ArrayList<Message<?>> messageList = new ArrayList<Message<?>>(barrier.getMessages().values());
              ...
              I think this should work.



              • #8
                Tai,

                Thanks for the insight.

                The reason why the problem should be addressed at the per-barrier message storage level is that every message group has its own MessageBarrier. The solution you showed works for this particular example, but in the general case it won't work properly (if you have more than one aggregation group, it would mix up messages). So stay tuned while we improve this area, most likely for 1.0.2.

                Now, shifting the discussion to the timeout - what exactly is the problem you're referring to? As far as I can see, the timeout does its job: it forces aggregation with whatever has arrived up to that particular moment, regardless of whether all expected messages have come in. Or am I wrong about that?

                When I mentioned batch processing, I was referring to something like Spring Batch (you may want to take a look at that project too), which is more suitable to process a single data set of that magnitude.
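
                The group-mixing issue can be illustrated with a small standalone sketch (hypothetical correlation keys and payloads, not SI code): per-group storage keeps each group's messages together, while a single shared list, as in the workaround above, interleaves messages from different groups:

                ```java
                import java.util.ArrayList;
                import java.util.HashMap;
                import java.util.List;
                import java.util.Map;

                public class SharedListMixup {

                    // Per-barrier storage: one list per correlation group,
                    // as in the actual design with one MessageBarrier per group.
                    static Map<String, List<String>> perBarrier(String[][] arrivals) {
                        Map<String, List<String>> barriers = new HashMap<>();
                        for (String[] m : arrivals) {
                            barriers.computeIfAbsent(m[0], k -> new ArrayList<>()).add(m[1]);
                        }
                        return barriers;
                    }

                    // Shared-list workaround: a single list for all groups.
                    static List<String> sharedList(String[][] arrivals) {
                        List<String> all = new ArrayList<>();
                        for (String[] m : arrivals) {
                            all.add(m[1]); // no notion of which group the payload belongs to
                        }
                        return all;
                    }

                    public static void main(String[] args) {
                        // Two groups "A" and "B" with interleaved arrivals
                        String[][] arrivals = {{"A", "a1"}, {"B", "b1"}, {"A", "a2"}, {"B", "b2"}};
                        System.out.println(perBarrier(arrivals).get("A")); // [a1, a2]
                        System.out.println(sharedList(arrivals));          // [a1, b1, a2, b2]
                    }
                }
                ```

                With a single aggregation group the two behave the same, which is why the workaround appears to work in the example above.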



                • #9
                  Hi Marius,

                  Yes, the timeout does its job, like here:

                  Code:
                  <aggregator id="aggregator" input-channel="logChannel"
                  		ref="logAll" method="log"
                  		timeout="420000"/>
                  On the other hand, shouldn't it be possible to turn it off (e.g. by omitting the "timeout=..." attribute in the configuration)?

                  Tai



                  • #10
                    Actually, omitting the setting makes the aggregator time out after 60 seconds (the default). Because of the issues arising from the uncontrolled accumulation of messages over a long time, it is better to have a timeout in place (even a very long one), and that is the default behavior.

                    An idea we could consider is to use something like timeout="0" to turn that check off, but in the current implementation there is always a timeout.

                    Marius

