Spring Integration *max-messages-per-poll* and duplicate messages

  • Spring Integration *max-messages-per-poll* and duplicate messages

    My configuration is given below. My two questions are

    1) Even if there is only one file in the directory, *max-messages-per-poll* causes 5 messages to be sent with the same file information. I think that is correct per the spec. However, what would be the best way to discard the four duplicate files, or prevent them from reaching the Service Activator?

    2) Currently, based on this configuration, the file ends up in the outbound directory. That's fine if there are no errors. What would be the best way to redirect the file to a different directory from the Service Activator? (Pass an error-channel and send it there from the Service Activator?)

    Please advise and thanks very much in advance!
    	<file:inbound-channel-adapter id="filesInbound"
    	    <integration:poller  id="poller" fixed-delay="20000" max-messages-per-poll="5"/>
    	<task:executor id="taskExecutor" pool-size="3"/>
    	 <integration:channel id="filesInbound">
    	    <integration:dispatcher task-executor="taskExecutor"/>
    <integration:service-activator input-channel="filesInbound" output-channel="filesOutbound"	                              
    	<file:outbound-channel-adapter id="filesOutBound"
    Last edited by Gary Russell; Jul 6th, 2013, 07:26 AM.

  • #2
    With prevent-duplicates=false, the same file will be sent 5 times if you don't remove the file.

    However, since you are multi-threading the flow (with a task executor), it's certainly possible for another thread to see the file.

    Probably the easiest thing to do is to add another service before the filesInboundChannel to rename the file (on the poller thread) - that way the poller (or another thread) won't "see" the file on a later poll.

    You can handle errors by putting an error-channel on the poller; or you could use an ExpressionEvaluatingRequestHandlerAdvice on the service-activator; there's an example of using this advice in the retry-and-more sample.
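
    A rough sketch of the first option, under some assumptions: the channel names `fileErrors` and `failedFiles` and the directory `/tmp/failed` are made up for illustration. An exception that reaches the poller's error handler arrives on the error channel as an ErrorMessage whose payload is a MessagingException, so the original File should be available as payload.failedMessage.payload. With a task executor downstream, failures thrown on the executor thread may not reach the poller's error channel, so that part is worth verifying for your flow.

    ```xml
    <!-- Poller from the original configuration with an error channel added;
         'fileErrors' is a hypothetical channel name -->
    <integration:poller id="poller" fixed-delay="20000" max-messages-per-poll="5"
                        error-channel="fileErrors"/>

    <integration:channel id="fileErrors"/>

    <!-- Unwrap the failed message to get back the original File payload -->
    <integration:transformer input-channel="fileErrors" output-channel="failedFiles"
                             expression="payload.failedMessage.payload"/>

    <!-- Write the failed file to a separate directory (hypothetical path) -->
    <file:outbound-channel-adapter id="failedFiles" directory="/tmp/failed"/>
    ```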


    • #3
      Hi Gary,
      Thanks very much, that worked, and it raised a couple of follow-up questions.
      1. When we set *prevent-duplicates=true*, is there a configuration setting that limits the number of files Spring keeps in its queue for duplicate checking? Or do the entries keep piling up during a running session until memory is exhausted? If there is a configurable maximum, how do we specify it, and can we reset it through an admin option while the app is running?

      2. This is a multiple-JVM question. Ideally we want the code in only one JVM to poll the directory. If multiple JVMs poll the same directory, the same file could end up in several service activators, which would be a sticky situation. I presume even *prevent-duplicates* won't work across multiple JVMs. If so, what would be the best option in that type of situation? Is there an admin way to start and stop the pollers on the JVMs while the app is running? Is *auto-startup* the correct config to handle this?

      Please advise, and oh, the main thing: you guys have done an amazing job that saved me thousands of coding hours. Keep up the good work!



      • #4
        1. By default, with a regex-pattern, the adapter gets a CompositeFileListFilter (with a RegexPatternFileListFilter and an AcceptOnceFileListFilter with an unbounded "memory").

        To change this, you'd have to define your own <bean/> for a CompositeFileListFilter containing a RegexPatternFileListFilter and an AcceptOnceFileListFilter with a 'maxCapacity' constructor-arg. You'd then supply the filter to the adapter with the 'filter' attribute (you can't have both the filter and pattern attributes, hence the need to build a CompositeFileListFilter).
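
        A minimal sketch of such a filter definition. The bean id `compositeFilter`, the capacity of 100, the regex, and the directory `/tmp/in` are all assumptions for illustration, not values from the original configuration.

        ```xml
        <bean id="compositeFilter"
              class="org.springframework.integration.file.filters.CompositeFileListFilter">
            <constructor-arg>
                <list>
                    <!-- Remembers at most 100 files (assumed value); older entries are dropped -->
                    <bean class="org.springframework.integration.file.filters.AcceptOnceFileListFilter">
                        <constructor-arg value="100"/>
                    </bean>
                    <!-- Assumed pattern: only .txt/.TXT files -->
                    <bean class="org.springframework.integration.file.filters.RegexPatternFileListFilter">
                        <constructor-arg type="java.lang.String" value="^.*\.(txt|TXT)$"/>
                    </bean>
                </list>
            </constructor-arg>
        </bean>

        <!-- The 'filter' attribute replaces the pattern attribute on the adapter -->
        <file:inbound-channel-adapter id="filesInbound" directory="/tmp/in" filter="compositeFilter">
            <integration:poller fixed-delay="20000" max-messages-per-poll="5"/>
        </file:inbound-channel-adapter>
        ```

        Once the capacity is reached, the oldest remembered file is forgotten, so a file that is still in the directory at that point could be picked up again.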

        2. The NIO locker should prevent the same file being used by multiple JVMs, but see the caveat about releasing locks in the reference manual.

        But, yes, you can start/stop the adapters at will; setting auto-startup to false will prevent them starting automatically.

        You can control the adapters directly (get a reference to the SourcePollingChannelAdapter and invoke stop(), start()). Or, you can expose the endpoints as JMX MBeans and use stop/start operations; or you can use a <control-bus/> to send messages; e.g. "@filesInbound.stop()"
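
        A small sketch of the control bus option; `controlChannel` is a hypothetical channel name.

        ```xml
        <!-- Messages sent here are evaluated as control expressions -->
        <int:channel id="controlChannel"/>
        <int:control-bus input-channel="controlChannel"/>
        ```

        Sending a message whose payload is the String "@filesInbound.stop()" (or "@filesInbound.start()") to controlChannel would then stop or start the adapter. The name after the @ has to be the adapter's bean name; if the adapter has no explicit channel attribute, its id is used for the channel and the adapter itself is registered under the id plus ".adapter", so that is worth checking in your context.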


        • #5
          Thanks very much, Gary. Lots of useful info. Now that you mention the file locker, I have a question about it. Currently, I unlock the file as the first thing in my Service Activator. Is that the right way to do it? I mean, without doing that, won't we be unable to read the file inside the Service Activator? Also, doesn't that create a sticky situation where another process can manipulate the file while the Service Activator is reading its contents?

          Let me know your thoughts when you get a chance. Thanks


          • #6
            But I thought you'd changed your flow to immediately rename the file before it even gets to the service - so what's the issue?

            Caveat: I am not a file locking expert.


            • #7
              Hi Gary,
              I did not rename the file because the file name has business significance, so I am not sure renaming will work in my case. Anyway, with *prevent-duplicates=true* that issue no longer exists. However, I am trying to get a handle on your earlier statement: "With prevent-duplicates=false, the same file will be sent 5 times if you don't remove the file". When you say remove the file, who is responsible for removing it? As it stands now, the runtime invokes the service activator with duplicate messages/files. Where should I step in, and what specifically should I do, to prevent the duplicates before the Service Activator gets invoked with them?

              Please advise and thanks very much in advance.


              • #8
                It depends on your application. I see you have the poller running every 20 seconds; how long does it take to process a file?

                If less than 20 seconds, you can use an ExpressionEvaluatingRequestHandlerAdvice to rename or delete the file.

                There's an example of this in the retry-and-more sample. While that example is for an FTP channel adapter, you can advise most components.

                If it takes longer than 20 seconds you have no choice but to rename the file (or move it to another directory) to avoid the next poll seeing it.
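
                A sketch of what the advice could look like on the service activator. It assumes the handler is the last step that needs the file (deleting on success would not suit a flow that still copies the file afterwards), that the String-valued onSuccessExpression/onFailureExpression properties are available in your version (property names may differ in other releases), and that `/tmp/failed/` is a directory you have created; `handler` is the bean name used elsewhere in this thread.

                ```xml
                <int:service-activator input-channel="filesInbound" ref="handler">
                    <int:request-handler-advice-chain>
                        <bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice">
                            <!-- After the handler returns normally, remove the source file -->
                            <property name="onSuccessExpression" value="payload.delete()"/>
                            <!-- After the handler throws, move the file aside (hypothetical directory) -->
                            <property name="onFailureExpression"
                                      value="payload.renameTo(new java.io.File('/tmp/failed/' + payload.name))"/>
                            <!-- Swallow the exception once the failure expression has run -->
                            <property name="trapException" value="true"/>
                        </bean>
                    </int:request-handler-advice-chain>
                </int:service-activator>
                ```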


                • #9
                  Hi Gary,
                  It takes less than 5 seconds to process the file, and obviously I am going to increase the polling interval to 1 minute. However, I feel we will be at the mercy of timing: in the worst case, if for whatever reason file processing takes more than a minute, we might end up in the soup. So I am going to look at the alternative you suggested; I have a feeling it will solve it.

                  I am thrilled with your "move it to another directory" option. To connect the service activator to that new directory, wouldn't I have to create another poller on that *new directory*, or is there another way to invoke the service activator once the file appears there?
                  Also, *move it to another directory* could bring up another well-documented issue: the app picking up the file before it is completely moved. I guess we need the *renaming file* trick so that the app picks up only the complete file?

                  Thanks a lot Gary for all your amazing inputs!


                  • #10
                    How will you know when the file is "complete"?

                    You can use a custom FileListFilter to apply whatever algorithm you need or, if you can get the sender to send it with a temporary suffix and rename it when it's complete, that is the best solution (it's what we use in our (s)ftp adapters).
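
                    As a sketch of the temporary-suffix approach on the receiving side: if the sender writes with a temporary suffix and renames on completion, the adapter only needs to match the final names. The regex and the directory `/tmp/in` below are assumptions for illustration.

                    ```xml
                    <!-- Only files already renamed to .txt/.TXT match; in-progress files with a temporary suffix are ignored -->
                    <file:inbound-channel-adapter id="filesInbound" directory="/tmp/in"
                                                  filename-regex="^.*\.(txt|TXT)$">
                        <int:poller fixed-delay="20000" max-messages-per-poll="5"/>
                    </file:inbound-channel-adapter>
                    ```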

                    Regarding the renaming; you can just do it with a couple of extra components; you don't need another adapter...


                    Something like...

                    <int:chain input-channel="filesInbound" output-channel="filesOutbound">
                        <int:service-activator expression="payload.renameTo('/foo/' +" />
                        <int:transformer expression="new'/foo/' +" />
                        <int:service-activator ref="handler" />
                    </int:chain>


                    • #11
                      Yes, the sender is actually sending with a .tmp and then once the writing is complete, the sender will rename it to .txt/.TXT. So, we are good there.

                      So, coming back to the original question, as my original confusion has resurfaced: with "prevent-duplicates=false" and "max-messages-per-poll=5" and only "File1.txt" in the directory, the runtime will send 3 messages (File payload) to the service activator in parallel (3 threads) with the same "File1.txt" payload. So I am wondering how renaming "File1.txt" would help prevent handling the duplicate messages.
                      I was looking for a way to detect that those are duplicate messages and not process them at all. Ideally the Service Activator should not be invoked at all for those duplicates. Sorry for being naive; I hope I did not confuse you.


                      • #12
                        Just don't use an ExecutorChannel until AFTER you've renamed the file. That way, when the poller hands over to the other thread and goes back to the adapter, the file will no longer be there.

                        i.e. put the renaming service BETWEEN the inbound adapter and the channel with the executor

                        <int:service-activator input-channel="fromAdapter" output-channel="filesInbound" expression="payload.renameTo('/foo/' +" />


                        • #13
                          Hi Gary,

                          i.e. put the renaming service BETWEEN the inbound adapter and the channel with the executor

                          <int:service-activator input-channel="fromAdapter" output-channel="filesInbound" expression="payload.renameTo('/foo/' +" />

                          The channel with the executor is "filesInbound", and that is the same ID as the inbound file adapter's ID. Did you mean to rename that to *fromAdapter* and process it?



                          • #14
                            Sorry - I didn't notice the configuration of the adapter; it's probably easier to use

                            <int:service-activator input-channel="filesInbound" output-channel="nextChannel" expression="payload.renameTo('/foo/' +" />

                            and define nextChannel with the executor.
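
                            Put together, the flow might look roughly like this. It is only a sketch: `fileMover` is a hypothetical bean whose move method renames the file and returns the renamed File (the rename expression above is left as posted), `taskExecutor` is the executor from the original configuration, and filesInbound is assumed to be a plain channel with no dispatcher/executor any more.

                            ```xml
                            <!-- Runs on the poller thread, so the file is gone from the
                                 polled directory before the adapter looks again -->
                            <int:service-activator input-channel="filesInbound" output-channel="nextChannel"
                                                   ref="fileMover" method="move"/>

                            <!-- The hand-off to other threads happens only after the rename -->
                            <int:channel id="nextChannel">
                                <int:dispatcher task-executor="taskExecutor"/>
                            </int:channel>

                            <!-- The real work, now on an executor thread -->
                            <int:service-activator input-channel="nextChannel" output-channel="filesOutbound"
                                                   ref="handler"/>
                            ```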


                            • #15
                              Hi Gary,
                              I still need to figure out why the presence of "expression" is preventing JBoss from starting up. But on the "rename" part, will the expression take care of creating the two objects, deleting it and then renaming it?