
  • Problems implementing the “trigger job” concept

    I like the idea of “triggering a job”, but I must be missing something. Let’s say I have an input file that I want to process through a job execution and then move to the archive directory.
    I kind of got it to work. The job gets triggered with the right parameters and completes OK. There are two problems though:

    1. org.springframework.batch.integration.launch.JobLaunchingMessageHandler returns a JobExecution object. Therefore I found myself forced to define yet another channel, where I extract the filename from the JobExecution and return it, so that I can hook it up to outChannel, which would eventually move the file to the archive directory.

    public class MyJobExecutionHandler {

        // Extracts the file name from the parameters of the completed job.
        public String process(JobExecution je) {
            JobInstance ji = je.getJobInstance();
            JobParameters jp = ji.getJobParameters();
            String fileName = jp.getString("");
            return fileName;
        }
    }

    This looks pretty awkward to me – is there a different way to send a message to outChannel?

    2. Although outChannel receives the message, it ends up not moving the file, and I don’t understand why I see no errors in the log. When I hook up inputChannel directly to outChannel, bypassing the whole job launch process, it does what it's supposed to do, i.e. it moves the file.

    <file:inbound-channel-adapter id="inputChannel" directory="file:${input.dir}" filter="myFilter">
        <si:poller id="poller" fixed-delay="${polling.interval}" />
    </file:inbound-channel-adapter>

    <file:outbound-channel-adapter id="outChannel" delete-source-files="true" directory="${archive.dir}" />

    <si:channel id="job-requests"/>
    <si:channel id="job-executions"/>

    <si:service-activator input-channel="inputChannel" output-channel="job-requests" method="processFile">
        <bean class="scanner.FileToJobHandler">
            <property name="job" ref="simpleJob2"/>
        </bean>
    </si:service-activator>

    <si:service-activator input-channel="job-requests" output-channel="job-executions">
        <bean class="org.springframework.batch.integration.launch.JobLaunchingMessageHandler">
            <constructor-arg ref="jobLauncher" />
        </bean>
    </si:service-activator>

    <si:service-activator input-channel="job-executions" output-channel="outChannel" method="process">
        <bean class="scanner.MyJobExecutionHandler"/>
    </si:service-activator>

  • #2
    When posting code and config, please use [ code ] ... [ /code ] tags - (no spaces in the brackets) - makes it easier to read.

    The message in job-executions has the original File object in the headers, under a header named file_originalFile; just add a transformer, something like...

    <si:transformer input-channel="job-executions" output-channel="outChannel" expression="headers['file_originalFile'].absolutePath" />
    (omit the .absolutePath if you want a File object in the payload which you can use directly for the rename.)

    Last edited by Gary Russell; Sep 10th, 2011, 02:41 PM.


    • #3
      Thanks for the transformer idea. I added the transformer:
      <si:transformer input-channel="job-executions" output-channel="outChannel" expression="headers['file_originalFile']" />
      The transformer gets engaged but I'm getting:

      org.springframework.integration.transformer.MessageTransformingHandler@… received message: [Payload=JobExecution: id=0, startTime=Sat Sep 10 19:59:40 MDT 2011, endTime=Sat Sep 10 19:59:40 MDT 2011, lastUpdated=Sat Sep 10 19:59:40 MDT 2011, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=0, JobParameters=[{\temp\scann\input\o.xml}], Job=[simpleJob2]]][Headers={timestamp=1315706380437, id=aaaa0e89-0b0a-4d14-a4b6-9e8dfcdde962}]
      2011-09-10 19:59:40,453 DEBUG [ nversionService] - Converting value 'file_originalFile' of java.lang.String to java.lang.String
      [ nversionService] - Converted to 'file_originalFile'
      [ istableBeanFactory] - Returning cached instance of singleton bean 'errorChannel' scribeChannel] - preSend on channel 'errorChannel', message: [Payload=org.springframework.integration.MessageHandlingException: handler 'org.springframework.integration.transformer.MessageTransformingHandler@…' requires a reply, but no reply was received][Headers={timestamp=1315706380453, id=e7c057eb-b816-4f1d-9830-92c18eec1f7b}]

      It looks to me like the header contains only "timestamp" and "id" attributes, i.e. it does NOT contain file_originalFile.


      • #4
        It looks like you are dropping the header here...

        <si:service-activator input-channel="inputChannel" output-channel="job-requests" method="processFile">
        What does scanner.FileToJobHandler.processFile() look like?

        If it's a POJO, the framework will take care of copying the headers; if you are dealing with Message directly, you need to use the copyHeaders() method on the builder.

        Unless you are doing some serious message manipulation, it is much better to make your handler a POJO:

        public JobLaunchRequest processFile(File file) {...}
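
To make the header-copying point concrete, here is a minimal, dependency-free sketch. It only models the behaviour described above and does not use Spring Integration itself: `SimpleMessage`, `replyWithoutCopy` and `replyWithCopy` are hypothetical names, and the file path is made up for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Dependency-free model of header propagation.
// None of these types are Spring Integration classes; all names are hypothetical.
class HeaderPropagationSketch {

    // Stand-in for a message: a payload plus a defensive copy of the headers.
    static final class SimpleMessage {
        final Object payload;
        final Map<String, Object> headers;

        SimpleMessage(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = new HashMap<>(headers);
        }
    }

    // Models a handler that builds the reply message itself
    // and forgets to copy the request headers.
    static SimpleMessage replyWithoutCopy(SimpleMessage request, Object newPayload) {
        return new SimpleMessage(newPayload, new HashMap<>());
    }

    // Models the POJO case described in the post: only the payload changes,
    // and the request headers are carried over to the reply.
    static SimpleMessage replyWithCopy(SimpleMessage request, Object newPayload) {
        return new SimpleMessage(newPayload, request.headers);
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("file_originalFile", "/tmp/input/o.xml"); // hypothetical path
        SimpleMessage request = new SimpleMessage("file payload", headers);

        // prints "false": the custom header was dropped
        System.out.println(replyWithoutCopy(request, "job request")
                .headers.containsKey("file_originalFile"));
        // prints "true": the custom header survived
        System.out.println(replyWithCopy(request, "job request")
                .headers.containsKey("file_originalFile"));
    }
}
```

The sketch only helps if the header is present on the incoming message in the first place; if it never was, neither variant can produce it.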
        As a general tip, when debugging, turn on debug logging for org.springframework.integration and you will see the messages (payload and header) traversing the channels and endpoints.
        Last edited by Gary Russell; Sep 11th, 2011, 11:40 AM.


        • #5
          Looks to me like the message is created with a header containing only id and timestamp to begin with:

          2011-09-12 12:22:16,531 INFO [org.springframework.integration.file.FileReadingMessageSource] - Created message: [[Payload=c:\temp\scann\input\o.xml][Headers={timestamp=1315851728046, id=8c04ccd2-d334-45db-8241-755a652bc0bc}]]

          So when I receive it in my FileToJobHandler.processFile, the header is already missing. I did some debugging and found that the message and its headers are created here:
          public GenericMessage(T payload, Map<String, Object> headers) {
              Assert.notNull(payload, "payload must not be null");
              if (headers == null) {
                  headers = new HashMap<String, Object>();
              }
              else {
                  headers = new HashMap<String, Object>(headers);
              }
              this.headers = new MessageHeaders(headers);
              this.payload = payload;
          }

          public MessageHeaders(Map<String, Object> headers) {
              this.headers = (headers != null) ? new HashMap<String, Object>(headers) : new HashMap<String, Object>();
              if (MessageHeaders.idGenerator == null){
                  this.headers.put(ID, UUID.randomUUID());
              }
              else {
                  this.headers.put(ID, MessageHeaders.idGenerator.generateId());
              }
              this.headers.put(TIMESTAMP, new Long(System.currentTimeMillis()));
          }

          Should the message be created differently?


          • #6
            To Gary's point, what is the return type of your 'processFile' method?


            • #7
              The processFile method is very simple: it extracts the filename from the input payload and returns a JobLaunchRequest. It's exactly the same as the "adapt" method presented in

              public JobLaunchRequest processFile(File file) {
                  JobParameters jobParameters = new JobParametersBuilder().addString(
                          "", file.getAbsolutePath()).toJobParameters();
                  return new JobLaunchRequest(job, jobParameters);
              }


              • #8
                So are you sure that your header is set before this method is invoked?


                • #9
                  My apologies; the file_originalFile header is added if you transform the file contents to a String...

                  INFO : org.springframework.integration.file.FileReadingMessageSource - Created message: [[Payload=/tmp/spring-integration-samples/input/cc][Headers={timestamp=1315857489662, id=ca5c8774-5add-4990-9105-460ce03a40d0}]]
                  DEBUG: org.springframework.integration.endpoint.SourcePollingChannelAdapter - Poll resulted in Message: [Payload=/tmp/spring-integration-samples/input/cc][Headers={timestamp=1315857489662, id=ca5c8774-5add-4990-9105-460ce03a40d0}]
                  DEBUG: - preSend on channel 'filesIn', message: [Payload=/tmp/spring-integration-samples/input/cc][Headers={timestamp=1315857489662, id=ca5c8774-5add-4990-9105-460ce03a40d0}]
                  DEBUG: org.springframework.integration.transformer.MessageTransformingHandler - org.springframework.integration.transformer.MessageTransformingHandler#0 received message: [Payload=/tmp/spring-integration-samples/input/cc][Headers={timestamp=1315857489662, id=ca5c8774-5add-4990-9105-460ce03a40d0}]
                  DEBUG: org.springframework.integration.transformer.MessageTransformingHandler - handler 'org.springframework.integration.transformer.MessageTransformingHandler#0' sending reply Message: [Payload=[B@…][Headers={timestamp=1315857489665, id=1f4b24be-69d8-4011-b1b4-61d107a18a15, file_originalFile=/tmp/spring-integration-samples/input/cc, file_name=cc}]
                  You need to add a <header-enricher /> element to copy the file into a header of your choice. You can use a SpEL expression of "payload" to do that.
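
As a rough illustration of that suggestion, the sketch below models the whole round trip in plain Java, with no Spring dependency. Everything in it is an assumption made for the example: `SimpleMessage`, the header name `originalFile`, the three helper methods and the input path are hypothetical stand-ins for the header enricher, the job-launching step and the final transformer, not the framework's own API.

```java
import java.io.File;
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of "copy the payload into a header, then read it back later".
// All names are hypothetical; this is not Spring Integration code.
class HeaderEnricherSketch {

    static final String ORIGINAL_FILE = "originalFile"; // header name chosen for the example

    // Stand-in for a message: a payload plus a defensive copy of the headers.
    static final class SimpleMessage {
        final Object payload;
        final Map<String, Object> headers;

        SimpleMessage(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = new HashMap<>(headers);
        }
    }

    // Models a header enricher whose expression is "payload":
    // the current payload is stored under the given header, the payload is unchanged.
    static SimpleMessage enrichWithPayload(SimpleMessage in, String headerName) {
        Map<String, Object> headers = new HashMap<>(in.headers);
        headers.put(headerName, in.payload);
        return new SimpleMessage(in.payload, headers);
    }

    // Models a downstream endpoint (for example the job launch) that replaces
    // the payload while the headers are carried over.
    static SimpleMessage replacePayload(SimpleMessage in, Object newPayload) {
        return new SimpleMessage(newPayload, in.headers);
    }

    // Models the transformer expression headers['...']:
    // the header value becomes the new payload; a missing header is an error.
    static SimpleMessage headerToPayload(SimpleMessage in, String headerName) {
        Object value = in.headers.get(headerName);
        if (value == null) {
            throw new IllegalStateException("header '" + headerName + "' is missing");
        }
        return new SimpleMessage(value, in.headers);
    }

    public static void main(String[] args) {
        File input = new File("input/o.xml"); // hypothetical input file
        SimpleMessage polled = new SimpleMessage(input, new HashMap<>());

        SimpleMessage enriched = enrichWithPayload(polled, ORIGINAL_FILE);
        SimpleMessage afterJob = replacePayload(enriched, "JobExecution placeholder");
        SimpleMessage forArchive = headerToPayload(afterJob, ORIGINAL_FILE);

        // The original File is the payload again and can be handed to the archive step.
        System.out.println(forArchive.payload);
    }
}
```

The point of the design is that the enrichment happens before the payload is replaced by the job request, so the File stays available in the headers for the final step without having to dig it out of the JobExecution.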