Sftp Inbound Adapter with MessageStoreReaper problem.

  • Sftp Inbound Adapter with MessageStoreReaper problem.

    Hello all, I'm having an issue that doesn't seem to be addressed anywhere on the forum. So here is my problem:

    I have an inbound channel adapter set up to pick up files from a remote directory. Simple. Also, I have a message store reaper to expire messages after a certain timeout. Good so far. However, I'm getting an error that the two cannot synchronize because a file cannot be found (this is sent to an error channel). Here is my code, edited to preserve company information.

    Code:
        <sprint:publish-subscribe-channel id="myChannel">
            <sprint:interceptors>
                <sprint:wire-tap channel="inboundLogger" />
            </sprint:interceptors>
        </sprint:publish-subscribe-channel>
        
        <sftp:inbound-channel-adapter
                id="pull.gfmhost.824.udf"
                session-factory="gfmHostSftpSessionFactory"
                channel="myChannel"
                remote-directory="${pull.directory}"
                local-directory="${local.directoy}"
                auto-create-local-directory="true"
                cache-sessions="false"
                delete-remote-files="true">
            <sprint:poller fixed-rate="${defined.interval}" max-messages-per-poll="-1" error-channel="errorChannel"/>    
        </sftp:inbound-channel-adapter>
    
        <bean id="myTransformer" class="myProject.MyTransformer" />    
         <bean id="myAggregator" class="myProject.MyAggregator">
             <constructor-arg name="path" value="${agg.directory}"/>
         </bean>
        
        <sprint:channel id="outPutChannel" >
            <sprint:interceptors>
                <sprint:wire-tap channel="outboundLogger" />
            </sprint:interceptors>
        </sprint:channel>
        
        <sprint:chain input-channel="myChannel" output-channel="outPutChannel" >
            <sprint:transformer    ref="myTransformer"  />
            <sprint:aggregator  
                    send-partial-result-on-expiry="true"
                     discard-channel="errorChannel"
                    ref="fileAggregator"
                    method="aggregateFiles"
                    message-store="messageStore"/>
        </sprint:chain>
        
        <file:outbound-gateway directory="${backup.dir}"
                               request-channel="outPutChannel"
                               delete-source-files="true"
                               reply-channel="mergeChannel"/>
                               
        <bean id="messageStore" class="org.springframework.integration.store.SimpleMessageStore"/>
        <bean id="messageStoreReaper" destroy-method="destroy" class="org.springframework.integration.store.MessageGroupStoreReaper">    
            <property name="messageGroupStore" ref="messageStore"/>        
            <property name="timeout" value="${release.merge}" />    
        </bean>    
        <task:scheduled-tasks scheduler="scheduler" >        
            <task:scheduled ref="messageStoreReaper"  method="run" fixed-rate="100000" />
        </task:scheduled-tasks>
        <task:scheduler pool-size="5" id="scheduler"/>
    Code:
    Feb 28 15:56:13 [scheduler-2] DEBUG MessageGroupStoreReaper run                       Expiring all messages older than timeout=60000 from message group store: org.springframework.integration.store.SimpleMessageStore@97c9cf
    Feb 28 15:56:55 [task-scheduler-9] DEBUG ShellCommandExecutor call                       Set Permissions: 0
    Feb 28 15:56:55 [task-scheduler-5] DEBUG AbstractInboundFileSynchronizer copyFileToLocalDirectory  cannot copy, not a file: (this is the remote directory)/in/.
    Feb 28 15:56:55 [task-scheduler-5] DEBUG AbstractInboundFileSynchronizer copyFileToLocalDirectory  cannot copy, not a file: (this is the remote directory)/in/..
    Feb 28 15:56:56 [task-scheduler-8] DEBUG ShellCommandExecutor call                       Set Permissions: 0
    Feb 28 15:56:56 [task-scheduler-6] DEBUG AbstractMessageChannel$ChannelInterceptorList preSend                   preSend on channel 'errorChannel', message: [Payload=org.springframework.integration.MessagingException: Problem occurred while synchronizing remote to local directory][Headers={timestamp=1362085016712, id=838d4aa6-54dd-49ba-854d-b5e23d1b3da9, history=errorChannel}]
    Feb 28 15:56:56 [task-scheduler-6] DEBUG AbstractMessageHandler handleMessage             org.springframework.integration.stream.CharacterStreamWritingMessageHandler#0 received message: [Payload=org.springframework.integration.MessagingException: Problem occurred while synchronizing remote to local directory][Headers={timestamp=1362085016712, id=838d4aa6-54dd-49ba-854d-b5e23d1b3da9, history=errorChannel}]
    Feb 28 15:56:56 [task-scheduler-6] DEBUG AbstractMessageChannel$ChannelInterceptorList postSend                  postSend (sent=true) on channel 'errorChannel', message: [Payload=org.springframework.integration.MessagingException: Problem occurred while synchronizing remote to local directory][Headers={timestamp=1362085016712, id=838d4aa6-54dd-49ba-854d-b5e23d1b3da9, history=errorChannel}]
    EDIT: Stack trace
    Code:
    org.springframework.integration.MessagingException: Problem occurred while synchronizing remote to local directory
    	at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:156)
    	at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.receive(AbstractInboundFileSynchronizingMessageSource.java:144)
    	at org.springframework.integration.endpoint.SourcePollingChannelAdapter.doPoll(SourcePollingChannelAdapter.java:89)
    	at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:146)
    	at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:144)
    	at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:207)
    	at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
    	at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:48)
    	at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
    	at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:202)
    	at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:53)
    	at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
    	at java.util.concurrent.FutureTask.run(FutureTask.java:166)
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    	at java.lang.Thread.run(Thread.java:722)
    Caused by: org.springframework.core.NestedIOException: Failed to remove file: 2: File not found
    	at org.springframework.integration.sftp.session.SftpSession.remove(SftpSession.java:68)
    	at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.copyFileToLocalDirectory(AbstractInboundFileSynchronizer.java:217)
    	at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:150)
    	... 19 more
    Things I've tried so far:
    Adding filename-pattern="*" to the adapter

    ...and that's all. I can't think of anything to do. Any thoughts?
    Last edited by zgillet; Feb 28th, 2013, 04:17 PM. Reason: More information

  • #2
    Looks like this placeholder
    ${pull.directory}
    resolves to
    (this is the remote directory)/in/.
    which doesn't look valid.

    Even after this is fixed, can you also explain what you are trying to do with the aggregator and reaper? I don't see a correlation or release strategy, and I don't see you setting up the standard headers, etc., for the default strategies. This will fail when a message eventually arrives at the aggregator.


    • #3
      The pull.directory in my properties file is "(this is the remote directory)/in"... this is what is baffling me. It seems to look for . and .. as files.

      Also, I assumed the default release strategy would work in this case, can you explain why the message would fail?

      Edit: The aggregator seems to be working just fine, I just get a synchronization error as seen in the stack trace.
      Last edited by zgillet; Mar 1st, 2013, 09:44 AM.


      • #4
        These are just debug messages. The adapter lists the entire remote directory; for *nix hosts, the list includes the current (.) and parent (..) directories. The debug message is simply saying that these (and any other) directories are skipped.

        The default correlation strategy expects a header 'correlationId'. You have no components that populate that, so you will get...

        Assert.state(correlationKey!=null, "Null correlation not allowed. Maybe the CorrelationStrategy is failing?");
        If you want all messages correlated the same way simply use
        Code:
        correlation-expression="'foo'"
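
To illustrate the point about correlation, here is a minimal plain-Java sketch. It does not use Spring Integration classes: `CorrelationSketch`, `Msg`, `headerKey`, `constantKey` and `group` are hypothetical names made up for this example. It only mimics the behaviour described above, where a header-based strategy yields a null key for messages without a 'correlationId' header, while a constant key (the equivalent of `correlation-expression="'foo'"`) puts every message in one group.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class CorrelationSketch {

    /** Hypothetical stand-in for a message: a payload plus headers. */
    static final class Msg {
        final String payload;
        final Map<String, Object> headers;

        Msg(String payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    /** Header-based correlation: null when 'correlationId' was never set. */
    static Object headerKey(Msg m) {
        return m.headers.get("correlationId");
    }

    /** Constant correlation: every message gets the same key. */
    static Object constantKey(Msg m) {
        return "foo";
    }

    /** Groups payloads by correlation key and rejects null keys. */
    static Map<Object, List<String>> group(List<Msg> msgs, Function<Msg, Object> strategy) {
        Map<Object, List<String>> groups = new LinkedHashMap<>();
        for (Msg m : msgs) {
            Object key = strategy.apply(m);
            if (key == null) {
                throw new IllegalStateException(
                        "Null correlation not allowed. Maybe the CorrelationStrategy is failing?");
            }
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(m.payload);
        }
        return groups;
    }

    public static void main(String[] args) {
        // Two file messages that carry no headers at all.
        List<Msg> msgs = Arrays.asList(
                new Msg("a.txt", Collections.<String, Object>emptyMap()),
                new Msg("b.txt", Collections.<String, Object>emptyMap()));

        // Constant key: both payloads land in the single group "foo".
        System.out.println(group(msgs, CorrelationSketch::constantKey));

        // Header key: fails because no 'correlationId' header is present.
        try {
            group(msgs, CorrelationSketch::headerKey);
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

A custom aggregator that derives its own IDs from the files sits somewhere between the two: it supplies a non-null key without relying on the header.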


        • #5
          I apologize, I failed to include that I am using a custom aggregator that pulls specific IDs from files, and it works just fine.

          Now I'm not sure what is causing the exception. Is there any way to see WHAT file is not found?


          • #6
            This message
            Caused by: org.springframework.core.NestedIOException: Failed to remove file: 2: File not found
            is coming from the server side.

            So the problem is with removing the file after it was fetched; why don't you try setting
            Code:
            delete-remote-files="false"
            and see what happens?

            Maybe the account you are using only has read access to the file?


            • #7
              I've narrowed the problem down and I believe it boils down to this function in AbstractInboundFileSynchronizer.

              I get the first debug message, showing that the remoteFilePath is "(this is the remote directory)/in/."
              Then, the code continues and passes into the session.remove function that throws the error because it can't find the "." file. Am I mistaken?

              Code:
              	private void copyFileToLocalDirectory(String remoteDirectoryPath, F remoteFile, File localDirectory, Session<F> session) throws IOException {
              		String remoteFileName = this.getFilename(remoteFile);
              		String localFileName = this.generateLocalFileName(remoteFileName);
              		String remoteFilePath = remoteDirectoryPath + remoteFileSeparator + remoteFileName;
              		if (!this.isFile(remoteFile)) {
              			if (logger.isDebugEnabled()) {
              				logger.debug("cannot copy, not a file: " + remoteFilePath);
              			}
              			return;
              		}
              				
              		File localFile = new File(localDirectory, localFileName);
              		if (!localFile.exists()) {
              			String tempFileName = localFile.getAbsolutePath() + this.temporaryFileSuffix;
              			File tempFile = new File(tempFileName);
              			InputStream inputStream = null;
              			FileOutputStream fileOutputStream = new FileOutputStream(tempFile);
              			try {
              				session.read(remoteFilePath, fileOutputStream);
              			}
              			catch (Exception e) {
              				if (e instanceof RuntimeException){
              					throw (RuntimeException) e;
              				}
              				else {
              					throw new MessagingException("Failure occurred while copying from remote to local directory", e);
              				}
              			}
              			finally {
              				try {
              					if (inputStream != null) {
              						inputStream.close();
              					}
              				}
              				catch (Exception ignored1) {
              				}
              				try {
              					fileOutputStream.close();
              				}
              				catch (Exception ignored2) {
              				}
              			}
              			
              			if (tempFile.renameTo(localFile)) {
              				if (this.deleteRemoteFiles) {
              					session.remove(remoteFilePath);
              					if (logger.isDebugEnabled()) {
              						logger.debug("deleted " + remoteFilePath);
              					}
              				}
              			}
              		}
              	}


              • #8
                Yes, I am afraid you are mistaken - this code is further up in that method...

                Code:
                		if (!this.isFile(remoteFile)) {
                			if (logger.isDebugEnabled()) {
                				logger.debug("cannot copy, not a file: " + remoteFilePath);
                			}
                			return;
                		}
                This implies it found a file (after skipping . and ..), copied it, and then failed on the remove.
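
The control flow in question can be reduced to a small, self-contained sketch. This is not the Spring Integration code: `SyncFlowSketch`, `Entry` and `sync` are hypothetical names, the copy step is omitted, and the remove call is simulated by recording the path. It only shows the two properties discussed in this thread: directory entries such as "." and ".." return early and never reach the remove, and the remove runs only when the delete flag is set.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SyncFlowSketch {

    /** Hypothetical stand-in for one entry of a remote directory listing. */
    static final class Entry {
        final String name;
        final boolean isFile;

        Entry(String name, boolean isFile) {
            this.name = name;
            this.isFile = isFile;
        }
    }

    /**
     * Walks the listing and returns the paths that would be passed to
     * the remote remove call, in order.
     */
    static List<String> sync(String remoteDir, List<Entry> listing, boolean deleteRemoteFiles) {
        List<String> removed = new ArrayList<>();
        for (Entry e : listing) {
            String remoteFilePath = remoteDir + "/" + e.name;
            if (!e.isFile) {
                // Same early exit as the isFile check: "." and ".." stop here.
                System.out.println("cannot copy, not a file: " + remoteFilePath);
                continue;
            }
            // The copy to the local directory would happen here.
            if (deleteRemoteFiles) {
                // Stands in for session.remove(remoteFilePath).
                removed.add(remoteFilePath);
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        List<Entry> listing = Arrays.asList(
                new Entry(".", false),
                new Entry("..", false),
                new Entry("data.csv", true));

        System.out.println("flag on:  " + sync("remote/in", listing, true));
        System.out.println("flag off: " + sync("remote/in", listing, false));
    }
}
```

With the flag on, only the regular file reaches the simulated remove; with the flag off, nothing does.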


                • #9
                  Like I said, try it with the remove turned off.


                  • #10
                    I tried omitting delete-remote-files="true" and still got the same exception.


                    • #11
                      Sorry, you can't possibly have gotten the same exception, because line 217

                      Caused by: org.springframework.core.NestedIOException: Failed to remove file: 2: File not found
                      at org.springframework.integration.sftp.session.SftpSession.remove(SftpSession.java:68)
                      at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.copyFileToLocalDirectory(AbstractInboundFileSynchronizer.java:217)
                      ...
                      is NOT executed when that attribute is false.
