Announcement Announcement Module
Collapse
No announcement yet.
Aggregator + MessageGroupStoreReaper: Exception in expiry callback Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Aggregator + MessageGroupStoreReaper: Exception in expiry callback

    Hi Everyone!

    I have a little problem with my integration flow, using the Aggregator and the MessageGroupStoreReaper.
    My flow, so far, is:inbound-channel-adapter -> transformer -> splitter -> aggregator [+MessageGroupStoreReaper]

    I'm using a correlation-strategy-expression on the aggregator which works just fine.
    The problem is, I can't implement a release-strategy, because I have no information at which point the group is complete. BUT I can savely assume that if nothing has been added to the group for a minute, it can be treated as complete.
    Because of this I set up the MessageGroupStoreReaper like this:

    Code:
        <bean id="messageStore" class="org.springframework.integration.store.SimpleMessageStore" />	
        <bean id="messageStoreReaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
            <property name="messageGroupStore" ref="messageStore" />
            <property name="timeout" value="60000" />
        </bean>
        <task:scheduler id="reaperScheduler" />	
        <task:scheduled-tasks scheduler="reaperScheduler">
            <task:scheduled ref="messageStoreReaper" method="run" fixed-rate="5000"/>
        </task:scheduled-tasks>
    The aggregator looks like this:
    (...but the release-strategy will never be fulfilled)
    Code:
    <int:aggregator id="statustypeAndInvoiceAggregator"
                        message-store="messageStore"
                        input-channel="ch_all_splitted"
                        output-channel="ch_all_aggregated"
                        discard-channel="nullChannel"
                        release-strategy-expression="#this.size() gt 1000"
                        send-timeout="12000"
                        send-partial-result-on-expiry="true"                    
                        correlation-strategy-expression="myStrategyHere " />
    What I don't understand is the resulting exception (even though the aggregation/timeout works as intended):

    Events + Stacktrace:
    Code:
    Read in the file:
    =================
    2012-01-30 14:13:00,021 INFO  [task-scheduler-4] org.springframework.integration.file.FileReadingMessageSource: Created message: [[Payload=c:\tmp\springint\BC1812006.txt][Headers={timestamp=1327929180021, id=58177c40-d52d-4dca-86bc-d4905df701c6}]]
    
    The messages after splitting:
    =============================
    2012-01-30 14:13:00,146 INFO  [task-scheduler-4] org.springframework.integration.handler.LoggingHandler: [Payload=Item{<contentSnipped/>}][Headers={timestamp=1327929180146, id=ddfd87d0-b694-4886-a041-4ef6a57ed624}]
    2012-01-30 14:13:00,146 INFO  [task-scheduler-4] org.springframework.integration.handler.LoggingHandler: [Payload=Item{<contentSnipped/>}][Headers={timestamp=1327929180146, id=5e67dab8-e591-478e-993c-44d9de9be289}]
    
    Timeout:
    ========
    2012-01-30 14:14:04,865 INFO  [reaperScheduler-1] org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler: Expiring MessageGroup with correlationKey[STATUSITEM_C18_2R85436]
    
    Release of the "partial result" messageGroup
    =============================================
    2012-01-30 14:14:04,865 INFO  [reaperScheduler-1] org.springframework.integration.handler.LoggingHandler: [Payload=[Item{<contentSnipped/>}, Item{<contentSnipped/>}]]
    
    Exception!? Why?
    =================
    2012-01-30 14:14:05,115 ERROR [reaperScheduler-1] org.springframework.integration.store.SimpleMessageStore: Exception in expiry callback
    org.springframework.integration.MessageDeliveryException: Dispatcher has no subscribers.
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:108)
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:101)
    	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:61)
    	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
    	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
    	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
    	at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.sendReplyMessage(AbstractCorrelatingMessageHandler.java:342)
    	at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.sendReplies(AbstractCorrelatingMessageHandler.java:335)
    	at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:310)
    	at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:295)
    	at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.expireGroup(AbstractCorrelatingMessageHandler.java:278)
    	at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.forceComplete(AbstractCorrelatingMessageHandler.java:232)
    	at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.access$000(AbstractCorrelatingMessageHandler.java:63)
    	at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler$1.execute(AbstractCorrelatingMessageHandler.java:114)
    	at org.springframework.integration.store.AbstractMessageGroupStore.expire(AbstractMessageGroupStore.java:117)
    	at org.springframework.integration.store.AbstractMessageGroupStore.expireMessageGroups(AbstractMessageGroupStore.java:87)
    	at org.springframework.integration.store.MessageGroupStoreReaper.run(MessageGroupStoreReaper.java:110)
    	</snip>
    2012-01-30 14:14:05,162 ERROR [reaperScheduler-1] org.springframework.scheduling.support.MethodInvokingRunnable: Invocation of method 'run' on target class [class org.springframework.integration.store.MessageGroupStoreReaper] failed
    org.springframework.integration.MessageDeliveryException: Dispatcher has no subscribers.
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:108)
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:101)
    	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:61)
    	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
    	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
    	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
    	at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.sendReplyMessage(AbstractCorrelatingMessageHandler.java:342)
    	at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.sendReplies(AbstractCorrelatingMessageHandler.java:335)
    	at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:310)
    	at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:295)
    	at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.expireGroup(AbstractCorrelatingMessageHandler.java:278)
    	at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.forceComplete(AbstractCorrelatingMessageHandler.java:232)
    	at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.access$000(AbstractCorrelatingMessageHandler.java:63)
    	at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler$1.execute(AbstractCorrelatingMessageHandler.java:114)
    	at org.springframework.integration.store.AbstractMessageGroupStore.expire(AbstractMessageGroupStore.java:117)
    	at org.springframework.integration.store.AbstractMessageGroupStore.expireMessageGroups(AbstractMessageGroupStore.java:87)
    	at org.springframework.integration.store.MessageGroupStoreReaper.run(MessageGroupStoreReaper.java:110)
    	</snip>
    Can someone please point me into the right direction? Why is this happening?
    Is this the designed behaviour?
    Last edited by joerg_s; Jan 30th, 2012, 09:56 AM.

  • #2
    Hello

    Does your ch_all_aggregated have any subscribers?

    Artem Bilan

    Comment


    • #3
      Hi Artem!

      Thanks for your fast response!

      ...omg, I wasn't able to see the wood for trees.
      It was indeed a missing subscriber.
      After adding a dummy outbound-channel-adapter it works like a charm.
      I'm feeling stupid now, haha.


      [edit] is there a way I can mark this thread as resolved?

      Comment

      Working...
      X