
  • Using FTP Inbound Channel Adapter Programmatically

    Good evening.
    We have a requirement to use the FTP inbound channel adapter programmatically.
    In other words, we need to read the remote directories from a database, then set up and run a separate FTP inbound channel adapter for each remote directory.

    First I tried it declaratively, which worked fine (as follows).

    ftp.properties:
    ---------------
    host=127.0.0.1
    username=admin
    password=password
    ===============================
    ftp-inbound-context.xml:
    -----------------------
    <context:property-placeholder location="ftp.properties"/>
    <context:component-scan base-package="com.test.si.ftp"/>

    <int-ftp:inbound-channel-adapter id="ftpInbound"
    channel="ftpChannel"
    session-factory="ftpClientFactory"
    filename-regex=".*\.txt$"
    auto-create-local-directory="true"
    delete-remote-files="false"
    remote-directory="/"
    local-directory="/var/output/">
    <int:poller fixed-rate="1000"/>
    </int-ftp:inbound-channel-adapter>

    <int:channel id="ftpChannel">
    <int:queue/>
    </int:channel>
    ====================================
    FtpConfiguration.java:
    ----------------------
    @Configuration
    public class FtpConfiguration {

    @Value("${host}")
    private String host;

    @Value("${username}")
    private String username;

    @Value("${password}")
    private String password;

    @Bean
    public DefaultFtpSessionFactory ftpClientFactory() {
    DefaultFtpSessionFactory ftpSessionFactory =
    new DefaultFtpSessionFactory();
    ftpSessionFactory.setHost(host);
    ftpSessionFactory.setPort(2121);
    ftpSessionFactory.setUsername(username);
    ftpSessionFactory.setPassword(password);
    ftpSessionFactory.setClientMode(
    FTPClient.PASSIVE_LOCAL_DATA_CONNECTION_MODE);
    return ftpSessionFactory;
    }
    }
    ==================================================
    Test class:
    -----------
    ApplicationContext context =
    new ClassPathXmlApplicationContext("ftp-inbound-context.xml");
    PollableChannel ftpChannel = context.getBean("ftpChannel", PollableChannel.class);

    Message<?> message = ftpChannel.receive();
    System.out.println("message: " + message);
    =====================================================

    The programmatic equivalent of the FTP inbound channel adapter setup above is as follows (it did not work):

    FTPService.java:
    ----------------
    public class FTPService implements Runnable {

    @Override
    public void run() {
    while(true) {
    String host = "127.0.0.1";
    String username = "admin";
    String password = "password";

    DefaultFtpSessionFactory ftpSessionFactory = new DefaultFtpSessionFactory();
    ftpSessionFactory.setHost(host);
    ftpSessionFactory.setPort(2121);
    ftpSessionFactory.setUsername(username);
    ftpSessionFactory.setPassword(password);
    ftpSessionFactory.setClientMode(FTPClient.PASSIVE_LOCAL_DATA_CONNECTION_MODE);

    FtpInboundFileSynchronizer ftpInboundFileSynchronizer = new FtpInboundFileSynchronizer(ftpSessionFactory);
    ftpInboundFileSynchronizer.setDeleteRemoteFiles(false);
    Pattern pattern = Pattern.compile(".*\\.txt$");
    FtpRegexPatternFileListFilter ftpRegexPatternFileListFilter = new FtpRegexPatternFileListFilter(pattern);
    ftpInboundFileSynchronizer.setFilter(ftpRegexPatternFileListFilter);
    ftpInboundFileSynchronizer.setRemoteDirectory("/");

    FtpInboundFileSynchronizingMessageSource ftpInboundFileSynchronizingMessageSource = new FtpInboundFileSynchronizingMessageSource(ftpInboundFileSynchronizer);
    ftpInboundFileSynchronizingMessageSource.setAutoCreateLocalDirectory(true);
    ftpInboundFileSynchronizingMessageSource.setAutoStartup(true);
    ftpInboundFileSynchronizingMessageSource.setLocalDirectory(new File("/var/output"));
    ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
    ftpInboundFileSynchronizingMessageSource.setTaskScheduler(threadPoolTaskScheduler);
    PollableChannel ftpChannel = new QueueChannel(100);
    ftpInboundFileSynchronizingMessageSource.setOutputChannel(ftpChannel);

    MessageHandler messageHandler = new DelayHandler(1000L);
    ConsumerEndpointFactoryBean factoryBean = new ConsumerEndpointFactoryBean();
    factoryBean.setHandler(messageHandler);
    factoryBean.setInputChannel(ftpChannel);
    factoryBean.setInputChannelName("ftpChannel");
    factoryBean.start();
    Message<?> message = ftpChannel.receive();
    System.out.println("message: " + message);
    }
    }

    public static void main(String[] args) {
    Thread t = new Thread(new FTPService());
    t.start();
    }
    }
    =============================
    Could you please let me know what I am missing in the programmatic version?
    I appreciate and thank you for your feedback.

    Thanks
    Venkat

  • #2
    I am not sure I understand what other benefits you get from doing it programmatically. I mean we can show you what needs to be done but it would be nice to see if you are getting something extra with it. Am I missing something?



    • #3
      Good morning Oleg.
      One of the requirements is to read remote directories from a database and create an FTP inbound channel adapter for each remote directory. If it is done declaratively, I have to set up an FTP inbound channel adapter for each remote directory manually. I know it is much easier to do declaratively because it requires the least effort. Please let me know your thoughts on whether I could fulfill this requirement without doing it programmatically. I appreciate and thank you for your feedback.

      Thanks
      Venkat



      • #4
        The dynamic-ftp 'advanced' sample shows how to create a flow-snippet context programmatically, and pass variables into placeholders using the Spring 3.1 environment abstraction.

        The sample is for the outbound side; we simply resolve to a channel to send a message into the flow snippet.

        On the inbound side, one technique would be to make the flow snippet containing the adapter a child context of the main context; that way, the various inbound channel adapters can set the channel to a common channel in the parent context.
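The shape of that arrangement (one adapter per remote directory, created at runtime, all publishing to one common channel) can be sketched in plain Java. This is only a JDK-level analogy under stated assumptions, not Spring Integration code and not taken from the dynamic-ftp sample: `DirectoryPoller`, `PollerRegistry` and the `lister` function are hypothetical names standing in for the per-directory adapter, the code that creates child contexts, and the remote FTP listing.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

/** Hypothetical stand-in for one inbound adapter: polls a single remote directory. */
final class DirectoryPoller {
    private final String remoteDirectory;
    private final Function<String, List<String>> lister; // assumption: returns file names in a directory
    private final BlockingQueue<String> commonChannel;   // shared by every poller
    private final Set<String> alreadySent = new HashSet<>();

    DirectoryPoller(String remoteDirectory,
                    Function<String, List<String>> lister,
                    BlockingQueue<String> commonChannel) {
        this.remoteDirectory = remoteDirectory;
        this.lister = lister;
        this.commonChannel = commonChannel;
    }

    /** Publishes every file name not seen on an earlier poll to the common channel. */
    void pollOnce() {
        for (String name : lister.apply(remoteDirectory)) {
            if (alreadySent.add(name)) {
                commonChannel.add(remoteDirectory + "/" + name);
            }
        }
    }
}

/** Hypothetical registry: creates one poller per directory, e.g. per row read from a database. */
final class PollerRegistry {
    private final Map<String, DirectoryPoller> pollers = new LinkedHashMap<>();
    private final Function<String, List<String>> lister;
    private final BlockingQueue<String> commonChannel = new LinkedBlockingQueue<>();

    PollerRegistry(Function<String, List<String>> lister) {
        this.lister = lister;
    }

    /** Registering the same directory twice keeps the existing poller. */
    void register(String remoteDirectory) {
        pollers.computeIfAbsent(remoteDirectory,
                dir -> new DirectoryPoller(dir, lister, commonChannel));
    }

    /** Runs one poll cycle over all registered directories, in registration order. */
    void pollAll() {
        pollers.values().forEach(DirectoryPoller::pollOnce);
    }

    BlockingQueue<String> commonChannel() {
        return commonChannel;
    }
}
```

In the Spring version the registry's role would be played by whatever code builds a child context per directory, and the shared queue by the common channel defined in the parent context; downstream consumers then only need to know about that one channel.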

        Hope that helps.



        • #5
          Good morning Gary.
          I will check out the dynamic-ftp sample. Thanks for your help.

          Venkat



          • #6
            If you get something working and would like to contribute it back to the samples, let us know.



            • #7
              Gary, before I even get started on the dynamic-ftp sample, I have to figure out the following problem.
              Assume that I run the FTP inbound channel adapter declaratively (for reference I am copying it here again):
              -------------------------------------------------
              <int-ftp:inbound-channel-adapter id="ftpInbound"
              channel="ftpChannel"
              session-factory="ftpClientFactory"
              filename-regex=".*\.txt$"
              auto-create-local-directory="true"
              delete-remote-files="false"
              remote-directory="/"
              local-directory="/var/output/">
              <int:poller fixed-rate="1000"/>
              </int-ftp:inbound-channel-adapter>

              <int:channel id="ftpChannel">
              <int:queue/>
              </int:channel>
              -------------------------------------------------
              Assume that this process runs on NodeA. How do I make this process highly available?
              If I run this process on two nodes, NodeA and NodeB, then both processes will poll for the same files and try to download them to their respective local directories. Could you please let me know your thoughts on how to make this process highly available?

              Thanks
              Venkat



              • #8
                Please use [ code ] ... [ /code ] tags (no spaces in brackets) around code and config.

                This demonstration https://github.com/garyrussell/sprin...ration-ha-demo (from SpringOne last year) shows a technique for having multiple instances of an application up and running, with only one at a time consuming input data. It uses a cluster controller and heartbeats to check that the master instance is running OK; if one of the other instances detects that the master is offline, one of them is elected the new master. The inbound adapters are stopped and started as necessary by the cluster controller using a <control-bus/>. There's a link to the SpringOne presentation in the GitHub readme.

                Active/Passive just needs the cluster controller; Active/Active needs the master to distribute the work to other instances, via an AMQP or JMS queue or similar.

                The global cluster state needs to be maintained somewhere - the demo uses either a database table row, or redis.
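The active/passive idea can be illustrated with a minimal plain-Java sketch. It is not the demo's code: `ClusterState`, `ToggleableAdapter` and `ClusterNode` are hypothetical names, the shared state is held in memory rather than in a database row or Redis, and the adapter is toggled by a direct method call rather than through a control bus. Timestamps are passed in explicitly so the behaviour is easy to follow.

```java
/** In-memory stand-in for the shared cluster state (assumption: a real setup would persist this). */
final class ClusterState {
    private String masterId;           // node currently holding mastership, or null
    private long lastHeartbeatMillis;  // time of the master's last heartbeat

    /**
     * Called periodically by every node.
     * Returns true if the caller is master after this call: either it already was,
     * or the previous master's heartbeat is older than timeoutMillis and the caller takes over.
     */
    synchronized boolean heartbeat(String nodeId, long nowMillis, long timeoutMillis) {
        boolean masterExpired = masterId == null
                || nowMillis - lastHeartbeatMillis > timeoutMillis;
        if (nodeId.equals(masterId) || masterExpired) {
            masterId = nodeId;
            lastHeartbeatMillis = nowMillis;
            return true;
        }
        return false;
    }
}

/** Stand-in for an inbound adapter that can be started and stopped. */
final class ToggleableAdapter {
    private boolean running;

    void start() { running = true; }
    void stop() { running = false; }
    boolean isRunning() { return running; }
}

/** One application instance: runs its adapter only while it holds mastership. */
final class ClusterNode {
    private final String id;
    private final ClusterState state;
    private final ToggleableAdapter adapter = new ToggleableAdapter();

    ClusterNode(String id, ClusterState state) {
        this.id = id;
        this.state = state;
    }

    /** One heartbeat cycle: start the adapter if this node is master, stop it otherwise. */
    void tick(long nowMillis, long timeoutMillis) {
        if (state.heartbeat(id, nowMillis, timeoutMillis)) {
            adapter.start();
        } else {
            adapter.stop();
        }
    }

    boolean isPolling() {
        return adapter.isRunning();
    }
}
```

With two nodes sharing one `ClusterState`, whichever node ticks first becomes master and polls; the other stays passive until the master's heartbeat is older than the timeout, at which point its next tick takes over. A node that returns after being replaced stops its adapter on its next tick.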

                Hope that helps.



                • #9
                  Gary, thanks for the info.
                  I was going through the demo project to understand spring-integration-cluster. I just wanted to find out what value I should provide for key = integration.cluster.inbound.adapter in test-data-source-context.xml if I want to test against the FTP inbound channel adapter. With amqp, redis is used; is there an option to use mysql instead?

                  Thanks
                  Venkat



                  • #10
                    That's the id of the inbound-adapter you need to control (e.g. for <ftp:inbound-adapter id="myAdapter" .../> you would use 'myAdapter').

                    With the demo you have amqp and redis or jms and jdbc.

                    If you don't need active/active, then just use the jms/jdbc option and remove the jms part.
