Announcement Announcement Module
Collapse
No announcement yet.
Inbound FTP - Polling sub directories Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Inbound FTP - Polling sub directories

    I am trying to integrate/invoke Batch using Admin & Integration.

    The requirement is: We have a set of Batch jobs (read/write files) running and want to add the ability to read and write files from FTP folder.

    The way I plan to do this is: Create sub folders in the FTP for each of these Batch Jobs. Using FTP Adapter, read the files and put it in local folder.

    I want to have a single FTP Adapter that polls all the sub directories instead of having an adapter for each sub-folder. Is this possible?

  • #2
    One of the core concept behind an adapter/gateway is to bridge a "particular part/segment" of the remote system with messaging system (e.g., SI in this case) and not to be a generic template for doing remote operations. So when I say "particular part/segment" we actually use the term "addressable" adapter/gateway where an adapter may be specific to a particular URL (in cases of HTTP) or directory (for FTP/SFTP) etc. So in other words no, the adapter will only scan a predefined directory.
    How many directories do you plan to have?

    Comment


    • #3
      Also, you can watch this issues that we will be addressing for 2.1 https://jira.springsource.org/browse/INT-1945. Although it talks about Outbound gateway, in reality it will end up to be a major new feature in SI to allow this level of dynamics - possibly a Template approach which is familiar to Spring users

      Comment


      • #4
        Also, you may want to look at
        org.springframework.integration.file.DirectoryScan ner and its subclasses (e.g., RecursiveLeafOnlyDirectoryScanner). For your use case this may be all you need.

        Comment


        • #5
          Thanks for the responses/inputs. Somehow I did not receive the notifications.
          I have another issue to tackle - After posting, I read the document that states that the Resource Adapters continuously poll(is my understanding correct) which we do not want either. Hence, thinking of using a Jms Listener Gateway and based on the message content, would do FTP to specified Sub folder.
          Hope this is fine.

          Comment


          • #6
            Hi guys,

            I have also the need for a recursive FTP poller. I was thinking about Subclassing the FtpInboundFileSynchronizer and Override method synchronizeToLocalDirectory with a recursive synchronization, do you feel it is a good idea ?

            thanks

            Comment


            • #7
              [SOLVED] - Yiiiiiiihaaa !

              Hi guys, here is my solution, just to share, probably not the best, but it seem to work:

              Code:
              package com.xxxxx.yyyyy.modules.ftp;
              
              import java.io.File;
              import java.io.FileOutputStream;
              import java.io.IOException;
              import java.io.InputStream;
              import java.util.Collection;
              
              import org.apache.commons.net.ftp.FTPFile;
              
              import org.springframework.integration.MessagingException;
              import org.springframework.integration.file.remote.session.Session;
              import org.springframework.integration.file.remote.session.SessionFactory;
              import org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer;
              import org.springframework.util.Assert;
              import org.springframework.util.ObjectUtils;
              
              /**
               * An implementation of {@link AbstractInboundFileSynchronizer} for FTP.
               * 
               * @author Iwein Fuld
               * @author Josh Long
               * @author Mark Fisher
               * @since 2.0
               */
              public class FtpInboundFileSynchronizer extends AbstractInboundFileSynchronizer<FTPFile> {
              
              	private SessionFactory sessionFactory;
              	private volatile String remoteDirectory;
              	private String remoteFileSeparator;
              	private boolean deleteRemoteFiles;
              	private boolean recursive;
              
              	/**
              	 * Create a synchronizer with the {@link SessionFactory} used to acquire
              	 * {@link Session} instances.
              	 * 
              	 * @param recursive
              	 */
              	public FtpInboundFileSynchronizer(SessionFactory sessionFactory, boolean recursive) {
              		super(sessionFactory);
              		this.sessionFactory = sessionFactory;
              		this.recursive = recursive;
              	}
              
              	@Override
              	public void setRemoteDirectory(String remoteDirectory) {
              		super.setRemoteDirectory(remoteDirectory);
              		this.remoteDirectory = remoteDirectory;
              	}
              
              	@Override
              	public void synchronizeToLocalDirectory(File localDirectory) {
              		synchronizeToLocalDirectory(localDirectory, this.remoteDirectory);
              	}
              
              	private void synchronizeToLocalDirectory(File localDirectory, String rDir) {
              		Session session = null;
              		try {
              			session = this.sessionFactory.getSession();
              			Assert.state(session != null, "failed to acquire a Session");
              			FTPFile[] files = session.<FTPFile> list(rDir);
              
              			if (recursive) {
              
              				logger.info(files.length + " file(s) retrieved.");
              				for (FTPFile file : files) {
              					FTPFile f = (FTPFile) file;
              					if (f.isDirectory()) {
              						logger.info("directory found: " + f.getName());
              						synchronizeToLocalDirectory(localDirectory, rDir + this.remoteFileSeparator + f.getName());
              					}
              				}
              			}
              
              			if (!ObjectUtils.isEmpty(files)) {
              				Collection<FTPFile> filteredFiles = this.filterFiles(files);
              				for (FTPFile file : filteredFiles) {
              					if (file != null) {
              						this.rcopyFileToLocalDirectory(rDir, file, localDirectory, session);
              					}
              				}
              			}
              		} catch (IOException e) {
              			throw new MessagingException("Problem occurred while synchronizing remote to local directory", e);
              		} finally {
              			if (session != null) {
              				try {
              					session.close();
              				} catch (Exception ignored) {
              					if (logger.isDebugEnabled()) {
              						logger.debug("failed to close Session", ignored);
              					}
              				}
              			}
              		}
              	}
              
              	@Override
              	public void setRemoteFileSeparator(String remoteFileSeparator) {
              		super.setRemoteFileSeparator(remoteFileSeparator);
              		Assert.notNull(remoteFileSeparator, "'remoteFileSeparator' must not be null");
              		this.remoteFileSeparator = remoteFileSeparator;
              	}
              
              	@Override
              	public void setDeleteRemoteFiles(boolean deleteRemoteFiles) {
              		super.setDeleteRemoteFiles(deleteRemoteFiles);
              		this.deleteRemoteFiles = deleteRemoteFiles;
              	}
              
              	private void rcopyFileToLocalDirectory(String remoteDirectoryPath, FTPFile remoteFile, File localDirectory, Session session) throws IOException {
              		String remoteFileName = this.getFilename(remoteFile);
              		String remoteFilePath = remoteDirectoryPath + remoteFileSeparator + remoteFileName;
              		if (!this.isFile(remoteFile)) {
              			if (logger.isDebugEnabled()) {
              				logger.debug("cannot copy, not a file: " + remoteFilePath);
              			}
              			return;
              		}
              		File localFile = new File(localDirectory, remoteFileName);
              		if (!localFile.exists()) {
              			String tempFileName = localFile.getAbsolutePath() + getTemporaryFileSuffix();// this.temporaryFileSuffix;
              			File tempFile = new File(tempFileName);
              			InputStream inputStream = null;
              			FileOutputStream fileOutputStream = new FileOutputStream(tempFile);
              			try {
              				session.read(remoteFilePath, fileOutputStream);
              			} catch (Exception e) {
              				if (e instanceof RuntimeException) {
              					throw (RuntimeException) e;
              				} else {
              					throw new MessagingException("Failure occurred while copying from remote to local directory", e);
              				}
              			} finally {
              				try {
              					if (inputStream != null) {
              						inputStream.close();
              					}
              				} catch (Exception ignored1) {
              				}
              				try {
              					fileOutputStream.close();
              				} catch (Exception ignored2) {
              				}
              			}
              			if (tempFile.renameTo(localFile)) {
              				if (this.deleteRemoteFiles) {
              					session.remove(remoteFilePath);
              					if (logger.isDebugEnabled()) {
              						logger.debug("deleted " + remoteFilePath);
              					}
              				}
              			}
              		}
              	}
              
              	@Override
              	protected boolean isFile(FTPFile file) {
              		return file != null && file.isFile();
              	}
              
              	@Override
              	protected String getFilename(FTPFile file) {
              		return (file != null ? file.getName() : null);
              	}
              
              }
              And then i use this class instead of the original one for my FTP adapter. Thanks for your help!

              Comment


              • #8
                Can you just extend from it and override public void synchronizeToLocalDirectory(File localDirectory) method?

                Comment


                • #9
                  I stumbled into the same issue. The files on the sftp server are stored in sub-directories (which group files that belong together).

                  where / how can I configure a custom SftpInboundFileSynchronizer?

                  Comment


                  • #10
                    The namespace doesn't currently support injecting a custom synchronizer; you have to assemble the adapter using <bean/> (or @Bean) definitions.

                    You need a SourcePollingChannelAdapter, that gets a SftpInboundFileSynchronizingMessageSource in its 'source' property; having passed in your custom synchronizer to the SftpInboundFileSynchronizingMessageSource in its <constructor-arg/>.

                    For complete details about how to wire up the message source, see AbstractRemoteFileInboundChannelAdapterParser.
                    Last edited by Gary Russell; Jan 17th, 2013, 11:09 AM.

                    Comment


                    • #11
                      By the way, as Oleg pointed out in post #3 above, you can also use the gateway to fetch files in subdirectories. For example...

                      Code:
                      	<int-ftp:outbound-gateway id="gatewayLS" cache-sessions="false"
                      		session-factory="ftpSessionFactory"
                      		request-channel="inbound"
                      		command="ls"
                      		command-options=""
                      		expression="'a/*/*'"
                      		reply-channel="toSplitter"/>
                      
                      	<int:channel id="toSplitter" />
                      
                      	<int:splitter id="splitter" input-channel="toSplitter" output-channel="toGet"/>
                      
                      	<int-ftp:outbound-gateway id="gatewayGET" cache-sessions="false"
                      		local-directory="localdir"
                      		session-factory="ftpSessionFactory"
                      		request-channel="toGet"
                      		reply-channel="toRemoveChannel"
                      		command="get"
                      		command-options="-P"
                      		expression="payload.filename"/>
                      ...fetches all files in all subdirectories of directory "a".

                      Comment


                      • #12
                        Thanks for the replies!

                        I found a similar example in the archive: http://forum.springsource.org/archiv.../t-127972.html
                        But I would prefer not having to go there

                        I kinda ignored the option using the outbound-gateway. I thought one could not use this to retrieve files. This does preserve the directory structure? There are some issues in Jira that indicate the folder structure is lost.

                        I'll give the outbound-gateway a try :-)

                        Comment


                        • #13
                          No, it does, currently, flatten the directory structure,

                          Comment


                          • #14
                            Originally posted by Gary Russell View Post
                            The namespace doesn't currently support injecting a custom synchronizer; you have to assemble the adapter using <bean/> (or @Bean) definitions.

                            You need a SourcePollingChannelAdapter, that gets a SftpInboundFileSynchronizingMessageSource in its 'source' property; having passed in your custom synchronizer to the SftpInboundFileSynchronizingMessageSource in its <constructor-arg/>.

                            For complete details about how to wire up the message source, see AbstractRemoteFileInboundChannelAdapterParser.
                            So it looks like:
                            Code:
                                <bean id="recursiveFtpSynchronizer" class="org.company.sftp.inbound.RecursiveSftpInboundFileSynchronizer">
                                    <constructor-arg ref="sftpSessionFactory"/>
                                </bean>
                            
                                <bean id="recursiveFtpMessageSource" class="org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizingMessageSource">
                                    <constructor-arg ref="recursiveFtpSynchronizer"/>
                                </bean>
                            
                                <bean class="org.springframework.integration.endpoint.SourcePollingChannelAdapter">
                                    <property name="source" ref="recursiveFtpMessageSource"/>
                                </bean>
                            all properties from the inbound-channel-adapter namespace are on one of those beans? With less comfort I guess

                            Comment


                            • #15
                              For the most part, things related to the remote end (session factory etc) go on the synchronizer; things at the local end (local directory etc) go on the MessageSource, although the localFilenameGeneratorExpression goes on the synchronizer. For full details look at the parser.

                              https://github.com/SpringSource/spri...terParser.java

                              Comment

                              Working...
                              X