  • file:inbound-channel-adapter with nio-locker and file-to-string-transformer

    Hi everybody,

    When I use inbound-channel-adapter and lock the file with nio-locker, file-to-string-transformer can no longer access the file.

    I thought the locked file would still be accessible to the thread that took the lock, so that it could transform the file and then delete it.

    Am I wrong?

    Here is my config:

    Code:
    <int-file:inbound-channel-adapter 
      channel="fileToStringChannel" 
      directory="file:c:/temp/scm" 
      prevent-duplicates="true">
      <int:poller id="poller" fixed-delay="1000">
        <int:transactional transaction-manager="txManager"/>
      </int:poller>
      <int-file:nio-locker/>
    </int-file:inbound-channel-adapter>
    
    <int-file:file-to-string-transformer
      input-channel="fileToStringChannel" 
      output-channel="stringToEnricherChannel"
      charset="UTF-8"
      delete-files="false"/>
    
    <int:header-enricher 
      input-channel="stringToEnricherChannel" 
      output-channel="dispatchChannel">
      <int:header name="correlationId" ref="messageCoreKeyExtractor" method="extractKey"/>
      <int:header name="JMSXGroupID" ref="messageCoreKeyExtractor" method="extractKey"/>
    </int:header-enricher>
    
    <int-jms:outbound-channel-adapter order="1"
      channel="dispatchChannel"
      connection-factory="cachedConnectionFactory" 
      destination-name="outQueue" 
      header-mapper="defaultJmsHeaderMapper"/>
    
    <!-- If JMS Send was successful, remove the file (within the transaction)
         http://forum.springsource.org/showthread.php?102004-File-Channel-Adapter-issues-->
    <int:service-activator order="2"
      input-channel="dispatchChannel" 
      output-channel="nullChannel"
      expression="headers.file_originalFile.delete()"/>
      
    <!-- Channels -->
    <int:channel id="fileToStringChannel"/>
    <int:channel id="stringToEnricherChannel"/>
    <int:publish-subscribe-channel id="dispatchChannel"/>
    
    <!-- Beans-->
    <bean id="messageCoreKeyExtractor" class="com.xxx.integration.MessageCoreKeyExtractor"/>
    <bean id="defaultJmsHeaderMapper" class="org.springframework.integration.jms.DefaultJmsHeaderMapper"/>
    
    <!-- JMS connection factory and tx manager-->
    <bean id="cachedConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory" 
      p:targetConnectionFactory-ref="connectionFactory"
      p:sessionCacheSize="5"
      p:cacheProducers="false"/>
    
    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" 
      p:brokerURL="failover://(${broker.url})?jms.prefetchPolicy.all=1" />
    	
    <bean id="txManager" class="org.springframework.jms.connection.JmsTransactionManager" 
      p:connectionFactory-ref="connectionFactory"/>
    Last edited by pwanner; May 7th, 2012, 08:26 AM.
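
Editor's sketch of the question above, in plain java.nio rather than Spring Integration. A FileLock belongs to the FileChannel that acquired it (and to the JVM as a whole), not to a thread, so one way to read a locked file is through that same channel. The class and method names below are hypothetical, and the code assumes Java 7 or later for java.nio.file, whereas the thread itself uses Java 6. It does not show how nio-locker is implemented.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper, not part of Spring Integration.
final class LockedFileReader {

    // Takes an exclusive lock and reads the file through the SAME channel.
    static String readWhileLocked(Path file) throws IOException {
        // An exclusive lock requires a channel that is open for writing.
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            FileLock lock = channel.tryLock();
            if (lock == null) {
                throw new IOException("File is locked by another process: " + file);
            }
            try {
                ByteBuffer buffer = ByteBuffer.allocate((int) channel.size());
                while (buffer.hasRemaining() && channel.read(buffer) != -1) {
                    // keep reading until the buffer is full or EOF is reached
                }
                buffer.flip();
                return StandardCharsets.UTF_8.decode(buffer).toString();
            } finally {
                lock.release();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("locked", ".xml");
        Files.write(file, "<message/>".getBytes(StandardCharsets.UTF_8));
        System.out.println(readWhileLocked(file));
        Files.delete(file);
    }
}
```

By contrast, the transformer in the config above opens its own stream on the file, which is the access that Windows rejects in the stack trace posted later in this thread.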

  • #2
    Can you please show the config excerpt for 'dispatchChannel'?


    • #3
      I have edited the above config.


      • #4
        Can you show us your config so we can see the channel configuration?


        • #5
          I have updated my first post so that the config is now complete and reflects the whole flow.


          • #6
            Sorry, I see it now. I was afraid that one of those channels was an async channel, but they all seem to be sync channels, so let's try something else. Put a logging channel adapter after int-file:inbound-channel-adapter.
            For example:
            Code:
            <int-file:inbound-channel-adapter 
              channel="fileToStringChannel" 
              directory="file:c:/temp/scm" 
              prevent-duplicates="true">
              <int:poller id="poller" fixed-delay="1000">
                <int:transactional transaction-manager="txManager"/>
              </int:poller>
              <int-file:nio-locker/>
            </int-file:inbound-channel-adapter>
            <int:logging-channel-adapter channel="fileToStringChannel" level="WARN"/>
            Then see whether the message comes out of the inbound-channel-adapter. If so, put a logging channel adapter after file-to-string-transformer, and so on, until you see where the problem is.
            You can also add channel interceptors to the channels and track the point at which the message fails to reach its destination.


            • #7
              This is really weird: when I add the logging-channel-adapter, no exception is thrown any more, but the message is neither consumed nor delivered.

              Below are the logs with and without the logging-channel-adapter.
              The ONLY thing that I have changed in the config is:
              Code:
              <int:logging-channel-adapter channel="fileToStringChannel" level="DEBUG"/>
              Stack trace without the logging-channel-adapter:
              Code:
              INFO : org.springframework.integration.endpoint.SourcePollingChannelAdapter - started org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean#0
              INFO : org.apache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://localhost:61616
              DEBUG: org.springframework.integration.file.FileReadingMessageSource - Added to queue: [c:\temp\scm\messageA2.xml]
              INFO : org.springframework.integration.file.FileReadingMessageSource - Created message: [[Payload=c:\temp\scm\messageA2.xml][Headers={timestamp=1336397994640, id=1537f333-895c-437e-92a8-c8d20cb5c693}]]
              DEBUG: org.springframework.integration.endpoint.SourcePollingChannelAdapter - Poll resulted in Message: [Payload=c:\temp\scm\messageA2.xml][Headers={timestamp=1336397994640, id=1537f333-895c-437e-92a8-c8d20cb5c693}]
              DEBUG: org.springframework.integration.channel.DirectChannel - preSend on channel 'fileToStringChannel', message: [Payload=c:\temp\scm\messageA2.xml][Headers={timestamp=1336397994640, id=1537f333-895c-437e-92a8-c8d20cb5c693}]
              DEBUG: org.springframework.integration.transformer.MessageTransformingHandler - org.springframework.integration.transformer.MessageTransformingHandler#0 received message: [Payload=c:\temp\scm\messageA2.xml][Headers={timestamp=1336397994640, id=1537f333-895c-437e-92a8-c8d20cb5c693}]
              DEBUG: org.springframework.integration.channel.PublishSubscribeChannel - preSend on channel 'errorChannel', message: [Payload=org.springframework.integration.transformer.MessageTransformationException: org.springframework.integration.MessagingException: failed to transform File Message][Headers={timestamp=1336397994835, id=53d30828-4635-4fac-9612-e361801460ae}]
              DEBUG: org.springframework.integration.handler.LoggingHandler - (inner bean)#4 received message: [Payload=org.springframework.integration.transformer.MessageTransformationException: org.springframework.integration.MessagingException: failed to transform File Message][Headers={timestamp=1336397994835, id=53d30828-4635-4fac-9612-e361801460ae}]
              ERROR: org.springframework.integration.handler.LoggingHandler - org.springframework.integration.transformer.MessageTransformationException: org.springframework.integration.MessagingException: failed to transform File Message
              	at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:73)
              	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:97)
              	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
              	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:115)
              	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:102)
              	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
              	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
              	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:128)
              	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
              	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
              	at org.springframework.integration.endpoint.SourcePollingChannelAdapter.doPoll(SourcePollingChannelAdapter.java:97)
              	at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:146)
              	at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:144)
                ...
              	at $Proxy9.call(Unknown Source)
              	at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:207)
              	at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
              	at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:48)
              	at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
              	at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:202)
              	at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:53)
              	at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
                ...
              	at java.lang.Thread.run(Thread.java:662)
              Caused by: org.springframework.integration.MessagingException: failed to transform File Message
              	at org.springframework.integration.file.transformer.AbstractFilePayloadTransformer.transform(AbstractFilePayloadTransformer.java:71)
              	at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:67)
              	... 38 more
              Caused by: java.io.IOException: The process cannot access the file because another process has locked a portion of the file
              	at java.io.FileInputStream.readBytes(Native Method)
              	at java.io.FileInputStream.read(FileInputStream.java:220)
              	at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:264)
              	at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:306)
              	at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:158)
              	at java.io.InputStreamReader.read(InputStreamReader.java:167)
              	at java.io.Reader.read(Reader.java:123)
              	at org.springframework.util.FileCopyUtils.copy(FileCopyUtils.java:188)
              	at org.springframework.util.FileCopyUtils.copyToString(FileCopyUtils.java:240)
              	at org.springframework.integration.file.transformer.FileToStringTransformer.transformFile(FileToStringTransformer.java:49)
              	at org.springframework.integration.file.transformer.FileToStringTransformer.transformFile(FileToStringTransformer.java:32)
              	at org.springframework.integration.file.transformer.AbstractFilePayloadTransformer.transform(AbstractFilePayloadTransformer.java:58)
              	... 39 more
              
              DEBUG: org.springframework.integration.channel.PublishSubscribeChannel - postSend (sent=true) on channel 'errorChannel', message: [Payload=org.springframework.integration.transformer.MessageTransformationException: org.springframework.integration.MessagingException: failed to transform File Message][Headers={timestamp=1336397994835, id=53d30828-4635-4fac-9612-e361801460ae}]

              Log with the logging-channel-adapter:
              Code:
              INFO : org.springframework.integration.endpoint.SourcePollingChannelAdapter - started org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean#0
              INFO : org.apache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://localhost:61616
              DEBUG: org.springframework.integration.file.FileReadingMessageSource - Added to queue: [c:\temp\scm\messageA2.xml]
              INFO : org.springframework.integration.file.FileReadingMessageSource - Created message: [[Payload=c:\temp\scm\messageA2.xml][Headers={timestamp=1336397894635, id=4c487c7c-0932-4a03-86c6-02b25403a071}]]
              DEBUG: org.springframework.integration.endpoint.SourcePollingChannelAdapter - Poll resulted in Message: [Payload=c:\temp\scm\messageA2.xml][Headers={timestamp=1336397894635, id=4c487c7c-0932-4a03-86c6-02b25403a071}]
              DEBUG: org.springframework.integration.channel.DirectChannel - preSend on channel 'fileToStringChannel', message: [Payload=c:\temp\scm\messageA2.xml][Headers={timestamp=1336397894635, id=4c487c7c-0932-4a03-86c6-02b25403a071}]
              DEBUG: org.springframework.integration.transformer.MessageTransformingHandler - org.springframework.integration.transformer.MessageTransformingHandler#0 received message: [Payload=c:\temp\scm\messageA2.xml][Headers={timestamp=1336397894635, id=4c487c7c-0932-4a03-86c6-02b25403a071}]
              DEBUG: org.springframework.integration.handler.LoggingHandler - org.springframework.integration.handler.LoggingHandler#0 received message: [Payload=c:\temp\scm\messageA2.xml][Headers={timestamp=1336397894635, id=4c487c7c-0932-4a03-86c6-02b25403a071}]
              INFO : org.springframework.integration.handler.LoggingHandler - c:\temp\scm\messageA2.xml
              DEBUG: org.springframework.integration.channel.DirectChannel - postSend (sent=true) on channel 'fileToStringChannel', message: [Payload=c:\temp\scm\messageA2.xml][Headers={timestamp=1336397894635, id=4c487c7c-0932-4a03-86c6-02b25403a071}]
              INFO : org.apache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://localhost:61616
              DEBUG: org.springframework.integration.endpoint.SourcePollingChannelAdapter - Poll resulted in Message: null
              DEBUG: org.springframework.integration.endpoint.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'


              • #8
                Hmm, strange, since I see that you do have an exception in 'file-to-string-transformer'.
                Let's try this. I just tried a subset of your sample and it works fine, so please try it as well and see what happens. Modify your configuration to look like this (only this):
                Code:
                <int-file:inbound-channel-adapter
                  channel="fileToStringChannel"
                  directory="file:c:/temp/scm"
                  prevent-duplicates="true">
                  <int:poller fixed-rate="5000" max-messages-per-poll="1">
                    <!-- <int:transactional transaction-manager="txManager"/> -->
                  </int:poller>
                  <int-file:nio-locker/>
                </int-file:inbound-channel-adapter>

                <int-file:file-to-string-transformer
                  input-channel="fileToStringChannel"
                  output-channel="loggingChannel"
                  charset="UTF-8"
                  delete-files="false"/>

                <int:logging-channel-adapter id="loggingChannel" level="WARN"/>


                • #9
                  There is no difference: the error still occurs in the file-to-string-transformer, which cannot open the file locked by the nio-locker.

                  Code:
                  INFO : org.springframework.integration.endpoint.SourcePollingChannelAdapter - started org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean#0
                  DEBUG: org.springframework.integration.file.FileReadingMessageSource - Added to queue: [c:\temp\scm\messageA2.xml]
                  INFO : org.springframework.integration.file.FileReadingMessageSource - Created message: [[Payload=c:\temp\scm\messageA2.xml][Headers={timestamp=1336401294594, id=5aedf9e8-c2eb-48e7-b425-5973d65d7edd}]]
                  DEBUG: org.springframework.integration.endpoint.SourcePollingChannelAdapter - Poll resulted in Message: [Payload=c:\temp\scm\messageA2.xml][Headers={timestamp=1336401294594, id=5aedf9e8-c2eb-48e7-b425-5973d65d7edd}]
                  DEBUG: org.springframework.integration.channel.DirectChannel - preSend on channel 'fileToStringChannel', message: [Payload=c:\temp\scm\messageA2.xml][Headers={timestamp=1336401294594, id=5aedf9e8-c2eb-48e7-b425-5973d65d7edd}]
                  DEBUG: org.springframework.integration.transformer.MessageTransformingHandler - org.springframework.integration.transformer.MessageTransformingHandler#0 received message: [Payload=c:\temp\scm\messageA2.xml][Headers={timestamp=1336401294594, id=5aedf9e8-c2eb-48e7-b425-5973d65d7edd}]
                  DEBUG: org.springframework.integration.channel.PublishSubscribeChannel - preSend on channel 'errorChannel', message: [Payload=org.springframework.integration.transformer.MessageTransformationException: org.springframework.integration.MessagingException: failed to transform File Message][Headers={timestamp=1336401294688, id=b44ecb56-2e1d-4a5f-a3b9-7eece16342dc}]
                  DEBUG: org.springframework.integration.handler.LoggingHandler - (inner bean)#4 received message: [Payload=org.springframework.integration.transformer.MessageTransformationException: org.springframework.integration.MessagingException: failed to transform File Message][Headers={timestamp=1336401294688, id=b44ecb56-2e1d-4a5f-a3b9-7eece16342dc}]
                  ERROR: org.springframework.integration.handler.LoggingHandler - org.springframework.integration.transformer.MessageTransformationException: org.springframework.integration.MessagingException: failed to transform File Message


                  • #10
                    But that is what I am asking: the configuration I posted works for me here, which is why I wanted you to try that exact configuration.
                    So, are you saying you tried the configuration I posted and it fails in the transformer?
                    If so, can you also post your versions of SI, Spring, Java, etc.?


                    • #11
                      Yes, I tried the exact configuration you posted, with nothing else in the config file.

                      In my POM I have:

                      Code:
                      <properties>
                        <spring.version>3.1.1.RELEASE</spring.version>
                        <spring.integration.version>2.1.1.RELEASE</spring.integration.version>
                        <activeMQ.version>5.5.0</activeMQ.version>
                        <xbean-spring.version>3.7</xbean-spring.version>
                        <slf4j.version>1.5.11</slf4j.version>
                      </properties>
                      and Java 1.6.0_25, Windows 7


                      • #12
                        We are looking into it, since it seems the problem might be Windows-specific.


                        • #13
                          Reproduced on Windows XP - this seems to be a limitation/"feature" of the Windows operating system.


                          • #14
                            Does that mean there is no way to get nio-locker working on Windows?


                            • #15
                              Well, if you read the FileLock documentation, it says that the behavior is specific to the OS. After all, it's a gateway to the native facilities, and on Windows (the way I read it) the lock is exclusive.
                              We are currently discussing the possibility of a configuration option to make the lock shared. The issue is that a shared lock would be shared by anything running within the same JVM, which means it is not bound to threads. We did verify that making the lock shared gets you past the Windows problem, but whether that would be appropriate still remains a question.
                              As I said, we are still discussing it, so stay tuned.
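
Editor's sketch of the shared-lock idea discussed above, again in plain java.nio. It requests a shared lock with FileChannel.tryLock(position, size, shared) and then reads the file through an independent call, roughly as a downstream transformer would. The class and method names are hypothetical, Java 7 or later is assumed, and this is not the Spring Integration implementation. Note that some platforms silently convert a shared lock request into an exclusive one, so FileLock.isShared() is worth checking.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper, not part of Spring Integration.
final class SharedLockDemo {

    // Holds a shared lock on the whole file while a separate read takes place.
    static String readUnderSharedLock(Path file) throws IOException {
        // A shared lock only requires a channel that is open for reading.
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
            FileLock lock = channel.tryLock(0L, Long.MAX_VALUE, true);
            if (lock == null) {
                throw new IOException("Could not lock " + file);
            }
            try {
                // Independent read that does not go through the locking channel.
                return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
            } finally {
                lock.release();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("shared", ".xml");
        Files.write(file, "<message/>".getBytes(StandardCharsets.UTF_8));
        System.out.println(readUnderSharedLock(file));
        Files.delete(file);
    }
}
```

The trade-off is the one raised in the post: the lock is held on behalf of the whole JVM, so it does not stop other threads in the same JVM from reading the file, and a shared lock does not stop other processes from reading it either.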
