Announcement Announcement Module
Collapse
No announcement yet.
FTP inbound adapter with poller sometimes stops Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • FTP inbound adapter with poller sometimes stops

    Hi, we are using ftp:inbound-channel-adapter with poller (Spiring Integration 2.0.4.RELEASE):

    Code:
    <int-ftp:inbound-channel-adapter channel="epoSourceChannel" local-directory="${ftp.localFolder}"
    		session-factory="ftpSessionFactory" remote-directory="${ftp.remoteFolder}"
    		cache-sessions="false" filter="epoFilenameFilter"> 
    		<int:poller max-messages-per-poll="-1" cron="0 * 2-22 * * *" />
    </int-ftp:inbound-channel-adapter>
    epoFilenameFilter bean is implementation of FileListFilter. Processed files are not deleted from FTP server so we check for new files by their filenames (they contain timestamps, latest processed timestamp is stored). There are other integration elements for processing of downloaded files but they shouldn't be important here. This all works under Tomcat 6, loaded from web.xml of SWF-JSF-webapplication.

    Here is the problem. Sometimes (and lately - more often, like once a week) this ftp:inbound-channel-adapter together with poller silently stops. This looks like this: every minute epoFilenameFilter logs information about processing of files. And then at one moment it just stops - logs from filter (and of course from next elements in processing chain) do not come up more. I've set DEBUG logging level and added LOG statement specifically in the beginning of filter method - so each filtering log message was prepended by "connected" message from FTP session factory The last messages were:

    Code:
    2012-02-05 18:17:00,053 DEBUG [task-scheduler-5] org.springframework.integration.ftp.session.DefaultFtpSessionFactory: Connected to server [ftp.server.com:21]
    2012-02-05 18:17:00,591 INFO  [task-scheduler-5] com.example.integration.epoimport.filter.EpoFilenameFilter: Filtering files...
    2012-02-05 18:17:00,593 INFO  [task-scheduler-5] com.example.integration.epoimport.filter.EpoFilenameFilter: Looking for ePO from 05.02.2012 17:56:37...
    2012-02-05 18:17:00,598 INFO  [task-scheduler-5] com.example.integration.epoimport.filter.EpoFilenameFilter: Number of new ePO: 0
    2012-02-05 18:18:00,061 DEBUG [task-scheduler-3] org.springframework.integration.ftp.session.DefaultFtpSessionFactory: Connected to server [ftp.server.com:21]
    And after that integration beans do not send any log messages at all. From what I see it looks like either an infinite loop somewhere between session creation and calling of filter method or like simultaneous fail of both ftp inbound adapter and poller - but this can as well be far away from truth.

    The moment is rather arbitrary and I can't find any relations to any actions/movements in application (and it's not at 22 o'clock ). For example, that happened at 16:00, 08:48 and at 18:18. Logs contain no other errors, application continues to work as usual.

    Are there any suggestions as what to look at additionally to find out real cause of problem?
    Last edited by yozh-tema; Feb 6th, 2012, 04:27 AM. Reason: spring integration version added

  • #2
    I am guessing the thread (in this case task-scheduler-3) is hanging trying to read data from your server. You can confirm this by taking a thread dump using jstack or visualvm.

    We don't currently expose the setDataTimeout() and setDefaultTimeout() methods on the FtpClient, so the socket SO_TIMEOUT option is not set, and so tcp will wait indefinitely for data; we probably should allow these to be set, so please open an Improvement JIRA here https://jira.springsource.org/browse/INT
    Last edited by Gary Russell; Feb 6th, 2012, 09:14 AM.

    Comment


    • #3
      If you want to see if this solves your problem, you could try making your own session factory....

      Code:
      public class MyFtpSessionFactory extends AbstractFtpSessionFactory<FTPClient> {
      
      	@Override
      	protected FTPClient createClientInstance() {
      		FTPClient ftpClient = new FTPClient();
      		ftpClient.setDataTimeout(5000);
      		ftpClient.setDefaultTimeout(5000);
      		return ftpClient;
      	}
      
      }
      And use an instance of that instead of DefaultFtpSessionFactory.

      Of course, when these timeouts occur, you still won't get any data, but you should see log activity.

      Comment


      • #4
        Thank you Gary.
        I raised JIRA issue https://jira.springsource.org/browse/INT-2429 (didn't find FTP component there though). I'll try to use own session factory.

        Comment


        • #5
          There is actually another way of solving it and is incorporated into the design. AbstractFtpSessionFactory defines two NOOP methods
          Code:
          /**
          * Will handle additional initialization after client.connect() method was invoked, 
          * but before any action on the client has been taken 
          */
          protected void postProcessClientAfterConnect(T t) throws IOException {
          	// NOOP
          }
          /**
          * Will handle additional initialization before client.connect() method was invoked. 
          */
          protected void postProcessClientBeforeConnect(T client) throws IOException {
          	// NOOP
          }
          So all you need is to extend DefaultFtpSessionFactory and provide implementation of one or both of these methods. I guess for your case you only need 'postProcessClientBeforeConnect' method.

          Comment


          • #6
            Hi Oleg,
            thanks for pointing that out, mentioned JIRA improvement is then probably redundant. I think however that these hooks should be documented in reference.

            Comment


            • #7
              Yes Artem, that is correct, it should be documented. I'll change the JIRA to be a documentation issue.

              Comment


              • #8
                Hi Gary,
                it looks like you were right, I've checked stacktraces next time this problem happened (although this time it was connection timeout, not data reading) and one of the threads showed:

                Code:
                "task-scheduler-4" prio=10 tid=0x00002aabbd39d400 nid=0x1335 in Object.wait() [0x000000004274f000..0x000000004274fda0]
                   java.lang.Thread.State: WAITING (on object monitor)
                        at java.lang.Object.wait(Native Method)
                        at java.lang.Object.wait(Object.java:485)
                        at org.apache.commons.net.telnet.TelnetInputStream.read(TelnetInputStream.java:339)
                        - locked <0x00002aaadb012f80> (a [I)
                        at org.apache.commons.net.telnet.TelnetInputStream.read(TelnetInputStream.java:466)
                        at java.io.BufferedInputStream.read1(BufferedInputStream.java:256)
                        at java.io.BufferedInputStream.read(BufferedInputStream.java:317)
                        - locked <0x00002aaadb010ed8> (a java.io.BufferedInputStream)
                        at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:264)
                        at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:306)
                        at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:158)
                        - locked <0x00002aaac2c69910> (a java.io.InputStreamReader)
                        at java.io.InputStreamReader.read(InputStreamReader.java:167)
                        at java.io.BufferedReader.fill(BufferedReader.java:136)
                        at java.io.BufferedReader.readLine(BufferedReader.java:299)
                        - locked <0x00002aaac2c69910> (a java.io.InputStreamReader)
                        at java.io.BufferedReader.readLine(BufferedReader.java:362)
                        at org.apache.commons.net.ftp.FTP.__getReply(FTP.java:264)
                        at org.apache.commons.net.ftp.FTP._connectAction_(FTP.java:335)
                        at org.apache.commons.net.ftp.FTPClient._connectAction_(FTPClient.java:550)
                        at org.apache.commons.net.SocketClient.connect(SocketClient.java:163)
                        at org.springframework.integration.ftp.session.AbstractFtpSessionFactory.createClient(AbstractFtpSessionFactory.java:148)
                        at org.springframework.integration.ftp.session.AbstractFtpSessionFactory.getSession(AbstractFtpSessionFactory.java:128)
                        at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:131)
                        at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.receive(AbstractInboundFileSynchronizingMessageSource.java:144)
                        at org.springframework.integration.endpoint.SourcePollingChannelAdapter.doPoll(SourcePollingChannelAdapter.java:89)
                        at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:146)
                        at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:144)
                        at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:207)
                        at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
                        at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:48)
                        at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
                        at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:202)
                        at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:51)
                        at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
                        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
                        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
                        at java.util.concurrent.FutureTask.run(FutureTask.java:138)
                        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:98)
                        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:207)
                        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
                        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
                        at java.lang.Thread.run(Thread.java:619)
                It seems that default timeout settings of commons-net FTP client aren't very friendly (at least for Linux). Anyway, hopefully setting timeouts will work.

                Comment


                • #9
                  Artem

                  Now since you know how to adjust the settings it would be nice if at some point you could share what works for you as we may consider setting more friendly defaults on our end so real user stories would definitely help.

                  Comment


                  • #10
                    Hi Oleg,

                    I've implemented described solution:

                    Code:
                        @Override
                        protected void postProcessClientBeforeConnect(FTPClient client) throws IOException {
                            super.postProcessClientBeforeConnect(client);
                            
                            client.setDataTimeout(timeout);
                            client.setDefaultTimeout(timeout);
                        }
                    Timeout set to 30 seconds. So far problem did not repeat. And in two days after deployment logs show new entries:

                    Code:
                    2012-02-14 11:55:30,222 ERROR [task-scheduler-7] org.springframework.integration.handler.LoggingHandler: org.springframework.integration.MessagingException: Problem occurred while synchronizing remote to local directory
                            at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:144)
                            at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.receive(AbstractInboundFileSynchronizingMessageSource.java:144)
                            at org.springframework.integration.endpoint.SourcePollingChannelAdapter.doPoll(SourcePollingChannelAdapter.java:89)
                            at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:146)
                            at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:144)
                            at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:207)
                            at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
                            at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:48)
                            at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
                            at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:202)
                            at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:51)
                            at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
                            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
                            at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
                            at java.util.concurrent.FutureTask.run(FutureTask.java:138)
                            at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:98)
                            at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:207)
                            at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
                            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
                            at java.lang.Thread.run(Thread.java:619)
                    Caused by: java.net.SocketTimeoutException: Read timed out
                            at java.net.SocketInputStream.socketRead0(Native Method)
                            at java.net.SocketInputStream.read(SocketInputStream.java:129)
                            at java.io.BufferedInputStream.fill(BufferedInputStream.java:218)
                            at java.io.BufferedInputStream.read(BufferedInputStream.java:237)
                            at java.io.FilterInputStream.read(FilterInputStream.java:66)
                            at java.io.PushbackInputStream.read(PushbackInputStream.java:122)
                            at org.apache.commons.net.io.FromNetASCIIInputStream.__read(FromNetASCIIInputStream.java:75)
                            at org.apache.commons.net.io.FromNetASCIIInputStream.read(FromNetASCIIInputStream.java:170)
                            at java.io.BufferedInputStream.fill(BufferedInputStream.java:218)
                            at java.io.BufferedInputStream.read(BufferedInputStream.java:237)
                            at org.apache.commons.net.telnet.TelnetInputStream.__read(TelnetInputStream.java:114)
                            at org.apache.commons.net.telnet.TelnetInputStream.run(TelnetInputStream.java:535)
                            ... 1 more
                    Hence I think the problem is solved.

                    I'm not sure what would I expect from spring-intergation to do in this case: set its own default timeouts or leave as is. Daniel Savarese's reasoning in mentioned commons-net issue is understandable - while 30 sec is OK for our project considering that we check FTP every minute, this might be inappropriate for others. On the other hand, I don't think that infinite timeouts is what most of the users needs and in this case almost everyone would need to extend DefaultFtpSessionFactory only to configure timeouts.

                    Thank you again for your help.

                    Comment

                    Working...
                    X