diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java index e63250c9fb..14ec9c5a67 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.ReplChangeManager.RecycleType; +import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Partition; @@ -119,6 +120,7 @@ public void testRecyclePartTable() throws Exception { client.dropDatabase(dbName, true, true); Database db = new Database(); + db.putToParameters(SOURCE_OF_REPLICATION, "1,2,3"); db.setName(dbName); client.createDatabase(db); @@ -204,6 +206,7 @@ public void testRecycleNonPartTable() throws Exception { client.dropDatabase(dbName, true, true); Database db = new Database(); + db.putToParameters(SOURCE_OF_REPLICATION,"1,2,3"); db.setName(dbName); client.createDatabase(db); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index 8b33b78548..7f9fb980c7 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -89,6 +89,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; public class TestReplicationScenarios { @@ -3156,7 +3157,55 @@ public void testRecycleFileDropTempTable() throws IOException { assertTrue(fileCount == fileCountAfter); } + @Test + public void testRecycleFileNonReplDatabase() throws IOException { + String dbName = createDBNonRepl(testName.getMethodName(), driver); + + String cmDir = hconf.getVar(HiveConf.ConfVars.REPLCMDIR); + Path path = new Path(cmDir); + FileSystem fs = path.getFileSystem(hconf); + ContentSummary cs = fs.getContentSummary(path); + long fileCount = cs.getFileCount(); + + run("CREATE TABLE " + dbName + ".normal(a int)", driver); + run("INSERT INTO " + dbName + ".normal values (1)", driver); + + cs = fs.getContentSummary(path); + long fileCountAfter = cs.getFileCount(); + assertTrue(fileCount == fileCountAfter); + + run("INSERT INTO " + dbName + ".normal values (3)", driver); + run("TRUNCATE TABLE " + dbName + ".normal", driver); + + cs = fs.getContentSummary(path); + fileCountAfter = cs.getFileCount(); + assertTrue(fileCount == fileCountAfter); + + run("INSERT INTO " + dbName + ".normal values (4)", driver); + run("ALTER TABLE " + dbName + ".normal RENAME to " + dbName + ".normal1", driver); + verifyRun("SELECT count(*) from " + dbName + ".normal1", new String[]{"1"}, driver); + + cs = fs.getContentSummary(path); + fileCountAfter = cs.getFileCount(); + assertTrue(fileCount == fileCountAfter); + + run("INSERT INTO " + dbName + ".normal1 values (5)", driver); + run("DROP TABLE " + dbName + ".normal1", driver); + + cs = 
fs.getContentSummary(path); + fileCountAfter = cs.getFileCount(); + assertTrue(fileCount == fileCountAfter); + } + private static String createDB(String name, IDriver myDriver) { + LOG.info("Testing " + name); + String dbName = name + "_" + tid; + run("CREATE DATABASE " + dbName + " WITH DBPROPERTIES ( '" + + SOURCE_OF_REPLICATION + "' = '1,2,3')", myDriver); + return dbName; + } + + private static String createDBNonRepl(String name, IDriver myDriver) { LOG.info("Testing " + name); String dbName = name + "_" + tid; run("CREATE DATABASE " + dbName, myDriver); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java index 8ad507f5ae..9a2d296c05 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.shims.Utils; +import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; import org.junit.rules.TestName; import org.junit.rules.TestRule; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -97,7 +98,8 @@ public void setup() throws Throwable { replV1BackwardCompat = primary.getReplivationV1CompatRule(new ArrayList<>()); primaryDbName = testName.getMethodName() + "_" + +System.currentTimeMillis(); replicatedDbName = "replicated_" + primaryDbName; - primary.run("create database " + primaryDbName); + primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" + + SOURCE_OF_REPLICATION + "' = '1,2,3')"); } @After diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index df9bde059e..5666e44876 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -52,6 +52,7 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; public class TestReplicationScenariosAcrossInstances { @Rule @@ -61,7 +62,7 @@ public TestRule replV1BackwardCompat; protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class); - private static WarehouseInstance primary, replica; + private static WarehouseInstance primary, replica, primaryInTestNo; private String primaryDbName, replicatedDbName; @BeforeClass @@ -77,6 +78,12 @@ public static void classLevelSetup() throws Exception { }}; primary = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf); replica = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf); + HashMap<String, String> overridesForHiveConfNoTest = new HashMap<String, String>() {{ + put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString()); + put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "false"); + put(HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname, "false"); + }}; + primaryInTestNo = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConfNoTest); }
@AfterClass @@ -90,7 +97,8 @@ public void setup() throws Throwable { replV1BackwardCompat = primary.getReplivationV1CompatRule(new ArrayList<>()); primaryDbName = testName.getMethodName() + "_" + +System.currentTimeMillis(); replicatedDbName = "replicated_" + primaryDbName; - primary.run("create database " + primaryDbName); + primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" + + SOURCE_OF_REPLICATION + "' = '1,2,3')"); } @After @@ -733,4 +741,12 @@ public void testIncrementalReplWithDropAndCreateTableDifferentPartitionTypeAndIn .run("select id from table3") .verifyResults(new String[] {"30"}); } + + @Test + public void testDroppingReplEnabledDb() throws Throwable { + primaryInTestNo.hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_IN_TEST, false); + primaryInTestNo.run("create database " + primaryDbName + "_test WITH DBPROPERTIES ( '" + + SOURCE_OF_REPLICATION + "' = '1,2,3')"); + primaryInTestNo.runFailure("drop database " + primaryDbName + "_test"); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index 63fe8adc8b..bb82ba0eaa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -1587,10 +1587,10 @@ private void moveDir(FileSystem fs, Path from, Path to) throws HiveException { } } - private void deleteDir(Path dir) throws HiveException { + private void deleteDir(Path dir, Database db) throws HiveException { try { Warehouse wh = new Warehouse(conf); - wh.deleteDir(dir, true); + wh.deleteDir(dir, true, db); } catch (MetaException e) { throw new HiveException(e); } @@ -1845,7 +1845,7 @@ private int archive(Hive db, AlterTableSimpleDesc simpleDesc, // If a failure occurs here, the directory containing the original files // will not be deleted. The user will run ARCHIVE again to clear this up if(pathExists(intermediateOriginalDir)) { - deleteDir(intermediateOriginalDir); + deleteDir(intermediateOriginalDir, db.getDatabase(tbl.getDbName())); } if(recovery) { @@ -2051,7 +2051,7 @@ private int unarchive(Hive db, AlterTableSimpleDesc simpleDesc) // If a failure happens here, the intermediate archive files won't be // deleted. The user will need to call unarchive again to clear those up. if(pathExists(intermediateArchivedDir)) { - deleteDir(intermediateArchivedDir); + deleteDir(intermediateArchivedDir, db.getDatabase(tbl.getDbName())); } if(recovery) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index e8554f9ce5..241a4f650d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -112,6 +112,7 @@ import org.apache.hadoop.hive.metastore.SynchronizedMetaStoreClient; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.ReplChangeManager; import org.apache.hadoop.hive.metastore.api.AggrStats; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; import org.apache.hadoop.hive.metastore.api.CheckConstraintsRequest; @@ -1737,8 +1738,10 @@ public Partition loadPartition(Path loadPath, Table tbl, Map par //for fullAcid tables we don't delete files for commands with OVERWRITE - we create a new // base_x.
(there is Insert Overwrite and Load Data Overwrite) boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge")); + boolean needRecycle = !tbl.isTemporary() + && ReplChangeManager.isReplPolicySet(Hive.get().getDatabase(tbl.getDbName())); replaceFiles(tbl.getPath(), loadPath, destPath, oldPartPath, getConf(), isSrcLocal, - isAutoPurge, newFiles, FileUtils.HIDDEN_FILES_PATH_FILTER, !tbl.isTemporary()); + isAutoPurge, newFiles, FileUtils.HIDDEN_FILES_PATH_FILTER, needRecycle); } else { FileSystem fs = tbl.getDataLocation().getFileSystem(conf); copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcidIUDoperation, @@ -2304,8 +2307,10 @@ public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType if (loadFileType == LoadFileType.REPLACE_ALL && !isTxnTable) { //for fullAcid we don't want to delete any files even for OVERWRITE see HIVE-14988/HIVE-17361 boolean isAutopurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge")); + boolean needRecycle = !tbl.isTemporary() + && ReplChangeManager.isReplPolicySet(Hive.get().getDatabase(tbl.getDbName())); replaceFiles(tblPath, loadPath, destPath, tblPath, conf, isSrcLocal, isAutopurge, - newFiles, FileUtils.HIDDEN_FILES_PATH_FILTER, !tbl.isTemporary()); + newFiles, FileUtils.HIDDEN_FILES_PATH_FILTER, needRecycle); } else { try { FileSystem fs = tbl.getDataLocation().getFileSystem(conf); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/MetaDataExportListener.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/MetaDataExportListener.java index 8fccf369f8..04090a4d3a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/MetaDataExportListener.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/MetaDataExportListener.java @@ -24,6 +24,8 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IHMSHandler; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -90,11 +92,9 @@ private void export_meta_data(PreDropTableEvent tableEvent) throws MetaException EximUtil.createExportDump(fs, outFile, mTbl, null, null, new HiveConf(conf, MetaDataExportListener.class)); if (moveMetadataToTrash == true) { - wh.deleteDir(metaPath, true); + wh.deleteDir(metaPath, true, Hive.get().getDatabase(tbl.getDbName())); } - } catch (IOException e) { - throw new MetaException(e.getMessage()); - } catch (SemanticException e) { + } catch (IOException | MetaException | HiveException e) { throw new MetaException(e.getMessage()); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java index fe6d2d663d..7574547736 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java @@ -20,6 +20,8 @@ import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.metastore.ReplChangeManager; import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.FileStatus; @@ -320,7 +322,7 @@ private static String idWatermark(CompactionInfo ci) { return " id=" + ci.id; } private void removeFiles(String location, ValidWriteIdList 
writeIdList, CompactionInfo ci) - throws IOException { + throws IOException, HiveException { Path locPath = new Path(location); AcidUtils.Directory dir = AcidUtils.getAcidState(locPath, conf, writeIdList); List obsoleteDirs = dir.getObsolete(); @@ -349,7 +351,9 @@ private void removeFiles(String location, ValidWriteIdList writeIdList, Compacti for (Path dead : filesToDelete) { LOG.debug("Going to delete path " + dead.toString()); - replChangeManager.recycle(dead, ReplChangeManager.RecycleType.MOVE, true); + if (ReplChangeManager.isReplPolicySet(Hive.get().getDatabase(ci.dbname))) { + replChangeManager.recycle(dead, ReplChangeManager.RecycleType.MOVE, true); + } fs.delete(dead, true); } } diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java index 0be0aaa10c..771406c962 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java @@ -242,7 +242,8 @@ public void alterTable(RawStore msdb, Warehouse wh, String catName, String dbnam " already exists : " + destPath); } // check that src exists and also checks permissions necessary, rename src to dest - if (srcFs.exists(srcPath) && wh.renameDir(srcPath, destPath, true)) { + if (srcFs.exists(srcPath) && wh.renameDir(srcPath, destPath, + ReplChangeManager.isReplPolicySet(msdb.getDatabase(catName, dbname)))) { dataWasMoved = true; } } catch (IOException | MetaException e) { @@ -624,7 +625,7 @@ public Partition alterPartition(final RawStore msdb, Warehouse wh, final String } //rename the data directory - wh.renameDir(srcPath, destPath, true); + wh.renameDir(srcPath, destPath, ReplChangeManager.isReplPolicySet(msdb.getDatabase(catName, dbname))); LOG.info("Partition directory rename from " + srcPath + " to " + destPath + " done."); dataWasMoved = true; } @@ -632,9 +633,9 @@ public Partition alterPartition(final RawStore msdb, Warehouse wh, final String LOG.error("Cannot rename partition directory from " + srcPath + " to " + destPath, e); throw new InvalidOperationException("Unable to access src or dest location for partition " + tbl.getDbName() + "." 
+ tbl.getTableName() + " " + new_part.getValues()); - } catch (MetaException me) { + } catch (MetaException | NoSuchObjectException me) { LOG.error("Cannot rename partition directory from " + srcPath + " to " + destPath, me); - throw me; + throw new MetaException(me.getMessage()); } new_part.getSd().setLocation(newPartLoc); diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index a2b874353a..44698cf078 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -980,6 +980,7 @@ public void create_catalog(CreateCatalogRequest rqst) startFunction("create_catalog", ": " + catalog.toString()); boolean success = false; Exception ex = null; + Database db = null; try { try { getMS().getCatalog(catalog.getName()); @@ -998,6 +999,11 @@ public void create_catalog(CreateCatalogRequest rqst) RawStore ms = getMS(); Path catPath = new Path(catalog.getLocationUri()); + + // Create a default database inside the catalog + db = new Database(DEFAULT_DATABASE_NAME, "Default database for catalog " + + catalog.getName(), catalog.getLocationUri(), Collections.emptyMap()); + boolean madeDir = false; Map transactionalListenersResponses = Collections.emptyMap(); try { @@ -1013,9 +1019,6 @@ public void create_catalog(CreateCatalogRequest rqst) ms.openTransaction(); ms.createCatalog(catalog); - // Create a default database inside the catalog - Database db = new Database(DEFAULT_DATABASE_NAME, "Default database for catalog " + - catalog.getName(), catalog.getLocationUri(), Collections.emptyMap()); db.setCatalogName(catalog.getName()); create_database_core(ms, db); @@ -1031,7 +1034,7 @@ public void create_catalog(CreateCatalogRequest rqst) if (!success) { ms.rollbackTransaction(); if (madeDir) { - wh.deleteDir(catPath, true); + wh.deleteDir(catPath, true, false, ReplChangeManager.isReplPolicySet(db)); } } @@ -1161,7 +1164,9 @@ private void dropCatalogCore(String catName) success = ms.commitTransaction(); } finally { if (success) { - wh.deleteDir(wh.getDnsPath(new Path(cat.getLocationUri())), false); + // All the databases (except default, which is always repl enabled) must have been dropped before calling + // drop catalog. So needCmRecycle should be passed as true. 
+ wh.deleteDir(wh.getDnsPath(new Path(cat.getLocationUri())), false, false, true); } else { ms.rollbackTransaction(); } @@ -1223,7 +1228,7 @@ private void create_database_core(RawStore ms, final Database db) if (!success) { ms.rollbackTransaction(); if (madeDir) { - wh.deleteDir(dbPath, true); + wh.deleteDir(dbPath, true, false, db); } } @@ -1381,6 +1386,10 @@ private void drop_database_core(RawStore ms, String catName, ms.openTransaction(); db = ms.getDatabase(catName, name); + if (!isInTest && ReplChangeManager.isReplPolicySet(db)) { + throw new InvalidOperationException("can not drop a database which is a source of replication"); + } + firePreEvent(new PreDropDatabaseEvent(db, this)); String catPrependedName = MetaStoreUtils.prependCatalogToDbName(catName, name, conf); @@ -1512,14 +1521,14 @@ private void drop_database_core(RawStore ms, String catName, ms.rollbackTransaction(); } else if (deleteData) { // Delete the data in the partitions which have other locations - deletePartitionData(partitionPaths); + deletePartitionData(partitionPaths, false, db); // Delete the data in the tables which have other locations for (Path tablePath : tablePaths) { - deleteTableData(tablePath); + deleteTableData(tablePath, false, db); } // Delete the data in the database try { - wh.deleteDir(new Path(db.getLocationUri()), true); + wh.deleteDir(new Path(db.getLocationUri()), true, false, db); } catch (Exception e) { LOG.error("Failed to delete database directory: " + db.getLocationUri() + " " + e.getMessage()); @@ -1896,7 +1905,7 @@ private void create_table_core(final RawStore ms, final Table tbl, if (!success) { ms.rollbackTransaction(); if (madeDir) { - wh.deleteDir(tblPath, true); + wh.deleteDir(tblPath, true, ms.getDatabase(tbl.getCatName(), tbl.getDbName())); } } @@ -2390,11 +2399,12 @@ private boolean drop_table_core(final RawStore ms, final String catName, final S if (!success) { ms.rollbackTransaction(); } else if (deleteData && !isExternal) { + Database db = ms.getDatabase(tbl.getCatName(), tbl.getDbName()); // Data needs deletion. Check if trash may be skipped. // Delete the data in the partitions which have other locations - deletePartitionData(partPaths, ifPurge); + deletePartitionData(partPaths, ifPurge, db); // Delete the data in the table - deleteTableData(tblPath, ifPurge); + deleteTableData(tblPath, ifPurge, db); // ok even if the data is not deleted } @@ -2409,27 +2419,19 @@ private boolean drop_table_core(final RawStore ms, final String catName, final S return success; } - /** - * Deletes the data in a table's location, if it fails logs an error - * - * @param tablePath - */ - private void deleteTableData(Path tablePath) { - deleteTableData(tablePath, false); - } - /** * Deletes the data in a table's location, if it fails logs an error * * @param tablePath * @param ifPurge completely purge the table (skipping trash) while removing * data from warehouse + * @param db database the table belongs to */ - private void deleteTableData(Path tablePath, boolean ifPurge) { + private void deleteTableData(Path tablePath, boolean ifPurge, Database db) { if (tablePath != null) { try { - wh.deleteDir(tablePath, true, ifPurge); + wh.deleteDir(tablePath, true, ifPurge, db); } catch (Exception e) { LOG.error("Failed to delete table directory: " + tablePath + " " + e.getMessage()); @@ -2437,16 +2439,6 @@ private void deleteTableData(Path tablePath, boolean ifPurge) { } } - /** - * Give a list of partitions' locations, tries to delete each one - * and for each that fails logs an error. 
- * - * @param partPaths - */ - private void deletePartitionData(List partPaths) { - deletePartitionData(partPaths, false); - } - /** * Give a list of partitions' locations, tries to delete each one * and for each that fails logs an error. @@ -2455,11 +2447,11 @@ private void deletePartitionData(List partPaths) { * @param ifPurge completely purge the partition (skipping trash) while * removing data from warehouse */ - private void deletePartitionData(List partPaths, boolean ifPurge) { + private void deletePartitionData(List partPaths, boolean ifPurge, Database db) { if (partPaths != null && !partPaths.isEmpty()) { for (Path partPath : partPaths) { try { - wh.deleteDir(partPath, true, ifPurge); + wh.deleteDir(partPath, true, ifPurge, db); } catch (Exception e) { LOG.error("Failed to delete partition directory: " + partPath + " " + e.getMessage()); @@ -2701,7 +2693,7 @@ public void truncate_table(final String dbName, final String tableName, List e : addedPartitions.entrySet()) { if (e.getValue()) { // we just created this directory - it's not a case of pre-creation, so we nuke. - wh.deleteDir(new Path(e.getKey().location), true); + wh.deleteDir(new Path(e.getKey().location), true, + ms.getDatabase(tbl.getCatName(), tbl.getDbName())); } } @@ -3565,7 +3565,7 @@ private int add_partitions_pspec_core(RawStore ms, String catName, String dbName for (Map.Entry e : addedPartitions.entrySet()) { if (e.getValue()) { // we just created this directory - it's not a case of pre-creation, so we nuke. - wh.deleteDir(new Path(e.getKey().location), true); + wh.deleteDir(new Path(e.getKey().location), true, ms.getDatabase(catName, dbName)); } } } @@ -3717,7 +3717,8 @@ private Partition add_partition_core(final RawStore ms, success = ms.addPartition(part); } finally { if (!success && madeDir) { - wh.deleteDir(new Path(part.getSd().getLocation()), true); + wh.deleteDir(new Path(part.getSd().getLocation()), true, + ms.getDatabase(tbl.getCatName(), tbl.getDbName())); } } @@ -4039,13 +4040,14 @@ private boolean drop_partition_common(RawStore ms, String catName, String db_nam } // Archived partitions have har:/to_har_file as their location. 
// The original directory was saved in params + boolean isReplEnabled = ReplChangeManager.isReplPolicySet(ms.getDatabase(catName, db_name)); if (isArchived) { assert (archiveParentDir != null); - wh.deleteDir(archiveParentDir, true, mustPurge); + wh.deleteDir(archiveParentDir, true, mustPurge, isReplEnabled); } else { assert (partPath != null); - wh.deleteDir(partPath, true, mustPurge); - deleteParentRecursive(partPath.getParent(), part_vals.size() - 1, mustPurge); + wh.deleteDir(partPath, true, mustPurge, isReplEnabled); + deleteParentRecursive(partPath.getParent(), part_vals.size() - 1, mustPurge, isReplEnabled); } // ok even if the data is not deleted } @@ -4073,12 +4075,13 @@ private static boolean isMustPurge(EnvironmentContext envContext, Table tbl) { || (tbl.isSetParameters() && "true".equalsIgnoreCase(tbl.getParameters().get("auto.purge"))); } - private void deleteParentRecursive(Path parent, int depth, boolean mustPurge) throws IOException, MetaException { + private void deleteParentRecursive(Path parent, int depth, boolean mustPurge, boolean needRecycle) + throws IOException, MetaException { if (depth > 0 && parent != null && wh.isWritable(parent)) { if (wh.isDir(parent) && wh.isEmpty(parent)) { - wh.deleteDir(parent, true, mustPurge); + wh.deleteDir(parent, true, mustPurge, needRecycle); } - deleteParentRecursive(parent.getParent(), depth - 1, mustPurge); + deleteParentRecursive(parent.getParent(), depth - 1, mustPurge, needRecycle); } } @@ -4226,13 +4229,14 @@ public DropPartitionsResult drop_partitions_req( : "dropPartition() will move partition-directories to trash-directory."); // Archived partitions have har:/to_har_file as their location. // The original directory was saved in params + boolean isReplEnabled = ReplChangeManager.isReplPolicySet(ms.getDatabase(catName, dbName)); for (Path path : archToDelete) { - wh.deleteDir(path, true, mustPurge); + wh.deleteDir(path, true, mustPurge, isReplEnabled); } for (PathAndPartValSize p : dirsToDelete) { - wh.deleteDir(p.path, true, mustPurge); + wh.deleteDir(p.path, true, mustPurge, isReplEnabled); try { - deleteParentRecursive(p.path.getParent(), p.partValSize - 1, mustPurge); + deleteParentRecursive(p.path.getParent(), p.partValSize - 1, mustPurge, isReplEnabled); } catch (IOException ex) { LOG.warn("Error from deleteParentRecursive", ex); throw new MetaException("Failed to delete parent: " + ex.getMessage()); @@ -6851,7 +6855,8 @@ public void drop_function(String dbName, String funcName) // a copy is required to allow incremental replication to work correctly. 
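+        // Copy the resource to the change management root only when the function's database is a
+        // replication source; for other databases the extra copy is not needed.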
if (func.getResourceUris() != null && !func.getResourceUris().isEmpty()) { for (ResourceUri uri : func.getResourceUris()) { - if (uri.getUri().toLowerCase().startsWith("hdfs:")) { + if (uri.getUri().toLowerCase().startsWith("hdfs:") && + ReplChangeManager.isReplPolicySet(get_database_core(parsedDbName[CAT_NAME], parsedDbName[DB_NAME]))) { wh.addToChangeManagement(new Path(uri.getUri())); } } diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java index 79ba7ff35b..1519bd81d3 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.metastore; import java.io.IOException; +import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -33,6 +34,7 @@ import org.apache.hadoop.fs.Trash; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; @@ -56,6 +58,7 @@ private static final String ORIG_LOC_TAG = "user.original-loc"; static final String REMAIN_IN_TRASH_TAG = "user.remain-in-trash"; private static final String URI_FRAGMENT_SEPARATOR = "#"; + public static final String SOURCE_OF_REPLICATION = "repl.source.for"; public enum RecycleType { MOVE, @@ -467,4 +470,29 @@ static void scheduleCMClearer(Configuration conf) { 0, MetastoreConf.getTimeVar(conf, ConfVars.REPLCMINTERVAL, TimeUnit.SECONDS), TimeUnit.SECONDS); } } + + public static boolean isReplPolicySet(Database db) { + // Can not judge, so assuming replication is not enabled. + if (db == null) { + LOG.warn("db object passed is null. isReplPolicySet is returning false"); + return false; + } + + // For default database replication is always enabled. 
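+    // Any other database is a replication source only when the repl.source.for property is set on it.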
+ return (db.getName().toLowerCase().equals(org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_DATABASE_NAME) || + getReplPolicyIdString(db) != null); + } + + public static String getReplPolicyIdString(Database db) { + if (db != null) { + Map<String, String> m = db.getParameters(); + if ((m != null) && (m.containsKey(SOURCE_OF_REPLICATION))) { + String replPolicyId = m.get(SOURCE_OF_REPLICATION); + LOG.debug("repl policy for database {} is {}", db.getName(), replPolicyId); + return replPolicyId; + } + } + LOG.info("Repl policy is not set for database {}", db.getName()); + return null; + } } diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java index 88cbfcdc4b..2af3f796d8 100755 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java @@ -301,18 +301,16 @@ void addToChangeManagement(Path file) throws MetaException { } } - public boolean deleteDir(Path f, boolean recursive) throws MetaException { - return deleteDir(f, recursive, false); + public boolean deleteDir(Path f, boolean recursive, Database db) throws MetaException { + return deleteDir(f, recursive, false, db); } - public boolean deleteDir(Path f, boolean recursive, boolean ifPurge) throws MetaException { - return deleteDir(f, recursive, ifPurge, true); + public boolean deleteDir(Path f, boolean recursive, boolean ifPurge, Database db) throws MetaException { + return deleteDir(f, recursive, ifPurge, ReplChangeManager.isReplPolicySet(db)); } public boolean deleteDir(Path f, boolean recursive, boolean ifPurge, boolean needCmRecycle) throws MetaException { - // no need to create the CM recycle file for temporary tables if (needCmRecycle) { - try { cm.recycle(f, RecycleType.MOVE, ifPurge); } catch (IOException e) {