Large file message created before file is copied over

  • Large file message created before file is copied over

    Hey all,

    I was trying out the Spring Integration project and testing it with large incoming files. It seems that the file target on the output channel invokes copy() on the file before the file has finished being copied into the input directory. (My handler just returns the message immediately.) Essentially, I can't use the file yet. It is only incidental that I'm passing it on to the file target; I couldn't open a stream against the file myself either, since the copy isn't done. Is there any way to remedy this and have it re-fire the message?

    Code:
    org.springframework.integration.message.MessageHandlingException: failure occurred mapping file to message
    	at org.springframework.integration.adapter.file.SimpleFileMessageMapper.mapMessage(SimpleFileMessageMapper.java:63)
    	at org.springframework.integration.adapter.file.SimpleFileMessageMapper.mapMessage(SimpleFileMessageMapper.java:1)
    	at org.springframework.integration.adapter.file.FileTarget.send(FileTarget.java:43)
    	at org.springframework.integration.endpoint.TargetEndpoint.send(TargetEndpoint.java:183)
    	at org.springframework.integration.dispatcher.SimpleDispatcher.dispatch(SimpleDispatcher.java:99)
    	at org.springframework.integration.dispatcher.DefaultPollingDispatcher.dispatch(DefaultPollingDispatcher.java:65)
    	at org.springframework.integration.dispatcher.PollingDispatcherTask.run(PollingDispatcherTask.java:56)
    	at org.springframework.integration.scheduling.SimpleMessagingTaskScheduler$MessagingTaskRunner.run(SimpleMessagingTaskScheduler.java:138)
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:417)
    	at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:280)
    	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:135)
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:65)
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:142)
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:166)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
    	at java.lang.Thread.run(Thread.java:595)
    Caused by: java.io.FileNotFoundException: C:\TEMP\test_in\bigfile.txt (The process cannot access the file because it is being used by another process)
    	at java.io.FileInputStream.open(Native Method)
    	at java.io.FileInputStream.<init>(FileInputStream.java:106)
    	at org.springframework.util.FileCopyUtils.copy(FileCopyUtils.java:63)
    	at org.springframework.integration.adapter.file.SimpleFileMessageMapper.writeToFile(SimpleFileMessageMapper.java:75)
    	at org.springframework.integration.adapter.file.SimpleFileMessageMapper.mapMessage(SimpleFileMessageMapper.java:59)
    	... 16 more

  • #2
    Hi,

    Can you provide a bit more detail on your message flow? The configuration would also help us give some advice.
    The reason I am asking is that you mention files that are coming in but, as far as I can see, the file with the issue is actually written by a FileTarget.


    Thanks,
    Marius



    • #3
      Oh, definitely. Here's the context XML file:
      Code:
      <message-bus/>

      <channel id="fileInputChannel" />
      <channel id="fileOutputChannel" />
      <channel id="errorChannel" publish-subscribe="true"/>

      <file-source id="fileInputSource" directory="C:\TEMP\test_in\"/>
      <file-target id="fileOutputSource" directory="C:\TEMP\test_out\"/>

      <beans:bean id="fileInputHandler" class="com.test.sitesting.FileInputHandler"/>
      <!-- <beans:bean id="fileOutputHandler" class="com.test.sitesting.FileOutputHandler"/> -->

      <source-endpoint source="fileInputSource" channel="fileInputChannel">
      	<schedule period="10000"/>
      </source-endpoint>

      <target-endpoint target="fileOutputSource" input-channel="fileOutputChannel">
      </target-endpoint>

      <handler-endpoint input-channel="fileInputChannel" handler="fileInputHandler" output-channel="fileOutputChannel">
      	<concurrency/>
      </handler-endpoint>
      It's pretty simple. So, I wrote the FileInputHandler to just dump the message to the next channel:

      Code:
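      	public Message<?> handle(Message<?> message) {
      		// The payload is the polled File; the cast is kept only to mark where a read would happen.
      		File file = (File) message.getPayload();
      		// Pass the message through unchanged to the output channel.
      		return message;
      	}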
      I left the File cast in there because you can also get "File in use" errors if you try to read that stream upon receiving the message. However, just returning the message to the next channel causes the same problem, since the file target tries to read the file in order to copy it to the next directory. Does that make sense?
      Last edited by rlaw; Jun 29th, 2008, 11:57 AM.
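
One way to probe for the "file in use" condition before touching the payload is simply to attempt the open and treat a failure as "not ready yet". The following is a minimal sketch in plain Java, not part of Spring Integration; the names `FileReadiness` and `canOpenForRead` are hypothetical. It only detects the lock on platforms that refuse the open, as Windows does in the stack trace above; on most Unix file systems the open is likely to succeed even while another process is still writing.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

// Hypothetical helper for illustration; not a Spring Integration class.
final class FileReadiness {

    private FileReadiness() {
    }

    /**
     * Returns true if the file can currently be opened for reading.
     * A missing file, a directory, or a file that the operating system
     * reports as locked by another process all yield false.
     */
    static boolean canOpenForRead(File file) {
        try (FileInputStream in = new FileInputStream(file)) {
            return true;
        } catch (IOException e) {
            // FileNotFoundException is an IOException; it is also what
            // Windows raises when the file is in use by another process.
            return false;
        }
    }
}
```

A handler could call `FileReadiness.canOpenForRead(file)` and skip the message when it returns false, on the assumption that the poller will pick the file up again later.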



      • #4
        I tried to reproduce this. I found that you were missing output-channel on your handler-endpoint, which seems unrelated. Further, I saw that the file was handled multiple times while it was being written. Once it was completely written to the input directory, the file showed up in the output directory.

        I tried with a 67MB file, so one question remains: How big is yours? (insert funny remark here)

        By the way, I just loaded the context in a main method and put the file there by hand.

        This is my marginally modified code:
        Code:
        	<message-bus />
        
        	<channel id="fileInputChannel" />
        	<channel id="fileOutputChannel" />
        	<channel id="errorChannel" publish-subscribe="true" />
        
        	<file-source id="fileInputSource" directory="/tmp/in" />
        	<file-target id="fileOutputSource" directory="/tmp/out" />
        
        	<beans:bean id="fileInputHandler"
        		class="org.springframework.integration.adapter.file.FileInputHandler" />
        
        	<source-endpoint source="fileInputSource"
        		channel="fileInputChannel">
        		<schedule period="10" />
        	</source-endpoint>
        
        	<target-endpoint target="fileOutputSource"
        		input-channel="fileOutputChannel">
        	</target-endpoint>
         
        	<handler-endpoint input-channel="fileInputChannel" output-channel="fileOutputChannel"
        		handler="fileInputHandler">
        		<concurrency />
        	</handler-endpoint>



        • #5
          Oops, yeah, I did have the output-channel specified; I had just been mucking around with the code and pasted that version. However, that's exactly what I had tested against. Also, just for clarity, this was on an XP machine with NTFS. The file I used was about 180 MB, which on my system takes about 15-20 s to copy over from another part of the hard drive, so the poller fires while the file is still being copied. Thus, when the poll sends the message and the message is passed on to the next channel to be copied into the other directory, it blows up.
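
The race described here (a poll landing in the middle of a 15-20 s copy) is often worked around by accepting a file only once it has stopped changing between two consecutive polls. Below is a minimal sketch of that idea in plain Java; `FileStabilityTracker` and `isStable` are hypothetical names, not Spring Integration API. It is a heuristic under stated assumptions: a writer that pauses for longer than the polling period looks stable, and some copy tools may preallocate the full file size up front, so the check could report a file as stable too early.

```java
import java.io.File;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; not a Spring Integration class.
final class FileStabilityTracker {

    // Length and last-modified time observed for each path at the previous check.
    private final Map<String, long[]> lastSeen = new HashMap<>();

    /**
     * Returns true only if the file exists and its length and last-modified
     * time are unchanged since the previous call for the same path.
     * The first call for a given path always returns false.
     */
    synchronized boolean isStable(File file) {
        String key = file.getAbsolutePath();
        if (!file.isFile()) {
            lastSeen.remove(key);
            return false;
        }
        long[] current = { file.length(), file.lastModified() };
        long[] previous = lastSeen.put(key, current);
        return previous != null
                && previous[0] == current[0]
                && previous[1] == current[1];
    }
}
```

Called once per poll, this delays each file by at least one polling period, which is the price of not reading it while it is still growing.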



          • #6
            There is definitely a problem if you keep the writing stream open for a few polls. I'm creating an issue for it which you can follow. http://jira.springframework.org/browse/INT-269
            Last edited by iwein; Jun 27th, 2008, 12:11 PM. Reason: grammar



            • #7
              @rlaw, could you check whether, on your setup (XP, NTFS), you get the same error you saw before when you run the test case I attached to the JIRA issue? I'm having trouble reproducing the exact error you have, but that might be related to the OS.



              • #8
                I don't get the same error w/ that test file but the last method fails (testOpenWriter) with this:

                Code:
                java.lang.AssertionError: in incomplete file was received, that is unsafe
                	at org.junit.Assert.fail(Assert.java:69)
                	at org.junit.Assert.assertTrue(Assert.java:32)
                	at org.junit.Assert.assertNull(Assert.java:241)
                	at com.testing.integration.SampleTest.testOpenWriter(SampleTest.java:76)
                	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
                	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
                	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
                	at java.lang.reflect.Method.invoke(Method.java:585)
                	at org.junit.internal.runners.TestMethodRunner.executeMethodBody(TestMethodRunner.java:99)
                	at org.junit.internal.runners.TestMethodRunner.runUnprotected(TestMethodRunner.java:81)
                	at org.junit.internal.runners.BeforeAndAfterRunner.runProtected(BeforeAndAfterRunner.java:34)
                	at org.junit.internal.runners.TestMethodRunner.runMethod(TestMethodRunner.java:75)
                	at org.junit.internal.runners.TestMethodRunner.run(TestMethodRunner.java:45)
                	at org.junit.internal.runners.TestClassMethodsRunner.invokeTestMethod(TestClassMethodsRunner.java:71)
                	at org.junit.internal.runners.TestClassMethodsRunner.run(TestClassMethodsRunner.java:35)
                	at org.junit.internal.runners.TestClassRunner$1.runUnprotected(TestClassRunner.java:42)
                	at org.junit.internal.runners.BeforeAndAfterRunner.runProtected(BeforeAndAfterRunner.java:34)
                	at org.junit.internal.runners.TestClassRunner.run(TestClassRunner.java:52)
                	at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:38)
                	at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
                	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:460)
                	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:673)
                	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:386)
                	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:196)



                • #9
                  Originally posted by rlaw
                  Is there any way to remedy this and have it re-fire the message?
                  To come back to your original question: if you catch the exception and just leave it at that, you should get new messages for the same file until the file is closed. If you just process the last one, everything should be fine. Does that work for you?

                  In the future we will move to an event-based solution for the file source that prevents this message duplication (see the JIRA issue).
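
The "catch the exception and leave it at that" approach can be sketched in plain Java as an attempt that reports failure instead of throwing, so that a later message for the same file gets another chance. This is only an illustration: `SkipUntilReadable` and `tryCopy` are hypothetical names, and the sketch uses `java.nio.file.Files.copy` rather than the `FileTarget` from the thread. It also assumes the operating system refuses to open a file that is still being written, as in the original stack trace; where it does not, the copy may succeed on a partial file.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper for illustration; not a Spring Integration class.
final class SkipUntilReadable {

    private SkipUntilReadable() {
    }

    /**
     * Tries to copy the source file into the target directory.
     * Returns true on success and false if the copy failed, for example
     * because the source is missing or still locked by its writer.
     */
    static boolean tryCopy(Path source, Path targetDir) {
        try {
            Files.copy(source, targetDir.resolve(source.getFileName()),
                    StandardCopyOption.REPLACE_EXISTING);
            return true;
        } catch (IOException e) {
            // Swallow the failure on purpose: the caller is expected to
            // receive another message for this file on a later poll.
            return false;
        }
    }
}
```

Because `REPLACE_EXISTING` is used, a later successful attempt overwrites whatever an earlier attempt left behind, which matches the advice to let only the last message count.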

