Reject statements that touch an ACID table while a non-ACID transaction manager is in use.

Summary of the change set:
  * ErrorMsg: add TXNMGR_NOT_ACID (10265).
  * AcidUtils.isAcidTable: in addition to the 'transactional' table property being "true",
    require a non-empty bucket column list and input/output format classes assignable to
    AcidInputFormat/AcidOutputFormat.
  * SemanticAnalyzer.validate: call the new checkAcidTxnManager() for every read and write
    entity whose table passes AcidUtils.isAcidTable; it throws a SemanticException carrying
    TXNMGR_NOT_ACID when the session's transaction manager reports supportsAcid() == false.
  * SessionState.initTxnMgr: close and recreate the cached transaction manager when
    HiveConf.ConfVars.HIVE_TXN_MANAGER switches between DbTxnManager and any other class.
  * TestDbTxnManager2: add testDummyTxnManagerOnAcidTable covering select/insert/
    insert overwrite/update/delete against an ACID table under DummyTxnManager.

diff --git ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 08bc654..a3f6568 100644
--- ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -380,6 +380,7 @@
   TXN_ABORTED(10263, "Transaction manager has aborted the transaction {0}.", true),
   DBTXNMGR_REQUIRES_CONCURRENCY(10264,
       "To use DbTxnManager you must set hive.support.concurrency=true"),
+  TXNMGR_NOT_ACID(10265, "This command is not allowed on an ACID table with a non-ACID transaction manager"),
 
   LOCK_NO_SUCH_LOCK(10270, "No record of lock {0} could be found, " +
       "may have timed out", true),
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 72ea562..520ae74 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hive.ql.io;
 
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.OutputFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -30,7 +32,6 @@
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
@@ -703,17 +704,42 @@ public static void setTransactionalTableScan(Configuration conf, boolean isAcidT
     HiveConf.setBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, isAcidTable);
   }
 
-  // If someone is trying to read a table with transactional=true they must be using the
-  // right TxnManager. We do not look at SessionState.get().getTxnMgr().supportsAcid().
+  /** Checks metadata to make sure it's a valid ACID table at metadata level
+   * Three things we will check:
+   * 1. TBLPROPERTIES 'transactional'='true'
+   * 2. The table should be bucketed
+   * 3. InputFormatClass/OutputFormatClass should implement AcidInputFormat/AcidOutputFormat
+   *    Currently OrcInputFormat/OrcOutputFormat is the only implementer
+   * Note, users are responsible for using the correct TxnManager. We do not look at
+   * SessionState.get().getTxnMgr().supportsAcid() here
+   * @param table table
+   * @return true if table is a legit ACID table, false otherwise
+   */
   public static boolean isAcidTable(Table table) {
     if (table == null) {
       return false;
     }
-    String tableIsTransactional =
-        table.getProperty(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
-    if(tableIsTransactional == null) {
+    String tableIsTransactional = table.getProperty(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
+    if (tableIsTransactional == null) {
       tableIsTransactional = table.getProperty(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.toUpperCase());
     }
-    return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true");
+    if (tableIsTransactional == null || !tableIsTransactional.equalsIgnoreCase("true")) {
+      return false;
+    }
+
+    List<String> bucketCols = table.getBucketCols();
+    if (bucketCols == null || bucketCols.isEmpty()) {
+      return false;
+    }
+
+    Class<? extends InputFormat> inputFormatClass = table.getInputFormatClass();
+    Class<? extends OutputFormat> outputFormatClass = table.getOutputFormatClass();
+    if (inputFormatClass == null || outputFormatClass == null ||
+        !AcidInputFormat.class.isAssignableFrom(inputFormatClass) ||
+        !AcidOutputFormat.class.isAssignableFrom(outputFormatClass)) {
+      return false;
+    }
+
+    return true;
   }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 8c880c3..5fe13ad 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -10595,6 +10595,13 @@ public void validate() throws SemanticException {
 
       Table tbl = readEntity.getTable();
       Partition p = readEntity.getPartition();
+
+      if (p != null) {
+        tbl = p.getTable();
+      }
+      if (tbl != null && AcidUtils.isAcidTable(tbl)) {
+        checkAcidTxnManager();
+      }
     }
 
     for (WriteEntity writeEntity : getOutputs()) {
@@ -10653,6 +10660,10 @@ public void validate() throws SemanticException {
         LOG.debug("Not a partition.");
         tbl = writeEntity.getTable();
       }
+
+      if (tbl != null && AcidUtils.isAcidTable(tbl)) {
+        checkAcidTxnManager();
+      }
     }
 
     boolean reworkMapredWork = HiveConf.getBoolVar(this.conf,
@@ -12223,6 +12234,14 @@ protected boolean updating() {
   protected boolean deleting() {
     return false;
   }
+
+  // Make sure the proper transaction manager that supports ACID is being used
+  protected void checkAcidTxnManager() throws SemanticException {
+    if (SessionState.get() != null && !SessionState.get().getTxnMgr().supportsAcid()) {
+      throw new SemanticException(ErrorMsg.TXNMGR_NOT_ACID.getMsg());
+    }
+  }
+
   public static ASTNode genSelectDIAST(RowResolver rr) {
     LinkedHashMap<String, LinkedHashMap<String, ColumnInfo>> map = rr.getRslvMap();
     ASTNode selectDI = new ASTNode(new CommonToken(HiveParser.TOK_SELECTDI, "TOK_SELECTDI"));
diff --git ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index efeb70f..acfe7ef 100644
--- ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -42,6 +42,7 @@
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -400,13 +401,22 @@ public String getSessionId() {
 
   /**
    * Initialize the transaction manager. This is done lazily to avoid hard wiring one
-   * transaction manager at the beginning of the session. In general users shouldn't change
-   * this, but it's useful for testing.
+   * transaction manager at the beginning of the session.
    * @param conf Hive configuration to initialize transaction manager
    * @return transaction manager
    * @throws LockException
    */
   public synchronized HiveTxnManager initTxnMgr(HiveConf conf) throws LockException {
+    String txnMgrName = conf.getVar(HiveConf.ConfVars.HIVE_TXN_MANAGER);
+    // Only change txnMgr if the setting has changed
+    if (txnMgr != null &&
+        ((!txnMgrName.equals("org.apache.hadoop.hive.ql.lockmgr.DbTxnManager") &&
+            (txnMgr instanceof DbTxnManager)) ||
+            (txnMgrName.equals("org.apache.hadoop.hive.ql.lockmgr.DbTxnManager") &&
+                !(txnMgr instanceof DbTxnManager)))) {
+      txnMgr.closeTxnManager();
+      txnMgr = null;
+    }
     if (txnMgr == null) {
       txnMgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
     }
diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index 3bdcc21..2875f5d 100644
--- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -241,6 +241,42 @@ public void testLockRetryLimit() throws Exception {
     otherTxnMgr.closeTxnManager();
   }
 
+  @Test
+  public void testDummyTxnManagerOnAcidTable() throws Exception {
+    // Create an ACID table with DbTxnManager
+    CommandProcessorResponse cpr = driver.run("create table T10 (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("create table T11 (a int, b int) clustered by(b) into 2 buckets stored as orc");
+    checkCmdOnDriver(cpr);
+
+    // Now switch to DummyTxnManager
+    conf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
+    txnMgr = SessionState.get().initTxnMgr(conf);
+    Assert.assertTrue(txnMgr instanceof DummyTxnManager);
+
+    // All DML should fail with DummyTxnManager on ACID table
+    cpr = driver.compileAndRespond("select * from T10");
+    Assert.assertEquals(10265, cpr.getResponseCode());
+    Assert.assertTrue(cpr.getErrorMessage().contains("This command is not allowed on an ACID table with a non-ACID transaction manager"));
+
+    cpr = driver.compileAndRespond("insert into table T10 values (1, 2)");
+    Assert.assertEquals(10265, cpr.getResponseCode());
+    Assert.assertTrue(cpr.getErrorMessage().contains("This command is not allowed on an ACID table with a non-ACID transaction manager"));
+
+    cpr = driver.compileAndRespond("insert overwrite table T10 select a, b from T11");
+    Assert.assertEquals(10295, cpr.getResponseCode());
+    Assert.assertTrue(cpr.getErrorMessage().contains("INSERT OVERWRITE not allowed on table with OutputFormat" +
+        " that implements AcidOutputFormat while transaction manager that supports ACID is in use"));
+
+    cpr = driver.compileAndRespond("update T10 set a=0 where b=1");
+    Assert.assertEquals(10294, cpr.getResponseCode());
+    Assert.assertTrue(cpr.getErrorMessage().contains("Attempt to do update or delete using transaction manager that does not support these operations."));
+
+    cpr = driver.compileAndRespond("delete from T10");
+    Assert.assertEquals(10294, cpr.getResponseCode());
+    Assert.assertTrue(cpr.getErrorMessage().contains("Attempt to do update or delete using transaction manager that does not support these operations."));
+  }
+
   private void checkLock(LockType type, LockState state, String db, String table, String partition, ShowLocksResponseElement l) {
     Assert.assertEquals(l.toString(),l.getType(), type);
     Assert.assertEquals(l.toString(),l.getState(), state);