diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java index 92bcefe5ee..878cce278a 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java @@ -3,6 +3,8 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.junit.Assert; @@ -156,4 +158,50 @@ public void testConcatenateMM() throws Exception { "t/base_0000002/000000_0"}}; checkResult(expected2, testQuery, false, "check data after concatenate", LOG); } + @Test + public void testRenameTable() throws Exception { + MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID, true); + runStatementOnDriver("drop table if exists S"); + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("create table T(a int, b int) stored as orc"); + runStatementOnDriver("insert into T values(1,2),(4,5)"); + //put something in WRITE_SET + runStatementOnDriver("update T set b = 6 where b = 5"); + runStatementOnDriver("alter table T compact 'minor'"); + + runStatementOnDriver("alter table T RENAME TO S"); + + String testQuery = "select ROW__ID, a, b, INPUT__FILE__NAME from S"; + String[][] expected = new String[][] { + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", + "s/delta_0000001_0000001_0000/bucket_00000"}, + {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t4\t6", + "s/delta_0000002_0000002_0000/bucket_00000"}}; + checkResult(expected, testQuery, false, "check data", LOG); + + + Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from 
COMPLETED_TXN_COMPONENTS where CTC_TABLE='t'")); + Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from COMPACTION_QUEUE where CQ_TABLE='t'")); + Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from WRITE_SET where WS_TABLE='t'")); + Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE='t'")); + Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from NEXT_WRITE_ID where NWI_TABLE='t'")); + + Assert.assertEquals( + TxnDbUtil.queryToString(hiveConf,"select * from COMPLETED_TXN_COMPONENTS"), 2, + TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TABLE='s'")); + Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from COMPACTION_QUEUE where CQ_TABLE='s'")); + Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from WRITE_SET where WS_TABLE='s'")); + Assert.assertEquals(2, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE='s'")); + Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from NEXT_WRITE_ID where NWI_TABLE='s'")); + } } diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/AcidEventListener.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/AcidEventListener.java index f849b1a0c3..bfa425cc94 100644 --- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/AcidEventListener.java +++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/AcidEventListener.java @@ -19,15 +19,22 @@ package org.apache.hadoop.hive.metastore; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.HiveObjectType; import org.apache.hadoop.hive.metastore.api.MetaException; +import 
org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; +import org.apache.hadoop.hive.metastore.events.AlterDatabaseEvent; +import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; +import org.apache.hadoop.hive.metastore.events.AlterTableEvent; import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent; import org.apache.hadoop.hive.metastore.events.DropPartitionEvent; import org.apache.hadoop.hive.metastore.events.DropTableEvent; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; /** @@ -69,6 +76,48 @@ public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaExcep } } + @Override + public void onAlterTable(AlterTableEvent tableEvent) throws MetaException { + if (!TxnUtils.isTransactionalTable(tableEvent.getNewTable())) { + return; + } + Table oldTable = tableEvent.getOldTable(); + Table newTable = tableEvent.getNewTable(); + if(!oldTable.getCatName().equalsIgnoreCase(newTable.getCatName()) || + !oldTable.getDbName().equalsIgnoreCase(newTable.getDbName()) || + !oldTable.getTableName().equalsIgnoreCase(newTable.getTableName())) { + txnHandler = getTxnHandler(); + txnHandler.onRename( + oldTable.getCatName(), oldTable.getDbName(), oldTable.getTableName(), null, + newTable.getCatName(), newTable.getDbName(), newTable.getTableName(), null); + } + } + @Override + public void onAlterPartition (AlterPartitionEvent partitionEvent) throws MetaException { + Partition oldPart = partitionEvent.getOldPartition(); + Partition newPart = partitionEvent.getNewPartition(); + Table t = partitionEvent.getTable(); + String oldPartName = Warehouse.makePartName(t.getPartitionKeys(), oldPart.getValues()); + String newPartName = 
Warehouse.makePartName(t.getPartitionKeys(), newPart.getValues()); + if(!oldPartName.equals(newPartName)) { + txnHandler = getTxnHandler(); + txnHandler.onRename(t.getCatName(), t.getDbName(), t.getTableName(), oldPartName, + t.getCatName(), t.getDbName(), t.getTableName(), newPartName); + } + } + @Override + public void onAlterDatabase(AlterDatabaseEvent dbEvent) throws MetaException { + Database oldDb = dbEvent.getOldDatabase(); + Database newDb = dbEvent.getNewDatabase(); + if(!oldDb.getCatalogName().equalsIgnoreCase(newDb.getCatalogName()) || + !oldDb.getName().equalsIgnoreCase(newDb.getName())) { + txnHandler = getTxnHandler(); + txnHandler.onRename( + oldDb.getCatalogName(), oldDb.getName(), null, null, + newDb.getCatalogName(), newDb.getName(), null, null); + } + } + private TxnStore getTxnHandler() { boolean hackOn = MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST) || MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEZ_TEST); diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index c513b4d7de..457096682f 100644 --- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -2760,7 +2760,206 @@ public void cleanupRecords(HiveObjectType type, Database db, Table table, cleanupRecords(type, db, table, partitionIterator); } } + /** + * TODO: catalog hasn't been added to transactional tables yet, so it's passed in but not used + */ + @Override + public void onRename(String oldCatName, String oldDbName, String oldTabName, String oldPartName, + String newCatName, String newDbName, String newTabName, String newPartName) + throws MetaException { + + final boolean isCat = oldCatName != null && !oldCatName.equalsIgnoreCase(newCatName); + final boolean isDb = oldDbName != null && 
!oldDbName.equalsIgnoreCase(newDbName); + final boolean isTab = oldTabName != null && !oldTabName.equalsIgnoreCase(newTabName); + final boolean isPart = oldPartName != null && !oldPartName.equalsIgnoreCase(newPartName); + final String compName = oldCatName + "." + oldDbName + "." + oldTabName + "." + oldPartName; + + if(isPart) { + assert oldCatName != null && oldDbName != null && oldTabName != null; + assert !isTab && !isDb : "Can only change 1 component at a time " + compName; + } + if(isTab) { + assert oldCatName != null && oldDbName != null; + assert !isPart && !isDb : "Can only change 1 component at a time " + compName; + } + if(isDb) { + assert oldCatName != null; + assert !isPart && !isTab : "Can only change 1 component at a time " + compName; + } + + try { + Connection dbConn = null; + Statement stmt = null; + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + List queries = new ArrayList<>(); + + String update = "update TXN_COMPONENTS set "; + String where = " where "; + if(isPart) { + update += "TC_PARTITION = " + quoteString(newPartName); + where += "TC_PARTITION = " + quoteString(oldPartName) + " AND " + + "TC_TABLE = " + quoteString(normalizeCase(oldTabName)) + " AND " + + "TC_DATABASE = " + quoteString(normalizeCase(oldDbName)); + } + else if(isTab) { + update += "TC_TABLE = " + quoteString(normalizeCase(newTabName)); + where += "TC_TABLE = " + quoteString(normalizeCase(oldTabName)) + " AND " + + "TC_DATABASE = " + quoteString(normalizeCase(oldDbName)); + } + else if(isDb) { + update += "TC_DATABASE = " + quoteString(normalizeCase(newDbName)); + where += "TC_DATABASE = " + quoteString(normalizeCase(oldDbName)); + } + queries.add(update + where); + + update = "update COMPLETED_TXN_COMPONENTS set "; + where = " where "; + if(isPart) { + update += "CTC_PARTITION = " + quoteString(newPartName); + where += "CTC_PARTITION = " + quoteString(oldPartName) + " AND " + + "CTC_TABLE = " + 
quoteString(normalizeCase(oldTabName)) + " AND " + + "CTC_DATABASE = " + quoteString(normalizeCase(oldDbName)); + } + else if(isTab) { + update += "CTC_TABLE = " + quoteString(normalizeCase(newTabName)); + where += "CTC_TABLE = " + quoteString(normalizeCase(oldTabName)) + " AND " + + "CTC_DATABASE = " + quoteString(normalizeCase(oldDbName)); + } + else if(isDb) { + update += "CTC_DATABASE = " + quoteString(normalizeCase(newDbName)); + where += "CTC_DATABASE = " + quoteString(normalizeCase(oldDbName)); + } + queries.add(update + where); + + update = "update HIVE_LOCKS set "; + where = " where "; + if(isPart) { + update += "HL_PARTITION = " + quoteString(newPartName); + where += "HL_PARTITION = " + quoteString(oldPartName) + " AND " + + "HL_TABLE = " + quoteString(normalizeCase(oldTabName)) + " AND " + + "HL_DB = " + quoteString(normalizeCase(oldDbName)); + } + else if(isTab) { + update += "HL_TABLE = " + quoteString(normalizeCase(newTabName)); + where += "HL_TABLE = " + quoteString(normalizeCase(oldTabName)) + " AND " + + "HL_DB = " + quoteString(normalizeCase(oldDbName)); + } + else if(isDb) { + update += "HL_DB = " + quoteString(normalizeCase(newDbName)); + where += "HL_DB = " + quoteString(normalizeCase(oldDbName)); + } + queries.add(update + where); + + update = "update COMPACTION_QUEUE set "; + where = " where "; + if(isPart) { + update += "CQ_PARTITION = " + quoteString(newPartName); + where += "CQ_PARTITION = " + quoteString(oldPartName) + " AND " + + "CQ_TABLE = " + quoteString(normalizeCase(oldTabName)) + " AND " + + "CQ_DATABASE = " + quoteString(normalizeCase(oldDbName)); + } + else if(isTab) { + update += "CQ_TABLE = " + quoteString(normalizeCase(newTabName)); + where += "CQ_TABLE = " + quoteString(normalizeCase(oldTabName)) + " AND " + + "CQ_DATABASE = " + quoteString(normalizeCase(oldDbName)); + } + else if(isDb) { + update += "CQ_DATABASE = " + quoteString(normalizeCase(newDbName)); + where += "CQ_DATABASE = " + quoteString(normalizeCase(oldDbName)); 
+ } + queries.add(update + where); + + update = "update COMPLETED_COMPACTIONS set "; + where = " where "; + if(isPart) { + update += "CC_PARTITION = " + quoteString(newPartName); + where += "CC_PARTITION = " + quoteString(oldPartName) + " AND " + + "CC_TABLE = " + quoteString(normalizeCase(oldTabName)) + " AND " + + "CC_DATABASE = " + quoteString(normalizeCase(oldDbName)); + } + else if(isTab) { + update += "CC_TABLE = " + quoteString(normalizeCase(newTabName)); + where += "CC_TABLE = " + quoteString(normalizeCase(oldTabName)) + " AND " + + "CC_DATABASE = " + quoteString(normalizeCase(oldDbName)); + } + else if(isDb) { + update += "CC_DATABASE = " + quoteString(normalizeCase(newDbName)); + where += "CC_DATABASE = " + quoteString(normalizeCase(oldDbName)); + } + queries.add(update + where); + + update = "update WRITE_SET set "; + where = " where "; + if(isPart) { + update += "WS_PARTITION = " + quoteString(newPartName); + where += "WS_PARTITION = " + quoteString(oldPartName) + " AND " + + "WS_TABLE = " + quoteString(normalizeCase(oldTabName)) + " AND " + + "WS_DATABASE = " + quoteString(normalizeCase(oldDbName)); + } + else if(isTab) { + update += "WS_TABLE = " + quoteString(normalizeCase(newTabName)); + where += "WS_TABLE = " + quoteString(normalizeCase(oldTabName)) + " AND " + + "WS_DATABASE = " + quoteString(normalizeCase(oldDbName)); + } + else if(isDb) { + update += "WS_DATABASE = " + quoteString(normalizeCase(newDbName)); + where += "WS_DATABASE = " + quoteString(normalizeCase(oldDbName)); + } + queries.add(update + where); + + update = "update TXN_TO_WRITE_ID set "; + where = " where "; + if(isTab) { + update += "T2W_TABLE = " + quoteString(normalizeCase(newTabName)); + where += "T2W_TABLE = " + quoteString(normalizeCase(oldTabName)) + " AND " + + "T2W_DATABASE = " + quoteString(normalizeCase(oldDbName)); + } + else if(isDb) { + update += "T2W_DATABASE = " + quoteString(normalizeCase(newDbName)); + where += "T2W_DATABASE = " + 
quoteString(normalizeCase(oldDbName)); + } + queries.add(update + where); + + update = "update NEXT_WRITE_ID set "; + where = " where "; + if(isTab) { + update += "NWI_TABLE = " + quoteString(normalizeCase(newTabName)); + where += "NWI_TABLE = " + quoteString(normalizeCase(oldTabName)) + " AND " + + "NWI_DATABASE = " + quoteString(normalizeCase(oldDbName)); + } + else if(isDb) { + update += "NWI_DATABASE = " + quoteString(normalizeCase(newDbName)); + where += "NWI_DATABASE = " + quoteString(normalizeCase(oldDbName)); + } + queries.add(update + where); + for (String query : queries) { + LOG.debug("Going to execute update <" + query + ">"); + stmt.executeUpdate(query); + } + + LOG.debug("Going to commit"); + dbConn.commit(); + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "onRename"); + if (e.getMessage().contains("does not exist")) { + LOG.warn("Cannot perform onRename since metastore table does not exist"); + } else { + throw new MetaException("Unable to onRename: " + StringUtils.stringifyException(e)); + } + } finally { + closeStmt(stmt); + closeDbConn(dbConn); + } + } catch (RetryException e) { + onRename(oldCatName, oldDbName, oldTabName, oldPartName, + newCatName, newDbName, newTabName, newPartName); + } + } /** * For testing only, do not use. 
*/ diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java index b8e398f744..4695f0deef 100644 --- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java +++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java @@ -266,6 +266,11 @@ void addDynamicPartitions(AddDynamicPartitions rqst) void cleanupRecords(HiveObjectType type, Database db, Table table, Iterator partitionIterator) throws MetaException; + @RetrySemantics.Idempotent + void onRename(String oldCatName, String oldDbName, String oldTabName, String oldPartName, + String newCatName, String newDbName, String newTabName, String newPartName) + throws MetaException; + /** * Timeout transactions and/or locks. This should only be called by the compactor. */