ChannelInterceptor: null from preSend causes MessageDeliveryException

  • ChannelInterceptor: null from preSend causes MessageDeliveryException

    In the section on channel interceptors (3.1.3), the reference guide says:
    The methods that return a Message instance can be used for transforming the Message or can return 'null' to prevent further processing (of course, any of the methods can throw a RuntimeException). Also, the preReceive method can return 'false' to prevent the receive operation from proceeding.
    That would imply that I can add an interceptor around a channel and filter the messages that are added, but when I try to filter messages coming in by returning null, MessagingTemplate#doSend throws a MessageDeliveryException.

    org.springframework.integration.MessageDeliveryException: failed to send message to channel '...' within timeout: -1

    Code:
    <int:inbound-channel-adapter expression="'world'" channel="channel1">
    	<int:poller fixed-rate="1000" />
    </int:inbound-channel-adapter>
    
    <int:channel id="channel1">
    	<int:interceptors>
    		<bean class="MyChannelInterceptor" /><!-- preSend returns null -->
    	</int:interceptors>
    </int:channel>
    
    <int:service-activator input-channel="channel1" expression="T(java.lang.System).out.println('hello ' + payload)" />
    I know that a proper filter endpoint would be the conventional pattern to use. The problem (which this example doesn't show) is that I want to keep duplicate messages out of a queue. The best way I can find to do that is to stash messages in a set as they enter the channel, remove them as they leave and cull the ones that are still in the set when a new message arrives. I have the possibility of several thousand idempotent messages coming through due to a cache invalidation. If someone were to trigger that again, I don't want it to stack up; I want anything that hasn't cleared the queue yet to be discarded so I only process it once. I guess I could manage this state elsewhere, but an interceptor seemed like the natural place to do it.
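    The bookkeeping described above (remember a payload when it enters, forget it when it leaves, reject it while it is still waiting) can be sketched in plain Java without any Spring types. This is only an illustration under assumptions: the class name DedupQueue and its methods are hypothetical, and it models the intended semantics rather than a channel interceptor.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;

// Hypothetical helper, not part of Spring Integration: a queue that
// refuses a payload while an equal payload is still waiting in it.
class DedupQueue<T> {

    private final Queue<T> queue = new ArrayDeque<>();
    private final Set<T> pending = new HashSet<>();

    // Returns false (and drops the item) if an equal item is still queued.
    public synchronized boolean offer(T item) {
        Objects.requireNonNull(item, "item");
        if (!pending.add(item)) {
            return false;
        }
        queue.add(item);
        return true;
    }

    // Removes the head and forgets it, so an equal payload may be queued again.
    // Returns null when the queue is empty.
    public synchronized T poll() {
        T item = queue.poll();
        if (item != null) {
            pending.remove(item);
        }
        return item;
    }

    public synchronized int size() {
        return queue.size();
    }
}
```

    With this shape, a second offer of the same payload is rejected until the first one has been polled, after which the payload is accepted again.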

  • #2
    This change was made a long time ago to avoid silently and unexpectedly dropping messages.

    It does seem that the documentation can be improved but, in most cases, the calling code would want to be informed that the message was discarded.

    One solution would be to put an error-channel on your poller and handle the exception silently there.

    However, it's not clear to me why you can't insert your stateful code in a <filter/> between the inbound adapter and your service...

    Code:
    channel1 -> stateful-filter -> channel2 -> service



    • #3
      Okay, thanks for the link. I'll try an error channel, and if that fails, I can go the stateful filter route. The only drawback is that I would need something at the end of the flow to reach back and reset the filter state for that id after the message has been processed so it won't be ignored if it comes through again. The nice thing about an interceptor by comparison is I can put all that code in the same place. In most cases, I agree that it's preferable to know when a message is being lost though.



      • #4
        If you don't mind exposing your code to the messaging infrastructure, you could do this...

        Code:
        @Autowired
        private MessageChannel channel2;
        
        // Invoked by the service activator for each message arriving on channel1.
        public void deDup(Message<?> message) {
            // preSend logic
            channel2.send(message);
            // postSend logic
        }
        And configure this using a service activator...

        Code:
        channel1 -> dedup-service-activator -> channel2 -> service



        • #5
          I'm afraid I don't have a poller in my configuration to set error-channel on. I'm using a task:executor, and I don't see any way to configure how thread errors are handled.

          Due to the asynchronous nature of what I'm doing, I don't know that there's a way to wrap a channel other than using an interceptor and achieve the semantics that I'm after.

          I was able to achieve what I needed with a single bean used twice: before the queue as a filter, and after the queue as a transformer that removes the message from the tracking set.

          Code:
          <bean id="uniqueMessageFilter" class="...UniqueMessageFilter" />
          <int:filter input-channel="channel1" output-channel="channel2" ref="uniqueMessageFilter" />
          
          <task:executor id="pool" pool-size="2" />
          <int:channel id="channel2">
          	<int:dispatcher task-executor="pool" />
          </int:channel>
          
          <int:chain input-channel="channel2">
          	<int:transformer ref="uniqueMessageFilter" />
          	...
          </int:chain>
          
          --------------------
          
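          // Imports assume Spring Integration 2.x package names (as in the exception above).
          import java.util.Collections;
          import java.util.LinkedHashSet;
          import java.util.Set;
          
          import org.springframework.integration.Message;
          import org.springframework.integration.annotation.Filter;
          import org.springframework.integration.annotation.Transformer;
          
          public class UniqueMessageFilter {
          
          	// Payloads that have passed the filter but not yet reached the transformer.
          	private final Set<Object> messages = Collections.synchronizedSet(new LinkedHashSet<Object>());
          
          	// Accepts a message only if an equal payload is not already pending.
          	@Filter
          	public boolean filter(Message<?> message) {
          		return messages.add(message.getPayload());
          	}
          
          	// Forgets the payload so that it can pass the filter again later.
          	@Transformer
          	public Message<?> remove(Message<?> message) {
          		messages.remove(message.getPayload());
          		return message;
          	}
          }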
          It feels like there is a mismatch between what a channel interceptor can do and how MessagingTemplate interprets it. The code in MessagingTemplate assumes that false means timeout, but it could be for other reasons, not all of which would require an exception. It's expedient to throw there, but it really defeats the point of the signature of an interceptor. Error channels can only be configured in limited places and there's no general try/catch mechanism, so any use of an interceptor for filtering purposes is very likely to fail.



          • #6
            I don't necessarily disagree with you; I am just trying to suggest a clean mechanism, given the current architecture.

            I suggested an error-channel on the poller because that's what your post #1 shows before the intercepted channel.

            The equivalent for an ExecutorChannel is to use an ErrorHandlingTaskExecutor.

            By default (if your task executor is not already an ErrorHandlingTaskExecutor), it is wrapped in an ErrorHandlingTaskExecutor that uses a MessagePublishingErrorHandler to send the ErrorMessage to the default 'errorChannel'. However, you can wrap the executor yourself and use a MessagePublishingErrorHandler to send it to a channel of your choice (or even to nullChannel).
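            The wrapping idea can be illustrated in plain Java. The sketch below is not the Spring Integration ErrorHandlingTaskExecutor and does not reproduce its API; ErrorRoutingExecutor is a hypothetical name, and a java.util.function.Consumer stands in for the error handler. It only shows the decorator pattern: each task runs inside a try/catch and any runtime exception is handed to the handler instead of escaping the worker thread.

```java
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical stand-in for the idea behind an error-handling executor;
// it does not use or reproduce the Spring Integration class.
class ErrorRoutingExecutor implements Executor {

    private final Executor delegate;
    private final Consumer<Throwable> errorHandler;

    ErrorRoutingExecutor(Executor delegate, Consumer<Throwable> errorHandler) {
        this.delegate = delegate;
        this.errorHandler = errorHandler;
    }

    @Override
    public void execute(Runnable task) {
        // Wrap the task so that failures are routed to the handler
        // rather than propagating out of the executing thread.
        delegate.execute(() -> {
            try {
                task.run();
            } catch (RuntimeException e) {
                errorHandler.accept(e);
            }
        });
    }
}
```

            In this sketch, a handler that does nothing plays roughly the role of routing errors to nullChannel, while a handler that records or forwards the exception corresponds to a channel of your choice.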



            • #7
              Yeah, I tried to pick a simple example to state the problem with, but what I'm actually doing is an http-based thread feeding the "unique channel" and then splitting out into worker threads. There is an executor, but it's on the wrong end of the channel to trap any exceptions thrown when the calling thread thinks it has failed to send. Maybe breaking up the calling thread would give me a boundary to handle a send failure exception on, but I don't think that's going to end up being simpler than the filter/transform approach, so I'll leave it at that for now. Thanks again.

