  • inbound-channel-adapter and task-executor pool


    I'm trying to import files from a directory. First I used a simple file adapter such as:

    <file:inbound-channel-adapter channel="FILE_IMPORT_CHANNEL" directory="${import.dir}" prevent-duplicates="false"/>
    This seems to work fine. However, only one thread seems to be used for processing all the files. So I played around with the poller element and found an option to add a task-executor reference.

    <file:inbound-channel-adapter channel="FILE_IMPORT_CHANNEL" directory="${import.dir}" prevent-duplicates="false">
      <int:poller fixed-rate="${import.polldelay}" max-messages-per-poll="100" task-executor="taskExecutor"/>
    </file:inbound-channel-adapter>
    When using the configuration above the thread pool is indeed used and processing files will be much faster. However, I noticed that in some cases multiple messages will be created for the same file during one poll interval, which is not what I expected.

    INFO  [taskExecutor-25] - 2011-02-01 01:18:39 org.springframework.integration.file.FileReadingMessageSource: Created message: [[Payload=/home/import/report_1122345_20110130-20110131.xml.gz][Headers={timestamp=1296519519690, id=2e5b2565-7d7f-4f30-8ff8-5f53acb8b5fa}]]
    INFO  [taskExecutor-17] - 2011-02-01 01:18:39 org.springframework.integration.file.FileReadingMessageSource: Created message: [[Payload=/home/import/report_1122345_20110130-20110131.xml.gz][Headers={timestamp=1296519519690, id=b2f29747-538f-42bd-8a18-8bc0084b88a5}]]
    INFO  [taskExecutor-21] - 2011-02-01 01:18:39 org.springframework.integration.file.FileReadingMessageSource: Created message: [[Payload=/home/import/report_1122345_20110130-20110131.xml.gz][Headers={timestamp=1296519519690, id=095a0cbd-936c-4ad4-90cb-d9038ec98bea}]]
    I took a look at the source code but wasn't able to find out what could have happened. So my question is: is there something wrong with the configuration above? Is it intended to be used that way? Also, I tried to use the nio-locker, but it does not seem to be allowed together with the poller. I'm wondering why.


    Edit: I forgot to mention that the very next thing I do with each message emitted by the inbound-channel-adapter is to move the file to a different directory, hence the prevent-duplicates="false" option.
    Last edited by spodxx; Feb 1st, 2011, 04:48 AM.

  • #2
    Well, I see that you have prevent-duplicates set to 'false'. Is there a reason why?


    • #3
      I want to be able to re-import files at any time by moving them back to the import directory. Also, since the application runs as a daemon process 24/7, I want to avoid a memory leak from an ever-growing duplicate file list.


      • #4
        Then how else do we know that a particular file has been processed?
        There are certain file processing patterns that you can use to address your use case. Interestingly enough, there was a long thread last week about a use case very similar to yours, so you may want to browse through it.


        • #5
          As I'm taking care to move the file to a work directory immediately after the file message has been created from the input adapter, there should be no need to "remember" the file as far as the adapter is concerned. Currently I've fixed the issue described above by just having the move-to-work-dir transformer ignore non-existing files.
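
A minimal sketch of that workaround in plain Java, not the actual transformer from this thread: the class name WorkDirMover and the method claim are hypothetical, and it assumes java.nio.file (Java 7+) with the import and work directories on the same file system. Whichever thread moves the file first wins, and any other thread holding a duplicate message for the same file simply gets an empty result.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Optional;

// Hypothetical helper: claims a file by moving it into a work directory.
final class WorkDirMover {

    /**
     * Moves the file into workDir and returns its new location.
     * Returns Optional.empty() if the file no longer exists, which is
     * what happens when a duplicate message arrives for a file that
     * another thread has already moved.
     */
    static Optional<Path> claim(Path file, Path workDir) throws IOException {
        Path target = workDir.resolve(file.getFileName());
        try {
            // ATOMIC_MOVE requires source and target on the same file system;
            // otherwise an AtomicMoveNotSupportedException is thrown.
            return Optional.of(Files.move(file, target, StandardCopyOption.ATOMIC_MOVE));
        } catch (NoSuchFileException e) {
            // Already claimed by someone else: ignore the duplicate.
            return Optional.empty();
        }
    }

    public static void main(String[] args) throws IOException {
        Path importDir = Files.createTempDirectory("import");
        Path workDir = Files.createTempDirectory("work");
        Path file = Files.createFile(importDir.resolve("report.xml.gz"));

        System.out.println(claim(file, workDir).isPresent()); // true: first claim wins
        System.out.println(claim(file, workDir).isPresent()); // false: file is already gone
    }
}
```

Downstream processing would then continue only when claim returns a value, so a duplicate message is dropped instead of failing.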

          However, the way I expected the configuration listed above to work would be as follows.
          The polling should take place in a task-scheduler thread, which scans the specified directory and creates a queue of up to max-messages-per-poll files. If a task-executor was specified, files from the queue would be handled by the executor until either the queue is empty or the executor's resources are exhausted. In the latter case, the behaviour should be configurable, e.g. discard the current queue or wait until the executor accepts new tasks again.
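
The behaviour described above could be sketched roughly as follows. This is plain java.util.concurrent code, not how Spring Integration implements its poller; SingleScanDispatcher and pollOnce are hypothetical names. The directory is scanned once on the calling thread, so each file enters the batch exactly once, and only the handling is handed to the executor.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical dispatcher: one scanning thread, many handling threads.
final class SingleScanDispatcher {
    private final Path directory;
    private final int maxMessagesPerPoll;
    private final ExecutorService executor;

    SingleScanDispatcher(Path directory, int maxMessagesPerPoll, ExecutorService executor) {
        this.directory = directory;
        this.maxMessagesPerPoll = maxMessagesPerPoll;
        this.executor = executor;
    }

    /** Runs one poll: scan on the calling thread, then submit each file once. */
    List<Future<?>> pollOnce(Consumer<Path> handler) throws IOException {
        List<Path> batch = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(directory)) {
            for (Path p : stream) {
                if (batch.size() >= maxMessagesPerPoll) {
                    break; // leave the remaining files for the next poll
                }
                if (Files.isRegularFile(p)) {
                    batch.add(p);
                }
            }
        }
        List<Future<?>> futures = new ArrayList<>();
        for (Path p : batch) {
            // What happens when the executor is saturated depends on the
            // executor itself (for a ThreadPoolExecutor, its RejectedExecutionHandler).
            futures.add(executor.submit(() -> handler.accept(p)));
        }
        return futures;
    }

    public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("import");
        for (int i = 0; i < 5; i++) {
            Files.createFile(dir.resolve("report_" + i + ".xml.gz"));
        }
        ExecutorService pool = Executors.newFixedThreadPool(4);
        AtomicInteger handled = new AtomicInteger();
        SingleScanDispatcher dispatcher = new SingleScanDispatcher(dir, 3, pool);
        for (Future<?> f : dispatcher.pollOnce(p -> handled.incrementAndGet())) {
            f.get(); // wait for every handler to finish
        }
        pool.shutdown();
        System.out.println(handled.get()); // 3: capped by maxMessagesPerPoll
    }
}
```

The configurable "discard or wait" behaviour maps onto the executor's rejection policy: with a bounded ThreadPoolExecutor, ThreadPoolExecutor.DiscardPolicy drops the task, while ThreadPoolExecutor.CallerRunsPolicy makes the polling thread do the work itself, which slows down further submissions.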

          Just my 2 cents..