Announcement Announcement Module
Collapse
No announcement yet.
inbound-channel-adapter timeout Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • inbound-channel-adapter timeout

    Hello,

    how do you create a timeout for an int-ftp:inbound-channel-adapter?

    For instance, let's say the FTP address used the following adapter is invalid (because someone typed it wrong.) As a result, the adapter will loop infinitely. How do you make it time out, after, say, 10 seconds?

    Many thanks.

    Philroc

    Code:
    <int-ftp:inbound-channel-adapter id="ftpInbound"
    		channel="ftpChannel" 
    		session-factory="ftpClientFactory"
    		filename-regex=".*\.txt$"
    		auto-create-local-directory="true" 
    		delete-remote-files="false"
    		remote-directory="."
    		local-directory="file:output"
    		
    		 >
    
    <int:poller fixed-rate="1000"  />
    		
    	
    	</int-ftp:inbound-channel-adapter>
    Last edited by Gary Russell; May 15th, 2013, 09:13 AM.

  • #2
    By "timeout" I assume you mean stop polling after some number of failures; there's nothing built into the framework to do that automatically; the framework doesn't know that the error is due to mis-configuration, Vs. some transient problem.

    You can put an error-channel on the poller; send the ErrorMessage to some code that tracks the errors (the payload will be the exception). After detecting a specific type of failure (or number of them), send a message to a control bus to stop the adapter; something like.

    Code:
    <int:chain input-channel="error-channel" output-channel="controlBusChannel">
        <int:filter ref="filterBean" method="conditionallystopTheAdapter" />
        <int:transformer value="@ftpInbound.stop()" />
    </int:chain>
    <int:control-bus input-channel="controlBusChannel" />
    Instead of the filter, you might be able to use an <exception-type-router/>

    Comment


    • #3
      Hi,

      here's how I have rewritten my context.xml file further to you response:

      Code:
      ...
      <int-ftp:inbound-channel-adapter id="ftpInbound"
      		channel="ftpChannel" 
      		session-factory="ftpClientFactory"
      		filename-regex=".*\.txt$"
      		auto-create-local-directory="true" 
      		delete-remote-files="false"
      		remote-directory="."
      		local-directory="file:output"
      		 >
      		<int:poller fixed-rate="1000" error-channel="errorChannel" />
      	</int-ftp:inbound-channel-adapter>
      
      	<int:chain input-channel="errorChannel" output-channel="controlBusChannel">
          	<int:filter ref="messageFilter" method="conditionallyStopTheAdapter" />
          	<int:transformer expression="@ftpInbound.stop()" />
      	</int:chain>
      
      	<int:channel id="controlBusChannel"/>
      	
      	<int:control-bus input-channel="controlBusChannel" />
      
      	<int:channel id="ftpChannel">
      		<int:queue/>
      	</int:channel>
      	
      	<int:channel id="errorChannel">
      		<int:queue capacity="25" />
      	</int:channel>
      Here's the message filter:

      Code:
      @Component
      public class MessageFilter {
      	
      	private static final Logger LOGGER = Logger.getLogger(MessageFilter.class);
      
      	@Filter
      	public void conditionallyStopTheAdapter(@Header("ITEM_TYPE") String itemType) {
      		LOGGER.info("In conditionallyStopTheAdapter, the item type is " + itemType);	
      	}
      }
      and here's the FtpInbound class:

      Code:
      public class FtpInbound {
      
      	public static void main(String[] args) {
      		
      		ApplicationContext context =
      			new ClassPathXmlApplicationContext("META-INF/spring/integration/spring-integration-context.xml");
      		
      		PollableChannel ftpChannel = context.getBean("ftpChannel", PollableChannel.class);
      		Message<?> message = ftpChannel.receive();
      		System.out.println("Message = " + message);
      
      		
      	} // end main
      	
      	public static void stop() {
      		System.out.println("Stopping");
      	}
      	
      } // end FtpInbound
      I also have a ControlBean for the Control Bus [but am not sure what to do with it and where to plug it in]:

      Code:
      import org.springframework.jmx.export.annotation.ManagedOperation;
      import org.springframework.stereotype.Component;
      
      @Component
      public class ControlBean {
      	
      	@ManagedOperation
      	public void performOperation() {
      		System.out.println("Running managed operation");
      	}
      
      }
      When I run FtpInbound, I get an "Error creating bean with name "org.springframework.integration.config.ConsumerEn dpointFactoryBean#0: Invocation of Init failed" error message.

      Any help would be much appreciated.

      Philroc
      Last edited by Gary Russell; May 15th, 2013, 12:15 PM.

      Comment


      • #4
        Plesae use [ code ] not [ quote ] (I edited your posts).

        The filter needs to be something like

        Code:
        public boolean filter(Exception e) {
        
            if (... exception is the type I want to stop the adapter ...) {
                return false;
            }
            else {
               return true;
            }
        }

        Comment


        • #5
          Hello,

          here's my workflow so far. The filter returns false when the exception thrown is an UnknownHostException, but the transformer doesn't work at all.

          By the way, how do you debug a transformer in a chain, in Spring Integration?


          Code:
          	<int-ftp:inbound-channel-adapter
          		id="ftpInbound"
          		channel="ftpChannel" 
          		session-factory="ftpClientFactory"
          		filename-regex=".*\.txt$"
          		auto-create-local-directory="true" 
          		delete-remote-files="false"
          		remote-directory="."
          		local-directory="file:output"
          		 >
          		<int:poller id="ftpPoller" fixed-rate="1000" error-channel="errorChannel" /> 
          	</int-ftp:inbound-channel-adapter>
          
          	<bean id="messageFilterBean" class="com.iht.sftp.MessageFilter" />
          	
          
          	<int:chain input-channel="errorChannel" output-channel="controlBusChannel">
              	<int:filter ref="messageFilterBean" method="accept" />  
              	<int:transformer expression="@ftpInbound.stop()" />
          	</int:chain>
          
          	<int:channel id="controlBusChannel">
          		<int:queue/>
          	</int:channel>
          	
          	<int:control-bus input-channel="controlBusChannel" />
          
          	<int:channel id="ftpChannel" >
          		<int:queue/>
          	</int:channel>
          	
          	<int:channel id="defaultChannel">
          		<int:queue capacity="25" />
          	</int:channel>
          Code:
          @Component
          public class MessageFilter {
          ...
          
          @Filter
          public boolean accept(Exception exception) {
          if (exception instanceof org.springframework.integration.MessagingException
            && exception.getCause().getCause() instanceof java.net.UnknownHostException
          ) {
          return false;
          } else {
          return true;
          }
          }
          Code:
          public class FtpInbound {
          
          public static void main(String[] args) {
          ...
          }
          
          public void stop() {
          System.out.println("Stopping !!!");
          }
          Please help.

          Comment


          • #6
            Turn on DEBUG logging; you'll see "... received message: ..." and "... sending reply message: ..." for each endpoint (including those in chains).

            Comment

            Working...
            X