AbstractDirectorySource.onFailure() never called?

  • AbstractDirectorySource.onFailure() never called?

    Hi,

    I am trying to use AbstractDirectorySource.onSend() and onFailure() to move handled files to a success or a failure directory. This works fine with onSend(). However, if something goes wrong, onFailure() isn't called, so the file stays where it is and is not moved to my failure directory.

    I tried throwing an exception (e.g. MessagingException) from somewhere in the messaging process, in this case from a router. This doesn't result in a call to AbstractDirectorySource.onFailure(). Is this behaviour intended, or is it a bug? Should I implement the functionality somewhere else?

    I see that AbstractFileMessageCreator is intended to delete the files, but I can't know whether the processing will be successful at the time the message is created.

    Best regards,
    Maarten Donders
    Last edited by mdond; Jul 23rd, 2008, 11:07 AM.

  • #2
    I remember (vaguely) I logged a bug for this where the wrong exception type was thrown. Which version are you using?

    Can you post some code (preferably a TestCase) that shows the problem? I can't seem to find the reference I was looking for.
    Last edited by iwein; Jul 23rd, 2008, 04:38 PM. Reason: additional question



    • #3
      I am using 1.0M5.

      The cause of the problem seems to be SourceEndpoint.poll(). It should be sufficient to catch the Exception, call onFailure() and throw the Exception again. See complete explanation below.

      I looked at the test sources delivered with Spring Integration for some inspiration on how to set up a TestCase. It seems something went wrong with the jar: it is nearly empty, so that didn't help much. It would probably be enough to set up a test for SourceEndpoint.poll() with mocks for MessageSource, OutputChannel and MessageDeliveryAware. The OutputChannel mock should throw an exception, and the test should fail if MessageDeliveryAware.onFailure() isn't called.
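A rough sketch of such a test, for illustration only. The Channel and DeliveryAware interfaces and the RecordingSource class below are hypothetical stand-ins written for this example; they are not the real Spring Integration types, and messages are reduced to plain strings. The sketch compares a poll that does not catch exceptions with one that notifies the source and rethrows.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the types discussed in this thread.
// They are NOT the real Spring Integration interfaces.
public class PollFailureSketch {

    interface Channel {
        boolean send(String message);
    }

    interface DeliveryAware {
        void onSend(String message);
        void onFailure(RuntimeException exception);
    }

    /** A source that records which callbacks were invoked, in order. */
    static class RecordingSource implements DeliveryAware {
        final List<String> calls = new ArrayList<>();

        String receive() {
            return "payload";
        }

        @Override
        public void onSend(String message) {
            calls.add("onSend");
        }

        @Override
        public void onFailure(RuntimeException exception) {
            calls.add("onFailure");
        }
    }

    /** Poll without a try/catch: an exception from send() skips both callbacks. */
    public static boolean pollWithoutCatch(RecordingSource source, Channel channel) {
        String message = source.receive();
        boolean sent = channel.send(message);
        if (sent) {
            source.onSend(message);
        } else {
            source.onFailure(new RuntimeException("failed to send message"));
        }
        return sent;
    }

    /** Poll that notifies the source about an exception and then rethrows it. */
    public static boolean pollWithNotification(RecordingSource source, Channel channel) {
        String message = source.receive();
        boolean sent;
        try {
            sent = channel.send(message);
        } catch (RuntimeException error) {
            source.onFailure(error);
            throw error; // still propagated, so a transaction could roll back
        }
        if (sent) {
            source.onSend(message);
        } else {
            source.onFailure(new RuntimeException("failed to send message"));
        }
        return sent;
    }

    /** Polls once against a channel that always throws and returns the recorded callbacks. */
    public static List<String> callbacksWhenChannelThrows(boolean notifyOnException) {
        RecordingSource source = new RecordingSource();
        Channel failing = message -> {
            throw new IllegalStateException("Not plausible");
        };
        try {
            if (notifyOnException) {
                pollWithNotification(source, failing);
            } else {
                pollWithoutCatch(source, failing);
            }
        } catch (IllegalStateException expected) {
            // the exception is expected to reach the caller in both variants
        }
        return source.calls;
    }

    public static void main(String[] args) {
        System.out.println("without catch:     " + callbacksWhenChannelThrows(false));
        System.out.println("with notification: " + callbacksWhenChannelThrows(true));
    }
}
```

A real test would use the framework's own SourceEndpoint and a mocking library instead of these stand-ins; the assertion of interest is simply that the recorded callbacks contain onFailure when the channel throws.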

      Here are some incomplete code snippets that show the cause of the problem:

      I implemented a FileSource that overrides onSend() and onFailure():

      public class MyFileSource extends org.springframework.integration.adapter.file.FileSource {
          @Override
          public void onSend(Message<?> message) {
              super.onSend(message);
              moveFile(message, successDirectory);
          }

          @Override
          public void onFailure(MessagingException exception) {
              super.onFailure(exception);
              moveFile(exception.getFailedMessage(), failureDirectory);
          }
      }
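The moveFile() helper is not shown in the post. A minimal sketch of what such a helper might do, assuming the handled file has already been resolved to a java.nio.file.Path and that Java 7 or later is available; the class and method names here are hypothetical and not part of Spring Integration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper; the names are made up for this example.
public class MoveFileSketch {

    /** Moves a file into targetDirectory, creating the directory if needed. */
    public static Path moveToDirectory(Path file, Path targetDirectory) throws IOException {
        Files.createDirectories(targetDirectory);
        Path target = targetDirectory.resolve(file.getFileName());
        // REPLACE_EXISTING overwrites an earlier file with the same name
        return Files.move(file, target, StandardCopyOption.REPLACE_EXISTING);
    }

    /** Moves a temporary file into a "failed" directory and reports whether the move took effect. */
    public static boolean demo() {
        try {
            Path inbox = Files.createTempDirectory("inbox");
            Path handled = Files.createTempFile(inbox, "order", ".txt");
            Path moved = moveToDirectory(handled, inbox.resolve("failed"));
            return Files.exists(moved) && !Files.exists(handled);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("moved: " + demo());
    }
}
```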


      I configure this FileSource programmatically, including a TransactionInterceptor:

      MyFileSource fileSource = new MyFileSource(path, messageCreator);
      SourceEndpoint fileSourceEndpoint = new SourceEndpoint(fileSource);
      fileSourceEndpoint.setName(fileSourceId);
      fileSourceEndpoint.setOutputChannelName("fileChannel");
      Schedule schedule = new PollingSchedule(15000);
      fileSourceEndpoint.setSchedule(schedule);
      TransactionInterceptor transactionInterceptor = new TransactionInterceptor(transactionManager);
      transactionInterceptor.afterPropertiesSet();
      fileSourceEndpoint.addInterceptor(transactionInterceptor);
      messageBus.registerEndpoint(fileSourceEndpoint);

      I have a router that throws an exception when a plausibility check fails:

      @MessageEndpoint(input = "fileChannel")
      public class MyMessageRouter {
          @Router
          public String route(Message<?> message) {
              if (isNotPlausible(message)) {
                  throw new MessagingException(message, "Not plausible");
              }
              return getChannelName(message);
          }
      }

      I had hoped that, in case of the above exception, MyFileSource.onFailure() would be called. But onFailure() is only called when, in org.springframework.integration.endpoint.SourceEndpoint.poll(), the call getOutputChannel().send(message, this.sendTimeout) returns false:

      In org.springframework.integration.endpoint.SourceEndpoint:

      public boolean poll() {
          ...
          boolean sent = this.getOutputChannel().send(message, this.sendTimeout);
          if (this.source instanceof MessageDeliveryAware) {
              if (sent) {
                  ((MessageDeliveryAware) this.source).onSend(message);
              }
              else {
                  ((MessageDeliveryAware) this.source).onFailure(new MessageDeliveryException(message, "failed to send message"));
              }
          }
          return sent;
      }

      But send() never returns false here, because org.springframework.integration.endpoint.HandlerEndpoint has the following send() implementation:

      In org.springframework.integration.endpoint.HandlerEndpoint:

      public boolean send(Message<?> message) {
          Message<?> replyMessage = this.handler.handle(message);
          if (replyMessage != null) {
              if (replyMessage.getHeader().getCorrelationId() == null) {
                  replyMessage.getHeader().setCorrelationId(message.getId());
              }
              this.replyHandler.handle(replyMessage, message.getHeader());
          }
          return true;
      }

      This method either returns true or throws an exception. Therefore onFailure() is never called.
      Of course it is right to propagate the exception up the call stack; otherwise the TransactionInterceptor wouldn't roll back the transaction!

      So I suggest catching the exception in SourceEndpoint.poll(), calling onFailure() and then rethrowing the exception.

      Regards,
      Maarten
      Last edited by mdond; Jul 24th, 2008, 05:10 AM.



      • #4
        The following change to poll() in org.springframework.integration.endpoint.SourceEndpoint works for me:

        Code:
        	public boolean poll() {
        		Message<?> message = (this.source instanceof BlockingSource && this.receiveTimeout >= 0) ? ((BlockingSource<?>) this.source)
        				.receive(this.receiveTimeout)
        				: this.source.receive();
        		if (message == null) {
        			return false;
        		}
        		boolean sent = false;
        		try {
        			sent = this.getOutputChannel().send(message, this.sendTimeout);
        		} catch (RuntimeException error) {
        			if (this.source instanceof MessageDeliveryAware) {
        				((MessageDeliveryAware) this.source)
        						.onFailure(new MessageDeliveryException(message, error
        								.getMessage()));
        			}
        			throw error;
        		}
        		if (this.source instanceof MessageDeliveryAware) {
        			if (sent) {
        				((MessageDeliveryAware) this.source).onSend(message);
        			} else {
        				((MessageDeliveryAware) this.source)
        						.onFailure(new MessageDeliveryException(message,
        								"failed to send message"));
        			}
        		}
        		return sent;
        	}
        Regards,
        Maarten



        • #5
          Maarten,

          Could you please add an issue in JIRA for this?

          Thanks,
          Mark
