Announcement Announcement Module
Collapse
No announcement yet.
Deadlocks when job instances/executions/steps are being created Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • #16
    not a thread safe

    Comment


    • #17
      Originally posted by alexbt View Post
      Hi all,

      I have 14 jobs running concurrently (they are started and stopped randomly according to the data received in blocking queues (1 queue per 1 job)). They are bootstrapped via spring @Component annotation. After a job finishes its execution it is restarted via afterJob() method in my JobListener implementation. A job waits for a data to appear in a blocking queue, and when it is received the job is initially created and started.

      (when restarting I change a start date parameter and file name so the job configuration is different) in JobListener.afterJob()? Or is it a new spring-batch version issue?

      Thanks in advance,
      Alex
      Hello Alex,
      I have a very similar requirement...creating different jobs based on whats there on queue etc. I have just started using the batch framework. Would be of great help to start the design if you have can give some design document or pseudo code for it.

      Comment


      • #18
        Dave/Lucas,
        We had changed the isolation level to REPEATABLE_READ in our application and found we are not getting the deadlock exception for the last 5-6 days. Earlier with isolation level as SERIALIZABLE, we used to get deadlocks alsmost every day.

        BTW i have seen in one of the 1.1 snapshots that these configurations are removed from the xml and moved to the java code. Please correct me if i am wrong. We need this to be configurable when we upgrade to 1.1.

        Comment


        • #19
          I have experienced the same deadlock problem in my application. We are using Spring Batch 1.1.1 and ahve 2 different jobs running. The two jobs started normally at the same time, which was the default behaviour of the SchedulerFactoryBean with SimpleTriggerBeans. This is especially the case when starting the application.

          The only workaround for us was to prevent the two jobs being started at the same time by setting the startDelay of one of the jobs. Now there is only the small posssibility left that the two jobs execute later at the same time by accident.

          Comment


          • #20
            Which database you are using and what is the isolation level?

            Comment


            • #21
              Hi Ramkis,

              thanks for the pointing to the isolation level. After wlking through some spring docs, I finally found out which I was using and also tried other ones.
              And surprise surprise it does work with ISOLATION_READ_UNCOMMITTED. Thanks for your help.
              Just to totally answer your question: We are using SQL Server.

              Regards,
              Thomas

              Comment


              • #22
                For the ones that are (as I am) not spring experts, here the point were I successfully set the isolation level:
                <bean id="jobRepository" class="org.springframework.batch.core.repository.s upport.JobRepositoryFactoryBean">
                <property name="databaseType" value="${database.provider}" />
                <property name="dataSource" ref="dataSource" />
                <property name="transactionManager" ref="transactionManager" />
                <property name="isolationLevelForCreate" value="ISOLATION_READ_UNCOMMITTED" />
                </bean>

                Comment


                • #23
                  ConcurrentModification Exception during multiple job instances

                  Can we launch multiple jobs simultanously? Should there be a time delay? Should a new repository created each time for job?

                  We are getting ConcurrentModificationException, Posting here the stacktrace,
                  How to avoid it ?

                  java.util.ConcurrentModificationException
                  at java.util.AbstractList$Itr.checkForComodification( AbstractList.java:449)
                  at java.util.AbstractList$Itr.next(AbstractList.java: 420)
                  at org.springframework.batch.core.repository.dao.MapJ obInstanceDao.getJobInstance(MapJobInstanceDao.jav a:39)
                  at org.springframework.batch.core.repository.dao.MapJ obInstanceDao.createJobInstance(MapJobInstanceDao. java:27)
                  at org.springframework.batch.core.repository.support. SimpleJobRepository.createJobExecution(SimpleJobRe pository.java:179)
                  at org.springframework.batch.core.launch.support.Simp leJobLauncher.run(SimpleJobLauncher.java:79)
                  at com.alservices.imaging.export.launcher.ExportWorke r.process(ExportWorker.java:175)
                  at com.alservices.imaging.export.launcher.ExportWorke r.run(ExportWorker.java:100)
                  at com.alservices.imaging.export.ejb.message.RequestM anagerBean.onMessage(RequestManagerBean.java:214)
                  at sun.reflect.NativeMethodAccessorImpl.invoke0(Nativ e Method)
                  at sun.reflect.NativeMethodAccessorImpl.invoke(Native MethodAccessorImpl.java:39)
                  at sun.reflect.DelegatingMethodAccessorImpl.invoke(De legatingMethodAccessorImpl.java:25)
                  at java.lang.reflect.Method.invoke(Method.java:585)
                  at org.jboss.invocation.Invocation.performCall(Invoca tion.java:359)
                  at org.jboss.ejb.MessageDrivenContainer$ContainerInte rceptor.invoke(MessageDrivenContainer.java:495)
                  at org.jboss.resource.connectionmanager.CachedConnect ionInterceptor.invoke(CachedConnectionInterceptor. java:158)
                  at org.jboss.ejb.plugins.MessageDrivenInstanceInterce ptor.invoke(MessageDrivenInstanceInterceptor.java: 116)
                  at org.jboss.ejb.plugins.CallValidationInterceptor.in voke(CallValidationInterceptor.java:63)
                  at org.jboss.ejb.plugins.AbstractTxInterceptor.invoke Next(AbstractTxInterceptor.java:121)
                  at org.jboss.ejb.plugins.TxInterceptorCMT.runWithTran sactions(TxInterceptorCMT.java:350)
                  at org.jboss.ejb.plugins.TxInterceptorCMT.invoke(TxIn terceptorCMT.java:181)
                  at org.jboss.ejb.plugins.RunAsSecurityInterceptor.inv oke(RunAsSecurityInterceptor.java:109)
                  at org.jboss.ejb.plugins.LogInterceptor.invoke(LogInt erceptor.java:205)
                  at org.jboss.ejb.plugins.ProxyFactoryFinderIntercepto r.invoke(ProxyFactoryFinderInterceptor.java:138)
                  at org.jboss.ejb.MessageDrivenContainer.internalInvok e(MessageDrivenContainer.java:402)
                  at org.jboss.ejb.Container.invoke(Container.java:960)
                  at org.jboss.ejb.plugins.jms.JMSContainerInvoker.invo ke(JMSContainerInvoker.java:1092)
                  at org.jboss.ejb.plugins.jms.JMSContainerInvoker$Mess ageListenerImpl.onMessage(JMSContainerInvoker.java :1392)
                  at org.jboss.jms.asf.StdServerSession.onMessage(StdSe rverSession.java:266)
                  at org.jboss.jms.client.container.ClientConsumer.call OnMessage(ClientConsumer.java:159)
                  at org.jboss.jms.client.container.SessionAspect.handl eRun(SessionAspect.java:802)
                  at org.jboss.aop.advice.org.jboss.jms.client.containe r.SessionAspect14.invoke(SessionAspect14.java)
                  at org.jboss.jms.client.delegate.ClientSessionDelegat e$run_N8003352271541955702.invokeNext(ClientSessio nDelegate$run_N8003352271541955702.java)
                  at org.jboss.jms.client.container.ClosedInterceptor.i nvoke(ClosedInterceptor.java:170)
                  at org.jboss.aop.advice.PerInstanceInterceptor.invoke (PerInstanceInterceptor.java:105)
                  at org.jboss.jms.client.delegate.ClientSessionDelegat e$run_N8003352271541955702.invokeNext(ClientSessio nDelegate$run_N8003352271541955702.java)
                  at org.jboss.jms.client.delegate.ClientSessionDelegat e.run(ClientSessionDelegate.java)
                  at org.jboss.jms.client.JBossSession.run(JBossSession .java:199)
                  at org.jboss.jms.asf.StdServerSession.run(StdServerSe ssion.java:194)
                  at EDU.oswego.cs.dl.util.concurrent.PooledExecutor$Wo rker.run(PooledExecutor.java:756)
                  at java.lang.Thread.run(Thread.java:595)

                  Comment


                  • #24
                    The volatile map-based JobRepository is not supposed to be thread safe. Do you need to use it?

                    Comment


                    • #25
                      Hi All,
                      I have a scenario like starting more than a job at same time. we are getting deadlocks on the spring tables with the isolation level SERIALIZABLE, then changed the isolation level to REPEATABLE_READ there is no deadlock on the spring tables.

                      Deadlock occurs in the Batch_Job_Execution_Seq. While checking the logic in the getNextKey() of the SqlServerMaxValueIncrementer class.
                      1. insert a row
                      2. select identity value
                      3. Delete the rows.

                      In SERIALIZABLE Level: if one transcation is doing an insert or delete, other transaction can't able modify, delete nor insert any row.

                      In REPEATABLE_READ Level: if one transcation is doing an insert or delete, other transaction can't able to modify nor delete any row but can Insert a row.

                      Both the case of SERIALIZABLE and REPEATABLE_READ, we cann't able to delete the rows. This case may case a block but deadlock ?

                      Update:
                      I have modified the getNextKey() of the SqlServerMaxValueIncrementer class by adding the table lock while inserting a row. No deadlock occured even in the SERIALIZABLE isolation level.
                      Last edited by arun4; Apr 17th, 2012, 10:07 AM.

                      Comment


                      • #26
                        Hello

                        Originally posted by arun4 View Post
                        I have modified the getNextKey() of the SqlServerMaxValueIncrementer class by adding the table lock while inserting a row. No deadlock occured even in the SERIALIZABLE isolation level.
                        did you submit a patch to the community? I don't see update in the source code.

                        Regards

                        Geoff

                        Comment


                        • #27
                          Hi Geoff
                          I didn't submit a patch to the community. Please find the sample code for reference.
                          Code:
                           
                          public class BatchSqlServerMaxValueIncrementer extends SqlServerMaxValueIncrementer {
                              
                              /** Containing Logger for the Class. */
                              private static final Logger LOGGER = Logger.getLogger(BatchSqlServerMaxValueIncrementer.class);
                          
                              /** Dead lock error code.*/
                              private static final long DEADLOCK_ERROR_CODE = 1205;
                              
                              /** The deadlock error code. */
                              private int deadLockErrorCode;
                              
                              /** The retryLimit. */
                              private int retryLimit;
                              
                              /** The sleepTime. */
                              private int sleepTime;
                              
                              /**
                               * Default constructor for bean property style usage.
                               * 
                               * @see #setDataSource
                               * @see #setIncrementerName
                               * @see #setColumnName
                               */
                              public BatchSqlServerMaxValueIncrementer() {
                          
                              }
                              
                              /**
                               * BatchSqlServerMaxValueIncrementer constructor.
                               * 
                               * @param dataSource
                               *            the data source.
                               * @param incrementerName
                               *            the incrementerName
                               * @param columnName
                               *            the columnName
                               * @param deadLockErrorCode
                               *            the deadLockErrorCode.
                               * @param retryLimit
                               *            the retryLimit.
                               * @param sleepTime
                               *            the sleepTime.
                               */
                              public BatchSqlServerMaxValueIncrementer(DataSource dataSource, String incrementerName, String columnName,
                                      int deadLockErrorCode, int retryLimit, int sleepTime) {
                          
                                  super(dataSource, incrementerName, columnName);
                                  this.deadLockErrorCode = deadLockErrorCode;
                                  this.retryLimit = retryLimit;
                                  this.sleepTime = sleepTime;
                              }
                              
                              /**
                               * get the getNextKey value.
                               * 
                               * @return identity value
                               */
                              protected synchronized long getNextKey() {
                                  
                                  LOGGER.debug("Enter getNextKey.");
                                  
                                  long lCurrentId;
                                  lCurrentId = getNextIdLot(); 
                                  
                                  LOGGER.debug("Exit getNextKey.");
                                  
                                  return lCurrentId;
                              }
                              
                              /**
                               * getNextIdLot get the new connection and do an insert and select with in the same transaction.
                               * 
                               * @return the incremented value.
                               */
                              private long getNextIdLot() {
                          
                                  LOGGER.debug("Enter getNextIdLot.");
                                  
                                  Connection con = null;
                                  int retryCount = 0;
                                  long nextIdValue = 0;
                                  /*
                                  * Need to use straight JDBC code because we need to make sure that the insert and select
                                  * are performed on the same connection (otherwise we can't be sure that @@identity
                                  * returns the correct value)
                                  */
                                  try {
                                      con = DataSourceUtils.getConnection(getDataSource());
                                      do {
                                          try {
                                              
                                              nextIdValue = getNextLotStartNumber(con);
                                              con.commit();
                                              break;
                                          } catch (SQLException lSQLException) {
                                               try {
                                                  if (isDeadLockTimeout(lSQLException)) {
                                                      con.rollback();
                                                      con.setAutoCommit(false);
                                                      retryCount++;
                                                      sleep(sleepTime);
                                                  } else {
                                                      
                                                      throw lSQLException;
                                                      
                                                  }
                                              } catch (SQLException e) {
                                                  throw new DataAccessResourceFailureException("Unable to Rollback", e);
                                                  
                                              }
                                              if (retryCount == retryLimit) {
                                                  LOGGER.error("Retry failed with ErrorCode: " + lSQLException.getErrorCode());
                                                  //Throw system exception
                                              }
                                          }
                                      }  while (retryCount <= retryLimit);
                                  } finally {
                                      DataSourceUtils.releaseConnection(con, getDataSource());
                                  }
                                          
                                  LOGGER.debug("Exit getNextIdLot.");
                                  
                                  return nextIdValue;
                              }
                              
                              /**
                               * getNextLotStartNumber set auto commit to false and get the next id value from SEQ table.
                               * 
                               * @param connection
                               *            the connection.
                               * @return the currentValue.
                               * @throws SQLException
                               *             the sql exception
                               */
                              private synchronized long getNextLotStartNumber(Connection connection) throws SQLException {
                          
                                  LOGGER.debug("Enter getNextLotStartNumber.");
                                  
                                  long nextIdValue;
                                  Statement stmt = null;
                                  try {
                                      connection.setAutoCommit(false);
                                      stmt = connection.createStatement();
                                      nextIdValue = getNextIDValue(connection, stmt);
                                  } finally {
                                      JdbcUtils.closeStatement(stmt);
                                  }
                                  
                                  LOGGER.debug("Exit getNextLotStartNumber.");
                                  
                                  return nextIdValue;
                              }
                              
                              /**
                               * Get the incremented value by using update and select.
                               * 
                               * @param connection
                               *            the connection
                               * @param statement
                               *            the statement
                               * @return the currentValue.
                               * @throws SQLException
                               *             the SQLException
                               */
                              private long getNextIDValue(Connection connection, Statement statement) throws SQLException {
                          
                                  LOGGER.debug("Enter getNextIDValue.");
                                  
                                  ResultSet resultSet = null;
                                  long nextIDValue;
                                  try {
                                      statement.executeUpdate("UPDATE " + getIncrementerName() + " SET ID = ID + 1");
                                      resultSet = statement.executeQuery("select ID from " + getIncrementerName());
                                      if (!resultSet.next()) {
                                          throw new DataAccessResourceFailureException("Fetching identity for " + getIncrementerName()
                                                  + " + failed.");
                                      }
                                      nextIDValue = resultSet.getLong(1);
                                  } finally {
                                      JdbcUtils.closeResultSet(resultSet);
                                  }
                                  
                                  LOGGER.debug("Exit getNextIDValue.");
                                  
                                  return nextIDValue;
                              }
                              
                              
                              /**
                               * Sleep.
                               * 
                               * @param sleepTime
                               *            the sleep time
                               */
                              private void sleep(int sleepTime) {
                                  try {
                                      Thread.sleep(sleepTime);
                                  } catch (InterruptedException interruptedException) {
                                      LOGGER.error(interruptedException);
                                  }
                              }
                              
                              /**
                               * validate the deadlock timeout and dead lock error code.
                               * 
                               * @param lSQLException
                               *            the SQLException
                               * @return deadLockTimeout indicator
                               */
                              private boolean isDeadLockTimeout(SQLException lSQLException) {
                          
                                  return (lSQLException.getErrorCode() == deadLockErrorCode
                                          || lSQLException.getErrorCode() == DEADLOCK_ERROR_CODE);
                              }
                          }

                          Comment


                          • #28
                            Originally posted by arun4 View Post
                            Hi Geoff
                            I didn't submit a patch to the community. Please find the sample code for reference.
                            Code:
                             
                            public class BatchSqlServerMaxValueIncrementer extends SqlServerMaxValueIncrementer {
                                
                                /** Containing Logger for the Class. */
                                private static final Logger LOGGER = Logger.getLogger(BatchSqlServerMaxValueIncrementer.class);
                            
                                /** Dead lock error code.*/
                                private static final long DEADLOCK_ERROR_CODE = 1205;
                                
                                /** The deadlock error code. */
                                private int deadLockErrorCode;
                                
                                /** The retryLimit. */
                                private int retryLimit;
                                
                                /** The sleepTime. */
                                private int sleepTime;
                                
                                /**
                                 * Default constructor for bean property style usage.
                                 * 
                                 * @see #setDataSource
                                 * @see #setIncrementerName
                                 * @see #setColumnName
                                 */
                                public BatchSqlServerMaxValueIncrementer() {
                            
                                }
                                
                                /**
                                 * BatchSqlServerMaxValueIncrementer constructor.
                                 * 
                                 * @param dataSource
                                 *            the data source.
                                 * @param incrementerName
                                 *            the incrementerName
                                 * @param columnName
                                 *            the columnName
                                 * @param deadLockErrorCode
                                 *            the deadLockErrorCode.
                                 * @param retryLimit
                                 *            the retryLimit.
                                 * @param sleepTime
                                 *            the sleepTime.
                                 */
                                public BatchSqlServerMaxValueIncrementer(DataSource dataSource, String incrementerName, String columnName,
                                        int deadLockErrorCode, int retryLimit, int sleepTime) {
                            
                                    super(dataSource, incrementerName, columnName);
                                    this.deadLockErrorCode = deadLockErrorCode;
                                    this.retryLimit = retryLimit;
                                    this.sleepTime = sleepTime;
                                }
                                
                                /**
                                 * get the getNextKey value.
                                 * 
                                 * @return identity value
                                 */
                                protected synchronized long getNextKey() {
                                    
                                    LOGGER.debug("Enter getNextKey.");
                                    
                                    long lCurrentId;
                                    lCurrentId = getNextIdLot(); 
                                    
                                    LOGGER.debug("Exit getNextKey.");
                                    
                                    return lCurrentId;
                                }
                                
                                /**
                                 * getNextIdLot get the new connection and do an insert and select with in the same transaction.
                                 * 
                                 * @return the incremented value.
                                 */
                                private long getNextIdLot() {
                            
                                    LOGGER.debug("Enter getNextIdLot.");
                                    
                                    Connection con = null;
                                    int retryCount = 0;
                                    long nextIdValue = 0;
                                    /*
                                    * Need to use straight JDBC code because we need to make sure that the insert and select
                                    * are performed on the same connection (otherwise we can't be sure that @@identity
                                    * returns the correct value)
                                    */
                                    try {
                                        con = DataSourceUtils.getConnection(getDataSource());
                                        do {
                                            try {
                                                
                                                nextIdValue = getNextLotStartNumber(con);
                                                con.commit();
                                                break;
                                            } catch (SQLException lSQLException) {
                                                 try {
                                                    if (isDeadLockTimeout(lSQLException)) {
                                                        con.rollback();
                                                        con.setAutoCommit(false);
                                                        retryCount++;
                                                        sleep(sleepTime);
                                                    } else {
                                                        
                                                        throw lSQLException;
                                                        
                                                    }
                                                } catch (SQLException e) {
                                                    throw new DataAccessResourceFailureException("Unable to Rollback", e);
                                                    
                                                }
                                                if (retryCount == retryLimit) {
                                                    LOGGER.error("Retry failed with ErrorCode: " + lSQLException.getErrorCode());
                                                    //Throw system exception
                                                }
                                            }
                                        }  while (retryCount <= retryLimit);
                                    } finally {
                                        DataSourceUtils.releaseConnection(con, getDataSource());
                                    }
                                            
                                    LOGGER.debug("Exit getNextIdLot.");
                                    
                                    return nextIdValue;
                                }
                                
                                /**
                                 * getNextLotStartNumber set auto commit to false and get the next id value from SEQ table.
                                 * 
                                 * @param connection
                                 *            the connection.
                                 * @return the currentValue.
                                 * @throws SQLException
                                 *             the sql exception
                                 */
                                private synchronized long getNextLotStartNumber(Connection connection) throws SQLException {
                            
                                    LOGGER.debug("Enter getNextLotStartNumber.");
                                    
                                    long nextIdValue;
                                    Statement stmt = null;
                                    try {
                                        connection.setAutoCommit(false);
                                        stmt = connection.createStatement();
                                        nextIdValue = getNextIDValue(connection, stmt);
                                    } finally {
                                        JdbcUtils.closeStatement(stmt);
                                    }
                                    
                                    LOGGER.debug("Exit getNextLotStartNumber.");
                                    
                                    return nextIdValue;
                                }
                                
                                /**
                                 * Get the incremented value by using update and select.
                                 * 
                                 * @param connection
                                 *            the connection
                                 * @param statement
                                 *            the statement
                                 * @return the currentValue.
                                 * @throws SQLException
                                 *             the SQLException
                                 */
                                private long getNextIDValue(Connection connection, Statement statement) throws SQLException {
                            
                                    LOGGER.debug("Enter getNextIDValue.");
                                    
                                    ResultSet resultSet = null;
                                    long nextIDValue;
                                    try {
                                        statement.executeUpdate("UPDATE " + getIncrementerName() + " SET ID = ID + 1");
                                        resultSet = statement.executeQuery("select ID from " + getIncrementerName());
                                        if (!resultSet.next()) {
                                            throw new DataAccessResourceFailureException("Fetching identity for " + getIncrementerName()
                                                    + " + failed.");
                                        }
                                        nextIDValue = resultSet.getLong(1);
                                    } finally {
                                        JdbcUtils.closeResultSet(resultSet);
                                    }
                                    
                                    LOGGER.debug("Exit getNextIDValue.");
                                    
                                    return nextIDValue;
                                }
                                
                                
                                /**
                                 * Sleep.
                                 * 
                                 * @param sleepTime
                                 *            the sleep time
                                 */
                                private void sleep(int sleepTime) {
                                    try {
                                        Thread.sleep(sleepTime);
                                    } catch (InterruptedException interruptedException) {
                                        LOGGER.error(interruptedException);
                                    }
                                }
                                
                                /**
                                 * validate the deadlock timeout and dead lock error code.
                                 * 
                                 * @param lSQLException
                                 *            the SQLException
                                 * @return deadLockTimeout indicator
                                 */
                                private boolean isDeadLockTimeout(SQLException lSQLException) {
                            
                                    return (lSQLException.getErrorCode() == deadLockErrorCode
                                            || lSQLException.getErrorCode() == DEADLOCK_ERROR_CODE);
                                }
                            }
                            Arun - I tried to use the class you provided. But, looks like it is not using this class and I am still getting deadlocks. Please let me know the usage of this class.

                            Thanks,

                            Comment

                            Working...
                            X