  • Using Aggregator with Completion Strategy

    I am trying to use the @Aggregator annotation together with a completion strategy. The documentation on these two is rather thin.

    Here is what I am trying to do. I retrieve a list of objects from a repository and try to process them simultaneously. Processing an object entails some calculations and then generating an XML snippet describing the object. I use a splitter to break the list into individual objects and put each one on a queue channel. A message endpoint reads from this queue channel and invokes a handler that processes the object. The endpoint class has a concurrency annotation, which lets multiple threads process objects off the queue.

    The problem comes when trying to aggregate the responses from the handlers. Each handler, after finishing its task, publishes to an output queue (as defined in the message endpoint). I would like to add another message handler that subscribes to this output queue, aggregates all the messages, and republishes them to the final output adapter. The aggregator should use a completion strategy to determine whether all the expected messages have been received, and publish only after the strategy returns true.

    I guess this should be possible with Spring Integration M4, but I am not sure exactly how to do it. Any pointers would be a great help.

    thanks
    -Satish
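
The flow described in this post (split a list, process the items on several threads, then aggregate the replies until a completion strategy says the set is complete) can be pictured with the plain-JDK sketch below. It deliberately uses no Spring Integration types. Every name in it (`SplitProcessAggregateSketch`, `CompletionStrategy`, `toXmlSnippet`, `run`) is hypothetical, and the count-based strategy is only one assumption about what "complete" could mean here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration: plain JDK queues and threads stand in for channels and endpoints.
final class SplitProcessAggregateSketch {

    /** Decides whether every expected reply has arrived (hypothetical interface). */
    interface CompletionStrategy<T> {
        boolean isComplete(List<T> received);
    }

    /** Stand-in for the per-object calculation plus XML generation. */
    static String toXmlSnippet(int item) {
        return "<item value=\"" + (item * 2) + "\"/>";
    }

    static List<String> run(List<Integer> items, int workers) {
        // "Splitter" output: one queue entry per object in the list.
        BlockingQueue<Integer> splitQueue = new LinkedBlockingQueue<>(items);
        // Output queue that the concurrent handlers publish their replies to.
        BlockingQueue<String> outputQueue = new LinkedBlockingQueue<>();

        // Assumed strategy: the set is complete once one reply per input has arrived.
        final int expected = items.size();
        CompletionStrategy<String> strategy = received -> received.size() == expected;

        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < workers; i++) {
            pool.execute(() -> {
                Integer item;
                // Each worker drains the split queue until it is empty.
                while ((item = splitQueue.poll()) != null) {
                    outputQueue.add(toXmlSnippet(item));
                }
            });
        }

        // "Aggregator": collect replies and release them only when the strategy returns true.
        List<String> received = new ArrayList<>();
        try {
            while (!strategy.isComplete(received)) {
                String reply = outputQueue.poll(5, TimeUnit.SECONDS);
                if (reply == null) {
                    throw new IllegalStateException("timed out waiting for replies");
                }
                received.add(reply);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while aggregating", e);
        } finally {
            pool.shutdown();
        }
        return received;
    }
}
```

Calling `SplitProcessAggregateSketch.run(Arrays.asList(1, 2, 3, 4), 3)` returns four snippets, but in whatever order the worker threads finished, so anything that depends on the original order would need to carry a sequence number along with each item.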

  • #2
    In the simplest case (where you would be dealing with messages only) you wouldn't have to specify a completion strategy. Spring Integration automatically tags the messages that are produced by the splitter so that they can be aggregated.

    A sample that works for me:
    Code:
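    		// Look up the input channel and the channel the aggregator replies to.
    		MessageChannel inputChannel = channelRegistry.lookupChannel("input");
    		MessageChannel outputChannel = channelRegistry.lookupChannel("aggregated");
    		assertNotNull(inputChannel);
    		assertNotNull(outputChannel);
    		// The splitter breaks the payload on spaces; the aggregator joins the words again
    		// and appends a space after each one, hence the trailing space in the expected value.
    		inputChannel.send(new GenericMessage<String>("aap noot mies"));
    		assertEquals("aap noot mies ", outputChannel.receive().getPayload());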
    Code:
    	<si:message-bus auto-create-channels="true" />
    	<si:annotation-driven/>
    	<si:channel id="aggregated"/>
    	<bean id="aggregator" class="org.springframework.integration.config.namespaceTests.TestAggregator"/>
    	<bean id="splitter" class="org.springframework.integration.config.namespaceTests.TestSplitter"/>
    Code:
    @MessageEndpoint(input = "input", output = "split")
    public class TestSplitter {
    	// Splits the payload on spaces and returns one message per word.
    	@Splitter
    	public List<Message<String>> name(Message<String> m) {
    		String[] words = m.getPayload().split(" ");
    		List<Message<String>> list = new ArrayList<Message<String>>();
    		for (String word : words) {
    			list.add(new GenericMessage<String>(word));
    		}
    		return list;
    	}
    }
    Code:
    @MessageEndpoint(input = "split")
    public class TestAggregator {
    	// Joins the payloads of all messages in the group, adding a space after each one.
    	@Aggregator(defaultReplyChannel = "aggregated")
    	public Message<String> name(List<Message<String>> messages) {
    		StringBuilder buf = new StringBuilder();
    		for (Message<String> message : messages) {
    			buf.append(message.getPayload()).append(" ");
    		}
    		return new GenericMessage<String>(buf.toString());
    	}
    }
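
The reply says the splitter tags its output so that the messages can be aggregated without a custom completion strategy. One way such tagging can work is sketched below in plain Java, as an illustration of the idea rather than a description of the M4 internals. The sketch assumes each part carries a correlation id, a sequence number and the total sequence size, and that the aggregator releases a group once it holds as many parts as the size says. All names (`SequenceTaggingSketch`, `Part`, `split`, `Aggregator`, `offer`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of sequence tagging; not Spring Integration code.
final class SequenceTaggingSketch {

    /** A payload plus the bookkeeping a splitter could attach to it. */
    static final class Part {
        final String correlationId; // identifies the original message
        final int sequenceNumber;   // 1-based position within the split
        final int sequenceSize;     // total number of parts produced
        final String payload;

        Part(String correlationId, int sequenceNumber, int sequenceSize, String payload) {
            this.correlationId = correlationId;
            this.sequenceNumber = sequenceNumber;
            this.sequenceSize = sequenceSize;
            this.payload = payload;
        }
    }

    /** Splits on spaces and tags every part with its position and the total count. */
    static List<Part> split(String correlationId, String payload) {
        String[] words = payload.split(" ");
        List<Part> parts = new ArrayList<>();
        for (int i = 0; i < words.length; i++) {
            parts.add(new Part(correlationId, i + 1, words.length, words[i]));
        }
        return parts;
    }

    /** Groups parts by correlation id and releases a group when it is full. */
    static final class Aggregator {
        private final Map<String, List<Part>> groups = new HashMap<>();

        /** Returns the joined payload once the group is complete, otherwise null. */
        String offer(Part part) {
            List<Part> group = groups.computeIfAbsent(part.correlationId, id -> new ArrayList<>());
            group.add(part);
            if (group.size() < part.sequenceSize) {
                return null; // still waiting for more parts
            }
            groups.remove(part.correlationId);
            // Restore the original order, since parts may arrive out of sequence.
            group.sort(Comparator.comparingInt((Part p) -> p.sequenceNumber));
            StringBuilder joined = new StringBuilder();
            for (Part p : group) {
                joined.append(p.payload).append(' ');
            }
            return joined.toString();
        }
    }
}
```

Offering the three parts of "aap noot mies" in any order yields `null` twice and then the joined string with a trailing space, which mirrors the assertion in the test snippet of this reply.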

    • #3
      Thanks Iwen.

      I figured it out after posting the question (after some more investigation). Glad to have someone confirm the findings.

      thanks
      -Satish
