diff --git metastore/src/java/org/apache/hadoop/hive/metastore/AcidEventListener.java metastore/src/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
new file mode 100644
index 0000000..c88b782
--- /dev/null
+++ metastore/src/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.DropTableEvent;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+
+
+/**
+ * Handles cleanup of dropped partitions/tables/databases in ACID-related metastore tables.
+ */
+public class AcidEventListener extends MetaStoreEventListener {
+
+  private TxnStore txnHandler;
+  private HiveConf hiveConf;
+
+  public AcidEventListener(Configuration configuration) {
+    super(configuration);
+    hiveConf = (HiveConf) configuration;
+
+    boolean hackOn = HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_IN_TEST) ||
+        HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_IN_TEZ_TEST);
+    String origTxnMgr = null;
+    boolean origConcurrency = false;
+
+    // Since TxnUtils.getTxnStore calls TxnHandler.setConf -> checkQFileTestHack -> TxnDbUtil.setConfValues,
+    // which may change the values of the two entries below, we need to avoid polluting the original values
+    if (hackOn) {
+      origTxnMgr = hiveConf.getVar(HiveConf.ConfVars.HIVE_TXN_MANAGER);
+      origConcurrency = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
+    }
+
+    txnHandler = TxnUtils.getTxnStore(hiveConf);
+
+    // Set them back
+    if (hackOn) {
+      hiveConf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, origTxnMgr);
+      hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, origConcurrency);
+    }
+  }
+
+  @Override
+  public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException {
+    // We could loop through all the tables to check if they are ACID first and then perform cleanup,
+    // but it's more efficient to unconditionally perform cleanup for the database, especially
+    // when there are a lot of tables
+    txnHandler.cleanupRecords('d', dbEvent.getDatabase(), null, null);
+  }
+
+  @Override
+  public void onDropTable(DropTableEvent tableEvent) throws MetaException {
+    if (TxnUtils.isAcidTable(tableEvent.getTable())) {
+      txnHandler.cleanupRecords('t', null, tableEvent.getTable(), null);
+    }
+  }
+
+  @Override
+  public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException {
+    if (TxnUtils.isAcidTable(partitionEvent.getTable())) {
+      txnHandler.cleanupRecords('p', null, partitionEvent.getTable(), partitionEvent.getPartitionIterator());
+    }
+  }
+}
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 98fbf70..23d66fc 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -433,6 +433,7 @@ public Object getValue() {
       listeners = MetaStoreUtils.getMetaStoreListeners(MetaStoreEventListener.class, hiveConf,
           hiveConf.getVar(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS));
       listeners.add(new SessionPropertiesListener(hiveConf));
+      listeners.add(new AcidEventListener(hiveConf));
       if (metrics != null) {
         listeners.add(new HMSMetricsListener(hiveConf, metrics));
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index 2a7545c..5d10b5c 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -32,7 +32,8 @@ import org.apache.hadoop.hive.shims.ShimLoader;
 
 /**
- * Utility methods for creating and destroying txn database/schema.
+ * Utility methods for creating and destroying txn database/schema, plus methods for
+ * querying against metastore tables.
  * Placed here in a separate class so it can be shared across unit tests.
  */
 public final class TxnDbUtil {
@@ -142,8 +143,13 @@ public static void prepDb() throws Exception {
 
       conn.commit();
     } catch (SQLException e) {
+      try {
+        conn.rollback();
+      } catch (SQLException re) {
+        System.err.println("Error rolling back: " + re.getMessage());
+      }
+
       // This might be a deadlock, if so, let's retry
-      conn.rollback();
       if (e instanceof SQLTransactionRollbackException && deadlockCnt++ < 5) {
         LOG.warn("Caught deadlock, retrying db creation");
         prepDb();
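
The prepDb() change above guards the rollback itself, so a failed rollback can no longer mask the original SQLException or skip the bounded deadlock retry. A standalone sketch of the same pattern, with a hypothetical JDBC URL and DDL (not the actual Hive code):

  import java.sql.Connection;
  import java.sql.DriverManager;
  import java.sql.SQLException;
  import java.sql.SQLTransactionRollbackException;
  import java.sql.Statement;

  public class RetryOnDeadlock {
    private static int deadlockCnt = 0;

    static void prepDb(String jdbcUrl) throws Exception {
      Connection conn = null;
      try {
        conn = DriverManager.getConnection(jdbcUrl);
        conn.setAutoCommit(false);
        try (Statement stmt = conn.createStatement()) {
          stmt.executeUpdate("CREATE TABLE DEMO (ID INT)");  // hypothetical DDL
          conn.commit();
        }
      } catch (SQLException e) {
        // Roll back first, and never let a rollback failure mask the original error
        if (conn != null) {
          try {
            conn.rollback();
          } catch (SQLException re) {
            System.err.println("Error rolling back: " + re.getMessage());
          }
        }
        // Retry only on deadlock, and only a bounded number of times
        if (e instanceof SQLTransactionRollbackException && deadlockCnt++ < 5) {
          prepDb(jdbcUrl);
        } else {
          throw e;
        }
      } finally {
        if (conn != null) {
          conn.close();
        }
      }
    }
  }
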
@@ -219,14 +225,20 @@ public static int countLockComponents(long lockId) throws Exception {
     }
   }
 
-  public static int findNumCurrentLocks() throws Exception {
+  /**
+   * Utility method used to run COUNT queries like "select count(*) from ..." against metastore tables
+   * @param countQuery the count query text
+   * @return the result of the count query
+   * @throws Exception
+   */
+  public static int countQueryAgent(String countQuery) throws Exception {
     Connection conn = null;
     Statement stmt = null;
     ResultSet rs = null;
     try {
       conn = getConnection();
       stmt = conn.createStatement();
-      rs = stmt.executeQuery("select count(*) from hive_locks");
+      rs = stmt.executeQuery(countQuery);
       if (!rs.next()) {
         return 0;
       }
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 79c4f7a..d432c46 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -25,6 +25,7 @@
 import org.apache.commons.dbcp.PoolableConnectionFactory;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 import org.apache.hadoop.hive.common.classification.InterfaceStability;
+import org.apache.hadoop.hive.metastore.Warehouse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.commons.dbcp.PoolingDataSource;
@@ -952,6 +953,171 @@ public void addDynamicPartitions(AddDynamicPartitions rqst)
   }
 
   /**
+   * Clean up corresponding records in metastore tables, specifically:
+   * TXN_COMPONENTS, COMPLETED_TXN_COMPONENTS, COMPACTION_QUEUE, COMPLETED_COMPACTIONS
+   */
+  public void cleanupRecords(char type, Database db, Table table,
+                             Iterator<Partition> partitionIterator) throws MetaException {
+    try {
+      Connection dbConn = null;
+      Statement stmt = null;
+
+      try {
+        String dbName;
+        String tblName;
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+        List<String> queries = new ArrayList<String>();
+        StringBuilder buff = new StringBuilder();
+
+        switch (type) {
+          case 'd': // database
+            dbName = db.getName();
+
+            buff.append("delete from TXN_COMPONENTS where tc_database='");
+            buff.append(dbName);
+            buff.append("'");
+            queries.add(buff.toString());
+
+            buff.setLength(0);
+            buff.append("delete from COMPLETED_TXN_COMPONENTS where ctc_database='");
+            buff.append(dbName);
+            buff.append("'");
+            queries.add(buff.toString());
+
+            buff.setLength(0);
+            buff.append("delete from COMPACTION_QUEUE where cq_database='");
+            buff.append(dbName);
+            buff.append("'");
+            queries.add(buff.toString());
+
+            buff.setLength(0);
+            buff.append("delete from COMPLETED_COMPACTIONS where cc_database='");
+            buff.append(dbName);
+            buff.append("'");
+            queries.add(buff.toString());
+
+            break;
+          case 't': // table
+            dbName = table.getDbName();
+            tblName = table.getTableName();
+
+            buff.append("delete from TXN_COMPONENTS where tc_database='");
+            buff.append(dbName);
+            buff.append("' and tc_table='");
+            buff.append(tblName);
+            buff.append("'");
+            queries.add(buff.toString());
+
+            buff.setLength(0);
+            buff.append("delete from COMPLETED_TXN_COMPONENTS where ctc_database='");
+            buff.append(dbName);
+            buff.append("' and ctc_table='");
+            buff.append(tblName);
+            buff.append("'");
+            queries.add(buff.toString());
+
+            buff.setLength(0);
+            buff.append("delete from COMPACTION_QUEUE where cq_database='");
+            buff.append(dbName);
+            buff.append("' and cq_table='");
+            buff.append(tblName);
+            buff.append("'");
+            queries.add(buff.toString());
+
+            buff.setLength(0);
+            buff.append("delete from COMPLETED_COMPACTIONS where cc_database='");
+            buff.append(dbName);
+            buff.append("' and cc_table='");
+            buff.append(tblName);
+            buff.append("'");
+            queries.add(buff.toString());
+
+            break;
+          case 'p': // partition
+            dbName = table.getDbName();
+            tblName = table.getTableName();
+            List<FieldSchema> partCols = table.getPartitionKeys();  // partition columns
+            List<String> partVals;                                  // partition values
+            String partName;
+
+            while (partitionIterator.hasNext()) {
+              Partition p = partitionIterator.next();
+              partVals = p.getValues();
+              partName = Warehouse.makePartName(partCols, partVals);
+
+              buff.setLength(0);  // reset the buffer between partitions
+              buff.append("delete from TXN_COMPONENTS where tc_database='");
+              buff.append(dbName);
+              buff.append("' and tc_table='");
+              buff.append(tblName);
+              buff.append("' and tc_partition='");
+              buff.append(partName);
+              buff.append("'");
+              queries.add(buff.toString());
+
+              buff.setLength(0);
+              buff.append("delete from COMPLETED_TXN_COMPONENTS where ctc_database='");
+              buff.append(dbName);
+              buff.append("' and ctc_table='");
+              buff.append(tblName);
+              buff.append("' and ctc_partition='");
+              buff.append(partName);
+              buff.append("'");
+              queries.add(buff.toString());
+
+              buff.setLength(0);
+              buff.append("delete from COMPACTION_QUEUE where cq_database='");
+              buff.append(dbName);
+              buff.append("' and cq_table='");
+              buff.append(tblName);
+              buff.append("' and cq_partition='");
+              buff.append(partName);
+              buff.append("'");
+              queries.add(buff.toString());
+
+              buff.setLength(0);
+              buff.append("delete from COMPLETED_COMPACTIONS where cc_database='");
+              buff.append(dbName);
+              buff.append("' and cc_table='");
+              buff.append(tblName);
+              buff.append("' and cc_partition='");
+              buff.append(partName);
+              buff.append("'");
+              queries.add(buff.toString());
+            }
+
+            break;
+          default:
+            throw new MetaException("Invalid object type for cleanup: " + type);
+        }
+
+        for (String query : queries) {
+          LOG.debug("Going to execute update <" + query + ">");
+          stmt.executeUpdate(query);
+        }
+
+        LOG.debug("Going to commit");
+        dbConn.commit();
+      } catch (SQLException e) {
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "cleanupRecords");
+        if (e.getMessage().contains("does not exist")) {
+          LOG.warn("Cannot perform cleanup since metastore table does not exist");
+        } else {
+          throw new MetaException("Unable to clean up " + StringUtils.stringifyException(e));
+        }
+      } finally {
+        closeStmt(stmt);
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      cleanupRecords(type, db, table, partitionIterator);
+    }
+  }
+
+  /**
    * For testing only, do not use.
    */
   @VisibleForTesting
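
The deletes above are assembled by string concatenation of the database, table, and partition names. Purely as an illustration of an alternative shape (not part of this patch), the same per-table cleanup could be expressed with bound parameters, which sidesteps quoting if a name ever contains a single quote; a minimal sketch where the Connection is supplied by the caller:

  import java.sql.Connection;
  import java.sql.PreparedStatement;
  import java.sql.SQLException;

  // Sketch only: runs the same four per-table deletes with bound parameters.
  // Table and column names match the patch; transaction handling is elided.
  final class AcidCleanupSketch {
    private static final String[] DELETES = {
      "delete from TXN_COMPONENTS where tc_database=? and tc_table=?",
      "delete from COMPLETED_TXN_COMPONENTS where ctc_database=? and ctc_table=?",
      "delete from COMPACTION_QUEUE where cq_database=? and cq_table=?",
      "delete from COMPLETED_COMPACTIONS where cc_database=? and cc_table=?"
    };

    static void cleanupTable(Connection dbConn, String dbName, String tblName) throws SQLException {
      for (String sql : DELETES) {
        try (PreparedStatement ps = dbConn.prepareStatement(sql)) {
          ps.setString(1, dbName);
          ps.setString(2, tblName);
          ps.executeUpdate();
        }
      }
      dbConn.commit();
    }
  }
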
@@ -1396,7 +1561,7 @@ private void checkQFileTestHack() {
         TxnDbUtil.prepDb();
       } catch (Exception e) {
         // We may have already created the tables and thus don't need to redo it.
-        if (!e.getMessage().contains("already exists")) {
+        if (e.getMessage() != null && !e.getMessage().contains("already exists")) {
           throw new RuntimeException("Unable to set up transaction database for" +
               " testing: " + e.getMessage());
         }
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 5e0306a..d456b87 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
 import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
 import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
 import org.apache.hadoop.hive.metastore.api.HeartbeatRequest;
@@ -38,15 +39,18 @@
 import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
 import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
 import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
 import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
 import org.apache.hadoop.hive.metastore.api.TxnOpenException;
 import org.apache.hadoop.hive.metastore.api.UnlockRequest;
 
 import java.sql.SQLException;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
@@ -216,6 +220,17 @@ public void addDynamicPartitions(AddDynamicPartitions rqst)
     throws NoSuchTxnException, TxnAbortedException, MetaException;
 
   /**
+   * Clean up corresponding records in metastore tables
+   * @param type object type: 'd' - database, 't' - table, 'p' - partition
+   * @param db database object
+   * @param table table object
+   * @param partitionIterator partition iterator
+   * @throws MetaException
+   */
+  public void cleanupRecords(char type, Database db, Table table,
+                             Iterator<Partition> partitionIterator) throws MetaException;
+
+  /**
    * Timeout transactions and/or locks.  This should only be called by the compactor.
    */
   public void performTimeOuts();
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index b7502c2..e79b23a 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -23,11 +23,15 @@
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
 import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TxnInfo;
 import org.apache.hadoop.hive.metastore.api.TxnState;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 public class TxnUtils {
@@ -94,4 +98,56 @@ public static TxnStore getTxnStore(HiveConf conf) {
       throw new RuntimeException(e);
     }
   }
+
+  /** Note, this method should be logically in sync with AcidUtils#isAcidTable.
+   * Checks metadata to make sure it's a valid ACID table at the metadata level.
+   * Three things we will check:
+   * 1. TBLPROPERTIES 'transactional'='true'
+   * 2. The table should be bucketed
+   * 3. InputFormatClass/OutputFormatClass should implement AcidInputFormat/AcidOutputFormat
+   *    Currently OrcInputFormat/OrcOutputFormat is the only implementer
+   * Note, users are responsible for using the correct TxnManager. We do not look at
+   * SessionState.get().getTxnMgr().supportsAcid() here
+   * @param table table
+   * @return true if table is a valid ACID table, false otherwise
+   */
+  public static boolean isAcidTable(Table table) {
+    if (table == null) {
+      return false;
+    }
+    Map<String, String> parameters = table.getParameters();
+    String tableIsTransactional = parameters.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
+    if (tableIsTransactional == null || !tableIsTransactional.equalsIgnoreCase("true")) {
+      return false;
+    }
+
+    List<String> bucketCols = table.getSd().getBucketCols();
+    if (bucketCols == null || bucketCols.isEmpty()) {
+      return false;
+    }
+
+    String inputFormat = table.getSd().getInputFormat();
+    String outputFormat = table.getSd().getOutputFormat();
+    Class<?> inputFormatClass;
+    Class<?> outputFormatClass;
+    Class<?> acidInputFormatClass;
+    Class<?> acidOutputFormatClass;
+    try {
+      inputFormatClass = Class.forName(inputFormat);
+      outputFormatClass = Class.forName(outputFormat);
+      acidInputFormatClass = Class.forName("org.apache.hadoop.hive.ql.io.AcidInputFormat");
+      acidOutputFormatClass = Class.forName("org.apache.hadoop.hive.ql.io.AcidOutputFormat");
+    } catch (ClassNotFoundException e) {
+      LOG.warn("Class cannot be found. InputFormat class: " + inputFormat +
+          ", OutputFormat class: " + outputFormat);
+      return false;
+    }
+    if (inputFormatClass == null || outputFormatClass == null ||
+        !acidInputFormatClass.isAssignableFrom(inputFormatClass) ||
+        !acidOutputFormatClass.isAssignableFrom(outputFormatClass)) {
+      return false;
+    }
+
+    return true;
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 520ae74..df69ab2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -704,7 +704,8 @@ public static void setTransactionalTableScan(Configuration conf, boolean isAcidT
     HiveConf.setBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, isAcidTable);
   }
 
-  /** Checks metadata to make sure it's a valid ACID table at metadata level
+  /** Note, this method should be logically in sync with TxnUtils#isAcidTable.
+   * Checks metadata to make sure it's a valid ACID table at metadata level
    * Three things we will check:
    * 1. TBLPROPERTIES 'transactional'='true'
    * 2. The table should be bucketed
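
One subtle point in the reflective check above is the direction of isAssignableFrom: the ACID interface must be assignable from the table's configured format class, not the other way around. A tiny standalone sketch of just that check, assuming hive-exec is on the classpath (class names are the ones the patch loads):

  public class AssignabilityDemo {
    public static void main(String[] args) throws ClassNotFoundException {
      Class<?> acidInput = Class.forName("org.apache.hadoop.hive.ql.io.AcidInputFormat");
      Class<?> orcInput = Class.forName("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat");
      // Prints true: OrcInputFormat implements AcidInputFormat, so a table
      // configured with ORC formats passes the third isAcidTable condition.
      System.out.println(acidInput.isAssignableFrom(orcInput));
    }
  }
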
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index f4debfe..9b00435 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -331,14 +331,7 @@ public void testNonAcidToAcidConversionAndMajorCompaction() throws Exception {
 
     // 4. Perform a major compaction
     runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'");
-    Worker w = new Worker();
-    w.setThreadId((int) w.getId());
-    w.setHiveConf(hiveConf);
-    AtomicBoolean stop = new AtomicBoolean();
-    AtomicBoolean looped = new AtomicBoolean();
-    stop.set(true);
-    w.init(stop, looped);
-    w.run();
+    runWorker(hiveConf);
     // There should be 1 new directory: base_xxxxxxx.
     // Original bucket files and delta directory should stay until Cleaner kicks in.
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
@@ -375,14 +368,7 @@ public void testNonAcidToAcidConversionAndMajorCompaction() throws Exception {
     // Before Cleaner, there should be 5 items:
     // 2 original files, 1 original directory, 1 base directory and 1 delta directory
     Assert.assertEquals(5, status.length);
-    Cleaner c = new Cleaner();
-    c.setThreadId((int) c.getId());
-    c.setHiveConf(hiveConf);
-    stop = new AtomicBoolean();
-    looped = new AtomicBoolean();
-    stop.set(true);
-    c.init(stop, looped);
-    c.run();
+    runCleaner(hiveConf);
     // There should be only 1 directory left: base_xxxxxxx.
     // Original bucket files and delta directory should have been cleaned up.
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
@@ -596,7 +582,7 @@ else if(TxnStore.ATTEMPTED_RESPONSE.equals(compact.getState())) {
     }
     return compactionsByState;
   }
-  private static void runWorker(HiveConf hiveConf) throws MetaException {
+  public static void runWorker(HiveConf hiveConf) throws MetaException {
     AtomicBoolean stop = new AtomicBoolean(true);
     Worker t = new Worker();
     t.setThreadId((int) t.getId());
@@ -605,7 +591,7 @@ private static void runWorker(HiveConf hiveConf) throws MetaException {
     t.init(stop, looped);
     t.run();
   }
-  private static void runCleaner(HiveConf hiveConf) throws MetaException {
+  public static void runCleaner(HiveConf hiveConf) throws MetaException {
     AtomicBoolean stop = new AtomicBoolean(true);
     Cleaner t = new Cleaner();
     t.setThreadId((int) t.getId());
diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index 1b07d4b..d1b370e 100644
--- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -248,34 +248,233 @@ public void testDummyTxnManagerOnAcidTable() throws Exception {
     cpr = driver.run("create table T11 (a int, b int) clustered by(b) into 2 buckets stored as orc");
     checkCmdOnDriver(cpr);
 
-    // Now switch to DummyTxnManager
-    conf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
-    txnMgr = SessionState.get().initTxnMgr(conf);
-    Assert.assertTrue(txnMgr instanceof DummyTxnManager);
-
     // All DML should fail with DummyTxnManager on ACID table
+    useDummyTxnManagerTemporarily(conf);
     cpr = driver.compileAndRespond("select * from T10");
     Assert.assertEquals(ErrorMsg.TXNMGR_NOT_ACID.getErrorCode(), cpr.getResponseCode());
     Assert.assertTrue(cpr.getErrorMessage().contains("This command is not allowed on an ACID table"));
 
+    useDummyTxnManagerTemporarily(conf);
     cpr = driver.compileAndRespond("insert into table T10 values (1, 2)");
     Assert.assertEquals(ErrorMsg.TXNMGR_NOT_ACID.getErrorCode(), cpr.getResponseCode());
     Assert.assertTrue(cpr.getErrorMessage().contains("This command is not allowed on an ACID table"));
 
+    useDummyTxnManagerTemporarily(conf);
     cpr = driver.compileAndRespond("insert overwrite table T10 select a, b from T11");
     Assert.assertEquals(ErrorMsg.NO_INSERT_OVERWRITE_WITH_ACID.getErrorCode(), cpr.getResponseCode());
     Assert.assertTrue(cpr.getErrorMessage().contains("INSERT OVERWRITE not allowed on table with OutputFormat" +
         " that implements AcidOutputFormat while transaction manager that supports ACID is in use"));
 
+    useDummyTxnManagerTemporarily(conf);
     cpr = driver.compileAndRespond("update T10 set a=0 where b=1");
     Assert.assertEquals(ErrorMsg.ACID_OP_ON_NONACID_TXNMGR.getErrorCode(), cpr.getResponseCode());
     Assert.assertTrue(cpr.getErrorMessage().contains("Attempt to do update or delete using transaction manager that does not support these operations."));
 
+    useDummyTxnManagerTemporarily(conf);
     cpr = driver.compileAndRespond("delete from T10");
     Assert.assertEquals(ErrorMsg.ACID_OP_ON_NONACID_TXNMGR.getErrorCode(), cpr.getResponseCode());
     Assert.assertTrue(cpr.getErrorMessage().contains("Attempt to do update or delete using transaction manager that does not support these operations."));
   }
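
Making runWorker/runCleaner public lets other test classes drive a full compaction cycle in-process. The pattern both helpers encapsulate, shown as a minimal sketch that mirrors the helper bodies above: stop=true makes the compactor thread perform exactly one pass and exit instead of looping forever.

  import java.util.concurrent.atomic.AtomicBoolean;
  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.metastore.api.MetaException;
  import org.apache.hadoop.hive.ql.txn.compactor.Worker;

  public class CompactorDriver {
    public static void runWorkerOnce(HiveConf hiveConf) throws MetaException {
      AtomicBoolean stop = new AtomicBoolean(true);
      AtomicBoolean looped = new AtomicBoolean();
      Worker t = new Worker();
      t.setThreadId((int) t.getId());
      t.setHiveConf(hiveConf);
      t.init(stop, looped);
      t.run();  // executes a single compaction pass, then returns
    }
  }
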
+  /**
+   * Temporarily set DummyTxnManager as the txn manager for the session.
+   * HIVE-10632: we have to do this for every new query, because this jira introduced an
+   * AcidEventListener in HiveMetaStore, which will instantiate a txn handler, but due to
+   * HIVE-12902, we have to call TxnHandler.setConf, TxnHandler.checkQFileTestHack and
+   * TxnDbUtil.setConfValues, which will set the txn manager back to DbTxnManager.
+   */
+  private void useDummyTxnManagerTemporarily(HiveConf hiveConf) throws Exception {
+    hiveConf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
+    txnMgr = SessionState.get().initTxnMgr(hiveConf);
+    Assert.assertTrue(txnMgr instanceof DummyTxnManager);
+  }
+
+  /**
+   * Normally the compaction process will clean up records in TXN_COMPONENTS, COMPLETED_TXN_COMPONENTS,
+   * COMPACTION_QUEUE and COMPLETED_COMPACTIONS. But if a table/partition has been dropped before
+   * compaction and there are still relevant records in those metastore tables, the Initiator will
+   * complain about not being able to find the table/partition. This test makes sure that we clean
+   * up relevant records as soon as a table/partition is dropped.
+   *
+   * Note, here we don't need to worry about cleaning up the TXNS table, since it's handled separately.
+   * @throws Exception
+   */
+  @Test
+  public void testMetastoreTablesCleanup() throws Exception {
+    CommandProcessorResponse cpr = driver.run("create database if not exists temp");
+    checkCmdOnDriver(cpr);
+
+    // Create some ACID tables: T10, T11 - unpartitioned tables, T12p, T13p - partitioned tables
+    cpr = driver.run("create table temp.T10 (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("create table temp.T11 (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("create table temp.T12p (a int, b int) partitioned by (ds string, hour string) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("create table temp.T13p (a int, b int) partitioned by (ds string, hour string) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    checkCmdOnDriver(cpr);
+
+    // Successfully insert some data into ACID tables, so that we have records in COMPLETED_TXN_COMPONENTS
+    cpr = driver.run("insert into temp.T10 values (1, 1)");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("insert into temp.T10 values (2, 2)");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("insert into temp.T11 values (3, 3)");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("insert into temp.T11 values (4, 4)");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("insert into temp.T12p partition (ds='today', hour='1') values (5, 5)");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("insert into temp.T12p partition (ds='tomorrow', hour='2') values (6, 6)");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("insert into temp.T13p partition (ds='today', hour='1') values (7, 7)");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("insert into temp.T13p partition (ds='tomorrow', hour='2') values (8, 8)");
+    checkCmdOnDriver(cpr);
+    int count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE in ('t10', 't11')");
+    Assert.assertEquals(4, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE in ('t12p', 't13p')");
+    Assert.assertEquals(4, count);
+
+    // Fail some inserts, so that we have records in TXN_COMPONENTS
+    conf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
+    cpr = driver.run("insert into temp.T10 values (9, 9)");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("insert into temp.T11 values (10, 10)");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("insert into temp.T12p partition (ds='today', hour='1') values (11, 11)");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("insert into temp.T13p partition (ds='today', hour='1') values (12, 12)");
+    checkCmdOnDriver(cpr);
+    count = TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where TC_DATABASE='temp' and TC_TABLE in ('t10', 't11', 't12p', 't13p')");
+    Assert.assertEquals(4, count);
+    conf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false);
+
+    // Drop a table/partition; corresponding records in TXN_COMPONENTS and COMPLETED_TXN_COMPONENTS should disappear
+    count = TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where TC_DATABASE='temp' and TC_TABLE='t10'");
+    Assert.assertEquals(1, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE='t10'");
+    Assert.assertEquals(2, count);
+    cpr = driver.run("drop table temp.T10");
+    checkCmdOnDriver(cpr);
+    count = TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where TC_DATABASE='temp' and TC_TABLE='t10'");
+    Assert.assertEquals(0, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE='t10'");
+    Assert.assertEquals(0, count);
+
+    count = TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where TC_DATABASE='temp' and TC_TABLE='t12p' and TC_PARTITION='ds=today/hour=1'");
+    Assert.assertEquals(1, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE='t12p' and CTC_PARTITION='ds=today/hour=1'");
+    Assert.assertEquals(1, count);
+    cpr = driver.run("alter table temp.T12p drop partition (ds='today', hour='1')");
+    checkCmdOnDriver(cpr);
+    count = TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where TC_DATABASE='temp' and TC_TABLE='t12p' and TC_PARTITION='ds=today/hour=1'");
+    Assert.assertEquals(0, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE='t12p' and CTC_PARTITION='ds=today/hour=1'");
+    Assert.assertEquals(0, count);
+
+    // Successfully perform compaction on a table/partition, so that we have successful records in COMPLETED_COMPACTIONS
+    cpr = driver.run("alter table temp.T11 compact 'minor'");
+    checkCmdOnDriver(cpr);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' and CQ_TYPE='i'");
+    Assert.assertEquals(1, count);
+    org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker(conf);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='r' and CQ_TYPE='i'");
+    Assert.assertEquals(1, count);
+    org.apache.hadoop.hive.ql.TestTxnCommands2.runCleaner(conf);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11'");
+    Assert.assertEquals(0, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t11' and CC_STATE='s' and CC_TYPE='i'");
+    Assert.assertEquals(1, count);
+
+    cpr = driver.run("alter table temp.T12p partition (ds='tomorrow', hour='2') compact 'minor'");
+    checkCmdOnDriver(cpr);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='i' and CQ_TYPE='i'");
+    Assert.assertEquals(1, count);
+    org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker(conf);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='r' and CQ_TYPE='i'");
+    Assert.assertEquals(1, count);
+    org.apache.hadoop.hive.ql.TestTxnCommands2.runCleaner(conf);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p'");
+    Assert.assertEquals(0, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t12p' and CC_STATE='s' and CC_TYPE='i'");
+    Assert.assertEquals(1, count);
+
+    // Fail compaction, so that we have failed records in COMPLETED_COMPACTIONS
+    conf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true);
+    cpr = driver.run("alter table temp.T11 compact 'major'");
+    checkCmdOnDriver(cpr);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' and CQ_TYPE='a'");
+    Assert.assertEquals(1, count);
+    org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker(conf); // will fail
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' and CQ_TYPE='a'");
+    Assert.assertEquals(0, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t11' and CC_STATE='f' and CC_TYPE='a'");
+    Assert.assertEquals(1, count);
+
+    cpr = driver.run("alter table temp.T12p partition (ds='tomorrow', hour='2') compact 'major'");
+    checkCmdOnDriver(cpr);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='i' and CQ_TYPE='a'");
+    Assert.assertEquals(1, count);
+    org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker(conf); // will fail
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='i' and CQ_TYPE='a'");
+    Assert.assertEquals(0, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t12p' and CC_STATE='f' and CC_TYPE='a'");
+    Assert.assertEquals(1, count);
+    conf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, false);
+
+    // Put 2 records into COMPACTION_QUEUE and do nothing
+    cpr = driver.run("alter table temp.T11 compact 'major'");
+    checkCmdOnDriver(cpr);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' and CQ_TYPE='a'");
+    Assert.assertEquals(1, count);
+    cpr = driver.run("alter table temp.T12p partition (ds='tomorrow', hour='2') compact 'major'");
+    checkCmdOnDriver(cpr);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='i' and CQ_TYPE='a'");
+    Assert.assertEquals(1, count);
+
+    // Drop a table/partition; corresponding records in COMPACTION_QUEUE and COMPLETED_COMPACTIONS should disappear
+    cpr = driver.run("drop table temp.T11");
+    checkCmdOnDriver(cpr);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11'");
+    Assert.assertEquals(0, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t11'");
+    Assert.assertEquals(0, count);
+
+    cpr = driver.run("alter table temp.T12p drop partition (ds='tomorrow', hour='2')");
+    checkCmdOnDriver(cpr);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p'");
+    Assert.assertEquals(0, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t12p'");
+    Assert.assertEquals(0, count);
+
+    // Put 1 record into COMPACTION_QUEUE and do nothing
+    cpr = driver.run("alter table temp.T13p partition (ds='today', hour='1') compact 'major'");
+    checkCmdOnDriver(cpr);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t13p' and CQ_STATE='i' and CQ_TYPE='a'");
+    Assert.assertEquals(1, count);
+
+    // Drop database; everything in all 4 meta tables should disappear
+    count = TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where TC_DATABASE='temp' and TC_TABLE in ('t10', 't11', 't12p', 't13p')");
+    Assert.assertEquals(1, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE in ('t10', 't11', 't12p', 't13p')");
+    Assert.assertEquals(2, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE in ('t10', 't11', 't12p', 't13p')");
+    Assert.assertEquals(1, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE in ('t10', 't11', 't12p', 't13p')");
+    Assert.assertEquals(0, count);
+    cpr = driver.run("drop database if exists temp cascade");
+    checkCmdOnDriver(cpr);
+    count = TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where TC_DATABASE='temp' and TC_TABLE in ('t10', 't11', 't12p', 't13p')");
+    Assert.assertEquals(0, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE in ('t10', 't11', 't12p', 't13p')");
+    Assert.assertEquals(0, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE in ('t10', 't11', 't12p', 't13p')");
+    Assert.assertEquals(0, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE in ('t10', 't11', 't12p', 't13p')");
+    Assert.assertEquals(0, count);
+  }
+
   private void checkLock(LockType type, LockState state, String db, String table, String partition, ShowLocksResponseElement l) {
     Assert.assertEquals(l.toString(),l.getType(), type);
     Assert.assertEquals(l.toString(),l.getState(), state);