  • Channel Interceptor with a Filter

    Hi Guys,

    Suppose I have 3 different channels, which come from 3 different sources: ChannelA, ChannelB and ChannelC, and I want each channel to go to some endpoint, say a ServiceActivator:

    Code:
    <integration:channel id="ChannelA" />
    <integration:service-activator 
                  input-channel="ChannelA" 
                  method="m"
                  ref="bean" />
    
    <integration:channel id="ChannelB" />
    <integration:service-activator 
                  input-channel="ChannelB" 
                  method="m2"
                  ref="bean" />
    
    <integration:channel id="ChannelC" />
    <integration:service-activator 
                  input-channel="ChannelC" 
                  method="m3"
                  ref="bean" />
    Now, I'd like to filter each Channel using the same filter method - I could do this like below, where I have 3 new channels, and 3 filters, doing the same thing:

    Code:
    <integration:filter
                  input-channel="ChannelA"
                  method="validate"
                  ref="validator"
                  output-channel="validChannelA" />
    
    <integration:filter
                  input-channel="ChannelB"
                  method="validate"
                  ref="validator"
                  output-channel="validChannelB" />
    
    <integration:filter
                  input-channel="ChannelC"
                  method="validate"
                  ref="validator"
                  output-channel="validChannelC" />
    Which seems a bit bloated.

    What I'd like to do instead of the above, is something like this, which I don't think is currently possible:

    Code:
    <integration:channel-interceptor pattern="Channel*">
    <integration:filter
                  method="validate"
                  ref="validator" />
    </integration:channel-interceptor>
    Is there a way to create that type of flow? Ultimately, I'd like to be able to specify whether or not a channel should be intercepted and filtered via either a channel name convention, or by supplying a list of filtered channels (comma separated in the pattern).

    Any suggestions will be much appreciated,
    Thanks,
    Pete

  • #2
    Hi!

    There are two ways to achieve what you want:
    HTML Code:
    <channel-interceptor pattern="Channel*">
        <beans:bean id="interceptor" class="org.springframework.integration.channel.interceptor.MessageSelectingInterceptor">
            <beans:constructor-arg>
                <beans:bean class="org.springframework.integration.filter.MethodInvokingSelector"
                            c:object-ref="validator" c:method="validate"/>
            </beans:constructor-arg>
        </beans:bean>
    </channel-interceptor>
    
    OR
    
    <chain input-channel="ChannelA">
        <filter method="validate" ref="validator"/>
        <service-activator method="m" ref="bean" />
    </chain>
    IMO the first one is an anti-pattern: it breaks the loose-coupling principle.

    Take care,
    Artem

    • #3
      Hi Artem,

      Thanks for the very quick reply!

      For your second option (below), would that still require a chain for each Channel?

      Code:
      <chain input-channel="ChannelA">
         <filter method="validate" ref="validator"/>
         <service-activator method="m" ref="bean" />
      </chain>
      
      <chain input-channel="ChannelB">
         <filter method="validate" ref="validator"/>
         <service-activator method="m2" ref="bean" />
      </chain>
      
      <chain input-channel="ChannelC">
         <filter method="validate" ref="validator"/>
         <service-activator method="m3" ref="bean" />
      </chain>
      While that's much cleaner than my original example, it doesn't avoid the need to have one chain per work-flow - or am I missing something here?

      Option one seems to be the closest to what I'd need - are there any plans to implement a less coupled way of producing the same result? Perhaps adding ChannelIntercepting chains or filters?

      e.g.
      Code:
      <channel-interceptor pattern="Channel*">
         <chain>
             <filter method="validate" ref="validator"/>
             ....
         </chain>
      </channel-interceptor>
      Thanks,
      Pete

      • #4
        It looks to me like this flow might logically be represented by a single channel hitting the filter first, followed by a router to the 3 endpoints post-validation. Is there a reason that is not a good fit?
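
        A rough sketch of that layout, using the integration: prefix from the first post. It assumes each message carries a header that says where it came from; the header name "source", its values, and the channel names "inbound" and "validated" are made up for illustration.

        ```xml
        <!-- Single entry point shared by all three sources (hypothetical name) -->
        <integration:channel id="inbound" />

        <!-- One filter for everything, instead of one filter per channel -->
        <integration:filter input-channel="inbound"
                            output-channel="validated"
                            ref="validator"
                            method="validate" />

        <integration:channel id="validated" />

        <!-- Route on a hypothetical "source" header to the existing channels -->
        <integration:header-value-router input-channel="validated" header-name="source">
            <integration:mapping value="A" channel="ChannelA" />
            <integration:mapping value="B" channel="ChannelB" />
            <integration:mapping value="C" channel="ChannelC" />
        </integration:header-value-router>

        <integration:service-activator input-channel="ChannelA" ref="bean" method="m" />
        <integration:service-activator input-channel="ChannelB" ref="bean" method="m2" />
        <integration:service-activator input-channel="ChannelC" ref="bean" method="m3" />
        ```

        The three service activators stay as they are; only the entry point changes, so the validation is declared once.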

        • #5
          Hi Mark,

          There's a good chance that that will be the approach I'll eventually take - but here's why I would prefer a solution such as I described:

          The system I'm working on can be divided into 3 fairly distinct sections - the A, B, C in my example. The difference between my real project and the example is that each of these sections may eventually consist of 10-15+ different channels, each with different end-points.

          There will be several (likely 20+) different validation filters, each of which will only apply to certain channels (e.g. 3 channels from the A section, 2 from B, etc.). This makes it difficult to pre-validate the messages before routing them appropriately, although admittedly it could be feasible with either several routers or some complicated SpEL.

          What I would have liked, however, is to not have to change my workflow in order to perform validation - in other words, I'd like to be able to add/remove the validation performed on a channel at any time via a single config location.

          Your colleague Artem's first suggestion could work - but I believe there's less control over what happens to these messages, unless I subclass the MethodInvokingSelector; something I'd prefer to avoid.

          Would there be another way to produce the same result? Or is this something that potentially could be added to SI in the future?

          Thanks,
          Pete

          • #6
            Can you describe what you mean by "less control over what happens to these messages, unless I subclass..."?

            • #7
              Certainly - I used the suggested config:

              Code:
              <integration:channel-interceptor pattern="Channel*">
                  <bean id="interceptor" class="org.springframework.integration.channel.interceptor.MessageSelectingInterceptor">
                      <constructor-arg>
                          <bean class="org.springframework.integration.filter.MethodInvokingSelector">
                              <constructor-arg index="0" ref="validator" />
                              <constructor-arg index="1" value="validate" />
                          </bean>
                      </constructor-arg>
                  </bean>
              </integration:channel-interceptor>
              in a simple test SI service. The validate method returns true or false based on some condition. Sending a message that fails the validation to any intercepted channel (e.g. "ChannelA") throws this exception:

              Code:
              org.springframework.integration.MessageDeliveryException: selector '[email protected]bbb' did not accept message
              	at org.springframework.integration.channel.interceptor.MessageSelectingInterceptor.preSend(MessageSelectingInterceptor.java:48) ~[spring-integration-core-2.1.2.RELEASE.jar:na]
              	at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:227) ~[spring-integration-core-2.1.2.RELEASE.jar:na]
              	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:152) ~[spring-integration-core-2.1.2.RELEASE.jar:na]
              	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:128) ~[spring-integration-core-2.1.2.RELEASE.jar:na]
              	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288) ~[spring-integration-core-2.1.2.RELEASE.jar:na]
              	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149) ~[spring-integration-core-2.1.2.RELEASE.jar:na]
              	at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:128) ~[spring-integration-core-2.1.2.RELEASE.jar:na]
              	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73) ~[spring-integration-core-2.1.2.RELEASE.jar:na]
              	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:115) ~[spring-integration-core-2.1.2.RELEASE.jar:na]
              	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:102) ~[spring-integration-core-2.1.2.RELEASE.jar:na]
              	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) ~[spring-integration-core-2.1.2.RELEASE.jar:na]
              	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157) ~[spring-integration-core-2.1.2.RELEASE.jar:na]
              	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:128) ~[spring-integration-core-2.1.2.RELEASE.jar:na]
              	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288) ~[spring-integration-core-2.1.2.RELEASE.jar:na]
              	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149) ~[spring-integration-core-2.1.2.RELEASE.jar:na]
              	at org.springframework.integration.core.MessagingTemplate.convertAndSend(MessagingTemplate.java:189) ~[spring-integration-core-2.1.2.RELEASE.jar:na]
              	at org.springframework.integration.gateway.MessagingGatewaySupport.send(MessagingGatewaySupport.java:183) ~[spring-integration-core-2.1.2.RELEASE.jar:na]
              	at org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:308) ~[spring-integration-core-2.1.2.RELEASE.jar:na]
              	at org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke(GatewayProxyFactoryBean.java:268) ~[spring-integration-core-2.1.2.RELEASE.jar:na]
              	at org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke(GatewayProxyFactoryBean.java:259) ~[spring-integration-core-2.1.2.RELEASE.jar:na]
              	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172) ~[spring-aop-3.0.7.RELEASE.jar:3.0.7.RELEASE]
              	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:202) ~[spring-aop-3.0.7.RELEASE.jar:3.0.7.RELEASE]
              	at $Proxy0.initiateRequest(Unknown Source) ~[na:na]
              In some cases, that would be fine, in others, I'd like to have discard-channels, or for the message to drop silently.

              I'm very unfamiliar with the actual SI codebase (your guiding principle is to be completely decoupled, so that's probably a good thing!), so there's probably an existing filter implementation which can do all that I need.

              Thanks,
              Pete
              Last edited by petey22uk; Jan 25th, 2013, 11:17 AM.

              • #8
                In some cases, that would be fine, in others, I'd like to have discard-channels, or for the message to drop silently.
                I don't see any problem with implementing your own interceptor:
                Code:
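                // Package names below are those of Spring Integration 2.1.x.
                import org.springframework.integration.Message;
                import org.springframework.integration.MessageChannel;
                import org.springframework.integration.MessageDeliveryException;
                import org.springframework.integration.channel.interceptor.ChannelInterceptorAdapter;
                import org.springframework.integration.core.MessageSelector;
                import org.springframework.integration.core.MessagingTemplate;
                
                public class MessageFilteringInterceptor extends ChannelInterceptorAdapter {
                
                	private final MessagingTemplate messagingTemplate = new MessagingTemplate();
                
                	private final MessageSelector selector;
                
                	private volatile boolean throwExceptionOnRejection;
                
                	private volatile MessageChannel discardChannel;
                
                	// Public so the class can be instantiated from configuration and other packages.
                	public MessageFilteringInterceptor(MessageSelector selector) {
                		this.selector = selector;
                	}
                
                	// Setters so that both options can actually be configured.
                	public void setThrowExceptionOnRejection(boolean throwExceptionOnRejection) {
                		this.throwExceptionOnRejection = throwExceptionOnRejection;
                	}
                
                	public void setDiscardChannel(MessageChannel discardChannel) {
                		this.discardChannel = discardChannel;
                	}
                
                	@Override
                	public Message<?> preSend(Message<?> message, MessageChannel channel) {
                		// Accepted messages continue to the intercepted channel unchanged.
                		if (this.selector.accept(message)) {
                			return message;
                		}
                		// Rejected messages are copied to the discard channel, if one is set.
                		if (this.discardChannel != null) {
                			this.messagingTemplate.send(this.discardChannel, message);
                		}
                		if (this.throwExceptionOnRejection) {
                			throw new MessageDeliveryException(message,
                					"selector '" + this.selector + "' did not accept message");
                		}
                		// Returning null stops the send to the intercepted channel.
                		return null;
                	}
                }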
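                To apply it to a group of channels, a wiring along these lines should work. This is only a sketch: it assumes the class has a public constructor and a setDiscardChannel setter, and the package com.example and the channel name "rejectedMessages" are made up for illustration.

                ```xml
                <!-- Hypothetical discard destination; its name does not match "Channel*",
                     so it is not intercepted itself -->
                <logging-channel-adapter id="rejectedMessages" level="WARN"/>

                <channel-interceptor pattern="Channel*">
                    <!-- com.example is an assumed package for the class above -->
                    <beans:bean class="com.example.MessageFilteringInterceptor">
                        <beans:constructor-arg>
                            <beans:bean class="org.springframework.integration.filter.MethodInvokingSelector">
                                <beans:constructor-arg index="0" ref="validator"/>
                                <beans:constructor-arg index="1" value="validate"/>
                            </beans:bean>
                        </beans:constructor-arg>
                        <beans:property name="discardChannel" ref="rejectedMessages"/>
                    </beans:bean>
                </channel-interceptor>
                ```

                Setting throwExceptionOnRejection to true as well would bring back the MessageSelectingInterceptor behaviour for the channels where the exception is wanted.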
                WDYT?

                • #9
                  Hi!

                  I want to return to this question and make some comments.
                  I take back what I said about MessageSelectingInterceptor being an anti-pattern. It plays a slightly different role from <filter>.
                  Imagine we have two message flows with a <channel> between them. In this case the <filter> is the first component of the second flow, so deciding what to do with an incoming Message is the responsibility of that flow. The first flow doesn't know anything about the fate of the Message, especially if the <channel> is a queue.
                  On the other hand, MessageSelectingInterceptor is something like a latch on the <channel>, or, in EIP terms, on the pipe.
                  If the Message has the right "key", the "latch" opens and the Message goes to the other end of the pipe. Otherwise the Message is returned to the sender, in our case the first message flow.
                  So deciding what to do with the Message after catching the MessageDeliveryException from MessageSelectingInterceptor is the responsibility of the first flow.
                  Of course, here we need a 'try ... catch' if our flow is direct (within the boundaries of one Thread), or an 'error-channel' if it is async.
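
                  For the 'error-channel' variant, one possible shape is sketched below. It assumes the first flow starts at a gateway, as the stack trace earlier in the thread suggests; the interface name com.example.RequestGateway and the channel name "rejectedMessages" are made up for illustration.

                  ```xml
                  <!-- Hypothetical gateway: failures in the downstream send are handed to
                       the error-channel instead of being rethrown to the caller -->
                  <gateway id="requestGateway"
                           service-interface="com.example.RequestGateway"
                           default-request-channel="ChannelA"
                           error-channel="rejectedMessages"/>

                  <!-- The MessageDeliveryException arrives here as the payload of an ErrorMessage -->
                  <logging-channel-adapter id="rejectedMessages" level="WARN"/>
                  ```

                  The logging adapter is only a placeholder; any endpoint subscribed to that channel could decide what to do with the rejected Message.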

                  Anyway, even if it is possible, I don't recommend putting a lot of logic into ChannelInterceptors, especially if that logic starts to look like a 'message-flow'. We call that 'mixing of concerns'.

                  Hope I'm clear,
                  Artem

                  • #10
                    Hi Artem,

                    Your code example above is ideal - it covers what I wanted to achieve in my system.

                    I also agree with your comments; the new MessageFilteringInterceptor shouldn't contain any flow logic as far as I'm concerned. I'd like to keep that class as simple, and as close to standard SI in terms of XML, as possible.

                    Do you agree though that there is a legitimate use-case for this? It would be fantastic to have full SI functionality within a ChannelInterceptor, i.e. to be able to 'detour' a message through additional filter/router/enrichment logic without affecting the original flow.

                    Thanks,
                    Pete

                    • #11
                      Hi, Pete!

                      Yes, it is legitimate. In most cases, what you need and what you want are up to you. In Spring Integration you have a powerful platform in your hands, and all its abilities and tricks are open for extension and reuse.
                      However, as I said above, we should think more about the architecture and the justification of our solutions: is it KISS, DRY, loosely coupled, robust, etc.?
                      In your case of
                      through additional filter/router/enrichment logic without affecting the original flow.
                      I recommend introducing a sub-flow that is based on the reply-channel from the Message header.
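
                      Something like the following could serve as a starting point. It is a sketch, not a tested config: the channel name "validationFlow" is made up, and it assumes a rejected message should fail with an exception, because a filter that silently drops the message would leave the calling gateway waiting for a reply.

                      ```xml
                      <!-- Shared sub-flow (hypothetical channel name). The filter has no
                           output-channel, so an accepted message goes back to the
                           replyChannel header set by the calling gateway. -->
                      <channel id="validationFlow"/>

                      <filter input-channel="validationFlow"
                              ref="validator"
                              method="validate"
                              throw-exception-on-rejection="true"/>

                      <!-- The main flow takes a detour through the sub-flow and then continues -->
                      <chain input-channel="ChannelA">
                          <gateway request-channel="validationFlow"/>
                          <service-activator method="m" ref="bean"/>
                      </chain>
                      ```

                      Routers or enrichers could be added to the sub-flow in the same way, as long as its last endpoint replies instead of naming an output-channel.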

                      Cheers,
                      Artem
