  • Creating FTP/SFTP Inbound Channel Adapter Programmatically

    Hi All,

    We have a requirement to poll files from different remote FTP directories (can be around 50 different FTP locations) and keep them in a single local directory.

    We understand that we need to configure the FTP/SFTP inbound channel adapters and the file outbound channel adapter programmatically. We have to read the host name, user, password, remote directory, etc. from a database and create the FTP/SFTP inbound channel adapters and the file outbound channel adapter dynamically.

    For a single host, it works with the following XML configuration:

    Code:
    <bean id="sftpSessionFactory" class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
    		<property name="host" value="127.0.0.1"/>
    		<property name="port" value="22"/>
    		<property name="user" value="user"/>
    		<property name="password" value="pwd"/>
    	</bean>
    	<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
    			channel="filesOut"
    			session-factory="sftpSessionFactory"					
    			remote-directory="/tmp/test"
    			local-directory="D:\test"
    			auto-create-local-directory="true"
    			delete-remote-files="true"
    			filename-pattern="*.txt">
    		<int:poller fixed-rate="1000" max-messages-per-poll="1"/>
    	</int-sftp:inbound-channel-adapter>
    
    		
    	<int-file:outbound-channel-adapter id="filesOut"
    								   directory="D:\new"
    								   delete-source-files="true"/>	
    	<int:channel id="receiveChannel">
    		<int:queue/>
    	</int:channel>
    
    </beans>
    But in our case we need to connect to many different FTP locations, and we found a program on this forum that is similar to our requirement:
    Code:
    public class FTPService implements Runnable {

        @Override
        public void run() {
            try {
                while (true) {
                    String host = "127.0.0.1";
                    String username = "user";
                    String password = "pwd";

                    DefaultFtpSessionFactory ftpSessionFactory = new DefaultFtpSessionFactory();
                    ftpSessionFactory.setHost(host);
                    ftpSessionFactory.setPort(22);
                    ftpSessionFactory.setUsername(username);
                    ftpSessionFactory.setPassword(password);
                    System.out.println("message:2 ");
                    ftpSessionFactory.setClientMode(FTPClient.PASSIVE_LOCAL_DATA_CONNECTION_MODE);

                    FtpInboundFileSynchronizer ftpInboundFileSynchronizer = new FtpInboundFileSynchronizer(ftpSessionFactory);
                    ftpInboundFileSynchronizer.setDeleteRemoteFiles(false);
                    Pattern pattern = Pattern.compile(".*\\.txt$");
                    FtpRegexPatternFileListFilter ftpRegexPatternFileListFilter = new FtpRegexPatternFileListFilter(pattern);
                    ftpInboundFileSynchronizer.setFilter(ftpRegexPatternFileListFilter);
                    ftpInboundFileSynchronizer.setRemoteDirectory("/tmp/test");

                    FtpInboundFileSynchronizingMessageSource ftpInboundFileSynchronizingMessageSource = new FtpInboundFileSynchronizingMessageSource(ftpInboundFileSynchronizer);
                    ftpInboundFileSynchronizingMessageSource.setAutoCreateLocalDirectory(true);
                    ftpInboundFileSynchronizingMessageSource.setAutoStartup(true);
                    ftpInboundFileSynchronizingMessageSource.setLocalDirectory(new File("D:/test"));
                    ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
                    ftpInboundFileSynchronizingMessageSource.setTaskScheduler(threadPoolTaskScheduler);
                    PollableChannel ftpChannel = new QueueChannel(100);
                    ftpInboundFileSynchronizingMessageSource.setOutputChannel(ftpChannel);

                    MessageHandler messageHandler = new DelayHandler("1000");
                    ConsumerEndpointFactoryBean factoryBean = new ConsumerEndpointFactoryBean();
                    factoryBean.setHandler(messageHandler);
                    factoryBean.setInputChannel(ftpChannel);
                    factoryBean.setInputChannelName("ftpChannel");
                    factoryBean.start();
                    Message<?> message = ftpChannel.receive();
                    System.out.println("message: " + message);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        public static void main(String[] args) throws Exception {
            System.out.println("one ");
            Thread t = new Thread(new FTPService());
            t.start();
        }
    }
    But this program does not work. The forum thread it came from also mentions that it did not work and suggests using the dynamic-ftp sample from the Spring site, but that sample does not match our scenario.
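
One difference between the two versions is easy to miss: the XML configuration selects files with the simple wildcard `*.txt`, while the Java program uses the regular expression `.*\\.txt$`. The following stand-alone sketch shows which names that expression accepts. `TxtNameFilter` is a hypothetical helper, not a Spring Integration class, and it assumes the filter requires the whole file name to match.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of Spring Integration.
class TxtNameFilter {

    // Same expression as in the program above: any name ending in ".txt".
    private static final Pattern TXT = Pattern.compile(".*\\.txt$");

    // Returns the names that match the whole pattern, in input order.
    static List<String> accept(List<String> remoteNames) {
        List<String> accepted = new ArrayList<>();
        for (String name : remoteNames) {
            if (TXT.matcher(name).matches()) {
                accepted.add(name);
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("a.txt", "b.csv", "notes.txt.bak", "c.TXT");
        // The match is case-sensitive and anchored at the end of the name.
        System.out.println(accept(names)); // prints [a.txt]
    }
}
```

Note that `c.TXT` is rejected because the expression is case-sensitive; if upper-case extensions matter, the pattern would need `Pattern.CASE_INSENSITIVE`.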


    Can someone help us solve this issue or suggest a good approach to handle it?

    Thank You
    Joby
    Last edited by Gary Russell; Jun 4th, 2013, 08:42 AM.

  • #2
    This thread talks about dynamic adapters on the inbound side, and the poster made his application work successfully.



    • #3
      Thanks Gary for the quick response.

      I went through the thread, and it provides a sample for multiple outbound channel configurations that upload to different FTP channels. It would be great if we could get a sample for reading from different FTP directories (inbound channels) dynamically, with a dynamic poller interval.

      Also, when the credentials for an already configured session of an inbound channel change on the fly (through a user interface), do we need to do anything extra to remove the existing session?

      Thanks in advance.



      • #4
        The thread I pointed to describes how to do it on the inbound side. There is not currently a sample for that.

        Currently, each request uses a new session.

        To change credentials, get a reference to the session factory and change the properties as necessary; the next session will use the new properties.
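
To illustrate the point about credentials, here is a minimal model of a factory that creates a fresh session per request. `MutableSessionFactory` and `Session` are hypothetical stand-ins, not the Spring Integration classes; on the real factories the corresponding setters are the ones used earlier in this thread (for example `setUsername` and `setPassword` on `DefaultFtpSessionFactory`).

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a session factory; not a Spring Integration class.
class MutableSessionFactory {

    // Immutable snapshot of the credentials one session was opened with.
    static final class Session {
        final int id;
        final String username;
        final String password;

        Session(int id, String username, String password) {
            this.id = id;
            this.username = username;
            this.password = password;
        }
    }

    private final AtomicInteger created = new AtomicInteger();
    private String username;
    private String password;

    MutableSessionFactory(String username, String password) {
        this.username = username;
        this.password = password;
    }

    // Changes both values under one lock so no session sees a mixed pair.
    synchronized void setCredentials(String username, String password) {
        this.username = username;
        this.password = password;
    }

    // Every call opens a new session with whatever is configured right now.
    synchronized Session getSession() {
        return new Session(created.incrementAndGet(), username, password);
    }

    public static void main(String[] args) {
        MutableSessionFactory factory = new MutableSessionFactory("user", "pwd");
        Session before = factory.getSession();
        factory.setCredentials("newUser", "newPwd");
        Session after = factory.getSession();
        // The earlier session keeps its old credentials; only new sessions change.
        System.out.println(before.username + " -> " + after.username); // prints user -> newUser
    }
}
```

Because no session is reused between requests in this model, there is nothing to remove when the credentials change.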



        • #5
          In our scenario, we need to have concurrent threads running with different sessions. That is, each inbound adapter associated with a remote FTP/SFTP host should be running simultaneously with its own session. Is that possible?



          • #6
            Yes; just follow the solution in the post.
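
As a rough picture of what running simultaneously with separate sessions means, the sketch below polls several hosts at once, with one task and one session id per host. It uses only `java.util.concurrent`; `ConcurrentPollers`, the host names, and the session ids are illustrative assumptions, and the body of each task is a placeholder for the real open/synchronize/close cycle.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of independent pollers; not Spring Integration code.
class ConcurrentPollers {

    private static final AtomicInteger SESSION_IDS = new AtomicInteger();

    // Polls every host once in parallel and reports the session id each poll used.
    static Map<String, Integer> pollAllOnce(List<String> hosts) {
        Map<String, Integer> sessionByHost = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, hosts.size()));
        CountDownLatch done = new CountDownLatch(hosts.size());
        try {
            for (String host : hosts) {
                pool.execute(() -> {
                    try {
                        // Placeholder for: open a session, synchronize files, close it.
                        sessionByHost.put(host, SESSION_IDS.incrementAndGet());
                    } finally {
                        done.countDown();
                    }
                });
            }
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("polling did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while polling", e);
        } finally {
            pool.shutdown();
        }
        return sessionByHost;
    }

    public static void main(String[] args) {
        // Host names are made up; the order of the printed entries may vary.
        System.out.println(pollAllOnce(Arrays.asList("ftp-a", "ftp-b", "sftp-c")));
    }
}
```

In Spring Integration the equivalent concurrency comes from the task scheduler that drives the pollers, so this is a picture of the idea rather than of the framework's implementation.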



            • #7
              I also had a similar issue. My solution was:
              (1) get the S/FTP session info from the DB -> (2) create dynamic S/FTP channels -> (3) continue the same flow for all messages.

              Below are the relevant code snippets (edited in Notepad to remove company/project-related info, so they may not be perfectly correct). The dynamic section is based on the SI examples; I only had to glue the XMLs together as parent-child in order to continue the same flow from all dynamic channels.

              I had to create 3 Spring Integration XML files:
              1. The main (Spring parent) context - gets records from the DB, creates channels, continues the flow
              Code:
              <!-- Get Active S/FTP connections from DB -->
              <int-jpa:inbound-channel-adapter id="connList"
              	entity-manager-factory="entityManagerFactory"
              	auto-startup="true"
              	jpa-query="select ch from S_FTPChannel ch where ch.isActive=true"
              	delete-after-poll="false">
              	<int:poller fixed-rate="${connList.poller.fixrate}" >
              		<int:transactional propagation="REQUIRED" transaction-manager="transactionManager"/>
              	</int:poller>
              </int-jpa:inbound-channel-adapter>
              
              <!-- Split each connection to dynamic S/FTP channel -->	
              <int:splitter id="dbConnectionSplitter" ref="connSplitter"  input-channel="connList" output-channel="toDynRouter"/>
              
              <!-- resolve each S/FTP to dynamic inbound -->
              <int:channel id="toDynRouter" />
              <int:router input-channel="toDynRouter" ref="channelResolver" method="resolve" id="dynRouter" default-output-channel="dynRouterLog">
              </int:router>
              <int:logging-channel-adapter id="dynRouterLog" level="ERROR" logger-name="com.me.dynlogger" log-full-message="true"/>
              
              <!-- Continue the flow, all S/FTP messages will be here... -->
              <!-- The common Inbound of files after S/FTP -->
              <int:channel id="fromFtpOrSftp" ><!-- datatype="java.io.File"> -->
              	<int:queue capacity="10" />
              </int:channel>
              2. The dynFTP XML (Spring child) - creates an FTP adapter
              Code:
              <bean id="defFtpFactory" class="org.springframework.integration.ftp.session.DefaultFtpSessionFactory">
              	<property name="host" value="${host}"/>
              	<property name="port" value="${port}"/>
              	<property name="username" value="${user}"/>
              	<property name="password" value="${password}"/>
              </bean>	
              
              <int-ftp:inbound-channel-adapter id="dynInbondAdapter"
              	session-factory="defFtpFactory" 
              	remote-directory="${remote.dir}" charset="UTF-8"
              	local-directory="file:${localDir}"
              	auto-create-local-directory="true" 
              	delete-remote-files="false" filter="FtpRawFileFilter">
              	<int:poller fixed-rate="${pollerRate}" time-unit="MILLISECONDS" max-messages-per-poll="100"/>
              </int-ftp:inbound-channel-adapter>
              3. The dynSFTP XML (Spring child) - creates an SFTP adapter
              Code:
              <bean id="defSftpFactory" class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
              	<property name="host" value="${host}"/>
              	<property name="port" value="${port}"/>
              	<property name="user" value="${user}"/>
              	<property name="password" value="${password}"/>
              </bean>
              
              <int-sftp:inbound-channel-adapter id="dynInbondAdapter"
              	session-factory="defSftpFactory" 
              	remote-directory="${remote.dir}" charset="UTF-8"
              	local-directory="file:${localDir}"
              	auto-create-local-directory="true" 
              	delete-remote-files="false" filter="SftpRawFileFilter">
              	<int:poller fixed-rate="${pollerRate}" time-unit="MILLISECONDS" max-messages-per-poll="100"/>
              </int-sftp:inbound-channel-adapter>
              In addition, there are supporting beans:

              Java code for splitting the list of connections from the DB.
              Code:
              @Component
              public class SftpConnectionsSplitter extends AbstractMessageSplitter {
              	
              	private final Logger log = LoggerFactory.getLogger(SftpConnectionsSplitter.class);
              
              	@Autowired
              	private TChannelRepository rep; //a Spring Data repository for DB access.
              
              	/**
              	 * split the db record list of valid s/ftp connections to single connection per message.
              	 */
              	@Override
              	@Transactional
              	protected Object splitMessage(Message<?> message) {
              		List<S_FTPChannel> payload = (List<S_FTPChannel>) message.getPayload();
              		List<Message<S_FTPChannel>> result = new ArrayList<Message<S_FTPChannel>>();
              			for (S_FTPChannel ch : payload) {
              			result.add(MessageBuilder.withPayload(ch).copyHeadersIfAbsent(message.getHeaders()).build());
              		}
              		log.trace("splitMessage() return list of {} channels.", result.size());
              		return result;
              	}
              }
              Java code for dynamically creating S/FTP adapters.
              Code:
              /**
               * 
               * based on SI example - https://github.com/SpringSource/spring-integration-samples/tree/master/advanced/dynamic-ftp
               *
               */
              public class DispatchInboundChannel implements ApplicationContextAware, Serializable {
              
              	private static final long serialVersionUID = 1L;
              	private final Logger log = LoggerFactory.getLogger(DispatchInboundChannel.class);
              	public static final int MAX_CACHE_SIZE = 50;
              	private ApplicationContext appCtx;
              	private final Map<MessageChannel, ConfigurableApplicationContext> contexts = new HashMap<MessageChannel, ConfigurableApplicationContext>();
              
              	private int cacheSize = MAX_CACHE_SIZE;
              
              	/**
              	 * map of channels 
              	 */
              	private final LinkedHashMap<Long, MessageChannel> channels = new LinkedHashMap<Long, MessageChannel>() {
              
              		private static final long serialVersionUID = 2L;
              
              		@Override
              		protected boolean removeEldestEntry(Map.Entry<Long, MessageChannel> eldest) {
              			// Returning true evicts the eldest entry (the first inserted, since this
              			// map uses insertion order); its application context is closed via release()
              			boolean remove = size() > cacheSize;
              			if (remove) {
              				//TODO: add error log msg
              				release(eldest.getValue());
              			}
              			return remove;
              		}
              	};
              
              	public MessageChannel resolve(S_FTPChannel ch) {
              		log.debug("resolve for Transmission Channel - {}, {}, {}", new Object[] {ch.getId(), ch.getChannelType(), ch.getName()});
              		MessageChannel channel = this.channels.get(ch.getId());
              		if (channel == null) {
              			channel = createNewDynamicChannel(ch);
              		}
              		return channel;
              	}
              
              	private synchronized MessageChannel createNewDynamicChannel(S_FTPChannel tchannel) {
              		MessageChannel channel = this.channels.get(tchannel.getId()); // double check
              		if (channel == null) {
              			StandardEnvironment env = createEnvironmentForChannel(tchannel);
              			if (env != null) {
              				ConfigurableApplicationContext ctx = null;
              				switch (tchannel.getChannelType()) {
              				case SFTP:
              					ctx = new ClassPathXmlApplicationContext(
              							new String[] { "/spring/dynamic-sftp-inbound-adapter-context.xml" }, false, appCtx);				
              					break;
              				case FTP:
              					ctx = new ClassPathXmlApplicationContext(
              							new String[] { "/spring/dynamic-ftp-inbound-adapter-context.xml" }, false, appCtx);
              					break;
              				default:
              					log.error("Resolved channel type is {}, but currently only SFTP, FTP are supported. Ignoring this channel. {}", tchannel.getChannelType(), tchannel);
              					return null;
              				}
              				ctx.setEnvironment(env);
              				ctx.refresh();
              				channel = ctx.getBean("dynRouterLog", MessageChannel.class);
              				this.channels.put(tchannel.getId(), channel);
              				log.debug("createNewDynamicChannel() created new Message Channel {} for TransmissionCh {}.", channel.hashCode(), tchannel.getId());
              				// Works because the same channel reference is always used as the key
              				this.contexts.put(channel, ctx);
              			} else {
              				//TODO log error
              			}
              		}
              		return channel;
              	}
              
              	@SuppressWarnings("rawtypes")
              	private StandardEnvironment createEnvironmentForChannel(S_FTPChannel ch) {
              		StandardEnvironment env = new StandardEnvironment();
              		Properties props = new Properties();
              		try {
              			log.debug("createEnvironmentForChannel() - reloading propertyConfigurer" );
              			props = ((PropertyPlaceholderConfigurer)appCtx.getBean("propertyConfigurer")).reload(); 
              		} catch (BeansException e) {
              			e.printStackTrace(); 
              		} catch (IOException e) {
              			e.printStackTrace();
              		}
              		props.setProperty("host", ch.getServer());
              		props.setProperty("port", ch.getRemotePort());
              		props.setProperty("user", ch.getUsername());
              		props.setProperty("password", ch.getPassword());
              		props.setProperty("remote.dir", ch.getRdir());
              		props.setProperty("pollerRate", ch.getPollerRate());
              		props.setProperty("localDir", ch.getAbsolutePath());
              		
              		if (log.isDebugEnabled()) {
              			log.debug(props.toString());
              		}
              		PropertiesPropertySource pps = new PropertiesPropertySource("localProps", props);
              		env.getPropertySources().addFirst(pps);
              		return env;
              
              	}
              
              	public LinkedHashMap<Long, MessageChannel> getChannels() {return channels;	}
              
              	public int getCacheSize() { return cacheSize; }
              
              	public void setCacheSize(int cacheSize) { this.cacheSize = cacheSize;}
              
              	/**
              	 * Closes {@link ApplicationContext} of {@link MessageChannel}.
              	 * Any error during closure gets ignored.
              	 * Made synchronized in order to be compatible with create method.
              	 * 
              	 * @param channel
              	 *            to release
              	 */
              	public synchronized void release(MessageChannel channel) {
              		ConfigurableApplicationContext ctx = contexts.get(channel);
              		if (ctx != null) {
              			try {
              				ctx.close();
              			} catch (Throwable throwable) {// Ignore
              			} finally {// Concurrency exception will be thrown in case of removing channel
              			}
              		}
              	}
              
              	/**
              	 * Destroys cached {@link MessageChannel}, before running application stops.
              	 */
              	@PreDestroy
              	public synchronized void cleanCache() {
              		// Step 1. Releasing all existing Contexts
              		for (MessageChannel channel : contexts.keySet()) {
              			release(channel);
              		}
              		contexts.clear();
              		// Step 2. Cleaning context cache
              		channels.clear();
              	}
              	
              	@Override
              	public void setApplicationContext(ApplicationContext applicationContext)
              			throws BeansException {
              		appCtx = applicationContext;
              	}
              
              }
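
One detail in the channel cache above is worth checking: a `LinkedHashMap` created with the default constructor iterates in insertion order, so `removeEldestEntry` evicts the oldest inserted channel, not the least recently used one. The stand-alone model below (a hypothetical `BoundedCache`, with strings standing in for channels) shows the difference that the `accessOrder` constructor flag makes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the bounded channel cache; the String values stand in
// for MessageChannel instances and their child application contexts.
class BoundedCache extends LinkedHashMap<Long, String> {

    private static final long serialVersionUID = 1L;

    private final int maxSize;

    // Records evicted values; the class above would close the child context instead.
    final List<String> released = new ArrayList<>();

    BoundedCache(int maxSize, boolean accessOrder) {
        super(16, 0.75f, accessOrder);
        this.maxSize = maxSize;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<Long, String> eldest) {
        // Called after each insertion, so size() already includes the new entry.
        boolean remove = size() > maxSize;
        if (remove) {
            released.add(eldest.getValue());
        }
        return remove;
    }

    public static void main(String[] args) {
        BoundedCache insertionOrder = new BoundedCache(2, false);
        insertionOrder.put(1L, "ch1");
        insertionOrder.put(2L, "ch2");
        insertionOrder.get(1L);        // a read does not change the eviction order
        insertionOrder.put(3L, "ch3"); // evicts ch1, the first inserted
        System.out.println(insertionOrder.released); // prints [ch1]

        BoundedCache lru = new BoundedCache(2, true);
        lru.put(1L, "ch1");
        lru.put(2L, "ch2");
        lru.get(1L);                   // marks ch1 as most recently used
        lru.put(3L, "ch3");            // evicts ch2, the least recently used
        System.out.println(lru.released); // prints [ch2]
    }
}
```

With 50 or so connections and a cache size of 50 the distinction rarely matters, but it does once the number of active connections exceeds the cache size.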
