Announcement Announcement Module
Collapse
No announcement yet.
Messages keep coming from SFTP inbound adapter Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Messages keep coming from SFTP inbound adapter

    I have an SFTP inbound adapter. In debug environment it polls some directory for files every second. These file are then sent to transformer. It worked fine, but recently I noticed that even when both local and remote directories are empty, transformer keeps receiving messages with payload of the file that was processed earlier. Even when I put some new file with other name on sftp it doesn't get processed and transformer still get messages with old file. This persists even after Tomcat restart. I'm guessing this is the issue with my poller configuration, somehow old messages being left in the queue and trying to get processed. Can flush poller queue somehow?

  • #2
    Could you please post your configuration?

    Comment


    • #3
      Configuration looks like this
      Code:
      <bean id="sftpSessionFactory" class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
              <property name="host" value="${host}"/>
              <property name="port" value="${port}"/>
              <property name="user" value="${user}"/>
              <property name="password" value="${password}"/>
          </bean>
      
          <sftp:inbound-channel-adapter
                  session-factory="sftpSessionFactory"
                  channel="batch-requests"
                  remote-directory="${remote.incoming.dir}"
                  local-directory="${local.incoming.dir}"
                  auto-create-local-directory="true"
                  auto-startup="true"
                  delete-remote-files="true">
              <integration:poller cron="${cron.rq.sftp}" max-messages-per-poll="100"/>
          </sftp:inbound-channel-adapter>
      
      
          <integration:chain auto-startup="true" input-channel="batch-requests" output-channel="requests">
      
              <integration:poller cron="${cron.rq.batch.processing}" max-messages-per-poll="1000">
                  <integration:transactional
                          transaction-manager="txManager"
                          isolation="READ_COMMITTED"
                          propagation="REQUIRES_NEW"/>
              </integration:poller>
      
              <!-- Store file name -->
              <integration:header-enricher>
                  <integration:header name="FILE_NAME" expression="payload.getName()"/>
              </integration:header-enricher>
      
              <integration:transformer ref="pgpTransformer"/>
      
              <!-- Parse XML to Batch object -->
              <integration:transformer ref="unMarshallingTransformer"/>
      
              <!-- Extract Batch object from JAXBElement -->
              <integration:transformer ref="batchTransformer"/>
      
              <!-- Validate batch object -->
              <integration:filter ref="batchValidator" discard-channel="requests-dead"/>
      
              <!-- Split Batch into request messages and delete file -->
              <integration:splitter ref="batchSplitter"/>
      
          </integration:chain>

      Comment


      • #4
        I also noticed that behavior that I'm experiencing is similar to one discussed in http://forum.springsource.org/showth...oller-question and https://jira.springsource.org/browse/INT-1375

        Comment


        • #5
          First, https://jira.springsource.org/browse/INT-1375 is completely unrelated issue. It was specific to IMAP, Mail and Mail flags.

          As far as your configuration I am not sure I understand the purpose of the poller inside the chain. Could you please elaborate on the use case? What are you trying to accomplish.
          As far as I see, the SFTP inbound adapter will download files and will distributed them to the chain's input-channel. Chain in this case is an EventDrivenConsumer.

          Comment


          • #6
            Purpose of the poller was to make things transactional. There are several other chains in configuration, which do further processing, they all have similar pollers. I didn't want failures in one chain to affect the others. The problem is that messages keep coming to input channel, even after file was processed and deleted. Do you think that this is related to chain's poller?

            Comment


            • #7
              That's fine. Why not move your TX configuration to the poller that is defined in the SFTP adapter?

              Comment


              • #8
                I wanted each chain to be transactional, so that changes made in one chain would not be lost in case of error in the other.

                Comment


                • #9
                  So you want nested transactions. The appropriate way of doing it would be via Transaction Isolation and Propagation http://static.springsource.org/sprin...efinition.html.
                  Basically your services could be annotated or XML declared with such attributes where each transaction and nested transaction boundaries would be defined.
                  More here
                  http://static.springsource.org/sprin...ansaction.html

                  Comment


                  • #10
                    Thanks for the link, maybe it will help to simplify configuration. However, are you sure that it will prevent same message from being delivered over and over?

                    Comment


                    • #11
                      Yes, I am sure. In fact I don't even understand how the configuration you posted above works? What is the second poller is polling? The input-channel for the chain is a DirectChannel from what I see.
                      In fact can you attach a working configuration that simply demonstrates the behavior you describe. Just use a logger to print a message.

                      Comment


                      • #12
                        I can attach configuration tomorrow. But the idea was that first poller (the one in sftp adapter) is used to download file from sftp at specific interval at put it in a channel. Second poller (the one in a chain) polls that channel and is transactional, so that whole process in chain is atomic. Data from that chain is then used as input for the other chain, which is also atomic and so on. That way I split processing in several chains, each one is atomic process.

                        Configuration actually works fine, except the problem that I described above. Probably this is expected behavior, and I'm wondering where messages for channel are stored, so I can clear that storage and prevent unprocessed message delivery even after server restart.

                        The definition of the input channel for the first chain looks like this:
                        Code:
                        <integration:channel id="batch-requests">
                           <integration:queue capacity="100" message-store="message-store"/>
                        </integration:channel>

                        Comment


                        • #13
                          Well, actually what you are trying to accomplish could be done with both Transaction isolation as well by assembling a flow from the independent transactional sub-flow blocks. Both will work and both are valid and both could be done in Polling style (something you are trying to do) as well as Event Driven style ( a new feature of SI in 2.0.4 ) where Gateway could be invoked as handler via service activator and since Gateway can be annotated with TX annotation it would create a TX boundary. Let me try to see if i can make a sample out of it and follow up on this thread.
                          Meanwhile, I would still suggest to read the links I've provided above regarding TX isolation and nasting

                          Comment

                          Working...
                          X