KeyCollector only for hardcoded queries?

  • KeyCollector only for hardcoded queries?

    Hey SBatchy crowd,

    I would like to use "DrivingQueryItemReader" to provide me key by key from the result set.. well - as designed.

    However it depends on a "KeyCollector" which is a good abstraction, BUT...

    KeyCollector interface dictates all the implementations to retrieve all the keys using:

    Code:
    List retrieveKeys(ExecutionContext executionContext);
    Where "ExecutionContext" is used to restart from a certain state, in case of a failure, or if the state was saved.. well - as designed.

    However, that leaves a developer implementing this method able to use only non-parameterized queries, or to hardcode the arguments within the method
    (example from "SingleColumnJdbcKeyCollector"):
    Code:
    	public List retrieveKeys(ExecutionContext executionContext) {
    
    		Assert.notNull(executionContext, "The ExecutionContext must not be null");
    
                   ... ... ...
                   ... ... ...
    		return jdbcTemplate.query(sql, keyMapper);
    	}
    Now let's assume I need to use a "SingleColumnJdbcKeyCollector" for the following query:
    Code:
       SELECT APPLICATION_ID FROM APPLICATION_TABLE WHERE BATCH_STATUS = ? AND CLAIM_STATUS = ?
    -- I can't, because I can't pass the query arguments in.

    Ok, so the next logical step would be to write my own "CustomKeyCollector" that implements "KeyCollector" interface, since the end goal was to use a "DrivingQueryItemReader", right, BUT

    -- I can't. (UPDATE: can't "cleanly", but of course there are options around the limitation)

    Because "retrieveKeys" of "KeyCollector" interface does not take a "collection of arguments" for my query, and these arguments should be configured from the outside of the code (in xml, properties, etc..), and passed into the "retrieveKeys" method as a parameter...

    Am I missing something obvious? Or is "KeyCollector"?

    Thx,

    -- litius
    Last edited by litius; Oct 8th, 2008, 10:41 AM.

  • #2
    One possible way around the limitations you describe might be to use standard Spring property placeholders in configuration and pass in values when application context is created, would that work for you?



    • #3
      Hey Robert,

      Thanks for your reply. There are several possible solutions that I have:

      1. Write a custom "KeyCollector", have setter for query arguments
      2. Write a custom "KeyCollector", have it take query arguments via Constructor
      3. Extend "SingleColumnJdbcKeyCollector", add a setter for query arguments, and rewrite the "retrieveKeys" method
      4. etc..

      However I believe that:

      1. KeyCollector should be immutable - hence setQueryArguments(Collection qargs) is not a good solution
      2. Setting query arguments is a quite natural behavior, should be available via SpringBatch framework
      3. While constructor injection makes it immutable, I think it is more natural to pass arguments to the "retrieveKeys" method itself - and that is the "actual limitation" that I was talking about.
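To make the constructor-injection option from the lists above concrete, here is a minimal sketch. It does not use Spring Batch's real KeyCollector: `SimpleKeyCollector`, `ApplicationRow`, `ConstructorArgsKeyCollector` and the in-memory "table" are all hypothetical stand-ins, chosen so the example runs on a plain JDK. The point is only that the query arguments arrive once, at construction, and the retrieve method itself takes none.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the KeyCollector contract discussed above:
// the method signature has no room for query arguments.
interface SimpleKeyCollector {
    List<Object> retrieveKeys();
}

// Hypothetical row type standing in for APPLICATION_TABLE.
final class ApplicationRow {
    final long applicationId;
    final String batchStatus;
    final String claimStatus;

    ApplicationRow(long applicationId, String batchStatus, String claimStatus) {
        this.applicationId = applicationId;
        this.batchStatus = batchStatus;
        this.claimStatus = claimStatus;
    }
}

// Constructor injection: the arguments are fixed at creation time,
// so the collector has no setters and its own fields never change.
final class ConstructorArgsKeyCollector implements SimpleKeyCollector {
    private final List<ApplicationRow> table;
    private final String batchStatus;
    private final String claimStatus;

    ConstructorArgsKeyCollector(List<ApplicationRow> table, String batchStatus, String claimStatus) {
        this.table = table;
        this.batchStatus = batchStatus;
        this.claimStatus = claimStatus;
    }

    // Mimics: SELECT APPLICATION_ID ... WHERE BATCH_STATUS = ? AND CLAIM_STATUS = ?
    public List<Object> retrieveKeys() {
        List<Object> keys = new ArrayList<Object>();
        for (ApplicationRow row : table) {
            if (row.batchStatus.equals(batchStatus) && row.claimStatus.equals(claimStatus)) {
                keys.add(Long.valueOf(row.applicationId));
            }
        }
        return Collections.unmodifiableList(keys);
    }
}

final class KeyCollectorOptionsDemo {
    static List<Object> run() {
        List<ApplicationRow> table = Arrays.asList(
                new ApplicationRow(1L, "NEW", "OPEN"),
                new ApplicationRow(2L, "DONE", "OPEN"),
                new ApplicationRow(3L, "NEW", "OPEN"));
        return new ConstructorArgsKeyCollector(table, "NEW", "OPEN").retrieveKeys();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The trade-off is the one named in point 3: the arguments are bound to the bean definition, so a different set of arguments means a different bean rather than a different call.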

      Let me know what you guys think,

      -- litius



      • #4
        In 2.0 there should be a generic solution for passing job parameters (typically used for query arguments), and KeyCollector is deprecated in the 2.0 trunk. Given that, I wouldn't spend much time on finding a sound solution, especially if it requires interface changes (not possible in 1.1.x).

        You can try using the property placeholders as I suggested previously or implement Step/JobExecutionListener to get access to JobParameters in your custom KeyCollector (I wouldn't worry about immutability too much).
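The listener idea can be sketched roughly as follows. This is only an illustration on a plain JDK, not the Spring Batch API: `BeforeStepCallback` is a hypothetical stand-in for the listener interface (the real one is handed a StepExecution, not a map), and the `batch.status` parameter name and the in-memory table are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a step lifecycle callback. The real Spring Batch
// listener is handed a StepExecution, not a plain map.
interface BeforeStepCallback {
    void beforeStep(Map<String, Object> jobParameters);
}

final class ListenerBackedKeyCollector implements BeforeStepCallback {
    // In-memory stand-in for the table: id -> status, sorted by id.
    private final TreeMap<Long, String> statusById;
    // Captured when the step starts; this is the state the collector has to hold.
    private String batchStatus;

    ListenerBackedKeyCollector(Map<Long, String> statusById) {
        this.statusById = new TreeMap<Long, String>(statusById);
    }

    public void beforeStep(Map<String, Object> jobParameters) {
        Object value = jobParameters.get("batch.status"); // hypothetical parameter name
        if (value == null) {
            throw new IllegalStateException("Job parameter 'batch.status' is required");
        }
        this.batchStatus = value.toString();
    }

    public List<Object> retrieveKeys() {
        if (batchStatus == null) {
            throw new IllegalStateException("beforeStep must be called before retrieveKeys");
        }
        List<Object> keys = new ArrayList<Object>();
        for (Map.Entry<Long, String> entry : statusById.entrySet()) {
            if (batchStatus.equals(entry.getValue())) {
                keys.add(entry.getKey());
            }
        }
        return keys;
    }
}

final class ListenerSketchDemo {
    static List<Object> run(String status) {
        Map<Long, String> table = new HashMap<Long, String>();
        table.put(10L, "NEW");
        table.put(20L, "DONE");
        table.put(30L, "NEW");

        ListenerBackedKeyCollector collector = new ListenerBackedKeyCollector(table);
        Map<String, Object> jobParameters = new HashMap<String, Object>();
        jobParameters.put("batch.status", status);
        collector.beforeStep(jobParameters); // the framework would make this call
        return collector.retrieveKeys();
    }
}
```

Note that the collector now carries mutable state between the callback and the retrieve call, which is the immutability concern raised earlier in the thread.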



        • #5
          Hey Robert,

          Thx for the hint, but wouldn't JobParameters be too global of a container to hold "query arguments" for a single query?

          If it is a multi step job with parametrized writers and readers - would not it be more natural if the actual components (like KeyCollector, in this case, or just an arbitrary DAO) take their own parameters, instead of sticking everything into the "global holder"?

          Thanks Robert

          -- litius

          P.S. KeyCollector is going away in 2.0? What is used in 2.0 for driving query reading strategy?
          Last edited by litius; Oct 8th, 2008, 01:36 PM.



          • #6
            Before diving into the actual problem, I want to make a quick note about immutability. A KeyCollector, or any other type of DAO, can't really be immutable. By its very definition it's reaching out to a data store that is very stateful. When working with any DAO, this must be considered. It can, however, be stateless within itself, even if its collaborators aren't. I think this is what you were trying to drive at.

            The first part of this question is: Where should configurable parameters come from?

            In general, with 1.1 the only framework-supported feature is to put them into JobParameters. This approach is used with the StepExecutionResourceProxy and the StepExecutionPreparedStatementSetter (which is used by the JdbcCursorItemReader). Personally, I don't have a problem with this for ItemReaders. If you're using these values to drive your input stream, then changing them should fundamentally mean that it's a different JobInstance; otherwise, restartability will be broken. If you start an instance with file A, it fails, and you restart it with file B, obviously things won't work correctly. The same is true in the database. So, in most cases, the JobParameters work well. However, there are legitimate use cases where someone might need to pull these values from either the job-level or step-level ExecutionContext. Most examples involve one step determining the values for a subsequent step. We've discussed this multiple times, but it's a tricky problem to configure, as we need a key name and the object you're trying to pull from. It's much easier to provide this programmatically in 1.1, which is basically what's required. However, in 2.0, we're planning on building this type of functionality off of the new expression language features that are part of Spring 3.0.

            The second part of this question/issue is that of lifecycle. The application context has its own lifecycle for beans, while Spring Batch has its own around Jobs and Steps. This has been a recurrent problem we've tried to work around many times. Unfortunately, in the 1.1 release, which is tied to Spring 2.5.x, we have no other option but to provide lifecycle hooks to notify an already created object of lifecycle 'events' within the batch framework. This does require you to effectively store the state of the current run within a service that needs these types of things. Although I'd like to reiterate that pretty much any reader or writer within the batch framework is inherently stateful, we could still do a better job in providing class creation and destruction within the proper lifecycle. However, there are some limitations to doing that cleanly within the current release of Spring. Robert's solution of a property placeholder is about the only way to get one of these objects in at object creation, since it will be replaced before the BeanFactory is finished initializing. In 2.0, we plan on fixing this with some of the new features in Spring 3.0, including the expression language stuff mentioned above, along with a custom scope and annotations, etc.

            One final note I'd like to make is that the DrivingQuery-related interfaces are deprecated and are being removed in 2.0. Their replacement is a paging solution that can be used broadly in JPA- and JDBC-related scenarios.
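For readers unfamiliar with the difference, the general idea of paging (as opposed to collecting every key up front) can be sketched like this. It is a plain-JDK illustration of one common paging style, keyset paging over a sorted id list, and is not how the 2.0 readers are actually implemented; `PagingSketch`, `readPage` and `readAll` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

final class PagingSketch {

    // Returns up to pageSize ids strictly greater than afterId, in ascending order.
    // Roughly what a query of the form
    //   SELECT id FROM t WHERE id > :afterId ORDER BY id   (limited to pageSize rows)
    // would return; the row-limiting syntax differs per database.
    static List<Long> readPage(List<Long> sortedIds, long afterId, int pageSize) {
        List<Long> page = new ArrayList<Long>();
        for (Long id : sortedIds) {
            if (id.longValue() > afterId) {
                page.add(id);
                if (page.size() == pageSize) {
                    break;
                }
            }
        }
        return page;
    }

    // Reads everything page by page, remembering only the last id seen.
    // That last id is also all that would need to be saved for a restart.
    static List<Long> readAll(List<Long> sortedIds, int pageSize) {
        List<Long> all = new ArrayList<Long>();
        long lastId = Long.MIN_VALUE;
        while (true) {
            List<Long> page = readPage(sortedIds, lastId, pageSize);
            if (page.isEmpty()) {
                break;
            }
            all.addAll(page);
            lastId = page.get(page.size() - 1).longValue();
        }
        return all;
    }
}
```

Only one page of rows is held in memory at a time, which is why this style copes with larger volumes than retrieving the full key list in one call.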



            • #7
              Lucas,

              Thanks for your "detailed thoughts"!

              We are currently using SB 1.0.1, and the chances that we will upgrade to 2.0 are not high. Having said that, as we talked, I have two choices:

              1. Use a DrivingQuery reader with a KeyCollector wrapper

              2. Grab "JdbcPagingItemReader" from 2.0 - it can handle greater data volumes (using sql paging), and it does accept parameters:

              Code:
              	/**
              	 * The parameter values to be used for the query execution.  If you use named parameters then the
              	 * key should be the name used in the query clause.  If you use "?" placeholders then the key should be
              	 * the relative index that the parameter appears in the query string built using the select, from and
              	 * where clauses specified.
              	 *
              	 * @param parameterValues the values keyed by the parameter named/index used in the query string.
              	 */
              	public void setParameterValues(Map<String, Object> parameterValues) {
              		this.parameterValues = parameterValues;
              	}
              I'll explore both - on one hand I am ready for processing greater volume, on the other I am not assisting the framework, and using spring batch jars directly.

              Thanks for your help Lucas & Robert!

              -- litius
              Last edited by litius; Mar 23rd, 2009, 01:39 PM.



              • #8
                I just fell in the same trap. Did you explore both options? Which one did you choose? Was it much work moving JdbcPagingItemReader from 2.0?



                • #9
                  @Ulrik,

                  It was a bit involved to move "JdbcPagingItemReader" from 2.0, so I went with an easier approach, and created my own KeyCollector implementation, but the one that takes "queryNamedParameters" as a property:

                  Code:
                  public class SingleColumnWithParametersJdbcKeyCollector extends ExecutionContextUserSupport implements KeyCollector 
                  {
                  
                      private static final String RESTART_KEY = "key";
                      
                      private SqlParameterSource queryNamedParameters;
                      private NamedParameterJdbcTemplate namedParameterJdbcTemplate;
                      private String sql;
                      private String restartSql;
                  With the above KeyCollector, I can inject "queryNamedParameters", that are used in "retrieveKeys(ExecutionContext executionContext)" KeyCollector's method:

                  Code:
                  public List<Object> retrieveKeys(ExecutionContext executionContext) {
                  
                  	List<Object> keys; 
                  	List<Map<Object, Object>> keyMapList;
                  
                  	Assert.notNull( executionContext, "The ExecutionContext must not be null" );
                  
                  	if ( executionContext.containsKey( getKey( RESTART_KEY ) ) ) {
                  	    // feed restarting sql to jdbcTemplate
                  	} 
                  	else  {
                  	    keyMapList = namedParameterJdbcTemplate.queryForList( sql, queryNamedParameters );
                  	}
                  
                  	keys = new ArrayList<Object>();
                  
                  	for ( Map<Object, Object> keymap: keyMapList ) {
                  	    Set mapKeySet = keymap.keySet();
                  	    
                  	    // should have only one value in the map, since it is a key per row
                  	    if ( mapKeySet.size() != 1 ) {
                  		// if not,
                  		throw new SomeException(...);
                  	    }
                  		       
                  	    for ( Object mapkey: mapKeySet ) {
                  		keys.add( keymap.get( mapkey ) );
                  	    }            
                  	}
                  
                  	return keys;
                  }
                  Now it can be used in a desired fashion:

                  Code:
                  <property name="keyCollector">
                  	<bean class="org.some.package.SingleColumnWithParametersJdbcKeyCollector">
                  		<property name="dataSource" ref="dataSource" />
                  		<property name="sql"> 
                  			<value>
                  				SELECT something
                  				  FROM sometable
                  				  WHERE status1 = :some_status1 AND status2 = :some_status2
                  			</value>
                  		</property>						
                  		<property name="queryNamedParametersMap">
                  			<util:map>
                  			    <entry key="some_status1">
                  			    	<util:constant static-field="org.some.package.Status.PICK_ME_UP"/>
                  			    </entry>
                  			    <entry key="some_status2">
                  			    	<util:constant static-field="org.some.package.Status.ME_TOO"/>
                  			    </entry>
                  			</util:map>
                  		</property>				
                  	</bean>
                  </property>
                  And then later on, when I have time to update everything to 2.0, I will. But for now this solution works. Let me know if you have any questions.

                  Good Luck,
                  -- litius



                  • #10
                    I modified your code a little, mainly to get the restart to work. I also chose to use an existing RowMapper rather than implement that myself.

                    The restart scenario exposes what I believe is a flaw in SqlParameterSource: there is no way to transparently add a value to it or get hold of all values in it. This is necessary in order to add the key after a restart. If you know the SqlParameterSource is Map-based, you can cast it and put a value. If you know it's bean-based, you can cast it and get all values, and then copy to a new Map-based, which you can then put to. I would have liked to see SqlParameterSource implement the Map interface, or at least provide a way to get all entries.

                    Anyway, here is my take on your code:

                    Code:
                    package example.item.reader;
                    
                    import java.util.HashMap;
                    import java.util.List;
                    import java.util.Map;
                    
                    import javax.sql.DataSource;
                    
                    import org.springframework.batch.item.ExecutionContext;
                    import org.springframework.batch.item.database.KeyCollector;
                    import org.springframework.batch.item.util.ExecutionContextUserSupport;
                    import org.springframework.jdbc.core.RowMapper;
                    import org.springframework.jdbc.core.SingleColumnRowMapper;
                    import org.springframework.jdbc.core.namedparam.BeanPropertySqlParameterSource;
                    import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
                    import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
                    import org.springframework.jdbc.core.namedparam.SqlParameterSource;
                    import org.springframework.util.Assert;
                    import org.springframework.util.ClassUtils;
                    import org.springframework.util.StringUtils;
                    
                    /**
                     * Parameterized {@link KeyCollector} implementation that supports restart.
                     * 
                     * @author Ulrik Sandberg
                     * @author "litius"
                     */
                    public class SingleColumnWithParametersJdbcKeyCollector extends ExecutionContextUserSupport implements KeyCollector {
                    
                    	private static final String RESTART_KEY = "key";
                    
                    	private SqlParameterSource queryNamedParameters;
                    
                    	private NamedParameterJdbcTemplate namedParameterJdbcTemplate;
                    
                    	private String sql;
                    
                    	private String restartSql;
                    
                    	private String restartIdName;
                    
                    	private RowMapper keyMapper = new SingleColumnRowMapper();
                    
                    	public SingleColumnWithParametersJdbcKeyCollector() {
                    		setName(ClassUtils.getShortName(SingleColumnWithParametersJdbcKeyCollector.class));
                    	}
                    
                    	/**
                    	 * Constructs a new instance using the provided namedParameterJdbcTemplate
                    	 * and string representing the sql statement that should be used to retrieve
                    	 * keys.
                    	 * 
                    	 * @param namedParameterJdbcTemplate
                    	 * @param sql
                    	 * @throws IllegalArgumentException if jdbcTemplate is null.
                    	 * @throws IllegalArgumentException if sql string is empty or null.
                    	 */
                    	public SingleColumnWithParametersJdbcKeyCollector(NamedParameterJdbcTemplate namedParameterJdbcTemplate, String sql) {
                    		this();
                    		Assert.notNull(namedParameterJdbcTemplate, "namedParameterJdbcTemplate must not be null.");
                    		Assert.hasText(sql, "The sql statement must not be null or empty.");
                    		this.namedParameterJdbcTemplate = namedParameterJdbcTemplate;
                    		this.sql = sql;
                    	}
                    
                    	/*
                    	 * (non-Javadoc)
                    	 * 
                    	 * @see org.springframework.batch.io.driving.KeyGenerationStrategy#retrieveKeys()
                    	 */
                    	public List retrieveKeys(ExecutionContext executionContext) {
                    
                    		Assert.notNull(executionContext, "The ExecutionContext must not be null");
                    
                    		if (executionContext.containsKey(getKey(RESTART_KEY))) {
                    			Assert.state(StringUtils.hasText(restartSql), "The restart sql query must not be null or empty"
                    					+ " in order to restart.");
                    			Object storedKey = executionContext.get(getKey(RESTART_KEY));
                    			MapSqlParameterSource mapSqlParameterSource = getModifiedParameterSource(storedKey);
                    			return namedParameterJdbcTemplate.query(restartSql, mapSqlParameterSource, keyMapper);
                    		}
                    		else {
                    			return namedParameterJdbcTemplate.query(sql, queryNamedParameters, keyMapper);
                    		}
                    	}
                    
                    	private MapSqlParameterSource getModifiedParameterSource(Object storedKey) {
                    		MapSqlParameterSource mapSqlParameterSource = new MapSqlParameterSource();
                    		if (queryNamedParameters instanceof MapSqlParameterSource) {
                    			Map values = ((MapSqlParameterSource) queryNamedParameters).getValues();
                    			mapSqlParameterSource.addValues(values);
                    			mapSqlParameterSource.addValue(restartIdName, storedKey);
                    		}
                    		else if (queryNamedParameters instanceof BeanPropertySqlParameterSource) {
                    			BeanPropertySqlParameterSource beanPropertySqlParameterSource = (BeanPropertySqlParameterSource) queryNamedParameters;
                    			String[] names = beanPropertySqlParameterSource.getReadablePropertyNames();
                    			HashMap values = new HashMap();
                    			for (int i = 0; i < names.length; i++) {
                    				Object value = beanPropertySqlParameterSource.getValue(names[i]);
                    				values.put(names[i], value);
                    			}
                    			mapSqlParameterSource.addValues(values);
                    			mapSqlParameterSource.addValue(restartIdName, storedKey);
                    		}
                    		else {
                    			throw new IllegalArgumentException("Unsupported SqlParameterSource implementation: "
                    					+ ClassUtils.getQualifiedName(queryNamedParameters.getClass()));
                    		}
                    		return mapSqlParameterSource;
                    	}
                    
                    	/**
                    	 * Store the last processed key in the ExecutionContext as restart data.
                    	 * 
                    	 * @throws IllegalArgumentException if key is null.
                    	 */
                    	public void updateContext(Object key, ExecutionContext executionContext) {
                    		Assert.notNull(key, "The key must not be null.");
                    		Assert.notNull(executionContext, "The ExecutionContext must not be null");
                    		executionContext.put(getKey(RESTART_KEY), key);
                    	}
                    
                    	/*
                    	 * (non-Javadoc)
                    	 * 
                    	 * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
                    	 */
                    	public void afterPropertiesSet() throws Exception {
                    		Assert.notNull(namedParameterJdbcTemplate, "JdbcTemplate must not be null.");
                    		Assert.hasText(sql, "The DrivingQuery must not be null or empty.");
                    	}
                    
                    	/**
                    	 * Set the {@link Map} to be used to create the {@link SqlParameterSource}.
                    	 * 
                    	 * @param map
                    	 */
                    	public void setQueryNamedParametersMap(Map map) {
                    		this.queryNamedParameters = new MapSqlParameterSource(map);
                    	}
                    
                    	/**
                    	 * Set the {@link RowMapper} to be used to map each key to an object.
                    	 * 
                    	 * @param keyMapper
                    	 */
                    	public void setKeyMapper(RowMapper keyMapper) {
                    		this.keyMapper = keyMapper;
                    	}
                    
                    	/**
                    	 * Set the SQL statement to be used to return the keys to be processed.
                    	 * 
                    	 * @param sql
                    	 */
                    	public void setSql(String sql) {
                    		this.sql = sql;
                    	}
                    
                    	/**
                    	 * Set the SQL query to be used to return the remaining keys to be
                    	 * processed.
                    	 * 
                    	 * @param restartSql
                    	 */
                    	public void setRestartSql(String restartSql) {
                    		this.restartSql = restartSql;
                    	}
                    
                    	/**
                    	 * Set the parameter name used in the restart query. Useful for supporting
                    	 * restart, where the parameter source needs to be updated with a new id
                    	 * that is read from the ExecutionContext.
                    	 * 
                    	 * @param idName name of the identifier used in the restartSql
                    	 */
                    	public void setRestartIdName(String idName) {
                    		this.restartIdName = idName;
                    	}
                    
                    	/**
                    	 * Set the {@link DataSource} to be used.
                    	 * 
                    	 * @param dataSource
                    	 */
                    	public void setDataSource(DataSource dataSource) {
                    		this.namedParameterJdbcTemplate = new NamedParameterJdbcTemplate(dataSource);
                    	}
                    }
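The core of getModifiedParameterSource, copy the configured parameters and then add the stored key under the restart id name, can be shown with plain maps. This is a simplified sketch on the JDK only: `RestartParametersSketch` and `withRestartKey` are hypothetical names, and a `Map` stands in for the SqlParameterSource.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class RestartParametersSketch {

    // Returns a new map holding the configured parameters plus the stored key.
    // The configured map is copied, not modified, so a later non-restart run
    // still sees the original parameters.
    static Map<String, Object> withRestartKey(Map<String, Object> configured,
                                              String restartIdName,
                                              Object storedKey) {
        Map<String, Object> merged = new LinkedHashMap<String, Object>(configured);
        merged.put(restartIdName, storedKey);
        return merged;
    }
}
```

The restart query then needs one extra named parameter matching the restart id name, in addition to the parameters used by the normal query.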



                    • #11
                      As long as it solves your problem.

                      I did not need the restart, because I want all my jobs re-runnable rather than restartable - just a general style - it makes it easier to work with remote systems / complex flat files / non-transactional resources / etc...

                      Also I would not invest too much in "DrivingQueryItemReader", since it is "no longer with us" in subsequent versions of spring batch.

                      But again, glad it is working for you,

                      -- litius
