Is this a use case for an aggregator?

  • Is this a use case for an aggregator?

    I need to calculate financial quote bars from a quote stream with a bar length provided at run time. Will an Aggregator provide me the ability to calculate bar lengths with a configurable timer length? For example, the incoming streaming quotes are to be aggregated at 1 second intervals into a single, conflated quote which represents the quote bar. If an Aggregator is not a good fit for this use case, what do you recommend?

    Quote Bars:

    The bar contains an OHLC and Volume,
    • O is the open of the first quote
    • C is the price of the last quote
    • H is the highest high of any quote during the bar interval
    • L is the lowest low during the bar interval
    • V is the accumulated volume of all quotes during the bar interval.
    • Change is the difference between the last bar's close and the current bar's close.

    As output, I only need the calculated bar and can throw away all quotes used in the calculations.
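
The bar definition above can be sketched in plain Java, independent of any framework. This is only an illustration under assumptions: the `Quote` and `Bar` classes and their field names are hypothetical, and "Change" is taken to mean the current close minus the previous bar's close (the post does not state the sign).

```java
import java.util.Arrays;
import java.util.List;

public class QuoteBarSketch {

    /** Hypothetical quote shape; the field names are assumptions for illustration. */
    public static final class Quote {
        public final double open, high, low, price;
        public final long volume;

        public Quote(double open, double high, double low, double price, long volume) {
            this.open = open;
            this.high = high;
            this.low = low;
            this.price = price;
            this.volume = volume;
        }
    }

    /** One conflated bar for a single interval. */
    public static final class Bar {
        public final double open, high, low, close, change;
        public final long volume;

        public Bar(double open, double high, double low, double close, long volume, double change) {
            this.open = open;
            this.high = high;
            this.low = low;
            this.close = close;
            this.volume = volume;
            this.change = change;
        }
    }

    /** Conflates the quotes of one interval (in arrival order) into a single bar. */
    public static Bar conflate(List<Quote> quotes, double previousClose) {
        if (quotes.isEmpty()) {
            throw new IllegalArgumentException("no quotes in interval");
        }
        Quote first = quotes.get(0);
        Quote last = quotes.get(quotes.size() - 1);
        double high = first.high;
        double low = first.low;
        long volume = 0L;
        for (Quote q : quotes) {
            high = Math.max(high, q.high);   // H: highest high in the interval
            low = Math.min(low, q.low);      // L: lowest low in the interval
            volume += q.volume;              // V: accumulated volume
        }
        // O: open of the first quote, C: price of the last quote.
        // Change: assumed here to be current close minus previous close.
        return new Bar(first.open, high, low, last.price, volume, last.price - previousClose);
    }

    public static void main(String[] args) {
        Bar bar = conflate(Arrays.asList(
                new Quote(34.5, 35.0, 34.1, 34.3, 100L),
                new Quote(34.3, 35.2, 34.2, 34.6, 250L)), 34.0);
        System.out.println("O=" + bar.open + " H=" + bar.high + " L=" + bar.low
                + " C=" + bar.close + " V=" + bar.volume);
    }
}
```

Because only the finished bar is returned, the individual quotes can be discarded once `conflate` has run.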

    Thanks for your help.

  • #2
    Interesting use case so let's see

    1. ". . .the incoming streaming quotes are to be aggregated at 1 second intervals. . ." - Well, Aggregator will aggregate Messages as they come in based on correlation identifier, so I am not really sure about the 1 sec requirement. What if 3 Messages came in 1 sec? What to do with the other 2?
    2. ". . .into a single, conflated quote. . ." - Yes, this is the perfect case for the Aggregator. If you only need a product of calculation from all the aggregated Messages, then a custom Aggregator will do the trick: you will receive a List of ALL aggregated Messages, perform your calculation, and release a single result.

    Is that what you were looking for? Also, please clarify the questions about time-based aggregation.



    • #3
      Thanks for the response, Oleg. I appreciate the confirmation on point #2 -- very useful information.

      Regarding the first point, time-based aggregation:

      Let's say the correlation identifier is the quote symbol value, e.g. VMW would be a specific correlation id, residing in the message payload. In this case, we have that same correlation id streaming in real-time to us throughout the day in sub-second intervals. However, our end consumers should only get a single quote update every 1 second. This is necessary because passing the full stream of quotes would be akin to having them drink out of a fire-hose when they can only handle drinking from a garden hose. Ideally, we would want to release and aggregate the current group of quote messages (0 to n) for VMW after 1 second has passed, with the custom aggregator conflating that second's worth of quotes into a single quote snapshot and passing it on for consumption. We would want to repeat this process throughout the day.

      Is this possible with Spring Integration? If so, how do you suggest we proceed? For instance, is it possible to set an expiration on the message group of 1 second and then release it to the custom aggregator for conflation?

      Thanks again for your help.



      • #4
        Well, that is actually easy. All you need is a Time-based ReleaseStrategy. Let me write some code and I'll post it here.
        In fact the more I am thinking the more I am starting to believe that it should come as part of the framework.

        Give me a few hours and I'll post the code, but meanwhile you can raise a JIRA Feature request and link it to this forum



        • #5
          Thank you, Oleg. Your effort is greatly appreciated.

          I have raised the following JIRA feature request as suggested,

          INT-2595



          • #6
            I'm looking forward to seeing your solution, Oleg.

            In the interim, until I can switch to your solution, I've put together the following to approximate a time-based release strategy. But, it relies purely on the timing of the call to the release strategy relative to the MessageGroup's created timestamp. As such, the interval that elapses could potentially be greater than the configured interval, particularly if messages are trickling in slowly -- this assumes that the release strategy is called on a one-to-one basis with the arriving messages, which may not be the case. Any suggestions for improvement would be appreciated.

            quotes-context.xml
            Code:
            	<int:aggregator input-channel="inboundQuotesChannel"
            		output-channel="outboundQuotesChannel" 
            		ref="quoteAggregator"
            		correlation-strategy="quoteCorrelationStrategy"
            		release-strategy="quoteReleaseStrategy"
            		expire-groups-upon-completion="true" />
            QuoteReleaseStrategy
            Code:
            import org.springframework.integration.aggregator.ReleaseStrategy;
            import org.springframework.integration.store.MessageGroup;
            import org.springframework.stereotype.Component;

            @Component
            public class QuoteReleaseStrategy implements ReleaseStrategy {

            	// Minimum age of a group, in milliseconds, before it may be released.
            	private static final long AGGREGATOR_RELEASE_INTERVAL = 1000L; // TODO: externalize constant

            	// Releases the group once it is at least AGGREGATOR_RELEASE_INTERVAL ms old.
            	public boolean canRelease(MessageGroup group) {
            		long currentTime = System.currentTimeMillis();
            		long releaseTime = group.getTimestamp() + AGGREGATOR_RELEASE_INTERVAL;
            		return isReleaseReady(releaseTime, currentTime);
            	}

            	// Compare the values directly rather than testing compareTo() against 1,
            	// since compareTo() only guarantees the sign of its result.
            	private boolean isReleaseReady(long releaseTime, long currentTime) {
            		return releaseTime <= currentTime;
            	}
            }
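
The caveat mentioned above (the elapsed interval can exceed the configured one when quotes trickle in) can be shown without the framework. The sketch below assumes the release check runs only when a message arrives; the class and method names are hypothetical and timestamps are passed in explicitly instead of reading a clock.

```java
public class ArrivalDrivenReleaseSketch {

    private static final long RELEASE_INTERVAL_MS = 1000L;

    /** Same comparison as the release strategy: release once the group is old enough. */
    public static boolean canRelease(long groupCreatedAt, long now) {
        return groupCreatedAt + RELEASE_INTERVAL_MS <= now;
    }

    /**
     * Returns the arrival time at which the group is first released, or -1 if
     * no arrival triggers a release. The check is evaluated on arrivals only.
     */
    public static long firstReleaseTime(long[] arrivalTimes) {
        if (arrivalTimes.length == 0) {
            return -1L;
        }
        long createdAt = arrivalTimes[0]; // the first message creates the group
        for (long t : arrivalTimes) {
            if (canRelease(createdAt, t)) {
                return t;
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        // Quotes at 0, 200 and 400 ms, then a quiet period until 3000 ms.
        long released = firstReleaseTime(new long[] {0L, 200L, 400L, 3000L});
        System.out.println("released at " + released + " ms");
    }
}
```

With these arrivals the group is released at 3000 ms, two seconds after the 1000 ms deadline, because nothing evaluated the check in between. A periodic check that does not depend on arrivals avoids this.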



            • #7
              Thanks for raising the JIRA

              So here is the solution which also addresses the exact issue you are describing in your last post. https://github.com/olegz/s12gx.2011/...asedaggregator

              I left some comments in the code.
              In a nutshell, the solution uses two types of release strategies (kind of advanced but really cool)

              1. It uses accumulation based release strategy expressed via release-strategy-expression
              Code:
              release-strategy-expression="size() == 10"
              This way the average will never be calculated based on more than 10 messages

              Now to the time-based part.

              Consistent time-based release is actually managed with org.springframework.integration.store.MessageGroupStoreReaper, which is invoked by the scheduler (every tenth of a second) and checks whether each MessageGroup needs to be expired. Expiration is based on the timeout attribute (set to 2 sec).

              So essentially what happens is that you get a consistent average calculation based on 10 Messages or however many messages were accumulated in 2 sec (whichever comes first), and the result is:
              Code:
              --> Releasing based on accumulation of 10 messages at: Thu May 31 08:40:12 EDT 2012
              LoggingHandler: ## Aggregation Results - Ticker: VMW; Average price: $95.55061685940069
              --> Releasing Message Group based on time expiration at: Thu May 31 08:40:14 EDT 2012
              LoggingHandler: ## Aggregation Results - Ticker: VMW; Average price: $79.67318751249945
              --> Releasing Message Group based on time expiration at: Thu May 31 08:40:17 EDT 2012
              LoggingHandler: ## Aggregation Results - Ticker: VMW; Average price: $83.68345126878751
              --> Releasing based on accumulation of 10 messages at: Thu May 31 08:40:19 EDT 2012
              LoggingHandler: ## Aggregation Results - Ticker: VMW; Average price: $95.47588414267628
              --> Releasing Message Group based on time expiration at: Thu May 31 08:40:22 EDT 2012
              LoggingHandler: ## Aggregation Results - Ticker: VMW; Average price: $83.6791818430677
              --> Releasing Message Group based on time expiration at: Thu May 31 08:40:24 EDT 2012
              LoggingHandler: ## Aggregation Results - Ticker: VMW; Average price: $83.44183241640326
              --> Releasing Message Group based on time expiration at: Thu May 31 08:40:26 EDT 2012
              LoggingHandler: ## Aggregation Results - Ticker: VMW; Average price: $85.09482645259631
              --> Releasing Message Group based on time expiration at: Thu May 31 08:40:28 EDT 2012
              LoggingHandler: ## Aggregation Results - Ticker: VMW; Average price: $76.28797411121631
              --> Releasing Message Group based on time expiration at: Thu May 31 08:40:30 EDT 2012
              LoggingHandler: ## Aggregation Results - Ticker: VMW; Average price: $83.56258975908666
              --> Releasing Message Group based on time expiration at: Thu May 31 08:40:33 EDT 2012
              LoggingHandler: ## Aggregation Results - Ticker: VMW; Average price: $84.86988945691606
              --> Releasing Message Group based on time expiration at: Thu May 31 08:40:35 EDT 2012
              . . .
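
The "10 messages or 2 seconds, whichever comes first" behaviour described above can be mimicked in a few lines of plain Java. This is a framework-free sketch, not the Spring Integration API or the code in the linked repository: `SizeOrTimeoutSketch`, `add` and `reap` are hypothetical names, and `reap` stands in for what a periodically scheduled reaper would do.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SizeOrTimeoutSketch {

    /** Prices collected for one symbol, plus the time the group was created. */
    private static final class Group {
        final long createdAt;
        final List<Double> prices = new ArrayList<>();

        Group(long createdAt) {
            this.createdAt = createdAt;
        }
    }

    private final int maxSize;
    private final long timeoutMs;
    private final Map<String, Group> groups = new LinkedHashMap<>();

    public SizeOrTimeoutSketch(int maxSize, long timeoutMs) {
        this.maxSize = maxSize;
        this.timeoutMs = timeoutMs;
    }

    /** Adds a price; returns the average if the group just reached maxSize, else null. */
    public Double add(String symbol, double price, long now) {
        Group group = groups.computeIfAbsent(symbol, k -> new Group(now));
        group.prices.add(price);
        if (group.prices.size() >= maxSize) {
            groups.remove(symbol); // release by accumulation
            return average(group.prices);
        }
        return null;
    }

    /** Meant to be called periodically: releases every group older than timeoutMs. */
    public Map<String, Double> reap(long now) {
        Map<String, Double> released = new LinkedHashMap<>();
        Iterator<Map.Entry<String, Group>> it = groups.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Group> entry = it.next();
            if (now - entry.getValue().createdAt >= timeoutMs) {
                released.put(entry.getKey(), average(entry.getValue().prices)); // release by expiry
                it.remove();
            }
        }
        return released;
    }

    private static double average(List<Double> prices) {
        double sum = 0.0;
        for (double p : prices) {
            sum += p;
        }
        return sum / prices.size();
    }
}
```

Calling `reap` from a scheduler at a short fixed rate bounds how late a time-based release can be, regardless of how slowly messages arrive.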
              The important part is also MarkerMessageProducingCallback, which is registered as an additional expiry callback with the MessageGroupStore to manage a marker Message after release. I left a separate set of comments in there with details as to why (which is basically to address the exact issue you are describing in the last post).


              Let me know if you still have questions



              • #8
                Great solution, Oleg! Very insightful. Thanks again for taking the time to work through this.



                • #9
                  Hi Oleg,

                  I am implementing the solution you outlined for farmer. I modified your outline in that we are dealing with objects and you are dealing with primitives, which is a given. The run says that there is a null correlation. The modifications of interest are:

                  I am placing the quote into the payload and the symbol into the header and then returning to transform it into the channel:
                  return MessageBuilder.withPayload(quote).setHeader("symbol", quote.getSymbol()).build();


                  Here is my configuration syntax and where the "null correlation" is appearing:
                  <int:logging-channel-adapter id="toQuotesBarChannel"
                  level="WARN"
                  expression="'## Aggregation Results - Ticker: ' + headers.symbol + '; Closing price: $' + payload.getPrice"/>

                  And for the aggregator I changed:
                  correlation-strategy-expression="headers.symbol"

                  I have alternatively tried:
                  headers['symbol']
                  header['symbol']
                  headers.get('symbol')
                  payload.getSymbol
                  payload.symbol

                  all with the same result.


                  Does anything jump out at you? Static typing would be helpful to reduce syntax errors and I am hoping that is all that this is.

                  2012-06-02 17:57:38,063 [pool-1-thread-1] INFO com.emc.consulting.broker.xxxtrader.service.support.BrokerToxxxtraderQuoteTransformer - Quote [quoteid=250, low=34.1, open1=34.5, volume=49050, price=34.3, high=35.2, companyname=VMW, symbol=VMW, change1=0.02]
                  2012-06-02 17:57:38,063 [pool-1-thread-1] ERROR org.springframework.integration.handler.LoggingHandler - org.springframework.integration.MessageHandlingException: error occurred in message handler [org.springframework.integration.aggregator.AggregatingMessageHandler#0]
                  at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:79)
                  at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:114)
                  at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:101)
                  at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:61)

                  Thank you.
                  Last edited by owebeewan; Jun 2nd, 2012, 06:05 PM.



                  • #10
                    Why does this work for you but not when I run it with no modifications?

                    Originally posted by owebeewan:
                    Does anything jump out at you? Static typing would be helpful to reduce syntax errors and I am hoping that is all that this is.
                    Been at this for more than a few hours. I went back to ground zero and just took your exact code with no modifications whatsoever and ran it. I get the same error. You run successfully but I do not with:
                    Version: 2.9.1.RELEASE
                    Build Id: 201203221000

                    The main driver program is successfully publishing the messages but something downstream does not like it.

                    Is there a secret that I need to know about before it works? I included every spring related jar I could find and threw it in to eliminate the possibility of a missing jar. I found a similar observation at: Aggregator Correlation Strategy Failing

                    Oleg, did you by chance run your [excellently written] sample on a release that maybe the rest of us do not have yet?

                    2012-06-03 00:25:44,325 [main] INFO org.springframework.context.support.ClassPathXmlApplicationContext - Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@e4e6cf: startup date [Sun Jun 03 00:25:44 EDT 2012]; root of context hierarchy
                    2012-06-03 00:25:44,404 [main] INFO org.springframework.beans.factory.xml.XmlBeanDefinitionReader - Loading XML bean definitions from class path resource [org/springone/timebasedaggregator/aggregator-config.xml]
                    2012-06-03 00:25:44,684 [main] INFO org.springframework.integration.config.xml.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
                    2012-06-03 00:25:44,684 [main] INFO org.springframework.integration.config.xml.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
                    2012-06-03 00:25:44,700 [main] INFO org.springframework.beans.factory.support.DefaultListableBeanFactory - Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@35ef07: defining beans [org.springframework.integration.internalDefaultConfiguringBeanFactoryPostProcessor,inChannel,org.springframework.integration.aggregator.AggregatingMessageHandler#0,org.springframework.integration.config.ConsumerEndpointFactoryBean#0,outChannel,org.springframework.integration.handler.LoggingHandler#0,outChannel.adapter,customAggregator,messageStore,reaper,org.springframework.scheduling.support.ScheduledMethodRunnable#0,org.springframework.scheduling.config.ScheduledTaskRegistrar#0,scheduler,executor,nullChannel,errorChannel,_org.springframework.integration.errorLogger,taskScheduler,org.springframework.integration.config.IdGeneratorConfigurer#0]; root of factory hierarchy
                    2012-06-03 00:25:44,778 [main] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService
                    2012-06-03 00:25:44,996 [main] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'scheduler'
                    2012-06-03 00:25:45,028 [main] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler'
                    2012-06-03 00:25:45,028 [main] INFO org.springframework.context.support.DefaultLifecycleProcessor - Starting beans in phase -2147483648
                    2012-06-03 00:25:45,028 [main] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {aggregator} as a subscriber to the 'inChannel' channel
                    2012-06-03 00:25:45,028 [main] INFO org.springframework.integration.channel.ExecutorChannel - Channel 'inChannel' has 1 subscriber(s).
                    2012-06-03 00:25:45,028 [main] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#0
                    2012-06-03 00:25:45,028 [main] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:outChannel.adapter} as a subscriber to the 'outChannel' channel
                    2012-06-03 00:25:45,028 [main] INFO org.springframework.integration.channel.DirectChannel - Channel 'outChannel' has 1 subscriber(s).
                    2012-06-03 00:25:45,028 [main] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started outChannel.adapter
                    2012-06-03 00:25:45,028 [main] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
                    2012-06-03 00:25:45,028 [main] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'errorChannel' has 1 subscriber(s).
                    2012-06-03 00:25:45,028 [main] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started _org.springframework.integration.errorLogger
                    2012-06-03 00:25:45,028 [main] INFO org.springframework.context.support.DefaultLifecycleProcessor - Starting beans in phase 0
                    2012-06-03 00:25:45,028 [main] INFO org.springframework.integration.store.MessageGroupStoreReaper - started org.springframework.integration.store.MessageGroupStoreReaper@c5cd1a
                    2012-06-03 00:25:45,106 [executor-1] ERROR org.springframework.integration.handler.LoggingHandler - org.springframework.integration.MessageHandlingException: error occurred in message handler [org.springframework.integration.aggregator.AggregatingMessageHandler#0]
                    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:79)
                    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:114)
                    at org.springframework.integration.dispatcher.UnicastingDispatcher.access$000(UnicastingDispatcher.java:51)
                    at org.springframework.integration.dispatcher.UnicastingDispatcher$1.run(UnicastingDispatcher.java:96)
                    at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
                    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
                    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
                    at java.lang.Thread.run(Thread.java:722)
                    Caused by: java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing?
                    at org.springframework.util.Assert.state(Assert.java:384)
                    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:174)
                    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
                    ... 7 more



                    • #11
                      First, I am not sure about this: Version: 2.9.1.RELEASE, since the latest up until this morning was 2.1.1.RELEASE, so I hope it's a typo.
                      Second, it seems to me something is wrong with your class-path, judging by the stacktrace of the latest post. Are you using Maven/Ivy. . .? The way the example was set up, all you needed to do after cloning the repo was run it.

                      Yes, in order for the Aggregator to aggregate things it MUST know how to correlate the things that need to be aggregated, so correlation is a MUST. In my original example I was correlating based on the payload, which was specified via the 'correlation-strategy-expression="payload"' attribute. Since you changed the sample and your correlation key comes as a header, you should change it to 'correlation-strategy-expression="headers.symbol"'.

                      But I also see you already tried that so let's figure out first why the original is not running.
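
Why a correlation key is mandatory can be illustrated with a small framework-free sketch. It is not the Spring Integration implementation: messages are modelled as plain header maps, `CorrelationSketch` and `correlate` are hypothetical names, and the exception only mirrors the kind of check behind the "Null correlation not allowed" message in the stack trace.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CorrelationSketch {

    /** Groups messages (modelled here as header maps) by the value of one header. */
    public static Map<Object, List<Map<String, Object>>> correlate(
            List<Map<String, Object>> messages, String headerName) {
        Map<Object, List<Map<String, Object>>> groups = new LinkedHashMap<>();
        for (Map<String, Object> headers : messages) {
            Object key = headers.get(headerName);
            if (key == null) {
                // Without a key there is no group to put the message into.
                throw new IllegalStateException(
                        "Null correlation not allowed for header '" + headerName + "'");
            }
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(headers);
        }
        return groups;
    }
}
```

In this model, correlating on a header that the messages do not carry fails on the first message, while correlating on "symbol" yields one group per ticker.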



                      • #12
                        Hi Oleg,

                        By the way, I asked farmer to verify that I am not crazy. farmer gets the exact same results as I do. No, the 2.9.1 is accurate. So that we are not referring to differing contexts, here is an expanded context through copy/paste and no typing:

                        SpringSource Tool Suite

                        Version: 2.9.1.RELEASE
                        Build Id: 201203221000

                        Copyright (c) 2007 - 2011 SpringSource, a division of VMware, Inc.
                        All rights reserved. Visit http://springsource.com/products/sts

                        I wonder if STS is not in agreement with what you are using? Yes, all is up-to-date in STS. What specific jars does the aggregator use? I can tell you what versions that mine are.

                        Thank you!



                        • #13
                          I have to look. I've been drinking the auto-dependency management Kool-Aid for so long that I barely know what my class path is at any given time. All I know is that it's right because I rely on POMs created and managed by the frameworks I depend on (e.g., Spring Integration).
                          So the sample we are talking about will work with any Spring Integration version starting with 2.1.x

                          Are you using Maven/Ivy or what for dependency management?



                          • #14
                            Hi Oleg and Owebeewan,

                            I just checked my local pom.xml in the s12gx.2011 project from github.

                            Within it, for me at least, the version of Spring Integration was set to 2.1.0.M3,

                            Code:
                            <properties>
                                <spring.integration.version>2.1.0.M3</spring.integration.version>
                                ...
                            When I changed that property to the latest version of Spring Integration,

                            Code:
                            <properties>
                                <spring.integration.version>2.1.1.RELEASE</spring.integration.version>
                                ...
                            It works and I get the following output as Oleg was getting,

                            Code:
                            -> Releasing Message Group based on time expiration at: Mon Jun 04 12:44:00 CDT 2012
                            AbstractCorrelatingMessageHandler: Expiring MessageGroup with correlationKey[VMW]
                            LoggingHandler: ## Aggregation Results - Ticker: VMW; Average price: $81.68756120535292
                            --> Releasing Message Group based on time expiration at: Mon Jun 04 12:44:02 CDT 2012
                            --> Releasing based on accumulation of 10 messages at: Mon Jun 04 12:44:02 CDT 2012
                            LoggingHandler: ## Aggregation Results - Ticker: VMW; Average price: $85.95874956467463
                            --> Releasing based on accumulation of 10 messages at: Mon Jun 04 12:44:04 CDT 2012
                            LoggingHandler: ## Aggregation Results - Ticker: VMW; Average price: $95.67274109941528
                            
                            ...



                            • #15
                              I'll update but you can easily change it to 2.1.2.RELEASE ( the freshest release . . . few hours old )
