
  • Aggregator Question

    I need to get 500 messages at a time off a message queue and then hand them off to a service activator to send them all as part of an HTTP post to an external system.

    I am trying to accomplish this with an aggregator, and while my solution seems to work, it does not seem ideal. The only way I have been able to get this to do what I need is to create my own implementations of CorrelationStrategy and CompletionStrategy. Is there a better/easier way to accomplish this?

    Cheers.

    Jay

    Code:
    <si:aggregator input-channel="channel1" completion-strategy="messageCountCompletionStrategy"
    	send-partial-result-on-timeout="true" ref="messageCountAggregator" method="aggregate"  tracked-correlation-id-capacity="10"
    	correlation-strategy="correlationStrategy" output-channel="channel2" timeout="10000"/>
    
    <!-- just for testing -->
    <si:service-activator input-channel="channel2" ref="fileWriteHandler" method="handleMessage"/>
    
    <bean id="correlationStrategy" class="com.mycompany.MessageCountCorrelationStrategy"/>
    
    <bean id="messageCountCompletionStrategy" class="com.mycompany.MessageCountCompletionStrategy">
    	<property name="maxMessageCount" value="10"/>
    </bean>
    
    <bean id="messageCountAggregator" class="com.mycompany.TrackCountAggregator" />
    Code:
    public class MessageCountCorrelationStrategy implements CorrelationStrategy {
    
    	private short counter = 0;
    	private short maxCount = 10;
    	private UUID correlationKey = UUID.randomUUID();
    	private Logger logger = LogUtils.getLogger(MessageCountCorrelationStrategy.class);
    	
    	public Object getCorrelationKey(Message<?> m) {
    
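    		if (++counter > maxCount) {
    			// This message is the first member of the new group, so count it;
    			// resetting to 0 would let every later group grow to maxCount + 1.
    			counter = 1;
    			correlationKey = UUID.randomUUID();
    		}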
    		return correlationKey.toString();
    	}
    
    }
    Code:
    public class MessageCountCompletionStrategy implements CompletionStrategy {
    
    	private int maxMessageCount = 10;
    	
    	public boolean isComplete(List<Message<?>> messages) {
    		if (CollectionUtils.isEmpty(messages)) {
    			return false;
    		}
    		return messages.size() >= maxMessageCount;
    
    	}
    
    	public void setMaxMessageCount(int maxMessageCount) {
    		this.maxMessageCount = maxMessageCount;
    	}
    
    }

  • #2
    You should make your strategies thread-safe (for example, use Atomic* classes for mutable counters).

    You could rely on the default strategies and set the correlation id and sequence* headers on the incoming messages. Also, I think you don't need to keep track of the count in both the correlation and the completion strategy. Basically, you want to correlate any messages that come in and complete when you have a certain number, right?
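A minimal sketch of the thread-safety suggestion above, written as plain Java so it does not depend on any particular Spring Integration version. `BatchKeyGenerator` and `nextKey()` are hypothetical names, not library API. The idea is to derive the key from a single atomic counter, so no second mutable field has to be reset.

```java
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper (not part of Spring Integration): returns a key that
// changes after every batchSize calls, without locks or resettable state.
final class BatchKeyGenerator {

    // A per-instance prefix keeps keys from different generators apart.
    private final String prefix = UUID.randomUUID().toString();
    private final AtomicLong seen = new AtomicLong();
    private final int batchSize;

    BatchKeyGenerator(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
    }

    String nextKey() {
        // getAndIncrement is atomic, so each caller gets a unique index and
        // exactly batchSize consecutive indexes map to the same key.
        long index = seen.getAndIncrement();
        return prefix + "-" + (index / batchSize);
    }
}
```

A custom correlation strategy could presumably delegate its `getCorrelationKey(Message<?> m)` method to `nextKey()`; that wiring is an assumption and is not shown here.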



    • #3
      Thanks for the response, Iwein. Yes, my use case is to correlate X messages and complete when X is reached or when timeout Y occurs.

      I looked at the default implementations of the correlation and completion strategies. Since my messages are being sourced from a message queue, I didn't see a good way to have the correlationId and sequence headers set. Is this something that I could add with a transformer? A HeaderEnricher?

      What would happen if timeout Y occurs before X messages have been correlated? It looks like the aggregator will send partial results (sendPartialResultsOnTimeout), but what happens to the rest of the correlated messages after the partial result is sent?

      Thanks again. Cheers.
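To make the "X messages or timeout Y, whichever comes first" requirement concrete, here is a minimal sketch in plain `java.util.concurrent`. It is not the aggregator and says nothing about how Spring Integration treats late messages; `BatchCollector` and `nextBatch` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: collects up to maxSize items, or fewer if the timeout
// elapses first. Items that were not taken stay in the queue for a later call.
final class BatchCollector {

    static <T> List<T> nextBatch(BlockingQueue<T> source, int maxSize, long timeoutMillis) {
        List<T> batch = new ArrayList<>(maxSize);
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        try {
            while (batch.size() < maxSize) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    break; // timeout reached: return the partial batch
                }
                T item = source.poll(remaining, TimeUnit.NANOSECONDS);
                if (item == null) {
                    break; // nothing arrived before the deadline
                }
                batch.add(item);
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt and hand back whatever was collected.
            Thread.currentThread().interrupt();
        }
        return batch;
    }
}
```

In this sketch a partial batch simply ends the current call, and later messages start a fresh batch on the next call.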



      • #4
        How about you set the correlationId to 1 on all messages and use a CompletionStrategy that ignores the sequence and just counts? That way you would aggregate X messages unless timeout Y happens first. Just put the header-enricher in front of the aggregator in a chain.

        Please post back your code when you have it working; I haven't tried it myself yet.



        • #5
          This was my original idea, but messages end up getting discarded because the correlationId has already been processed. The aggregator seems to keep track of a number of correlationIds, based on the tracked-correlation-id-capacity attribute in the aggregator config, and this value cannot be set to zero. So after the first group is processed, all remaining messages are sent to the discard channel.

          This is why I ended up keeping count in correlation strategy and completion strategy.

          2009-05-13 12:37:39,535 DEBUG [main] (AbstractMessageBarrierHandler.java:245) - Handling of Message group with correlationKey '1' has already completed or timed out
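The behaviour described above can be illustrated with a toy model. This is not Spring Integration's implementation: `ToyBarrier` and all of its members are hypothetical, and it only mimics what is reported in this thread, namely that a bounded number of completed correlation keys is remembered and that messages carrying a remembered key are discarded.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model only; NOT the library's code. It remembers the last
// trackedCapacity completed keys and discards messages that reuse one.
final class ToyBarrier {

    private final int groupSize;
    private final int trackedCapacity;
    private final Map<String, Integer> openGroups = new HashMap<>();
    private final Deque<String> completedOrder = new ArrayDeque<>();
    private final Set<String> completedKeys = new HashSet<>();

    int released = 0;  // number of complete groups sent on
    int discarded = 0; // number of messages rejected

    ToyBarrier(int groupSize, int trackedCapacity) {
        if (groupSize < 1 || trackedCapacity < 1) {
            throw new IllegalArgumentException("groupSize and trackedCapacity must be at least 1");
        }
        this.groupSize = groupSize;
        this.trackedCapacity = trackedCapacity;
    }

    void handle(String correlationKey) {
        if (completedKeys.contains(correlationKey)) {
            discarded++; // the group for this key has already completed
            return;
        }
        int count = openGroups.merge(correlationKey, 1, Integer::sum);
        if (count >= groupSize) {
            openGroups.remove(correlationKey);
            released++;
            completedKeys.add(correlationKey);
            completedOrder.addLast(correlationKey);
            if (completedOrder.size() > trackedCapacity) {
                // Forget the oldest completed key once capacity is exceeded.
                completedKeys.remove(completedOrder.removeFirst());
            }
        }
    }
}
```

With a constant key such as "1", this model releases one group and discards everything after it; with a key that changes per group, nothing is discarded. That matches the workaround of generating a fresh key for every group.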



          • #6
            Hmm, that's not very useful, it seems. If you agree with me that this shouldn't happen, you can create a bug report for it here: http://jira.springframework.org/browse/INT



            • #7
              It appears that you should link that issue to INT-604 too.



              • #8
                Thanks again, Iwein. Issue created: http://jira.springframework.org/browse/INT-653
