diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index c46103a254..760bfb9afc 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -312,7 +312,22 @@ public void testBasic() throws IOException {
     verifySetup("SELECT * from " + dbName + ".unptned_empty", empty, driver);
 
     String replicatedDbName = dbName + "_dupe";
-    bootstrapLoadAndVerify(dbName, replicatedDbName);
+    Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replicatedDbName);
+
+    FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(hconf);
+    Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    boolean dumpAckFound = false;
+    boolean loadAckFound = false;
+    for (FileStatus status : fs.listStatus(dumpPath)) {
+      if (status.getPath().getName().equalsIgnoreCase(ReplUtils.DUMP_ACKNOWLEDGEMENT)) {
+        dumpAckFound = true;
+      }
+      if (status.getPath().getName().equalsIgnoreCase(ReplUtils.LOAD_ACKNOWLEDGEMENT)) {
+        loadAckFound = true;
+      }
+    }
+    assertTrue(dumpAckFound);
+    assertTrue(loadAckFound);
 
     verifyRun("SELECT * from " + replicatedDbName + ".unptned", unptn_data, driverMirror);
     verifyRun("SELECT a from " + replicatedDbName + ".ptned WHERE b=1", ptn_data_1, driverMirror);
@@ -395,6 +410,9 @@ public void testTaskCreationOptimization() throws Throwable {
     assertEquals(false, hasMoveTask(task));
     assertEquals(true, hasPartitionTask(task));
 
+    Path loadPath = new Path(dump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    //delete load ack to reload the same dump
+    loadPath.getFileSystem(hconf).delete(new Path(loadPath, ReplUtils.LOAD_ACKNOWLEDGEMENT), true);
     loadAndVerify(dbNameReplica, dbName, dump.lastReplId);
 
     run("insert into table " + dbName + ".t2 partition(country='india') values ('delhi')", driver);
@@ -406,6 +424,9 @@ public void testTaskCreationOptimization() throws Throwable {
     assertEquals(true, hasMoveTask(task));
     assertEquals(true, hasPartitionTask(task));
 
+    loadPath = new Path(dump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    //delete load ack to reload the same dump
+    loadPath.getFileSystem(hconf).delete(new Path(loadPath, ReplUtils.LOAD_ACKNOWLEDGEMENT), true);
     loadAndVerify(dbNameReplica, dbName, dump.lastReplId);
 
     run("insert into table " + dbName + ".t2 partition(country='us') values ('sf')", driver);
@@ -488,7 +509,7 @@ public void testBootstrapLoadOnExistingDb() throws IOException {
 
     // Load to an empty database
     Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, dbName + "_empty");
-    String replDumpLocn = bootstrapDump.dumpLocation;
+
     verifyRun("SELECT * from " + dbName + "_empty.unptned", unptn_data, driverMirror);
 
     String[] nullReplId = new String[]{ "NULL" };
@@ -838,7 +859,26 @@ public void testIncrementalAdds() throws IOException {
     verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=2", ptn_data_2, driver);
 
     // Perform REPL-DUMP/LOAD
-    incrementalLoadAndVerify(dbName, replDbName);
+    Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDbName);
+    FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(hconf);
+    boolean dumpAckFound = false;
+    boolean loadAckFound = false;
+    assertFalse(fs.exists(new Path(bootstrapDump.dumpLocation)));
+    fs = new Path(incrementalDump.dumpLocation).getFileSystem(hconf);
+    Path dumpPath = new Path(incrementalDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    dumpAckFound = false;
+    loadAckFound = false;
+    for (FileStatus status : fs.listStatus(dumpPath)) {
+      if (status.getPath().getName().equalsIgnoreCase(ReplUtils.DUMP_ACKNOWLEDGEMENT)) {
+        dumpAckFound = true;
+      }
+      if (status.getPath().getName().equalsIgnoreCase(ReplUtils.LOAD_ACKNOWLEDGEMENT)) {
+        loadAckFound = true;
+      }
+    }
+
+    assertTrue(dumpAckFound);
+    assertTrue(loadAckFound);
 
     // VERIFY tables and partitions on destination for equivalence.
     verifyRun("SELECT * from " + replDbName + ".unptned_empty", empty, driverMirror);
@@ -2376,7 +2416,7 @@ public void testTruncateWithCM() throws IOException {
     Tuple bootstrapDump = replDumpDb(dbName);
     String replDumpId = bootstrapDump.lastReplId;
-    String replDumpLocn = bootstrapDump.dumpLocation;
+    run("REPL LOAD " + dbName + " INTO " + replDbName, driverMirror);
 
     String[] empty = new String[] {};
     String[] unptn_data = new String[] { "eleven", "thirteen" };
@@ -2387,39 +2427,33 @@
     run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')", driver);
     Tuple firstInsert = replDumpDb(dbName);
     Integer numOfEventsIns1 = Integer.valueOf(firstInsert.lastReplId) - Integer.valueOf(replDumpId);
+    // load only first insert (1 record)
+    loadAndVerify(replDbName, dbName, firstInsert.lastReplId);
+
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data_load1, driverMirror);
 
     // x events to insert, last repl ID: replDumpId+2x
     run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[1] + "')", driver);
     verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data, driver);
     Tuple secondInsert = replDumpDb(dbName);
     Integer numOfEventsIns2 = Integer.valueOf(secondInsert.lastReplId) - Integer.valueOf(firstInsert.lastReplId);
+    // load only second insert (2 records)
+
+    loadAndVerify(replDbName, dbName, secondInsert.lastReplId);
+    verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data_load2, driverMirror);
 
     // y event to truncate, last repl ID: replDumpId+2x+y
     run("TRUNCATE TABLE " + dbName + ".unptned", driver);
     verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", empty, driver);
     Tuple thirdTrunc = replDumpDb(dbName);
     Integer numOfEventsTrunc3 = Integer.valueOf(thirdTrunc.lastReplId) - Integer.valueOf(secondInsert.lastReplId);
+    // load only truncate (0 records)
+    loadAndVerify(replDbName, dbName, thirdTrunc.lastReplId);
+    verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", empty, driverMirror);
 
     // x events to insert, last repl ID: replDumpId+3x+y
     run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data_load1[0] + "')", driver);
     verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data_load1, driver);
-
-    run("REPL LOAD " + dbName + " INTO " + replDbName, driverMirror);
-
-    // Dump and load only first insert (1 record)
-    loadAndVerify(replDbName, dbName, firstInsert.lastReplId);
-
-    verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data_load1, driverMirror);
-
-    // Dump and load only second insert (2 records)
-
-    loadAndVerify(replDbName, dbName, secondInsert.lastReplId);
-    verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data_load2, driverMirror);
-
-    // Dump and load only truncate (0 records)
-    loadAndVerify(replDbName, dbName, thirdTrunc.lastReplId);
-    verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", empty, driverMirror);
-    // Dump and load insert after truncate (1 record)
     incrementalLoadAndVerify(dbName, replDbName);
     verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data_load1, driverMirror);
@@ -2434,10 +2468,7 @@ public void testIncrementalRepeatEventOnExistingObject() throws IOException, Int
 
     // Bootstrap dump/load
     String replDbName = dbName + "_dupe";
-    Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName);
-
-    // List to maintain the incremental dumps for each operation
-    List<Tuple> incrementalDumpList = new ArrayList<>();
+    bootstrapLoadAndVerify(dbName, replDbName);
 
     String[] empty = new String[] {};
     String[] unptn_data = new String[] { "ten" };
@@ -2447,48 +2478,48 @@ public void testIncrementalRepeatEventOnExistingObject() throws IOException, Int
     // INSERT EVENT to unpartitioned table
     run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')", driver);
     Tuple replDump = replDumpDb(dbName);
-    incrementalDumpList.add(replDump);
     Thread.sleep(1000);
 
     // INSERT EVENT to partitioned table with dynamic ADD_PARTITION
     run("INSERT INTO TABLE " + dbName + ".ptned PARTITION(b=1) values('" + ptn_data_1[0] + "')", driver);
+    //Second dump without load should be a failure
+    verifyFail("REPL DUMP " + dbName, driverMirror);
+    //Load the previous dump first
+    loadAndVerify(replDbName, dbName, replDump.lastReplId);
+
     replDump = replDumpDb(dbName);
-    incrementalDumpList.add(replDump);
+    loadAndVerify(replDbName, dbName, replDump.lastReplId);
     Thread.sleep(1000);
 
     // ADD_PARTITION EVENT to partitioned table
     run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=2)", driver);
     replDump = replDumpDb(dbName);
-    incrementalDumpList.add(replDump);
+    loadAndVerify(replDbName, dbName, replDump.lastReplId);
     Thread.sleep(1000);
 
     // INSERT EVENT to partitioned table on existing partition
     run("INSERT INTO TABLE " + dbName + ".ptned PARTITION(b=2) values('" + ptn_data_2[0] + "')", driver);
     replDump = replDumpDb(dbName);
-    incrementalDumpList.add(replDump);
+    loadAndVerify(replDbName, dbName, replDump.lastReplId);
     Thread.sleep(1000);
 
     // TRUNCATE_PARTITION EVENT on partitioned table
     run("TRUNCATE TABLE " + dbName + ".ptned PARTITION (b=1)", driver);
     replDump = replDumpDb(dbName);
-    incrementalDumpList.add(replDump);
+    loadAndVerify(replDbName, dbName, replDump.lastReplId);
     Thread.sleep(1000);
 
     // TRUNCATE_TABLE EVENT on unpartitioned table
     run("TRUNCATE TABLE " + dbName + ".unptned", driver);
     replDump = replDumpDb(dbName);
-    incrementalDumpList.add(replDump);
+    loadAndVerify(replDbName, dbName, replDump.lastReplId);
     Thread.sleep(1000);
 
     // CREATE_TABLE EVENT with multiple partitions
     run("CREATE TABLE " + dbName + ".unptned_tmp AS SELECT * FROM " + dbName + ".ptned", driver);
     replDump = replDumpDb(dbName);
-    incrementalDumpList.add(replDump);
+    loadAndVerify(replDbName, dbName, replDump.lastReplId);
     Thread.sleep(1000);
 
     // ADD_CONSTRAINT EVENT
     run("ALTER TABLE " + dbName + ".unptned_tmp ADD CONSTRAINT uk_unptned UNIQUE(a) disable", driver);
     replDump = replDumpDb(dbName);
-    incrementalDumpList.add(replDump);
+    loadAndVerify(replDbName, dbName, replDump.lastReplId);
     Thread.sleep(1000);
 
-    // Replicate all the events happened so far
-    for (Tuple currDump : incrementalDumpList) {
-      // Load the incremental dump and ensure it does nothing and lastReplID remains same
-      loadAndVerify(replDbName, dbName, currDump.lastReplId);
-    }
+    Tuple incrDump = incrementalLoadAndVerify(dbName, replDbName);
 
     verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", empty, driverMirror);
@@ -2517,10 +2548,7 @@ public void testIncrementalRepeatEventOnMissingObject() throws Exception {
     // Bootstrap dump/load
     String replDbName = dbName + "_dupe";
-    Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName);
-
-    // List to maintain the incremental dumps for each operation
-    List<Tuple> incrementalDumpList = new ArrayList<>();
+    bootstrapLoadAndVerify(dbName, replDbName);
 
     String[] unptn_data = new String[] { "ten" };
     String[] ptn_data_1 = new String[] { "fifteen" };
@@ -2529,78 +2557,74 @@ public void testIncrementalRepeatEventOnMissingObject() throws Exception {
     // INSERT EVENT to unpartitioned table
     run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')", driver);
     Tuple replDump = replDumpDb(dbName);
-    incrementalDumpList.add(replDump);
+    loadAndVerify(replDbName, dbName, replDump.lastReplId);
     Thread.sleep(1000);
 
     // INSERT EVENT to partitioned table with dynamic ADD_PARTITION
     run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data_1[0] + "')", driver);
     replDump = replDumpDb(dbName);
-    incrementalDumpList.add(replDump);
+    loadAndVerify(replDbName, dbName, replDump.lastReplId);
     Thread.sleep(1000);
 
     // ADD_PARTITION EVENT to partitioned table
     run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=2)", driver);
     replDump = replDumpDb(dbName);
-    incrementalDumpList.add(replDump);
+    loadAndVerify(replDbName, dbName, replDump.lastReplId);
     Thread.sleep(1000);
 
     // INSERT EVENT to partitioned table on existing partition
     run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[0] + "')", driver);
     replDump = replDumpDb(dbName);
-    incrementalDumpList.add(replDump);
+    loadAndVerify(replDbName, dbName, replDump.lastReplId);
     Thread.sleep(1000);
 
     // TRUNCATE_PARTITION EVENT on partitioned table
     run("TRUNCATE TABLE " + dbName + ".ptned PARTITION(b=1)", driver);
     replDump = replDumpDb(dbName);
-    incrementalDumpList.add(replDump);
+    loadAndVerify(replDbName, dbName, replDump.lastReplId);
     Thread.sleep(1000);
 
     // TRUNCATE_TABLE EVENT on unpartitioned table
     run("TRUNCATE TABLE " + dbName + ".unptned", driver);
     replDump = replDumpDb(dbName);
-    incrementalDumpList.add(replDump);
+    loadAndVerify(replDbName, dbName, replDump.lastReplId);
     Thread.sleep(1000);
 
     // CREATE_TABLE EVENT on partitioned table
     run("CREATE TABLE " + dbName + ".ptned_tmp (a string) PARTITIONED BY (b int) STORED AS TEXTFILE", driver);
     replDump = replDumpDb(dbName);
-    incrementalDumpList.add(replDump);
+    loadAndVerify(replDbName, dbName, replDump.lastReplId);
     Thread.sleep(1000);
 
     // INSERT EVENT to partitioned table with dynamic ADD_PARTITION
     run("INSERT INTO TABLE " + dbName + ".ptned_tmp partition(b=10) values('" + ptn_data_1[0] + "')", driver);
     replDump = replDumpDb(dbName);
-    incrementalDumpList.add(replDump);
+    loadAndVerify(replDbName, dbName, replDump.lastReplId);
     Thread.sleep(1000);
 
     // INSERT EVENT to partitioned table with dynamic ADD_PARTITION
     run("INSERT INTO TABLE " + dbName + ".ptned_tmp partition(b=20) values('" + ptn_data_2[0] + "')", driver);
     replDump = replDumpDb(dbName);
-    incrementalDumpList.add(replDump);
+    loadAndVerify(replDbName, dbName, replDump.lastReplId);
     Thread.sleep(1000);
 
     // DROP_PARTITION EVENT to partitioned table
     run("ALTER TABLE " + dbName + ".ptned DROP PARTITION (b=1)", driver);
     replDump = replDumpDb(dbName);
-    incrementalDumpList.add(replDump);
+    loadAndVerify(replDbName, dbName, replDump.lastReplId);
     Thread.sleep(1000);
 
     // RENAME_PARTITION EVENT to partitioned table
     run("ALTER TABLE " + dbName + ".ptned PARTITION (b=2) RENAME TO PARTITION (b=20)", driver);
     replDump = replDumpDb(dbName);
-    incrementalDumpList.add(replDump);
+    loadAndVerify(replDbName, dbName, replDump.lastReplId);
     Thread.sleep(1000);
 
     // RENAME_TABLE EVENT to unpartitioned table
     run("ALTER TABLE " + dbName + ".unptned RENAME TO " + dbName + ".unptned_new", driver);
     replDump = replDumpDb(dbName);
-    incrementalDumpList.add(replDump);
+    loadAndVerify(replDbName, dbName, replDump.lastReplId);
     Thread.sleep(1000);
 
     // ADD_CONSTRAINT EVENT
     run("ALTER TABLE " + dbName + ".ptned_tmp ADD CONSTRAINT uk_unptned UNIQUE(a) disable", driver);
     replDump = replDumpDb(dbName);
-    incrementalDumpList.add(replDump);
+    loadAndVerify(replDbName, dbName, replDump.lastReplId);
     Thread.sleep(1000);
 
     // DROP_TABLE EVENT to partitioned table
     run("DROP TABLE " + dbName + ".ptned_tmp", driver);
     replDump = replDumpDb(dbName);
-    incrementalDumpList.add(replDump);
+    loadAndVerify(replDbName, dbName, replDump.lastReplId);
     Thread.sleep(1000);
 
-    // Load each incremental dump from the list. Each dump have only one operation.
-    for (Tuple currDump : incrementalDumpList) {
-      // Load the current incremental dump and ensure it does nothing and lastReplID remains same
-      loadAndVerify(replDbName, dbName, currDump.lastReplId);
-    }
+    // Replicate all the events happened so far
     Tuple incrDump = incrementalLoadAndVerify(dbName, replDbName);
 
     // Verify if the data are intact even after applying an applied event once again on missing objects
@@ -3262,14 +3286,11 @@ public void testDumpNonReplDatabase() throws IOException {
     verifyFail("REPL DUMP " + dbName, driver);
     assertTrue(run("REPL DUMP " + dbName + " with ('hive.repl.dump.metadata.only' = 'true')",
         true, driver));
-    assertTrue(run("REPL DUMP " + dbName + " with ('hive.repl.dump.metadata.only' = 'true')",
+    //Dump again before load should fail
+    assertFalse(run("REPL DUMP " + dbName + " with ('hive.repl.dump.metadata.only' = 'true')",
         true, driver));
-    run("alter database " + dbName + " set dbproperties ('repl.source.for' = '1, 2, 3')", driver);
-    assertTrue(run("REPL DUMP " + dbName, true, driver));
-    assertTrue(run("REPL DUMP " + dbName, true, driver));
     dbName = createDBNonRepl(testName.getMethodName() + "_case", driver);
-    run("alter database " + dbName + " set dbproperties ('repl.SOURCE.for' = '1, 2, 3')", driver);
-    assertTrue(run("REPL DUMP " + dbName, true, driver));
+    run("alter database " + dbName + " set dbproperties ('repl.source.for' = '1, 2, 3')", driver);
     assertTrue(run("REPL DUMP " + dbName, true, driver));
   }
 
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTablesBootstrap.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTablesBootstrap.java
index a683fd4216..8e6dafcaf5 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTablesBootstrap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTablesBootstrap.java
@@ -178,18 +178,17 @@ public Boolean apply(@Nullable CallerArguments args) {
 
   @Test
   public void retryIncBootstrapAcidFromDifferentDumpWithoutCleanTablesConfig() throws Throwable {
-    WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName,
+    prepareDataAndDump(primaryDbName,
             dumpWithoutAcidClause);
     replica.load(replicatedDbName, primaryDbName);
 
     prepareIncAcidData(primaryDbName);
     prepareIncNonAcidData(primaryDbName);
-    WarehouseInstance.Tuple incDump = primary.run("use " + primaryDbName)
-            .dump(primaryDbName, dumpWithAcidBootstrapClause);
-    WarehouseInstance.Tuple inc2Dump = primary.run("use " + primaryDbName)
+    primary.run("use " + primaryDbName)
             .dump(primaryDbName, dumpWithAcidBootstrapClause);
     replica.load(replicatedDbName, primaryDbName);
-
+    primary.run("use " + primaryDbName)
+            .dump(primaryDbName, dumpWithAcidBootstrapClause);
     // Re-bootstrapping from different bootstrap dump without clean tables config should fail.
     replica.loadFailure(replicatedDbName, primaryDbName, Collections.emptyList(),
             ErrorMsg.REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID.getErrorCode());
 
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index c5532cfd14..53e5f67696 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -689,18 +689,6 @@ public void testIncrementalReplWithEventsBatchHavingDropCreateTable() throws Thr
         .run("insert into table2 partition(country='india') values(1)")
         .dump(primaryDbName, Collections.emptyList());
 
-    // Second incremental dump
-    WarehouseInstance.Tuple secondIncremental = primary.run("use " + primaryDbName)
-        .run("drop table table1")
-        .run("drop table table2")
-        .run("create table table2 (id int) partitioned by (country string)")
-        .run("alter table table2 add partition(country='india')")
-        .run("alter table table2 drop partition(country='india')")
-        .run("insert into table2 partition(country='us') values(2)")
-        .run("create table table1 (i int)")
-        .run("insert into table1 values (2)")
-        .dump(primaryDbName, Collections.emptyList());
-
     // First incremental load
     replica.load(replicatedDbName, primaryDbName)
         .status(replicatedDbName)
@@ -713,6 +701,18 @@ public void testIncrementalReplWithEventsBatchHavingDropCreateTable() throws Thr
         .run("select id from table2 order by id")
         .verifyResults(new String[] {"1"});
 
+    // Second incremental dump
+    WarehouseInstance.Tuple secondIncremental = primary.run("use " + primaryDbName)
+        .run("drop table table1")
+        .run("drop table table2")
+        .run("create table table2 (id int) partitioned by (country string)")
+        .run("alter table table2 add partition(country='india')")
+        .run("alter table table2 drop partition(country='india')")
+        .run("insert into table2 partition(country='us') values(2)")
+        .run("create table table1 (i int)")
+        .run("insert into table1 values (2)")
+        .dump(primaryDbName, Collections.emptyList());
+
     // Second incremental load
     replica.load(replicatedDbName, primaryDbName)
         .status(replicatedDbName)
@@ -746,6 +746,18 @@ public void testIncrementalReplWithDropAndCreateTableDifferentPartitionTypeAndIn
         .run("insert into table3 partition(country='india') values(3)")
         .dump(primaryDbName, Collections.emptyList());
 
+    // First incremental load
+    replica.load(replicatedDbName, primaryDbName)
+        .status(replicatedDbName)
+        .verifyResult(firstIncremental.lastReplicationId)
+        .run("use " + replicatedDbName)
+        .run("select id from table1")
+        .verifyResults(new String[] { "1" })
+        .run("select * from table2")
+        .verifyResults(new String[] { "2" })
+        .run("select id from table3")
+        .verifyResults(new String[] { "3" });
+
     // Second incremental dump
     WarehouseInstance.Tuple secondIncremental = primary.run("use " + primaryDbName)
         .run("drop table table1")
@@ -759,18 +771,6 @@ public void testIncrementalReplWithDropAndCreateTableDifferentPartitionTypeAndIn
         .run("insert into table3 partition(name='adam', rank=100) values(30)")
         .dump(primaryDbName, Collections.emptyList());
 
-    // First incremental load
-    replica.load(replicatedDbName, primaryDbName)
-        .status(replicatedDbName)
-        .verifyResult(firstIncremental.lastReplicationId)
-        .run("use " + replicatedDbName)
-        .run("select id from table1")
-        .verifyResults(new String[] { "1" })
-        .run("select * from table2")
-        .verifyResults(new String[] { "2" })
-        .run("select id from table3")
-        .verifyResults(new String[] { "3" });
-
     // Second incremental load
     replica.load(replicatedDbName, primaryDbName)
         .status(replicatedDbName)
@@ -934,7 +934,7 @@ public void testIncrementalDumpMultiIteration() throws Throwable {
     Path path = new Path(hiveDumpDir);
     FileSystem fs = path.getFileSystem(conf);
     FileStatus[] fileStatus = fs.listStatus(path);
-    int numEvents = fileStatus.length - 2; //one is metadata file and one data dir
+    int numEvents = fileStatus.length - 3; //one is the metadata file, one is the data dir and one is the dump ack
 
     replica.load(replicatedDbName, primaryDbName,
         Collections.singletonList("'hive.repl.approx.max.load.tasks'='1'"))
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 92e45b4c57..ad5e7b1c38 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -126,21 +126,30 @@ public int execute() {
       Path dumpRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR),
          Base64.getEncoder().encodeToString(work.dbNameOrPattern.toLowerCase()
          .getBytes(StandardCharsets.UTF_8.name())));
-      Path currentDumpPath = new Path(dumpRoot, getNextDumpDir());
-      Path hiveDumpRoot = new Path(currentDumpPath, ReplUtils.REPL_HIVE_BASE_DIR);
-      DumpMetaData dmd = new DumpMetaData(hiveDumpRoot, conf);
-      // Initialize ReplChangeManager instance since we will require it to encode file URI.
-      ReplChangeManager.getInstance(conf);
-      Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR));
-      Long lastReplId;
-      if (!dumpRoot.getFileSystem(conf).exists(dumpRoot)
-          || dumpRoot.getFileSystem(conf).listStatus(dumpRoot).length == 0) {
-        lastReplId = bootStrapDump(hiveDumpRoot, dmd, cmRoot, hiveDb);
+      Path previousDumpMetaPath = getPreviousDumpMetadataPath(dumpRoot);
+      Path previousHiveDumpPath =
+          previousDumpMetaPath != null ? new Path(previousDumpMetaPath, ReplUtils.REPL_HIVE_BASE_DIR) : null;
+      //If no previous dump is present, or the previous dump was already loaded, proceed
+      if (shouldDump(previousHiveDumpPath)) {
+        Path currentDumpPath = new Path(dumpRoot, getNextDumpDir());
+        Path hiveDumpRoot = new Path(currentDumpPath, ReplUtils.REPL_HIVE_BASE_DIR);
+        DumpMetaData dmd = new DumpMetaData(hiveDumpRoot, conf);
+        // Initialize ReplChangeManager instance since we will require it to encode file URI.
+        ReplChangeManager.getInstance(conf);
+        Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR));
+        Long lastReplId;
+        if (previousHiveDumpPath == null) {
+          lastReplId = bootStrapDump(hiveDumpRoot, dmd, cmRoot, hiveDb);
+        } else {
+          work.setEventFrom(getEventFromPreviousDumpMetadata(previousHiveDumpPath));
+          lastReplId = incrementalDump(hiveDumpRoot, dmd, cmRoot, hiveDb);
+        }
+        prepareReturnValues(Arrays.asList(currentDumpPath.toUri().toString(), String.valueOf(lastReplId)));
+        writeDumpCompleteAck(hiveDumpRoot);
+        deletePreviousDumpMeta(previousDumpMetaPath);
       } else {
-        work.setEventFrom(getEventFromPreviousDumpMetadata(dumpRoot));
-        lastReplId = incrementalDump(hiveDumpRoot, dmd, cmRoot, hiveDb);
+        throw new Exception("Previous Dump is not yet loaded");
       }
-      prepareReturnValues(Arrays.asList(currentDumpPath.toUri().toString(), String.valueOf(lastReplId)));
     } catch (Exception e) {
       LOG.error("failed", e);
       setException(e);
@@ -149,16 +158,21 @@ public int execute() {
     return 0;
   }
 
-  private Long getEventFromPreviousDumpMetadata(Path dumpRoot) throws IOException, SemanticException {
-    FileStatus[] statuses = dumpRoot.getFileSystem(conf).listStatus(dumpRoot);
-    if (statuses.length > 0) {
-      FileStatus latestUpdatedStatus = statuses[0];
-      for (FileStatus status : statuses) {
-        if (status.getModificationTime() > latestUpdatedStatus.getModificationTime()) {
-          latestUpdatedStatus = status;
-        }
-      }
-      DumpMetaData dmd = new DumpMetaData(new Path(latestUpdatedStatus.getPath(), ReplUtils.REPL_HIVE_BASE_DIR), conf);
+  private void deletePreviousDumpMeta(Path previousDumpMetaPath) throws IOException {
+    if (previousDumpMetaPath != null) {
+      FileSystem fs = previousDumpMetaPath.getFileSystem(conf);
+      fs.delete(previousDumpMetaPath, true);
+    }
+  }
+
+  private void writeDumpCompleteAck(Path currentDumpPath) throws SemanticException {
+    Path ackPath = new Path(currentDumpPath, ReplUtils.DUMP_ACKNOWLEDGEMENT);
+    Utils.write(ackPath, conf);
+  }
+
+  private Long getEventFromPreviousDumpMetadata(Path previousDumpPath) throws SemanticException {
+    if (previousDumpPath != null) {
+      DumpMetaData dmd = new DumpMetaData(previousDumpPath, conf);
       if (dmd.isIncrementalDump()) {
         return dmd.getEventTo();
       }
@@ -168,6 +182,36 @@ private Long getEventFromPreviousDumpMetadata(Path dumpRoot) throws IOException,
     return 0L;
   }
 
+  private Path getPreviousDumpMetadataPath(Path dumpRoot) throws IOException {
+    FileSystem fs = dumpRoot.getFileSystem(conf);
+    if (fs.exists(dumpRoot)) {
+      FileStatus[] statuses = fs.listStatus(dumpRoot);
+      if (statuses.length == 1) {
+        return statuses[0].getPath();
+      }
+    }
+    return null;
+  }
+
+  private boolean shouldDump(Path previousDumpPath) throws IOException {
+    //If there is no previous dump, this is a bootstrap dump, so return true as there is no
+    //previous dump to load
+    if (previousDumpPath == null) {
+      return true;
+    } else {
+      FileSystem fs = previousDumpPath.getFileSystem(conf);
+      if (fs.exists(previousDumpPath)) {
+        FileStatus[] latestUpdateStatuses = fs.listStatus(previousDumpPath);
+        for (FileStatus status : latestUpdateStatuses) {
+          if (status.getPath().getName().equalsIgnoreCase(ReplUtils.LOAD_ACKNOWLEDGEMENT)) {
+            return true;
+          }
+        }
+      }
+    }
+    return false;
+  }
+
   private void prepareReturnValues(List<String> values) throws SemanticException {
     LOG.debug("prepareReturnValues : " + dumpSchema);
     for (String s : values) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index a2c467bafd..3ab9be31ac 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -60,6 +60,7 @@
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
 import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 
 import java.io.IOException;
@@ -97,19 +98,35 @@ public StageType getType() {
 
   @Override
   public int execute() {
-    Task<?> rootTask = work.getRootTask();
-    if (rootTask != null) {
-      rootTask.setChildTasks(null);
-    }
-    work.setRootTask(this);
-    this.parentTasks = null;
-    if (work.isIncrementalLoad()) {
-      return executeIncrementalLoad();
-    } else {
-      return executeBootStrapLoad();
+    try {
+      Task<?> rootTask = work.getRootTask();
+      if (rootTask != null) {
+        rootTask.setChildTasks(null);
+      }
+      work.setRootTask(this);
+      this.parentTasks = null;
+      int status = 0;
+      if (work.isIncrementalLoad()) {
+        status = executeIncrementalLoad();
+      } else {
+        status = executeBootStrapLoad();
+      }
+      if (status == 0) {
+        writeLoadCompleteAck(work.dumpDirectory);
+      }
+      return status;
+    } catch (SemanticException e) {
+      LOG.error("failed", e);
+      setException(e);
+      return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
     }
   }
 
+  private void writeLoadCompleteAck(String dumpDirectory) throws SemanticException {
+    Path ackPath = new Path(dumpDirectory, ReplUtils.LOAD_ACKNOWLEDGEMENT);
+    Utils.write(ackPath, conf);
+  }
+
   private int executeBootStrapLoad() {
     try {
       int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index 64ecf42eb4..a7c5ea0969 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -98,7 +98,10 @@
   // Configuration to enable/disable dumping ACID tables. Used only for testing and shouldn't be
   // seen in production or in case of tests other than the ones where it's required.
   public static final String REPL_DUMP_INCLUDE_ACID_TABLES = "hive.repl.dump.include.acid.tables";
-
+  //Acknowledgement for repl dump complete
+  public static final String DUMP_ACKNOWLEDGEMENT = "_finished_dump";
+  //Acknowledgement for repl load complete
+  public static final String LOAD_ACKNOWLEDGEMENT = "_finished_load";
  /**
   * Bootstrap REPL LOAD operation type on the examined object based on ckpt state.
   */
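Taken together, the DUMP_ACKNOWLEDGEMENT and LOAD_ACKNOWLEDGEMENT markers above implement a simple handshake between REPL DUMP and REPL LOAD: the dump writes _finished_dump as its last step, the load only picks up a dump directory that carries _finished_dump but not _finished_load, and a new dump is refused until the previous one has been loaded. The sketch below is illustrative only and is not part of the patch (the class and method names are hypothetical); it summarizes that flow against a plain Hadoop FileSystem:

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.io.IOException;

    // Illustrative-only sketch of the dump/load acknowledgement handshake added by this patch.
    // The marker names mirror ReplUtils.DUMP_ACKNOWLEDGEMENT and ReplUtils.LOAD_ACKNOWLEDGEMENT;
    // the class and method names here are hypothetical.
    public class ReplAckFlowSketch {
      private static final String DUMP_ACK = "_finished_dump";
      private static final String LOAD_ACK = "_finished_load";

      // A new dump is allowed only when there is no previous dump at all, or the previous
      // dump has already been consumed (its load acknowledgement exists).
      static boolean canStartNewDump(FileSystem fs, Path previousDumpDir) throws IOException {
        return previousDumpDir == null || fs.exists(new Path(previousDumpDir, LOAD_ACK));
      }

      // The dump side writes its acknowledgement as the very last step, so a half-written
      // dump directory is never picked up by the load side.
      static void finishDump(FileSystem fs, Path dumpDir) throws IOException {
        fs.create(new Path(dumpDir, DUMP_ACK)).close();
      }

      // The load side only accepts a dump that is complete and not yet loaded, and marks
      // it as consumed once the load has succeeded.
      static void loadIfPending(FileSystem fs, Path dumpDir) throws IOException {
        boolean dumpComplete = fs.exists(new Path(dumpDir, DUMP_ACK));
        boolean alreadyLoaded = fs.exists(new Path(dumpDir, LOAD_ACK));
        if (dumpComplete && !alreadyLoaded) {
          // ... apply the dump to the target warehouse here ...
          fs.create(new Path(dumpDir, LOAD_ACK)).close();
        }
      }
    }
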
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index c2e9f883ce..50ecb7197a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -83,7 +83,6 @@
   // By default, this will be same as that of super class BaseSemanticAnalyzer. But need to obtain again
   // if the Hive configs are received from WITH clause in REPL LOAD or REPL STATUS commands.
   private Hive db;
-  private boolean isTargetAlreadyLoaded;
 
   private static final String dumpSchema = "dump_dir,last_repl_id#string,string";
 
@@ -390,7 +389,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
       // tells us what is inside that dumpdir.
 
       //If repl status of target is greater than dumps, don't do anything as the load for the latest dump is done
-      if (!isTargetAlreadyLoaded) {
+      if (loadPath != null) {
         DumpMetaData dmd = new DumpMetaData(loadPath, conf);
 
         boolean evDump = false;
@@ -406,6 +405,8 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
             queryState.getLineageState(), evDump, dmd.getEventTo(), dirLocationsToCopy(loadPath, evDump));
 
         rootTasks.add(TaskFactory.get(replLoadWork, conf));
+      } else {
+        throw new Exception("Dump Already Loaded");
       }
     } catch (Exception e) {
       // TODO : simple wrap & rethrow for now, clean up with error codes
@@ -429,36 +430,28 @@ private Path getCurrentLoadPath() throws IOException, SemanticException {
     }
     FileStatus[] statuses = loadPathBase.getFileSystem(conf).listStatus(loadPathBase);
     if (statuses.length > 0) {
-      //sort based on last modified. Recent one is at the end
+      //sort based on last modified. Recent one is at the beginning
       Arrays.sort(statuses, new Comparator<FileStatus>() {
         public int compare(FileStatus f1, FileStatus f2) {
-          return Long.compare(f1.getModificationTime(), f2.getModificationTime());
+          return Long.compare(f2.getModificationTime(), f1.getModificationTime());
         }
       });
-      if (replScope.getDbName() != null) {
-        String currentReplStatusOfTarget
-                = getReplStatus(replScope.getDbName());
-        if (currentReplStatusOfTarget == null) { //bootstrap
-          return new Path(statuses[0].getPath(), ReplUtils.REPL_HIVE_BASE_DIR);
-        } else {
-          DumpMetaData latestDump = new DumpMetaData(
-                  new Path(statuses[statuses.length - 1].getPath(), ReplUtils.REPL_HIVE_BASE_DIR), conf);
-          if (Long.parseLong(currentReplStatusOfTarget.trim()) >= latestDump.getEventTo()) {
-            isTargetAlreadyLoaded = true;
-          } else {
-            for (FileStatus status : statuses) {
-              Path hiveLoadPath = new Path(status.getPath(), ReplUtils.REPL_HIVE_BASE_DIR);
-              DumpMetaData dmd = new DumpMetaData(hiveLoadPath, conf);
-              if (dmd.isIncrementalDump()
-                      && Long.parseLong(currentReplStatusOfTarget.trim()) < dmd.getEventTo()) {
-                return hiveLoadPath;
-              }
-            }
+      Path hiveDumpPath = new Path(statuses[0].getPath(), ReplUtils.REPL_HIVE_BASE_DIR);
+      if (loadPathBase.getFileSystem(conf).exists(hiveDumpPath)) {
+        boolean dumpComplete = false;
+        boolean loadComplete = false;
+        FileStatus[] dumpStatuses = loadPathBase.getFileSystem(conf).listStatus(hiveDumpPath);
+        for (FileStatus dumpStatus : dumpStatuses) {
+          if (dumpStatus.getPath().getName().contains(ReplUtils.DUMP_ACKNOWLEDGEMENT)) {
+            dumpComplete = true;
+          }
+          if (dumpStatus.getPath().getName().contains(ReplUtils.LOAD_ACKNOWLEDGEMENT)) {
+            loadComplete = true;
           }
         }
-      } else {
-        //If dbname is null(in case of repl load *), can't get repl status of target, return unsupported
-        throw new UnsupportedOperationException("REPL LOAD * is not supported");
+        if (dumpComplete && !loadComplete) {
+          return hiveDumpPath;
+        }
       }
     }
     return null;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
index 6f8912b5f9..7030451c0f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
@@ -82,6 +82,16 @@ public static void writeOutput(List<List<String>> listValues, Path outputFile, H
     }
   }
 
+  public static void write(Path outputFile, HiveConf hiveConf)
+          throws SemanticException {
+    try {
+      FileSystem fs = outputFile.getFileSystem(hiveConf);
+      fs.create(outputFile);
+    } catch (IOException e) {
+      throw new SemanticException(e);
+    }
+  }
+
   public static Iterable<String> matchesDb(Hive db, String dbPattern) throws HiveException {
     if (dbPattern == null) {
       return db.getAllDatabases();
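
For readers following getCurrentLoadPath() above: it now orders the dump directories newest-first and returns the most recent one only when its dump acknowledgement is present and its load acknowledgement is not. A minimal standalone sketch of that selection follows; the names are illustrative only, and the real method additionally appends ReplUtils.REPL_HIVE_BASE_DIR and consults DumpMetaData:

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.io.IOException;
    import java.util.Arrays;

    // Illustrative-only sketch: pick the newest dump directory that is fully dumped
    // but not yet loaded. Class and method names are hypothetical.
    class LoadPathSelectionSketch {
      static Path pickLoadPath(FileSystem fs, Path dumpRoot) throws IOException {
        FileStatus[] statuses = fs.listStatus(dumpRoot);
        if (statuses.length == 0) {
          return null;
        }
        // newest first, mirroring the reversed comparator introduced by the patch
        Arrays.sort(statuses, (f1, f2) -> Long.compare(f2.getModificationTime(), f1.getModificationTime()));
        Path newest = statuses[0].getPath();
        boolean dumpComplete = fs.exists(new Path(newest, "_finished_dump"));
        boolean alreadyLoaded = fs.exists(new Path(newest, "_finished_load"));
        return (dumpComplete && !alreadyLoaded) ? newest : null;
      }
    }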