Announcement Announcement Module
Collapse
No announcement yet.
Splitter and DynamicRouter Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Splitter and DynamicRouter

    Hi there,

    I'm attempting to give a try to spring-integration and I'm facing the following problem.

    I use an xpath-splitter to split an XML document. This implementation works well, but then I try to use my homemade RoutingSlipRouter.

    This implementation is based on an object RoutingSlip which wraps a list of steps. The RoutingSplit is put in the MessageHeader.

    (a code Snippet)

    Code:
    public class Step {
    
        private String name;
        private Date completeAt;
        
        ...
    }
    Code:
    public class RoutingSlip {
    
        private List<Step> stepList;
    
        /**
         * Flags the current step as finish and return the next step
         *
         * @return The next step to be processed
         */
        public Step completeCurrentStep() {
        ...
        }
    
        /**
         * Gets the first non completed step or null if no more to be processed 
         * 
         * @return A Step or null if no more to be processed
         */
        public Step getCurrentStep() {
        ...
        }
    The split operation works well, but every part of it shared the very same RoutingSlip instance. So when every task on the first fragment are completed, every other fragments will not be routed the same way, because considered as completed.

    I'm digging the code in order to find a way to clone my RoutingSlip object before the message was dispatch to the router, but I don't see any interceptor while playing with the splitting result.

    Is there any way to do that without having to override the whole XPathMessageSplitter class ?

    Thanks

  • #2
    After re-reading EIP description of the RoutingSlip pattern and the notes of Mark Fisher on Issue INT-267 I can more clearly see how to implement such a RoutingSlip pattern. After a few test, two things seem's mandatory for this :

    * The RoutingSlip Object MUST be Cloneable in order to be able to provide a copy of the Queue to every "thread" resulting of the split operation. This can be done in AbstractMessageSplitter while setting the sequenceNumber for each fragment.
    * The RoutingSlip MUST have an equals method which return true for every RoutingSlip instance created by the clone method so we are able to keep this object in the merge operation of the header during an aggregate operation.

    What I can't really figure out is what the RoutingSlip object really is.

    I attempt to make it a simple POJO returning a new channel name at each call, but then I need to add an IfElse condition after outputchannel/replychannel check when sending the reply (AbstractReplyProducingMessageHandler and CorrelatingMessageHandler). But if other routing patterns have to be implemented I agree this is not a good way to do that.
    In another hand, Mark talk about granted the RoutingSlip concept has a MessageChannel himself. It seem's to be a good approach but I can't figure out at which time when committing the send operation this MessageChannel will substitute himself to the underlying next MessageChannel in the Queue ??
    What seem's to miss here is a MessageChannelResolver interface with a single getChannel() method which was able to retrieve the next channel from the Queue or the ReplyChannel or anything else based on a custom policy.

    Don't you thing that the following, may ease the support of such patterns ?

    Code:
    /**
     * Send the message to the given channel. The channel must be a String or
     * {@link MessageChannel} or {@link MessageChannelResolver} instance, never <code>null</code>.
     */
    private void sendMessage(final Message<?> message, final Object channel) {
        if (channel instanceof MessageChannel) {
            this.messagingTemplate.send((MessageChannel) channel, message);
        }
        else if (channel instanceof MessageChannelResolver) {
            this.messagingTemplate.send((MessageChannelResolver) channel.getChannel(), message);
        }
        else if (channel instanceof String) {
            this.messagingTemplate.send((String) channel, message);
        }
        else {
            throw new MessageDeliveryException(message,
                "a non-null reply channel value of type MessageChannel or String is required");
        }
    }
    Last edited by Proner; Feb 22nd, 2011, 05:03 PM.

    Comment


    • #3
      Hum, in fact I was far away from the answer. I just understand what Mark said, speaking of MessageChannel "proxy". Now my rootingSlip Object is a MessageChannel which delegate to the right sub-messageChannel base on an index maintain within the RootingSlip. By the way, I was able to answer the problem of splitting and aggregation using a simple Dequeue<Integer>. No more need to clone the RoutingSlip object nor change anything in the current spring-integration source.

      So far, so good, I'm pretty happy with my minimal implementation but very impatient to see the official one (talking about retry policy and so on).

      Comment

      Working...
      X