PartitionHandler: step.execute() throws exception on remote machine

  • PartitionHandler: step.execute() throws exception on remote machine

    Hello,

    I've written a PartitionHandler for a compute grid framework that executes steps on remote machines. These machines may be on the same network as the master step, or they may be on different networks, so I cannot rely on the remote step having access to the Spring Batch job repository. I can simulate this on my machine by using an in-memory job repository and running the partitioned steps in a separate JVM. Unfortunately I receive the following exception:

    Code:
    java.lang.IllegalArgumentException: step executions for given job execution are expected to be already saved
        at org.springframework.util.Assert.notNull(Assert.java:112)
        at org.springframework.batch.core.repository.dao.MapStepExecutionDao.updateStepExecution(MapStepExecutionDao.java:79)
        at org.springframework.batch.core.repository.support.SimpleJobRepository.update(SimpleJobRepository.java:167)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:307)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:182)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:149)
        at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:106)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:171)
        at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
        at $Proxy2.update(Unknown Source)
        at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:185)
        at uk.ac.ebi.interpro.scan.batch.RemoteStepExecutor.execute(RemoteStepExecutor.java:39)
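    (The failure mode is easy to reproduce outside Spring Batch. MapStore below is a hypothetical stand-in for MapStepExecutionDao: an update is rejected unless the execution was first saved into the same JVM's in-memory map, and a fresh worker JVM starts with that map empty.)

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical stand-in for MapStepExecutionDao. Like the real DAO, an
 * update is rejected unless the execution was first saved into this JVM's
 * in-memory map; a worker JVM starts with an empty map, so the first
 * update fails with the same message as the stack trace above.
 */
public class MapStore {

    private final Map<Long, String> executionsByJobExecutionId = new HashMap<>();

    public void save(long jobExecutionId, String stepExecution) {
        executionsByJobExecutionId.put(jobExecutionId, stepExecution);
    }

    public void update(long jobExecutionId, String stepExecution) {
        if (!executionsByJobExecutionId.containsKey(jobExecutionId)) {
            throw new IllegalArgumentException(
                    "step executions for given job execution are expected to be already saved");
        }
        executionsByJobExecutionId.put(jobExecutionId, stepExecution);
    }

    public static void main(String[] args) {
        MapStore freshJvm = new MapStore(); // nothing saved yet, as on a worker node
        try {
            freshJvm.update(1L, "step1");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```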
    The following RemoteStepExecutor class runs on the remote node. This has a reference to the StepExecution, the location of the application context file and the step name so the step can be created on the remote node (some lines have been removed for clarity):

    Code:
    public class RemoteStepExecutor implements Serializable {

    	private final String stepName;
    	private final String appContextLocation;
    	private final StepExecution stepExecution;

    	public StepExecution execute() throws JobInterruptedException {
    		Step step = (Step) new ClassPathXmlApplicationContext(appContextLocation).getBean(stepName, Step.class);
    		step.execute(stepExecution);
    		return stepExecution;
    	}

    }
    The exception originates in AbstractStep's execute() method (line 185):
    Code:
    getJobRepository().update(stepExecution);
    ... which, in my case, calls MapStepExecutionDao.updateStepExecution() (line 78):

    Code:
    Map<Long, StepExecution> executions = executionsByJobExecutionId.get(stepExecution.getJobExecutionId());
    Assert.notNull(executions, "step executions for given job execution are expected to be already saved");
    Any ideas on how to run a step without access to the job repository?

    Thanks,

    Antony

  • #2
    Short answer: you can't. Long answer: you need an implementation of JobRepository that works for your remote nodes. Ideally it really needs to be transactional, so I once thought of writing a JMS implementation, but never got round to it. There might be use cases where you can weaken the transaction semantics (maybe yours is one of those) and make a web service, or even a local virtual repository that creates StepExecutions on the fly. Or you could try an off-the-shelf local repository for each worker. In that case you would also need to pass the ExecutionContext and JobParameters to the remote worker so they can be used to create the right starting conditions for the Step.
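    (A rough sketch of that last suggestion, shipping the starting conditions to the worker: the RemoteStepPayload name is hypothetical, and plain Maps stand in for Spring Batch's JobParameters and ExecutionContext so the example runs without the framework on the classpath.)

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical payload bundling everything a remote worker needs to
 * reconstruct a step's starting conditions: the step name plus maps
 * standing in for JobParameters and the ExecutionContext.
 */
public class RemoteStepPayload implements Serializable {

    private static final long serialVersionUID = 1L;

    private final String stepName;
    private final Map<String, Object> jobParameters;
    private final Map<String, Object> executionContext;

    public RemoteStepPayload(String stepName,
                             Map<String, Object> jobParameters,
                             Map<String, Object> executionContext) {
        this.stepName = stepName;
        this.jobParameters = new LinkedHashMap<>(jobParameters);
        this.executionContext = new LinkedHashMap<>(executionContext);
    }

    public String getStepName() { return stepName; }
    public Map<String, Object> getJobParameters() { return jobParameters; }
    public Map<String, Object> getExecutionContext() { return executionContext; }

    /** Serialize and deserialize, as would happen on the wire to a worker. */
    public static RemoteStepPayload roundTrip(RemoteStepPayload payload)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(payload);
        }
        try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (RemoteStepPayload) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Map<String, Object> params = new LinkedHashMap<>();
        params.put("run.id", 1L);
        Map<String, Object> ctx = new LinkedHashMap<>();
        ctx.put("partition.start", 0);
        RemoteStepPayload copy = roundTrip(new RemoteStepPayload("workerStep", params, ctx));
        System.out.println(copy.getStepName() + " " + copy.getJobParameters());
    }
}
```

    The master would build one such payload per partition, and the worker would rebuild a StepExecution from it before calling step.execute().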



    • #3
      Dave,

      Thanks for the reply, I thought this might be the case.

      For the job repositories, I will use a MySQL database on the local node and in-memory off-the-shelf implementations for the remote nodes (MapExecutionContextDao, MapStepExecutionDao, etc.).

      To populate the remote repository, I assume it's just a question of serialising the StepExecution and calling MapJobExecutionDao.saveJobExecution(stepExecution.getJobExecution()) etc. on the remote node before calling step.execute(). Is that correct?

      The remote node returns StepExecutions to PartitionHandler.handle(), which are returned to PartitionStep as a Collection. Given that PartitionStep will aggregate and persist the collection of StepExecutions, is there any value in persisting the individual StepExecutions from the remote node to the local (MySQL) repository in PartitionHandler.handle()?

      Thanks,

      Antony



      • #4
        Interesting model. I would like to know if it works.

        I would assume that you don't need the *Dao interfaces directly to persist your serialized stuff remotely; the JobRepository should be enough.

        The whole partition of StepExecution instances should be saved locally by the PartitionStep (I can't check right now, but that makes sense to me). There is certainly value in updating them when they come back, otherwise there is no way to know which ones were successful and don't need to be restarted if there is a partial failure.
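        (Dave's restart point can be sketched without the framework; Status and Result below are hypothetical stand-ins for Spring Batch's BatchStatus and StepExecution. Only the persisted final status of each partition tells a restart which ones to re-run.)

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch of why the master must persist the StepExecutions that come back:
 * only their final status tells a restart which partitions to re-run.
 * Status and Result are stand-ins for BatchStatus and StepExecution,
 * so this compiles without Spring Batch on the classpath.
 */
public class RestartFilter {

    enum Status { COMPLETED, FAILED }

    static class Result {
        final String partition;
        final Status status;
        Result(String partition, Status status) {
            this.partition = partition;
            this.status = status;
        }
    }

    /** Partitions that did not complete and therefore need re-running. */
    static List<String> needingRestart(List<Result> results) {
        List<String> restart = new ArrayList<>();
        for (Result r : results) {
            if (r.status != Status.COMPLETED) {
                restart.add(r.partition);
            }
        }
        return restart;
    }

    public static void main(String[] args) {
        List<Result> results = List.of(
                new Result("partition0", Status.COMPLETED),
                new Result("partition1", Status.FAILED),
                new Result("partition2", Status.COMPLETED));
        System.out.println(needingRestart(results)); // prints [partition1]
    }
}
```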



        • #5
          Hello,

          I got this working by using a "dummy" job repository on the remote node:

          Code:
          public class RemoteJobRepository implements JobRepository {
          
          ...
          
              public void add(StepExecution stepExecution) {
                  throw new UnsupportedOperationException();
              }
          
              public void update(StepExecution stepExecution) {
                  LOGGER.info("update: " + stepExecution.toString());
              }
          
              public void updateExecutionContext(StepExecution stepExecution) {
                  LOGGER.info("updateExecutionContext: " + stepExecution.getExecutionContext().toString());
              }
          
          ...
          
          }
          I then save the step executions on the master node by calling the same methods as in AbstractJob (jobRepository.update and jobRepository.updateExecutionContext):

          Code:
          for (StepExecution stepExecution : results) {
            jobRepository.updateExecutionContext(stepExecution);
            jobRepository.update(stepExecution);
          }
          Failures on the remote node are communicated back to the master node as a ProActive TaskException:

          Code:
          catch (TaskException e) {
            throw e;
          }
          One question: do I need to update the job repository in the above catch block?

          I'll include the full code listing in a separate thread below.

          Cheers,

          Antony



          • #6
            Code:
            /**
             * Implementation of {@link JobRepository} that allows partitioned steps to run on remote nodes with no access
             * to the job repository on the master node.
             *
             * @author  Antony Quinn
             * @version $Id: RemoteJobRepository.java,v 1.1 2009/06/18 15:08:38 aquinn Exp $
             * @since   1.0
             */
            public class RemoteJobRepository implements JobRepository {
            
                private static final Logger LOGGER = Logger.getLogger(RemoteJobRepository.class);

                /**
                 * Provide default constructor with low visibility in case the user wants
                 * to use aop:proxy-target-class="true" for an AOP interceptor.
                 */
                RemoteJobRepository() {
                }
            
                public boolean isJobInstanceExists(String jobName, JobParameters jobParameters) {
                    throw new UnsupportedOperationException();
                }
            
                public JobExecution createJobExecution(String jobName, JobParameters jobParameters)
                        throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
                    throw new UnsupportedOperationException();
                }
            
                public void update(JobExecution jobExecution) {
                    throw new UnsupportedOperationException();
                }
            
                public void add(StepExecution stepExecution) {
                    throw new UnsupportedOperationException();
                }
            
                public void update(StepExecution stepExecution) {
                    LOGGER.info("update: " + stepExecution.toString());
                }
            
                public void updateExecutionContext(StepExecution stepExecution) {
                    LOGGER.info("updateExecutionContext: " + stepExecution.getExecutionContext().toString());
                }
            
                public void updateExecutionContext(JobExecution jobExecution) {
                    throw new UnsupportedOperationException();
                }
            
                public StepExecution getLastStepExecution(JobInstance jobInstance, String stepName) {
                    throw new UnsupportedOperationException();
                }
            
                public int getStepExecutionCount(JobInstance jobInstance, String stepName) {
                    throw new UnsupportedOperationException();
                }
            
                public JobExecution getLastJobExecution(String jobName, JobParameters jobParameters) {
                    throw new UnsupportedOperationException();
                }
            
            }
            Code:
            import org.objectweb.proactive.extensions.masterworker.ProActiveMaster;
            import org.objectweb.proactive.extensions.masterworker.TaskException;
            import org.objectweb.proactive.extensions.masterworker.interfaces.Task;
            import org.objectweb.proactive.core.ProActiveException;
            import uk.ac.ebi.interpro.scan.batch.partition.remote.RemoteStepExecutor;
            
            /**
             * Executes steps on remote nodes using ProActive Parallel Suite's Master Worker API.
             *
             * @author  Antony Quinn
             * @version $Id: ProActiveMasterWorkerPartitionHandler.java,v 1.1 2009/06/18 15:08:38 aquinn Exp $
             * @see     <a href="http://proactive.inria.fr/">ProActive Parallel Suite</a>
             * @since   1.0
             */
            public class ProActiveMasterWorkerPartitionHandler implements PartitionHandler, InitializingBean {
            
                private Step step;
                private Resource applicationDescriptor;
                private String remoteLaunchContext;
                private JobRepository jobRepository;
                private boolean persistRemoteStepExecutions = true;    
            
                // Not sure if we need this...
                private int gridSize = 1;
            
                public void afterPropertiesSet() throws Exception {
                    Assert.notNull(applicationDescriptor, "An ApplicationDescriptor must be provided");
                    Assert.notNull(remoteLaunchContext,   "A RemoteLaunchContext must be provided");
                    Assert.notNull(step,                  "A Step must be provided");
                    if (persistRemoteStepExecutions)    {
                        Assert.notNull(jobRepository, "A JobRepository must be provided");
                    }
                }
            
                public void setApplicationDescriptor(Resource applicationDescriptor) {
                    this.applicationDescriptor = applicationDescriptor;
                }
            
                public void setJobRepository(JobRepository jobRepository) {
                    this.jobRepository = jobRepository;
                }
            
                public void setPersistRemoteStepExecutions(boolean persistRemoteStepExecutions) {
                    this.persistRemoteStepExecutions = persistRemoteStepExecutions;
                }
            
                public void setRemoteLaunchContext(String remoteLaunchContext) {
                    this.remoteLaunchContext = remoteLaunchContext;
                }
            
                public void setStep(Step step) {
                    this.step = step;
                }   
            
                /**
                 * @see PartitionHandler#handle(StepExecutionSplitter, StepExecution)
                 */    
                public Collection<StepExecution> handle(StepExecutionSplitter stepExecutionSplitter, 
                                                        StepExecution masterStepExecution) throws Exception {
            
                    // Tasks
                    List<Task<StepExecution>> tasks = new ArrayList<Task<StepExecution>>();
            
                    // Results
                    Collection<StepExecution> results = null;
            
                    // Master
                    ProActiveMaster<Task<StepExecution>, StepExecution> master =
                            new ProActiveMaster<Task<StepExecution>, StepExecution>();
            
                    try {  
            
                        // Add virtual nodes
                        master.addResources(applicationDescriptor.getURL());
            
                        // Create tasks
                        for (final StepExecution stepExecution : stepExecutionSplitter.split(masterStepExecution, gridSize)) {
                            RemoteStepExecutor remoteStepExecutor =
                                    new RemoteStepExecutor(remoteLaunchContext, step.getName(), stepExecution);
                            Task<StepExecution> task = new ProActiveStepExecutorTask(remoteStepExecutor);
                            tasks.add(task);
                        }
            
                        // Run tasks
                        master.solve(tasks);
            
                        // Collect results
                        results = master.waitAllResults();
                        
                    }
                    catch (ProActiveException e) {
                        // Couldn't add resources
                        throw e;
                    }
                    catch (TaskException e) {
                        throw e;
                    }
                    finally {
                        // Shutdown ProActive nodes
                        master.terminate(true);
                    }
            
                    // Persist step executions received from remote node
                    if (persistRemoteStepExecutions)  {
                        for (StepExecution stepExecution : results) {
                            jobRepository.updateExecutionContext(stepExecution);
                            jobRepository.update(stepExecution);
                        }
                    }        
            
                    return results;
            
                }
            
            }
            Code:
            import org.objectweb.proactive.extensions.masterworker.interfaces.Task;
            import org.objectweb.proactive.extensions.masterworker.interfaces.WorkerMemory;
            import org.objectweb.proactive.extensions.annotation.RemoteObject;
            import uk.ac.ebi.interpro.scan.batch.partition.remote.RemoteStepExecutor;
            
            /**
             * Executes step on remote node as a ProActive task.
             *
             * @author  Antony Quinn
             * @version $Id: ProActiveStepExecutorTask.java,v 1.1 2009/06/18 15:08:38 aquinn Exp $
             * @since   1.0
             */
            @RemoteObject
            public class ProActiveStepExecutorTask implements Task<StepExecution> {
            
                private final RemoteStepExecutor remoteStepExecutor;
            
                public ProActiveStepExecutorTask(RemoteStepExecutor remoteStepExecutor)    {
                    this.remoteStepExecutor = remoteStepExecutor;
                }
            
                public StepExecution run(WorkerMemory workerMemory) throws Exception {
                    return remoteStepExecutor.execute();
                }
               
            }
            Code:
            /**
             * Executes step on remote node.
             *
             * @author  Antony Quinn
             * @version $Id: RemoteStepExecutor.java,v 1.2 2009/06/19 09:30:40 aquinn Exp $
             * @since   1.0
             */
            public class RemoteStepExecutor implements Serializable {
                
                private static final Log logger = LogFactory.getLog(RemoteStepExecutor.class);
            	
            	private final StepExecution stepExecution;
            	private final String stepName;
            	private final String appContextLocation;
            
            	public RemoteStepExecutor(String appContextLocation, String stepName, StepExecution stepExecution) {
            		this.appContextLocation = appContextLocation;
            		this.stepName           = stepName;
            		this.stepExecution      = stepExecution;
            	}
            
            	public StepExecution execute() throws JobInterruptedException {
                    // Log stepExecution parameters
                    logger.info(stepExecution);
                    logger.info(stepExecution.getJobExecution().getJobInstance());
                    // Get step to run
                    ApplicationContext context = new ClassPathXmlApplicationContext(appContextLocation);
                    Step step = (Step) context.getBean(stepName, Step.class);        
                    try {
                        step.execute(stepExecution);
                    }
            		catch (JobInterruptedException e) {
            			stepExecution.getJobExecution().setStatus(BatchStatus.STOPPING);
            			throw e;
            		}
                    return stepExecution;
            	}
            
            	public String getStepName() {
            		return stepName;
            	}
            
            }



            • #7
              Seems like a good start. I'm confused about the results in the case of partial failure though. Surely you need to obtain the StepExecution result for each node, whether or not it was successful, and hand them back to the PartitionStep? It looks like you bomb out of the PartitionHandler without collecting enough information.
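              (One possible shape for that, with Outcome as a hypothetical stand-in for StepExecution: capture an outcome per task instead of rethrowing the first worker exception, so the caller gets one result per partition even on partial failure.)

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

/**
 * Sketch of "collect every result, even on failure". Instead of
 * rethrowing the first worker exception and discarding the rest,
 * each task's outcome is captured; Outcome stands in for StepExecution.
 */
public class CollectAllResults {

    enum Status { COMPLETED, FAILED }

    static class Outcome {
        final String partition;
        final Status status;
        Outcome(String partition, Status status) {
            this.partition = partition;
            this.status = status;
        }
        @Override public String toString() { return partition + "=" + status; }
    }

    static List<Outcome> runAll(List<Callable<Outcome>> tasks) {
        List<Outcome> results = new ArrayList<>();
        int i = 0;
        for (Callable<Outcome> task : tasks) {
            String partition = "partition" + i++;
            try {
                results.add(task.call());
            } catch (Exception e) {
                // Record the failure instead of aborting the whole handler.
                results.add(new Outcome(partition, Status.FAILED));
            }
        }
        return results;
    }

    public static void main(String[] args) {
        List<Callable<Outcome>> tasks = List.of(
                () -> new Outcome("partition0", Status.COMPLETED),
                () -> { throw new IllegalStateException("worker died"); });
        System.out.println(runAll(tasks)); // [partition0=COMPLETED, partition1=FAILED]
    }
}
```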
