diff --git ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java index 08bc654..a3f6568 100644 --- ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java +++ ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java @@ -380,6 +380,7 @@ TXN_ABORTED(10263, "Transaction manager has aborted the transaction {0}.", true), DBTXNMGR_REQUIRES_CONCURRENCY(10264, "To use DbTxnManager you must set hive.support.concurrency=true"), + TXNMGR_NOT_ACID(10265, "This command is not allowed on an ACID table with a non-ACID transaction manager"), LOCK_NO_SUCH_LOCK(10270, "No record of lock {0} could be found, " + "may have timed out", true), diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 72ea562..cfe2ca1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.ql.io; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.OutputFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -30,7 +32,6 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId; @@ -716,4 +717,34 @@ public static boolean isAcidTable(Table table) { } return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true"); } + + /** Checks metadata to make sure it's a valid ACID table at metadata level + * Three things we will check: + * 1. TBLPROPERTIES 'transactional'='true' + * 2. The table should be bucketed + * 3. InputFormatClass/OutputFormatClass should implement AcidInputFormat/AcidOutputFormat + * Currently OrcInputFormat/OrcOutputFormat is the only implementer + * @param table table + * @return true if table is a legit ACID table, false otherwise + */ + public static boolean isAcidTableByMetadata(Table table) { + if (!isAcidTable(table)) { + return false; + } + + List bucketCols = table.getBucketCols(); + if (bucketCols == null || bucketCols.isEmpty()) { + return false; + } + + Class inputFormatClass = table.getInputFormatClass(); + Class outputFormatClass = table.getOutputFormatClass(); + if (inputFormatClass == null || outputFormatClass == null || + !AcidInputFormat.class.isAssignableFrom(inputFormatClass) || + !AcidOutputFormat.class.isAssignableFrom(outputFormatClass)) { + return false; + } + + return true; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 8c880c3..9bbae9a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -1626,6 +1626,8 @@ public void getMetaData(QB qb, ReadEntity parentInput) throws SemanticException // here, it means the table itself doesn't support it. throw new SemanticException(ErrorMsg.ACID_OP_ON_NONACID_TABLE, tab_name); } + // Disallow queries against ACID table with transaction managers other than DbTxnManager + checkAcidTxnManager(tab); if (tab.isView()) { if (qb.getParseInfo().isAnalyzeCommand()) { @@ -1744,6 +1746,9 @@ public void getMetaData(QB qb, ReadEntity parentInput) throws SemanticException .getMsg(ast, "The class is " + outputFormatClass.toString())); } + // Disallow INSERT against ACID table with transaction managers other than DbTxnManager + checkAcidTxnManager(ts.tableHandle); + // TableSpec ts is got from the query (user specified), // which means the user didn't specify partitions in their query, // but whether the table itself is partitioned is not know. @@ -12223,6 +12228,14 @@ protected boolean updating() { protected boolean deleting() { return false; } + + // Error out if the table is ACID but current session's transaction manager does not support ACID + protected void checkAcidTxnManager(Table tab) throws SemanticException { + if (AcidUtils.isAcidTableByMetadata(tab) && !SessionState.get().getTxnMgr().supportsAcid()) { + throw new SemanticException("Table: " + tab + " - " + ErrorMsg.TXNMGR_NOT_ACID.getMsg()); + } + } + public static ASTNode genSelectDIAST(RowResolver rr) { LinkedHashMap> map = rr.getRslvMap(); ASTNode selectDI = new ASTNode(new CommonToken(HiveParser.TOK_SELECTDI, "TOK_SELECTDI")); diff --git ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index efeb70f..acfe7ef 100644 --- ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -42,6 +42,7 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -400,13 +401,22 @@ public String getSessionId() { /** * Initialize the transaction manager. This is done lazily to avoid hard wiring one - * transaction manager at the beginning of the session. In general users shouldn't change - * this, but it's useful for testing. + * transaction manager at the beginning of the session. * @param conf Hive configuration to initialize transaction manager * @return transaction manager * @throws LockException */ public synchronized HiveTxnManager initTxnMgr(HiveConf conf) throws LockException { + String txnMgrName = conf.getVar(HiveConf.ConfVars.HIVE_TXN_MANAGER); + // Only change txnMgr if the setting has changed + if (txnMgr != null && + ((!txnMgrName.equals("org.apache.hadoop.hive.ql.lockmgr.DbTxnManager") && + (txnMgr instanceof DbTxnManager)) || + (txnMgrName.equals("org.apache.hadoop.hive.ql.lockmgr.DbTxnManager") && + !(txnMgr instanceof DbTxnManager)))) { + txnMgr.closeTxnManager(); + txnMgr = null; + } if (txnMgr == null) { txnMgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); } diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java index 3bdcc21..43a02e8 100644 --- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java +++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.ql.lockmgr; +import com.sun.xml.bind.v2.TODO; import junit.framework.Assert; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.LockState; @@ -241,6 +242,41 @@ public void testLockRetryLimit() throws Exception { otherTxnMgr.closeTxnManager(); } + @Test + public void testDummyTxnManagerOnAcidTable() throws Exception { + // Create an ACID table with DbTxnManager + CommandProcessorResponse cpr = driver.run("create table T10 (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + checkCmdOnDriver(cpr); + cpr = driver.run("create table T11 (a int, b int) clustered by(b) into 2 buckets stored as orc"); + checkCmdOnDriver(cpr); + + // Now switch to DummyTxnManager + conf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager"); + txnMgr = SessionState.get().initTxnMgr(conf); + Assert.assertTrue(txnMgr instanceof DummyTxnManager); + + // All DML should fail with DummyTxnManager on ACID table + cpr = driver.compileAndRespond("select * from T10"); + Assert.assertEquals(40000, cpr.getResponseCode()); + Assert.assertTrue(cpr.getErrorMessage().contains("This command is not allowed on an ACID table with a non-ACID transaction manager")); + + cpr = driver.compileAndRespond("insert into table T10 values (1, 2)"); + Assert.assertEquals(40000, cpr.getResponseCode()); + Assert.assertTrue(cpr.getErrorMessage().contains("This command is not allowed on an ACID table with a non-ACID transaction manager")); + + cpr = driver.compileAndRespond("insert overwrite table T10 select a, b from T11"); + Assert.assertEquals(40000, cpr.getResponseCode()); + Assert.assertTrue(cpr.getErrorMessage().contains("This command is not allowed on an ACID table with a non-ACID transaction manager")); + + cpr = driver.compileAndRespond("update T10 set a=0 where b=1"); + Assert.assertEquals(10294, cpr.getResponseCode()); + Assert.assertTrue(cpr.getErrorMessage().contains("Attempt to do update or delete using transaction manager that does not support these operations.")); + + cpr = driver.compileAndRespond("delete from T10"); + Assert.assertEquals(10294, cpr.getResponseCode()); + Assert.assertTrue(cpr.getErrorMessage().contains("Attempt to do update or delete using transaction manager that does not support these operations.")); + } + private void checkLock(LockType type, LockState state, String db, String table, String partition, ShowLocksResponseElement l) { Assert.assertEquals(l.toString(),l.getType(), type); Assert.assertEquals(l.toString(),l.getState(), state);