Evaluating Spring Batch framework
  • Evaluating Spring Batch framework

    Hello,

    I have a business use case, and after reading the documentation it looks like the Spring Batch framework can help me solve it.

    Use case: Three types of files are written to a directory by a mainframe. My application needs to listen on that folder, read each file, and determine what type of file it is. Once that is done, it should read each record, separated by a delimiter (which varies by file type), and send a message to a JMS queue.

    Another application receives the messages written to JMS and processes them. If a certain error threshold is crossed, it needs to send a message back to the application writing to the queue to stop sending any more messages until the problem is rectified (i.e. the corrupt file is replaced).

    I am very new to Spring (my first project!). I have the following questions:
    1) Should there be a different Job for handling each type of file (as mentioned, 3 types currently)?

    2) If these can be handled using one Job only, it needs to keep running and listen on a particular folder. Any ideas how to do that?

    3) If it's a single application and the error threshold is crossed for one file type, only the process parsing that particular file should terminate (or rather, suspend). It needs to move the file being processed to an Error folder, and then start processing the next file/file type. Is this possible using the batch framework? Also, before processing begins, each file needs to be moved to a Processing folder. This should be a Step, right?

    4) How will the threshold-exceeded notification sent to the job be handled? Has anyone done something like this?

    5) Once the file has been rectified, the application needs to be notified that it should resume.

    6) If this has to be a scheduled job, how do I do the scheduling?

    Hmm, too many questions, I know. I was thinking about writing a stand-alone application, but then I found this framework, so I thought I should take the opinion of people who have used it.
    Last edited by tequilaguy; Jun 3rd, 2008, 03:04 PM.

  • #2
    1) I would probably do it as one job that, based on the parameters it's passed, can process any of the 3 types. I've done that before at a client in a similar scenario and it worked well.

    2) That's really more of a scheduling concern. I would look at Quartz, which has a directory listener. Quartz could kick off the job after the listener comes back with a file.

    3) If you implement it as one job that can handle all three files, this is possible, since each job would be handling its particular file. You would just need a StepExecutionListener that moves the file based on whether the job finished successfully or not. Keep in mind, though, that there can always be errors that prevent the file from being moved, so it's dangerous to rely on that.

    4) I'm not sure I really understand what you mean by threshold notification. I'm assuming you mean a skip threshold. In that case, the framework throws a SkipLimitExceededException that you can handle in a listener.

    5) There aren't a lot of details in this question, but it shouldn't be a problem. I've seen clients write out summary reports, send emails, etc. after a job finishes; it's fairly simple to implement.

    6) See point 2 above. It's not really a 'scheduled job' if you're kicking off a job based on a file hitting a directory. You may also want to look at Spring Integration or an EAI system for other solutions to that. If you want it to be truly scheduled, you could kick off a job at the same time every day that reads in all the files in your 'drop zone' (the directory files are uploaded to). It's not too difficult, and should be even easier with 1.1.
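    A minimal sketch of the file-move idea from point 3, using plain java.nio and no Spring classes (the folder names "processed" and "error" are assumptions): a StepExecutionListener's afterStep() callback could delegate to something like this.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Sketch of the kind of file-move logic a StepExecutionListener could
// delegate to after a step finishes. Folder names are illustrative.
public class FileMover {

    // Moves the input file to "processed" on success, "error" otherwise,
    // creating the target directory if it does not exist yet.
    public static Path moveByStatus(Path file, Path baseDir, boolean success) throws IOException {
        Path target = baseDir.resolve(success ? "processed" : "error");
        Files.createDirectories(target);
        return Files.move(file, target.resolve(file.getFileName()),
                StandardCopyOption.REPLACE_EXISTING);
    }
}
```

    As Lucas notes, a move can itself fail (permissions, locks), so the caller should still handle IOException rather than rely on the move always succeeding.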



    • #3
      lucasward, thanks a lot.

      Just to elaborate on point no. 4...

      What I meant was that this application will keep reading files and keep adding records onto a queue. Let's call this the file reader/splitter.

      There is another application (totally separate from the one mentioned above) which consumes the messages written by the file reader/splitter onto the queue and does some business validation. If the business validation fails, it increments an error count. If this error count exceeds the configured number, it needs to tell the reader/splitter to stop sending messages from this file. Let's say this application writes a message onto a queue saying that the threshold has been crossed, so don't send more messages...

      My question is: how can the file reader/splitter (written using Spring Batch) keep listening on this queue and stop sending more messages for this particular file, given that the file has corrupted data?

      Similarly, when the corrupted file has been replaced with a correct file, the reader/splitter needs to resume the job of reading, splitting and writing messages to the queue.
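      Independent of any particular JMS API, the stop/resume signalling described above can be sketched as a flag that a listener on the control queue flips (the message names THRESHOLD_EXCEEDED and RESUME are invented for illustration, not part of Spring Batch):

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch: a reader/splitter could hold a per-file "suspended" flag that a
// listener on the error/control queue flips when a threshold-exceeded
// message arrives, and flips back on a resume message.
public class FlowControl {
    private final AtomicBoolean suspended = new AtomicBoolean(false);

    // Called by the consumer of the control queue (e.g. a JMS listener).
    public void onControlMessage(String msg) {
        if ("THRESHOLD_EXCEEDED".equals(msg)) {
            suspended.set(true);
        } else if ("RESUME".equals(msg)) {
            suspended.set(false);
        }
    }

    // Checked by the splitter before each record is written to the data queue.
    public boolean maySend() {
        return !suspended.get();
    }
}
```

      The splitter would poll maySend() before writing each record; how the flag maps onto suspending the actual Spring Batch step is the part the framework discussion below addresses.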



      • #4
        That makes a lot more sense. We've had a lot of conversations about this topic. Dave Syer has actually coded up an example that uses JMS, and I believe he has added it to the codebase. (It was the basis for his presentation at JavaOne.) It essentially aggregates items and writes them out in 'chunks' to the consumers of the queue. I'm not sure how he handled this scenario, though. We had talked about having it check an error queue before returning from writing out a chunk, which would probably work, but then you have the issue of what happens when the reader is exhausted but the queue still has items remaining. I suppose a StepExecutionListener would have to listen on the error queue for that scenario. Dave may be able to provide more detailed thoughts on this topic, though.



        • #5
          The thresholds are set globally, so there has to be a dispatcher responsible for checking the progress of all the work being done remotely. The way I did it was to listen for responses (success, failure or a mixture) in the ItemWriter that sends the requests to the middleware. I use a similar approach to the TaskExecutorRepeatTemplate - a throttle limit prevents messages being sent if they are not being processed fast enough.

          I am almost ready to publish the code I wrote as a sample. It uses Spring Integration so I needed that project to settle down a bit before I put anything out. It also uses Java 5 heavily (as does Spring Integration) so it needs to be a separate project in Spring Batch. I might get some time to do this next week at Spring One.
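          The throttle-limit idea can be sketched with a plain java.util.concurrent.Semaphore (illustrative only, not the actual Spring Batch or Spring Integration sample code): permits cap the number of requests awaiting responses, so a slow consumer naturally slows the sender down.

```java
import java.util.concurrent.Semaphore;

// Sketch of a throttle limit: permits bound the number of requests in
// flight; each response (success or failure) releases a permit.
// Class and method names are illustrative.
public class Throttle {
    private final Semaphore permits;

    public Throttle(int limit) {
        this.permits = new Semaphore(limit);
    }

    // Blocks if 'limit' requests are already awaiting responses.
    public void beforeSend() throws InterruptedException {
        permits.acquire();
    }

    // Non-blocking variant: returns false when the throttle limit is reached.
    public boolean trySend() {
        return permits.tryAcquire();
    }

    // Called when a response arrives from the consumer side.
    public void onResponse() {
        permits.release();
    }
}
```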



          • #6
            Thanks Dave and Lucas.

            I'll eagerly wait for that sample application

            Will post more once I understand the framework a bit more.

            Thanks again



            • #7
              Hello Dave,

              did you get time to publish the code for the sample application you wrote?

              Thanks



              • #8
                Spring Integration seems to have settled down a bit, but there are still big changes in the pipeline. However, I might be able to use M4. I've been at SpringOne, so I didn't have time to fix my samples. Maybe today if I get time, otherwise next week.
                Last edited by Dave Syer; Jun 13th, 2008, 03:05 AM. Reason: grammar



                • #9
                  Hi,

                  I'm a beginner with Spring and Quartz, and I have a problem in my project.

                  When my job fails or hangs, I would like the scheduler to relaunch it so that the job can resume
                  where it stopped.

                  These are the different steps I took.
                  First, I have my scheduler class:

                  Code:
                  @DisallowConcurrentExecution
                  public class JobLauncherDetails extends QuartzJobBean /*implements JobExecutionListener*/ {
                  
                  	private static final String JOB_DATA_MAP_MAX_RETRY = "maxRetry";
                  	//private static final String JOB_DATA_MAP_NB_RETRIES = "nbRetries";
                  	private final static String JOB_LOCATOR_CONTEXT_KEY = "jobLocator";
                  	private final static String JOB_LAUNCHER_CONTEXT_KEY = "jobLauncher";
                  	private static final String JOB_DATA_MAP_START_DATE = "startDate";
                  	private static final String JOB_PARAM_LISTENER_DELAY_KEY = "listenerDelay";
                  	private static final long JOB_PARAM_DEFAULT_LISTENER_DELAY = 1000;
                  
                  	private static Logger log = LoggerFactory.getLogger(JobLauncherDetails.class);
                  
                  	/**
                  	 * Special key in job data map for the name of a job to run.
                  	 */
                  	private static final String JOB_NAME = "jobName";
                  
                  	private JobLocator jobLocator;
                  	private JobLauncher jobLauncher;
                  	private String jobName;
                      private JobListener MyjobListener;
                  	/**
                  	 * Method called by Quartz trigger. The given {@link JobExecutionContext}
                  	 * gives info on the Job to run.
                  	 */
                  	protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
                  		try {
                  			jobLocator = (JobLocator) context.getScheduler().getContext().get(JOB_LOCATOR_CONTEXT_KEY);
                  			jobLauncher = (JobLauncher) context.getScheduler().getContext().get(JOB_LAUNCHER_CONTEXT_KEY);
                  			// Look the listener up by a string context key ("jobListener" is assumed);
                  			// the original passed the uninitialized field itself as the key.
                  			MyjobListener = (JobListener) context.getScheduler().getContext().get("jobListener");
                  			
                  		
                  		} catch (SchedulerException se) {
                  			log.error("Unable to get jobLocator and jobLauncher from scheduler context.", se);
                  		}
                  
                  		if (jobLocator == null || jobLauncher == null) {
                  			log.error("Unable to run a job without a valid jobLocator and jobLauncher.");
                  		} else {
                  			JobDetail jobDetail = context.getJobDetail();
                  			Map<String, Object> jobDataMap = context.getMergedJobDataMap();
                  			
                  			if (jobDataMap == null || jobDataMap.size() == 0) {
                  				log.error("Unable to run a job without a valid jobDataMap (no job name provided...).");
                  			} else {
                  				
                  				jobName = (String) jobDataMap.get(JOB_NAME);
                  
                  				if (jobName == null || jobName.isEmpty()) {
                  					log.error("Unable to run a job: no job name provided...");
                  				} else {
                  					
                  					if(context.getRefireCount()==0) {
                  						// Add a date to the jobDataMap so that the job is unique.
                  						// It is useful to distinguish 2 instances of the same trigger.
                  						// Otherwise, a JobExecutionAlreadyRunningException will be launched
                  						// (no need to modify this parameter for a refired job)
                  						jobDataMap.put(JOB_DATA_MAP_START_DATE, new Date());
                  					}
                  
                  					JobParameters jobParameters = getJobParametersFromJobMap(jobDataMap);
                  					Scheduler sched= context.getScheduler();
                  					try {
                  						sched.getListenerManager().addJobListener(new NcaJobListener());
                  						
                  					} catch (SchedulerException e) {
                  						log.error("Unable to register the job listener.", e);
                  					}
                  					
                  					if (log.isInfoEnabled())
                  						log.info("\n**********************************************************************\n" 
                  								+ "* Quartz trigger - start job: {}. Key: {}\n"
                  								+ "* Firing unique ID: {}\n" 
                  								+ "* Refire count: {}\n" 
                  								+ "* Job parameters: {}\n"
                  								+ "* isConcurrentExectionDisallowed: {}\n"
                  								+ "**********************************************************************\n",
                  								new Object[] { jobName, jobDetail==null?"":jobDetail.getKey(), context.getFireInstanceId(), 
                  												context.getRefireCount(), jobParameters==null?"":jobParameters.toString(), 
                  												context.getJobDetail().isConcurrentExectionDisallowed() });
                  					
                  					JobExecution jobExec = null;
                  					BatchStatus jobStatus = BatchStatus.UNKNOWN;
                  
                  					try {
                  						jobExec = jobLauncher.run(jobLocator.getJob(jobName), jobParameters);
                  						jobStatus = jobExec.getStatus();
                  						log.info("Job Id: {}", jobExec.getId());
                  					} catch (Exception ex) {
                  						if (!(ex instanceof JobExecutionException)) {
                  							log.error("Error while running Job [{}]. Rescheduling if possible.\nError: {}\n******************************", jobName, ex.toString());
                  							// TODO: remove this log...
                  							log.error("Error full stack:", ex);
                  						} else {
                  							throw (JobExecutionException) ex;
                  						}
                  					}
                  
                  					if (log.isInfoEnabled())
                  						log.info("\n**********************************************************************\n" 
                  								+ "* Quartz trigger - end job: {}\n"
                  								+ "* Firing unique ID: {}\n" 
                  								+ "* Refire count: {}\n" 
                  								+ "* Job parameters: {}\n"
                  								+ "* Start date: {}\n"
                  								+ "* End date: {}\n"
                  								+ "* Status: {}\n"
                  								+ "**********************************************************************\n",
                  								new Object[] { jobName, context.getFireInstanceId(), context.getRefireCount(), 
                  												jobParameters==null?"":jobParameters.toString(),
                  												jobExec==null?"":jobExec.getStartTime(),
                  												jobExec==null?"":jobExec.getEndTime(),
                  												jobStatus });
                  				}
                  			}
                  		}
                  	}
                  
                  	/**
                  	 * Copy parameters that are of the correct type over to
                  	 * {@link JobParameters}, ignoring jobName.
                  	 * 
                  	 * @return a {@link JobParameters} instance
                  	 */
                  	private JobParameters getJobParametersFromJobMap(Map<String, Object> jobDataMap) {
                  
                  		JobParametersBuilder builder = new JobParametersBuilder();
                  
                  		for (Entry<String, Object> entry : jobDataMap.entrySet()) {
                  			String key = entry.getKey();
                  			Object value = entry.getValue();
                  			if (value instanceof String && !key.equals(JOB_NAME)) {
                  				builder.addString(key, (String) value);
                  			} else if (value instanceof Float || value instanceof Double) {
                  				builder.addDouble(key, ((Number) value).doubleValue());
                  			} else if (value instanceof Integer || value instanceof Long) {
                  				builder.addLong(key, ((Number) value).longValue());
                  			} else if (value instanceof Date) {
                  				builder.addDate(key, (Date) value);
                  			} else {
                  				log.debug("JobDataMap contains values which are not job parameters (ignoring).");
                  			}
                  		}
                  
                  		return builder.toJobParameters();
                  	}
                  
                  	/*@Override
                  	public void afterJob(JobExecution jobExecution) {
                  		// TODO Auto-generated method stub
                  		log.info("*************************** Job ended with status: {} ********************", jobExecution.getExitStatus());
                  
                  		if (ExitStatus.FAILED.equals(jobExecution.getExitStatus())) {
                  			log.error("******************************\nJob [{}] failed. Reschedule if possible.\n******************************", jobName);
                  
                  			// rescheduleJob(jobExecution);
                  		}
                  	}
                  
                  	@Override
                  	public void beforeJob(JobExecution jobExecution) {
                  		log.info("************** Before running job {} **************", jobName);
                  	}*/
                  
                  }



                  • #10
                    Second, I have a job-failure listener class:

                    Code:
                    public class JobFailureListener implements JobExecutionListener {
                    	private static Logger log = LoggerFactory.getLogger(JobFailureListener.class);
                    	private static final String JOB_DATA_MAP_MAX_RETRY = "maxRetry";
                    	private static final String JOB_NAME = "jobName";
                    	private String jobName;
                    	public void beforeJob(JobExecution jobExecution) {
                    		log.info("*************************** Starting job ***************************");
                    	}
                    
                    	// Note: JobExecutionListener only defines afterJob(JobExecution); this
                    	// two-argument variant must be invoked explicitly with the Quartz context.
                    	public void afterJob(JobExecution jobExecution, JobExecutionContext context) throws JobExecutionException {
                    
                    		if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
                    			log.info("Job completed successfully.");
                    		} else if (!jobExecution.getAllFailureExceptions().isEmpty()) {
                    			ExitStatus exitStatus = ExitStatus.FAILED;
                    			log.error("******************************\nJob [{}] failed. Reschedule if possible.\n******************************", jobName);
                    
                    			// Reschedule if possible (throws to make Quartz refire immediately).
                    			rescheduleJob(jobExecution, context);
                    
                    			for (Throwable e : jobExecution.getAllFailureExceptions()) {
                    				exitStatus = exitStatus.addExitDescription(e);
                    			}
                    
                    			jobExecution.setExitStatus(exitStatus);
                    		}
                    	}
                    
                    	private void rescheduleJob(JobExecution jobExec, JobExecutionContext context) throws JobExecutionException {
                    		rescheduleJob(jobExec, context, null);
                    	}
                    
                    	private void rescheduleJob(JobExecution jobExec, JobExecutionContext context, Throwable excep) throws JobExecutionException {
                    		// TODO: Handle the retry policy, e.g.:
                    		// http://stackoverflow.com/questions/4...y-when-failure
                    	
                    		if(jobExec == null)
                    			throw new IllegalArgumentException("jobExec cannot be null.");
                    		
                    		if(context == null)
                    			throw new IllegalArgumentException("context cannot be null.");
                    		
                    		String fireInstanceId =   context.getFireInstanceId();
                    		int refireCount = context.getRefireCount();
                    		
                    		if (log.isInfoEnabled())
                    			log.info("\n**********************************************************************\n" 
                    					+ "* Quartz trigger - rescheduling job: {}\n"
                    					+ "* Firing unique ID: {}\n" 
                    					+ "* Refire count: {}\n" 
                    					+ "**********************************************************************\n",
                    					new Object[] { jobName, fireInstanceId, refireCount});
                    		
                    		JobDataMap dataMap = context.getJobDetail().getJobDataMap();
                    		int maxRetry = dataMap.getIntValue(JOB_DATA_MAP_MAX_RETRY);
                    		
                    		JobExecutionException jobExecutionException = null;
                    		
                            if(maxRetry>0 && refireCount<maxRetry) {        	
                            	jobExecutionException = new JobExecutionException("Error: the job [" + jobName + "] didn't end properly, refire it immediately.", excep);
                            	jobExecutionException.setRefireImmediately(true);
                            	//boolean refireImmediatelyResult = jobExecutionException.refireImmediately();
                            	
                            	log.info("************** Job Id: {} - rescheduled. ", jobExec.getId());
                            	
                            	throw jobExecutionException;
                            } else {
                            	//jobExecutionException = new JobExecutionException("Error: the job [" + jobName + "] didn't end properly. No more retry possible, sending alarm.", excep);
                            	
                            	log.error("\n**********************************************************************\n" 
                    					+ "* Quartz trigger - No more retry possible for job: {}\n"
                    					+ "* Firing unique ID: {}\n" 
                    					+ "* UNSCHEDULING TRIGGER + SENDING EXPLOIT ALARM...\n"
                    					+ "**********************************************************************\n",
                    					jobName, fireInstanceId);
                            	
                            	AlarmDef alarmDef = AlarmExploit.searchBatchFailedAlarmDefByBatchName(jobName);
                            	AlarmExploit.generateAlarmWithoutTemplate(alarmDef, "Batch failed", "The following batch failed (no more retry possible): " + jobName, Locale.ENGLISH.toString(), null);
                            }
                    	}
                    
                    	@Override
                    	public void afterJob(JobExecution jobExecution) {
                    		// Required by JobExecutionListener; the two-argument variant above does the work.
                    	}
                    	
                    }
                    I also added a sleep to my job, and when I run it I get a message saying that my job has finished, even though it hasn't; it is just sleeping.
                    Please, can I have a response, an example, or other suggestions? I need someone to help me, please.

                    Thanks a lot

