diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java index df183a0..04434e6 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java @@ -25,15 +25,21 @@ import java.sql.Statement; import java.util.Properties; +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.shims.ShimLoader; /** * Utility methods for creating and destroying txn database/schema. - * Placed here in a separate class so it can be shared across unit tests. + * Placed here in a separate class so it can be shared across unit tests. Methods in this class + * are not intended for use outside of tests. They may not perform well. */ +@VisibleForTesting public final class TxnDbUtil { + static final private Log LOG = LogFactory.getLog(TxnDbUtil.class.getName()); private static final String TXN_MANAGER = "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"; private TxnDbUtil() { @@ -50,6 +56,7 @@ private TxnDbUtil() { public static void setConfValues(HiveConf conf) { conf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, TXN_MANAGER); conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true); + if (externalDbForTesting()) setExternalDb(conf); } public static void prepDb() throws Exception { @@ -58,10 +65,13 @@ public static void prepDb() throws Exception { // out of date with it. I'm open to any suggestions on how to make this // read the file in a build friendly way. + if (externalDbForTesting()) return; + + HiveConf conf = getConf(); Connection conn = null; Statement stmt = null; try { - conn = getConnection(); + conn = getConnection(conf); stmt = conn.createStatement(); stmt.execute("CREATE TABLE TXNS (" + " TXN_ID bigint PRIMARY KEY," + @@ -125,25 +135,47 @@ public static void prepDb() throws Exception { public static void cleanDb() throws Exception { Connection conn = null; Statement stmt = null; + HiveConf conf = getConf(); + conn = getConnection(conf); + stmt = conn.createStatement(); try { - conn = getConnection(); - stmt = conn.createStatement(); + if (externalDbForTesting()) { + // In this case we need to clean the tables, otherwise all of the unit tests (which depend + // on starting at txn 1) will get unexpected results. + try { + stmt.execute("delete from TXN_COMPONENTS"); + stmt.execute("delete from COMPLETED_TXN_COMPONENTS"); + stmt.execute("delete from HIVE_LOCKS"); + stmt.execute("delete from TXNS"); + stmt.execute("delete from COMPACTION_QUEUE"); + stmt.execute("update NEXT_TXN_ID set NTXN_NEXT = 1"); + stmt.execute("update NEXT_LOCK_ID set NL_NEXT = 1"); + stmt.execute("update NEXT_COMPACTION_QUEUE_ID set NCQ_NEXT = 1"); + } catch (Exception e) { + LOG.error("Caught exception of type " + e.getClass().getName() + " with msg: " + + e.getMessage()); + } - // We want to try these, whether they succeed or fail. - try { - stmt.execute("DROP INDEX HL_TXNID_INDEX"); - } catch (Exception e) { - System.err.println("Unable to drop index HL_TXNID_INDEX " + e.getMessage()); - } + } else { + conn = getConnection(conf); + stmt = conn.createStatement(); + + // We want to try these, whether they succeed or fail. + try { + stmt.execute("DROP INDEX HL_TXNID_INDEX"); + } catch (Exception e) { + System.err.println("Unable to drop index HL_TXNID_INDEX " + e.getMessage()); + } - dropTable(stmt, "TXN_COMPONENTS"); - dropTable(stmt, "COMPLETED_TXN_COMPONENTS"); - dropTable(stmt, "TXNS"); - dropTable(stmt, "NEXT_TXN_ID"); - dropTable(stmt, "HIVE_LOCKS"); - dropTable(stmt, "NEXT_LOCK_ID"); - dropTable(stmt, "COMPACTION_QUEUE"); - dropTable(stmt, "NEXT_COMPACTION_QUEUE_ID"); + dropTable(stmt, "TXN_COMPONENTS"); + dropTable(stmt, "COMPLETED_TXN_COMPONENTS"); + dropTable(stmt, "TXNS"); + dropTable(stmt, "NEXT_TXN_ID"); + dropTable(stmt, "HIVE_LOCKS"); + dropTable(stmt, "NEXT_LOCK_ID"); + dropTable(stmt, "COMPACTION_QUEUE"); + dropTable(stmt, "NEXT_COMPACTION_QUEUE_ID"); + } conn.commit(); } finally { @@ -171,9 +203,10 @@ public static int countLockComponents(long lockId) throws Exception { Connection conn = null; PreparedStatement stmt = null; ResultSet rs = null; + HiveConf conf = getConf(); try { - conn = getConnection(); - stmt = conn.prepareStatement("SELECT count(*) FROM hive_locks WHERE hl_lock_ext_id = ?"); + conn = getConnection(conf); + stmt = conn.prepareStatement("SELECT count(*) FROM HIVE_LOCKS WHERE hl_lock_ext_id = ?"); stmt.setLong(1, lockId); rs = stmt.executeQuery(); if (!rs.next()) { @@ -189,10 +222,11 @@ public static int findNumCurrentLocks() throws Exception { Connection conn = null; Statement stmt = null; ResultSet rs = null; + HiveConf conf = getConf(); try { - conn = getConnection(); + conn = getConnection(conf); stmt = conn.createStatement(); - rs = stmt.executeQuery("select count(*) from hive_locks"); + rs = stmt.executeQuery("select count(*) from HIVE_LOCKS"); if (!rs.next()) { return 0; } @@ -202,8 +236,30 @@ public static int findNumCurrentLocks() throws Exception { } } - private static Connection getConnection() throws Exception { + private static HiveConf getConf() { HiveConf conf = new HiveConf(); + if (externalDbForTesting()) setExternalDb(conf); + return conf; + } + + private static boolean externalDbForTesting() { + // This allows the tester to set an environment variable and have the tests use an external + // db connection instead of the internal derby for transactions. The user has to set up the + // database in the external store. The actual values are set in setConfValues(). + return System.getenv("METASTORE_CONNECTION_DRIVER") != null; + } + + private static void setExternalDb(HiveConf conf) { + conf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_DRIVER, + System.getenv("METASTORE_CONNECTION_DRIVER")); + conf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, + System.getenv("METASTORECONNECTURLKEY")); + conf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_USER_NAME, + System.getenv("METASTORE_CONNECTION_USER_NAME")); + conf.setVar(HiveConf.ConfVars.METASTOREPWD, System.getenv("METASTOREPWD")); + } + + private static Connection getConnection(HiveConf conf) throws Exception { String jdbcDriver = HiveConf.getVar(conf, HiveConf.ConfVars.METASTORE_CONNECTION_DRIVER); Driver driver = (Driver) Class.forName(jdbcDriver).newInstance(); Properties prop = new Properties(); @@ -213,7 +269,9 @@ private static Connection getConnection() throws Exception { ShimLoader.getHadoopShims().getPassword(conf, HiveConf.ConfVars.METASTOREPWD.varname); prop.setProperty("user", user); prop.setProperty("password", passwd); - return driver.connect(driverUrl, prop); + Connection conn = driver.connect(driverUrl, prop); + conn.setAutoCommit(false); + return conn; } private static void closeResources(Connection conn, Statement stmt, ResultSet rs) {