diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 4c89812..bcf62a7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -541,6 +541,10 @@ public void run() { errorMessage += " " + e.getMessage(); } + if (error == ErrorMsg.TXNMGR_NOT_ACID) { + errorMessage += ". Failed command: " + queryStr; + } + SQLState = error.getSQLState(); downstreamError = e; console.printError(errorMessage, "\n" diff --git ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java index 08bc654..f27aa78 100644 --- ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java +++ ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java @@ -380,6 +380,7 @@ TXN_ABORTED(10263, "Transaction manager has aborted the transaction {0}.", true), DBTXNMGR_REQUIRES_CONCURRENCY(10264, "To use DbTxnManager you must set hive.support.concurrency=true"), + TXNMGR_NOT_ACID(10265, "This command is not allowed on an ACID table {0}.{1} with a non-ACID transaction manager", true), LOCK_NO_SUCH_LOCK(10270, "No record of lock {0} could be found, " + "may have timed out", true), diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 72ea562..520ae74 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.ql.io; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.OutputFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -30,7 +32,6 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.session.SessionState; import 
org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId; @@ -703,17 +704,42 @@ public static void setTransactionalTableScan(Configuration conf, boolean isAcidT HiveConf.setBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, isAcidTable); } - // If someone is trying to read a table with transactional=true they must be using the - // right TxnManager. We do not look at SessionState.get().getTxnMgr().supportsAcid(). + /** Checks metadata to make sure it's a valid ACID table at metadata level + * Three things we will check: + * 1. TBLPROPERTIES 'transactional'='true' + * 2. The table should be bucketed + * 3. InputFormatClass/OutputFormatClass should implement AcidInputFormat/AcidOutputFormat + * Currently OrcInputFormat/OrcOutputFormat is the only implementer + * Note, users are responsible for using the correct TxnManager. We do not look at + * SessionState.get().getTxnMgr().supportsAcid() here + * @param table table + * @return true if table is a legit ACID table, false otherwise + */ public static boolean isAcidTable(Table table) { if (table == null) { return false; } - String tableIsTransactional = - table.getProperty(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL); - if(tableIsTransactional == null) { + String tableIsTransactional = table.getProperty(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL); + if (tableIsTransactional == null) { tableIsTransactional = table.getProperty(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.toUpperCase()); } - return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true"); + if (tableIsTransactional == null || !tableIsTransactional.equalsIgnoreCase("true")) { + return false; + } + + List<String> bucketCols = table.getBucketCols(); + if (bucketCols == null || bucketCols.isEmpty()) { + return false; + } + + Class<? extends InputFormat> inputFormatClass = table.getInputFormatClass(); + Class<? extends OutputFormat> outputFormatClass = table.getOutputFormatClass(); 
+ if (inputFormatClass == null || outputFormatClass == null || + !AcidInputFormat.class.isAssignableFrom(inputFormatClass) || + !AcidOutputFormat.class.isAssignableFrom(outputFormatClass)) { + return false; + } + + return true; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index 3617699..47dbbb3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -424,6 +424,11 @@ public ValidTxnList getValidTxns() throws LockException { } @Override + public String getTxnManagerName() { + return CLASS_NAME; + } + + @Override public boolean supportsExplicitLock() { return false; } diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java index 036fc24..1d071a8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java @@ -210,6 +210,11 @@ public ValidTxnList getValidTxns() throws LockException { } @Override + public String getTxnManagerName() { + return DummyTxnManager.class.getName(); + } + + @Override public boolean supportsExplicitLock() { return true; } diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java index cb97d29..9b4a97f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java @@ -117,6 +117,12 @@ ValidTxnList getValidTxns() throws LockException; /** + * Get the name of the currently installed transaction manager. + * @return transaction manager name + */ + String getTxnManagerName(); + + /** * This call closes down the transaction manager. All open transactions * are aborted. 
If no transactions are open but locks are held those locks * are released. This method should be called if processing of a session diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 607c2f3..ba1945f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -1621,8 +1621,7 @@ public void getMetaData(QB qb, ReadEntity parentInput) throws SemanticException if ((updating() || deleting()) && !isAcid && isTableWrittenTo) { //isTableWrittenTo: delete from acidTbl where a in (select id from nonAcidTable) //so only assert this if we are actually writing to this table - // isAcidTable above also checks for whether we are using an acid compliant - // transaction manager. But that has already been caught in + // Whether we are using an acid compliant transaction manager has already been caught in // UpdateDeleteSemanticAnalyzer, so if we are updating or deleting and getting nonAcid // here, it means the table itself doesn't support it. 
throw new SemanticException(ErrorMsg.ACID_OP_ON_NONACID_TABLE, tab_name); @@ -10591,6 +10590,13 @@ public void validate() throws SemanticException { Table tbl = readEntity.getTable(); Partition p = readEntity.getPartition(); + + if (p != null) { + tbl = p.getTable(); + } + if (tbl != null && AcidUtils.isAcidTable(tbl)) { + checkAcidTxnManager(tbl); + } } for (WriteEntity writeEntity : getOutputs()) { @@ -10649,6 +10655,10 @@ public void validate() throws SemanticException { LOG.debug("Not a partition."); tbl = writeEntity.getTable(); } + + if (tbl != null && AcidUtils.isAcidTable(tbl)) { + checkAcidTxnManager(tbl); + } } boolean reworkMapredWork = HiveConf.getBoolVar(this.conf, @@ -12219,6 +12229,14 @@ protected boolean updating() { protected boolean deleting() { return false; } + + // Make sure the proper transaction manager that supports ACID is being used + protected void checkAcidTxnManager(Table table) throws SemanticException { + if (SessionState.get() != null && !SessionState.get().getTxnMgr().supportsAcid()) { + throw new SemanticException(ErrorMsg.TXNMGR_NOT_ACID, table.getDbName(), table.getTableName()); + } + } + public static ASTNode genSelectDIAST(RowResolver rr) { LinkedHashMap<String, LinkedHashMap<String, ColumnInfo>> map = rr.getRslvMap(); ASTNode selectDI = new ASTNode(new CommonToken(HiveParser.TOK_SELECTDI, "TOK_SELECTDI")); diff --git ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index efeb70f..99b38b5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -42,6 +42,7 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -400,13 +401,19 @@ public String getSessionId() { /** * Initialize the transaction manager. 
This is done lazily to avoid hard wiring one - * transaction manager at the beginning of the session. In general users shouldn't change - * this, but it's useful for testing. + * transaction manager at the beginning of the session. * @param conf Hive configuration to initialize transaction manager * @return transaction manager * @throws LockException */ public synchronized HiveTxnManager initTxnMgr(HiveConf conf) throws LockException { + // Only change txnMgr if the setting has changed + if (txnMgr != null && + !txnMgr.getTxnManagerName().equals(conf.getVar(HiveConf.ConfVars.HIVE_TXN_MANAGER))) { + txnMgr.closeTxnManager(); + txnMgr = null; + } + if (txnMgr == null) { txnMgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); } diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java index 3bdcc21..1b07d4b 100644 --- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java +++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java @@ -27,7 +27,6 @@ import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.ErrorMsg; -import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; import org.junit.After; @@ -241,6 +240,42 @@ public void testLockRetryLimit() throws Exception { otherTxnMgr.closeTxnManager(); } + @Test + public void testDummyTxnManagerOnAcidTable() throws Exception { + // Create an ACID table with DbTxnManager + CommandProcessorResponse cpr = driver.run("create table T10 (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + checkCmdOnDriver(cpr); + cpr = driver.run("create table T11 (a int, b int) clustered by(b) into 2 buckets stored as orc"); + checkCmdOnDriver(cpr); + + // Now switch to DummyTxnManager + 
conf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager"); + txnMgr = SessionState.get().initTxnMgr(conf); + Assert.assertTrue(txnMgr instanceof DummyTxnManager); + + // All DML should fail with DummyTxnManager on ACID table + cpr = driver.compileAndRespond("select * from T10"); + Assert.assertEquals(ErrorMsg.TXNMGR_NOT_ACID.getErrorCode(), cpr.getResponseCode()); + Assert.assertTrue(cpr.getErrorMessage().contains("This command is not allowed on an ACID table")); + + cpr = driver.compileAndRespond("insert into table T10 values (1, 2)"); + Assert.assertEquals(ErrorMsg.TXNMGR_NOT_ACID.getErrorCode(), cpr.getResponseCode()); + Assert.assertTrue(cpr.getErrorMessage().contains("This command is not allowed on an ACID table")); + + cpr = driver.compileAndRespond("insert overwrite table T10 select a, b from T11"); + Assert.assertEquals(ErrorMsg.NO_INSERT_OVERWRITE_WITH_ACID.getErrorCode(), cpr.getResponseCode()); + Assert.assertTrue(cpr.getErrorMessage().contains("INSERT OVERWRITE not allowed on table with OutputFormat" + + " that implements AcidOutputFormat while transaction manager that supports ACID is in use")); + + cpr = driver.compileAndRespond("update T10 set a=0 where b=1"); + Assert.assertEquals(ErrorMsg.ACID_OP_ON_NONACID_TXNMGR.getErrorCode(), cpr.getResponseCode()); + Assert.assertTrue(cpr.getErrorMessage().contains("Attempt to do update or delete using transaction manager that does not support these operations.")); + + cpr = driver.compileAndRespond("delete from T10"); + Assert.assertEquals(ErrorMsg.ACID_OP_ON_NONACID_TXNMGR.getErrorCode(), cpr.getResponseCode()); + Assert.assertTrue(cpr.getErrorMessage().contains("Attempt to do update or delete using transaction manager that does not support these operations.")); + } + private void checkLock(LockType type, LockState state, String db, String table, String partition, ShowLocksResponseElement l) { Assert.assertEquals(l.toString(),l.getType(), type); 
Assert.assertEquals(l.toString(),l.getState(), state); diff --git ql/src/test/results/clientnegative/delete_not_bucketed.q.out ql/src/test/results/clientnegative/delete_not_bucketed.q.out index d0ba680..8c4a40c 100644 --- ql/src/test/results/clientnegative/delete_not_bucketed.q.out +++ ql/src/test/results/clientnegative/delete_not_bucketed.q.out @@ -6,4 +6,4 @@ POSTHOOK: query: create table acid_notbucketed(a int, b varchar(128)) stored as POSTHOOK: type: CREATETABLE POSTHOOK: Output: database:default POSTHOOK: Output: default@acid_notbucketed -FAILED: SemanticException [Error 10297]: Attempt to do update or delete on table acid_notbucketed that does not use an AcidOutputFormat or is not bucketed +FAILED: SemanticException [Error 10297]: Attempt to do update or delete on table default.acid_notbucketed that does not use an AcidOutputFormat or is not bucketed diff --git ql/src/test/results/clientnegative/update_not_bucketed.q.out ql/src/test/results/clientnegative/update_not_bucketed.q.out index 8ebf41d..42a48a0 100644 --- ql/src/test/results/clientnegative/update_not_bucketed.q.out +++ ql/src/test/results/clientnegative/update_not_bucketed.q.out @@ -6,4 +6,4 @@ POSTHOOK: query: create table acid_notbucketed(a int, b varchar(128)) partitione POSTHOOK: type: CREATETABLE POSTHOOK: Output: database:default POSTHOOK: Output: default@acid_notbucketed -FAILED: SemanticException [Error 10297]: Attempt to do update or delete on table acid_notbucketed that does not use an AcidOutputFormat or is not bucketed +FAILED: SemanticException [Error 10297]: Attempt to do update or delete on table default.acid_notbucketed that does not use an AcidOutputFormat or is not bucketed