diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index c46103a254..f8c486b19b 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -312,7 +312,22 @@ public void testBasic() throws IOException { verifySetup("SELECT * from " + dbName + ".unptned_empty", empty, driver); String replicatedDbName = dbName + "_dupe"; - bootstrapLoadAndVerify(dbName, replicatedDbName); + Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replicatedDbName); + + FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(hconf); + Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + boolean dumpAckFound = false; + boolean loadAckFound = false; + for (FileStatus status : fs.listStatus(dumpPath)) { + if (status.getPath().getName().equalsIgnoreCase(ReplUtils.DUMP_ACKNOWLEDGEMENT)) { + dumpAckFound = true; + } + if (status.getPath().getName().equalsIgnoreCase(ReplUtils.LOAD_ACKNOWLEDGEMENT)) { + loadAckFound = true; + } + } + assertTrue(dumpAckFound); + assertTrue(loadAckFound); verifyRun("SELECT * from " + replicatedDbName + ".unptned", unptn_data, driverMirror); verifyRun("SELECT a from " + replicatedDbName + ".ptned WHERE b=1", ptn_data_1, driverMirror); @@ -395,6 +410,9 @@ public void testTaskCreationOptimization() throws Throwable { assertEquals(false, hasMoveTask(task)); assertEquals(true, hasPartitionTask(task)); + Path loadPath = new Path(dump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + //delete load ack to reload the same dump + loadPath.getFileSystem(hconf).delete(new Path(loadPath, ReplUtils.LOAD_ACKNOWLEDGEMENT), true); loadAndVerify(dbNameReplica, dbName, dump.lastReplId); run("insert into table " + dbName + ".t2 partition(country='india') values ('delhi')", driver); @@ -406,6 +424,9 @@ public void testTaskCreationOptimization() throws Throwable { assertEquals(true, hasMoveTask(task)); assertEquals(true, hasPartitionTask(task)); + loadPath = new Path(dump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + //delete load ack to reload the same dump + loadPath.getFileSystem(hconf).delete(new Path(loadPath, ReplUtils.LOAD_ACKNOWLEDGEMENT), true); loadAndVerify(dbNameReplica, dbName, dump.lastReplId); run("insert into table " + dbName + ".t2 partition(country='us') values ('sf')", driver); @@ -488,7 +509,7 @@ public void testBootstrapLoadOnExistingDb() throws IOException { // Load to an empty database Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, dbName + "_empty"); - String replDumpLocn = bootstrapDump.dumpLocation; + verifyRun("SELECT * from " + dbName + "_empty.unptned", unptn_data, driverMirror); String[] nullReplId = new String[]{ "NULL" }; @@ -496,8 +517,8 @@ public void testBootstrapLoadOnExistingDb() throws IOException { // Create a database with a table createDB(dbName + "_withtable", driverMirror); run("CREATE TABLE " + dbName + "_withtable.unptned(a string) STORED AS TEXTFILE", driverMirror); - // Load using same dump to a DB with table. It should fail as DB is not empty. - verifyFail("REPL LOAD " + dbName + " INTO " + dbName + "_withtable ", driverMirror); + // Load using same dump to a DB with table will not do anything. 
Just prints a log saying it's already loaded + run("REPL LOAD " + dbName + " INTO " + dbName + "_withtable ", driverMirror); // REPL STATUS should return NULL verifyRun("REPL STATUS " + dbName + "_withtable", nullReplId, driverMirror); @@ -506,8 +527,8 @@ public void testBootstrapLoadOnExistingDb() throws IOException { createDB(dbName + "_withview", driverMirror); run("CREATE TABLE " + dbName + "_withview.unptned(a string) STORED AS TEXTFILE", driverMirror); run("CREATE VIEW " + dbName + "_withview.view AS SELECT * FROM " + dbName + "_withview.unptned", driverMirror); - // Load using same dump to a DB with view. It should fail as DB is not empty. - verifyFail("REPL LOAD " + dbName + " INTO " + dbName + "_withview", driverMirror); + // Load using same dump to a DB with view will not do anything. Just prints a log saying it's already loaded + run("REPL LOAD " + dbName + " INTO " + dbName + "_withview", driverMirror); // REPL STATUS should return NULL verifyRun("REPL STATUS " + dbName + "_withview", nullReplId, driverMirror); @@ -838,7 +859,26 @@ public void testIncrementalAdds() throws IOException { verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=2", ptn_data_2, driver); // Perform REPL-DUMP/LOAD - incrementalLoadAndVerify(dbName, replDbName); + Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDbName); + FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(hconf); + boolean dumpAckFound = false; + boolean loadAckFound = false; + assertFalse(fs.exists(new Path(bootstrapDump.dumpLocation))); + fs = new Path(incrementalDump.dumpLocation).getFileSystem(hconf); + Path dumpPath = new Path(incrementalDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + dumpAckFound = false; + loadAckFound = false; + for (FileStatus status : fs.listStatus(dumpPath)) { + if (status.getPath().getName().equalsIgnoreCase(ReplUtils.DUMP_ACKNOWLEDGEMENT)) { + dumpAckFound = true; + } + if (status.getPath().getName().equalsIgnoreCase(ReplUtils.LOAD_ACKNOWLEDGEMENT)) { + loadAckFound = true; + } + } + + assertTrue(dumpAckFound); + assertTrue(loadAckFound); // VERIFY tables and partitions on destination for equivalence. 
verifyRun("SELECT * from " + replDbName + ".unptned_empty", empty, driverMirror); @@ -2376,7 +2416,7 @@ public void testTruncateWithCM() throws IOException { Tuple bootstrapDump = replDumpDb(dbName); String replDumpId = bootstrapDump.lastReplId; - String replDumpLocn = bootstrapDump.dumpLocation; + run("REPL LOAD " + dbName + " INTO " + replDbName, driverMirror); String[] empty = new String[] {}; String[] unptn_data = new String[] { "eleven", "thirteen" }; @@ -2387,39 +2427,33 @@ public void testTruncateWithCM() throws IOException { run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')", driver); Tuple firstInsert = replDumpDb(dbName); Integer numOfEventsIns1 = Integer.valueOf(firstInsert.lastReplId) - Integer.valueOf(replDumpId); + // load only first insert (1 record) + loadAndVerify(replDbName, dbName, firstInsert.lastReplId); + + verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data_load1, driverMirror); // x events to insert, last repl ID: replDumpId+2x run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[1] + "')", driver); verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data, driver); Tuple secondInsert = replDumpDb(dbName); Integer numOfEventsIns2 = Integer.valueOf(secondInsert.lastReplId) - Integer.valueOf(firstInsert.lastReplId); + // load only second insert (2 records) + + loadAndVerify(replDbName, dbName, secondInsert.lastReplId); + verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data_load2, driverMirror); // y event to truncate, last repl ID: replDumpId+2x+y run("TRUNCATE TABLE " + dbName + ".unptned", driver); verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", empty, driver); Tuple thirdTrunc = replDumpDb(dbName); Integer numOfEventsTrunc3 = Integer.valueOf(thirdTrunc.lastReplId) - Integer.valueOf(secondInsert.lastReplId); + // load only truncate (0 records) + loadAndVerify(replDbName, dbName, thirdTrunc.lastReplId); + verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", empty, driverMirror); // x events to insert, last repl ID: replDumpId+3x+y run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data_load1[0] + "')", driver); verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data_load1, driver); - - run("REPL LOAD " + dbName + " INTO " + replDbName, driverMirror); - - // Dump and load only first insert (1 record) - loadAndVerify(replDbName, dbName, firstInsert.lastReplId); - - verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data_load1, driverMirror); - - // Dump and load only second insert (2 records) - - loadAndVerify(replDbName, dbName, secondInsert.lastReplId); - verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data_load2, driverMirror); - - // Dump and load only truncate (0 records) - loadAndVerify(replDbName, dbName, thirdTrunc.lastReplId); - verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", empty, driverMirror); - // Dump and load insert after truncate (1 record) incrementalLoadAndVerify(dbName, replDbName); verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data_load1, driverMirror); @@ -2434,10 +2468,7 @@ public void testIncrementalRepeatEventOnExistingObject() throws IOException, Int // Bootstrap dump/load String replDbName = dbName + "_dupe"; - Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName); - - // List to maintain the incremental dumps for each operation - List incrementalDumpList = new ArrayList(); + 
bootstrapLoadAndVerify(dbName, replDbName); String[] empty = new String[] {}; String[] unptn_data = new String[] { "ten" }; @@ -2447,48 +2478,48 @@ public void testIncrementalRepeatEventOnExistingObject() throws IOException, Int // INSERT EVENT to unpartitioned table run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')", driver); Tuple replDump = replDumpDb(dbName); - incrementalDumpList.add(replDump); Thread.sleep(1000); // INSERT EVENT to partitioned table with dynamic ADD_PARTITION run("INSERT INTO TABLE " + dbName + ".ptned PARTITION(b=1) values('" + ptn_data_1[0] + "')", driver); + //Second dump without load will print a warning + run("REPL DUMP " + dbName, driverMirror); + //Load the previous dump first + loadAndVerify(replDbName, dbName, replDump.lastReplId); + replDump = replDumpDb(dbName); - incrementalDumpList.add(replDump); + loadAndVerify(replDbName, dbName, replDump.lastReplId); Thread.sleep(1000); // ADD_PARTITION EVENT to partitioned table run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=2)", driver); replDump = replDumpDb(dbName); - incrementalDumpList.add(replDump); + loadAndVerify(replDbName, dbName, replDump.lastReplId); Thread.sleep(1000); // INSERT EVENT to partitioned table on existing partition run("INSERT INTO TABLE " + dbName + ".ptned PARTITION(b=2) values('" + ptn_data_2[0] + "')", driver); replDump = replDumpDb(dbName); - incrementalDumpList.add(replDump); + loadAndVerify(replDbName, dbName, replDump.lastReplId); Thread.sleep(1000); // TRUNCATE_PARTITION EVENT on partitioned table run("TRUNCATE TABLE " + dbName + ".ptned PARTITION (b=1)", driver); replDump = replDumpDb(dbName); - incrementalDumpList.add(replDump); + loadAndVerify(replDbName, dbName, replDump.lastReplId); Thread.sleep(1000); // TRUNCATE_TABLE EVENT on unpartitioned table run("TRUNCATE TABLE " + dbName + ".unptned", driver); replDump = replDumpDb(dbName); - incrementalDumpList.add(replDump); + loadAndVerify(replDbName, dbName, replDump.lastReplId); Thread.sleep(1000); // CREATE_TABLE EVENT with multiple partitions run("CREATE TABLE " + dbName + ".unptned_tmp AS SELECT * FROM " + dbName + ".ptned", driver); replDump = replDumpDb(dbName); - incrementalDumpList.add(replDump); + loadAndVerify(replDbName, dbName, replDump.lastReplId); Thread.sleep(1000); // ADD_CONSTRAINT EVENT run("ALTER TABLE " + dbName + ".unptned_tmp ADD CONSTRAINT uk_unptned UNIQUE(a) disable", driver); replDump = replDumpDb(dbName); - incrementalDumpList.add(replDump); + loadAndVerify(replDbName, dbName, replDump.lastReplId); Thread.sleep(1000); - // Replicate all the events happened so far - for (Tuple currDump : incrementalDumpList) { - // Load the incremental dump and ensure it does nothing and lastReplID remains same - loadAndVerify(replDbName, dbName, currDump.lastReplId); - } + Tuple incrDump = incrementalLoadAndVerify(dbName, replDbName); verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", empty, driverMirror); @@ -2517,10 +2548,7 @@ public void testIncrementalRepeatEventOnMissingObject() throws Exception { // Bootstrap dump/load String replDbName = dbName + "_dupe"; - Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName); - - // List to maintain the incremental dumps for each operation - List incrementalDumpList = new ArrayList(); + bootstrapLoadAndVerify(dbName, replDbName); String[] unptn_data = new String[] { "ten" }; String[] ptn_data_1 = new String[] { "fifteen" }; @@ -2529,78 +2557,74 @@ public void testIncrementalRepeatEventOnMissingObject() throws Exception { 
// INSERT EVENT to unpartitioned table run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')", driver); Tuple replDump = replDumpDb(dbName); - incrementalDumpList.add(replDump); + loadAndVerify(replDbName, dbName, replDump.lastReplId); Thread.sleep(1000); // INSERT EVENT to partitioned table with dynamic ADD_PARTITION run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data_1[0] + "')", driver); replDump = replDumpDb(dbName); - incrementalDumpList.add(replDump); + loadAndVerify(replDbName, dbName, replDump.lastReplId); Thread.sleep(1000); // ADD_PARTITION EVENT to partitioned table run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=2)", driver); replDump = replDumpDb(dbName); - incrementalDumpList.add(replDump); + loadAndVerify(replDbName, dbName, replDump.lastReplId); Thread.sleep(1000); // INSERT EVENT to partitioned table on existing partition run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[0] + "')", driver); replDump = replDumpDb(dbName); - incrementalDumpList.add(replDump); + loadAndVerify(replDbName, dbName, replDump.lastReplId); Thread.sleep(1000); // TRUNCATE_PARTITION EVENT on partitioned table run("TRUNCATE TABLE " + dbName + ".ptned PARTITION(b=1)", driver); replDump = replDumpDb(dbName); - incrementalDumpList.add(replDump); + loadAndVerify(replDbName, dbName, replDump.lastReplId); Thread.sleep(1000); // TRUNCATE_TABLE EVENT on unpartitioned table run("TRUNCATE TABLE " + dbName + ".unptned", driver); replDump = replDumpDb(dbName); - incrementalDumpList.add(replDump); + loadAndVerify(replDbName, dbName, replDump.lastReplId); Thread.sleep(1000); // CREATE_TABLE EVENT on partitioned table run("CREATE TABLE " + dbName + ".ptned_tmp (a string) PARTITIONED BY (b int) STORED AS TEXTFILE", driver); replDump = replDumpDb(dbName); - incrementalDumpList.add(replDump); + loadAndVerify(replDbName, dbName, replDump.lastReplId); Thread.sleep(1000); // INSERT EVENT to partitioned table with dynamic ADD_PARTITION run("INSERT INTO TABLE " + dbName + ".ptned_tmp partition(b=10) values('" + ptn_data_1[0] + "')", driver); replDump = replDumpDb(dbName); - incrementalDumpList.add(replDump); + loadAndVerify(replDbName, dbName, replDump.lastReplId); Thread.sleep(1000); // INSERT EVENT to partitioned table with dynamic ADD_PARTITION run("INSERT INTO TABLE " + dbName + ".ptned_tmp partition(b=20) values('" + ptn_data_2[0] + "')", driver); replDump = replDumpDb(dbName); - incrementalDumpList.add(replDump); + loadAndVerify(replDbName, dbName, replDump.lastReplId); Thread.sleep(1000); // DROP_PARTITION EVENT to partitioned table run("ALTER TABLE " + dbName + ".ptned DROP PARTITION (b=1)", driver); replDump = replDumpDb(dbName); - incrementalDumpList.add(replDump); + loadAndVerify(replDbName, dbName, replDump.lastReplId); Thread.sleep(1000); // RENAME_PARTITION EVENT to partitioned table run("ALTER TABLE " + dbName + ".ptned PARTITION (b=2) RENAME TO PARTITION (b=20)", driver); replDump = replDumpDb(dbName); - incrementalDumpList.add(replDump); + loadAndVerify(replDbName, dbName, replDump.lastReplId); Thread.sleep(1000); // RENAME_TABLE EVENT to unpartitioned table run("ALTER TABLE " + dbName + ".unptned RENAME TO " + dbName + ".unptned_new", driver); replDump = replDumpDb(dbName); - incrementalDumpList.add(replDump); + loadAndVerify(replDbName, dbName, replDump.lastReplId); Thread.sleep(1000); // ADD_CONSTRAINT EVENT run("ALTER TABLE " + dbName + ".ptned_tmp ADD CONSTRAINT uk_unptned UNIQUE(a) disable", driver); replDump = 
replDumpDb(dbName); - incrementalDumpList.add(replDump); + loadAndVerify(replDbName, dbName, replDump.lastReplId); Thread.sleep(1000); // DROP_TABLE EVENT to partitioned table run("DROP TABLE " + dbName + ".ptned_tmp", driver); replDump = replDumpDb(dbName); - incrementalDumpList.add(replDump); + loadAndVerify(replDbName, dbName, replDump.lastReplId); Thread.sleep(1000); - // Load each incremental dump from the list. Each dump have only one operation. - for (Tuple currDump : incrementalDumpList) { - // Load the current incremental dump and ensure it does nothing and lastReplID remains same - loadAndVerify(replDbName, dbName, currDump.lastReplId); - } + // Replicate all the events happened so far Tuple incrDump = incrementalLoadAndVerify(dbName, replDbName); // Verify if the data are intact even after applying an applied event once again on missing objects @@ -3262,14 +3286,11 @@ public void testDumpNonReplDatabase() throws IOException { verifyFail("REPL DUMP " + dbName, driver); assertTrue(run("REPL DUMP " + dbName + " with ('hive.repl.dump.metadata.only' = 'true')", true, driver)); + //Dump again before load will print a warning assertTrue(run("REPL DUMP " + dbName + " with ('hive.repl.dump.metadata.only' = 'true')", true, driver)); - run("alter database " + dbName + " set dbproperties ('repl.source.for' = '1, 2, 3')", driver); - assertTrue(run("REPL DUMP " + dbName, true, driver)); - assertTrue(run("REPL DUMP " + dbName, true, driver)); dbName = createDBNonRepl(testName.getMethodName() + "_case", driver); - run("alter database " + dbName + " set dbproperties ('repl.SOURCE.for' = '1, 2, 3')", driver); - assertTrue(run("REPL DUMP " + dbName, true, driver)); + run("alter database " + dbName + " set dbproperties ('repl.source.for' = '1, 2, 3')", driver); assertTrue(run("REPL DUMP " + dbName, true, driver)); } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTablesBootstrap.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTablesBootstrap.java index a683fd4216..b0c3a9e0b0 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTablesBootstrap.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTablesBootstrap.java @@ -53,6 +53,8 @@ private static List dumpWithAcidBootstrapClause = Arrays.asList( "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'", "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES + "'='true'"); + private static List dumpWithAcidClause = Arrays.asList( + "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'"); @BeforeClass public static void classLevelSetup() throws Exception { @@ -136,60 +138,35 @@ public Boolean apply(@Nullable CallerArguments args) { } finally { InjectableBehaviourObjectStore.resetAlterTableModifier(); } + //Load again should succeed as checkpointing is in place + replica.load(replicatedDbName, primaryDbName); + verifyIncLoad(replicatedDbName, incDump.lastReplicationId); prepareInc2AcidData(primaryDbName, primary.hiveConf); prepareInc2NonAcidData(primaryDbName, primary.hiveConf); LOG.info(testName.getMethodName() + ": second incremental dump with acid table bootstrap"); WarehouseInstance.Tuple inc2Dump = primary.run("use " + primaryDbName) - .dump(primaryDbName, dumpWithAcidBootstrapClause); - - // Set incorrect bootstrap dump to clean tables. Here, used the full bootstrap dump which is invalid. - // So, REPL LOAD fails. 
- List loadWithClause = Collections.singletonList( - "'" + ReplUtils.REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG + "'='" - + bootstrapDump.dumpLocation + "'"); - LOG.info(testName.getMethodName() - + ": trying to load second incremental dump with wrong bootstrap dump " - + " specified for cleaning ACID tables. Should fail."); - replica.loadFailure(replicatedDbName, primaryDbName, loadWithClause); - - // Set previously failed bootstrap dump to clean-up. Now, new bootstrap should overwrite the old one. - loadWithClause = Collections.singletonList( - "'" + ReplUtils.REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG + "'='" - + incDump.dumpLocation + "'"); - - LOG.info(testName.getMethodName() - + ": trying to load second incremental dump with correct bootstrap dump " - + "specified for cleaning ACID tables. Should succeed."); - replica.load(replicatedDbName, primaryDbName, loadWithClause); - verifyInc2Load(replicatedDbName, inc2Dump.lastReplicationId); - - // Once the REPL LOAD is successful, the this config should be unset or else, the subsequent REPL LOAD - // will also drop those tables which will cause data loss. - loadWithClause = Collections.emptyList(); - - // Verify if bootstrapping with same dump is idempotent and return same result + .dump(primaryDbName, dumpWithAcidClause); LOG.info(testName.getMethodName() + ": trying to load second incremental dump (with acid bootstrap) again." + " Should succeed."); - replica.load(replicatedDbName, primaryDbName, loadWithClause); + replica.load(replicatedDbName, primaryDbName); verifyInc2Load(replicatedDbName, inc2Dump.lastReplicationId); } @Test public void retryIncBootstrapAcidFromDifferentDumpWithoutCleanTablesConfig() throws Throwable { - WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName, + prepareDataAndDump(primaryDbName, dumpWithoutAcidClause); replica.load(replicatedDbName, primaryDbName); prepareIncAcidData(primaryDbName); prepareIncNonAcidData(primaryDbName); - WarehouseInstance.Tuple incDump = primary.run("use " + primaryDbName) - .dump(primaryDbName, dumpWithAcidBootstrapClause); - WarehouseInstance.Tuple inc2Dump = primary.run("use " + primaryDbName) + primary.run("use " + primaryDbName) .dump(primaryDbName, dumpWithAcidBootstrapClause); replica.load(replicatedDbName, primaryDbName); - + primary.run("use " + primaryDbName) + .dump(primaryDbName, dumpWithAcidBootstrapClause); // Re-bootstrapping from different bootstrap dump without clean tables config should fail. 
replica.loadFailure(replicatedDbName, primaryDbName, Collections.emptyList(), ErrorMsg.REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID.getErrorCode()); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index c5532cfd14..6a2c5e328e 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -689,18 +689,6 @@ public void testIncrementalReplWithEventsBatchHavingDropCreateTable() throws Thr .run("insert into table2 partition(country='india') values(1)") .dump(primaryDbName, Collections.emptyList()); - // Second incremental dump - WarehouseInstance.Tuple secondIncremental = primary.run("use " + primaryDbName) - .run("drop table table1") - .run("drop table table2") - .run("create table table2 (id int) partitioned by (country string)") - .run("alter table table2 add partition(country='india')") - .run("alter table table2 drop partition(country='india')") - .run("insert into table2 partition(country='us') values(2)") - .run("create table table1 (i int)") - .run("insert into table1 values (2)") - .dump(primaryDbName, Collections.emptyList()); - // First incremental load replica.load(replicatedDbName, primaryDbName) .status(replicatedDbName) @@ -713,6 +701,18 @@ public void testIncrementalReplWithEventsBatchHavingDropCreateTable() throws Thr .run("select id from table2 order by id") .verifyResults(new String[] {"1"}); + // Second incremental dump + WarehouseInstance.Tuple secondIncremental = primary.run("use " + primaryDbName) + .run("drop table table1") + .run("drop table table2") + .run("create table table2 (id int) partitioned by (country string)") + .run("alter table table2 add partition(country='india')") + .run("alter table table2 drop partition(country='india')") + .run("insert into table2 partition(country='us') values(2)") + .run("create table table1 (i int)") + .run("insert into table1 values (2)") + .dump(primaryDbName, Collections.emptyList()); + // Second incremental load replica.load(replicatedDbName, primaryDbName) .status(replicatedDbName) @@ -746,6 +746,18 @@ public void testIncrementalReplWithDropAndCreateTableDifferentPartitionTypeAndIn .run("insert into table3 partition(country='india') values(3)") .dump(primaryDbName, Collections.emptyList()); + // First incremental load + replica.load(replicatedDbName, primaryDbName) + .status(replicatedDbName) + .verifyResult(firstIncremental.lastReplicationId) + .run("use " + replicatedDbName) + .run("select id from table1") + .verifyResults(new String[] {"1"}) + .run("select * from table2") + .verifyResults(new String[] {"2"}) + .run("select id from table3") + .verifyResults(new String[] {"3"}); + // Second incremental dump WarehouseInstance.Tuple secondIncremental = primary.run("use " + primaryDbName) .run("drop table table1") @@ -759,18 +771,6 @@ public void testIncrementalReplWithDropAndCreateTableDifferentPartitionTypeAndIn .run("insert into table3 partition(name='adam', rank=100) values(30)") .dump(primaryDbName, Collections.emptyList()); - // First incremental load - replica.load(replicatedDbName, primaryDbName) - .status(replicatedDbName) - .verifyResult(firstIncremental.lastReplicationId) - .run("use " + replicatedDbName) - .run("select id from table1") - .verifyResults(new String[] { "1" }) - 
.run("select * from table2") - .verifyResults(new String[] { "2" }) - .run("select id from table3") - .verifyResults(new String[] { "3" }); - // Second incremental load replica.load(replicatedDbName, primaryDbName) .status(replicatedDbName) @@ -880,16 +880,6 @@ public void testIncrementalDumpEmptyDumpDirectory() throws Throwable { } catch (CommandProcessorException e) { assertTrue(e.getMessage().toLowerCase().contains("semanticException no data to load in path".toLowerCase())); } - - // Bootstrap load from an empty dump directory should return empty load directory error. Since we have repl status - //check on target - tuple = primary.dump("someJunkDB"); - try { - replica.runCommand("REPL LOAD someJunkDB into someJunkDB "); - } catch (CommandProcessorException e) { - assertTrue(e.getMessage().toLowerCase().contains("semanticException no data to load in path".toLowerCase())); - } - primary.run(" drop database if exists " + testDbName + " cascade"); } @@ -934,7 +924,7 @@ public void testIncrementalDumpMultiIteration() throws Throwable { Path path = new Path(hiveDumpDir); FileSystem fs = path.getFileSystem(conf); FileStatus[] fileStatus = fs.listStatus(path); - int numEvents = fileStatus.length - 2; //one is metadata file and one data dir + int numEvents = fileStatus.length - 3; //one is metadata file and one data dir and one is _dump ack replica.load(replicatedDbName, primaryDbName, Collections.singletonList("'hive.repl.approx.max.load.tasks'='1'")) @@ -1331,7 +1321,7 @@ public void testMoveOptimizationBootstrapReplLoadRetryAfterFailure() throws Thro .dump(primaryDbName); testMoveOptimization(primaryDbName, replicatedDbName, replicatedDbName_CM, "t2", - "ADD_PARTITION"); + "ADD_PARTITION", tuple); } @Test @@ -1345,6 +1335,11 @@ public void testMoveOptimizationIncrementalFailureAfterCopyReplace() throws Thro .run("create table t1 (place string) partitioned by (country string)") .dump(primaryDbName); replica.load(replicatedDbName, primaryDbName, withConfigs); + //delete load ack to reuse the dump + new Path(tuple.dumpLocation).getFileSystem(conf).delete(new Path(tuple.dumpLocation + + Path.SEPARATOR + ReplUtils.REPL_HIVE_BASE_DIR + Path.SEPARATOR + + ReplUtils.LOAD_ACKNOWLEDGEMENT), true); + replica.load(replicatedDbName_CM, primaryDbName, withConfigs); replica.run("alter database " + replicatedDbName + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')") .run("alter database " + replicatedDbName_CM + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')"); @@ -1353,7 +1348,8 @@ public void testMoveOptimizationIncrementalFailureAfterCopyReplace() throws Thro .run("insert overwrite table t1 select * from t2") .dump(primaryDbName, Collections.emptyList()); - testMoveOptimization(primaryDbName, replicatedDbName, replicatedDbName_CM, "t1", "ADD_PARTITION"); + testMoveOptimization(primaryDbName, replicatedDbName, replicatedDbName_CM, "t1", "ADD_PARTITION", + tuple); } @Test @@ -1361,11 +1357,14 @@ public void testMoveOptimizationIncrementalFailureAfterCopy() throws Throwable { List withConfigs = Collections.singletonList("'hive.repl.enable.move.optimization'='true'"); String replicatedDbName_CM = replicatedDbName + "_CM"; - primary.run("use " + primaryDbName) + WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName) .run("create table t2 (place string) partitioned by (country string)") .run("ALTER TABLE t2 ADD PARTITION (country='india')") .dump(primaryDbName); replica.load(replicatedDbName, primaryDbName, withConfigs); + //delete load ack to reuse the dump + 
new Path(bootstrapDump.dumpLocation).getFileSystem(conf).delete(new Path(bootstrapDump.dumpLocation + + Path.SEPARATOR + ReplUtils.REPL_HIVE_BASE_DIR + Path.SEPARATOR + ReplUtils.LOAD_ACKNOWLEDGEMENT), true); replica.load(replicatedDbName_CM, primaryDbName, withConfigs); replica.run("alter database " + replicatedDbName + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')") .run("alter database " + replicatedDbName_CM + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')"); @@ -1374,11 +1373,11 @@ public void testMoveOptimizationIncrementalFailureAfterCopy() throws Throwable { .run("insert into table t2 partition(country='india') values ('bangalore')") .dump(primaryDbName, Collections.emptyList()); - testMoveOptimization(primaryDbName, replicatedDbName, replicatedDbName_CM, "t2", "INSERT"); + testMoveOptimization(primaryDbName, replicatedDbName, replicatedDbName_CM, "t2", "INSERT", tuple); } private void testMoveOptimization(String primaryDb, String replicaDb, String replicatedDbName_CM, - String tbl, String eventType) throws Throwable { + String tbl, String eventType, WarehouseInstance.Tuple tuple) throws Throwable { List withConfigs = Collections.singletonList("'hive.repl.enable.move.optimization'='true'"); @@ -1415,6 +1414,12 @@ public Boolean apply(NotificationEvent entry) { primary.run("use " + primaryDb) .run("drop table " + tbl); + //delete load ack to reuse the dump + new Path(tuple.dumpLocation).getFileSystem(conf).delete(new Path(tuple.dumpLocation + + Path.SEPARATOR + ReplUtils.REPL_HIVE_BASE_DIR + Path.SEPARATOR + + ReplUtils.LOAD_ACKNOWLEDGEMENT), true); + + InjectableBehaviourObjectStore.setAddNotificationModifier(callerVerifier); try { replica.loadFailure(replicatedDbName_CM, primaryDbName, withConfigs); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java index 81feaf5eec..dd375c50ce 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java @@ -54,7 +54,6 @@ import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.FILE_NAME; import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME; -import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -611,7 +610,7 @@ public void retryBootstrapExternalTablesFromDifferentDump() throws Throwable { dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'", "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'"); - WarehouseInstance.Tuple tupleIncWithExternalBootstrap = primary.run("use " + primaryDbName) + primary.run("use " + primaryDbName) .run("drop table t1") .run("create external table t4 (id int)") .run("insert into table t4 values (10)") @@ -643,46 +642,43 @@ public Boolean apply(@Nullable CallerArguments args) { InjectableBehaviourObjectStore.resetAlterTableModifier(); } + replica.load(replicatedDbName, primaryDbName) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + 
.verifyFailure(new String[]{"t1"}) + .run("show tables like 't2'") + .verifyResult("t2") + .run("select country from t2 order by country") + .verifyResults(new String[] {"india", "us"}) + .run("select id from t4") + .verifyResults(Arrays.asList("10")) + .run("select id from t5") + .verifyResult("10") + .verifyReplTargetProperty(replicatedDbName); + // Insert into existing external table and then Drop it, add another managed table with same name // and dump another bootstrap dump for external tables. - WarehouseInstance.Tuple tupleNewIncWithExternalBootstrap = primary.run("use " + primaryDbName) + dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'"); + primary.run("use " + primaryDbName) .run("insert into table t2 partition(country='india') values ('chennai')") .run("drop table t2") .run("create table t2 as select * from t4") .run("insert into table t4 values (20)") .dump(primaryDbName, dumpWithClause); - // Set incorrect bootstrap dump to clean tables. Here, used the full bootstrap dump which is invalid. - // So, REPL LOAD fails. - loadWithClause.add("'" + REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG + "'='" - + tupleBootstrapWithoutExternal.dumpLocation + "'"); - replica.loadFailure(replicatedDbName, primaryDbName, loadWithClause); - loadWithClause.remove("'" + REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG + "'='" - + tupleBootstrapWithoutExternal.dumpLocation + "'"); - - // Set previously failed bootstrap dump to clean-up. Now, new bootstrap should overwrite the old one. - loadWithClause.add("'" + REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG + "'='" - + tupleIncWithExternalBootstrap.dumpLocation + "'"); - - // Verify if bootstrapping with same dump is idempotent and return same result - for (int i = 0; i < 2; i++) { - replica.load(replicatedDbName, primaryDbName, loadWithClause) - .run("use " + replicatedDbName) - .run("show tables like 't1'") - .verifyFailure(new String[]{"t1"}) - .run("select id from t2") - .verifyResult("10") - .run("select id from t4") - .verifyResults(Arrays.asList("10", "20")) - .run("select id from t5") - .verifyResult("10") - .verifyReplTargetProperty(replicatedDbName); - - // Once the REPL LOAD is successful, the this config should be unset or else, the subsequent REPL LOAD - // will also drop those tables which will cause data loss. 
- loadWithClause.remove("'" + REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG + "'='" - + tupleIncWithExternalBootstrap.dumpLocation + "'"); - } + + replica.load(replicatedDbName, primaryDbName) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyFailure(new String[]{"t1"}) + .run("select id from t2") + .verifyResult("10") + .run("select id from t4") + .verifyResults(Arrays.asList("10", "20")) + .run("select id from t5") + .verifyResult("10") + .verifyReplTargetProperty(replicatedDbName); + } @Test diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigrationEx.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigrationEx.java index 1b4833ca6a..f98067c4c7 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigrationEx.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigrationEx.java @@ -314,9 +314,9 @@ public void testIncLoadPenFlagPropAlterDB() throws Throwable { assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters())); assertFalse(ReplUtils.isFirstIncPending(primary.getDatabase(primaryDbName).getParameters())); - tuple = primary.run("use " + primaryDbName) + primary.run("use " + primaryDbName) .run("alter database " + primaryDbName + " set dbproperties('dummy_key'='dummy_val')") - .run("create table tbl_temp (fld int)") + .run("create table tbl_temp (fld int)") .dump(primaryDbName); loadWithFailureInAddNotification("tbl_temp"); @@ -325,8 +325,6 @@ public void testIncLoadPenFlagPropAlterDB() throws Throwable { assertFalse(ReplUtils.isFirstIncPending(primary.getDatabase(primaryDbName).getParameters())); assertTrue(replDb.getParameters().get("dummy_key").equalsIgnoreCase("dummy_val")); - // next incremental dump - primary.dump(primaryDbName); replica.loadWithoutExplain(replicatedDbName, primaryDbName); assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters())); } diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java index dbe282d374..e4ebf5059f 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java @@ -2820,7 +2820,7 @@ public void testGetQueryLogForReplCommands() throws Exception { assertNotNull("Statement is null", stmt); replDir = new Path(replDir, primaryDb + "_repl"); - FileSystem fs = FileSystem.get(replDir.toUri(), conf); + FileSystem fs = replDir.getFileSystem(conf); fs.mkdirs(replDir); try { diff --git a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java index c6b80924b9..9b9b68f3a0 100644 --- a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java +++ b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java @@ -29,7 +29,8 @@ REPL_STATE_LOG(14), REPL_TXN(15), REPL_INCREMENTAL_LOAD(16), - SCHEDULED_QUERY_MAINT(17); + SCHEDULED_QUERY_MAINT(17), + REPL_LOAD_COMPLETE_ACK(18); private final int value; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java index a37157382a..4f87bac75f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java @@ -27,14 +27,16 @@ import org.apache.hadoop.hive.ql.ddl.DDLWork; import org.apache.hadoop.hive.ql.exec.mr.MapRedTask; import org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask; -import org.apache.hadoop.hive.ql.exec.repl.ExternalTableCopyTaskBuilder.DirCopyTask; -import org.apache.hadoop.hive.ql.exec.repl.ExternalTableCopyTaskBuilder.DirCopyWork; import org.apache.hadoop.hive.ql.exec.repl.ReplDumpTask; import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; import org.apache.hadoop.hive.ql.exec.repl.ReplLoadTask; import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork; import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogTask; import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; +import org.apache.hadoop.hive.ql.exec.repl.ReplLoadCompleteAckTask; +import org.apache.hadoop.hive.ql.exec.repl.ReplLoadCompleteAckWork; +import org.apache.hadoop.hive.ql.exec.repl.ExternalTableCopyTaskBuilder.DirCopyTask; +import org.apache.hadoop.hive.ql.exec.repl.ExternalTableCopyTaskBuilder.DirCopyWork; import org.apache.hadoop.hive.ql.exec.schq.ScheduledQueryMaintenanceTask; import org.apache.hadoop.hive.ql.exec.spark.SparkTask; import org.apache.hadoop.hive.ql.exec.tez.TezTask; @@ -111,6 +113,7 @@ public TaskTuple(Class workClass, Class> taskClass) { taskvec.add(new TaskTuple<>(ReplDumpWork.class, ReplDumpTask.class)); taskvec.add(new TaskTuple<>(ReplLoadWork.class, ReplLoadTask.class)); taskvec.add(new TaskTuple<>(ReplStateLogWork.class, ReplStateLogTask.class)); + taskvec.add(new TaskTuple(ReplLoadCompleteAckWork.class, ReplLoadCompleteAckTask.class)); taskvec.add(new TaskTuple(ExportWork.class, ExportTask.class)); taskvec.add(new TaskTuple(ReplTxnWork.class, ReplTxnTask.class)); taskvec.add(new TaskTuple(DirCopyWork.class, DirCopyTask.class)); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 92e45b4c57..91ec76618f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -126,21 +126,30 @@ public int execute() { Path dumpRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR), Base64.getEncoder().encodeToString(work.dbNameOrPattern.toLowerCase() .getBytes(StandardCharsets.UTF_8.name()))); - Path currentDumpPath = new Path(dumpRoot, getNextDumpDir()); - Path hiveDumpRoot = new Path(currentDumpPath, ReplUtils.REPL_HIVE_BASE_DIR); - DumpMetaData dmd = new DumpMetaData(hiveDumpRoot, conf); - // Initialize ReplChangeManager instance since we will require it to encode file URI. - ReplChangeManager.getInstance(conf); - Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR)); - Long lastReplId; - if (!dumpRoot.getFileSystem(conf).exists(dumpRoot) - || dumpRoot.getFileSystem(conf).listStatus(dumpRoot).length == 0) { - lastReplId = bootStrapDump(hiveDumpRoot, dmd, cmRoot, hiveDb); + Path previousDumpMetaPath = getPreviousDumpMetadataPath(dumpRoot); + Path previousHiveDumpPath = + previousDumpMetaPath != null ? 
new Path(previousDumpMetaPath, ReplUtils.REPL_HIVE_BASE_DIR) : null; + //If no previous dump is present or previous dump was loaded, proceed + if (shouldDump(previousHiveDumpPath)) { + Path currentDumpPath = new Path(dumpRoot, getNextDumpDir()); + Path hiveDumpRoot = new Path(currentDumpPath, ReplUtils.REPL_HIVE_BASE_DIR); + DumpMetaData dmd = new DumpMetaData(hiveDumpRoot, conf); + // Initialize ReplChangeManager instance since we will require it to encode file URI. + ReplChangeManager.getInstance(conf); + Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR)); + Long lastReplId; + if (previousHiveDumpPath == null) { + lastReplId = bootStrapDump(hiveDumpRoot, dmd, cmRoot, hiveDb); + } else { + work.setEventFrom(getEventFromPreviousDumpMetadata(previousHiveDumpPath)); + lastReplId = incrementalDump(hiveDumpRoot, dmd, cmRoot, hiveDb); + } + prepareReturnValues(Arrays.asList(currentDumpPath.toUri().toString(), String.valueOf(lastReplId))); + writeDumpCompleteAck(hiveDumpRoot); + deleteAllPreviousDumpMeta(dumpRoot, currentDumpPath); } else { - work.setEventFrom(getEventFromPreviousDumpMetadata(dumpRoot)); - lastReplId = incrementalDump(hiveDumpRoot, dmd, cmRoot, hiveDb); + LOG.warn("Previous Dump is not yet loaded"); } - prepareReturnValues(Arrays.asList(currentDumpPath.toUri().toString(), String.valueOf(lastReplId))); } catch (Exception e) { LOG.error("failed", e); setException(e); @@ -149,16 +158,24 @@ public int execute() { return 0; } - private Long getEventFromPreviousDumpMetadata(Path dumpRoot) throws IOException, SemanticException { - FileStatus[] statuses = dumpRoot.getFileSystem(conf).listStatus(dumpRoot); - if (statuses.length > 0) { - FileStatus latestUpdatedStatus = statuses[0]; + private void deleteAllPreviousDumpMeta(Path dumpRoot, Path currentDumpPath) throws IOException { + FileSystem fs = dumpRoot.getFileSystem(conf); + if (fs.exists(dumpRoot)) { + FileStatus[] statuses = fs.listStatus(dumpRoot, path -> !path.equals(currentDumpPath)); for (FileStatus status : statuses) { - if (status.getModificationTime() > latestUpdatedStatus.getModificationTime()) { - latestUpdatedStatus = status; - } + fs.delete(status.getPath(), true); } - DumpMetaData dmd = new DumpMetaData(new Path(latestUpdatedStatus.getPath(), ReplUtils.REPL_HIVE_BASE_DIR), conf); + } + } + + private void writeDumpCompleteAck(Path currentDumpPath) throws SemanticException { + Path ackPath = new Path(currentDumpPath, ReplUtils.DUMP_ACKNOWLEDGEMENT); + Utils.create(ackPath, conf); + } + + private Long getEventFromPreviousDumpMetadata(Path previousDumpPath) throws SemanticException { + if (previousDumpPath != null) { + DumpMetaData dmd = new DumpMetaData(previousDumpPath, conf); if (dmd.isIncrementalDump()) { return dmd.getEventTo(); } @@ -168,6 +185,34 @@ private Long getEventFromPreviousDumpMetadata(Path dumpRoot) throws IOException, return 0L; } + private Path getPreviousDumpMetadataPath(Path dumpRoot) throws IOException { + FileSystem fs = dumpRoot.getFileSystem(conf); + if (fs.exists(dumpRoot)) { + FileStatus[] statuses = fs.listStatus(dumpRoot); + if (statuses.length > 0) { + FileStatus latestUpdatedStatus = statuses[0]; + for (FileStatus status : statuses) { + if (status.getModificationTime() > latestUpdatedStatus.getModificationTime()) { + latestUpdatedStatus = status; + } + } + return latestUpdatedStatus.getPath(); + } + } + return null; + } + + private boolean shouldDump(Path previousDumpPath) throws IOException { + //If no previous dump means bootstrap. 
So return true as there was no + //previous dump to load + if (previousDumpPath == null) { + return true; + } else { + FileSystem fs = previousDumpPath.getFileSystem(conf); + return fs.exists(new Path(previousDumpPath, ReplUtils.LOAD_ACKNOWLEDGEMENT)); + } + } + private void prepareReturnValues(List values) throws SemanticException { LOG.debug("prepareReturnValues : " + dumpSchema); for (String s : values) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadCompleteAckTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadCompleteAckTask.java new file mode 100644 index 0000000000..975dfb03af --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadCompleteAckTask.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.repl; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; +import org.apache.hadoop.hive.ql.plan.api.StageType; + +import java.io.Serializable; + +/** + * ReplLoadCompleteAckTask. + * + * Adds the load complete acknowledgement. + **/ +public class ReplLoadCompleteAckTask extends Task implements Serializable { + + private static final long serialVersionUID = 1L; + + @Override + public int execute() { + try { + Path ackPath = new Path(work.getDumpPath(), ReplUtils.LOAD_ACKNOWLEDGEMENT); + Utils.create(ackPath, conf); + } catch (SemanticException e) { + setException(e); + return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); + } + return 0; + } + + @Override + public StageType getType() { + return StageType.REPL_LOAD_COMPLETE_ACK; + } + + @Override + public String getName() { + return "REPL_LOAD_COMPLETE_ACK"; + } + + @Override + public boolean canExecuteInParallel() { + // REPL_LOAD_COMPLETE_ACK is executed only when all its parents are done with execution. + // So running it in parallel has no + // benefits. + return false; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadCompleteAckWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadCompleteAckWork.java new file mode 100644 index 0000000000..c36ee6d673 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadCompleteAckWork.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.repl; + +import org.apache.hadoop.hive.ql.plan.Explain; +import org.apache.hadoop.hive.ql.plan.Explain.Level; + +import java.io.Serializable; + +/** + * ReplLoadCompleteAckWork. + * FS based Acknowledgement for repl load complete + * + */ +@Explain(displayName = "Repl Load Complete Ack", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) +public class ReplLoadCompleteAckWork implements Serializable { + private static final long serialVersionUID = 1L; + private String dumpPath; + + public String getDumpPath() { + return dumpPath; + } + + public ReplLoadCompleteAckWork(String dumpPath) { + this.dumpPath = dumpPath; + } + +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java index a2c467bafd..c7bb17ec63 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java @@ -19,13 +19,9 @@ import com.google.common.collect.Collections2; import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.repl.ReplScope; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.api.InvalidInputException; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.ddl.DDLWork; import org.apache.hadoop.hive.ql.ddl.database.alter.poperties.AlterDatabaseSetPropertiesDesc; @@ -39,7 +35,6 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.ConstraintEventsIterator; -import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.FSTableEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadConstraint; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadFunction; @@ -49,20 +44,16 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder; import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves; -import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; import org.apache.hadoop.hive.ql.exec.util.DAGTraversal; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer; import 
org.apache.hadoop.hive.ql.parse.SemanticException; -import org.apache.hadoop.hive.ql.parse.repl.PathBuilder; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; import org.apache.hadoop.hive.ql.plan.api.StageType; -import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; @@ -286,6 +277,7 @@ a database ( directory ) // Populate the driver context with the scratch dir info from the repl context, so that the temp dirs will be cleaned up later context.getFsScratchDirs().putAll(loadContext.pathInfo.getFsScratchDirs()); + createReplLoadCompleteAckTask(); } catch (RuntimeException e) { LOG.error("replication failed with run time exception", e); throw e; @@ -298,77 +290,6 @@ a database ( directory ) return 0; } - /** - * Cleanup/drop tables from the given database which are bootstrapped by input dump dir. - * @throws HiveException Failed to drop the tables. - * @throws IOException File operations failure. - * @throws InvalidInputException Invalid input dump directory. - */ - private void cleanTablesFromBootstrap() throws HiveException, IOException, InvalidInputException { - Path bootstrapDirectory = new PathBuilder(work.bootstrapDumpToCleanTables) - .addDescendant(ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME).build(); - FileSystem fs = bootstrapDirectory.getFileSystem(conf); - - if (!fs.exists(bootstrapDirectory)) { - throw new InvalidInputException("Input bootstrap dump directory specified to clean tables from is invalid: " - + bootstrapDirectory); - } - - FileStatus[] fileStatuses = fs.listStatus(bootstrapDirectory, EximUtil.getDirectoryFilter(fs)); - if ((fileStatuses == null) || (fileStatuses.length == 0)) { - throw new InvalidInputException("Input bootstrap dump directory specified to clean tables from is empty: " - + bootstrapDirectory); - } - - if (StringUtils.isNotBlank(work.dbNameToLoadIn) && (fileStatuses.length > 1)) { - throw new InvalidInputException("Input bootstrap dump directory specified to clean tables from has multiple" - + " DB dirs in the dump: " + bootstrapDirectory - + " which is not allowed on single target DB: " + work.dbNameToLoadIn); - } - - // Iterate over the DBs and tables listed in the input bootstrap dump directory to clean tables from. - BootstrapEventsIterator bootstrapEventsIterator - = new BootstrapEventsIterator(bootstrapDirectory.toString(), work.dbNameToLoadIn, false, conf); - - // This map will have only one entry if target database is renamed using input DB name from REPL LOAD. - // For multiple DBs case, this map maintains the table names list against each DB. - Map> dbToTblsListMap = new HashMap<>(); - while (bootstrapEventsIterator.hasNext()) { - BootstrapEvent event = bootstrapEventsIterator.next(); - if (event.eventType().equals(BootstrapEvent.EventType.Table)) { - FSTableEvent tableEvent = (FSTableEvent) event; - String dbName = (StringUtils.isBlank(work.dbNameToLoadIn) ? tableEvent.getDbName() : work.dbNameToLoadIn); - List tableNames; - if (dbToTblsListMap.containsKey(dbName)) { - tableNames = dbToTblsListMap.get(dbName); - } else { - tableNames = new ArrayList<>(); - dbToTblsListMap.put(dbName, tableNames); - } - tableNames.add(tableEvent.getTableName()); - } - } - - // No tables listed in the given bootstrap dump directory specified to clean tables. 
- if (dbToTblsListMap.isEmpty()) { - LOG.info("No DB/tables are listed in the bootstrap dump: {} specified to clean tables.", - bootstrapDirectory); - return; - } - - Hive db = getHive(); - for (Map.Entry> dbEntry : dbToTblsListMap.entrySet()) { - String dbName = dbEntry.getKey(); - List tableNames = dbEntry.getValue(); - - for (String table : tableNames) { - db.dropTable(dbName + "." + table, true); - } - LOG.info("Tables listed in the Database: {} in the bootstrap dump: {} are cleaned", - dbName, bootstrapDirectory); - } - } - /** * If replication policy is changed between previous and current load, then the excluded tables in * the new replication policy will be dropped. @@ -398,6 +319,21 @@ private void dropTablesExcludedInReplScope(ReplScope replScope) throws HiveExcep dbName); } + private void createReplLoadCompleteAckTask() { + if ((work.isIncrementalLoad() && !work.incrementalLoadTasksBuilder().hasMoreWork() && !work.hasBootstrapLoadTasks()) + || (!work.isIncrementalLoad() && !work.hasBootstrapLoadTasks())) { + //All repl load tasks are executed and status is 0, create the task to add the acknowledgement + ReplLoadCompleteAckWork replLoadCompleteAckWork = new ReplLoadCompleteAckWork(work.dumpDirectory); + Task loadCompleteAckWorkTask = TaskFactory.get(replLoadCompleteAckWork, conf); + if (this.childTasks.isEmpty()) { + this.childTasks.add(loadCompleteAckWorkTask); + } else { + DAGTraversal.traverse(this.childTasks, + new AddDependencyToLeaves(Collections.singletonList(loadCompleteAckWorkTask))); + } + } + } + private void createEndReplLogTask(Context context, Scope scope, ReplLogger replLogger) throws SemanticException { Map dbProps; @@ -485,11 +421,6 @@ private void createBuilderTask(List> rootTasks) { private int executeIncrementalLoad() { try { - // If user has requested to cleanup any bootstrap dump, then just do it before incremental load. - if (work.needCleanTablesFromBootstrap) { - cleanTablesFromBootstrap(); - work.needCleanTablesFromBootstrap = false; - } // If replication policy is changed between previous and current repl load, then drop the tables // that are excluded in the new replication policy. 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
index f25c71403d..370c5ec247 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
@@ -32,7 +32,6 @@
 import org.apache.hadoop.hive.ql.session.LineageState;
 import org.apache.hadoop.hive.ql.exec.Task;

-import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Iterator;
@@ -46,8 +45,6 @@
   final String dbNameToLoadIn;
   final ReplScope currentReplScope;
   final String dumpDirectory;
-  final String bootstrapDumpToCleanTables;
-  boolean needCleanTablesFromBootstrap;
   private final ConstraintEventsIterator constraintsIterator;
   private int loadTaskRunCount = 0;
@@ -77,10 +74,6 @@ public ReplLoadWork(HiveConf hiveConf, String dumpDirectory,
     if ((currentReplScope != null) && StringUtils.isNotBlank(dbNameToLoadIn)) {
       currentReplScope.setDbName(dbNameToLoadIn);
     }
-    String bootstrapDumpToCleanTablesLoc = hiveConf.get(ReplUtils.REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG);
-    this.bootstrapDumpToCleanTables = bootstrapDumpToCleanTablesLoc == null ? null : bootstrapDumpToCleanTablesLoc
-            + File.separator + ReplUtils.REPL_HIVE_BASE_DIR;
-    this.needCleanTablesFromBootstrap = StringUtils.isNotBlank(this.bootstrapDumpToCleanTables);
     rootTask = null;
     if (isIncrementalDump) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index 64ecf42eb4..211c3f014d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -70,10 +70,6 @@
   // tasks.
   public static final String REPL_CURRENT_TBL_WRITE_ID = "hive.repl.current.table.write.id";

-  // Configuration to be received via WITH clause of REPL LOAD to clean tables from any previously failed
-  // bootstrap load.
-  public static final String REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG = "hive.repl.clean.tables.from.bootstrap";
-
   public static final String FUNCTIONS_ROOT_DIR_NAME = "_functions";

   public static final String CONSTRAINTS_ROOT_DIR_NAME = "_constraints";
@@ -98,7 +94,10 @@
   // Configuration to enable/disable dumping ACID tables. Used only for testing and shouldn't be
   // seen in production or in case of tests other than the ones where it's required.
   public static final String REPL_DUMP_INCLUDE_ACID_TABLES = "hive.repl.dump.include.acid.tables";
-
+  //Acknowledgement for repl dump complete
+  public static final String DUMP_ACKNOWLEDGEMENT = "_finished_dump";
+  //Acknowledgement for repl load complete
+  public static final String LOAD_ACKNOWLEDGEMENT = "_finished_load";
   /**
    * Bootstrap REPL LOAD operation type on the examined object based on ckpt state.
   */
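Reviewer note, not part of the patch: the two new constants are zero-byte marker files written under the dump's base directory (ReplUtils.REPL_HIVE_BASE_DIR). A dump is treated as loadable only when the dump acknowledgement exists and the load acknowledgement does not, which is the check getCurrentLoadPath() performs in the next file. A self-contained sketch of that predicate; the helper name is made up and the marker names are hard-coded.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

// Illustrative helper mirroring the acknowledgement checks.
public class ReplAckProbe {
  public static boolean readyToLoad(Path hiveDumpDir, Configuration conf) throws IOException {
    FileSystem fs = hiveDumpDir.getFileSystem(conf);
    boolean dumpFinished = fs.exists(new Path(hiveDumpDir, "_finished_dump"));  // ReplUtils.DUMP_ACKNOWLEDGEMENT
    boolean alreadyLoaded = fs.exists(new Path(hiveDumpDir, "_finished_load")); // ReplUtils.LOAD_ACKNOWLEDGEMENT
    return dumpFinished && !alreadyLoaded;
  }
}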
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index c2e9f883ce..9427e65ac0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -49,8 +49,6 @@
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
 import java.util.Base64;
-import java.util.Arrays;
-import java.util.Comparator;
 import java.util.List;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -83,7 +81,6 @@
   // By default, this will be same as that of super class BaseSemanticAnalyzer. But need to obtain again
   // if the Hive configs are received from WITH clause in REPL LOAD or REPL STATUS commands.
   private Hive db;
-  private boolean isTargetAlreadyLoaded;

   private static final String dumpSchema = "dump_dir,last_repl_id#string,string";
@@ -390,7 +387,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
       // tells us what is inside that dumpdir.
       //If repl status of target is greater than dumps, don't do anything as the load for the latest dump is done
-      if (!isTargetAlreadyLoaded) {
+      if (loadPath != null) {
         DumpMetaData dmd = new DumpMetaData(loadPath, conf);
         boolean evDump = false;
@@ -406,6 +403,8 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
             queryState.getLineageState(), evDump, dmd.getEventTo(), dirLocationsToCopy(loadPath, evDump));
         rootTasks.add(TaskFactory.get(replLoadWork, conf));
+      } else {
+        LOG.warn("Previous Dump Already Loaded");
       }
     } catch (Exception e) {
       // TODO : simple wrap & rethrow for now, clean up with error codes
@@ -429,36 +428,19 @@ private Path getCurrentLoadPath() throws IOException, SemanticException {
     }
     FileStatus[] statuses = loadPathBase.getFileSystem(conf).listStatus(loadPathBase);
     if (statuses.length > 0) {
-      //sort based on last modified. Recent one is at the end
-      Arrays.sort(statuses, new Comparator<FileStatus>() {
-        public int compare(FileStatus f1, FileStatus f2) {
-          return Long.compare(f1.getModificationTime(), f2.getModificationTime());
+      //Pick the most recently modified dump dir; no full sort is needed
+      FileStatus latestUpdatedStatus = statuses[0];
+      for (FileStatus status : statuses) {
+        if (status.getModificationTime() > latestUpdatedStatus.getModificationTime()) {
+          latestUpdatedStatus = status;
         }
-      });
-      if (replScope.getDbName() != null) {
-        String currentReplStatusOfTarget
-                = getReplStatus(replScope.getDbName());
-        if (currentReplStatusOfTarget == null) { //bootstrap
-          return new Path(statuses[0].getPath(), ReplUtils.REPL_HIVE_BASE_DIR);
-        } else {
-          DumpMetaData latestDump = new DumpMetaData(
-                  new Path(statuses[statuses.length - 1].getPath(), ReplUtils.REPL_HIVE_BASE_DIR), conf);
-          if (Long.parseLong(currentReplStatusOfTarget.trim()) >= latestDump.getEventTo()) {
-            isTargetAlreadyLoaded = true;
-          } else {
-            for (FileStatus status : statuses) {
-              Path hiveLoadPath = new Path(status.getPath(), ReplUtils.REPL_HIVE_BASE_DIR);
-              DumpMetaData dmd = new DumpMetaData(hiveLoadPath, conf);
-              if (dmd.isIncrementalDump()
-                  && Long.parseLong(currentReplStatusOfTarget.trim()) < dmd.getEventTo()) {
-                return hiveLoadPath;
-              }
-            }
-          }
+      }
+      Path hiveDumpPath = new Path(latestUpdatedStatus.getPath(), ReplUtils.REPL_HIVE_BASE_DIR);
+      if (loadPathBase.getFileSystem(conf).exists(hiveDumpPath)) {
+        if (loadPathBase.getFileSystem(conf).exists(new Path(hiveDumpPath, ReplUtils.DUMP_ACKNOWLEDGEMENT))
+            && !loadPathBase.getFileSystem(conf).exists(new Path(hiveDumpPath, ReplUtils.LOAD_ACKNOWLEDGEMENT))) {
+          return hiveDumpPath;
         }
-      } else {
-        //If dbname is null(in case of repl load *), can't get repl status of target, return unsupported
-        throw new UnsupportedOperationException("REPL LOAD * is not supported");
       }
     }
     return null;
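Reviewer note, not part of the patch: with the target-side REPL STATUS comparison removed, getCurrentLoadPath() now simply picks the most recently modified directory under the dump root and lets the acknowledgement files decide whether it still needs loading. The selection step in isolation looks roughly like this; the helper name is made up.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

// Illustrative helper: returns the most recently modified child of the dump root, or null if it is empty.
public class LatestDumpDirFinder {
  public static Path findLatestDumpDir(Path dumpRoot, Configuration conf) throws IOException {
    FileSystem fs = dumpRoot.getFileSystem(conf);
    FileStatus[] statuses = fs.listStatus(dumpRoot);
    if (statuses.length == 0) {
      return null;
    }
    FileStatus latest = statuses[0];
    for (FileStatus status : statuses) {
      if (status.getModificationTime() > latest.getModificationTime()) {
        latest = status;
      }
    }
    // Callers append ReplUtils.REPL_HIVE_BASE_DIR and apply the _finished_dump / _finished_load checks.
    return latest.getPath();
  }
}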
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
index 6f8912b5f9..a4681a7c75 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
@@ -25,6 +25,7 @@
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.utils.Retry;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -82,6 +83,23 @@ public static void writeOutput(List<List<String>> listValues, Path outputFile, H
     }
   }

+  public static void create(Path outputFile, HiveConf hiveConf)
+          throws SemanticException {
+    Retry<Void> retriable = new Retry<Void>(IOException.class) {
+      @Override
+      public Void execute() throws IOException {
+        FileSystem fs = outputFile.getFileSystem(hiveConf);
+        fs.create(outputFile);
+        return null;
+      }
+    };
+    try {
+      retriable.run();
+    } catch (Exception e) {
+      throw new SemanticException(e);
+    }
+  }
+
   public static Iterable<String> matchesDb(Hive db, String dbPattern) throws HiveException {
     if (dbPattern == null) {
       return db.getAllDatabases();