  • SFTP Inbound channel adaptor and service activator issues

    I have written a small application using an SFTP inbound channel adapter and a service activator. The adapter listens for new files and, when one is created, downloads it and puts it on a channel. A service activator should then be executed for each file transferred.

    The inbound adapter polls files correctly, but the service activator method is executed only once per poll, irrespective of the number of files fetched in that poll. I need the service activator to run for each file polled.

    Could you please help to resolve?

    Following is my config...

    <bean id="sftpSessionFactory1"
        class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
        <property name="host" value="" />
        <property name="privateKeyPassphrase" value="" />
        <property name="port" value="22" />
        <property name="user" value="" />
        <property name="password" value="" />
    </bean>

    <int-sftp:inbound-channel-adapter id="sftpInboundAdapter1"
        channel="inputChannel1" session-factory="sftpSessionFactory1"
        local-directory="/u01/tmpSFTP/FromExchange/"
        remote-directory="/tmp/sftptest/sending" auto-create-local-directory="true"
        delete-remote-files="true" filename-pattern="*.txt">
        <int:poller cron="2 */4 * * * *" max-messages-per-poll="1" />
    </int-sftp:inbound-channel-adapter>

    <int:channel id="inputChannel1" />
    <int:channel id="inputChannelUpdated1" />
    <int:header-enricher input-channel="inputChannel1" output-channel="inputChannelUpdated1">
    <int:header name="exchangeUnitId" value="120" />
    </int:header-enricher>

    <int:service-activator input-channel="inputChannelUpdated1" ref="handler" method="handleMessage" />

    <bean id="handler" class="com.myapp.InboundServiceActivator" />

    Please note that the files at the SFTP location all have different names.

    Many thanks in advance.
    Last edited by joby; Jun 26th, 2013, 08:07 AM.

  • #2
    I suggest you turn on DEBUG logging to see what is happening; be sure to log the thread (%t in log4j).

    Look for the preSend and postSend logs on inputChannelUpdated1.
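
    A minimal log4j 1.x configuration along these lines might look like the sketch below. It assumes a log4j.xml file (rather than log4j.properties) and a console appender; the appender name and the exact pattern are illustrative choices, not taken from the poster's setup.

    ```xml
    <?xml version="1.0" encoding="UTF-8"?>
    <!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
    <log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">

      <!-- Console appender; the name "console" is an arbitrary choice -->
      <appender name="console" class="org.apache.log4j.ConsoleAppender">
        <layout class="org.apache.log4j.PatternLayout">
          <!-- [%t] prints the thread name, which shows which thread handled each message -->
          <param name="ConversionPattern" value="%d{HH:mm:ss,SSS} [%t] %-5p %c{1}:%L - %m%n" />
        </layout>
      </appender>

      <!-- DEBUG for Spring Integration so preSend/postSend lines are emitted -->
      <logger name="org.springframework.integration">
        <level value="DEBUG" />
      </logger>

      <root>
        <priority value="INFO" />
        <appender-ref ref="console" />
      </root>

    </log4j:configuration>
    ```

    With a pattern like this, each preSend and postSend line on inputChannelUpdated1 carries the thread name in brackets.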

    • #3
      I have attached logs below. I have 3 files in remote directory.

      /tmp/sftptest/sending//EDISample.txt
      /tmp/sftptest/sending//EDISample1.txt
      /tmp/sftptest/sending//EDISample2.txt

      The inbound channel adapter fetched all three files in the first poll and added them to the queue (see the "Added to queue" line below). But during the first poll, only 'EDISample.txt' was processed by the service activator. The other two files were processed during the following poll intervals, as the logs below show.

      I am looking for a mechanism where, after the inbound adapter polls a file, the flow goes on to the service activator, because we need to do some processing for each file. Since I am new to SI, I have not been able to find a suitable solution. I would appreciate any further insights.
      ----------------------------------
      12:10:01,427 DEBUG SimplePool:182 - Obtained new org.springframework.integration.sftp.session.SftpSession@32a325.
      12:10:02,443 DEBUG SftpInboundFileSynchronizer:219 - deleted /tmp/sftptest/sending//EDISample1.txt
      12:10:02,989 DEBUG SftpInboundFileSynchronizer:219 - deleted /tmp/sftptest/sending//EDISample.txt
      12:10:03,521 DEBUG SftpInboundFileSynchronizer:219 - deleted /tmp/sftptest/sending//EDISample2.txt
      12:10:03,521 DEBUG CachingSessionFactory:109 - Releasing Session back to the pool.
      12:10:03,521 DEBUG SimplePool:210 - Releasing org.springframework.integration.sftp.session.SftpSession@32a325 back to the pool
      12:10:03,521 DEBUG FileReadingMessageSource:281 - Added to queue: [/u01/tmpSFTP/FromExchange/EDISample.txt, /u01/tmpSFTP/FromExchange/EDISample1.txt, /u01/tmpSFTP/FromExchange/EDISample2.txt]
      12:10:03,521 INFO FileReadingMessageSource:264 - Created message: [[Payload=/u01/tmpSFTP/FromExchange/EDISample.txt][Headers={timestamp=1372315203521, id=f0d16732-45df-4980-8bae-edb65e216f8a}]]
      12:10:03,521 DEBUG SourcePollingChannelAdapter:77 - Poll resulted in Message: [Payload=/u01/tmpSFTP/FromExchange/EDISample.txt][Headers={timestamp=1372315203521, id=f0d16732-45df-4980-8bae-edb65e216f8a}]
      12:10:03,521 DEBUG DirectChannel:224 - preSend on channel 'inputChannel1', message: [Payload=/u01/tmpSFTP/FromExchange/EDISample.txt][Headers={timestamp=1372315203521, id=f0d16732-45df-4980-8bae-edb65e216f8a}]
      12:10:03,521 DEBUG MessageTransformingHandler:67 - org.springframework.integration.transformer.MessageTransformingHandler#0 received message: [Payload=/u01/tmpSFTP/FromExchange/EDISample.txt][Headers={timestamp=1372315203521, id=f0d16732-45df-4980-8bae-edb65e216f8a}]
      12:10:03,521 DEBUG MessageTransformingHandler:197 - handler 'org.springframework.integration.transformer.MessageTransformingHandler#0' sending reply Message: [Payload=/u01/tmpSFTP/FromExchange/EDISample.txt][Headers={timestamp=1372315203521, id=8fd8a85f-4901-41ee-b92d-ac96915b2ef1, exchangeUnitId=120}]
      12:10:03,521 DEBUG DirectChannel:224 - preSend on channel 'inputChannelUpdated1', message: [Payload=/u01/tmpSFTP/FromExchange/EDISample.txt][Headers={timestamp=1372315203521, id=8fd8a85f-4901-41ee-b92d-ac96915b2ef1, exchangeUnitId=120}]
      12:10:03,521 DEBUG ServiceActivatingHandler:67 - ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@1aea727] received message: [Payload=/u01/tmpSFTP/FromExchange/EDISample.txt][Headers={timestamp=1372315203521, id=8fd8a85f-4901-41ee-b92d-ac96915b2ef1, exchangeUnitId=120}]

      From Service Activator Method: Copying file: /u01/tmpSFTP/FromExchange/EDISample.txt

      12:10:03,536 DEBUG ServiceActivatingHandler:197 - handler 'ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@1aea727]' sending reply Message: [Payload=/u01/tmpSFTP/FromExchange/EDISample.txt][Headers={timestamp=1372315203536, id=f046590f-ae0c-4f15-b84f-d20adf2f66b0, exchangeUnitId=120}]
      12:10:03,536 DEBUG DefaultListableBeanFactory:245 - Returning cached instance of singleton bean 'errorChannel'
      12:10:03,536 DEBUG PublishSubscribeChannel:224 - preSend on channel 'errorChannel', message: [Payload=org.springframework.integration.support.channel.ChannelResolutionException: no output-channel or replyChannel header available][Headers={timestamp=1372315203536, id=216cded3-9fda-4a80-9fb3-ec6f3269ebc9}]
      12:10:03,536 DEBUG PublishSubscribeChannel:237 - postSend (sent=true) on channel 'errorChannel', message: [Payload=org.springframework.integration.support.channel.ChannelResolutionException: no output-channel or replyChannel header available][Headers={timestamp=1372315203536, id=216cded3-9fda-4a80-9fb3-ec6f3269ebc9}]




      The log below is for the second poll interval. The second file is now processed by the service activator.


      12:10:20,005 INFO FileReadingMessageSource:264 - Created message: [[Payload=/u01/tmpSFTP/FromExchange/EDISample1.txt][Headers={timestamp=1372315220005, id=9b34b99c-4d37-4eb1-86f8-0ad2aedb38f8}]]
      12:10:20,005 DEBUG SourcePollingChannelAdapter:77 - Poll resulted in Message: [Payload=/u01/tmpSFTP/FromExchange/EDISample1.txt][Headers={timestamp=1372315220005, id=9b34b99c-4d37-4eb1-86f8-0ad2aedb38f8}]
      12:10:20,005 DEBUG DirectChannel:224 - preSend on channel 'inputChannel1', message: [Payload=/u01/tmpSFTP/FromExchange/EDISample1.txt][Headers={timestamp=1372315220005, id=9b34b99c-4d37-4eb1-86f8-0ad2aedb38f8}]
      12:10:20,005 DEBUG MessageTransformingHandler:67 - org.springframework.integration.transformer.MessageTransformingHandler#0 received message: [Payload=/u01/tmpSFTP/FromExchange/EDISample1.txt][Headers={timestamp=1372315220005, id=9b34b99c-4d37-4eb1-86f8-0ad2aedb38f8}]
      12:10:20,005 DEBUG MessageTransformingHandler:197 - handler 'org.springframework.integration.transformer.MessageTransformingHandler#0' sending reply Message: [Payload=/u01/tmpSFTP/FromExchange/EDISample1.txt][Headers={timestamp=1372315220005, id=b3d6c914-4727-4a09-aeea-9d63f3e7397c, exchangeUnitId=120}]
      12:10:20,005 DEBUG DirectChannel:224 - preSend on channel 'inputChannelUpdated1', message: [Payload=/u01/tmpSFTP/FromExchange/EDISample1.txt][Headers={timestamp=1372315220005, id=b3d6c914-4727-4a09-aeea-9d63f3e7397c, exchangeUnitId=120}]
      12:10:20,005 DEBUG ServiceActivatingHandler:67 - ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@1aea727] received message: [Payload=/u01/tmpSFTP/FromExchange/EDISample1.txt][Headers={timestamp=1372315220005, id=b3d6c914-4727-4a09-aeea-9d63f3e7397c, exchangeUnitId=120}]

      From Service Activator Method: Copying file: /u01/tmpSFTP/FromExchange/EDISample1.txt

      12:10:20,005 DEBUG ServiceActivatingHandler:197 - handler 'ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@1aea727]' sending reply Message: [Payload=/u01/tmpSFTP/FromExchange/EDISample1.txt][Headers={timestamp=1372315220005, id=cdffac47-7004-46be-b417-d356389a17ce, exchangeUnitId=120}]
      12:10:20,005 DEBUG PublishSubscribeChannel:224 - preSend on channel 'errorChannel', message: [Payload=org.springframework.integration.support.channel.ChannelResolutionException: no output-channel or replyChannel header available][Headers={timestamp=1372315220005, id=941006f7-d534-4263-bede-f1a24a0d4b17}]
      12:10:20,005 DEBUG PublishSubscribeChannel:237 - postSend (sent=true) on channel 'errorChannel', message: [Payload=org.springframework.integration.support.channel.ChannelResolutionException: no output-channel or replyChannel header available][Headers={timestamp=1372315220005, id=941006f7-d534-4263-bede-f1a24a0d4b17}]


      The log below is for the third poll interval. Here the third file is processed by the service activator.


      12:10:40,005 INFO FileReadingMessageSource:264 - Created message: [[Payload=D:\tmpSFTP\FilesToBePlacedHere\Exchange1\EDISample2.txt][Headers={timestamp=1372315240005, id=b580da09-62c1-4c01-81ee-8f5bdfb54c14}]]
      12:10:40,005 DEBUG SourcePollingChannelAdapter:77 - Poll resulted in Message: [Payload=D:\tmpSFTP\FilesToBePlacedHere\Exchange1\EDISample2.txt][Headers={timestamp=1372315240005, id=b580da09-62c1-4c01-81ee-8f5bdfb54c14}]
      12:10:40,005 DEBUG DirectChannel:224 - preSend on channel 'inputChannel1', message: [Payload=D:\tmpSFTP\FilesToBePlacedHere\Exchange1\EDISample2.txt][Headers={timestamp=1372315240005, id=b580da09-62c1-4c01-81ee-8f5bdfb54c14}]
      12:10:40,005 DEBUG MessageTransformingHandler:67 - org.springframework.integration.transformer.MessageTransformingHandler#0 received message: [Payload=D:\tmpSFTP\FilesToBePlacedHere\Exchange1\EDISample2.txt][Headers={timestamp=1372315240005, id=b580da09-62c1-4c01-81ee-8f5bdfb54c14}]
      12:10:40,005 DEBUG MessageTransformingHandler:197 - handler 'org.springframework.integration.transformer.MessageTransformingHandler#0' sending reply Message: [Payload=D:\tmpSFTP\FilesToBePlacedHere\Exchange1\EDISample2.txt][Headers={timestamp=1372315240005, id=aeb80533-a1ab-4c0c-98ca-83412aacf752, exchangeUnitId=120}]
      12:10:40,005 DEBUG DirectChannel:224 - preSend on channel 'inputChannelUpdated1', message: [Payload=D:\tmpSFTP\FilesToBePlacedHere\Exchange1\EDISample2.txt][Headers={timestamp=1372315240005, id=aeb80533-a1ab-4c0c-98ca-83412aacf752, exchangeUnitId=120}]
      12:10:40,005 DEBUG ServiceActivatingHandler:67 - ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@1aea727] received message: [Payload=D:\tmpSFTP\FilesToBePlacedHere\Exchange1\EDISample2.txt][Headers={timestamp=1372315240005, id=aeb80533-a1ab-4c0c-98ca-83412aacf752, exchangeUnitId=120}]

      From Service Activator Method: Copying file: D:\tmpSFTP\FilesToBePlacedHere\Exchange1\EDISample2.txt
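
      A side note on the ChannelResolutionException lines in the first two poll logs: they appear right after the handler sends a reply, which suggests handleMessage returns a value while the service activator declares no output-channel. A minimal sketch of one way to discard that reply, assuming the return value is not needed downstream, is to route it to Spring Integration's built-in nullChannel. Whether this exception also ends the poll cycle early is an assumption that this thread does not confirm.

      ```xml
      <!-- Same service activator as in the original configuration, with an
           explicit output-channel so any value returned by handleMessage is
           discarded instead of triggering a ChannelResolutionException. -->
      <int:service-activator input-channel="inputChannelUpdated1"
          ref="handler" method="handleMessage"
          output-channel="nullChannel" />
      ```

      Changing handleMessage to return void should have a similar effect, since no reply message is produced in that case.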

      • #4
        Increase the max-messages-per-poll attribute; currently...

        max-messages-per-poll="1"
        This limits the number of files processed on each poll to 1.
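
        For illustration, the inbound adapter from the first post with a higher limit might look like the sketch below. The value 5 is an arbitrary choice; everything else is copied unchanged from the original configuration.

        ```xml
        <int-sftp:inbound-channel-adapter id="sftpInboundAdapter1"
            channel="inputChannel1" session-factory="sftpSessionFactory1"
            local-directory="/u01/tmpSFTP/FromExchange/"
            remote-directory="/tmp/sftptest/sending" auto-create-local-directory="true"
            delete-remote-files="true" filename-pattern="*.txt">
            <!-- Allow up to 5 messages (files) to be emitted per poll instead of 1 -->
            <int:poller cron="2 */4 * * * *" max-messages-per-poll="5" />
        </int-sftp:inbound-channel-adapter>
        ```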

        • #5
          I increased it to max-messages-per-poll="5". Still, only the first file is routed to the service activator during the first poll interval.

          • #6
            They won't run in parallel, but you won't have to wait until the next poll to process the subsequent files (up to 5).

            • #7
              In my app, the SFTP inbound adapter's poller uses a cron expression, so polls may be infrequent. All files are fetched in the first poll, but only one file is processed by the service activator per cron interval, so processing of the remaining files is delayed.

              Is there an alternative way to process all the files fetched in a single SFTP poll?

              • #8
                As I said, max-messages-per-poll = n WILL allow up to n files to be processed on each poll. If you are seeing something different, please attach a log.
