Announcement Announcement Module
Collapse
No announcement yet.
Integration to Batch JobLauncher error Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Integration to Batch JobLauncher error

    Hi,
    I'm trying to read a file in from an SFTP site using SI.
    That works great.. I get the file to my local directory.

    Then I would like to use Spring Batch API to spawn off several jobs depending on the contents of that file.
    To do that I used the Spring Batch Integration by defining a JobLaunchingMessageHandler like so:

    Code:
    @ServiceActivator
    	public JobExecution launch(File file) throws JobExecutionException {
                    //get the file and process it.
                    //Spawn as many jobs are needed based on file contents
                    //Here's the first instance of the job...
    		Job job = new SimpleJob("myJob");
    		
    		Map<String, JobParameter> parameters = new HashMap<String, JobParameter>();
    		parameters.put("timestamp", new JobParameter(new Date().getTime()));	
    		JobParameters jobParameters = new JobParameters(parameters);
    		JobExecution execution = jobLauncher.run(job, jobParameters);
    		System.out.println("Finis!!");
    		return execution;
    	}
    I wire my launcher like so:
    Code:
    	<file:inbound-channel-adapter id="filesIn"
    	                              directory="file:C:\\tmp\\source1"  filename-pattern="*.txt" >                     
    		<int:poller>
    			<int:interval-trigger interval="3000"/>
    		</int:poller>
    	</file:inbound-channel-adapter>
    
    	<int:service-activator input-channel="filesIn">
    	    <bean class="com.xxx.adc.si.JobLaunchingMessageHandler">
    	        <constructor-arg ref="jobLauncher" />
    	    </bean>
    	</int:service-activator>
    Here is the config of the job* beans:
    Code:
    	<bean id="jobLauncher"
    		class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    		<property name="jobRepository" ref="jobRepository" />
    	</bean>
    
    	<bean class="org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor">
    		<property name="jobRegistry" ref="jobRegistry"/>
    	</bean>
    
    	<bean id="jobRepository"
    		class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean"
    		p:dataSource-ref="dataSource" p:transactionManager-ref="transactionManager" p:lobHandler-ref="lobHandler"/>

    When I run the above code, the file get's passed in correctly to my launch method above, but when it goes to execute the job, I get the following error stack:

    Code:
    2011-07-18 16:45:33,511 [task-scheduler-2] ERROR org.springframework.integration.handler.LoggingHandler  #### org.springframework.integration.MessageHandlingException: java.lang.NullPointerException
    	at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:76)
    	at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:64)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:98)
    	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:110)
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
    	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:61)
    	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
    	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:128)
    	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
    	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
    	at org.springframework.integration.endpoint.SourcePollingChannelAdapter.doPoll(SourcePollingChannelAdapter.java:94)
    	at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:146)
    	at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:144)
    	at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:207)
    	at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
    	at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:48)
    	at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
    	at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:202)
    	at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:51)
    	at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
    	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    	at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:98)
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:207)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    	at java.lang.Thread.run(Thread.java:619)
    Caused by: java.lang.NullPointerException
    	at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:329)
    	at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:120)
    	at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:48)
    	at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:114)
    	at com.xxx.adc.si.JobLaunchingMessageHandler.launch(JobLaunchingMessageHandler.java:48)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    	at java.lang.reflect.Method.invoke(Method.java:597)
    	at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:69)
    	at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:110)
    	at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:57)
    	at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:102)
    	at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:102)
    	at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:114)
    	at org.springframework.integration.util.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:223)
    	at org.springframework.integration.util.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:123)
    	at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:73)
    	... 28 more
    Looking at AbstractJob, it seems that the jobRepository is coming in as null.
    There is an assert in the afterPropertiesSet method of AbstractJob that checks that the jobRepository is set. So something is obviously setting it to null *after* the SimpleJob is instantiated.

    Since I have not configured a taskExecutor, the default SynchExecutor should be being used. Why then is the jobRepository null by the time my simpleJob is launched?

    Can someone please explain why this is happening?

    Thanks!
    Last edited by spongybob; Jul 18th, 2011, 07:39 PM.

  • #2
    Hi,

    I do something very similar, just using SI and spring batch (admin).
    My laucher is :

    Code:
    public class JobLaunchingMessageHandler {
    
        private final JobRegistry jobRegistry;
        private final JobLauncher jobLauncher;
    
        /**
         * Handle job launching request for spring batch from SI
         * 
         * @param jobRegistry
         * @param jobLauncher
         */
        public JobLaunchingMessageHandler(JobRegistry jobRegistry,
                        JobLauncher jobLauncher) {
            super();
            this.jobRegistry = jobRegistry;
            this.jobLauncher = jobLauncher;
        }
    
        /**
         * @param request
         * @return {@link JobExecution} containing the istance of excecution
         * @throws JobExecutionAlreadyRunningException
         * @throws JobRestartException
         * @throws JobInstanceAlreadyCompleteException
         * @throws JobParametersInvalidException
         * @throws NoSuchJobException
         */
        public JobExecution launch(JobLaunchRequest request)
                        throws JobExecutionAlreadyRunningException, JobRestartException,
                        JobInstanceAlreadyCompleteException, JobParametersInvalidException,
                        NoSuchJobException {
            Job job = jobRegistry.getJob(
                            request.getJobName()
                            );
            JobParametersBuilder builder = new JobParametersBuilder();
            for (Map.Entry<String, String> entry : request.getJobParameters().entrySet()) {
                builder.addString(
                                entry.getKey(), entry.getValue()
                                );
            }
            return jobLauncher.run(job, builder.toJobParameters());
        }
    What i do is splitting before the activator file and then let SI send as many request to launch job on the channel where the activator listen.

    Code:
      <!-- entry point to launch a job, job-requests channel -->
        <int:service-activator input-channel="job-requests"
            output-channel="job-executions">
            <bean
                class="com.mycompany.integration.JobLaunchingMessageHandler">
                <constructor-arg ref="jobRegistry" />
                <constructor-arg ref="jobLauncher" />
            </bean>
        </int:service-activator>
    With JobLauchRequest :

    Code:
    public class JobLaunchRequest {
    
        private final String jobName;
        private final Map<String, String> jobParameters;
    
        /**
         * Request job with name jobName to start
         * 
         * @param jobName
         */
        @SuppressWarnings("unchecked")
        public JobLaunchRequest(String jobName) {
            this(jobName, Collections.EMPTY_MAP);
        }
    
        /**
         * Request job with name jobName and parameters jobParams to start
         * 
         * @param jobName
         * @param jobParams
         */
        public JobLaunchRequest(String jobName, Map<String, String> jobParams) {
            super();
            this.jobName = jobName;
            jobParameters = jobParams;
        }
    
        /**
         * getter for jobName
         * 
         * @return jobName
         */
        public String getJobName() {
            return jobName;
        }
    
        /**
         * get the parameter for the job execution request {@link JobParameter}
         * 
         * @return man containing parameters of job {@link JobParameter}
         */
        @SuppressWarnings("unchecked")
        public Map<String, String> getJobParameters() {
            return jobParameters == null ? Collections.EMPTY_MAP :
                            Collections.unmodifiableMap(jobParameters);
        }
    
    }
    I basically have it from "Spring Batch in Action" book, chapter 11.

    Hope it help to solve your problem ... if not, will try to explain it better

    Comment


    • #3
      Integration to Batch JobLauncher error

      Hi ticino,
      Thanks for the reply.

      I understand how that enables the execution of a Job that comes in as a message over a channel. (So you can extract it using JobLaunchRequest.getJobName etc.)

      However, what I would like to do is receive a file over the channel using Spring Integration. Then process the file, split it and spawn as many jobs as are required based on the splitting.

      Since doing so requires the creation of a new Job, I am trying to create a SimpleJob and then execute it using jobLauncher.

      However, when I do so, that Job is not associated to a jobRepository. More precisely, it does not exist in the jobRepository that the jobLauncher has a reference to.

      Therefore, I cannot execute it using the jobLauncher.

      Can someone chime in and suggest how I can associate a new job to an existing jobRepositry. Then I am assuming launching it via jobLauncher would work.

      Thanks!

      Comment


      • #4
        Integration to Batch JobLauncher error

        Dang!
        I got it.. should have read the docs:
        It's simple:

        Code:
        Job job = new SimpleJob("test");
        		Map<String, JobParameter> parameters = new HashMap<String, JobParameter>();
        		parameters.put("timestamp", new JobParameter(new Date().getTime()));	
        		JobParameters jobParameters = new JobParameters(parameters);
        		JobExecution firstExecution = jobRepository.createJobExecution(job.getName(), jobParameters);

        Comment

        Working...
        X