  • Using JobParameters for itemWriter

    Hello everybody,

    Before I explain the problem, here is a brief description of the architecture:

    I'm using Spring Integration to poll for files (referred to as trigger-files), which are processed by a class called FileReceiver. The FileReceiver uses a JobLauncher to launch a SimpleJob. The job basically creates one object per record (usually one line of the file), sends it to a channel (referred to as the trigger-channel), and finally stores it on a JMS topic. The system that consumes the topic needs to aggregate the messages again in order to forward the entire information of one trigger-file to another system in a single message.

    Now the problem:
    For the aggregator, the messages that were initially sent to the trigger-channel need to have a correlation ID and a sequence size. The sequence size (line count of the trigger-file) and the correlation ID (name of the trigger-file) are set as JobParameters in the FileReceiver. Here is the code extract:

    Code:
    JobParametersBuilder paramBuilder = new JobParametersBuilder();
    paramBuilder.addLong("line.count", lineCount);
    paramBuilder.addString("file.name", fileContents.getName());
    JobParameters params = paramBuilder.toJobParameters();
    jobLauncher.run(job, params);
    I wanted to use the job parameters to set the message headers accordingly in the TriggerWriter (the itemWriter property in the code below).

    Code:
    <bean id="simpleJob" class="org.springframework.batch.core.job.SimpleJob" abstract="true">
    	<property name="jobRepository" ref="jobRepository" />
    	<property name="restartable" value="false" />
    </bean>
    	
    <bean id="fixedLengthImportJob" parent="simpleJob">
    	<property name="steps">
    		<bean id="step1" parent="simpleStep">
    			<property name="commitInterval" value="3" />
    			<property name="listeners">
    				<list>
    					<ref bean="inputFile" />
    					<bean class="...CorrelationAwareTriggerListener" />
    				</list>
    			</property>
    			<property name="streams" ref="fileItemReader" />
    			<property name="itemReader" ref="flatFileItemReader" />
    			<property name="itemWriter">
    				<bean class="...TriggerWriter">
    					<property name="channel" ref="eventChannel" />
    				</bean>
    			</property>
    		</bean>
    	</property>
    	<property name="restartable" value="true" />
    </bean>
    For the inputFile (see code above), I could reference the parameter like this: %file.name%

    Code:
    <bean id="inputFile" class="org.springframework.batch.core.resource.StepExecutionResourceProxy">
    	<property name="filePattern" value="${trigger.file.location}/%file.name%" />
    </bean>
    It seems to me that this mechanism was not intended to be used for the itemWriter (something like <property name="linecount" value="%line.count%" /> does not work). What I am doing instead is using an additional listener (CorrelationAwareTriggerListener, also used in the second code fragment) which is both an ItemWriteListener and a StepExecutionListener. The most important methods are shown below.

    Code:
    	public void beforeWrite(Object trigger) {
    		if (!(trigger instanceof CorrelationAwareTrigger)) {
    			LOG.warn("Received object of type " + trigger.getClass() + " in step " + stepName);
    			return;
    		} else {
    			((CorrelationAwareTrigger) trigger).setCorrelationId(params.getString("correlation.id"));
    			((CorrelationAwareTrigger) trigger).setTriggerCount(params.getLong("line.count").intValue());
    			((CorrelationAwareTrigger) trigger).setIndex(counter++);
    		}
    	}
    	
    	public void beforeStep(StepExecution stepExecution) {
    		stepName = stepExecution.getStepName();
    		params = stepExecution.getJobParameters();
    		counter = 1;
    	}
    In beforeStep, the job parameters etc. are stored and then used again in beforeWrite. This works as long as only one file is processed at a time. As soon as two files arrive at approximately the same time, the job parameters of the first file are overwritten.
    Is there a better way to solve this?

  • #2
    Have you tried step scoping the itemWriter & passing the job params with late binding?

    Code:
    <bean class="...TriggerWriter" scope="step">
    	<property name="channel" ref="eventChannel" />
    	<property name="lineCount" value="#{jobParameters[line.count]}"/>
    	<property name="fileName" value="#{jobParameters[file.name]}"/>
    </bean>
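    For the late binding to work, the writer needs matching setters. A minimal sketch of what such a writer could look like, assuming Spring Batch 2.x; the lineCount and fileName properties mirror the bean definition above, everything else is a placeholder rather than the actual TriggerWriter from this thread (the channel property is omitted):

    Code:
    import java.util.List;

    import org.springframework.batch.item.ItemWriter;

    public class TriggerWriter implements ItemWriter<Object> {

        private long lineCount;
        private String fileName;

        // setters matching the late-bound properties in the bean definition above
        public void setLineCount(long lineCount) { this.lineCount = lineCount; }
        public void setFileName(String fileName) { this.fileName = fileName; }

        public void write(List<? extends Object> items) throws Exception {
            // set the correlation headers from fileName and lineCount before
            // sending each item to the trigger channel
        }
    }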

    • #3
      I've found the best method to get parameters to your Reader and Writer is to use the @BeforeStep annotation, which gives you access to the StepExecution. Then you can get the job parameters with stepExecution.getJobParameters().

      Late binding doesn't play nice with task executors, which I use a lot.
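      A minimal sketch of that approach, assuming Spring Batch 2.x (the class and field names are placeholders, not code from this thread); the writer has to be registered as a listener on the step so the annotated method gets called:

      Code:
      import java.util.List;

      import org.springframework.batch.core.StepExecution;
      import org.springframework.batch.core.annotation.BeforeStep;
      import org.springframework.batch.item.ItemWriter;

      public class ParameterAwareWriter implements ItemWriter<Object> {

          private long lineCount;
          private String fileName;

          // called by Spring Batch before the step starts, because the writer
          // is registered as a step listener; gives access to the StepExecution
          @BeforeStep
          public void saveStepExecution(StepExecution stepExecution) {
              lineCount = stepExecution.getJobParameters().getLong("line.count");
              fileName = stepExecution.getJobParameters().getString("file.name");
          }

          public void write(List<? extends Object> items) throws Exception {
              // use lineCount and fileName while writing the items
          }
      }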

      • #4
        Hi, thanks for the replies.

        I don't think they work for me; let me know if I'm wrong. The problem is that once I set instance variables in the beforeStep method, they are overwritten if beforeStep is called again, because there is only one instance of the listener. If the execution of the first step has not finished yet, the beforeWrite method then sets the wrong values.

        I solved the problem with a semaphore that is acquired in the beforeStep method and released in the afterStep method. This works fine so far.

        This is my solution for the listener:

        Code:
        package ...;
        
        import java.util.concurrent.Semaphore;
        
        import org.apache.log4j.Logger;
        import ...CorrelationAwareTrigger;
        import org.springframework.batch.core.ItemWriteListener;
        import org.springframework.batch.core.StepExecution;
        import org.springframework.batch.core.StepExecutionListener;
        import org.springframework.batch.repeat.ExitStatus;
        
        
        public class CorrelationAwareTriggerListener implements ItemWriteListener, StepExecutionListener {
        	
        	private static final Logger LOG = Logger.getLogger(CorrelationAwareTriggerListener.class);
        	
        	private Semaphore semaphore;
        	
        	private long sequenceSize;
        	
        	private String correlationId;
        	
        	private String stepName;
        	
        	private int counter;
        	
        	public CorrelationAwareTriggerListener() {
        		semaphore = new Semaphore(1);
        	}
        
        	public void afterWrite(Object arg0) {
        		// Do nothing
        	}
        	
        	public void beforeWrite(Object trigger) {
        		if (!(trigger instanceof CorrelationAwareTrigger)) {
        			LOG.warn("Received object of type " + trigger.getClass() + " in step " + stepName);
        			return;
        		} else {
        			((CorrelationAwareTrigger) trigger).setCorrelationId(correlationId);
        			((CorrelationAwareTrigger) trigger).setTriggerCount((int) sequenceSize);
        			((CorrelationAwareTrigger) trigger).setIndex(counter++);
        		}
        	}
        	
        	public void onWriteError(Exception arg0, Object arg1) {
        		LOG.error("Write error " + arg1, arg0);
        	}
        	
        	public ExitStatus afterStep(StepExecution arg0) {
        		semaphore.release();
        		return arg0.getExitStatus();
        	}
        	
        	public void beforeStep(StepExecution stepExecution) {
        		try {
        			semaphore.acquire();
        		} catch (InterruptedException e) {
        			LOG.error("Error while waiting for import job to end", e);
        		}
        		stepName = stepExecution.getStepName();
        		correlationId = stepExecution.getJobParameters().getString("correlation.id");
        		sequenceSize = stepExecution.getJobParameters().getLong("line.count");
        		counter = 1;
        	}
        	
        	public ExitStatus onErrorInStep(StepExecution arg0, Throwable arg1) {
        		LOG.error("Error in execution of step " + arg0.getStepName());
        		return arg0.getExitStatus();
        	}
        	
        }

        • #5
          Ah, I see. Typically I use the step execution context for storing stateful values.
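          A minimal sketch of that idea (the key names are just examples): values put into the step's own ExecutionContext belong to that particular StepExecution, so they are not shared between concurrently running steps:

          Code:
          import org.springframework.batch.core.StepExecution;
          import org.springframework.batch.item.ExecutionContext;

          public class ExecutionContextUsage {

              // store a stateful value for this particular step execution
              public void storeLineCount(StepExecution stepExecution) {
                  ExecutionContext context = stepExecution.getExecutionContext();
                  context.putLong("line.count", stepExecution.getJobParameters().getLong("line.count"));
              }

              // read it back later from the same StepExecution
              public long readLineCount(StepExecution stepExecution) {
                  return stepExecution.getExecutionContext().getLong("line.count");
              }
          }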

          • #6
            If I understand your problem...you are sharing stateful objects between jobs?

            That's why I don't do that. The only 'shared' beans that I have defined are the stateless ones, or abstract parent beans.

            Everything else is done with stateful prototype beans via the ClassPathXmlJobRegistry. When a job is started, it gets a fresh context with its own beans. All the stateful beans are defined in their own bean file and bound to the job name in the registry:

            Code:
            <bean id="jobLocator" class="org.springframework.batch.core.configuration.support.ClassPathXmlJobRegistry">
                  <property name="jobPaths">
                    <list>
                      <value>classpath:spring/batch-jobtype1-prototype-beans.xml</value>
                      <value>classpath:spring/batch-jobtype2-prototype-beans.xml</value>
                    </list>
                  </property>
                </bean>
            There are jobs defined in each file:

            jobtype1
            Code:
            <bean id="jobtype1" parent="simpleJob">
            ...
            jobtype2
            Code:
            <bean id="jobtype2" parent="simpleJob">
            ...
            Then, when I get a job and start it, it has a fresh context with bean instances that aren't shared with other jobs:

            Code:
            Job batchJob = jobLocator.getJob("jobtype1");
            // add parameters and start the job
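            To complete the last snippet, a minimal sketch of adding the parameters and launching the job, reusing the jobLauncher and the parameter names from the original post (exception handling omitted):

            Code:
            Job batchJob = jobLocator.getJob("jobtype1");

            // build the parameters as in the FileReceiver and start the job
            JobParametersBuilder paramBuilder = new JobParametersBuilder();
            paramBuilder.addLong("line.count", lineCount);
            paramBuilder.addString("file.name", fileContents.getName());
            jobLauncher.run(batchJob, paramBuilder.toJobParameters());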

            • #7
              Oh yeah, the step execution context could be the solution I was looking for. I configured the StepListener like this:

              Code:
                  <bean class="org.springframework.batch.core.scope.StepScope" />
                  <bean id="correlationTriggerListener" class="...CorrelationAwareTriggerListener" scope="step">
                      <property name="sequenceSize" value="#{stepExecutionContext['line.count']}"/>
                  </bean>
              I've read in the Spring Batch manual that the StepScope has to be declared in order to use the scope attribute for the correlationTriggerListener. Anyway, this is the job config:

              Code:
              <bean id="fixedLengthImportJob" parent="simpleJob">
                <property name="steps">
                  <bean id="step1" parent="simpleStep">
                    <property name="commitInterval" value="3" />
                    <property name="listeners">
                      <list>
                        <ref bean="inputFile" />
                        <ref bean="correlationTriggerListener" />
                      </list>
                    </property>
                    <property name="streams" ref="fileItemReader" />
                    <property name="itemReader" ref="flatFileItemReader">
                    </property>
                    <property name="itemWriter">
                      <bean class="...TriggerWriter">
                        <property name="channel" ref="eventChannel" />
                      </bean>
                    </property>
                  </bean>
                </property>
                <property name="restartable" value="true" />
              </bean>
              And when I run the application I get the following exception:

              Code:
              Exception in thread "main" org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.integration.config.ConsumerEndpointFactoryBean#1': Cannot resolve reference to bean 'org.springframework.integration.handler.ServiceActivatingHandler#0' while setting constructor argument; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.integration.handler.ServiceActivatingHandler#0': Cannot resolve reference to bean 'applicationFileReceiver' while setting constructor argument; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'applicationFileReceiver' defined in class path resource [integration.xml]: Cannot resolve reference to bean 'fixedLengthImportJob' while setting bean property 'job'; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'fixedLengthImportJob' defined in class path resource [batch.xml]: Cannot create inner bean 'step1' of type [org.springframework.batch.core.step.item.SimpleStepFactoryBean] while setting bean property 'steps'; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'step1' defined in class path resource [batch.xml]: Initialization of bean failed; nested exception is java.lang.IllegalStateException: No Scope registered for scope 'step'
              	at org.springframework.beans.factory.support.BeanDefinitionValueResolver.resolveReference(BeanDefinitionValueResolver.java:275)
              	at org.springframework.beans.factory.support.BeanDefinitionValueResolver.resolveValueIfNecessary(BeanDefinitionValueResolver.java:104)
              	at org.springframework.beans.factory.support.ConstructorResolver.resolveConstructorArguments(ConstructorResolver.java:479)
              ...
              
              Caused by: java.lang.IllegalStateException: No Scope registered for scope 'step'
              	at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:295)
              	at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:185)
              	at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:164)
              	at org.springframework.beans.factory.support.BeanDefinitionValueResolver.resolveReference(BeanDefinitionValueResolver.java:269)
              	at org.springframework.beans.factory.support.BeanDefinitionValueResolver.resolveValueIfNecessary(BeanDefinitionValueResolver.java:104)
              	at org.springframework.beans.factory.support.BeanDefinitionValueResolver.resolveManagedList(BeanDefinitionValueResolver.java:287)
              	at org.springframework.beans.factory.support.BeanDefinitionValueResolver.resolveValueIfNecessary(BeanDefinitionValueResolver.java:126)
              	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyPropertyValues(AbstractAutowireCapableBeanFactory.java:1245)
              	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.populateBean(AbstractAutowireCapableBeanFactory.java:1010)
              	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:472)
              	... 66 more
              I had to remove some parts of the stack trace, because it would have exceeded the maximum length for a thread.

              • #8
                Oh sorry, I didn't see chudak's post. I'll try this tomorrow. Just ignore my last post for now.

                • #9
                  OK, using a JobLocator makes sense.
                  I'd still like to get this step execution stuff running, but I'll try it myself when I have more time.
                  Thanks to both of you.

                  • #10
                    From your config '${trigger.file.location}/%file.name%', it seems you are using Spring Batch 1.x instead of 2.0. scope="step" can only be used in Spring Batch 2.

                    • #11
                      Hello,
                      first of all, thanks for all the replies. Fortunately, the project my posts were related to came to a successful end, but I no longer have access to the source code and can hardly remember any details.
                      What I do know is that it all worked in the end. :-)

                      • #12
                        Congratulations. If possible, maybe you can write something to share your experience and help others who are struggling with a similar issue. So far, there isn't much info on the Internet about the usage of Spring Batch. The examples don't cover everything; some tutorials are really needed.
