  • Restarting a previously stopped FTP inbound-channel-adapter

    Hi folks,

    I have an application where
    • I want to receive messages from an FTP inbound-channel-adapter not continuously but only at manually triggered points in time
    • additionally, I want to receive only messages for specific single files, waiting up to a defined time for the file to appear.
    • after receiving the message, I want to stop the adapter and later restart it with the same or another file.

    I used the following setup:

    Code:
    	<int-ftp:inbound-channel-adapter id="ftpInboundAdapter"
    			auto-startup="false"
    			session-factory="ftpSessionFactory"
    			cache-sessions="true"
    			channel="filterChannel"
    			local-directory="#{ systemProperties['java.io.tmpdir']}/localtest"
    			remote-directory="${ftp.directory}"
    			delete-remote-files="false"
    			auto-create-local-directory="true"
    			filter="singleFileFilter">
    		<int:poller fixed-rate="60000" max-messages-per-poll="-1"/>
    	</int-ftp:inbound-channel-adapter>
    	
    	<bean id="singleFileFilter" class="com.myapp.ftp.SingleFtpFileFilter" />
    	<int:channel id="filterChannel"/>
    	<int:filter input-channel="filterChannel" ref="singleFileFilter" output-channel="downloadChannel"/>
    	<int:channel id="downloadChannel">
    		<int:queue/>
    	</int:channel>
    SingleFtpFileFilter is implemented as FileListFilter and MessageSelector:

    Code:
    public class SingleFtpFileFilter implements MessageSelector, FileListFilter<FTPFile> {
    
    	/**
    	 * This class' logger.
    	 */
    	private final static Logger LOG = LoggerFactory.getLogger(SingleFtpFileFilter.class);
    	
    	/**
    	 * The file to download from the ftp server.
    	 */
    	private String fileToDownload;
    	
    	/**
    	 * Set the name of the file to download
    	 * @param fileToDownload name of the remote file
    	 */
    	public void setFileToDownload(String fileToDownload) {
    		this.fileToDownload = fileToDownload;
    	}	
    	
    	// ------------------------------------------------------------------------
    	// the remote file filter
    	// ------------------------------------------------------------------------
    
    	/**
    	 * The remote file filter.
    	 * @see org.springframework.integration.file.filters.FileListFilter#filterFiles(F[])
    	 */
    	@Override
    	public List<FTPFile> filterFiles(FTPFile[] files) {
    		List<FTPFile> filtered = new ArrayList<FTPFile>();
    		if (fileToDownload != null && fileToDownload.length() > 0) {
    			for (FTPFile file : files) {
    				if (file.getName().equals(fileToDownload)) {
    					LOG.debug("Accepted file {}", file.getName());
    					filtered.add(file);
    				}
    			}
    		}
    		return filtered;
    	}
    
    	// ------------------------------------------------------------------------
    	// the local file filter
    	// ------------------------------------------------------------------------
    	
    	@Override
    	public boolean accept(Message<?> message) {
    		if (fileToDownload != null && fileToDownload.length() > 0 &&
    				message != null && ((File) message.getPayload()).getName().equals(fileToDownload)) {
    			LOG.debug("Accepted message {}", message);
    			return true;
    		}
    		return false;
    	}
    	
    }
    So far so good. The choreography is intended to be as follows:

    Code:
    		
    		// Start the test case - get inbound adapter as well as file filter
    		SourcePollingChannelAdapter adapter = ac.getBean("ftpInboundAdapter", SourcePollingChannelAdapter.class);
    		SingleFtpFileFilter filter = ac.getBean("singleFileFilter", SingleFtpFileFilter.class);
    
    		// Set file to download and start adapter
    		filter.setFileToDownload(fileToDownload);
    		adapter.start();
    
    		// receive message using timeout
    		Message<?> msg = ac.getBean("downloadChannel", PollableChannel.class).receive(3600000);
    		System.out.println("Received file message: " + msg);
    			
    		// stop adapter and reset filter
    		adapter.stop();
    		filter.setFileToDownload(null);
    This works correctly if I do it once. But restarting the adapter later (calling adapter.start() again and following the same choreography as above) has a lot of side effects:
    • There is more than one task-scheduler thread working on the same session, so the same file is transferred more than once:
      Code:
      14:02:12.691 [task-scheduler-2] INFO  o.s.i.ftp.session.FtpSession - File have been successfully transfered to: INDICES/myFile_20111207.csv
      14:02:12.831 [task-scheduler-1] INFO  o.s.i.ftp.session.FtpSession - File have been successfully transfered to: INDICES/myFile_20111207.csv
    • Depending on the timing, I get exceptions and receive no message at all:
      Code:
       [task-scheduler-1] ERROR o.s.i.handler.LoggingHandler - org.springframework.integration.MessageDeliveryException: failed to send message to channel 'downloadChannel' within timeout: -1
      	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:292)
      	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:176)
      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:160)
      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:125)
      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:119)
      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:101)
      	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
      	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:110)
      	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
      	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:61)
      	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
      	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:128)
      	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
      	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
      	at org.springframework.integration.endpoint.SourcePollingChannelAdapter.doPoll(SourcePollingChannelAdapter.java:102)
      	at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:146)
      	at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:144)
      	at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:207)
      	at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
      	at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:48)
      	at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
      	at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:202)
      	at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:51)
      	at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
      	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:138)
      	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:98)
      	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:206)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
      	at java.lang.Thread.run(Thread.java:662)
      Received file message: null
    • There is no way to clear the contents of the included AcceptOnceFileListFilter (except by using a bad reflection hack), so I'm not able to trigger a previously received download a second time.
    • There is no attribute scope which I can set to prototype when using an inbound-channel-adapter

    So how do I restart an inbound channel adapter from scratch, clearing all previously performed work?

    Thanks in advance
    Gunnar
    Last edited by Gunnar; Dec 12th, 2011, 07:36 AM.

  • #2
    What version are you using? I am puzzled about the behavior of the task-scheduler. In fact, I believe someone reported something similar a while back, but I could never reproduce it, including just now. So it would be nice if you could also provide enough info/code (maybe in a zipped file) to reproduce it.



    • #3
      Also, could you please repeat your test but set max-messages-per-poll="1" (now you have -1)? I think it would better satisfy your requirement, since you are looking for a single file anyway.



      • #4
        Originally posted by oleg.zhurakousky
        What version are you using? I am puzzled about the behavior of the task-scheduler. In fact, I believe someone reported something similar a while back, but I could never reproduce it, including just now. So it would be nice if you could also provide enough info/code (maybe in a zipped file) to reproduce it.
        I'm using the latest available releases:
        • Spring 3.0.6.RELEASE
        • Spring Integration 2.0.5.RELEASE
        • For spring-integration-ftp I used 2.0.5.RELEASE as well, but modified the xsd to have the auto-start feature configurable, rest is original.

        I will try to provide more info / code - hopefully tomorrow.

        Regards
        Gunnar



        • #5
          Thanks Gunnar

          That would be very helpful. As I said, I remember something similar being reported before, but it was never followed up, and I really want to get to the bottom of it.



          • #6
            Hi Oleg,

            attached you will find my test case: Attachment

            Please note the following customized parts:
            • As I'm behind a rather special proxy, I introduced a ProxyFtpSessionFactory; the code is a modified DefaultFtpSessionFactory
            • As already mentioned, I modified the xsd of spring-integration-ftp to allow auto-startup, the same change you made in INT-2103 (added 'auto-startup' to FTP inbound adapter)
            • I'm using Ivy for dependency management - please find dependency descriptions included in the build folder

            To reproduce the error, please adapt test.resource/user.properties to point to an FTP server and define the files to download.
            You may have to run FtpInboundReceiveSample.testFTPInboundReceiveMultipleFiles() more than once, since the error seems to depend on timing. It is very likely that you will get an exception while downloading the third file:

            Code:
            09:53:15.830 [main] INFO  o.s.i.e.SourcePollingChannelAdapter - started ftpInboundAdapter
            09:53:16.033 [task-scheduler-2] DEBUG d.v.i.ftp.SingleFtpFileFilter - Accepted file yyyy_indices_20111128.csv
            09:53:16.033 [task-scheduler-1] DEBUG d.v.i.ftp.SingleFtpFileFilter - Accepted file yyyy_indices_20111128.csv
            09:53:16.236 [task-scheduler-2] INFO  o.s.i.ftp.session.FtpSession - File have been successfully transfered to: abc/def/yyyy_indices_20111128.csv
            09:53:16.236 [task-scheduler-1] INFO  o.s.i.ftp.session.FtpSession - File have been successfully transfered to: abc/def/yyyy_indices_20111128.csv
            
            
            09:53:16.236 [task-scheduler-1] DEBUG o.s.i.e.SourcePollingChannelAdapter - Poll resulted in Message: null
            09:53:16.236 [task-scheduler-2] DEBUG o.s.i.file.FileReadingMessageSource - Added to queue: [C:\WINDOWS\Profiles\xxx\LOCALS~1\Temp\localtest\yyyy_indices_20111128.csv]
            09:53:16.236 [task-scheduler-1] DEBUG o.s.i.e.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'
            09:53:16.236 [task-scheduler-2] INFO  o.s.i.file.FileReadingMessageSource - Created message: [[Payload=C:\WINDOWS\Profiles\xxxx\LOCALS~1\Temp\localtest\yyyy_indices_20111128.csv][Headers={timestamp=1323766396236, id=3f061130-36d0-4f16-b3ce-2cae60e2caf4}]]
            09:53:16.236 [task-scheduler-2] DEBUG o.s.i.e.SourcePollingChannelAdapter - Poll resulted in Message: [Payload=C:\WINDOWS\Profiles\xxxx\LOCALS~1\Temp\localtest\yyyy_indices_20111128.csv][Headers={timestamp=1323766396236, id=3f061130-36d0-4f16-b3ce-2cae60e2caf4}]
            09:53:16.236 [task-scheduler-2] DEBUG o.s.i.channel.DirectChannel - preSend on channel 'filterChannel', message: [Payload=C:\WINDOWS\Profiles\xxxx\LOCALS~1\Temp\localtest\yyyy_indices_20111128.csv][Headers={timestamp=1323766396236, id=3f061130-36d0-4f16-b3ce-2cae60e2caf4}]
            09:53:16.236 [task-scheduler-2] DEBUG o.s.integration.filter.MessageFilter - org.springframework.integration.filter.MessageFilter@15e2075 received message: [Payload=C:\WINDOWS\Profiles\xxxx\LOCALS~1\Temp\localtest\yyyy_indices_20111128.csv][Headers={timestamp=1323766396236, id=3f061130-36d0-4f16-b3ce-2cae60e2caf4}]
            
            
            09:53:16.236 [task-scheduler-2] DEBUG d.v.i.ftp.SingleFtpFileFilter - Accepted message {}[Payload=C:\WINDOWS\Profiles\xxxx\LOCALS~1\Temp\localtest\yyyy_indices_20111128.csv][Headers={timestamp=1323766396236, id=3f061130-36d0-4f16-b3ce-2cae60e2caf4}]
            09:53:16.236 [task-scheduler-2] DEBUG o.s.integration.filter.MessageFilter - handler 'org.springframework.integration.filter.MessageFilter@15e2075' sending reply Message: [Payload=C:\WINDOWS\Profiles\xxxx\LOCALS~1\Temp\localtest\yyyy_indices_20111128.csv][Headers={timestamp=1323766396236, id=3f061130-36d0-4f16-b3ce-2cae60e2caf4}]
            09:53:16.236 [task-scheduler-2] DEBUG o.s.integration.channel.QueueChannel - preSend on channel 'downloadChannel', message: [Payload=C:\WINDOWS\Profiles\xxxx\LOCALS~1\Temp\localtest\yyyy_indices_20111128.csv][Headers={timestamp=1323766396236, id=3f061130-36d0-4f16-b3ce-2cae60e2caf4}]
            09:53:16.236 [task-scheduler-2] DEBUG o.s.integration.channel.QueueChannel - postSend (sent=false) on channel 'downloadChannel', message: [Payload=C:\WINDOWS\Profiles\xxxx\LOCALS~1\Temp\localtest\yyyy_indices_20111128.csv][Headers={timestamp=1323766396236, id=3f061130-36d0-4f16-b3ce-2cae60e2caf4}]
            09:53:16.236 [task-scheduler-2] DEBUG o.s.i.c.PublishSubscribeChannel - preSend on channel 'errorChannel', message: [Payload=org.springframework.integration.MessageDeliveryException: failed to send message to channel 'downloadChannel' within timeout: -1][Headers={timestamp=1323766396236, id=2c59c498-a554-4c28-b51d-6800cc128a1e}]
            09:53:16.236 [task-scheduler-2] DEBUG o.s.i.handler.LoggingHandler - (inner bean)#2 received message: [Payload=org.springframework.integration.MessageDeliveryException: failed to send message to channel 'downloadChannel' within timeout: -1][Headers={timestamp=1323766396236, id=2c59c498-a554-4c28-b51d-6800cc128a1e}]
            09:53:16.236 [task-scheduler-2] ERROR o.s.i.handler.LoggingHandler - org.springframework.integration.MessageDeliveryException: failed to send message to channel 'downloadChannel' within timeout: -1
            	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:292)
            	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
            	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:176)
            	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:160)
            	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:125)
            	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:119)
            	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:101)
            	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
            	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:110)
            	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
            	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:61)
            	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
            	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:128)
            	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
            	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
            	at org.springframework.integration.endpoint.SourcePollingChannelAdapter.doPoll(SourcePollingChannelAdapter.java:102)
            	at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:146)
            	at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:144)
            	at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:207)
            	at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
            	at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:48)
            	at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
            	at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:202)
            	at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:51)
            	at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
            	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
            	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
            	at java.util.concurrent.FutureTask.run(FutureTask.java:138)
            	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:98)
            	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:206)
            	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
            	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
            	at java.lang.Thread.run(Thread.java:662)
            
            09:53:16.236 [task-scheduler-2] DEBUG o.s.i.c.PublishSubscribeChannel - postSend (sent=true) on channel 'errorChannel', message: [Payload=org.springframework.integration.MessageDeliveryException: failed to send message to channel 'downloadChannel' within timeout: -1][Headers={timestamp=1323766396236, id=2c59c498-a554-4c28-b51d-6800cc128a1e}]
            Received file message: null
            Thanks in advance,
            Gunnar


            • #7
              Gunnar
              Here is part one. I'll follow up with more.
              When 'max-messages-per-poll' is set to -1, the task runs continuously until it produces no message, and then it sleeps until the next scheduled execution time (in your case, every minute).
              So what happens in your case is this:
              1. At the start of your test you manually put the 'indices_test.csv' file in the temp directory, which results in this message:
              Message: [Payload=/Users/ozhurakousky/temp/ftptest/indices_test.csv][Headers={timestamp=1323787566011, id=db4c86bf-9172-46b0-b3fc-cb3a8a76ee3b}]
              This Message is generated as soon as the adapter scans the temp directory, which happens before it has had a chance to initiate remote communication with FTP. The message is sent to the channel downstream, where your <filter> filters it out, so it never ends up in 'downloadChannel'. Meanwhile, the adapter continues to spin and scans the temp directory again, but the 'indices_test.csv' file is now filtered out by AcceptOnceFileListFilter, so no Message is produced from the temp directory. Only now does the adapter initiate communication with FTP to poll for more files. All of that happens on a separate thread, not the thread that runs the test and issues adapter.stop(), so stop() sometimes arrives while the other thread is in the process of producing/sending a Message. The stop command itself calls this.runningTask.cancel(true). The interrupt flag is set to 'true', which means that any task in progress will be interrupted. Since you are sending to a QueueChannel, which uses LinkedBlockingQueue, queue.put(message) (line 80) results in an InterruptedException, which makes the overall send call return 'false'; hence the exception that you see:
              10:28:17.259 [task-scheduler-1hread] DEBUG: org.springframework.integration.channel.QueueChannel - postSend (sent=false) on channel 'myErrorChannel', message: [Payload=org.springframework.integration.MessageDeliveryException: failed to send message to channel 'downloadChannel' within timeout: -1][Headers={timestamp=1323790097258, id=eb94dfa7-dd00-4cc6-b712-5992da1a8fb2}]
              10:28:17.261 [task-scheduler-1hread] ERROR: org.springframework.integration.channel.MessagePublishingErrorHandler - failure occurred in messaging task with message: [Payload=/Users/ozhurakousky/temp/ftptest/file2.pdf][Headers={timestamp=1323790097254, id=b5c69e5c-b1df-41b2-aaa4-5ebf9bf14ec5}]
              org.springframework.integration.MessageDeliveryException: failed to send message to channel 'downloadChannel' within timeout: -1
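
The interruption described above can be reproduced with plain JDK classes. The following is a minimal sketch, not Spring Integration code: the class and method names are hypothetical, and the queue is given a capacity of 1 (an assumption made only so that put() is guaranteed to be in progress when the task is cancelled).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; it only imitates the stop()/cancel(true) scenario.
class CancelInterruptsSend {

    /** Returns true if cancel(true) interrupted the in-flight put(). */
    static boolean cancelDuringSend() throws InterruptedException {
        // Capacity 1 and one element already queued: the next put() must block.
        final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>(1);
        queue.put("already queued");

        final CountDownLatch started = new CountDownLatch(1);
        final CountDownLatch finished = new CountDownLatch(1);
        final AtomicBoolean interrupted = new AtomicBoolean(false);

        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            // Stands in for the poller task that sends to the queue-backed channel.
            Future<?> runningTask = scheduler.schedule(() -> {
                started.countDown();
                try {
                    queue.put("file message"); // interruptible send
                } catch (InterruptedException e) {
                    interrupted.set(true);     // the send did not complete
                } finally {
                    finished.countDown();
                }
            }, 0, TimeUnit.MILLISECONDS);

            started.await();                      // the task is now running
            runningTask.cancel(true);             // same call the post attributes to stop()
            finished.await(5, TimeUnit.SECONDS);  // wait for the task to unwind
            return interrupted.get();
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("put() interrupted by cancel(true): " + cancelDuringSend());
    }
}
```

Passing false to cancel() would let a task that is already running finish its send, which is the trade-off under discussion here.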



              • #8
                Hi Oleg,

                Cool! Is it a problem to call stop() at an improper time? If yes, how can I avoid that?
                Looking forward to your final conclusion!

                Gunnar



                • #9
                  Gunnar

                  I am still working on it; there might actually be a bug on our end. The root of the problem is that stopping an adapter interrupts the LinkedBlockingQueue.put() method, which ideally you don't want to be interrupted.
                  Anyway, I'll follow up.



                  • #10
                    Gunnar

                    To make your test case work, all you need to do is add send-timeout="0" to the <filter>, since it is the sender to the QueueChannel.
                    Here is what's happening. QueueChannel is backed by a LinkedBlockingQueue (LBQ). When a message arrives, we invoke one of three methods:
                    a) LBQ.offer(message) - non-blocking and non-interruptible; invoked if send-timeout is set to 0.
                    b) LBQ.offer(message, timeout) - blocking and interruptible; invoked if the timeout is > 0.
                    c) LBQ.put(message) - blocking and interruptible; invoked if the timeout is < 0.

                    Since you don't provide an explicit timeout, we fall back on LBQ.put(message), which on occasion gets interrupted in the middle of its execution; hence the error you see.

                    Now, what I said above would make your test case work, since a timeout of 0 results in a call to LBQ.offer(message), and since that call is non-interruptible, stopping the FTP adapter would not affect it.
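
The three cases listed above can be sketched with the JDK's LinkedBlockingQueue alone. This is an illustration of the description in this post, not the QueueChannel source; the class and method names are hypothetical. The interrupt is simulated by setting the calling thread's interrupt flag before sending.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper that mirrors the timeout-to-method mapping described above.
class QueueSendModes {

    /** Returns true if the message was enqueued, false if the send was interrupted or timed out. */
    static boolean send(LinkedBlockingQueue<String> queue, String message, long timeoutMillis) {
        try {
            if (timeoutMillis == 0) {
                // (a) offer(e): does not block and does not react to interrupts
                return queue.offer(message);
            } else if (timeoutMillis > 0) {
                // (b) offer(e, timeout, unit): may block, throws if interrupted
                return queue.offer(message, timeoutMillis, TimeUnit.MILLISECONDS);
            } else {
                // (c) put(e): may block, throws if interrupted
                queue.put(message);
                return true;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        Thread.currentThread().interrupt(); // simulate stop() interrupting the sending thread
        System.out.println("timeout < 0 sent: " + send(queue, "a", -1));
        System.out.println("timeout = 0 sent: " + send(queue, "b", 0));
        Thread.interrupted(); // clear the simulated interrupt
    }
}
```

With the interrupt flag set, only the timeout of 0 gets the message into the queue, which matches the effect of send-timeout="0" reported in this thread.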

                    Having said that, I think you are on the wrong track. I believe you are doing this because you need to download specific files on demand, and you are trying to make a polling adapter work as if it were an event-driven adapter. What you need is the FTP Outbound Gateway, which is not available in 2.0.5 but is available in 2.1.RC1.
                    Here is the doc: http://static.springsource.org/sprin...tbound-gateway
                    This way you'll be able to get a file on demand and in an event-driven style, which I think is what you really want.
                    Could you please upgrade and give it a shot?



                    • #11
                      Hi Oleg,

                      great work - thank you!!

                      Setting send-timeout="0" on the <filter> makes it work without an exception. The one thing I'm still a bit worried about is that there are still two task-scheduler threads transferring the same file:

                      Code:
                      07:24:30.942 [task-scheduler-6] DEBUG d.v.i.ftp.SingleFtpFileFilter - Accepted file indices_20111128.csv
                      07:24:30.942 [task-scheduler-2] DEBUG d.v.i.ftp.SingleFtpFileFilter - Accepted file indices_20111128.csv
                      07:24:31.271 [task-scheduler-2] INFO  o.s.i.ftp.session.FtpSession - File have been successfully transfered to: xxxx/yyyy/indices_20111128.csv
                      07:24:31.271 [task-scheduler-6] INFO  o.s.i.ftp.session.FtpSession - File have been successfully transfered to: xxxx/yyyy/indices_20111128.csv
                      Only one of these two file transfers will result in a message, that's ok. But transferring the same file twice? Hmm...

                      Concerning your second part, using the new FTP gateway instead: thank you, it is always good to question a certain approach! My use case is somewhere in the middle of the two scenarios:
                      • I'm interested in specific files only.
                      • Download is on demand (orchestrated by an overall workflow).
                      • But: I don't know whether my piece of information has already been published on the remote system when I start my process. So I want polling as well: I want to start polling, and if after, let's say, one hour the file I'm interested in hasn't been published on the remote system, I give up and continue with different actions in my overall workflow.
                      • I do not want to poll the remote system all day long.
                      • So this is what I want: polling, started on demand and only for a well defined period of time.
                      So I thought that starting / stopping an inbound adapter might be a good idea. Or can I manage that with a gateway as well?

                      Many thanks
                      Gunnar



                      • #12
                        The second thread should not worry you since it happens after the first one was interrupted.

                        As for the FTP gateway, remember that it currently supports several FTP commands, and one of them is 'ls', which allows you to wire up a process (a loop of some type) where you can list and download files on demand. For example, some system event triggers the call to an FTP outbound gateway, which executes the 'ls' command. This gateway is followed by a router, which determines whether the file of interest is present. If so, it sends to a specified channel; if not, it sends to a delayer, which delays the process for a bit and then sends back to the FTP Outbound Gateway. Kind of like a retry. You can look at one of the samples I did at Spring One: https://github.com/olegz/s12gx.2011/...egration/retry to get a better idea of how to leverage a delayer to wire up a similar process.
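
Stripped of the messaging components, the list / route / delay loop described above amounts to a bounded retry. The sketch below is plain Java, not Spring Integration configuration: the class name, the `remoteListing` supplier (standing in for the gateway's 'ls' call), and the attempt and delay parameters are all hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for the gateway -> router -> delayer loop.
class BoundedFilePoll {

    /**
     * Lists remote file names up to maxAttempts times, pausing delayMillis between
     * attempts. Returns true as soon as wantedFile shows up, false if it never does.
     */
    static boolean waitForFile(Supplier<List<String>> remoteListing, String wantedFile,
                               int maxAttempts, long delayMillis) throws InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (remoteListing.get().contains(wantedFile)) {
                return true;               // "router": file present, go and download it
            }
            if (attempt < maxAttempts) {
                Thread.sleep(delayMillis); // "delayer": wait before listing again
            }
        }
        return false;                      // give up; the workflow continues elsewhere
    }

    public static void main(String[] args) throws InterruptedException {
        // Fake listing for the demo: the file appears on the third 'ls'.
        final int[] calls = {0};
        Supplier<List<String>> listing = () -> ++calls[0] < 3
                ? Collections.<String>emptyList()
                : Arrays.asList("indices_20111128.csv");
        System.out.println("found: " + waitForFile(listing, "indices_20111128.csv", 5, 10));
    }
}
```

Bounding the number of attempts covers the "poll for about an hour, then give up" requirement from post #11 without starting and stopping an adapter.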



                        • #13
                          Alright, thank you. The sample gives a good idea of how to use retry logic.
                          You have been a great help!
                          Gunnar
