diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index efe9fff780..f2b9d3951b 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -98,6 +98,8 @@ import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME; +import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT; +import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.DUMP_ACKNOWLEDGEMENT; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -315,8 +317,8 @@ public void testBasic() throws IOException { FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(hconf); Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); - assertTrue(fs.exists(new Path(dumpPath, ReplUtils.DUMP_ACKNOWLEDGEMENT))); - assertTrue(fs.exists(new Path(dumpPath, ReplUtils.LOAD_ACKNOWLEDGEMENT))); + assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); + assertTrue(fs.exists(new Path(dumpPath, LOAD_ACKNOWLEDGEMENT.toString()))); verifyRun("SELECT * from " + replicatedDbName + ".unptned", unptn_data, driverMirror); verifyRun("SELECT a from " + replicatedDbName + ".ptned WHERE b=1", ptn_data_1, driverMirror); @@ -367,8 +369,8 @@ public void testBootstrapFailedDump() throws IOException { advanceDumpDir(); FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(hconf); Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); - assertTrue(fs.exists(new Path(dumpPath, ReplUtils.DUMP_ACKNOWLEDGEMENT))); - assertTrue(fs.exists(new Path(dumpPath, ReplUtils.LOAD_ACKNOWLEDGEMENT))); + assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); + assertTrue(fs.exists(new Path(dumpPath, LOAD_ACKNOWLEDGEMENT.toString()))); verifyRun("SELECT * from " + replicatedDbName + ".unptned", unptnData, driverMirror); verifyRun("SELECT a from " + replicatedDbName + ".ptned WHERE b=1", ptnData1, driverMirror); @@ -452,7 +454,7 @@ public void testTaskCreationOptimization() throws Throwable { Path loadPath = new Path(dump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); //delete load ack to reload the same dump - loadPath.getFileSystem(hconf).delete(new Path(loadPath, ReplUtils.LOAD_ACKNOWLEDGEMENT), true); + loadPath.getFileSystem(hconf).delete(new Path(loadPath, LOAD_ACKNOWLEDGEMENT.toString()), true); loadAndVerify(dbNameReplica, dbName, dump.lastReplId); run("insert into table " + dbName + ".t2 partition(country='india') values ('delhi')", driver); @@ -466,7 +468,7 @@ public void testTaskCreationOptimization() throws Throwable { loadPath = new Path(dump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); //delete load ack to reload the same dump - loadPath.getFileSystem(hconf).delete(new Path(loadPath, ReplUtils.LOAD_ACKNOWLEDGEMENT), true); + loadPath.getFileSystem(hconf).delete(new Path(loadPath, LOAD_ACKNOWLEDGEMENT.toString()), true); loadAndVerify(dbNameReplica, dbName, dump.lastReplId); run("insert into table " + dbName + ".t2 partition(country='us') values ('sf')", driver); @@ -902,8 +904,8 @@ public void testIncrementalAdds() throws IOException { Tuple 
incrementalDump = incrementalLoadAndVerify(dbName, replDbName); FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(hconf); Path dumpPath = new Path(incrementalDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); - assertTrue(fs.exists(new Path(dumpPath, ReplUtils.DUMP_ACKNOWLEDGEMENT))); - assertTrue(fs.exists(new Path(dumpPath, ReplUtils.LOAD_ACKNOWLEDGEMENT))); + assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); + assertTrue(fs.exists(new Path(dumpPath, LOAD_ACKNOWLEDGEMENT.toString()))); // VERIFY tables and partitions on destination for equivalence. verifyRun("SELECT * from " + replDbName + ".unptned_empty", empty, driverMirror); @@ -1723,7 +1725,8 @@ public void testIncrementalLoadWithOneFailedDump() throws IOException { Tuple incrementalDump = replDumpDb(dbName); //Remove the dump ack file, so that dump is treated as an invalid dump. - String ackFileRelativePath = ReplUtils.REPL_HIVE_BASE_DIR + File.separator + ReplUtils.DUMP_ACKNOWLEDGEMENT; + String ackFileRelativePath = ReplUtils.REPL_HIVE_BASE_DIR + File.separator + + DUMP_ACKNOWLEDGEMENT.toString(); Path dumpFinishedAckFilePath = new Path(incrementalDump.dumpLocation, ackFileRelativePath); Path tmpDumpFinishedAckFilePath = new Path(dumpFinishedAckFilePath.getParent(), "old_" + dumpFinishedAckFilePath.getName()); @@ -1809,7 +1812,7 @@ public void testIncrementalLoadWithPreviousDumpDeleteFailed() throws IOException FileSystem fs = FileSystem.get(fileToDelete.toUri(), hconf); fs.delete(fileToDelete, true); assertTrue(fs.exists(bootstrapDumpDir)); - assertTrue(fs.exists(new Path(bootstrapDumpDir, ReplUtils.DUMP_ACKNOWLEDGEMENT))); + assertTrue(fs.exists(new Path(bootstrapDumpDir, DUMP_ACKNOWLEDGEMENT.toString()))); loadAndVerify(replDbName, dbName, incrDump.lastReplId); @@ -3835,4 +3838,244 @@ private static void createTestDataFile(String filename, String[] lines) throws I } } } + + @Test + public void testCheckPointingInDumpFailure() throws IOException { + String testname = testName.getMethodName(); + String dbName = createDB(testname, driver); + run("CREATE TABLE " + dbName + ".t1(a string) STORED AS TEXTFILE", driver); + run("CREATE TABLE " + dbName + ".t2(a string) STORED AS TEXTFILE", driver); + + run("insert into "+ dbName +".t1 values (1)", driver); + run("insert into "+ dbName +".t1 values (2)", driver); + run("insert into "+ dbName +".t1 values (3)", driver); + run("insert into "+ dbName +".t2 values (11)", driver); + run("insert into "+ dbName +".t2 values (21)", driver); + + String replicatedDbName = dbName + "_dupe"; + Tuple bootstrapDump = replDumpDb(dbName); + + FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(hconf); + Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); + Path dbPath = new Path(dumpPath + Path.SEPARATOR + dbName); + Path tablet1Path = new Path(dbPath, "t1"); + Path tablet2Path = new Path(dbPath, "t2"); + long modifiedTimeTable1 = fs.getFileStatus(new Path(tablet1Path, "data")).getModificationTime(); + long modifiedTimeTable2 = fs.getFileStatus(new Path(tablet2Path, "data")).getModificationTime(); + //Delete the dump ack; the metadata should be rewritten while the data stays the same + fs.delete(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()), true); + assertFalse(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); + //Do another dump. It should resume the previous dump, so no table data is re-copied:
modification times of both t1 and t2 remain the same + Tuple nextDump = incrementalLoadAndVerify(dbName, replicatedDbName); + assertEquals(nextDump.dumpLocation, bootstrapDump.dumpLocation); + assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); + assertEquals(modifiedTimeTable1, fs.getFileStatus(new Path(tablet1Path, "data")).getModificationTime()); + assertEquals(modifiedTimeTable2, fs.getFileStatus(new Path(tablet2Path, "data")) + .getModificationTime()); + } + +// @Test +// public void testCheckPointingWithSourceTableDataInserted() throws IOException { +// String testname = testName.getMethodName(); +// String dbName = createDB(testname, driver); +// run("CREATE TABLE " + dbName + ".t1(a string) STORED AS TEXTFILE", driver); +// run("CREATE TABLE " + dbName + ".t2(a string) STORED AS TEXTFILE", driver); +// +// run("insert into "+ dbName +".t1 values (1)", driver); +// run("insert into "+ dbName +".t1 values (2)", driver); +// run("insert into "+ dbName +".t1 values (3)", driver); +// run("insert into "+ dbName +".t2 values (11)", driver); +// run("insert into "+ dbName +".t2 values (21)", driver); +// +// String replicatedDbName = dbName + "_dupe"; +// Tuple bootstrapDump = replDumpDb(dbName); +// +// FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(hconf); +// Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); +// assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); +// Path dbPath = new Path(dumpPath + Path.SEPARATOR + dbName); +// Path tablet1Path = new Path(dbPath, "t1"); +// Path tablet2Path = new Path(dbPath, "t2"); +// //Delete table 2 copy ack +// fs.delete(new Path(new Path(tablet2Path, "data"), +// COPY_ACKNOWLEDGEMENT.toString()), true); +// fs.delete(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()), true); +// //Do a load. It shouldn't contain any data +// run("REPL LOAD " + dbName + " INTO " + replicatedDbName, driverMirror); +// verifySetup("show tables in " + replicatedDbName, new String[]{}, driver); +// +// //Do another dump. It should only dump table t2. Also insert new data.
New data will not be there in target +// run("insert into "+ dbName +".t2 values (13)", driver); +// run("insert into "+ dbName +".t2 values (24)", driver); +// +// incrementalLoadAndVerify(dbName, replicatedDbName); +// String[] t1Data = new String[]{"1" , "2", "3"}; +// String[] t2Data = new String[]{"11" , "21"}; +// verifySetup("select * from " + replicatedDbName + ".t1", t1Data, driver); +// verifySetup("select * from " + replicatedDbName + ".t2", t2Data, driver); +// +// incrementalLoadAndVerify(dbName, replicatedDbName); +// t1Data = new String[]{"1" , "2", "3"}; +// t2Data = new String[]{"11" , "21", "13", "24"}; +// verifySetup("select * from " + replicatedDbName + ".t1", t1Data, driver); +// verifySetup("select * from " + replicatedDbName + ".t2", t2Data, driver); +// } +// +// @Test +// public void testCheckPointingWithNewTablesAdded() throws IOException { +// String testname = testName.getMethodName(); +// String dbName = createDB(testname, driver); +// run("CREATE TABLE " + dbName + ".t1(a string) STORED AS TEXTFILE", driver); +// run("CREATE TABLE " + dbName + ".t2(a string) STORED AS TEXTFILE", driver); +// +// run("insert into "+ dbName +".t1 values (1)", driver); +// run("insert into "+ dbName +".t1 values (2)", driver); +// run("insert into "+ dbName +".t1 values (3)", driver); +// run("insert into "+ dbName +".t2 values (11)", driver); +// run("insert into "+ dbName +".t2 values (21)", driver); +// +// String replicatedDbName = dbName + "_dupe"; +// Tuple bootstrapDump = replDumpDb(dbName); +// +// FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(hconf); +// Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); +// assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); +// Path dbPath = new Path(dumpPath + Path.SEPARATOR + dbName); +// Path tablet1Path = new Path(dbPath, "t1"); +// Path tablet2Path = new Path(dbPath, "t2"); +// //Delete table 2 copy ack +// fs.delete(new Path(new Path(tablet2Path, "data"), +// COPY_ACKNOWLEDGEMENT.toString()), true); +// fs.delete(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()), true); +// //Do a load. It shouldn't contain any data +// run("REPL LOAD " + dbName + " INTO " + replicatedDbName, driverMirror); +// verifySetup("show tables in " + replicatedDbName, new String[]{}, driver); +// +// //Do another dump. It should only dump table t2. Also insert new data.
New data will not be there in target +// run("insert into "+ dbName +".t2 values (13)", driver); +// run("insert into "+ dbName +".t2 values (24)", driver); +// run("CREATE TABLE " + dbName + ".t3(a string) STORED AS TEXTFILE", driver); +// run("insert into "+ dbName +".t3 values (1)", driver); +// run("insert into "+ dbName +".t3 values (2)", driver); +// +// incrementalLoadAndVerify(dbName, replicatedDbName); +// String[] t1Data = new String[]{"1" , "2", "3"}; +// String[] t2Data = new String[]{"11" , "21"}; +// String[] tableList = new String[]{"t1", "t2"}; +// verifySetup("select * from " + replicatedDbName + ".t1", t1Data, driver); +// verifySetup("select * from " + replicatedDbName + ".t2", t2Data, driver); +// verifySetup("show tables in " + replicatedDbName, tableList, driver); +// +// incrementalLoadAndVerify(dbName, replicatedDbName); +// t1Data = new String[]{"1" , "2", "3"}; +// t2Data = new String[]{"11" , "21", "13", "24"}; +// tableList = new String[]{"t1", "t2", "t3"}; +// String[] t3Data = new String[]{"1" , "2", "3"}; +// verifySetup("select * from " + replicatedDbName + ".t1", t1Data, driver); +// verifySetup("select * from " + replicatedDbName + ".t2", t2Data, driver); +// verifySetup("show tables in " + replicatedDbName, tableList, driver); +// verifySetup("select * from " + replicatedDbName + ".t3", t3Data, driver); +// } +// +// @Test +// public void testCheckPointingWithSourceTableDeleted() throws IOException { +// String testname = testName.getMethodName(); +// String dbName = createDB(testname, driver); +// run("CREATE TABLE " + dbName + ".t1(a string) STORED AS TEXTFILE", driver); +// run("CREATE TABLE " + dbName + ".t2(a string) STORED AS TEXTFILE", driver); +// +// run("insert into "+ dbName +".t1 values (1)", driver); +// run("insert into "+ dbName +".t1 values (2)", driver); +// run("insert into "+ dbName +".t1 values (3)", driver); +// run("insert into "+ dbName +".t2 values (11)", driver); +// run("insert into "+ dbName +".t2 values (21)", driver); +// +// String replicatedDbName = dbName + "_dupe"; +// Tuple bootstrapDump = replDumpDb(dbName); +// +// FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(hconf); +// Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); +// assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); +// Path dbPath = new Path(dumpPath + Path.SEPARATOR + dbName); +// Path tablet1Path = new Path(dbPath, "t1"); +// Path tablet2Path = new Path(dbPath, "t2"); +// //Delete table 2 copy ack +// fs.delete(new Path(new Path(tablet2Path, "data"), +// COPY_ACKNOWLEDGEMENT.toString()), true); +// fs.delete(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()), true); +// +// //Do another dump. It should only dump table t2. Also drop table. 
The dropped table will still be present in the target +// run("drop table "+ dbName +".t1", driver); +// +// incrementalLoadAndVerify(dbName, replicatedDbName); +// String[] t1Data = new String[]{"1" , "2", "3"}; +// String[] t2Data = new String[]{"11" , "21"}; +// verifySetup("select * from " + replicatedDbName + ".t1", t1Data, driver); +// verifySetup("select * from " + replicatedDbName + ".t2", t2Data, driver); +// +// //Previous drop table reflected in next incremental dump +// incrementalLoadAndVerify(dbName, replicatedDbName); +// String[] tableList = new String[]{"t2"}; +// t2Data = new String[]{"11" , "21"}; +// verifySetup("show tables in " + replicatedDbName, tableList, driver); +// verifySetup("select * from " + replicatedDbName + ".t2", t2Data, driver); +// } +// +// @Test +// public void testCheckPointingMetadataDumpFailure() throws IOException { +// String testname = testName.getMethodName(); +// String dbName = createDB(testname, driver); +// run("CREATE TABLE " + dbName + ".t1(a string) STORED AS TEXTFILE", driver); +// run("CREATE TABLE " + dbName + ".t2(a string) STORED AS TEXTFILE", driver); +// +// run("insert into "+ dbName +".t1 values (1)", driver); +// run("insert into "+ dbName +".t1 values (2)", driver); +// run("insert into "+ dbName +".t1 values (3)", driver); +// run("insert into "+ dbName +".t2 values (11)", driver); +// run("insert into "+ dbName +".t2 values (21)", driver); +// +// String replicatedDbName = dbName + "_dupe"; +// Tuple bootstrapDump = replDumpDb(dbName); +// +// FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(hconf); +// Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); +// assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); +// Path dbPath = new Path(dumpPath + Path.SEPARATOR + dbName); +// Path tablet1Path = new Path(dbPath, "t1"); +// assertTrue(fs.exists(new Path(new Path(tablet1Path, "data"), +// COPY_ACKNOWLEDGEMENT.toString()))); +// Path tablet2Path = new Path(dbPath, "t2"); +// assertTrue(fs.exists(new Path(new Path(tablet2Path, "data"), +// COPY_ACKNOWLEDGEMENT.toString()))); +// long modifiedTimeTable1 = fs.getFileStatus(new Path(tablet1Path, "data")).getModificationTime(); +// long modifiedTimeTable2 = fs.getFileStatus(new Path(tablet2Path, "data")).getModificationTime(); +// //Delete metadata ack also +// fs.delete(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()), true); +// fs.delete(new Path(dumpPath, "_dumpmetadata"), true); +// assertFalse(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); +// assertFalse(fs.exists(new Path(new Path(tablet2Path, "data"), +// COPY_ACKNOWLEDGEMENT.toString()))); +// //Insert new data +// run("insert into "+ dbName +".t1 values (12)", driver); +// run("insert into "+ dbName +".t1 values (13)", driver); +// //Do another dump.
It should be treated as a new dump and shouldn't resume as metadata dump failed +// // checkpointing will not be used +// Tuple nextDump = incrementalLoadAndVerify(dbName, replicatedDbName); +// Path nextDumpPath = new Path(nextDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); +// Path nextDbPath = new Path(nextDumpPath + Path.SEPARATOR + dbName); +// Path nextTablet1Path = new Path(nextDbPath, "t1"); +// Path nextTablet2Path = new Path(nextDbPath, "t2"); +// assertTrue(fs.exists(new Path(nextDumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); +// assertTrue(fs.exists(new Path(new Path(nextTablet1Path, "data"), COPY_ACKNOWLEDGEMENT.toString()))); +// assertTrue(fs.exists(new Path(new Path(nextTablet2Path, "data"), COPY_ACKNOWLEDGEMENT.toString()))); +// assertTrue(modifiedTimeTable1 < fs.getFileStatus(new Path(nextTablet1Path, "data")) +// .getModificationTime()); +// assertTrue(modifiedTimeTable2 < fs.getFileStatus(new Path(nextTablet2Path, "data")) +// .getModificationTime()); +// String[] t1Data = new String[]{"1" , "2", "3", "12", "13"}; +// String[] t2Data = new String[]{"11" , "21"}; +// verifySetup("select * from " + replicatedDbName + ".t2", t2Data, driver); +// verifySetup("select * from " + replicatedDbName + ".t1", t1Data, driver); +// } } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index 56b27a555e..33124c8f76 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -55,6 +55,7 @@ import java.util.stream.Collectors; import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; +import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -1093,13 +1094,13 @@ public void testIfBootstrapReplLoadFailWhenRetryAfterBootstrapComplete() throws // To retry with same dump delete the load ack new Path(tuple.dumpLocation).getFileSystem(conf).delete(new Path( - hiveDumpLocation, ReplUtils.LOAD_ACKNOWLEDGEMENT), true); + hiveDumpLocation, LOAD_ACKNOWLEDGEMENT.toString()), true); // Retry with same dump with which it was already loaded also fails. replica.loadFailure(replicatedDbName, primaryDbName); // To retry with same dump delete the load ack new Path(tuple.dumpLocation).getFileSystem(conf).delete(new Path( - hiveDumpLocation, ReplUtils.LOAD_ACKNOWLEDGEMENT), true); + hiveDumpLocation, LOAD_ACKNOWLEDGEMENT.toString()), true); // Retry from same dump when the database is empty is also not allowed. 
replica.run("drop table t1") .run("drop table t2") @@ -1344,7 +1345,7 @@ public void testMoveOptimizationIncrementalFailureAfterCopyReplace() throws Thro //delete load ack to reuse the dump new Path(tuple.dumpLocation).getFileSystem(conf).delete(new Path(tuple.dumpLocation + Path.SEPARATOR + ReplUtils.REPL_HIVE_BASE_DIR + Path.SEPARATOR - + ReplUtils.LOAD_ACKNOWLEDGEMENT), true); + + LOAD_ACKNOWLEDGEMENT.toString()), true); replica.load(replicatedDbName_CM, primaryDbName, withConfigs); replica.run("alter database " + replicatedDbName + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')") @@ -1370,7 +1371,7 @@ public void testMoveOptimizationIncrementalFailureAfterCopy() throws Throwable { replica.load(replicatedDbName, primaryDbName, withConfigs); //delete load ack to reuse the dump new Path(bootstrapDump.dumpLocation).getFileSystem(conf).delete(new Path(bootstrapDump.dumpLocation - + Path.SEPARATOR + ReplUtils.REPL_HIVE_BASE_DIR + Path.SEPARATOR + ReplUtils.LOAD_ACKNOWLEDGEMENT), true); + + Path.SEPARATOR + ReplUtils.REPL_HIVE_BASE_DIR + Path.SEPARATOR + LOAD_ACKNOWLEDGEMENT.toString()), true); replica.load(replicatedDbName_CM, primaryDbName, withConfigs); replica.run("alter database " + replicatedDbName + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')") .run("alter database " + replicatedDbName_CM + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')"); @@ -1423,7 +1424,7 @@ public Boolean apply(NotificationEvent entry) { //delete load ack to reuse the dump new Path(tuple.dumpLocation).getFileSystem(conf).delete(new Path(tuple.dumpLocation + Path.SEPARATOR + ReplUtils.REPL_HIVE_BASE_DIR + Path.SEPARATOR - + ReplUtils.LOAD_ACKNOWLEDGEMENT), true); + + LOAD_ACKNOWLEDGEMENT.toString()), true); InjectableBehaviourObjectStore.setAddNotificationModifier(callerVerifier); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java index efef052268..46f9bb3add 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java @@ -28,6 +28,7 @@ Explain.Level.DEFAULT, Explain.Level.EXTENDED }) public class DirCopyWork implements Serializable { + private static final long serialVersionUID = 1L; private final Path fullyQualifiedSourcePath; private final Path fullyQualifiedTargetPath; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplAck.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplAck.java new file mode 100644 index 0000000000..db8db5f8e7 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplAck.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.exec.repl; + +/** + * ReplAck, used for repl acknowledgement constants. + */ +public enum ReplAck { + DUMP_ACKNOWLEDGEMENT("_finished_dump"), + LOAD_ACKNOWLEDGEMENT("_finished_load"); + private String ack; + ReplAck(String ack) { + this.ack = ack; + } + + @Override + public String toString() { + return ack; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 69f6ffef5a..d186318b6e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -93,6 +93,7 @@ import java.util.ArrayList; import java.util.concurrent.TimeUnit; import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.Writer; +import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT; public class ReplDumpTask extends Task implements Serializable { private static final long serialVersionUID = 1L; @@ -135,20 +136,20 @@ public int execute() { Path dumpRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR), Base64.getEncoder().encodeToString(work.dbNameOrPattern.toLowerCase() .getBytes(StandardCharsets.UTF_8.name()))); - Path previousHiveDumpPath = getPreviousDumpMetadataPath(dumpRoot); + Path previousValidHiveDumpPath = getPreviousValidDumpMetadataPath(dumpRoot); //If no previous dump is present or previous dump is already loaded, proceed with the dump operation. - if (shouldDump(previousHiveDumpPath)) { - Path currentDumpPath = new Path(dumpRoot, getNextDumpDir()); + if (shouldDump(previousValidHiveDumpPath)) { + Path currentDumpPath = getCurrentDumpPath(dumpRoot); Path hiveDumpRoot = new Path(currentDumpPath, ReplUtils.REPL_HIVE_BASE_DIR); DumpMetaData dmd = new DumpMetaData(hiveDumpRoot, conf); // Initialize ReplChangeManager instance since we will require it to encode file URI. 
ReplChangeManager.getInstance(conf); Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR)); Long lastReplId; - if (previousHiveDumpPath == null) { + if (previousValidHiveDumpPath == null) { lastReplId = bootStrapDump(hiveDumpRoot, dmd, cmRoot, hiveDb); } else { - work.setEventFrom(getEventFromPreviousDumpMetadata(previousHiveDumpPath)); + work.setEventFrom(getEventFromPreviousDumpMetadata(previousValidHiveDumpPath)); lastReplId = incrementalDump(hiveDumpRoot, dmd, cmRoot, hiveDb); } work.setResultValues(Arrays.asList(currentDumpPath.toUri().toString(), String.valueOf(lastReplId))); @@ -166,6 +167,16 @@ public int execute() { return 0; } + private Path getCurrentDumpPath(Path dumpRoot) throws IOException { + Path previousDumpPath = getPreviousDumpPath(dumpRoot); + if (previousDumpPath != null && !validDump(previousDumpPath) && shouldResumePreviousDump(previousDumpPath)) { + //Resume previous dump + return previousDumpPath; + } else { + return new Path(dumpRoot, getNextDumpDir()); + } + } + private void initiateDataCopyTasks() throws SemanticException, IOException { TaskTracker taskTracker = new TaskTracker(conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS)); List> childTasks = new ArrayList<>(); @@ -183,7 +194,8 @@ private void initiateDataCopyTasks() throws SemanticException, IOException { private void finishRemainingTasks() throws SemanticException, IOException { prepareReturnValues(work.getResultValues()); Path dumpAckFile = new Path(work.getCurrentDumpPath(), - ReplUtils.REPL_HIVE_BASE_DIR + File.separator + ReplUtils.DUMP_ACKNOWLEDGEMENT); + ReplUtils.REPL_HIVE_BASE_DIR + File.separator + + ReplAck.DUMP_ACKNOWLEDGEMENT.toString()); Utils.create(dumpAckFile, conf); deleteAllPreviousDumpMeta(work.getCurrentDumpPath()); } @@ -233,7 +245,7 @@ private Long getEventFromPreviousDumpMetadata(Path previousDumpPath) throws Sema return 0L; } - private Path getPreviousDumpMetadataPath(Path dumpRoot) throws IOException { + private Path getPreviousValidDumpMetadataPath(Path dumpRoot) throws IOException { FileStatus latestValidStatus = null; FileSystem fs = dumpRoot.getFileSystem(conf); if (fs.exists(dumpRoot)) { @@ -241,8 +253,8 @@ private Path getPreviousDumpMetadataPath(Path dumpRoot) throws IOException { for (FileStatus status : statuses) { LOG.info("Evaluating previous dump dir path:{}", status.getPath()); if (latestValidStatus == null) { - latestValidStatus = validDump(fs, status.getPath()) ? status : null; - } else if (validDump(fs, status.getPath()) + latestValidStatus = validDump(status.getPath()) ? 
status : null; + } else if (validDump(status.getPath()) && status.getModificationTime() > latestValidStatus.getModificationTime()) { latestValidStatus = status; } @@ -254,10 +266,14 @@ private Path getPreviousDumpMetadataPath(Path dumpRoot) throws IOException { return latestDumpDir; } - private boolean validDump(FileSystem fs, Path dumpDir) throws IOException { + private boolean validDump(Path dumpDir) throws IOException { //Check if it was a successful dump - Path hiveDumpDir = new Path(dumpDir, ReplUtils.REPL_HIVE_BASE_DIR); - return fs.exists(new Path(hiveDumpDir, ReplUtils.DUMP_ACKNOWLEDGEMENT)); + if (dumpDir != null) { + FileSystem fs = dumpDir.getFileSystem(conf); + Path hiveDumpDir = new Path(dumpDir, ReplUtils.REPL_HIVE_BASE_DIR); + return fs.exists(new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString())); + } + return false; } private boolean shouldDump(Path previousDumpPath) throws IOException { @@ -267,7 +283,7 @@ private boolean shouldDump(Path previousDumpPath) throws IOException { return true; } else { FileSystem fs = previousDumpPath.getFileSystem(conf); - return fs.exists(new Path(previousDumpPath, ReplUtils.LOAD_ACKNOWLEDGEMENT)); + return fs.exists(new Path(previousDumpPath, LOAD_ACKNOWLEDGEMENT.toString())); } } @@ -472,7 +488,7 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive if (shouldBootstrapDumpTable(table)) { HiveWrapper.Tuple tableTuple = new HiveWrapper(hiveDb, dbName).table(table); managedTableCopyPaths.addAll( - dumpTable(dbName, tableName, validTxnList, dbRoot, dumpRoot, bootDumpBeginReplId, + dumpTable(dbName, tableName, validTxnList, dbRoot, bootDumpBeginReplId, hiveDb, tableTuple)); } if (tableList != null && isTableSatifiesConfig(table)) { @@ -611,13 +627,14 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) List tableList; LOG.info("Bootstrap Dump for db {}", work.dbNameOrPattern); + List extTableCopyWorks = new ArrayList<>(); + List managedTableCopyPaths = new ArrayList<>(); long timeoutInMs = HiveConf.getTimeVar(conf, HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT, TimeUnit.MILLISECONDS); long waitUntilTime = System.currentTimeMillis() + timeoutInMs; - + Path dataDumpPath = new Path(dumpRoot, EximUtil.DATA_PATH_NAME); + Path metadataDumpPath = new Path(dumpRoot, EximUtil.METADATA_PATH_NAME); String validTxnList = getValidTxnListForReplDump(hiveDb, waitUntilTime); - List extTableCopyWorks = new ArrayList<>(); - List managedTableCopyPaths = new ArrayList<>(); for (String dbName : Utils.matchesDb(hiveDb, work.dbNameOrPattern)) { LOG.debug("Dumping db: " + dbName); @@ -653,7 +670,8 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) LOG.debug("Adding table {} to external tables list", tblName); extTableLocations.addAll(writer.dataLocationDump(tableTuple.object)); } - managedTableCopyPaths.addAll(dumpTable(dbName, tblName, validTxnList, dbRoot, dumpRoot, bootDumpBeginReplId, + managedTableCopyPaths.addAll(dumpTable(dbName, tblName, validTxnList, dbRoot, + bootDumpBeginReplId, hiveDb, tableTuple)); } catch (InvalidTableException te) { // Bootstrap dump shouldn't fail if the table is dropped/renamed while dumping it. 
@@ -677,11 +695,11 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) throw e; } else { LOG.error("failed to reset the db state for " + uniqueKey - + " on failure of repl dump", e); + + " on failure of repl dump", e); throw caught; } } - if(caught != null) { + if (caught != null) { throw caught; } } @@ -689,14 +707,29 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) } Long bootDumpEndReplId = currentNotificationId(hiveDb); LOG.info("Preparing to return {},{}->{}", - dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId); + dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId); dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot); dmd.write(); + work.setDirCopyIterator(extTableCopyWorks.iterator()); work.setManagedTableCopyPathIterator(managedTableCopyPaths.iterator()); return bootDumpBeginReplId; } + private boolean shouldResumePreviousDump(DumpMetaData dumpMetaData) { + try { + return dumpMetaData.getEventFrom() != null; + } catch (Exception e) { + LOG.info("No previous dump present"); + return false; + } + } + + private boolean shouldResumePreviousDump(Path dumpPath) { + Path hiveDumpPath = new Path(dumpPath, ReplUtils.REPL_HIVE_BASE_DIR); + return shouldResumePreviousDump(new DumpMetaData(hiveDumpPath, conf)); + } + long currentNotificationId(Hive hiveDb) throws TException { return hiveDb.getMSC().getCurrentNotificationEventId().getEventId(); } @@ -711,7 +744,7 @@ Path dumpDbMetadata(String dbName, Path dumpRoot, long lastReplId, Hive hiveDb) return dbRoot; } - List dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, Path dumproot, + List dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, long lastReplId, Hive hiveDb, HiveWrapper.Tuple
tuple) throws Exception { LOG.info("Bootstrap Dump for table " + tblName); TableSpec tableSpec = new TableSpec(tuple.object); @@ -827,6 +860,24 @@ private String getNextDumpDir() { } } + private Path getPreviousDumpPath(Path dumpRoot) throws IOException { + FileSystem fs = dumpRoot.getFileSystem(conf); + if (fs.exists(dumpRoot)) { + FileStatus[] statuses = fs.listStatus(dumpRoot); + if (statuses.length > 0) { + FileStatus latestValidStatus = statuses[0]; + for (FileStatus status : statuses) { + LOG.info("Evaluating previous dump dir path:{}", status.getPath()); + if (status.getModificationTime() > latestValidStatus.getModificationTime()) { + latestValidStatus = status; + } + } + return latestValidStatus.getPath(); + } + } + return null; + } + void dumpFunctionMetadata(String dbName, Path dumpRoot, Hive hiveDb) throws Exception { Path functionsRoot = new Path(new Path(dumpRoot, dbName), ReplUtils.FUNCTIONS_ROOT_DIR_NAME); List functionNames = hiveDb.getFunctions(dbName, "*"); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java index 3427b59e67..a5935552c4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java @@ -45,7 +45,6 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder; import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves; -import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; import org.apache.hadoop.hive.ql.exec.util.DAGTraversal; import org.apache.hadoop.hive.ql.metadata.Hive; @@ -65,6 +64,7 @@ import java.util.Map; import static org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase.AlterDatabase; +import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT; public class ReplLoadTask extends Task implements Serializable { private final static int ZERO_TASKS = 0; @@ -316,7 +316,7 @@ private void createReplLoadCompleteAckTask() { || (!work.isIncrementalLoad() && !work.hasBootstrapLoadTasks())) { //All repl load tasks are executed and status is 0, create the task to add the acknowledgement AckWork replLoadAckWork = new AckWork( - new Path(work.dumpDirectory, ReplUtils.LOAD_ACKNOWLEDGEMENT)); + new Path(work.dumpDirectory, LOAD_ACKNOWLEDGEMENT.toString())); Task loadAckWorkTask = TaskFactory.get(replLoadAckWork, conf); if (this.childTasks.isEmpty()) { this.childTasks.add(loadAckWorkTask); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java index 211c3f014d..ed0ddebb80 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java @@ -94,10 +94,6 @@ // Configuration to enable/disable dumping ACID tables. Used only for testing and shouldn't be // seen in production or in case of tests other than the ones where it's required. 
public static final String REPL_DUMP_INCLUDE_ACID_TABLES = "hive.repl.dump.include.acid.tables"; - //Acknowledgement for repl dump complete - public static final String DUMP_ACKNOWLEDGEMENT = "_finished_dump"; - //Acknowledgement for repl load complete - public static final String LOAD_ACKNOWLEDGEMENT = "_finished_load"; /** * Bootstrap REPL LOAD operation type on the examined object based on ckpt state. */ diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java index bc90ea1db7..11148f5222 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java @@ -51,6 +51,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.io.Serializable; import java.net.URI; import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; @@ -74,6 +75,7 @@ public static final String METADATA_NAME = "_metadata"; public static final String FILES_NAME = "_files"; public static final String DATA_PATH_NAME = "data"; + public static final String METADATA_PATH_NAME = "metadata"; private static final Logger LOG = LoggerFactory.getLogger(EximUtil.class); @@ -159,7 +161,8 @@ public void setOpenTxnTask(Task openTxnTask) { /** * Wrapper class for mapping source and target path for copying managed table data. */ - public static class ManagedTableCopyPath { + public static class ManagedTableCopyPath implements Serializable { + private static final long serialVersionUID = 1L; private ReplicationSpec replicationSpec; private static boolean nullSrcPathForTest = false; private Path srcPath; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 8802139e84..c4ff070da6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.repl.ReplAck; import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; @@ -54,6 +55,7 @@ import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_MOVE_OPTIMIZED_FILE_SCHEMES; +import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_DBNAME; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPLACE; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_CONFIG; @@ -424,8 +426,9 @@ private Path getCurrentLoadPath() throws IOException, SemanticException { } } Path hiveDumpPath = new Path(latestUpdatedStatus.getPath(), ReplUtils.REPL_HIVE_BASE_DIR); - if (loadPathBase.getFileSystem(conf).exists(new Path(hiveDumpPath, ReplUtils.DUMP_ACKNOWLEDGEMENT)) - && !loadPathBase.getFileSystem(conf).exists(new Path(hiveDumpPath, ReplUtils.LOAD_ACKNOWLEDGEMENT))) { + if (loadPathBase.getFileSystem(conf).exists(new Path(hiveDumpPath, + ReplAck.DUMP_ACKNOWLEDGEMENT.toString())) + && 
!loadPathBase.getFileSystem(conf).exists(new Path(hiveDumpPath, LOAD_ACKNOWLEDGEMENT.toString()))) { return hiveDumpPath; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java index 5c8d0edd77..70607846ef 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java @@ -24,7 +24,7 @@ import org.apache.hadoop.hive.ql.plan.PlanUtils; import javax.annotation.Nullable; -import java.text.Collator; +import java.io.Serializable; import java.util.Map; /** @@ -35,8 +35,8 @@ * Typically, this corresponds to the replicationClause definition * in the parser. */ -public class ReplicationSpec { - +public class ReplicationSpec implements Serializable { + private static final long serialVersionUID = 1L; private boolean isInReplicationScope = false; // default is that it's not in a repl scope private boolean isMetadataOnly = false; // default is full export/import, not metadata-only private String eventId = null; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java index 683f3c0362..4746e54706 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java @@ -20,7 +20,6 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; @@ -296,13 +295,6 @@ private void validateTargetDir(URI rootDirExportFile) throws SemanticException { throw new SemanticException( astRepresentationForErrorMsg + ": " + "Target is not a directory : " + rootDirExportFile); - } else { - FileStatus[] files = fs.listStatus(toPath, FileUtils.HIDDEN_FILES_PATH_FILTER); - if (files != null && files.length != 0) { - throw new SemanticException( - astRepresentationForErrorMsg + ": " + "Target is not an empty directory : " - + rootDirExportFile); - } } } catch (FileNotFoundException ignored) { } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java index 81fac252a3..2d432f9c98 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java @@ -129,7 +129,7 @@ public void removeDBPropertyToPreventRenameWhenBootstrapDumpOfTableFails() throw @Override List dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, - Path replDataDir, long lastReplId, Hive hiveDb, + long lastReplId, Hive hiveDb, HiveWrapper.Tuple
<Table> tuple) throws Exception { tableDumpCount++;
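Note on the acknowledgement markers: the new ReplAck enum replaces the string constants removed from ReplUtils, and the patch resolves each constant to a marker file under the dump's ReplUtils.REPL_HIVE_BASE_DIR directory via new Path(dir, ReplAck.X.toString()) and FileSystem.exists(). The following is a minimal illustrative sketch of that usage, not part of the patch; the ReplAckChecker class and its method names are hypothetical, while ReplAck and the Hadoop Path/FileSystem/Configuration calls come from the code above.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.repl.ReplAck;

/** Hypothetical helper mirroring the ack checks done in ReplDumpTask and ReplicationSemanticAnalyzer. */
public final class ReplAckChecker {
  private ReplAckChecker() {
  }

  /** True when the dump under hiveDumpDir completed, i.e. the _finished_dump marker exists. */
  public static boolean isDumpComplete(Path hiveDumpDir, Configuration conf) throws IOException {
    FileSystem fs = hiveDumpDir.getFileSystem(conf);
    return fs.exists(new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString()));
  }

  /** True when that dump has already been loaded on the target, i.e. the _finished_load marker exists. */
  public static boolean isLoadComplete(Path hiveDumpDir, Configuration conf) throws IOException {
    FileSystem fs = hiveDumpDir.getFileSystem(conf);
    return fs.exists(new Path(hiveDumpDir, ReplAck.LOAD_ACKNOWLEDGEMENT.toString()));
  }
}

A dump directory that has the dump marker but not the load marker is the state ReplDumpTask.shouldDump() and ReplicationSemanticAnalyzer.getCurrentLoadPath() look for: a completed dump that still needs to be loaded.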