Switch Atomikos Connection Pool Url Naming Exception
  • Switch Atomikos Connection Pool Url Naming Exception

    Hello,

    The need: switch an Atomikos connection pool bean's URL dynamically. A default bean with the pool settings etc. is defined in an XML config file.

    I tested the code using the following:

    Code:
    @Test
    public void testJob() throws Exception {
    	AtomikosDataSourceBean dsb = (AtomikosDataSourceBean) context.getBean("uymDS");
    	SwitchAtomikosDataSource uymSwitch = (SwitchAtomikosDataSource) context.getBean("switchAtomikosDataSource");
    
    	// Default url
    	Connection conn = null;
    	Statement stmt = null;
    	ResultSet rs = null;
    	try {
    		conn = dsb.getConnection();
    		stmt = conn.createStatement();
    		System.out.println("before 1st db:  " + dsb.getXaProperties().getProperty("url"));
    		rs = stmt.executeQuery("SELECT id FROM file");
    		while (rs.next()) {
    			System.out.println("file id " + rs.getInt("id"));
    		}
    	} finally {
    		// close in reverse order of creation
    		if (rs != null) rs.close();
    		if (stmt != null) stmt.close();
    		if (conn != null) conn.close();
    	}
    
    	String oldUrl = dsb.getXaProperties().getProperty("url");
    	String newUrl = "JTS";
    
    	// Switch only if the target url differs from the current one
    	if (uymSwitch.needToSwitchDataSource(oldUrl, newUrl)) {
    		dsb = uymSwitch.getNewUymDataSource(dsb, newUrl);
    	}
    
    	try {
    		conn = dsb.getConnection();
    		stmt = conn.createStatement();
    		System.out.println("before 2nd db:  " + dsb.getXaProperties().getProperty("url"));
    		rs = stmt.executeQuery("SELECT id FROM file");
    		while (rs.next()) {
    			System.out.println("file id " + rs.getInt("id"));
    		}
    	} finally {
    		if (rs != null) rs.close();
    		if (stmt != null) stmt.close();
    		if (conn != null) conn.close();
    	}
    }
    and it worked.

    I then set up an AOP implementation that gets the needed old URL from the datasource and the new URL from a StepExecutionListener. The idea is that in the AOP proxy I take the first element of the StepExecutionListener array object, close the pool (if needed), initialize it with the new URL, and the JdbcTemplate is none the wiser...

    I then run the Spring Batch job and get the following:

    Code:
    INFO : com.atomikos.icatch.imp.BaseTransactionManager - createCompositeTransaction ( 300000 ): created new ROOT transaction with id tmdev0005400236
    INFO : com.atomikos.jdbc.AbstractDataSourceBean - AtomikosDataSoureBean 'uymDS': close...
    dbName is: MFS_2013_Feb
    url is: jdbc:mysql://xxx.xxx.xxx.xxx:xxxx/MFS_2013_Feb?zeroDateTimeBehavior=convertToNull
    INFO : com.atomikos.jdbc.AbstractDataSourceBean - AtomikosDataSoureBean 'uymDS': init...
    INFO : com.atomikos.jdbc.AtomikosDataSourceBean - AtomikosDataSoureBean 'uymDS': initializing with [ xaDataSourceClassName=com.mysql.jdbc.jdbc2.optional.MysqlXADataSource, uniqueResourceName=uymDS, maxPoolSize=2, minPoolSize=2, borrowConnectionTimeout=30, maxIdleTime=60, reapTimeout=0, maintenanceInterval=60, testQuery=select 1, xaProperties=[autoReconnect=true,user=xxx,url=jdbc:mysql://xxx.xxx.xxx.xxx:xxxx/MFS_2013_Feb?zeroDateTimeBehavior=convertToNull,password=xxx,autoReconnectForPools=true,autoReconnectForConnectionPools=true,pinGlobalTxToPhysicalConnection=true] loginTimeout=0]
    INFO : com.atomikos.icatch.imp.TransactionServiceImp - Coordinator not recoverable: tmdev0005400219
    INFO : com.atomikos.datasource.xa.XATransactionalResource - uymDS: refreshed XAResource
    INFO : com.k12knowledge.aop.SwitchDataSourceImpl - datasource switched from jdbc:mysql://xxx.xxx.xxx.xxx:xxxx/TEST_2012_Nov?zeroDateTimeBehavior=convertToNull to : jdbc:mysql://xxx.xxx.xxx.xxx:xxxx/MFS_2013_Feb?zeroDateTimeBehavior=convertToNull
    INFO : com.atomikos.jdbc.AbstractDataSourceBean - AtomikosDataSoureBean 'uymDS': getConnection ( null )...
    INFO : com.atomikos.jdbc.AbstractDataSourceBean - AtomikosDataSoureBean 'uymDS': init...
    WARN : com.atomikos.jdbc.AtomikosSQLException - Cannot initialize AtomikosDataSourceBean
    javax.naming.NamingException: Another resource already exists with name uymDS - pick a different name
    [The 'createCompositeTransaction' and 'Coordinator not recoverable' are two lines I don't see on my test run.]

    I have dug around in the Atomikos code and confirmed that Atomikos removes the named resource on close.
    I have tried setting lazy-init="false" - no change.
    I have tried altering the URL in the properties file on close, thinking it would be re-read on init - no change.

    What I basically want is this: Switching Datasources Dynamically at runtime (I understand BoneCP is not XA-transaction compliant).

    Any suggestions / pointers?

    Thank you,
    Peter
    Last edited by pgtips; Feb 5th, 2013, 10:31 AM.

  • #2
    How is this being used within the scope of a job? My gut is to say "don't do that", but without seeing what you are trying to attempt within the scope of this job I can't be sure.



    • #3
      Thank you for responding.

      Through earlier tasklets that retrieve files and unzip them if necessary, I arrive at the ItemReader stage with an ArrayList of file objects. Each file may need to be inserted into a different database, or several files may go to the same database; I don't know beforehand which files are coming in for which databases. (New databases are created on the 1st of each month, which means a large number of dbs that I don't want to have to declare in a config file - and may not even know about - adding to the dynamic nature.) I thought that if I set up default settings for the Atomikos pool (username, password etc. are the same), I could reset the pool before the actual writing by the JdbcTemplate.

      I want (I think) Dynamic DataSource Routing, but with no predetermined database sources.
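For what it's worth, the core mechanism behind Spring's AbstractRoutingDataSource (the class used in the Dynamic DataSource Routing article) is just a keyed lookup resolved per call. Here is a minimal, framework-free sketch of that idea - all names are invented for illustration, and a plain String stands in for a real DataSource:

```java
import java.util.HashMap;
import java.util.Map;

// Framework-free sketch of the routing-datasource idea: a thread-local
// key selects the target (here just a URL string) at lookup time, so the
// code holding the "datasource" reference never needs to change.
public class RoutingSketch {
    private static final ThreadLocal<String> CURRENT_DB = new ThreadLocal<String>();
    private static final Map<String, String> URL_BY_DB = new HashMap<String, String>();

    public static void registerDatabase(String dbName, String url) {
        // in the real case this could be populated from the master database
        URL_BY_DB.put(dbName, url);
    }

    public static void setCurrentDb(String dbName) {
        CURRENT_DB.set(dbName);
    }

    // stand-in for determineTargetDataSource(): resolve this thread's key to a target
    public static String resolveUrl() {
        return URL_BY_DB.get(CURRENT_DB.get());
    }

    public static void main(String[] args) {
        registerDatabase("MFS_2013_Feb", "jdbc:mysql://host:3306/MFS_2013_Feb");
        setCurrentDb("MFS_2013_Feb");
        System.out.println(resolveUrl());
    }
}
```

The catch remains the one raised in this thread: the set of keys still has to be registered somewhere, which is why the question of where the 150+ databases get declared keeps coming back.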

      I think I will try the above approach using Atomikos and known databases, see if it works like that, and then try to make it dynamic. Will post back what I find.

      In the meantime, input / corrections are still welcome.

      Thank you.



      • #4
        Just to understand, the step in question is to pipe files into multiple databases, correct? If that is the case, why not define connections to all of the potential databases and use the ClassifierCompositeItemWriter? This ItemWriter allows you to delegate to the appropriate ItemWriter (in your case, each configured to point to a new database) based on a classifier of some kind.
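In case it helps to see the shape of it: the delegation that ClassifierCompositeItemWriter performs can be sketched without any Spring dependencies - classify each item in the chunk, group items by classification key, then hand each group to the delegate registered for that key. The prefix-based classifier and all names below are invented for illustration:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Framework-free sketch of what ClassifierCompositeItemWriter does:
// classify each item in a chunk, group items by classification key,
// then hand each group to the delegate writer registered for that key.
public class ClassifierSketch {

    // classify items by manufacturer prefix, e.g. "AFS_row1" -> "AFS"
    public static Map<String, List<String>> groupByDb(List<String> chunk) {
        Map<String, List<String>> groups = new HashMap<String, List<String>>();
        for (String item : chunk) {
            String key = item.split("_")[0];
            List<String> group = groups.get(key);
            if (group == null) {
                group = new ArrayList<String>();
                groups.put(key, group);
            }
            group.add(item);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> chunk = new ArrayList<String>();
        chunk.add("AFS_row1");
        chunk.add("BCC_row2");
        chunk.add("AFS_row3");
        // each group would then go to the delegate ItemWriter for that key
        for (Map.Entry<String, List<String>> e : groupByDb(chunk).entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```

In the real class the Classifier returns the delegate ItemWriter itself rather than a string key, and each group is passed to that delegate's write(List) method.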



        • #5
          Yes - you are correct.

          When you say 'define connections to all of the potential databases', do you mean define them in a bean context file or on the fly? Currently, if a database does not exist, an earlier step (before the reader/processor/writer step) creates the db and its accompanying schema, so there is no record of it in the context file.

          If on the fly - create and store the connections and the subsequent ItemWriters in the StepExecutionListener? I think I only need to change one thing: the datasource. Say the datasource (ds1) points to ABC_Feb_2013. My Dao is injected with ds1, but before dao.insert() / dao.update() I change ds1 to point to BCD_Feb_2013. The Dao is none the wiser, 'blindly' inserts, and I'm thinking I have been Spring-like in keeping it very loosely coupled.

          I admit to being fuzzy - as much as I have jumped into using Spring (and the bits I get - love it!), I am under the impression that you must know all beans up front, e.g. datasources. So with what I have seen (Dynamic DataSource Routing), in our case I have 150 databases, and on the 1st of Feb 30 more were created. I would now need to add 30 more entries to datasource-context.xml, add 30 more entries to the enum type, and re-deploy. That doesn't make sense, and I'm pretty sure that is my own doing...

          I am reading up on ClassifierCompositeItemWriter in Spring Batch in Action - it has an example.

          Thank you,



          • #6
            So where does the connection information come from? Are you saying that the database connection information for all databases to be written to is not known at the start of the job? If that is the case, I would advocate creating one job that does the db creation and another that runs after it to do the inserting, so that all of the connection information is available at startup.



            • #7
              'Database connection information for all databases to be written to is not known at the start of the job?' - Yes.

              The connection information is parsed from part of the file name and a lookup in a master database (master database is known at the start of the job).

              So job1 creates a job2-specific datasource-context.xml and a job2-specific enum file using this idea (Property File as Enum), and then I could use the Dynamic DataSource Routing methodology in job2....

              The thinking is sound, no?



              • #8
                Job 1 wouldn't need to create a context.xml file. I would use a factory bean to create the ItemWriter that needs all of the database connections and encapsulate all of that in there. In that factory bean you could create the datasources, the ClassifierCompositeItemWriter, etc. While still less than ideal, it's probably the best solution I can think of for having to deal with over 150 databases...

                In case you can't tell, I'm not a big fan of changing data sources out from under the classes using them. Different classes do different things with those connections, so unless you are 100% certain you understand exactly how those connections are being used, it sounds like a dangerous approach.



                • #9
                  Okay - will look into the factory bean approach as I am far from certain how the connections are being used.

                  Thank you for your input and time.



                  • #10
                    Sorry - I am out of my depth as to how to go about doing what you recommend.

                    This is what I have so far...

                    Code:
                    public interface ItemWriterService {
                    	BasicDataSource getDataSource(Properties prop) ;
                    	ClassifierCompositeItemWriter<ItemWriter<Tran>> getItemWriter();
                    }
                    
                    public class ItemWriterServiceImpl implements ItemWriterService {
                    
                    	@Override
                    	public BasicDataSource getDataSource(Properties prop) {
                    		// Set up apache dbcp datasource udyDS
                    		return udyDs;
                    	}
                    
                    	@Override
                    	public ClassifierCompositeItemWriter<ItemWriter<Tran>> getItemWriter() {
                    		// don't know yet
                    		return null;
                    	}
                    }
                    
                    public class ItemWriterFactoryBean implements FactoryBean<ItemWriterService>, ApplicationContextAware {
                    	private ApplicationContext applicationContext;
                    
                    	@Override
                    	public ItemWriterService getObject() throws Exception {
                    		return (ItemWriterService) applicationContext.getBean("itemWriterServiceImpl");
                    	}
                    
                    	@Override
                    	public Class<?> getObjectType() {
                    		return ItemWriterService.class;
                    	}
                    
                    	@Override
                    	public boolean isSingleton() {
                    		return false;
                    	}
                    
                    	// get & set applicationContext
                    }
                    
                    <bean id="itemWriterFactoryBean" class="com.xxx.factory.itemwriter.ItemWriterFactoryBean" />
                    	
                    <bean id="itemWriterService" factory-bean="&amp;itemWriterFactoryBean" factory-method="getObject" />
                    	
                    <bean id="itemWriterServiceImpl" class="com.xxx.factory.itemwriter.ItemWriterServiceImpl" />
                    
                    
                    // Check DataSource switching working (it is)... 
                    public class ItemWriterBeanDemo {
                    
                    	public static void main(String[] args) {
                    		ApplicationContext applicationContext = new ClassPathXmlApplicationContext("launch-context.xml");
                    
                    		ItemWriterService iws = (ItemWriterService) applicationContext.getBean("itemWriterService");
                    		Properties prop = new Properties();
                    		prop.setProperty("urldb", "AFS_2013_Feb");
                    		Connection conn = null;
                    		try {
                    			conn = iws.getDataSource(prop).getConnection();
                    
                    			Statement stmt = conn.createStatement();
                    			String query = "SELECT * from tran";
                    			ResultSet rs = stmt.executeQuery(query);
                    
                    			while (rs.next()) {
                    				System.out.println("id: " + rs.getInt("id"));
                    			}
                    
                    			rs.close();
                    			stmt.close();
                    			conn.close(); // close the first pool's connection before switching
                    
                    			prop.setProperty("urldb", "BCC_2013_Feb");
                    			conn = iws.getDataSource(prop).getConnection();
                    
                    			stmt = conn.createStatement();
                    			rs = stmt.executeQuery(query);
                    
                    			while (rs.next()) {
                    				System.out.println("id: " + rs.getInt("id"));
                    			}
                    
                    			rs.close();
                    			stmt.close();
                    
                    		} catch (SQLException e) {
                    			e.printStackTrace();
                    		} finally {
                    			try {
                    				conn.close();
                    			} catch (SQLException e) {
                    				e.printStackTrace();
                    			}
                    		}
                    
                    		System.out.println(iws.getDataSource(prop).getDriverClassName());
                    	}
                    }
                    
                    public class TranItemWriter implements Classifier<Tran, ItemWriter<Tran>> {
                    	private ItemWriter<Tran> jdbcItemWriter1; // for ds1
                    	private ItemWriter<Tran> jdbcItemWriter2; // for ds2
                    	private ItemWriter<Tran> jdbcItemWriter3; // for ds3
                    	private ItemWriter<Tran> jdbcItemWriter4; // for ds4
                    	private ItemWriter<Tran> jdbcItemWriter5; // for ds5
                    	private ItemWriter<Tran> jdbcItemWriter6; // for ds6
                    	private ItemWriter<Tran> jdbcItemWriter7; // for ds7
                    	private ItemWriter<Tran> jdbcItemWriter8; // for ds8
                    	private ItemWriter<Tran> jdbcItemWriter9; // for ds9
                    	private ItemWriter<Tran> jdbcItemWriter10; // for ds10
                    	
                    	@Override
                    	public ItemWriter<Tran> classify(Tran tran) {
                    		if ("AFS".equals(tran.getManufAbbr())) {
                    			return jdbcItemWriter1;
                    		} else if ("BCC".equals(tran.getManufAbbr())) {
                    			return jdbcItemWriter2;
                    		// etc....
                    		} else {
                    			return null;
                    		}
                    	}
                    }
                    The things I know I don't get are:

                    a) A full grasp of the FactoryBean concept - I am getting an ItemWriterService bean back. 'What's a Factory Bean' states the returned instance is transient - Spring knows nothing about what you have returned from getObject(), and will make no attempt to exercise any lifecycle hooks or anything else on it. So I have a bean instance of ItemWriterService, but how does Spring know about the datasource(s)?

                    b) Does that mean Spring exercises no transactional control in my batch job chunk steps, and if so, do I get around that by using @Transactional?

                    c) Does that mean I am responsible for opening/closing Connections etc. inside the ItemWriter.write method?

                    d) Can I not create the jdbcItemWriter1-10 dynamically (I don't know how, mind you)? I was thinking that with job1 getting the manufacturers - let's say 5 in this instance - I would 'magically' create 5 instances of jdbcItemWriter tied to their respective manufacturer datasources. (I really don't want to hard-code a bunch of if statements for the current manufacturer abbreviations inside the Classifier class.) I see a mapping option in the XML, so I 'should' be able to create them dynamically, again I don't know how... and in the ItemWriter write method do something like:

                    Code:
                    @Override
                    public void write(List<? extends FileObject> files) throws Exception {
                    	
                    	for (FileObject file : files) {
                    		if (file.getManufAbbr().equals(jdbcItemWriter.getManufAbbr())) { // must match file to correct db
                    			long newId = dao.insertRecord(file); // insert file details
                    			dao.updateFile(newId, file); // insert actual file
                    		}
                    	}
                    }
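Regarding (d): one way to avoid the hard-coded jdbcItemWriter1-10 fields is to build a Map from manufacturer abbreviation to writer once at job start (for example in the factory bean or a StepExecutionListener), after which the classify() body collapses to a map lookup. A rough, framework-free sketch - every name below is invented for illustration, and a String stands in for the real Tran item:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: build the delegate writers dynamically from whatever job1
// discovered (manufacturer abbreviation -> database name), replacing
// the chain of if/else branches with a single map lookup.
public class DynamicWriterSketch {

    public interface TranWriter {
        void write(String tran);
    }

    public static Map<String, TranWriter> buildWriters(Map<String, String> dbByManuf) {
        Map<String, TranWriter> writers = new HashMap<String, TranWriter>();
        for (Map.Entry<String, String> e : dbByManuf.entrySet()) {
            final String db = e.getValue();
            writers.put(e.getKey(), new TranWriter() {
                public void write(String tran) {
                    // the real version would delegate to a JdbcTemplate
                    // backed by the DataSource for this database
                    System.out.println("insert into " + db + ": " + tran);
                }
            });
        }
        return writers;
    }

    // the classifier body becomes a lookup; null means "unknown manufacturer"
    public static TranWriter classify(Map<String, TranWriter> writers, String manufAbbr) {
        return writers.get(manufAbbr);
    }
}
```

With this shape, adding 30 new databases on the 1st of the month means 30 new map entries at runtime rather than 30 new bean definitions and enum constants plus a re-deploy.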
                    Though I hesitate to ask, could you pseudo-code the objects and flow, or point me to some code that includes the necessary steps so I can get an idea? I don't know how it all fits together...

                    Thank you,



                    • #11
                      Quick question...how are you planning on handling transactions across these connections? Are you using XA?



                      • #12
                        Yes - Atomikos. Just used dbcp in my example to try and keep things simple.



                        • #13
                          Just an FYI for anybody thinking of doing something similar - I ended up going with the following. Rather than committing to a dynamic database with XA (which I couldn't get to work with Atomikos):

                          • A RabbitMQ message is created with the database details included in the message.
                          • The message is sent to a Manufacturer queue.
                          • A (local) consumer bound to this queue reads the message and configures the database connection using BoneCP (see the BoneCP blog for how to do this).
                          • The message is translated to a domain object and inserted with JdbcTemplate.
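For anyone adapting this: the consumer side reduces to caching one connection pool per database name parsed from the message, created lazily on first use and reused afterwards. A minimal sketch of that cache - the names are invented, and a plain String stands in for the real pool (e.g. a BoneCP DataSource) that would be built where the placeholder URL is created:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Sketch of the consumer-side pool cache: one pool per database name
// taken from the message, created on first use and reused afterwards.
public class PoolCacheSketch {
    private static final ConcurrentMap<String, String> POOLS =
            new ConcurrentHashMap<String, String>();

    public static String poolFor(String dbName) {
        String pool = POOLS.get(dbName);
        if (pool == null) {
            String created = "jdbc:mysql://host:3306/" + dbName; // stand-in for pool creation
            String previous = POOLS.putIfAbsent(dbName, created);
            pool = (previous != null) ? previous : created; // another thread may have won the race
        }
        return pool;
    }

    public static void main(String[] args) {
        System.out.println(poolFor("MFS_2013_Feb"));
        System.out.println(poolFor("MFS_2013_Feb") == poolFor("MFS_2013_Feb")); // reused: true
    }
}
```

Since each message carries its own database details and each insert is a single local transaction, this sidesteps the XA resource-naming problem from the start of the thread entirely.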

