Defining a flow inside a PartitionStep

  • Defining a flow inside a PartitionStep

    Hi,

    Based on the PartitionStep example, I'm now trying to set up a PartitionStep that executes a flow containing three steps.
    For every file in a given directory, it should do the following:
    1. do some processing on the file
    2. depending on the result of 1), call one of two cleanup steps (move to error / move to success).
    The job and step definitions look like this:
    Code:
    <b:job id="partitionJob">
    	<b:step id="processAllBanstas" parent="bansta:master" />
    </b:job>
    
    <!-- master step for partitioning: pick all files in partitionTest/ subdirectory. -->
    <bean name="bansta:master" class="org.springframework.batch.core.partition.support.PartitionStep">
    	<property name="jobRepository" ref="jobRepository" />
    	<!-- how to split: one step per file in input directory. -->
    	<property name="stepExecutionSplitter">
    		<bean class="org.springframework.batch.core.partition.support.SimpleStepExecutionSplitter">
    			<constructor-arg ref="jobRepository" />
    			<constructor-arg ref="completeStep" />
    			<constructor-arg>
    				<bean class="....rcm.partitioner.MultiFileResourcePartitioner">
    					<property name="resources" value="partitionTest/*" />
    				</bean>
    			</constructor-arg>
    		</bean>
    	</property>
    	<!-- partition handler: how to handle the separate partitions -->
    	<property name="partitionHandler">
    		<bean class="org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler">
    			<property name="taskExecutor">
    				<bean class="org.springframework.core.task.SyncTaskExecutor" />
    				<!--  
    				<bean class="org.springframework.core.task.SimpleAsyncTaskExecutor"></bean>
    				-->
    			</property>
    			<property name="step" ref="completeStep" />
    		</bean>
    	</property>
    </bean>
    So the PartitionStep is partitioning a FlowStep which is defined as follows:
    Code:
    <bean id="completeStep" class="org.springframework.batch.core.job.flow.FlowStep">
        <property name="flow" ref="processOneFile"/>
        <property name="jobRepository" ref="jobRepository" />
    </bean>
    
    <b:flow id="processOneFile">
        <b:step id="banstaProcessing" >
            <b:tasklet>
                <b:chunk reader="segmentgroupreader" processor="segmentGroupProcessor" writer="segmentGroupWriter" commit-interval="1"/>
            </b:tasklet>
            <b:next on="FAILED" to="failure"/>
            <b:next on="*" to="success"/>
        </b:step>
        <b:step id="success">
            <b:tasklet ref="successMover"/>
        </b:step>
        <b:step id="failure">
            <b:tasklet ref="errorMover" />
        </b:step>       
    </b:flow>
    The segmentgroupreader and the two mover tasklets (successMover and errorMover) are set up with step scope:
    Code:
    <!-- set up flat file reader which passes through fieldsets -->
    <bean id="delegateReader" class="org.springframework.batch.item.file.FlatFileItemReader" autowire-candidate="false" scope="step">
    	<property name="lineMapper" ref="mapper" />
    	<property name="recordSeparatorPolicy" ref="policy" />
    	<property name="resource" value="#{stepExecutionContext[fileName]}" />
    	<property name="bufferedReaderFactory">
    		<bean class="org.springframework.batch.item.file.SimpleBinaryBufferedReaderFactory">
    			<property name="lineEnding" value="'" />
    		</bean>
    	</property>
    </bean>
    
    <!-- set up BanstaSegmentGroupReader which uses this as a delegate -->
    <bean id="segmentgroupreader" class="....rcm.reader.BanstaSegmentGroupReader" autowire-candidate="false" scope="step">
    	<property name="delegateItemreader" ref="delegateReader" />
    </bean>	
    
    <bean id="successMover" class="....rcm.tasklet.ResourceLogTasklet" scope="step">
        <property name="targetDirectory" value="successDirectory"/>
        <property name="fileToMove"  value="#{stepExecutionContext[fileName]}" />
    </bean>
    <bean id="errorMover" class="....rcm.tasklet.ResourceLogTasklet" scope="step">
        <property name="targetDirectory" value="errorDirectory"/>
        <property name="fileToMove"  value="#{stepExecutionContext[fileName]}" />
    </bean>
    However, when I run this, it fails, complaining that it can't open the reader. I'm pretty sure this has something to do with the
    addition of the flow, as the same step was running fine inside a PartitionStep by itself (exact same reader definitions). The only difference is
    that it's now wrapped in a FlowStep, which is inside a PartitionStep.
    I've been googling for answers, read the reference manual, and looked at the samples in the distribution, but I don't see this particular
    setup anywhere, and I don't see any (obvious) errors in this configuration. Does anybody have an idea how to get this to work?
    Here is (part of) the stacktrace:
    Code:
    org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'lazyBindingProxy.delegateReader#sysinit' defined in class path resource [partitionedFlowJob.xml]: Initialization of bean failed; nested exception is java.lang.IllegalStateException: Cannot bind to placeholder: stepExecutionContext[fileName]
    	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:480)
    	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory$1.run(AbstractAutowireCapableBeanFactory.java:409)
    	at java.security.AccessController.doPrivileged(Native Method)
    	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:380)
    	at org.springframework.beans.factory.support.AbstractBeanFactory$2.getObject(AbstractBeanFactory.java:302)
    	at org.springframework.batch.core.scope.StepScope.get(StepScope.java:150)
    	at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:298)
    	at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:185)
    	at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:164)
    	at org.springframework.batch.core.scope.util.PlaceholderTargetSource.getTarget(PlaceholderTargetSource.java:185)
    	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:184)
    	at $Proxy3.open(Unknown Source)
    	at ....rcm.reader.BanstaSegmentGroupReader.open(BanstaSegmentGroupReader.java:110)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    	at java.lang.reflect.Method.invoke(Method.java:597)
    	...
    Caused by: java.lang.IllegalStateException: Cannot bind to placeholder: stepExecutionContext[fileName]
    	at org.springframework.batch.core.scope.util.PlaceholderTargetSource$1.convertIfNecessary(PlaceholderTargetSource.java:140)
    	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.convertForProperty(AbstractAutowireCapableBeanFactory.java:1294)
    	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyPropertyValues(AbstractAutowireCapableBeanFactory.java:1250)
    	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.populateBean(AbstractAutowireCapableBeanFactory.java:1010)
    	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:472)
    	... 76 more	
    2010-04-20 15:45:45,464 ERROR [org.springframework.batch.core.step.AbstractStep] - <Exception while closing step execution resources>
    org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'lazyBindingProxy.delegateReader#sysinit' defined in class path resource [partitionedFlowJob.xml]: Initialization of bean failed; nested exception is java.lang.IllegalStateException: Cannot bind to placeholder: stepExecutionContext[fileName]
    	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:480)
    	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory$1.run(AbstractAutowireCapableBeanFactory.java:409)
    	at java.security.AccessController.doPrivileged(Native Method)
    	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:380)
    	at org.springframework.beans.factory.support.AbstractBeanFactory$2.getObject(AbstractBeanFactory.java:302)
    	at org.springframework.batch.core.scope.StepScope.get(StepScope.java:150)
    	at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:298)
    	at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:185)
    	at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:164)
    	at org.springframework.batch.core.scope.util.PlaceholderTargetSource.getTarget(PlaceholderTargetSource.java:185)
    	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:184)
    	at $Proxy3.close(Unknown Source)
    	at ....rcm.reader.BanstaSegmentGroupReader.close(BanstaSegmentGroupReader.java:101)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    Sorry for the long post, I'm trying to provide all relevant information here...
    Last edited by tonvanbart; Aug 20th, 2010, 04:23 AM.

  • #2
    Originally posted by tonvanbart:
    Sorry for the long post, I'm trying to provide all relevant information here...
    Not a problem. If only others would do the same sometimes.

    I don't understand how this could work, because I see nothing explicit that handles the transfer of stepExecutionContext[fileName] from step to step. So, unless you left something out, it is unsurprising if this is empty by the time it gets to your cleanup tasklet. There is an ExecutionContextPromotionListener which many people use for this purpose (http://static.springsource.org/sprin...aToFutureSteps).

    I guess a FlowStep could have a strategy for managing execution context between its constituent steps. I like that idea, but it's a new feature (raise a JIRA if you like it too).



    • #3
      I think I'm beginning to see where I went wrong in my thinking (I also did some reading in the sources in the meantime): the StepExecutionSplitter creates one step execution per partition, each with its own ExecutionContext as created by my partitioner. These contain the "fileName" key. The FlowStep calls the steps contained in it, but it does not pass on the ExecutionContext. Is that right?
      On the surface of it I had thought that the FlowStep shared the same ExecutionContext as all steps contained in the flow and hence every step in the flow would see the fileName.

      I'll let this sink in and will think about JIRA - I would imagine that performing some sequence of steps on every file in a directory is not an uncommon scenario. Unless, of course, my whole way of dealing with this is just plain wrong - see my other post where I asked about opinions on the "proper" way of setting this up.

      EDIT: I see now that you mention it failing on the cleanup tasklet, but in the stacktrace it is failing immediately on the first step (in the delegateReader definition). Based on how I understand this now that's not surprising, or am I (still) missing something?
      Last edited by tonvanbart; Apr 21st, 2010, 04:18 AM.



      • #4
        Just to get my head around what is happening, I've created a StepExecutionListener that lists the name of the step, and all the keys/values in the step ExecutionContext. Then I configured this in all the levels:
        Code:
        <!-- job definition which will execute the master step -->
        <b:job id="partitionJob">
        	<b:step id="processAllBanstas" parent="bansta:master" />
        </b:job>
        
        <!-- master step for partitioning: pick all files in partitionTest/ subdirectory. -->
        <bean name="bansta:master" class="org.springframework.batch.core.partition.support.PartitionStep">
        	<property name="stepExecutionListeners">
        	    <list>
        		<ref bean="listExecutionContext" />
        	    </list>
        	</property>
        ... (rest of the definitions as before)
        
            <!-- FlowStep that the master step is referring to -->
            <bean id="test123" class="org.springframework.batch.core.job.flow.FlowStep" >
                <property name="stepExecutionListeners">
                    <list>
                        <ref bean="listExecutionContext" />
                    </list>
                </property>
                <property name="jobRepository" ref="jobRepository"/>
                <property name="flow">
                    <b:flow id="testflow">
                        <b:step id="banstaProcess" >
                            <b:tasklet>
                                <b:listeners>
                                     <b:listener ref="listExecutionContext"/>
                                </b:listeners>
                                <b:chunk reader="segmentgroupreader"
                                         processor="segmentGroupProcessor"
                                         writer="segmentGroupWriter"
                                         commit-interval="1"/>
                            </b:tasklet>
                            <b:next on="FAILED" to="failure2"/>
                            <b:next on="*" to="success2"/>
                        </b:step>
                        <b:step id="success2">
                            <b:tasklet ref="successMover"/>
                        </b:step>
                        <b:step id="failure2">
                            <b:tasklet ref="errorMover" />
                        </b:step>
                    </b:flow>
                </property>
            </bean>
        When I run this, the results are as follows:
        Code:
        org.springframework.batch.core.job.SimpleStepHandler] - <Executing step: [PartitionStep: [name=processAllBanstas]]>
        2010-04-21 11:00:58,794 INFO [....rcm.listener.LoggingStepExecutionListener] - <enter beforeStep>
        2010-04-21 11:00:58,794 INFO [....rcm.listener.LoggingStepExecutionListener] - <listing 0 entries in executionContext for step[processAllBanstas]:...>
        2010-04-21 11:00:58,809 DEBUG [....rcm.partitioner.MultiFileResourcePartitioner] - <ignoring [C:\workspaces_IntelliJ\bansta\RCMFlintPaymentAndStatusHub\target\test-classes\partitionTest\output]: not a file>
        2010-04-21 11:00:58,856 INFO [....rcm.listener.LoggingStepExecutionListener] - <enter beforeStep>
        2010-04-21 11:00:58,856 INFO [....rcm.listener.LoggingStepExecutionListener] - <listing 1 entries in executionContext for step[test123:partition2]:...>
        2010-04-21 11:00:58,856 INFO [....rcm.listener.LoggingStepExecutionListener] - <executionContext[fileName]=[file:/C:/workspaces_IntelliJ/bansta/RCMFlintPaymentAndStatusHub/target/test-classes/partitionTest/Bansta2_correct2.txt]>
        2010-04-21 11:00:58,888 INFO [org.springframework.batch.core.job.SimpleStepHandler] - <Executing step: [TaskletStep: [name=banstaProcess]]>
        2010-04-21 11:00:58,888 INFO [....rcm.listener.LoggingStepExecutionListener] - <enter beforeStep>
        2010-04-21 11:00:58,888 INFO [....rcm.listener.LoggingStepExecutionListener] - <listing 0 entries in executionContext for step[banstaProcess]:...>
        ...and after this, it falls over, since - obviously - the segmentgroupreader is unable to bind to the placeholder.
        I've also read chapter 11.8 on passing data to future steps. What I still don't get is this: every partitioned step has the *same* key in its execution
        context but with a different value. How is promoting these keys to the job ExecutionContext going to help me? Aren't the values going
        to be overwritten, especially if I use a SyncTaskExecutor?
        Thank you for bearing with me - I've been fighting with this for several days now, and am beginning to feel that I must be rather dense.
        Last edited by tonvanbart; Aug 20th, 2010, 04:25 AM.
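
The overwriting concern raised above can be illustrated with a small plain-Java model. This is only a sketch: the Maps stand in for the step and job ExecutionContext objects, `PromotionClashDemo` and the file names are hypothetical, and no Spring Batch API is used.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Plain-Java model of key promotion; the Maps stand in for ExecutionContext objects. */
final class PromotionClashDemo {

    /** Copies one key from a step-level context into the shared job-level context. */
    static void promote(Map<String, Object> stepContext, Map<String, Object> jobContext, String key) {
        if (stepContext.containsKey(key)) {
            // put() replaces whatever an earlier partition stored under the same key
            jobContext.put(key, stepContext.get(key));
        }
    }

    /** Promotes "fileName" from two partitions in turn and returns the job-level context. */
    static Map<String, Object> runTwoPartitions() {
        Map<String, Object> jobContext = new LinkedHashMap<>();

        Map<String, Object> partition1 = new LinkedHashMap<>();
        partition1.put("fileName", "file:partitionTest/a.txt"); // hypothetical file name
        Map<String, Object> partition2 = new LinkedHashMap<>();
        partition2.put("fileName", "file:partitionTest/b.txt"); // hypothetical file name

        promote(partition1, jobContext, "fileName");
        promote(partition2, jobContext, "fileName");
        return jobContext;
    }
}
```

In this model the job-level map ends up with a single "fileName" entry holding the value of whichever partition was promoted last, which is the clash described in the question.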



        • #5
          I didn't think of using a FlowStep in this way, but it totally makes sense. The "standard" promotion listener approach clearly doesn't work because (as you said) it would create key clashes in the JobExecution context. I guess you could use a specialised promotion listener that creates unique keys based on the partitioned step name (which is unique). If you can think of a better way, that would be interesting.
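
A minimal sketch of the unique-key idea, again as a plain-Java model and not as Spring Batch code. The class `PartitionKeyedPromotion`, the `.` separator, and the use of Maps in place of ExecutionContext are all assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Models promotion under a key that is prefixed with the (unique) partitioned step name. */
final class PartitionKeyedPromotion {

    /** Builds the job-level key, e.g. "test123:partition2.fileName". */
    static String uniqueKey(String partitionStepName, String key) {
        return partitionStepName + "." + key;
    }

    /** Copies the value for key into the job-level context under the partition-specific key. */
    static void promote(String partitionStepName,
                        Map<String, Object> stepContext,
                        Map<String, Object> jobContext,
                        String key) {
        Object value = stepContext.get(key);
        if (value != null) {
            jobContext.put(uniqueKey(partitionStepName, key), value);
        }
    }

    /** Promotes the same key from two partitions; both values survive side by side. */
    static Map<String, Object> runTwoPartitions() {
        Map<String, Object> jobContext = new LinkedHashMap<>();
        Map<String, Object> first = new LinkedHashMap<>();
        first.put("fileName", "file:partitionTest/a.txt");  // hypothetical file name
        Map<String, Object> second = new LinkedHashMap<>();
        second.put("fileName", "file:partitionTest/b.txt"); // hypothetical file name

        promote("test123:partition1", first, jobContext, "fileName");
        promote("test123:partition2", second, jobContext, "fileName");
        return jobContext;
    }
}
```

Writing the values is the easy half. Whoever reads them back still has to know the partitioned step name in order to rebuild the key.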



          • #6
            I had already been considering the idea you suggested, but am stumped by the next problem: I can add keys based on the partitioned step name (test123:partition2 in the above log), but the step that goes wrong has its own name (banstaProcess in my test). So the step inside the flow has a name that is not related to the partitioned step name.

            I've been clicking around but can't see how the banstaProcess step would be able to get to the name of its partitioned "parent" for want of a better word. So even though I could save in the job ExecutionContext with a key based on the partition name, I'm still stuck when it comes to retrieving the values inside the flow...



            • #7
              True. Maybe you will have to resort to a ThreadLocal until there is full support in the framework (that JIRA again).
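
One way the ThreadLocal workaround might look, as a hedged sketch. It assumes that the steps inside the flow run on the same thread as the partition that started them, and `CurrentPartitionFile` is a hypothetical helper, not a framework class.

```java
/**
 * Holds the file name of the partition being processed on the current thread.
 * The outer (partitioned) step would call set() before the flow starts and
 * clear() when it finishes; steps inside the flow would call get().
 */
final class CurrentPartitionFile {

    private static final ThreadLocal<String> FILE_NAME = new ThreadLocal<>();

    private CurrentPartitionFile() {
    }

    /** Binds the file name to the calling thread. */
    static void set(String fileName) {
        FILE_NAME.set(fileName);
    }

    /** Returns the file name bound to the calling thread, or null if none is bound. */
    static String get() {
        return FILE_NAME.get();
    }

    /** Removes the binding so a pooled thread does not leak a stale value into the next partition. */
    static void clear() {
        FILE_NAME.remove();
    }
}
```

Each thread sees only its own value, so partitions running in parallel would not overwrite each other. The cost is a hidden dependency between the outer step and the steps in the flow, and the value is not persisted for a restart.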



              • #8
                OK - I've looked at JIRA but am unable to log in; I guess I have to create an account somewhere?
                Just as an idea: looking at the FlowStep, in doExecute() it does
                Code:
                	StepHandler stepHandler = new SimpleStepHandler(getJobRepository());
                	FlowExecutor executor = new JobFlowExecutor(getJobRepository(), stepHandler, stepExecution.getJobExecution());
                	executor.updateJobExecutionStatus(flow.start(executor).getStatus());
                The JobFlowExecutor creates a ThreadLocal StepExecution, then ends up getting a call to executeStep(). This in turn calls handleStep in SimpleStepHandler which, unless the step is a restart, creates a new empty ExecutionContext.

                It would be nice if, in addition to promoting an ExecutionContext "up" to the job execution, there were also a possibility of promoting it, or certain keys from it, "down" to the child steps inside a FlowStep. Not sure how feasible that is though, I've only glanced at the code; the FlowStep would have to have some kind of strategy to pass on the ExecutionContext to the FlowExecutor, which would have to pass it on to the StepHandler, etc.
                And of course this is just from the few classes I've seen so I've no idea how this would fit in the grander scheme of things.
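
The "promote down" idea can be sketched as a plain-Java model. This is not how FlowStep or SimpleStepHandler work today; `DownwardPromotion` is hypothetical and the Maps again stand in for ExecutionContext objects.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Models seeding a child step's context from selected keys of the enclosing step's context. */
final class DownwardPromotion {

    /**
     * Returns a new context for a child step that contains only the listed keys
     * from the parent context. Keys missing from the parent are skipped.
     */
    static Map<String, Object> childContext(Map<String, Object> parentContext, List<String> keysToPass) {
        Map<String, Object> child = new LinkedHashMap<>();
        for (String key : keysToPass) {
            if (parentContext.containsKey(key)) {
                child.put(key, parentContext.get(key));
            }
        }
        return child;
    }
}
```

In this model, a step inside the flow that starts from `childContext(partitionContext, List.of("fileName"))` (or `Arrays.asList("fileName")` on older Java versions) would see the file name of its own partition, without going through the job-level context.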



                • #9
                  I've posted a feature request in JIRA under BATCH-1559.
