Spring Integration *max-messages-per-poll* and duplicate messages

  • #16
    See the javadoc for java.io.File.renameTo(). It will rename the file on disk; nothing is "deleted" so I am not sure what you mean by that. After the rename the message payload will still refer to the old file name; that's why I added a transformer to transform the payload to a File with the new file name.

    File.renameTo() takes another File as its argument. SpEL (Spring Expression Language) takes care of converting the string argument into a File object.
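To illustrate the point above about the payload still referring to the old name, here is a minimal standalone sketch in plain Java (no Spring Integration involved). The class name `RenameDemo`, the helper `renameInSameDir`, and the `.processing` suffix are hypothetical names chosen for the example; only `File.renameTo()` comes from the thread.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

class RenameDemo {

    /**
     * Renames source to a sibling file called newName and returns a File
     * for the new path. The source object itself is not modified.
     */
    static File renameInSameDir(File source, String newName) {
        File target = new File(source.getParentFile(), newName);
        if (!source.renameTo(target)) {
            // renameTo() reports failure via its return value, not an exception
            throw new IllegalStateException("rename failed: " + source + " -> " + target);
        }
        return target;
    }

    public static void main(String[] args) throws IOException {
        File dir = Files.createTempDirectory("rename-demo").toFile();
        File original = new File(dir, "input.txt");
        Files.write(original.toPath(), "hello".getBytes());

        File renamed = renameInSameDir(original, "input.txt.processing");

        // The old File object still carries the old path, which is now gone on disk.
        System.out.println(original.getName() + " exists: " + original.exists());
        // Downstream code has to work with a File built from the new name.
        System.out.println(renamed.getName() + " exists: " + renamed.exists());
    }
}
```

This is why a step that swaps the payload for a `File` with the new name is needed after the rename.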

    Comment


    • #17
      Okay - Got it. Thanks

      For SpEL to work, do we need matching versions of the integration jars and the SpEL jars? I have spring-expression-3.2.2.jar and spring-integration-core-2.2.3.RELEASE.jar.

      Comment


      • #18
        You should be ok with SI 2.2.x (2.2.4 is current) and Spring 3.2.x (3.2.3 is current).

        Comment


        • #19
          Hi Gary,
          I think I am almost there, except for the parallelism on the ServiceActivator. Here is the final config. The transformer renames the file. However, when I add the dispatcher to the outbound channel, both the Transformer and the ServiceActivator execute on the same thread, even if multiple files are picked up in the same poll. Is there a way to achieve parallelism in this configuration? Thanks

          Code:
          	<integration:channel id="fileInChannel" />
          	<integration:channel id="fileOutChannel" />
          
          	<file:inbound-channel-adapter id="filesInbound"
          		channel="fileInChannel" auto-startup="true" directory="file:C:/inbound"
          		prevent-duplicates="false" filename-regex="(.*)\.(txt|TXT)">
          		<integration:poller id="poller" fixed-delay="20"
          			time-unit="SECONDS" max-messages-per-poll="5" />
          		<file:nio-locker />
          	</file:inbound-channel-adapter>
          
          	<task:executor id="taskExecutor" pool-size="3" />
          	<integration:channel id="fileOutChannel">
          		<integration:dispatcher task-executor="taskExecutor" />
          	</integration:channel>
          
          
          	<integration:chain input-channel="fileInChannel"
          		output-channel="fileOutChannel">
          		<integration:transformer ref="transformer" />
          		<integration:service-activator ref="handler" />
          	</integration:chain>
          
          	<file:outbound-channel-adapter id="filesProcessOut"
          		channel="fileOutChannel" directory="file:C:/process"
          		delete-source-files="true" />
          Last edited by Gary Russell; Jul 9th, 2013, 09:11 PM.

          Comment


          • #20
            When posting code/config, please use [ code ] ... [ /code ] tags (no spaces in brackets).

            I think I have explained in a great deal of detail that the rename has to happen before any async processing.

            inbound-adapter->rename->executor-channel-> ...

            I still don't see any 'renameTo()' in your "final" configuration.
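The ordering described above (inbound-adapter->rename->executor-channel) can be sketched outside Spring Integration as follows. This is only a plain-Java analogy, not the framework's implementation: the class `ClaimThenDispatch` and its methods `claim` and `dispatch` are hypothetical. The point it shows is that the move out of the polled directory happens on the single polling thread, so a later poll cannot pick the same file up again, and only then is the work handed to a thread pool.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ClaimThenDispatch {

    /** Moves every *.txt file from inbound to process on the calling thread. */
    static List<Path> claim(Path inbound, Path process) throws IOException {
        // List first, then move, so the directory is not modified while it is being read.
        List<Path> found = new ArrayList<>();
        try (DirectoryStream<Path> files = Files.newDirectoryStream(inbound, "*.txt")) {
            for (Path file : files) {
                found.add(file);
            }
        }
        List<Path> claimed = new ArrayList<>();
        for (Path file : found) {
            claimed.add(Files.move(file, process.resolve(file.getFileName()),
                    StandardCopyOption.REPLACE_EXISTING));
        }
        return claimed;
    }

    /** Hands each claimed file to a pool and waits for all handlers to finish. */
    static List<String> dispatch(List<Path> claimed, int poolSize) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (final Path file : claimed) {
                // Stand-in for the real handler: just report which file was handled.
                Callable<String> task = () -> file.getFileName().toString();
                futures.add(pool.submit(task));
            }
            List<String> handled = new ArrayList<>();
            for (Future<String> future : futures) {
                handled.add(future.get());
            }
            return handled;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        Path inbound = Files.createTempDirectory("inbound");
        Path process = Files.createTempDirectory("process");
        Files.write(inbound.resolve("one.txt"), "1".getBytes());
        Files.write(inbound.resolve("two.txt"), "2".getBytes());

        List<Path> claimed = claim(inbound, process);   // synchronous rename
        System.out.println(dispatch(claimed, 3));       // asynchronous handling
    }
}
```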

            Comment


            • #21
              Hi Gary,
              For whatever reason, *expression* prevents the JBoss server from starting up. So I did the rename inside the transformer, though I now think that is not the right place. Thanks

              Code:
              @Transformer
              public File renameFile(File input) throws Exception {
                  this.locker.unlock(input);
                  logger.info("@Transformer renaming file from: " + input.getAbsolutePath());
                  Path source = input.toPath();
                  Path newDir = Paths.get("C:\\intermediate");

                  Path newPath = Files.move(source, newDir.resolve(source.getFileName()), REPLACE_EXISTING);
                  logger.info("@Transformer after renaming: " + newPath.toFile().getAbsolutePath());
                  return newPath.toFile();
              }

              Comment


              • #22
                Hi Gary,
                I went back to the drawing board, thought through the process once more, and here is my working version. I have tested it extensively and it is working okay. When you get a chance, let me know if you spot any problems that could cause performance or reliability issues.

                The outbound gateway is basically the "rename". Service Activator now executes in parallel threads and writes to "archive" or "error" directory.

                Code:
                	<file:inbound-channel-adapter id="filesInbound"
                		channel="inbound" auto-startup="true" directory="file:C:/inbound"
                		prevent-duplicates="false" filename-regex="(.*)\.(txt|TXT)">
                		<integration:poller id="poller" fixed-delay="20"
                			time-unit="SECONDS" max-messages-per-poll="-1" />
                	</file:inbound-channel-adapter>
                
                	<task:executor id="taskExecutor" pool-size="3" />
                	<integration:channel id="process">
                		<integration:dispatcher task-executor="taskExecutor" />
                	</integration:channel>
                
                	<file:outbound-gateway id="fileProcess"
                		request-channel="inbound" reply-channel="process" directory="file:C:/process"
                		delete-source-files="true" />
                	<file:outbound-channel-adapter id="filesArchiveOut" 
                		directory="file:C:/archive" delete-source-files="true" />
                
                		<file:outbound-channel-adapter id="filesErrors" 
                		directory="file:C:/errors" delete-source-files="true" />
                
                	<integration:chain id="file-channels-enricher" input-channel="process" >
                		<integration:header-enricher>
                			<integration:reply-channel ref="filesArchiveOut" />
                			<integration:error-channel ref="filesErrors" />
                					
                		</integration:header-enricher>
                		<integration:service-activator ref="fileHandler" />
                	</integration:chain>
                
                
                Inside Service Activator
                
                MessageChannel outputChannel = isException ? (DirectChannel)headers.get(MessageHeaders.ERROR_CHANNEL) : (DirectChannel)headers.get(MessageHeaders.REPLY_CHANNEL);
                		Message<File> fileObject = MessageBuilder.withPayload(input).build();
                		outputChannel.send(fileObject, 0);
                Last edited by joson; Jul 12th, 2013, 09:41 AM.

                Comment


                • #23
                  Good choice - I had forgotten that the outbound gateway does a simple rename when delete-source-files="true".

                  However, I am not a fan of the manipulation of the reply channel header in that way.

                  I would use this instead...

                  Code:
                  	<integration:chain id="file-channels-enricher" input-channel="process" output-channel="filesArchiveOut">
                  		<integration:header-enricher>
                  			<integration:error-channel ref="filesErrors" />
                  		</integration:header-enricher>
                  		<integration:service-activator ref="fileHandler" />
                  	</integration:chain>
                  You also might want to consider setting the errorChannel header earlier, in case the rename fails (e.g. due to permissions).

                  BTW, with an ExecutorChannel, errors will be sent to the default "errorChannel" for "free" - no config needed. So you can either use "errorChannel" instead of "filesErrors", or simply <bridge/> errorChannel to filesErrors.

                  Either way would avoid the need for you to manipulate the errorChannel header.


                  I don't understand why you had problems with JBoss - my guess is an earlier version of Spring Integration on the classpath (which you should still check for, in case you're not using the version you expect).
                  Last edited by Gary Russell; Jul 12th, 2013, 10:32 AM.

                  Comment


                  • #24
                    Hi Gary,
                    The reason I made the custom error-channel is that, if any exception occurs inside the Service Activator (e.g. writing contents to the DB fails), I want to write that file to the "errors" directory. Otherwise, it should be written to the "archive" directory.

                    I tried throwing a forced RuntimeException from the Service Activator and bridging errorChannel to filesErrors. However, the file is not getting written to "errors" and is stuck in the "process" directory. Thanks
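For reference, the routing rule described in this post (archive on success, errors on any exception from the handler) can be written down in plain Java as below. This is a sketch of the intended behaviour only, under the assumption that the file is moved after handling. It does not use Spring Integration and does not explain why the bridge did not work. `ArchiveOrError`, `FileHandler`, and `process` are hypothetical names.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

class ArchiveOrError {

    /** Hypothetical stand-in for the service activator's work, e.g. a DB write. */
    interface FileHandler {
        void handle(Path file) throws Exception;
    }

    /**
     * Runs the handler, then moves the file to archiveDir if the handler
     * returned normally, or to errorDir if it threw. Returns the new path.
     */
    static Path process(Path file, FileHandler handler, Path archiveDir, Path errorDir)
            throws IOException {
        Path targetDir;
        try {
            handler.handle(file);
            targetDir = archiveDir;
        } catch (Exception e) {
            // Any handler failure sends the file to the errors directory.
            targetDir = errorDir;
        }
        return Files.move(file, targetDir.resolve(file.getFileName()),
                StandardCopyOption.REPLACE_EXISTING);
    }

    public static void main(String[] args) throws IOException {
        Path processDir = Files.createTempDirectory("process");
        Path archiveDir = Files.createTempDirectory("archive");
        Path errorDir = Files.createTempDirectory("errors");

        Path good = Files.write(processDir.resolve("good.txt"), "ok".getBytes());
        Path bad = Files.write(processDir.resolve("bad.txt"), "boom".getBytes());

        // Handler that fails for one specific file name to simulate a DB error.
        FileHandler handler = file -> {
            if (file.getFileName().toString().startsWith("bad")) {
                throw new RuntimeException("simulated DB failure");
            }
        };

        System.out.println(process(good, handler, archiveDir, errorDir));
        System.out.println(process(bad, handler, archiveDir, errorDir));
    }
}
```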

                    Comment
