
  • Inbound channel adapter issue

    Here is my inbound channel adapter config:

    Code:
    <int-file:inbound-channel-adapter directory="DecryptedFiles" channel="decryptedFilesChannel" prevent-duplicates="true">
        <int:poller cron="* * 6-23 * * SUN-SAT" />
    </int-file:inbound-channel-adapter>
    However, when my application runs, I see that multiple threads are processing the same file. How do I ensure that the file is picked up only once?

    Any help is greatly appreciated.

  • #2
    Bobby

    From your configuration I can't see how that is possible.
    Could you please provide more detail?
    All I see is that a new file will be picked up by the poller's thread at intervals governed by the cron expression, and your poller is single-threaded.
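
    To make "intervals governed by the cron expression" concrete, here is the poller from the original post with its cron fields annotated. This is only a sketch, assuming Spring's six-field cron syntax (second, minute, hour, day of month, month, day of week). It annotates the existing configuration and is not a suggested fix.

    ```xml
    <int-file:inbound-channel-adapter directory="DecryptedFiles" channel="decryptedFilesChannel" prevent-duplicates="true">
        <!-- Fields: second minute hour day-of-month month day-of-week.
             "* * 6-23 * * SUN-SAT" fires every second of every minute
             from 06:00:00 through 23:59:59, on every day of the week. -->
        <int:poller cron="* * 6-23 * * SUN-SAT" />
    </int-file:inbound-channel-adapter>
    ```

    With this expression the directory is scanned once per second during those hours, so any duplicated poller shows up in the log almost immediately.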



    • #3
      I totally understand, and that was my expectation too, as I did not assign any task executor. According to my application log, two task scheduler threads (10 and 8) were spawned to access the same file at the same time.

      Please see below:

      Code:
      2011-05-03 15:33:02,006  DEBUG FileReadingMessageSource [task-scheduler-10] (FileReadingMessageSource.java:272) - Added to queue: [DecryptedFiles\DECRYPTED_YwHsP9Evt5BY$gAA]
      2011-05-03 15:33:02,006  INFO  FileReadingMessageSource [task-scheduler-10] (FileReadingMessageSource.java:260) - Created message: [[Payload=DecryptedFiles\DECRYPTED_YwHsP9Evt5BY$gAA][Headers={timestamp=1304454782006, id=7ac20ada-c13f-460e-b05d-1291eb49a144}]]
      2011-05-03 15:33:02,006  DEBUG FileReadingMessageSource [task-scheduler-8] (FileReadingMessageSource.java:272) - Added to queue: [DecryptedFiles\DECRYPTED_YwHsP9Evt5BY$gAA]
      2011-05-03 15:33:02,006  INFO  FileReadingMessageSource [task-scheduler-8] (FileReadingMessageSource.java:260) - Created message: [[Payload=DecryptedFiles\DECRYPTED_YwHsP9Evt5BY$gAA][Headers={timestamp=1304454782006, id=4c422df2-dfb9-453e-91ac-1cda56a6683d}]]
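
      The thread names in the log (task-scheduler-10, task-scheduler-8) suggest that the pollers run on a shared scheduler pool. One diagnostic that could narrow this down is to declare a scheduler with a single thread, as sketched below. Two things here are assumptions and not taken from this thread: that Spring Integration picks up a scheduler bean named "taskScheduler" when one is defined, and that the Spring `task` namespace is declared in the config file.

      ```xml
      <!-- Assumption: xmlns:task="http://www.springframework.org/schema/task" is declared on <beans>. -->
      <!-- Assumption: Spring Integration uses the bean named "taskScheduler" when one is defined. -->
      <!-- A pool of one thread forces every poller in this context to take turns. -->
      <task:scheduler id="taskScheduler" pool-size="1" />
      ```

      If two messages per file still appear with a single scheduler thread, the duplication is more likely to come from a second adapter or a second context than from concurrent threads. A one-thread pool is a debugging aid here, not a recommended production setting.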



      • #4
        Bobby

        I was just trying to reproduce and I don't see any duplication. The only way I was able to reproduce it is by starting the ApplicationContext (AC) twice or by configuring two adapter instances in the same AC.
        Can you look at your config and make sure you don't have a second instance of the adapter?
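
        As an illustration of how an ApplicationContext can end up being started twice in a web application, here is a hypothetical web.xml in which both the ContextLoaderListener and the DispatcherServlet point at the same configuration file. The file name and servlet name are made up for the example; nothing in this thread shows the actual web.xml.

        ```xml
        <!-- Hypothetical web.xml fragment: integration-config.xml is loaded twice. -->
        <context-param>
            <param-name>contextConfigLocation</param-name>
            <param-value>/WEB-INF/integration-config.xml</param-value>
        </context-param>
        <listener>
            <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
        </listener>

        <servlet>
            <servlet-name>dispatcher</servlet-name>
            <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
            <init-param>
                <!-- Same file again: the servlet builds a second context with its own pollers. -->
                <param-name>contextConfigLocation</param-name>
                <param-value>/WEB-INF/integration-config.xml</param-value>
            </init-param>
        </servlet>
        ```

        In a setup like this, each context would create its own adapter and schedule its own poller, which would produce two messages for every file.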



        • #5
          I do not have multiple inbound channel adapters polling the same folder; however, I do have inbound channel adapters polling different folders (within the same applicationContext). Could this be an issue?

          This is what is happening in my application:
          1. I poll an FTP site and pick up an encrypted file when one is present.
          2. A command line utility decrypts the file and stores the result in the DecryptedFiles folder.
          3. I use an inbound channel adapter to process the decrypted file.

          There is another inbound-channel-adapter that polls in order to FTP an encrypted file (outbound processing). The same behavior is seen there as well, and it is consistent even between Tomcat recycles.



          • #6
            Not sure I understand this. Could you clarify?
            "I do not have multiple inbound channel adapters polling the same folder; however, I do have inbound channel adapters polling different folders"



            • #7
              I meant to say there are multiple inbound channel adapters polling different folders, say folder1, folder2, and folder3, at various stages of processing (a file being processed in phase1 would be in folder1, and then moved to folder2 for phase2 processing, etc.).



              • #8
                Ok, I think I know what's going on.

                Correct me if I am wrong.
                FTP downloads a file into the directory dir1. Then your inbound-channel-adapter polls dir1. Right?
                If so, then here is what's happening.
                After downloading the file, the FTP adapter uses FileReadingMessageSource to generate a Message from the newly downloaded file and sends it to a channel. So the second inbound-channel-adapter is really not necessary if all you need is to process files as soon as they arrive; otherwise, that is what's causing the illusion of duplication. Makes sense?



                • #9
                  Kind of.

                  The FTP process downloads the file to folder1 and puts it on a channel. It is then passed to a service activator, which decrypts the file and puts it in folder2. Then I have an inbound channel adapter polling folder2 to process the decrypted file. This is where I am seeing the problem (two task scheduler threads generating two messages).



                  • #10
                    Could you post your full config? I can't see how that's possible.



                    • #11
                      Code:
                      	<int-ftp:inbound-channel-adapter id="fromFidFTPReader" local-directory="EncryptedFilesFromFid" 
                      			channel="fromFidEncryptedBatchChannel" session-factory="ftpClientFactory" remote-directory="Fid" 
                      			delete-remote-files="true"  filename-pattern="odt*.*">
                      		<int:poller cron="0 0/3 6-23 * * SUN-SAT" />
                      	</int-ftp:inbound-channel-adapter>
                      	
                      	<!--File is always posted with the same name so rename it -->
                      	<int:service-activator input-channel="fromFidEncryptedBatchChannel" ref="fileRenameProcessor" method="rename"  output-channel="fromFidEncryptedBatchChannel2"/>
                      	
                      	<!-- This service activator decrypts the incoming file and saves in the folder mentioned by propertiesCryptoConfig bean -->
                      	<int:service-activator input-channel="fromFidEncryptedBatchChannel2" ref="cryptographyManager" method="decrypt" />
                      	<!-- Directory *MUST* match with the directory configured in propertiesCryptoConfig bean -->
                      	<int-file:inbound-channel-adapter directory="DecryptedFilesFromFid" channel="fromFidBatchChannel" prevent-duplicates="true">
                      		<int:poller cron="* * 6-23 * * SUN-SAT" />
                      	</int-file:inbound-channel-adapter>
                      	
                      	<!-- Processed batch transactions saved to mysql database -->
                      	<int:service-activator id="fromFidBatchActivator" input-channel="fromFidBatchChannel" ref="fromFidBatchCreateProcessor" method="processBatch" />
                      	
                      	<!-- polls mysql db for pending transactions; processes/channels named 'outbound' because this part of the application send out messages -->
                      	<int:inbound-channel-adapter id="outboundTransactionsReader" channel="outboundMessagesChannel" ref="pendingOutboundTransactionProcessor" method="getPendingMessage" >
                      		<int:poller max-messages-per-poll="1" cron="* * 6-23 * * SUN-SAT" />
                      	</int:inbound-channel-adapter>
                      	
                      	<int:header-enricher input-channel="outboundMessagesChannel" output-channel="outboundMessagesEnrichedChannel" >
                      		<int:header name="ObjectID" expression="payload.objectId"/>
                      		<int:header name="Retryable" expression="payload.retryable" />
                      	</int:header-enricher>
                      	<int:service-activator input-channel="outboundMessagesEnrichedChannel" output-channel="responseChannel" ref="outboundMessageConverter" method="convert" />
                      	
                      	<int-http:outbound-gateway request-channel="responseChannel" extract-request-payload="true" url="http://outbound-url-to-post-xml-message.com" reply-channel="outboundResponsesChannel" />



                      • #12
                        Ok, let's go through a process of elimination.
                        Let's comment out everything past the file:inbound-channel-adapter, add a logger to 'fromFidBatchChannel' instead, and see if you see the same thing.
                        I can't pick up anything wrong with this configuration other than possible improvements.
                        For example, you don't need the file:inbound-channel-adapter. You can simply have
                        <int:service-activator input-channel="fromFidEncryptedBatchChannel2" ref="cryptographyManager" method="decrypt" />
                        return the file name and define an output-channel that feeds 'fromFidBatchActivator' (that is, its input channel 'fromFidBatchChannel').
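
                        A minimal sketch of both suggestions, reusing the channel and bean names from the posted config. It assumes that the decrypt method can return the decrypted file (or its name) and that the logging-channel-adapter from the `int` namespace is used as the logger; the INFO level is an arbitrary choice.

                        ```xml
                        <!-- Step 1 (diagnosis): with everything downstream commented out,
                             log whatever the file adapter puts on fromFidBatchChannel. -->
                        <int:channel id="fromFidBatchChannel" />
                        <int:logging-channel-adapter channel="fromFidBatchChannel" level="INFO" />

                        <!-- Step 2 (simplification): drop the file adapter and let decrypt hand
                             its return value straight to the channel read by fromFidBatchActivator. -->
                        <int:service-activator input-channel="fromFidEncryptedBatchChannel2"
                                ref="cryptographyManager" method="decrypt"
                                output-channel="fromFidBatchChannel" />
                        ```

                        The two steps are separate experiments: the first checks whether the file adapter alone produces duplicate messages, and the second removes the directory polling from the flow altogether.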



                        • #13
                          I will update my config and post the details.

                          Just to give you some background, the cryptography manager is a wrapper for the command line utility (licensed by my company) that decrypts the incoming file and writes it to a folder. At this point I could either read that file from the file system or use the inbound channel adapter; I chose the latter.

                          Thanks and regards.



                          • #14
                            I apologize for not getting back to you right away.

                            To continue with the original issue, I followed your suggestion and rewired my application by eliminating the folder polling (my service activator now passes the file to the next channel), and I monitored the application on Thursday and Friday. Everything seemed to be working just fine...until this morning.

                            I am now seeing behavior similar to that of 'int-file:inbound-channel-adapter' (two task scheduler threads polling at the same time, down to the millisecond) in the FTP inbound channel adapter as well. Two task scheduler threads polled the FTP site at the same time and one thread deleted the file after picking it up, as configured; the remaining processing was then dropped completely. Processing resumed with the next poll, 3 minutes later.

                            Here is the log when that happened:
                            Code:
                            2011-05-09 09:30:00,833  INFO  FtpSession [task-scheduler-2] (FtpSession.java:73) - File have been successfully transfered to: Fid/odt05.txt.pgp
                            2011-05-09 09:30:00,833  INFO  FtpSession [task-scheduler-7] (FtpSession.java:73) - File have been successfully transfered to: Fid/odt05.txt.pgp
                            2011-05-09 09:30:00,833  DEBUG FtpInboundFileSynchronizer [task-scheduler-2] (AbstractInboundFileSynchronizer.java:196) - deleted Fid/odt05.txt.pgp
                            2011-05-09 09:33:00,286  DEBUG DefaultFtpSessionFactory [task-scheduler-2] (AbstractFtpSessionFactory.java:154) - Connected to server [HOSTS-FTP:21]
                            2011-05-09 09:33:00,286  DEBUG DefaultFtpSessionFactory [task-scheduler-5] (AbstractFtpSessionFactory.java:154) - Connected to server [HOSTS-FTP:21]
                            However, when these task scheduler threads polled the FTP server a short time apart, the file was processed just fine.

                            Code:
                            2011-05-09 11:42:00,224  DEBUG DefaultFtpSessionFactory [task-scheduler-4] (AbstractFtpSessionFactory.java:154) - Connected to server [HOSTS-FTP:21]
                            2011-05-09 11:42:00,333  DEBUG DefaultFtpSessionFactory [task-scheduler-8] (AbstractFtpSessionFactory.java:154) - Connected to server [HOSTS-FTP:21]
                            2011-05-09 11:42:00,833  INFO  FtpSession [task-scheduler-4] (FtpSession.java:73) - File have been successfully transfered to: Fid/odt05.txt.pgp
                            2011-05-09 11:42:00,927  INFO  FtpSession [task-scheduler-8] (FtpSession.java:73) - File have been successfully transfered to: Fid/odt05.txt.pgp
                            2011-05-09 11:42:00,942  DEBUG FtpInboundFileSynchronizer [task-scheduler-8] (AbstractInboundFileSynchronizer.java:196) - deleted Fid/odt05.txt.pgp
                            2011-05-09 11:42:00,942  DEBUG FileReadingMessageSource [task-scheduler-8] (FileReadingMessageSource.java:272) - Added to queue: [EncryptedFilesFromFid\odt05.txt.pgp]
                            2011-05-09 11:42:00,974  INFO  FileReadingMessageSource [task-scheduler-8] (FileReadingMessageSource.java:260) - Created message: [[Payload=EncryptedFilesFromFid\odt05.txt.pgp]
                            How do I ensure that only one task scheduler thread polls the FTP server?

                            Here is the complete config of my application:


                            Code:
                            	<int-http:inbound-gateway id="InboundServlet" request-channel="inputChannel" reply-channel="outputChannel" extract-reply-payload="true" />
                            
                                <int:transformer id="crdbMessageTransformer" input-channel="inputChannel" ref="inboundMessageConverter" method="convert" output-channel="convertChannel" />
                            	
                            	<int:filter id="requestFilter" expression="payload.transactionSubType!=null and  payload.transactionType!=null and payload.messageId!=null and payload.payload!=null" input-channel="convertChannel" output-channel="filteredChannel" discard-channel="requestRejectedChannel"/>
                            	<!-- Convert this to interceptor -->
                            	<int:service-activator id="responseForRejectedRequest" ref="responseProcessor" method="getRejectedResponse" input-channel="requestRejectedChannel" output-channel="outputChannel" />
                            	<int:service-activator id="responseForAcceptedRequest" ref="responseProcessor" method="getAcceptedResponse" input-channel="filteredChannel" output-channel="outputChannel" />
                            	
                            	<int-jdbc:outbound-channel-adapter channel="filteredChannel" data-source="mysqlDataSource" query="INSERT INTO TXN_QUEUE ..."/>
                            	
                            	
                            	<!-- In-bound transaction processing -->
                            	<int:inbound-channel-adapter ref="pendingInboundTransactionProcessor" method="processPendingMessages" channel="inboundMessagesChannel" >
                            		<int:poller cron="0 0 6-22/1 * * SUN-SAT" />
                            	</int:inbound-channel-adapter>
                            
                            	<int:service-activator id="tofidBatchActivator" input-channel="inboundMessagesChannel" ref="tofidBatchCreateProcessor" method="createBatch" output-channel="tofidBatchChannel"/> 
                            	
                            	<int:service-activator input-channel="tofidBatchChannel" ref="cryptographyManager" method="encrypt" output-channel="tofidEncryptedBatchChannel"/>
                            	<int:header-enricher input-channel="tofidEncryptedBatchChannel" output-channel="tofidBatchEnrichedChannel">
                            		<int:header name="file_name" value="inbound.txt.pgp"/>
                            	</int:header-enricher>
                            	<int-ftp:outbound-channel-adapter id="tofidFTP" channel="tofidBatchEnrichedChannel" remote-directory="fid" session-factory="ftpClientFactory" />
                            	
                            	<!-- Outbound transaction processing -->
                            	<int-ftp:inbound-channel-adapter id="fromfidFTPReader" local-directory="EncryptedFilesFromfid" 
                            			channel="fromfidEncryptedBatchChannel" session-factory="ftpClientFactory" remote-directory="fid" 
                            			delete-remote-files="true"  filename-pattern="odt*.*">
                            		<int:poller cron="0 0/3 6-23 * * SUN-SAT" />
                            	</int-ftp:inbound-channel-adapter>
                            	
                            	<int:service-activator input-channel="fromfidEncryptedBatchChannel" ref="fileRenameProcessor" method="rename"  output-channel="fromfidEncryptedBatchChannel2"/>
                            	
                            	<int:service-activator input-channel="fromfidEncryptedBatchChannel2" ref="cryptographyManager" method="decrypt" output-channel="fromfidBatchChannel"/>
                            	
                            	<int:service-activator id="fromfidBatchActivator" input-channel="fromfidBatchChannel" ref="fromfidBatchCreateProcessor" method="processBatch" />
                            
                            	<int:inbound-channel-adapter id="outboundTransactionsReader" channel="outboundMessagesChannel" ref="pendingOutboundTransactionProcessor" method="getPendingMessage" >
                            		<int:poller max-messages-per-poll="1" cron="* * 6-23 * * SUN-SAT" />
                            	</int:inbound-channel-adapter>
                            	
                            	<int:header-enricher input-channel="outboundMessagesChannel" output-channel="outboundMessagesEnrichedChannel" >
                            		<int:header name="ObjectID" expression="payload.objectId"/>
                            		<int:header name="Retryable" expression="payload.retryable" />
                            	</int:header-enricher>
                            	<int:service-activator input-channel="outboundMessagesEnrichedChannel" output-channel="responseChannel" ref="outboundMessageConverter" method="convert" />
                            	
                            	<int-http:outbound-gateway request-channel="responseChannel" extract-request-payload="true" url="http://outbound-url" reply-channel="outboundResponsesChannel" />
                            Thank you!



                            • #15
                              Bobby

                              I think we found the reason, but while we are still working on it, could you please set the 'cache-sessions' attribute of the <int-ftp:inbound-channel-adapter> to false (make sure you use 2.0.4.BUILD-SNAPSHOT) and see if you still see the same issue? My hope is that you won't.
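
                              Applied to the FTP adapter from the posted config, the suggested change would look roughly like this. Only cache-sessions is new; every other attribute is copied from the config above. Whether the attribute is accepted depends on the version in use, hence the note about 2.0.4.BUILD-SNAPSHOT.

                              ```xml
                              <int-ftp:inbound-channel-adapter id="fromfidFTPReader" local-directory="EncryptedFilesFromfid"
                                      channel="fromfidEncryptedBatchChannel" session-factory="ftpClientFactory" remote-directory="fid"
                                      delete-remote-files="true" filename-pattern="odt*.*"
                                      cache-sessions="false">
                                  <int:poller cron="0 0/3 6-23 * * SUN-SAT" />
                              </int-ftp:inbound-channel-adapter>
                              ```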
