Issues with Aggregator and its Mechanics
  • Issues with Aggregator and its Mechanics

    Hi,
    I have a client application that creates messages to be aggregated using spring-integration (version 2.1.0.RELEASE). A messaging gateway adds the required header fields (code below) and sends the manually SPLIT messages to a queue channel for aggregation. While unit testing, I noticed that the first set of split messages is aggregated correctly: the Aggregator creates a single output message and puts it onto the output-channel. If, however, I send the same set of three manually SPLIT messages again (with a changed expiration date or payload), the messages get stuck in the aggregation channel. Inspecting the message headers on a wiretap, and in more detail via the logs, suggests that the newly sent messages use the same ID as the earlier three messages that were sent for aggregation. The setup only starts working again if I change the correlationId of the new messages.
    The question is:
    Once the aggregator has aggregated the messages, created a single message, and put it on the output-channel (all channels are QUEUE channels), shouldn't the split messages be flushed from the aggregation input-channel so that it is clear for the next set of messages?

    Here are snippets of code to demo my issue

    Code:
    <gateway id = "processorGateway" service-interface = "com.xyz.gateway.ProcessorGateway" >
        <method name = "processPacket" request-channel = "aggregatorChannel" />
    </gateway>
    Code:
    <aggregator id = "msgAggregator" input-channel = "aggregatorChannel" 
    	            output-channel = "decryptionChannel" discard-channel = "aggregatorDiscardChannel"
    	            send-partial-result-on-expiry = "false" send-timeout = "600000" 	              
    	            ref = "messageAggregatorBean" method = "aggregate">
    </aggregator>
    <beans:bean id = "messageAggregatorBean" class = "com.xyz.aggregator.PacketAggregator" />
    Here is the Gateway interface definition

    Code:
    public interface ProcessorGateway {
    	
    	// requestChannel configured in XML file. Here only to show equivalent annotation
    	public void processPacket(Object payload, 
    			                   @Header("correlationId") Object correlationId, 
    			                   @Header("sequenceNumber") Integer sequenceNumber,
    			                   @Header("sequenceSize") Integer sequenceSize,
    			                   @Header("expirationDate") Long expiryTime);
    }

    Test code to dump manually split packets onto the aggregator channel


    Code:
    	public void testAggregationOfPackets() {
    		
    	        // Created the messages and simulate a manual split. Packet 1
    		p1 = new Packet();
    		p1.setCorrelationId("98220363711");
    		p1.setSequenceNumber(1);
    		p1.setSequenceSize(3);
    		long currentTimeMillis = System.currentTimeMillis();
    		p1.setExpirationDate(currentTimeMillis + 60000);
    		p1.setPayload("Part 1 of message");
    
    		// Packet 2		
    		p2 = new Packet();
    		p2.setCorrelationId("98220363711");
    		p2.setSequenceNumber(2);
    		p2.setSequenceSize(3);
    		currentTimeMillis = System.currentTimeMillis();
    		p2.setExpirationDate(currentTimeMillis + 600000);
    		p2.setPayload("Part 2 of message");
    		
    		// Packet 3
    		p3 = new Packet();
    		p3.setCorrelationId("98220363711");
    		p3.setSequenceNumber(3);
    		p3.setSequenceSize(3);
    		currentTimeMillis = System.currentTimeMillis();
    		p3.setExpirationDate(currentTimeMillis + 600000);
    		p3.setPayload("Part 3 of message");
    
    		// Order of insertion P3, P1, P2
    		// The messages should show up on the "aggregatorChannel" and output a single message
    		// on the "decryptionChannel"
    		processorGateway.processPacket(p3.getPayload(),p3.getCorrelationId(), p3.getSequenceNumber(), 
    				   p3.getSequenceSize(), p3.getExpirationDate());
    		processorGateway.processPacket(p1.getPayload(),p1.getCorrelationId(), p1.getSequenceNumber(), 
    				   p1.getSequenceSize(), p1.getExpirationDate());
    		processorGateway.processPacket(p2.getPayload(),p2.getCorrelationId(), p2.getSequenceNumber(), 
    				   p2.getSequenceSize(), p2.getExpirationDate());
    		
    		Message<?> packet = aggregatorChannelTap.receive(1000);
    		logger.debug("**** First packet on queue has ID " + packet.getHeaders().getId());
    		packet = aggregatorChannelTap.receive(1000);
    		logger.debug("**** Second packet on queue has ID " + packet.getHeaders().getId());		
    		packet = aggregatorChannelTap.receive(1000);
    		logger.debug("**** Third packet on queue has ID " + packet.getHeaders().getId());		
    		
    		// Receive the message on the decryptionChannel. THIS WORKS GREAT
    		packet = decryptionChannelTap.receive(5000);
    		assertNotNull("Expecting a packet on vendor processing channel", packet);
    		logger.info("Aggregated packet on queue has ID " + packet.getHeaders().getId());
                    
                    // Send same messages this time in a different order (P2, P3, P1)
                    processorGateway.processPacket(p2.getPayload(),p2.getCorrelationId(), p2.getSequenceNumber(), 
    				   p2.getSequenceSize(), p2.getExpirationDate());
    
    		processorGateway.processPacket(p3.getPayload(),p3.getCorrelationId(), p3.getSequenceNumber(), 
    				   p3.getSequenceSize(), p3.getExpirationDate());
    
    		processorGateway.processPacket(p1.getPayload(),p1.getCorrelationId(), p1.getSequenceNumber(), 
    				   p1.getSequenceSize(), p1.getExpirationDate());
    
    		// Reuse the variable declared above; redeclaring it would not compile
    		packet = aggregatorChannelTap.receive(1000);
    		logger.debug("**** Second round - First packet on queue has ID " + packet.getHeaders().getId());
    		packet = aggregatorChannelTap.receive(1000);
    		logger.debug("**** Second round - Second packet on queue has ID " + packet.getHeaders().getId());		
    		packet = aggregatorChannelTap.receive(1000);
    		logger.debug("**** Second round - Third packet on queue has ID " + packet.getHeaders().getId());		
    		
    		// Receive the message on the decryptionChannel.
    		// MESSAGES NEVER SHOW UP HERE UNLESS CORRELATION ID IS CHANGED.
    		packet = decryptionChannelTap.receive(5000);
    
    }
    The second round of packets never shows up on the output-channel after aggregation. The packets just sit on the aggregatorChannel waiting for something. I would have thought that once the Aggregator has aggregated the messages, the split messages are flushed from the input-channel. As I mentioned before, the IDs of the messages created remain identical, even if the payload or some of the message header values (like the expiration date) are changed. The second round only works if I change the correlationId in the message headers.

    Please help me understand the mechanics of how the queues get flushed and how it treats already aggregated messages.

  • #2
    This is a great question, and in fact it goes right to the heart of some of the improvements we've made to the aggregator for 2.1. Although your issue can be solved by adding a single attribute to the aggregator configuration, I might as well give you a little history.
    The behavior you are observing is indeed correct. One of the main purposes of the aggregator is not only to group related messages together but also to handle late-arriving messages. After all, the aggregator is a stateful component (unlike most EIP components). In other words, messages with the same correlation id that arrive *after* the message group identified by that correlation id has been released are considered late-arriving messages, and then the question is what to do with them.

    So the behavior you observe is the default behavior, showing how the aggregator deals with late-arriving messages. The difference between the aggregator in 2.1 and previous versions is that in 2.1 you can manage this behavior with a new boolean attribute, expire-groups-upon-completion, which is set to 'false' by default (we do believe this is the correct default behavior). This means that the aggregator will maintain a weak reference to the released (expired) message group, so it can recognize late-arriving messages and discard them. You can see this (as an experiment) by watching the 'discard-channel' configured on your aggregator: the aggregator sends all discarded messages, one by one, to the discard-channel.
    However, if you set expire-groups-upon-completion to 'true', the aggregator will not maintain any reference to the expired group, which means your new batch of messages will look like a new group and the process will repeat itself.
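
The difference between the two settings can be illustrated with a small toy model. This is only a sketch under stated assumptions: `ToyAggregator` and everything in it are hypothetical names written for this illustration, it is not Spring Integration code, and it assumes a group is released as soon as the number of collected messages equals the sequence size.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model of late-arrival handling; hypothetical, not the framework's implementation. */
public class ToyAggregator {

    private final boolean expireGroupsUponCompletion;
    // Open groups: correlation id -> payloads collected so far.
    private final Map<String, List<String>> openGroups = new HashMap<>();
    // Correlation ids of groups that were released and are still remembered.
    private final Set<String> completedGroups = new HashSet<>();
    private final List<String> released = new ArrayList<>();
    private final List<String> discarded = new ArrayList<>();

    public ToyAggregator(boolean expireGroupsUponCompletion) {
        this.expireGroupsUponCompletion = expireGroupsUponCompletion;
    }

    public void receive(String correlationId, int sequenceSize, String payload) {
        if (completedGroups.contains(correlationId)) {
            // The group was already released, so this message counts as a late arrival.
            discarded.add(payload);
            return;
        }
        List<String> group = openGroups.computeIfAbsent(correlationId, k -> new ArrayList<>());
        group.add(payload);
        if (group.size() == sequenceSize) {
            released.add(String.join("+", group));
            openGroups.remove(correlationId);
            if (!expireGroupsUponCompletion) {
                // Remember the released group so later messages can be recognized as late.
                completedGroups.add(correlationId);
            }
        }
    }

    public int releasedCount() {
        return released.size();
    }

    public int discardedCount() {
        return discarded.size();
    }

    public static void main(String[] args) {
        for (boolean expire : new boolean[] {false, true}) {
            ToyAggregator aggregator = new ToyAggregator(expire);
            // Send the same three-part group twice, as in the test from the first post.
            for (int round = 0; round < 2; round++) {
                aggregator.receive("98220363711", 3, "P3");
                aggregator.receive("98220363711", 3, "P1");
                aggregator.receive("98220363711", 3, "P2");
            }
            System.out.println("expire=" + expire
                    + " released=" + aggregator.releasedCount()
                    + " discarded=" + aggregator.discardedCount());
        }
    }
}
```

With the flag off, the model releases one group and discards the three resent messages; with the flag on, it releases two groups and discards nothing. In the real configuration the equivalent switch would be the expire-groups-upon-completion attribute on the aggregator element.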

    I hope this clarifies.



    • #3
      Thanks for the explanation. What, then, is the best mechanism for handling duplicate messages in spring-integration? Or is this something the consuming application logic is expected to handle after the messages have been consumed?

      Also, I would have thought that deciding whether two messages are duplicates (at least on an aggregator input-channel) would involve comparing several (if not all) header fields as well as the payload. From the observations in my earlier post, it seems that only the correlationId (by default) or the correlation strategy is involved.

      My feeling was that correlation is used only to identify the members of a group, not to compare messages for duplication.



      • #4
        Well, that is correct. By default, the correlationId header is used to correlate messages, and the sequenceSize header determines when a group is complete. The aggregator is not in the business of performing duplication checks, *only* correlation checks, and I actually think that is what you meant. It seems to me you would like to perform custom correlation that is not based on the correlation id, or maybe is based on the correlation id and a few more attributes (e.g., headers). If that is the case, then all you need to do is implement a custom CorrelationStrategy and inject it into the aggregator. You can read more here: http://static.springsource.org/sprin...le/#aggregator
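
A custom strategy of that kind might look roughly like the sketch below. It is written against stand-in types so that it compiles without the framework: the nested `Message` class and `CorrelationStrategy` interface only mirror the shape of the Spring Integration types of the same name, and the "batchId" header is a hypothetical extra attribute that does not appear in the original post. With Spring Integration on the classpath, one would implement the framework's own CorrelationStrategy interface instead.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class CompositeCorrelationDemo {

    /** Stand-in for the framework's Message type; holds only what this sketch needs. */
    static final class Message {
        private final Object payload;
        private final Map<String, Object> headers;

        Message(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }

        Object getPayload() {
            return payload;
        }

        Map<String, Object> getHeaders() {
            return headers;
        }
    }

    /** Stand-in mirroring the shape of the framework's CorrelationStrategy. */
    interface CorrelationStrategy {
        Object getCorrelationKey(Message message);
    }

    /** Correlates on correlationId plus a hypothetical "batchId" header. */
    static final class CompositeKeyCorrelationStrategy implements CorrelationStrategy {
        @Override
        public Object getCorrelationKey(Message message) {
            Object correlationId = message.getHeaders().get("correlationId");
            Object batchId = message.getHeaders().get("batchId");
            // Lists compare by content, so equal header values yield equal keys.
            return Arrays.asList(correlationId, batchId);
        }
    }

    /** Helper that builds a message carrying the two headers used for correlation. */
    static Message message(String payload, String correlationId, String batchId) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("correlationId", correlationId);
        headers.put("batchId", batchId);
        return new Message(payload, headers);
    }

    public static void main(String[] args) {
        CorrelationStrategy strategy = new CompositeKeyCorrelationStrategy();
        Object first = strategy.getCorrelationKey(message("Part 1 of message", "98220363711", "batch-1"));
        Object sameBatch = strategy.getCorrelationKey(message("Part 2 of message", "98220363711", "batch-1"));
        Object resend = strategy.getCorrelationKey(message("Part 1 of message", "98220363711", "batch-2"));
        System.out.println(first.equals(sameBatch)); // true: same group
        System.out.println(first.equals(resend));    // false: treated as a new group
    }
}
```

Because the key combines both headers, a resend with the same correlationId but a different batch value would form its own group rather than being treated as a late arrival for the first one.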
