Spring Integration *max-messages-per-poll* and duplicate messages

  • Spring Integration *max-messages-per-poll* and duplicate messages

    My configuration is given below. My two questions are:

    1) Even if there is only one file in the directory, *max-messages-per-poll* results in 5 messages being sent with the same file information. I think that is correct per the spec. However, what would be the best way to discard or prevent the four duplicate messages (files) from reaching the Service Activator?

    2) Currently, based on this configuration, the file ends up in the outbound directory. That's fine if there are no errors. What would be the best way to redirect the file to a different directory from the Service Activator? (Pass an error-channel and send it there from the Service Activator?)

    Please advise and thanks very much in advance!
    Code:
    <file:inbound-channel-adapter id="filesInbound"
                                  directory="file:C:/inbound"
                                  prevent-duplicates="false"
                                  filename-regex="(.*)\.(txt|TXT)"
                                  auto-startup="true">
        <integration:poller id="poller" fixed-delay="20000" max-messages-per-poll="5"/>
        <file:nio-locker/>
    </file:inbound-channel-adapter>

    <task:executor id="taskExecutor" pool-size="3"/>
    <integration:channel id="filesInbound">
        <integration:dispatcher task-executor="taskExecutor"/>
    </integration:channel>

    <integration:service-activator input-channel="filesInbound" output-channel="filesOutbound"
                                   ref="handler">
    </integration:service-activator>

    <file:outbound-channel-adapter id="filesOutBound"
                                   directory="file:C:/process"
                                   delete-source-files="true"/>
    Last edited by Gary Russell; Jul 6th, 2013, 08:26 AM.

  • joson
    replied
    Hi Gary,
    The reason I made the custom error-channel is that if any exception occurs inside the Service Activator (for example, writing the contents to the DB fails), I want to write that file to the "errors" directory. Otherwise, it should be written to the "archive" directory.

    I have tried throwing a forced RuntimeException from the Service Activator and bridging the errorChannel to filesErrors. However, the file is not getting written and is stuck in the "process" directory. Thanks
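The intent described above (archive on success, errors on failure) can be sketched in plain Java, outside Spring Integration. This is only an illustration under stated assumptions: the class `ArchiveOrErrors`, the method `process`, and the `handler` callback are hypothetical names standing in for the Service Activator, and the temporary directories stand in for "process", "archive" and "errors".

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.function.Consumer;

// Hypothetical stand-in for the Service Activator plus its two outbound adapters.
class ArchiveOrErrors {

    /** Runs the handler, then moves the file to archiveDir on success or errorsDir on failure. */
    static Path process(Path file, Path archiveDir, Path errorsDir, Consumer<Path> handler)
            throws IOException {
        Path targetDir = archiveDir;
        try {
            handler.accept(file);
        } catch (RuntimeException e) {
            // Any runtime failure in the handler diverts the file to the errors directory.
            targetDir = errorsDir;
        }
        return Files.move(file, targetDir.resolve(file.getFileName()),
                StandardCopyOption.REPLACE_EXISTING);
    }

    public static void main(String[] args) throws IOException {
        Path work = Files.createTempDirectory("process");
        Path archive = Files.createTempDirectory("archive");
        Path errors = Files.createTempDirectory("errors");

        Path good = Files.write(work.resolve("good.txt"), "ok".getBytes());
        Path bad = Files.write(work.resolve("bad.txt"), "boom".getBytes());

        Path goodResult = process(good, archive, errors, f -> { });
        Path badResult = process(bad, archive, errors, f -> {
            throw new IllegalStateException("simulated DB write failure");
        });

        System.out.println(goodResult.getParent().equals(archive)); // true
        System.out.println(badResult.getParent().equals(errors));   // true
    }
}
```

In the actual flow the same decision is made by the framework's error handling rather than by a try/catch in user code, so this sketch only shows the expected end state of the files.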


  • Gary Russell
    replied
    Good choice - I had forgotten that the outbound gateway does a simple rename when delete-source-files="true".

    However, I am not a fan of the manipulation of the reply channel header in that way.

    I would use this instead...

    Code:
    	<integration:chain id="file-channels-enricher" input-channel="process" output-channel="filesArchiveOut">
    		<integration:header-enricher>
    			<integration:error-channel ref="filesErrors" />
    		</integration:header-enricher>
    		<integration:service-activator ref="fileHandler" />
    	</integration:chain>
    You also might want to consider setting the errorChannel header earlier, in case the rename fails (e.g. due to permissions).

    BTW, with an ExecutorChannel, errors will be sent to the default "errorChannel" for "free" - no config needed. So you can either use "errorChannel" instead of "filesErrors" or simply <bridge/> errorChannel to filesErrors.

    Either way would avoid the need for you to manipulate the errorChannel header.


    I don't understand why you had problems with JBoss - my guess is an earlier version of Spring Integration on the classpath (which you should still check for, in case you're not using the version you are expecting).
    Last edited by Gary Russell; Jul 12th, 2013, 11:32 AM.


  • joson
    replied
    Hi Gary,
    I went back to the drawing board, thought through the process once more, and here is my working version. I have tested it extensively and it is working okay. When you get a chance, let me know if you spot any problems that could cause performance or reliability issues.

    The outbound gateway is basically the "rename". The Service Activator now executes in parallel threads and writes to the "archive" or "errors" directory.

    Code:
    	<file:inbound-channel-adapter id="filesInbound"
    		channel="inbound" auto-startup="true" directory="file:C:/inbound"
    		prevent-duplicates="false" filename-regex="(.*)\.(txt|TXT)">
    		<integration:poller id="poller" fixed-delay="20"
    			time-unit="SECONDS" max-messages-per-poll="-1" />
    	</file:inbound-channel-adapter>
    
    	<task:executor id="taskExecutor" pool-size="3" />
    	<integration:channel id="process">
    		<integration:dispatcher task-executor="taskExecutor" />
    	</integration:channel>
    
    	<file:outbound-gateway id="fileProcess"
    		request-channel="inbound" reply-channel="process" directory="file:C:/process"
    		delete-source-files="true" />
    	<file:outbound-channel-adapter id="filesArchiveOut" 
    		directory="file:C:/archive" delete-source-files="true" />
    
    	<file:outbound-channel-adapter id="filesErrors" 
    		directory="file:C:/errors" delete-source-files="true" />
    
    	<integration:chain id="file-channels-enricher" input-channel="process" >
    		<integration:header-enricher>
    			<integration:reply-channel ref="filesArchiveOut" />
    			<integration:error-channel ref="filesErrors" />
    		</integration:header-enricher>
    		<integration:service-activator ref="fileHandler" />
    	</integration:chain>
    
    
    Inside the Service Activator:
    
    // Pick the target channel from the headers set by the header-enricher above.
    MessageChannel outputChannel = isException
    		? (DirectChannel) headers.get(MessageHeaders.ERROR_CHANNEL)
    		: (DirectChannel) headers.get(MessageHeaders.REPLY_CHANNEL);
    Message<File> fileObject = MessageBuilder.withPayload(input).build();
    outputChannel.send(fileObject, 0);
    Last edited by joson; Jul 12th, 2013, 10:41 AM.


  • joson
    replied
    Hi Gary,
    For whatever reason, *expression* prevents the JBoss server from starting up. So I did the rename inside the transformer, although I now think that is not the right place. Thanks

    Code:
    // Needs: import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
    @Transformer
    public File renameFile(File input) throws Exception {
        // Release the lock taken by the adapter's nio-locker before moving the file.
        this.locker.unlock(input);
        logger.info("@Transformer renaming file from: " + input.getAbsolutePath());
        Path source = input.toPath();
        Path newDir = Paths.get("C:\\intermediate");

        // Move the file and return a File that points at the new location.
        Path newPath = Files.move(source, newDir.resolve(source.getFileName()), REPLACE_EXISTING);
        logger.info("@Transformer after renaming: " + newPath.toFile().getAbsolutePath());
        return newPath.toFile();
    }


  • Gary Russell
    replied
    When posting code/config, please use [ code ] ... [ /code ] tags (no spaces in brackets).

    I think I have explained in a great deal of detail that the rename has to happen before any async processing.

    inbound-adapter->rename->executor-channel-> ...

    I still don't see any 'renameTo()' in your "final" configuration.


  • joson
    replied
    Hi Gary,
    I guess I am almost there, except for the parallelism on the ServiceActivator. Here is the final config. The transformer obviously renames the file. However, when I give the outbound channel to the dispatcher, both the Transformer and the ServiceActivator execute on the same thread, even if multiple files are picked up in the same poll. Is there a way to achieve parallelism in this configuration? Thanks

    Code:
    	<integration:channel id="fileInChannel" />
    	<integration:channel id="fileOutChannel" />
    
    	<file:inbound-channel-adapter id="filesInbound"
    		channel="fileInChannel" auto-startup="true" directory="file:C:/inbound"
    		prevent-duplicates="false" filename-regex="(.*)\.(txt|TXT)">
    		<integration:poller id="poller" fixed-delay="20"
    			time-unit="SECONDS" max-messages-per-poll="5" />
    		<file:nio-locker />
    	</file:inbound-channel-adapter>
    
    	<task:executor id="taskExecutor" pool-size="3" />
    	<integration:channel id="fileOutChannel">
    		<integration:dispatcher task-executor="taskExecutor" />
    	</integration:channel>
    
    
    	<integration:chain input-channel="fileInChannel"
    		output-channel="fileOutChannel">
    		<integration:transformer ref="transformer" />
    		<integration:service-activator ref="handler" />
    	</integration:chain>
    
    	<file:outbound-channel-adapter id="filesProcessOut"
    		channel="fileOutChannel" directory="file:C:/process"
    		delete-source-files="true" />
    Last edited by Gary Russell; Jul 9th, 2013, 10:11 PM.


  • Gary Russell
    replied
    You should be ok with SI 2.2.x (2.2.4 is current) and Spring 3.2.x (3.2.3 is current).


  • joson
    replied
    Okay - Got it. Thanks

    For SpEL to work, do we need to have matching versions of the integration jars and the SpEL jars? I have spring-expression-3.2.2.jar and spring-integration-core-2.2.3.RELEASE.jar.


  • Gary Russell
    replied
    See the javadoc for java.io.File.renameTo(). It will rename the file on disk; nothing is "deleted" so I am not sure what you mean by that. After the rename the message payload will still refer to the old file name; that's why I added a transformer to transform the payload to a File with the new file name.

    File.renameTo() takes another File as its argument. SpEL (Spring Expression Language) takes care of converting the string argument into a File object.
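The behaviour of `java.io.File.renameTo()` described above can be checked with a small plain-Java sketch. The class `RenameDemo` and the helper `renameInto` are hypothetical names, and temporary directories stand in for the inbound and target directories; the point is that the original `File` object keeps naming the old path, so a new `File` has to be built for the new location.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

// Hypothetical demo class; not part of Spring Integration.
class RenameDemo {

    /** Renames source into targetDir and returns a File for the new location. */
    static File renameInto(File source, File targetDir) throws IOException {
        File target = new File(targetDir, source.getName());
        // renameTo reports failure through its boolean result, not an exception.
        if (!source.renameTo(target)) {
            throw new IOException("Could not rename " + source + " to " + target);
        }
        return target;
    }

    public static void main(String[] args) throws IOException {
        File inbound = Files.createTempDirectory("inbound").toFile();
        File process = Files.createTempDirectory("process").toFile();
        File original = new File(inbound, "File1.txt");
        Files.write(original.toPath(), "data".getBytes());

        File renamed = renameInto(original, process);

        // The old File object still names the old path, which no longer exists.
        System.out.println(original.exists()); // false
        System.out.println(renamed.exists());  // true
    }
}
```

This is why a transformer (or a method that returns the new `File`) is needed after the rename: downstream components should receive the new path, not the stale one.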


  • joson
    replied
    Hi Gary,
    I still need to figure out why the presence of "expression" is preventing JBoss from starting up. But on the "rename" part, will the expression take care of creating two java.io.File objects, deleting the old file and then renaming it?

    Thanks


  • Gary Russell
    replied
    Sorry - I didn't notice the configuration of the adapter; it's probably easier to use

    <int:service-activator input-channel="filesInbound" output-channel="nextChannel" expression="payload.renameTo('/foo/' + payload.name)" />

    and define nextChannel with the executor.


  • joson
    replied
    Hi Gary,

    i.e. put the renaming service BETWEEN the inbound adapter and the channel with the executor

    <int:service-activator input-channel="fromAdapter" output-channel="filesInbound" expression="payload.renameTo('/foo/' + payload.name)" />

    The channel with the executor is "filesInbound", and that is the same ID as the inbound file adapter ID. Did you mean that I should rename that to *fromAdapter* and process it?

    Thanks!


  • Gary Russell
    replied
    Just don't use an ExecutorChannel until AFTER you've renamed the file. That way, when the poller hands over to the other thread and goes back to the adapter, the file will no longer be there.

    i.e. put the renaming service BETWEEN the inbound adapter and the channel with the executor

    <int:service-activator input-channel="fromAdapter" output-channel="filesInbound" expression="payload.renameTo('/foo/' + payload.name)" />
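The ordering described above (rename on the polling thread, hand off to the executor only afterwards) can be illustrated with a plain-Java sketch that does not use Spring Integration. The class `RenameBeforeHandoff` and the method `poll` are hypothetical names, and temporary directories stand in for the inbound and work directories.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical simulation of "inbound-adapter -> rename -> executor-channel".
class RenameBeforeHandoff {

    /** One poll: move each .txt file out of inbound on the calling thread, then submit it. */
    static List<Future<String>> poll(Path inbound, Path work, ExecutorService pool)
            throws IOException {
        // Snapshot the directory first so files are not moved while it is being iterated.
        List<Path> found = new ArrayList<>();
        try (DirectoryStream<Path> files = Files.newDirectoryStream(inbound, "*.txt")) {
            for (Path file : files) {
                found.add(file);
            }
        }
        List<Future<String>> results = new ArrayList<>();
        for (Path file : found) {
            // Synchronous move: the file has left inbound before any worker thread starts.
            Path moved = Files.move(file, work.resolve(file.getFileName()),
                    StandardCopyOption.REPLACE_EXISTING);
            results.add(pool.submit(() -> "processed " + moved.getFileName()));
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        Path inbound = Files.createTempDirectory("inbound");
        Path work = Files.createTempDirectory("work");
        Files.write(inbound.resolve("File1.txt"), "data".getBytes());

        ExecutorService pool = Executors.newFixedThreadPool(3);
        System.out.println(poll(inbound, work, pool).size()); // 1
        System.out.println(poll(inbound, work, pool).size()); // 0: nothing left to pick up
        pool.shutdown();
    }
}
```

Because the move completes before `submit` is called, a second poll cannot see the same file again, regardless of how long the worker takes.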


  • joson
    replied
    Yes, the sender actually writes the file with a .tmp extension and, once the writing is complete, renames it to .txt/.TXT. So, we are good there.

    So, coming back to the original question, as my original confusion has resurfaced: with prevent-duplicates="false" and max-messages-per-poll="5" and only "File1.txt" in the directory, the runtime will send 3 messages (File payload) to the service activator in parallel (3 threads) with the same "File1.txt" payload. So I am wondering how renaming "File1.txt" would help to prevent handling of the duplicate messages.
    I was looking for a solution to detect that those are duplicate messages and not process them at all. Ideally, the Service Activator should not be invoked at all for those duplicates. Sorry for being naive; I hope I did not confuse you.
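For reference, the "detect duplicates and skip them" idea asked about here can be sketched in plain Java as an accept-once filter. This is a hypothetical illustration (the class `AcceptOnceDemo` and its `filter` method are invented names), not the framework's implementation, and it assumes duplicates are identified by absolute path only.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical accept-once filter keyed on the absolute path.
class AcceptOnceDemo {

    // Thread-safe set of paths that have already been handed out.
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    /** Returns only the files whose absolute path has not been accepted before. */
    List<File> filter(List<File> candidates) {
        List<File> accepted = new ArrayList<>();
        for (File candidate : candidates) {
            // Set.add returns false when the path is already present.
            if (seen.add(candidate.getAbsolutePath())) {
                accepted.add(candidate);
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        AcceptOnceDemo filter = new AcceptOnceDemo();
        File file = new File("File1.txt");

        // The same file listed three times is accepted once.
        System.out.println(filter.filter(Arrays.asList(file, file, file)).size()); // 1
        // A later poll that sees the same path again accepts nothing.
        System.out.println(filter.filter(Arrays.asList(file)).size()); // 0
    }
}
```

One trade-off of this approach is that the set lives in memory: a new file that later arrives under the same name would also be rejected, which may be a reason to prefer moving the file out of the inbound directory instead.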
