diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 8cd401b364..84fd4677f1 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -540,6 +540,12 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal REPL_DUMP_METADATA_ONLY("hive.repl.dump.metadata.only", false, "Indicates whether replication dump only metadata information or data + metadata. \n" + "This config makes hive.repl.include.external.tables config ineffective."), + REPL_RETAIN_PREV_DUMP_DIR("hive.repl.retain.prev.dump.dir", false, + "If this is set to false, then all previously used dump-directories will be deleted after repl-dump. " + + "If true, a number of latest dump-directories specified by hive.repl.retain.prev.dump.dir.count will be retained"), + REPL_RETAIN_PREV_DUMP_DIR_COUNT("hive.repl.retain.prev.dump.dir.count", 3, + "Indicates maximum number of latest previously used dump-directories which would be retained when " + + "hive.repl.retain.prev.dump.dir is set to true"), REPL_INCLUDE_MATERIALIZED_VIEWS("hive.repl.include.materialized.views", false, "Indicates whether replication of materialized views is enabled."), REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY("hive.repl.dump.skip.immutable.data.copy", false, diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index 81e610eea4..f81fe9132f 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -2018,6 +2018,129 @@ public void testIncrementalLoadWithPreviousDumpDeleteFailed() throws IOException verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=2", ptnData2, 
driverMirror); } + @Test + public void testConfiguredDeleteOfPrevDumpDir() throws IOException { + boolean verifySetupOriginal = verifySetupSteps; + verifySetupSteps = true; + String nameOfTest = "testConfigureDeleteOfPrevDumpDir"; + String dbName = createDB(nameOfTest, driver); + String replDbName = dbName + "_dupe"; + List withConfigDeletePrevDump = Arrays.asList( + "'" + HiveConf.ConfVars.REPL_RETAIN_PREV_DUMP_DIR + "'= 'false' "); + List withConfigRetainPrevDump = Arrays.asList( + "'" + HiveConf.ConfVars.REPL_RETAIN_PREV_DUMP_DIR + "'= 'true' ", + "'" + HiveConf.ConfVars.REPL_RETAIN_PREV_DUMP_DIR_COUNT + "'= '2' "); + + run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver); + run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver); + + Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName); + + String[] unptnData = new String[] {"eleven", "twelve"}; + String[] ptnData1 = new String[] {"thirteen", "fourteen", "fifteen"}; + String[] ptnData2 = new String[] {"fifteen", "sixteen", "seventeen"}; + String[] empty = new String[] {}; + + String unptnLocn = new Path(TEST_PATH, nameOfTest + "_unptn").toUri().getPath(); + String ptnLocn1 = new Path(TEST_PATH, nameOfTest + "_ptn1").toUri().getPath(); + String ptnLocn2 = new Path(TEST_PATH, nameOfTest + "_ptn2").toUri().getPath(); + + createTestDataFile(unptnLocn, unptnData); + createTestDataFile(ptnLocn1, ptnData1); + createTestDataFile(ptnLocn2, ptnData2); + + run("LOAD DATA LOCAL INPATH '" + unptnLocn + "' OVERWRITE INTO TABLE " + dbName + ".unptned", driver); + verifySetup("SELECT * from " + dbName + ".unptned", unptnData, driver); + run("CREATE TABLE " + dbName + ".unptned_late LIKE " + dbName + ".unptned", driver); + run("INSERT INTO TABLE " + dbName + ".unptned_late SELECT * FROM " + dbName + ".unptned", driver); + verifySetup("SELECT * from " + dbName + ".unptned_late", unptnData, driver); + + //perform first incremental with default 
option and check that bootstrap-dump-dir gets deleted + Path bootstrapDumpDir = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + FileSystem fs = FileSystem.get(bootstrapDumpDir.toUri(), hconf); + assertTrue(fs.exists(bootstrapDumpDir)); + Tuple incrDump = replDumpDb(dbName); + assertFalse(fs.exists(bootstrapDumpDir)); + + + loadAndVerify(replDbName, dbName, incrDump.lastReplId); + verifyRun("SELECT * from " + replDbName + ".unptned", unptnData, driverMirror); + verifyRun("SELECT * from " + replDbName + ".unptned_late", unptnData, driverMirror); + + run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=1)", driver); + run("LOAD DATA LOCAL INPATH '" + ptnLocn1 + "' OVERWRITE INTO TABLE " + dbName + + ".ptned PARTITION(b=1)", driver); + verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptnData1, driver); + run("LOAD DATA LOCAL INPATH '" + ptnLocn2 + "' OVERWRITE INTO TABLE " + dbName + + ".ptned PARTITION(b=2)", driver); + verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", ptnData2, driver); + + + //Perform 2nd incremental with retain option. + //Check 1st incremental dump-dir is present even after 2nd incr dump. 
+ Path incrDumpDir1 = new Path(incrDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + incrDump = replDumpDb(dbName, withConfigRetainPrevDump); + assertTrue(fs.exists(incrDumpDir1)); + + loadAndVerify(replDbName, dbName, incrDump.lastReplId); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=1", ptnData1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=2", ptnData2, driverMirror); + + run("CREATE TABLE " + dbName + + ".ptned_late(a string) PARTITIONED BY (b int) STORED AS TEXTFILE", driver); + run("INSERT INTO TABLE " + dbName + ".ptned_late PARTITION(b=1) SELECT a FROM " + dbName + + ".ptned WHERE b=1", driver); + verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=1", ptnData1, driver); + + + //Perform 3rd incremental with retain option, retaining last 2 consumed dump-dirs. + //Verify 1st and 2nd incr-dump-dirs are present after 3rd incr-dump + Path incrDumpDir2 = new Path(incrDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + incrDump = replDumpDb(dbName, withConfigRetainPrevDump); + assertTrue(fs.exists(incrDumpDir1)); + assertTrue(fs.exists(incrDumpDir2)); + + loadAndVerify(replDbName, dbName, incrDump.lastReplId); + verifyRun("SELECT a from " + replDbName + ".ptned_late WHERE b=1", ptnData1, driverMirror); + + + run("INSERT INTO TABLE " + dbName + ".ptned_late PARTITION(b=2) SELECT a FROM " + dbName + + ".ptned WHERE b=2", driver); + verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=2", ptnData2, driver); + + + //perform 4'th incr-dump with retain option in policy, retaining only last 2 dump-dirs + //verify incr-1 dumpdir gets deleted, incr-2 and incr-3 remain + Path incrDumpDir3 = new Path(incrDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + incrDump = replDumpDb(dbName, withConfigRetainPrevDump); + assertFalse(fs.exists(incrDumpDir1)); + assertTrue(fs.exists(incrDumpDir2)); + assertTrue(fs.exists(incrDumpDir3)); + + loadAndVerify(replDbName, dbName, incrDump.lastReplId); + verifyRun("SELECT a from 
" + replDbName + ".ptned_late WHERE b=2", ptnData2, driverMirror); + + run("INSERT INTO TABLE " + dbName + ".ptned_late PARTITION(b=3) SELECT a FROM " + dbName + + ".ptned WHERE b=2", driver); + verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=3", ptnData2, driver); + + + //ensure 4'th incr-dump dir is present + //perform 5'th incr-dump with retain option to be false + //verify all prev dump-dirs get deleted + Path incrDumpDir4 = new Path(incrDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + assertTrue(fs.exists(incrDumpDir4)); + incrDump = replDumpDb(dbName, withConfigDeletePrevDump); + assertFalse(fs.exists(incrDumpDir2)); + assertFalse(fs.exists(incrDumpDir3)); + assertFalse(fs.exists(incrDumpDir4)); + + loadAndVerify(replDbName, dbName, incrDump.lastReplId); + verifyRun("SELECT a from " + replDbName + ".ptned_late WHERE b=3", ptnData2, driverMirror); + + verifySetupSteps = verifySetupOriginal; + } + @Test public void testIncrementalInsertToPartition() throws IOException { String testName = "incrementalInsertToPartition"; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index f261889c86..c897bb4b00 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -103,6 +103,7 @@ import java.util.List; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.Base64; import java.util.UUID; import java.util.ArrayList; @@ -305,8 +306,22 @@ private void deleteAllPreviousDumpMeta(Path currentDumpPath) { if (fs.exists(dumpRoot)) { FileStatus[] statuses = fs.listStatus(dumpRoot, path -> !path.equals(currentDumpPath) && !path.toUri().getPath().equals(currentDumpPath.toString())); + + int retainPrevDumpDirCount = conf.getIntVar(HiveConf.ConfVars.REPL_RETAIN_PREV_DUMP_DIR_COUNT); + int numDumpDirs = statuses.length; + 
if (shouldRetainPrevDumpDirs()) { + //sort oldest-first by modification time so that the oldest dump-dirs are the ones deleted + Arrays.sort(statuses, + Comparator.comparingLong(FileStatus::getModificationTime)); + } for (FileStatus status : statuses) { - fs.delete(status.getPath(), true); + //based on config, either delete all previous dump-dirs + //or delete a minimum number of oldest dump-directories + if (!shouldRetainPrevDumpDirs() || numDumpDirs > retainPrevDumpDirCount) { + fs.delete(status.getPath(), true); + numDumpDirs--; + + } } } catch (Exception ex) { @@ -428,6 +443,13 @@ private boolean isMaterializedViewsReplEnabled() { return conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_MATERIALIZED_VIEWS); } + /** + * Decide whether to retain previous dump-directories after repl-dump. + */ + private boolean shouldRetainPrevDumpDirs() { + return conf.getBoolVar(HiveConf.ConfVars.REPL_RETAIN_PREV_DUMP_DIR); + } + /** * Decide whether to dump ACID tables. * @param tableName - Name of ACID table to be replicated