Announcement Announcement Module
Collapse
No announcement yet.
multiple task:scheduled on MessageGroupStoreReaper in 2 aggregator Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • multiple task:scheduled on MessageGroupStoreReaper in 2 aggregator

    Hi Guys,

    i use the SimpleMessageStore and MessageGroupStoreReaper to expire messages in two Aggregator, as reconfigured below:

    Code:
    <!-- the first one -->
    <bean id="message-store" class="org.springframework.integration.store.SimpleMessageStore" />
    
    <task:scheduled-tasks>
    	<task:scheduled fixed-delay="5000" ref="message-group-store-reaper1" method="run" />
    </task:scheduled-tasks>
    
    <bean id="message-group-store-reaper1" class="org.springframework.integration.store.MessageGroupStoreReaper">
    		<property name="messageGroupStore" ref="message-store" />
    		<property name="timeout" value="30000" />
    </bean>
    
    <int:aggregator input-channel="***" release-strategy="***" ref="***"
    		output-channel="***" correlation-strategy="***" message-store="message-store" discard-channel="***" />
    
    
    <!-- the second one -->
    <bean id="message-store2" class="org.springframework.integration.store.SimpleMessageStore" />  
    <task:scheduled-tasks>
             <task:scheduled fixed-delay="7000" ref="message-group-store-reaper2" method="run" />
    </task:scheduled-tasks>
    <bean id="message-group-store-reaper2" class="org.springframework.integration.store.MessageGroupStoreReaper">
    		<property name="messageGroupStore" ref="message-store2" />
    		<property name="timeout" value="90000" />
    </bean>
    
    <int:aggregator output-channel="***" ref="***" correlation-strategy="***"
    	message-store="message-store2" input-channel="***" />
    -----------------------------------------------------------------------------
    my case is that the 1st aggregator aggregates the message and send it to 2nd aggregator, or sends the timeout message to the 2nd aggregator once 1st reaper got timeout.

    the 1st aggregator correlates message by custom attribute of message, and 2nd aggregator correlates message by the default correlationId.

    i send 2 messages every-time, every message is consisted of "request" and "response" message.

    the fist time, i tried the flow, it works well: the 1st aggregator correlates request and response for 2 messages, and then the 2 message is merged in 2nd aggregator by its correlationId and then i test my flow again, then the 2nd message always cannot be processed in first aggregator (but still one message can be processed to the 2nd aggregator, so that caused the passed message is hanging for waiting the non-processed message).

    i don't know where the 2nd message is blocked. it just only goes into the channel ahead of 1st aggregator, i also noticed it is processed by "AbstractCorrelatingMessageHandler Handling message with correlationKey" in log, which proves the 2nd message was passed in 1st aggregator, but somehow it didn't go on with the flow.

    as we know, once scheduled task is started, this kind of log will be printed out as below:
    Code:
    MessageGroupStoreReaper Expiring all messages older than timeout=30000 from message group store ...
    MessageGroupStoreReaper Expiring all messages older than timeout=90000 from message group store ...
    ------------------------------------------------------------------------------------------------------------
    the strange thing is, after i tried second time, then the task: "3000" is gone. it turns out only one task is running.


    does anybody encounter this problem before or you ever used multiple task:scheduled on MessageGroupStoreReaper in 2 aggregator?

    Any suggestion for me?

    Thanks!
    Last edited by hauer; Jan 3rd, 2013, 08:55 PM. Reason: format the config

  • #2
    Please use [ code ] ... [ /code ] tags (no spaces inside brackets) when posting code and config.

    Sounds like some kind of locking issue. I am guessing the reaper is waiting for a lock that some other thread is holding. I think we need to understand what the threads are doing. Take a thread dump (using jstack or VisualVM); if you can't figure it out attach it here.

    Comment


    • #3
      Gary, thanks for your suggestions. i'm gonna use jconsole to track the threads.
      Originally posted by Gary Russell View Post
      Please use [ code ] ... [ /code ] tags (no spaces inside brackets) when posting code and config.

      Sounds like some kind of locking issue. I am guessing the reaper is waiting for a lock that some other thread is holding. I think we need to understand what the threads are doing. Take a thread dump (using jstack or VisualVM); if you can't figure it out attach it here.

      Comment

      Working...
      X