diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java index 3ab5827..a5f84f0 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java @@ -48,121 +48,135 @@ public static void prepDb() throws Exception { // intended for creating derby databases, and thus will inexorably get // out of date with it. I'm open to any suggestions on how to make this // read the file in a build friendly way. - Connection conn = getConnection(); - Statement s = conn.createStatement(); - s.execute("CREATE TABLE TXNS (" + - " TXN_ID bigint PRIMARY KEY," + - " TXN_STATE char(1) NOT NULL," + - " TXN_STARTED bigint NOT NULL," + - " TXN_LAST_HEARTBEAT bigint NOT NULL," + - " TXN_USER varchar(128) NOT NULL," + - " TXN_HOST varchar(128) NOT NULL)"); + Connection conn = null; + boolean committed = false; + try { + conn = getConnection(); + Statement s = conn.createStatement(); + s.execute("CREATE TABLE TXNS (" + + " TXN_ID bigint PRIMARY KEY," + + " TXN_STATE char(1) NOT NULL," + + " TXN_STARTED bigint NOT NULL," + + " TXN_LAST_HEARTBEAT bigint NOT NULL," + + " TXN_USER varchar(128) NOT NULL," + + " TXN_HOST varchar(128) NOT NULL)"); - s.execute("CREATE TABLE TXN_COMPONENTS (" + - " TC_TXNID bigint REFERENCES TXNS (TXN_ID)," + - " TC_DATABASE varchar(128) NOT NULL," + - " TC_TABLE varchar(128)," + - " TC_PARTITION varchar(767))"); - s.execute("CREATE TABLE COMPLETED_TXN_COMPONENTS (" + - " CTC_TXNID bigint," + - " CTC_DATABASE varchar(128) NOT NULL," + - " CTC_TABLE varchar(128)," + - " CTC_PARTITION varchar(767))"); - s.execute("CREATE TABLE NEXT_TXN_ID (" + - " NTXN_NEXT bigint NOT NULL)"); - s.execute("INSERT INTO NEXT_TXN_ID VALUES(1)"); - s.execute("CREATE TABLE HIVE_LOCKS (" + - " HL_LOCK_EXT_ID bigint NOT NULL," + - " HL_LOCK_INT_ID bigint NOT NULL," + - " HL_TXNID bigint," + - " HL_DB varchar(128) NOT NULL," + - " HL_TABLE varchar(128)," + - " HL_PARTITION varchar(767)," + - " HL_LOCK_STATE char(1) NOT NULL," + - " HL_LOCK_TYPE char(1) NOT NULL," + - " HL_LAST_HEARTBEAT bigint NOT NULL," + - " HL_ACQUIRED_AT bigint," + - " HL_USER varchar(128) NOT NULL," + - " HL_HOST varchar(128) NOT NULL," + - " PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID))"); - s.execute("CREATE INDEX HL_TXNID_INDEX ON HIVE_LOCKS (HL_TXNID)"); + s.execute("CREATE TABLE TXN_COMPONENTS (" + + " TC_TXNID bigint REFERENCES TXNS (TXN_ID)," + + " TC_DATABASE varchar(128) NOT NULL," + + " TC_TABLE varchar(128)," + + " TC_PARTITION varchar(767))"); + s.execute("CREATE TABLE COMPLETED_TXN_COMPONENTS (" + + " CTC_TXNID bigint," + + " CTC_DATABASE varchar(128) NOT NULL," + + " CTC_TABLE varchar(128)," + + " CTC_PARTITION varchar(767))"); + s.execute("CREATE TABLE NEXT_TXN_ID (" + + " NTXN_NEXT bigint NOT NULL)"); + s.execute("INSERT INTO NEXT_TXN_ID VALUES(1)"); + s.execute("CREATE TABLE HIVE_LOCKS (" + + " HL_LOCK_EXT_ID bigint NOT NULL," + + " HL_LOCK_INT_ID bigint NOT NULL," + + " HL_TXNID bigint," + + " HL_DB varchar(128) NOT NULL," + + " HL_TABLE varchar(128)," + + " HL_PARTITION varchar(767)," + + " HL_LOCK_STATE char(1) NOT NULL," + + " HL_LOCK_TYPE char(1) NOT NULL," + + " HL_LAST_HEARTBEAT bigint NOT NULL," + + " HL_ACQUIRED_AT bigint," + + " HL_USER varchar(128) NOT NULL," + + " HL_HOST varchar(128) NOT NULL," + + " PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID))"); + s.execute("CREATE INDEX HL_TXNID_INDEX ON HIVE_LOCKS (HL_TXNID)"); - s.execute("CREATE TABLE NEXT_LOCK_ID (" + - " NL_NEXT bigint NOT NULL)"); - s.execute("INSERT INTO NEXT_LOCK_ID VALUES(1)"); + s.execute("CREATE TABLE NEXT_LOCK_ID (" + + " NL_NEXT bigint NOT NULL)"); + s.execute("INSERT INTO NEXT_LOCK_ID VALUES(1)"); - s.execute("CREATE TABLE COMPACTION_QUEUE (" + - " CQ_ID bigint PRIMARY KEY," + - " CQ_DATABASE varchar(128) NOT NULL," + - " CQ_TABLE varchar(128) NOT NULL," + - " CQ_PARTITION varchar(767)," + - " CQ_STATE char(1) NOT NULL," + - " CQ_TYPE char(1) NOT NULL," + - " CQ_WORKER_ID varchar(128)," + - " CQ_START bigint," + - " CQ_RUN_AS varchar(128))"); + s.execute("CREATE TABLE COMPACTION_QUEUE (" + + " CQ_ID bigint PRIMARY KEY," + + " CQ_DATABASE varchar(128) NOT NULL," + + " CQ_TABLE varchar(128) NOT NULL," + + " CQ_PARTITION varchar(767)," + + " CQ_STATE char(1) NOT NULL," + + " CQ_TYPE char(1) NOT NULL," + + " CQ_WORKER_ID varchar(128)," + + " CQ_START bigint," + + " CQ_RUN_AS varchar(128))"); - s.execute("CREATE TABLE NEXT_COMPACTION_QUEUE_ID (NCQ_NEXT bigint NOT NULL)"); - s.execute("INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1)"); + s.execute("CREATE TABLE NEXT_COMPACTION_QUEUE_ID (NCQ_NEXT bigint NOT NULL)"); + s.execute("INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1)"); - conn.commit(); - conn.close(); + conn.commit(); + committed = true; + } finally { + if (!committed) conn.rollback(); + conn.close(); + } } public static void cleanDb() throws Exception { - Connection conn = getConnection(); - Statement s = conn.createStatement(); - // We want to try these, whether they succeed or fail. + Connection conn = null; + boolean committed = false; try { - s.execute("DROP INDEX HL_TXNID_INDEX"); - } catch (Exception e) { - System.err.println("Unable to drop index HL_TXNID_INDEX " + - e.getMessage()); + conn = getConnection(); + Statement s = conn.createStatement(); + // We want to try these, whether they succeed or fail. + try { + s.execute("DROP INDEX HL_TXNID_INDEX"); + } catch (Exception e) { + System.err.println("Unable to drop index HL_TXNID_INDEX " + + e.getMessage()); + } + try { + s.execute("DROP TABLE TXN_COMPONENTS"); + } catch (Exception e) { + System.err.println("Unable to drop table TXN_COMPONENTS " + + e.getMessage()); + } + try { + s.execute("DROP TABLE COMPLETED_TXN_COMPONENTS"); + } catch (Exception e) { + System.err.println("Unable to drop table COMPLETED_TXN_COMPONENTS " + + e.getMessage()); + } + try { + s.execute("DROP TABLE TXNS"); + } catch (Exception e) { + System.err.println("Unable to drop table TXNS " + + e.getMessage()); + } + try { + s.execute("DROP TABLE NEXT_TXN_ID"); + } catch (Exception e) { + System.err.println("Unable to drop table NEXT_TXN_ID " + + e.getMessage()); + } + try { + s.execute("DROP TABLE HIVE_LOCKS"); + } catch (Exception e) { + System.err.println("Unable to drop table HIVE_LOCKS " + + e.getMessage()); + } + try { + s.execute("DROP TABLE NEXT_LOCK_ID"); + } catch (Exception e) { + } + try { + s.execute("DROP TABLE COMPACTION_QUEUE"); + } catch (Exception e) { + } + try { + s.execute("DROP TABLE NEXT_COMPACTION_QUEUE_ID"); + } catch (Exception e) { + } + conn.commit(); + committed = true; + } finally { + if (!committed) conn.rollback(); + conn.close(); } - try { - s.execute("DROP TABLE TXN_COMPONENTS"); - } catch (Exception e) { - System.err.println("Unable to drop table TXN_COMPONENTS " + - e.getMessage()); - } - try { - s.execute("DROP TABLE COMPLETED_TXN_COMPONENTS"); - } catch (Exception e) { - System.err.println("Unable to drop table COMPLETED_TXN_COMPONENTS " + - e.getMessage()); - } - try { - s.execute("DROP TABLE TXNS"); - } catch (Exception e) { - System.err.println("Unable to drop table TXNS " + - e.getMessage()); - } - try { - s.execute("DROP TABLE NEXT_TXN_ID"); - } catch (Exception e) { - System.err.println("Unable to drop table NEXT_TXN_ID " + - e.getMessage()); - } - try { - s.execute("DROP TABLE HIVE_LOCKS"); - } catch (Exception e) { - System.err.println("Unable to drop table HIVE_LOCKS " + - e.getMessage()); - } - try { - s.execute("DROP TABLE NEXT_LOCK_ID"); - } catch (Exception e) { - } - try { - s.execute("DROP TABLE COMPACTION_QUEUE"); - } catch (Exception e) { - } - try { - s.execute("DROP TABLE NEXT_COMPACTION_QUEUE_ID"); - } catch (Exception e) { - } - conn.commit(); - conn.close(); } /** @@ -173,25 +187,34 @@ public static void cleanDb() throws Exception { */ public static int countLockComponents(long lockId) throws Exception { Connection conn = getConnection(); - Statement s = conn.createStatement(); - ResultSet rs = s.executeQuery("select count(*) from hive_locks where " + - "hl_lock_ext_id = " + lockId); - if (!rs.next()) return 0; - int rc = rs.getInt(1); - conn.rollback(); - conn.close(); - return rc; + try { + Statement s = conn.createStatement(); + ResultSet rs = s.executeQuery("select count(*) from hive_locks where hl_lock_ext_id = " + + lockId); + if (!rs.next()) return 0; + int rc = rs.getInt(1); + return rc; + } finally { + conn.rollback(); + conn.close(); + } } public static int findNumCurrentLocks() throws Exception { - Connection conn = getConnection(); - Statement s = conn.createStatement(); - ResultSet rs = s.executeQuery("select count(*) from hive_locks"); - if (!rs.next()) return 0; - int rc = rs.getInt(1); - conn.rollback(); - conn.close(); - return rc; + Connection conn = null; + try { + conn = getConnection(); + Statement s = conn.createStatement(); + ResultSet rs = s.executeQuery("select count(*) from hive_locks"); + if (!rs.next()) return 0; + int rc = rs.getInt(1); + return rc; + } finally { + if (conn != null) { + conn.rollback(); + conn.close(); + } + } } private static Connection getConnection() throws Exception { diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index f74f683..264052f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -165,13 +165,13 @@ public void acquireLocks(QueryPlan plan, Context ctx, String username) throws Lo break; case TABLE: + case DUMMYPARTITION: // in case of dynamic partitioning lock the table t = output.getTable(); compBuilder.setDbName(t.getDbName()); compBuilder.setTableName(t.getTableName()); break; case PARTITION: - case DUMMYPARTITION: compBuilder.setPartitionName(output.getPartition().getName()); t = output.getPartition().getTable(); compBuilder.setDbName(t.getDbName()); @@ -301,7 +301,10 @@ protected void destruct() { try { if (txnId > 0) rollbackTxn(); if (lockMgr != null) lockMgr.close(); + if (client != null) client.close(); } catch (Exception e) { + LOG.error("Caught exception " + e.getClass().getName() + " with message <" + e.getMessage() + + ">, swallowing as there is nothing we can do with it."); // Not much we can do about it here. } } diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java index acc2883..8f593aa 100644 --- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java +++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java @@ -21,12 +21,12 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; -import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.metadata.DummyPartition;import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.log4j.Level; @@ -137,6 +137,43 @@ public void testSingleWriteTable() throws Exception { Assert.assertEquals(0, locks.size()); } + + @Test + public void testSingleWritePartition() throws Exception { + WriteEntity we = addPartitionOutput(newTable(true), WriteEntity.WriteType.INSERT); + QueryPlan qp = new MockQueryPlan(this); + txnMgr.openTxn("fred"); + txnMgr.acquireLocks(qp, ctx, "fred"); + List locks = ctx.getHiveLocks(); + Assert.assertEquals(1, locks.size()); + Assert.assertEquals(1, + TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock) locks.get(0)).lockId)); + txnMgr.commitTxn(); + locks = txnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(0, locks.size()); + } + + @Test + public void testWriteDynamicPartition() throws Exception { + WriteEntity we = addDynamicPartitionedOutput(newTable(true), WriteEntity.WriteType.INSERT); + QueryPlan qp = new MockQueryPlan(this); + txnMgr.openTxn("fred"); + txnMgr.acquireLocks(qp, ctx, "fred"); + List locks = ctx.getHiveLocks(); + Assert.assertEquals(1, locks.size()); + /*Assert.assertEquals(1, + TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock) locks.get(0)).lockId)); + */// Make sure we're locking the whole table, since this is dynamic partitioning + ShowLocksResponse rsp = ((DbLockManager)txnMgr.getLockManager()).getLocks(); + List elms = rsp.getLocks(); + Assert.assertEquals(1, elms.size()); + Assert.assertNotNull(elms.get(0).getTablename()); + Assert.assertNull(elms.get(0).getPartname()); + txnMgr.commitTxn(); + locks = txnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(0, locks.size()); + } + @Test public void testReadWrite() throws Exception { Table t = newTable(true); @@ -252,6 +289,7 @@ public void setUp() throws Exception { @After public void tearDown() throws Exception { + if (txnMgr != null) txnMgr.closeTxnManager(); TxnDbUtil.cleanDb(); } @@ -318,4 +356,12 @@ private WriteEntity addPartitionOutput(Table t, WriteEntity.WriteType writeType) writeEntities.add(we); return we; } + + private WriteEntity addDynamicPartitionedOutput(Table t, WriteEntity.WriteType writeType) + throws Exception { + DummyPartition dp = new DummyPartition(t, "no clue what I should call this"); + WriteEntity we = new WriteEntity(dp, writeType, false); + writeEntities.add(we); + return we; + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/parse/TestQBCompact.java ql/src/test/org/apache/hadoop/hive/ql/parse/TestQBCompact.java index 5f32d5f..233920f 100644 --- ql/src/test/org/apache/hadoop/hive/ql/parse/TestQBCompact.java +++ ql/src/test/org/apache/hadoop/hive/ql/parse/TestQBCompact.java @@ -65,7 +65,6 @@ public static void init() throws Exception { private AlterTableSimpleDesc parseAndAnalyze(String query) throws Exception { ParseDriver hd = new ParseDriver(); ASTNode head = (ASTNode)hd.parse(query).getChild(0); - System.out.println("HERE " + head.dump()); BaseSemanticAnalyzer a = SemanticAnalyzerFactory.get(conf, head); a.analyze(head, new Context(conf)); List> roots = a.getRootTasks();