diff --git metastore/src/java/org/apache/hadoop/hive/metastore/AcidEventListener.java metastore/src/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
new file mode 100644
index 0000000..71ad916
--- /dev/null
+++ metastore/src/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.HiveObjectType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.DropTableEvent;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+
+
+/**
+ * Handles cleanup of dropped partitions/tables/databases in the ACID-related metastore tables.
+ */
+public class AcidEventListener extends MetaStoreEventListener {
+
+  private TxnStore txnHandler;
+  private HiveConf hiveConf;
+
+  public AcidEventListener(Configuration configuration) {
+    super(configuration);
+    hiveConf = (HiveConf) configuration;
+  }
+
+  @Override
+  public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException {
+    // We could loop through all the tables to check if they are ACID first and then perform
+    // cleanup, but it's more efficient to unconditionally perform cleanup for the database,
+    // especially when there are a lot of tables
+    txnHandler = getTxnHandler();
+    txnHandler.cleanupRecords(HiveObjectType.DATABASE, dbEvent.getDatabase(), null, null);
+  }
+
+  @Override
+  public void onDropTable(DropTableEvent tableEvent) throws MetaException {
+    if (TxnUtils.isAcidTable(tableEvent.getTable())) {
+      txnHandler = getTxnHandler();
+      txnHandler.cleanupRecords(HiveObjectType.TABLE, null, tableEvent.getTable(), null);
+    }
+  }
+
+  @Override
+  public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException {
+    if (TxnUtils.isAcidTable(partitionEvent.getTable())) {
+      txnHandler = getTxnHandler();
+      txnHandler.cleanupRecords(HiveObjectType.PARTITION, null, partitionEvent.getTable(),
+          partitionEvent.getPartitionIterator());
+    }
+  }
+
+  private TxnStore getTxnHandler() {
+    boolean hackOn = HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_IN_TEST) ||
+        HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_IN_TEZ_TEST);
+    String origTxnMgr = null;
+    boolean origConcurrency = false;
+
+    // Since TxnUtils.getTxnStore calls TxnHandler.setConf -> checkQFileTestHack -> TxnDbUtil.setConfValues,
+    // which may change the values of the two entries below, we need to avoid polluting the
+    // original values
+    if (hackOn) {
+      origTxnMgr = hiveConf.getVar(HiveConf.ConfVars.HIVE_TXN_MANAGER);
+      origConcurrency = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
+    }
+
+    txnHandler = TxnUtils.getTxnStore(hiveConf);
+
+    // Set them back to the original values
+    if (hackOn) {
+      hiveConf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, origTxnMgr);
+      hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, origConcurrency);
+    }
+
+    return txnHandler;
+  }
+}
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index bfebfdc..9472ef9 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -435,6 +435,7 @@ public Object getValue() {
       listeners = MetaStoreUtils.getMetaStoreListeners(MetaStoreEventListener.class, hiveConf,
           hiveConf.getVar(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS));
       listeners.add(new SessionPropertiesListener(hiveConf));
+      listeners.add(new AcidEventListener(hiveConf));
 
       if (metrics != null) {
         listeners.add(new HMSMetricsListener(hiveConf, metrics));
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index 2a7545c..5d10b5c 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -32,7 +32,8 @@ import org.apache.hadoop.hive.shims.ShimLoader;
 
 /**
- * Utility methods for creating and destroying txn database/schema.
+ * Utility methods for creating and destroying txn database/schema, plus methods for
+ * querying against metastore tables.
  * Placed here in a separate class so it can be shared across unit tests.
  */
 public final class TxnDbUtil {
 
@@ -142,8 +143,13 @@ public static void prepDb() throws Exception {
       conn.commit();
     } catch (SQLException e) {
+      try {
+        conn.rollback();
+      } catch (SQLException re) {
+        System.err.println("Error rolling back: " + re.getMessage());
+      }
+
       // This might be a deadlock, if so, let's retry
-      conn.rollback();
       if (e instanceof SQLTransactionRollbackException && deadlockCnt++ < 5) {
         LOG.warn("Caught deadlock, retrying db creation");
         prepDb();
@@ -219,14 +225,20 @@ public static int countLockComponents(long lockId) throws Exception {
     }
   }
 
-  public static int findNumCurrentLocks() throws Exception {
+  /**
+   * Utility method used to run COUNT queries like "select count(*) from ..." against metastore tables.
+   * @param countQuery the count query text
+   * @return the result of the count query
+   * @throws Exception
+   */
+  public static int countQueryAgent(String countQuery) throws Exception {
     Connection conn = null;
     Statement stmt = null;
     ResultSet rs = null;
     try {
       conn = getConnection();
       stmt = conn.createStatement();
-      rs = stmt.executeQuery("select count(*) from hive_locks");
+      rs = stmt.executeQuery(countQuery);
       if (!rs.next()) {
         return 0;
       }
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index d4d0162..53d2bb4 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -25,6 +25,7 @@ import org.apache.commons.dbcp.PoolableConnectionFactory;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 import org.apache.hadoop.hive.common.classification.InterfaceStability;
+import org.apache.hadoop.hive.metastore.Warehouse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.commons.dbcp.PoolingDataSource;
@@ -1153,6 +1154,171 @@ public void addDynamicPartitions(AddDynamicPartitions rqst)
   }
 
   /**
+   * Clean up corresponding records in metastore tables, specifically:
+   * TXN_COMPONENTS, COMPLETED_TXN_COMPONENTS, COMPACTION_QUEUE, COMPLETED_COMPACTIONS
+   */
+  public void cleanupRecords(HiveObjectType type, Database db, Table table,
+                             Iterator<Partition> partitionIterator) throws MetaException {
+    try {
+      Connection dbConn = null;
+      Statement stmt = null;
+
+      try {
+        String dbName;
+        String tblName;
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+        List<String> queries = new ArrayList<String>();
+        StringBuilder buff = new StringBuilder();
+
+        switch (type) {
+          case DATABASE:
+            dbName = db.getName();
+
+            buff.append("delete from TXN_COMPONENTS where tc_database='");
+            buff.append(dbName);
+            buff.append("'");
+            queries.add(buff.toString());
+
+            buff.setLength(0);
+            buff.append("delete from COMPLETED_TXN_COMPONENTS where ctc_database='");
+            buff.append(dbName);
+            buff.append("'");
+            queries.add(buff.toString());
+
+            buff.setLength(0);
+            buff.append("delete from COMPACTION_QUEUE where cq_database='");
+            buff.append(dbName);
+            buff.append("'");
+            queries.add(buff.toString());
+
+            buff.setLength(0);
+            buff.append("delete from COMPLETED_COMPACTIONS where cc_database='");
+            buff.append(dbName);
+            buff.append("'");
+            queries.add(buff.toString());
+
+            break;
+          case TABLE:
+            dbName = table.getDbName();
+            tblName = table.getTableName();
+
+            buff.append("delete from TXN_COMPONENTS where tc_database='");
+            buff.append(dbName);
+            buff.append("' and tc_table='");
+            buff.append(tblName);
+            buff.append("'");
+            queries.add(buff.toString());
+
+            buff.setLength(0);
+            buff.append("delete from COMPLETED_TXN_COMPONENTS where ctc_database='");
+            buff.append(dbName);
+            buff.append("' and ctc_table='");
+            buff.append(tblName);
+            buff.append("'");
+            queries.add(buff.toString());
+
+            buff.setLength(0);
+            buff.append("delete from COMPACTION_QUEUE where cq_database='");
+            buff.append(dbName);
+            buff.append("' and cq_table='");
+            buff.append(tblName);
+            buff.append("'");
+            queries.add(buff.toString());
+
+            buff.setLength(0);
+            buff.append("delete from COMPLETED_COMPACTIONS where cc_database='");
+            buff.append(dbName);
+            buff.append("' and cc_table='");
+            buff.append(tblName);
+            buff.append("'");
+            queries.add(buff.toString());
+
+            break;
+          case PARTITION:
+            dbName = table.getDbName();
+            tblName = table.getTableName();
+            List<FieldSchema> partCols = table.getPartitionKeys();  // partition columns
+            List<String> partVals;                                  // partition values
+            String partName;
+
+            while (partitionIterator.hasNext()) {
+              Partition p = partitionIterator.next();
+              partVals = p.getValues();
+              partName = Warehouse.makePartName(partCols, partVals);
+
+              buff.setLength(0);
+              buff.append("delete from TXN_COMPONENTS where tc_database='");
+              buff.append(dbName);
+              buff.append("' and tc_table='");
+              buff.append(tblName);
+              buff.append("' and tc_partition='");
+              buff.append(partName);
+              buff.append("'");
+              queries.add(buff.toString());
+
+              buff.setLength(0);
+              buff.append("delete from COMPLETED_TXN_COMPONENTS where ctc_database='");
+              buff.append(dbName);
+              buff.append("' and ctc_table='");
+              buff.append(tblName);
+              buff.append("' and ctc_partition='");
+              buff.append(partName);
+              buff.append("'");
+              queries.add(buff.toString());
+
+              buff.setLength(0);
+              buff.append("delete from COMPACTION_QUEUE where cq_database='");
+              buff.append(dbName);
+              buff.append("' and cq_table='");
+              buff.append(tblName);
+              buff.append("' and cq_partition='");
+              buff.append(partName);
+              buff.append("'");
+              queries.add(buff.toString());
+
+              buff.setLength(0);
+              buff.append("delete from COMPLETED_COMPACTIONS where cc_database='");
+              buff.append(dbName);
+              buff.append("' and cc_table='");
+              buff.append(tblName);
+              buff.append("' and cc_partition='");
+              buff.append(partName);
+              buff.append("'");
+              queries.add(buff.toString());
+            }
+
+            break;
+          default:
+            throw new MetaException("Invalid object type for cleanup: " + type);
+        }
+
+        for (String query : queries) {
+          LOG.debug("Going to execute update <" + query + ">");
+          stmt.executeUpdate(query);
+        }
+
+        LOG.debug("Going to commit");
+        dbConn.commit();
+      } catch (SQLException e) {
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "cleanupRecords");
+        if (e.getMessage() != null && e.getMessage().contains("does not exist")) {
+          LOG.warn("Cannot perform cleanup since metastore table does not exist");
+        } else {
+          throw new MetaException("Unable to clean up " + StringUtils.stringifyException(e));
+        }
+      } finally {
+        closeStmt(stmt);
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      cleanupRecords(type, db, table, partitionIterator);
+    }
+  }
+
+  /**
    * For testing only, do not use.
    */
   @VisibleForTesting
@@ -1599,7 +1765,7 @@ private void checkQFileTestHack() {
         TxnDbUtil.prepDb();
       } catch (Exception e) {
         // We may have already created the tables and thus don't need to redo it.
-        if (!e.getMessage().contains("already exists")) {
+        if (e.getMessage() != null && !e.getMessage().contains("already exists")) {
           throw new RuntimeException("Unable to set up transaction database for" +
             " testing: " + e.getMessage(), e);
         }
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 5e0306a..6d738b5 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -21,32 +21,10 @@
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 import org.apache.hadoop.hive.common.classification.InterfaceStability;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
-import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
-import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
-import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
-import org.apache.hadoop.hive.metastore.api.CompactionRequest;
-import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
-import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
-import org.apache.hadoop.hive.metastore.api.HeartbeatRequest;
-import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeRequest;
-import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
-import org.apache.hadoop.hive.metastore.api.LockRequest;
-import org.apache.hadoop.hive.metastore.api.LockResponse;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
-import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
-import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
-import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
-import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
-import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
-import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
-import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
-import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
-import org.apache.hadoop.hive.metastore.api.TxnOpenException;
-import org.apache.hadoop.hive.metastore.api.UnlockRequest;
+import org.apache.hadoop.hive.metastore.api.*;
 
 import java.sql.SQLException;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
@@ -216,6 +194,17 @@ public void addDynamicPartitions(AddDynamicPartitions rqst)
     throws NoSuchTxnException, TxnAbortedException, MetaException;
 
   /**
+   * Clean up corresponding records in metastore tables
+   * @param type Hive object type
+   * @param db database object
+   * @param table table object
+   * @param partitionIterator partition iterator
+   * @throws MetaException
+   */
+  public void cleanupRecords(HiveObjectType type, Database db, Table table,
+                             Iterator<Partition> partitionIterator) throws MetaException;
+
+  /**
    * Timeout transactions and/or locks.  This should only be called by the compactor.
    */
   public void performTimeOuts();
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index b7502c2..0d90b11 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -23,11 +23,14 @@
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
 import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TxnInfo;
 import org.apache.hadoop.hive.metastore.api.TxnState;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
 import java.util.Set;
 
 public class TxnUtils {
@@ -94,4 +97,19 @@ public static TxnStore getTxnStore(HiveConf conf) {
       throw new RuntimeException(e);
     }
   }
+
+  /** Checks if a table is a valid ACID table.
+   * Note, users are responsible for using the correct TxnManager. We do not look at
+   * SessionState.get().getTxnMgr().supportsAcid() here
+   * @param table table to check
+   * @return true if table is a valid ACID table, false otherwise
+   */
+  public static boolean isAcidTable(Table table) {
+    if (table == null) {
+      return false;
+    }
+    Map<String, String> parameters = table.getParameters();
+    String tableIsTransactional = parameters.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
+    return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true");
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 9bf9377..2b50a2a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.hive.ql.io;
 
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.OutputFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -706,12 +704,7 @@ public static void setTransactionalTableScan(Configuration conf, boolean isAcidT
     HiveConf.setBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, isAcidTable);
   }
 
-  /** Checks metadata to make sure it's a valid ACID table at metadata level
-   * Three things we will check:
-   * 1. TBLPROPERTIES 'transactional'='true'
-   * 2. The table should be bucketed
-   * 3. InputFormatClass/OutputFormatClass should implement AcidInputFormat/AcidOutputFormat
-   * Currently OrcInputFormat/OrcOutputFormat is the only implementer
+  /** Checks if a table is a valid ACID table.
    * Note, users are responsible for using the correct TxnManager. We do not look at
    * SessionState.get().getTxnMgr().supportsAcid() here
    * @param table table
@@ -725,23 +718,7 @@ public static boolean isAcidTable(Table table) {
     if (tableIsTransactional == null) {
       tableIsTransactional = table.getProperty(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.toUpperCase());
     }
 
-    if (tableIsTransactional == null || !tableIsTransactional.equalsIgnoreCase("true")) {
-      return false;
-    }
-
-    List<String> bucketCols = table.getBucketCols();
-    if (bucketCols == null || bucketCols.isEmpty()) {
-      return false;
-    }
-
-    Class<? extends InputFormat> inputFormatClass = table.getInputFormatClass();
-    Class<? extends OutputFormat> outputFormatClass = table.getOutputFormatClass();
-    if (inputFormatClass == null || outputFormatClass == null ||
-        !AcidInputFormat.class.isAssignableFrom(inputFormatClass) ||
-        !AcidOutputFormat.class.isAssignableFrom(outputFormatClass)) {
-      return false;
-    }
-    return true;
+    return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true");
   }
 }
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index f4debfe..9b00435 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -331,14 +331,7 @@ public void testNonAcidToAcidConversionAndMajorCompaction() throws Exception {
 
     // 4. Perform a major compaction
     runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'");
-    Worker w = new Worker();
-    w.setThreadId((int) w.getId());
-    w.setHiveConf(hiveConf);
-    AtomicBoolean stop = new AtomicBoolean();
-    AtomicBoolean looped = new AtomicBoolean();
-    stop.set(true);
-    w.init(stop, looped);
-    w.run();
+    runWorker(hiveConf);
     // There should be 1 new directory: base_xxxxxxx.
     // Original bucket files and delta directory should stay until Cleaner kicks in.
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
@@ -375,14 +368,7 @@ public void testNonAcidToAcidConversionAndMajorCompaction() throws Exception {
     // Before Cleaner, there should be 5 items:
     // 2 original files, 1 original directory, 1 base directory and 1 delta directory
     Assert.assertEquals(5, status.length);
-    Cleaner c = new Cleaner();
-    c.setThreadId((int) c.getId());
-    c.setHiveConf(hiveConf);
-    stop = new AtomicBoolean();
-    looped = new AtomicBoolean();
-    stop.set(true);
-    c.init(stop, looped);
-    c.run();
+    runCleaner(hiveConf);
     // There should be only 1 directory left: base_xxxxxxx.
     // Original bucket files and delta directory should have been cleaned up.
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
@@ -596,7 +582,7 @@ else if(TxnStore.ATTEMPTED_RESPONSE.equals(compact.getState())) {
     }
     return compactionsByState;
   }
-  private static void runWorker(HiveConf hiveConf) throws MetaException {
+  public static void runWorker(HiveConf hiveConf) throws MetaException {
     AtomicBoolean stop = new AtomicBoolean(true);
     Worker t = new Worker();
     t.setThreadId((int) t.getId());
@@ -605,7 +591,7 @@ private static void runWorker(HiveConf hiveConf) throws MetaException {
     t.init(stop, looped);
     t.run();
   }
-  private static void runCleaner(HiveConf hiveConf) throws MetaException {
+  public static void runCleaner(HiveConf hiveConf) throws MetaException {
    AtomicBoolean stop = new AtomicBoolean(true);
     Cleaner t = new Cleaner();
     t.setThreadId((int) t.getId());
diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index 1b07d4b..d1b370e 100644
--- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -248,34 +248,233 @@ public void testDummyTxnManagerOnAcidTable() throws Exception {
     cpr = driver.run("create table T11 (a int, b int) clustered by(b) into 2 buckets stored as orc");
     checkCmdOnDriver(cpr);
 
-    // Now switch to DummyTxnManager
-    conf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
-    txnMgr = SessionState.get().initTxnMgr(conf);
-    Assert.assertTrue(txnMgr instanceof DummyTxnManager);
 
-    // All DML should fail with DummyTxnManager on ACID table
+    useDummyTxnManagerTemporarily(conf);
     cpr = driver.compileAndRespond("select * from T10");
     Assert.assertEquals(ErrorMsg.TXNMGR_NOT_ACID.getErrorCode(), cpr.getResponseCode());
     Assert.assertTrue(cpr.getErrorMessage().contains("This command is not allowed on an ACID table"));
 
+    useDummyTxnManagerTemporarily(conf);
     cpr = driver.compileAndRespond("insert into table T10 values (1, 2)");
     Assert.assertEquals(ErrorMsg.TXNMGR_NOT_ACID.getErrorCode(), cpr.getResponseCode());
     Assert.assertTrue(cpr.getErrorMessage().contains("This command is not allowed on an ACID table"));
 
+    useDummyTxnManagerTemporarily(conf);
     cpr = driver.compileAndRespond("insert overwrite table T10 select a, b from T11");
     Assert.assertEquals(ErrorMsg.NO_INSERT_OVERWRITE_WITH_ACID.getErrorCode(), cpr.getResponseCode());
     Assert.assertTrue(cpr.getErrorMessage().contains("INSERT OVERWRITE not allowed on table with OutputFormat" +
         " that implements AcidOutputFormat while transaction manager that supports ACID is in use"));
 
+    useDummyTxnManagerTemporarily(conf);
     cpr = driver.compileAndRespond("update T10 set a=0 where b=1");
     Assert.assertEquals(ErrorMsg.ACID_OP_ON_NONACID_TXNMGR.getErrorCode(), cpr.getResponseCode());
     Assert.assertTrue(cpr.getErrorMessage().contains("Attempt to do update or delete using transaction manager that does not support these operations."));
 
+    useDummyTxnManagerTemporarily(conf);
     cpr = driver.compileAndRespond("delete from T10");
     Assert.assertEquals(ErrorMsg.ACID_OP_ON_NONACID_TXNMGR.getErrorCode(), cpr.getResponseCode());
     Assert.assertTrue(cpr.getErrorMessage().contains("Attempt to do update or delete using transaction manager that does not support these operations."));
   }
 
+  /**
+   * Temporarily set DummyTxnManager as the txn manager for the session.
+   * HIVE-10632: this has to be done before every new query, because HIVE-10632 introduced the
+   * AcidEventListener in HiveMetaStore, which instantiates a txn handler; due to HIVE-12902,
+   * that calls TxnHandler.setConf, TxnHandler.checkQFileTestHack and TxnDbUtil.setConfValues,
+   * which reset the txn manager back to DbTxnManager.
+   */
+  private void useDummyTxnManagerTemporarily(HiveConf hiveConf) throws Exception {
+    hiveConf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
+    txnMgr = SessionState.get().initTxnMgr(hiveConf);
+    Assert.assertTrue(txnMgr instanceof DummyTxnManager);
+  }
+
+  /**
+   * Normally the compaction process will clean up records in TXN_COMPONENTS, COMPLETED_TXN_COMPONENTS,
+   * COMPACTION_QUEUE and COMPLETED_COMPACTIONS. But if a table/partition has been dropped before
+   * compaction and there are still relevant records in those metastore tables, the Initiator will
+   * complain about not being able to find the table/partition. This test makes sure that relevant
+   * records are cleaned up as soon as a table/partition is dropped.
+   *
+   * Note, here we don't need to worry about cleaning up the TXNS table, since it's handled separately.
+   * @throws Exception
+   */
+  @Test
+  public void testMetastoreTablesCleanup() throws Exception {
+    CommandProcessorResponse cpr = driver.run("create database if not exists temp");
+    checkCmdOnDriver(cpr);
+
+    // Create some ACID tables: T10, T11 - unpartitioned tables, T12p, T13p - partitioned tables
+    cpr = driver.run("create table temp.T10 (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("create table temp.T11 (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("create table temp.T12p (a int, b int) partitioned by (ds string, hour string) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("create table temp.T13p (a int, b int) partitioned by (ds string, hour string) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    checkCmdOnDriver(cpr);
+
+    // Successfully insert some data into ACID tables, so that we have records in COMPLETED_TXN_COMPONENTS
+    cpr = driver.run("insert into temp.T10 values (1, 1)");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("insert into temp.T10 values (2, 2)");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("insert into temp.T11 values (3, 3)");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("insert into temp.T11 values (4, 4)");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("insert into temp.T12p partition (ds='today', hour='1') values (5, 5)");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("insert into temp.T12p partition (ds='tomorrow', hour='2') values (6, 6)");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("insert into temp.T13p partition (ds='today', hour='1') values (7, 7)");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("insert into temp.T13p partition (ds='tomorrow', hour='2') values (8, 8)");
+    checkCmdOnDriver(cpr);
+    int count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE in ('t10', 't11')");
+    Assert.assertEquals(4, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE in ('t12p', 't13p')");
+    Assert.assertEquals(4, count);
+
+    // Fail some inserts, so that we have records in TXN_COMPONENTS
+    conf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
+    cpr = driver.run("insert into temp.T10 values (9, 9)");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("insert into temp.T11 values (10, 10)");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("insert into temp.T12p partition (ds='today', hour='1') values (11, 11)");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("insert into temp.T13p partition (ds='today', hour='1') values (12, 12)");
+    checkCmdOnDriver(cpr);
+    count = TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where TC_DATABASE='temp' and TC_TABLE in ('t10', 't11', 't12p', 't13p')");
+    Assert.assertEquals(4, count);
+    conf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false);
+
+    // Drop a table/partition; corresponding records in TXN_COMPONENTS and COMPLETED_TXN_COMPONENTS should disappear
+    count = TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where TC_DATABASE='temp' and TC_TABLE='t10'");
+    Assert.assertEquals(1, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE='t10'");
+    Assert.assertEquals(2, count);
+    cpr = driver.run("drop table temp.T10");
+    checkCmdOnDriver(cpr);
+    count = TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where TC_DATABASE='temp' and TC_TABLE='t10'");
+    Assert.assertEquals(0, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE='t10'");
+    Assert.assertEquals(0, count);
+
+    count = TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where TC_DATABASE='temp' and TC_TABLE='t12p' and TC_PARTITION='ds=today/hour=1'");
+    Assert.assertEquals(1, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE='t12p' and CTC_PARTITION='ds=today/hour=1'");
+    Assert.assertEquals(1, count);
+    cpr = driver.run("alter table temp.T12p drop partition (ds='today', hour='1')");
+    checkCmdOnDriver(cpr);
+    count = TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where TC_DATABASE='temp' and TC_TABLE='t12p' and TC_PARTITION='ds=today/hour=1'");
+    Assert.assertEquals(0, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE='t12p' and CTC_PARTITION='ds=today/hour=1'");
+    Assert.assertEquals(0, count);
+
+    // Successfully perform compaction on a table/partition, so that we have successful records in COMPLETED_COMPACTIONS
+    cpr = driver.run("alter table temp.T11 compact 'minor'");
+    checkCmdOnDriver(cpr);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' and CQ_TYPE='i'");
+    Assert.assertEquals(1, count);
+    org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker(conf);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='r' and CQ_TYPE='i'");
+    Assert.assertEquals(1, count);
+    org.apache.hadoop.hive.ql.TestTxnCommands2.runCleaner(conf);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11'");
+    Assert.assertEquals(0, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t11' and CC_STATE='s' and CC_TYPE='i'");
+    Assert.assertEquals(1, count);
+
+    cpr = driver.run("alter table temp.T12p partition (ds='tomorrow', hour='2') compact 'minor'");
+    checkCmdOnDriver(cpr);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='i' and CQ_TYPE='i'");
+    Assert.assertEquals(1, count);
+    org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker(conf);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='r' and CQ_TYPE='i'");
+    Assert.assertEquals(1, count);
+    org.apache.hadoop.hive.ql.TestTxnCommands2.runCleaner(conf);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p'");
+    Assert.assertEquals(0, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t12p' and CC_STATE='s' and CC_TYPE='i'");
+    Assert.assertEquals(1, count);
+
+    // Fail compaction, so that we have failed records in COMPLETED_COMPACTIONS
+    conf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true);
+    cpr = driver.run("alter table temp.T11 compact 'major'");
+    checkCmdOnDriver(cpr);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' and CQ_TYPE='a'");
+    Assert.assertEquals(1, count);
+    org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker(conf); // will fail
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' and CQ_TYPE='a'");
+    Assert.assertEquals(0, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t11' and CC_STATE='f' and CC_TYPE='a'");
+    Assert.assertEquals(1, count);
+
+    cpr = driver.run("alter table temp.T12p partition (ds='tomorrow', hour='2') compact 'major'");
+    checkCmdOnDriver(cpr);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='i' and CQ_TYPE='a'");
+    Assert.assertEquals(1, count);
+    org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker(conf); // will fail
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='i' and CQ_TYPE='a'");
+    Assert.assertEquals(0, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t12p' and CC_STATE='f' and CC_TYPE='a'");
+    Assert.assertEquals(1, count);
+    conf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, false);
+
+    // Put 2 records into COMPACTION_QUEUE and do nothing
+    cpr = driver.run("alter table temp.T11 compact 'major'");
+    checkCmdOnDriver(cpr);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' and CQ_TYPE='a'");
+    Assert.assertEquals(1, count);
+    cpr = driver.run("alter table temp.T12p partition (ds='tomorrow', hour='2') compact 'major'");
+    checkCmdOnDriver(cpr);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='i' and CQ_TYPE='a'");
+    Assert.assertEquals(1, count);
+
+    // Drop a table/partition, corresponding records in COMPACTION_QUEUE and COMPLETED_COMPACTIONS should disappear
+    cpr = driver.run("drop table temp.T11");
+    checkCmdOnDriver(cpr);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11'");
+    Assert.assertEquals(0, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t11'");
+    Assert.assertEquals(0, count);
+
+    cpr = driver.run("alter table temp.T12p drop partition (ds='tomorrow', hour='2')");
+    checkCmdOnDriver(cpr);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p'");
+    Assert.assertEquals(0, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t12p'");
+    Assert.assertEquals(0, count);
+
+    // Put 1 record into COMPACTION_QUEUE and do nothing
+    cpr = driver.run("alter table temp.T13p partition (ds='today', hour='1') compact 'major'");
+    checkCmdOnDriver(cpr);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t13p' and CQ_STATE='i' and CQ_TYPE='a'");
+    Assert.assertEquals(1, count);
+
+    // Drop database, everything in all 4 metastore tables should disappear
+    count = TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where TC_DATABASE='temp' and TC_TABLE in ('t10', 't11', 't12p', 't13p')");
+    Assert.assertEquals(1, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE in ('t10', 't11', 't12p', 't13p')");
+    Assert.assertEquals(2, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE in ('t10', 't11', 't12p', 't13p')");
+    Assert.assertEquals(1, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE in ('t10', 't11', 't12p', 't13p')");
+    Assert.assertEquals(0, count);
+    cpr = driver.run("drop database if exists temp cascade");
+    checkCmdOnDriver(cpr);
+    count = TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where TC_DATABASE='temp' and TC_TABLE in ('t10', 't11', 't12p', 't13p')");
+    Assert.assertEquals(0, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE in ('t10', 't11', 't12p', 't13p')");
+    Assert.assertEquals(0, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE in ('t10', 't11', 't12p', 't13p')");
+    Assert.assertEquals(0, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE in ('t10', 't11', 't12p', 't13p')");
+    Assert.assertEquals(0, count);
+  }
+
   private void checkLock(LockType type, LockState state, String db, String table, String partition, ShowLocksResponseElement l) {
     Assert.assertEquals(l.toString(),l.getType(), type);
     Assert.assertEquals(l.toString(),l.getState(), state);
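
Review note: cleanupRecords above builds its DELETE statements by concatenating database/table/partition names into quoted SQL literals, which is safe only as long as metastore object names can never contain a single quote. Below is a minimal, hypothetical sketch (not part of this patch; the class name MetastoreCleanupSketch and its connection handling are invented for illustration) of how the same per-table cleanup could be expressed with JDBC bind parameters instead:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * Illustrative sketch only (not part of this patch): issues the same per-table
 * cleanup deletes as TxnHandler.cleanupRecords, but with bind parameters instead
 * of string concatenation, so names containing quotes cannot break the SQL.
 */
public class MetastoreCleanupSketch {

  // Each pair is {table, column prefix}; e.g. TXN_COMPONENTS uses tc_database/tc_table,
  // mirroring the four tables touched by the patch.
  private static final String[][] CLEANUP_TARGETS = {
      {"TXN_COMPONENTS", "tc"},
      {"COMPLETED_TXN_COMPONENTS", "ctc"},
      {"COMPACTION_QUEUE", "cq"},
      {"COMPLETED_COMPACTIONS", "cc"}
  };

  /** Delete all records for one table from the four ACID bookkeeping tables. */
  public static void cleanupTable(Connection conn, String dbName, String tblName)
      throws SQLException {
    for (String[] target : CLEANUP_TARGETS) {
      String sql = "delete from " + target[0] + " where " + target[1] + "_database = ? and "
          + target[1] + "_table = ?";
      try (PreparedStatement ps = conn.prepareStatement(sql)) {
        ps.setString(1, dbName);  // bound, not concatenated into the SQL text
        ps.setString(2, tblName);
        ps.executeUpdate();
      }
    }
  }
}

Whether bind parameters are worth the extra round trips here depends on what characters the metastore actually permits in object names; the patch's Statement-based batching over heterogeneous queries is simpler when names are known to be safe.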