diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index efe9fff780..2bf1ee055c 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -98,6 +98,9 @@ import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME; +import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT; +import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.COPY_ACKNOWLEDGEMENT; +import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.DUMP_ACKNOWLEDGEMENT; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -315,8 +318,8 @@ public void testBasic() throws IOException { FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(hconf); Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); - assertTrue(fs.exists(new Path(dumpPath, ReplUtils.DUMP_ACKNOWLEDGEMENT))); - assertTrue(fs.exists(new Path(dumpPath, ReplUtils.LOAD_ACKNOWLEDGEMENT))); + assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); + assertTrue(fs.exists(new Path(dumpPath, LOAD_ACKNOWLEDGEMENT.toString()))); verifyRun("SELECT * from " + replicatedDbName + ".unptned", unptn_data, driverMirror); verifyRun("SELECT a from " + replicatedDbName + ".ptned WHERE b=1", ptn_data_1, driverMirror); @@ -367,8 +370,8 @@ public void testBootstrapFailedDump() throws IOException { advanceDumpDir(); FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(hconf); Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); - assertTrue(fs.exists(new Path(dumpPath, ReplUtils.DUMP_ACKNOWLEDGEMENT))); - assertTrue(fs.exists(new Path(dumpPath, ReplUtils.LOAD_ACKNOWLEDGEMENT))); + assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); + assertTrue(fs.exists(new Path(dumpPath, LOAD_ACKNOWLEDGEMENT.toString()))); verifyRun("SELECT * from " + replicatedDbName + ".unptned", unptnData, driverMirror); verifyRun("SELECT a from " + replicatedDbName + ".ptned WHERE b=1", ptnData1, driverMirror); @@ -452,7 +455,7 @@ public void testTaskCreationOptimization() throws Throwable { Path loadPath = new Path(dump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); //delete load ack to reload the same dump - loadPath.getFileSystem(hconf).delete(new Path(loadPath, ReplUtils.LOAD_ACKNOWLEDGEMENT), true); + loadPath.getFileSystem(hconf).delete(new Path(loadPath, LOAD_ACKNOWLEDGEMENT.toString()), true); loadAndVerify(dbNameReplica, dbName, dump.lastReplId); run("insert into table " + dbName + ".t2 partition(country='india') values ('delhi')", driver); @@ -466,7 +469,7 @@ public void testTaskCreationOptimization() throws Throwable { loadPath = new Path(dump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); //delete load ack to reload the same dump - loadPath.getFileSystem(hconf).delete(new Path(loadPath, ReplUtils.LOAD_ACKNOWLEDGEMENT), true); + loadPath.getFileSystem(hconf).delete(new Path(loadPath, LOAD_ACKNOWLEDGEMENT.toString()), true); loadAndVerify(dbNameReplica, dbName, dump.lastReplId); run("insert into table " + dbName + ".t2 partition(country='us') values ('sf')", driver); @@ -902,8 
+905,8 @@ public void testIncrementalAdds() throws IOException { Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDbName); FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(hconf); Path dumpPath = new Path(incrementalDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); - assertTrue(fs.exists(new Path(dumpPath, ReplUtils.DUMP_ACKNOWLEDGEMENT))); - assertTrue(fs.exists(new Path(dumpPath, ReplUtils.LOAD_ACKNOWLEDGEMENT))); + assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); + assertTrue(fs.exists(new Path(dumpPath, LOAD_ACKNOWLEDGEMENT.toString()))); // VERIFY tables and partitions on destination for equivalence. verifyRun("SELECT * from " + replDbName + ".unptned_empty", empty, driverMirror); @@ -1723,7 +1726,8 @@ public void testIncrementalLoadWithOneFailedDump() throws IOException { Tuple incrementalDump = replDumpDb(dbName); //Remove the dump ack file, so that dump is treated as an invalid dump. - String ackFileRelativePath = ReplUtils.REPL_HIVE_BASE_DIR + File.separator + ReplUtils.DUMP_ACKNOWLEDGEMENT; + String ackFileRelativePath = ReplUtils.REPL_HIVE_BASE_DIR + File.separator + + DUMP_ACKNOWLEDGEMENT.toString(); Path dumpFinishedAckFilePath = new Path(incrementalDump.dumpLocation, ackFileRelativePath); Path tmpDumpFinishedAckFilePath = new Path(dumpFinishedAckFilePath.getParent(), "old_" + dumpFinishedAckFilePath.getName()); @@ -1809,7 +1813,7 @@ public void testIncrementalLoadWithPreviousDumpDeleteFailed() throws IOException FileSystem fs = FileSystem.get(fileToDelete.toUri(), hconf); fs.delete(fileToDelete, true); assertTrue(fs.exists(bootstrapDumpDir)); - assertTrue(fs.exists(new Path(bootstrapDumpDir, ReplUtils.DUMP_ACKNOWLEDGEMENT))); + assertTrue(fs.exists(new Path(bootstrapDumpDir, DUMP_ACKNOWLEDGEMENT.toString()))); loadAndVerify(replDbName, dbName, incrDump.lastReplId); @@ -3835,4 +3839,83 @@ private static void createTestDataFile(String filename, String[] lines) throws I } } } + + @Test + public void testCheckPointing() throws IOException { + String testname = "testCheckPointing"; + String dbName = createDB(testname, driver); + run("CREATE TABLE " + dbName + ".t1(a string) STORED AS TEXTFILE", driver); + run("CREATE TABLE " + dbName + ".t2(a string) STORED AS TEXTFILE", driver); + + run("insert into "+ dbName +".t1 values (1)", driver); + run("insert into "+ dbName +".t1 values (2)", driver); + run("insert into "+ dbName +".t1 values (3)", driver); + run("insert into "+ dbName +".t2 values (11)", driver); + run("insert into "+ dbName +".t2 values (21)", driver); + + String replicatedDbName = dbName + "_dupe"; + Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replicatedDbName); + + FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(hconf); + Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); + assertTrue(fs.exists(new Path(dumpPath, LOAD_ACKNOWLEDGEMENT.toString()))); + Path dbPath = new Path(dumpPath + Path.SEPARATOR + dbName); + Path tablet1Path = new Path(dbPath, "t1"); + assertTrue(fs.exists(new Path(new Path(tablet1Path, "data"), + COPY_ACKNOWLEDGEMENT.toString()))); + Path tablet2Path = new Path(dbPath, "t2"); + assertTrue(fs.exists(new Path(new Path(tablet2Path, "data"), + COPY_ACKNOWLEDGEMENT.toString()))); + + run("insert into "+ dbName +".t1 values (3)", driver); + run("insert into "+ dbName +".t1 values (4)", driver); + incrementalLoadAndVerify(dbName, replicatedDbName); + } + + @Test 
+ public void testCheckPointingInDumpFailure() throws IOException { + String testname = "testCheckPointingInDumpFailure"; + String dbName = createDB(testname, driver); + run("CREATE TABLE " + dbName + ".t1(a string) STORED AS TEXTFILE", driver); + run("CREATE TABLE " + dbName + ".t2(a string) STORED AS TEXTFILE", driver); + + run("insert into "+ dbName +".t1 values (1)", driver); + run("insert into "+ dbName +".t1 values (2)", driver); + run("insert into "+ dbName +".t1 values (3)", driver); + run("insert into "+ dbName +".t2 values (11)", driver); + run("insert into "+ dbName +".t2 values (21)", driver); + + String replicatedDbName = dbName + "_dupe"; + Tuple bootstrapDump = replDumpDb(dbName); + + FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(hconf); + Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); + Path dbPath = new Path(dumpPath + Path.SEPARATOR + dbName); + Path tablet1Path = new Path(dbPath, "t1"); + assertTrue(fs.exists(new Path(new Path(tablet1Path, "data"), + COPY_ACKNOWLEDGEMENT.toString()))); + Path tablet2Path = new Path(dbPath, "t2"); + assertTrue(fs.exists(new Path(new Path(tablet2Path, "data"), + COPY_ACKNOWLEDGEMENT.toString()))); + long modifiedTimeTable1 = fs.getFileStatus(new Path(tablet1Path, "data")).getModificationTime(); + long modifiedTimeTable2 = fs.getFileStatus(new Path(tablet2Path, "data")).getModificationTime(); + //Delete table 2 copy ack + fs.delete(new Path(new Path(tablet2Path, "data"), + COPY_ACKNOWLEDGEMENT.toString()), true); + fs.delete(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()), true); + assertFalse(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); + assertFalse(fs.exists(new Path(new Path(tablet2Path, "data"), + COPY_ACKNOWLEDGEMENT.toString()))); + //Do another dump. It should only dump table t2. 
Modification time of table t1 is same while t2 is greater + Tuple nextDump = incrementalLoadAndVerify(dbName, replicatedDbName); + assertEquals(nextDump.dumpLocation, bootstrapDump.dumpLocation); + assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); + assertTrue(fs.exists(new Path(new Path(tablet1Path, "data"), COPY_ACKNOWLEDGEMENT.toString()))); + assertTrue(fs.exists(new Path(new Path(tablet2Path, "data"), COPY_ACKNOWLEDGEMENT.toString()))); + assertEquals(modifiedTimeTable1, fs.getFileStatus(new Path(tablet1Path, "data")).getModificationTime()); + assertTrue(modifiedTimeTable2 < fs.getFileStatus(new Path(tablet2Path, "data")) + .getModificationTime()); + } } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index 56b27a555e..33124c8f76 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -55,6 +55,7 @@ import java.util.stream.Collectors; import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; +import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -1093,13 +1094,13 @@ public void testIfBootstrapReplLoadFailWhenRetryAfterBootstrapComplete() throws // To retry with same dump delete the load ack new Path(tuple.dumpLocation).getFileSystem(conf).delete(new Path( - hiveDumpLocation, ReplUtils.LOAD_ACKNOWLEDGEMENT), true); + hiveDumpLocation, LOAD_ACKNOWLEDGEMENT.toString()), true); // Retry with same dump with which it was already loaded also fails. replica.loadFailure(replicatedDbName, primaryDbName); // To retry with same dump delete the load ack new Path(tuple.dumpLocation).getFileSystem(conf).delete(new Path( - hiveDumpLocation, ReplUtils.LOAD_ACKNOWLEDGEMENT), true); + hiveDumpLocation, LOAD_ACKNOWLEDGEMENT.toString()), true); // Retry from same dump when the database is empty is also not allowed. 
replica.run("drop table t1") .run("drop table t2") @@ -1344,7 +1345,7 @@ public void testMoveOptimizationIncrementalFailureAfterCopyReplace() throws Thro //delete load ack to reuse the dump new Path(tuple.dumpLocation).getFileSystem(conf).delete(new Path(tuple.dumpLocation + Path.SEPARATOR + ReplUtils.REPL_HIVE_BASE_DIR + Path.SEPARATOR - + ReplUtils.LOAD_ACKNOWLEDGEMENT), true); + + LOAD_ACKNOWLEDGEMENT.toString()), true); replica.load(replicatedDbName_CM, primaryDbName, withConfigs); replica.run("alter database " + replicatedDbName + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')") @@ -1370,7 +1371,7 @@ public void testMoveOptimizationIncrementalFailureAfterCopy() throws Throwable { replica.load(replicatedDbName, primaryDbName, withConfigs); //delete load ack to reuse the dump new Path(bootstrapDump.dumpLocation).getFileSystem(conf).delete(new Path(bootstrapDump.dumpLocation - + Path.SEPARATOR + ReplUtils.REPL_HIVE_BASE_DIR + Path.SEPARATOR + ReplUtils.LOAD_ACKNOWLEDGEMENT), true); + + Path.SEPARATOR + ReplUtils.REPL_HIVE_BASE_DIR + Path.SEPARATOR + LOAD_ACKNOWLEDGEMENT.toString()), true); replica.load(replicatedDbName_CM, primaryDbName, withConfigs); replica.run("alter database " + replicatedDbName + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')") .run("alter database " + replicatedDbName_CM + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')"); @@ -1423,7 +1424,7 @@ public Boolean apply(NotificationEvent entry) { //delete load ack to reuse the dump new Path(tuple.dumpLocation).getFileSystem(conf).delete(new Path(tuple.dumpLocation + Path.SEPARATOR + ReplUtils.REPL_HIVE_BASE_DIR + Path.SEPARATOR - + ReplUtils.LOAD_ACKNOWLEDGEMENT), true); + + LOAD_ACKNOWLEDGEMENT.toString()), true); InjectableBehaviourObjectStore.setAddNotificationModifier(callerVerifier); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java index 132578991d..b7c814ca3e 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.parse; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -33,6 +34,8 @@ import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.parse.repl.PathBuilder; +import org.apache.hadoop.hive.ql.parse.repl.dump.HiveWrapper; import org.apache.hadoop.security.UserGroupInformation; import org.junit.After; import org.junit.Assert; @@ -54,6 +57,9 @@ import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.FILE_NAME; import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME; +import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT; +import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.DUMP_ACKNOWLEDGEMENT; +import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.COPY_ACKNOWLEDGEMENT; import static 
org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -906,6 +912,140 @@ public void replicationWithTableNameContainsKeywords() throws Throwable { .verifyReplTargetProperty(replicatedDbName); } + @Test + public void testCheckPointing() throws Throwable { + List withClauseOptions = externalTableBasePathWithClause(); + WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName) + .run("CREATE TABLE t1(a string) STORED AS TEXTFILE") + .run("CREATE EXTERNAL TABLE t2(a string) STORED AS TEXTFILE") + .run("insert into t1 values (1)") + .run("insert into t1 values (2)") + .run("insert into t2 values (11)") + .run("insert into t2 values (21)") + .dump(primaryDbName, withClauseOptions); + + // verify that the external table info is written correctly for bootstrap + assertExternalFileInfo(Arrays.asList("t2"), bootstrapDump.dumpLocation, primaryDbName); + + replica.load(replicatedDbName, primaryDbName, withClauseOptions) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyResult("t1") + .run("show tables like 't2'") + .verifyResult("t2") + .run("repl status " + replicatedDbName) + .verifyResult(bootstrapDump.lastReplicationId) + .run("select a from t1") + .verifyResults(new String[]{"1", "2"}) + .run("select a from t2") + .verifyResults(new String[]{"11", "21"}); + + FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(conf); + Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); + assertTrue(fs.exists(new Path(dumpPath, LOAD_ACKNOWLEDGEMENT.toString()))); + Path dbPath = new Path(dumpPath, primaryDbName.toLowerCase()); + Path tablet1Path = new Path(dbPath, "t1"); + //For managed table data and metadata is together + assertTrue(fs.exists(new Path(new Path(tablet1Path, "data"), COPY_ACKNOWLEDGEMENT.toString()))); + //For external table, data is directly copied to target external table base dir + HiveWrapper.Tuple tableTuple + = new HiveWrapper(Hive.get(primary.hiveConf), primaryDbName).table("t2", conf); + Path tablet2Path = ReplExternalTables.externalTableDataPath(conf, new Path(REPLICA_EXTERNAL_BASE), + PathBuilder.fullyQualifiedHDFSUri(tableTuple.object.getDataLocation(), FileSystem.get(conf))); + assertTrue(fs.exists(new Path(tablet2Path, COPY_ACKNOWLEDGEMENT.toString()))); + + WarehouseInstance.Tuple nextDump = primary.run("use " + primaryDbName) + .run("insert into t1 values (3)") + .run("insert into t1 values (4)") + .dump(primaryDbName, withClauseOptions); + + replica.load(replicatedDbName, primaryDbName, withClauseOptions) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyResult("t1") + .run("show tables like 't2'") + .verifyResult("t2") + .run("repl status " + replicatedDbName) + .verifyResult(nextDump.lastReplicationId) + .run("select a from t1") + .verifyResults(new String[]{"1", "2", "3", "4"}) + .run("select a from t2") + .verifyResults(new String[]{"11", "21"}); + } + + @Test + public void testCheckPointingInDumpFailure() throws Throwable { + List withClauseOptions = externalTableBasePathWithClause(); + WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName) + .run("CREATE TABLE t1(a string) STORED AS TEXTFILE") + .run("CREATE EXTERNAL TABLE t2(a string) STORED AS TEXTFILE") + .run("insert into t1 values (1)") + .run("insert into t1 values (2)") + .run("insert into t2 values (11)") + .run("insert into t2 values (21)") 
+ .dump(primaryDbName, withClauseOptions); + + // verify that the external table info is written correctly for bootstrap + assertExternalFileInfo(Arrays.asList("t2"), bootstrapDump.dumpLocation, primaryDbName); + + replica.load(replicatedDbName, primaryDbName, withClauseOptions) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyResult("t1") + .run("show tables like 't2'") + .verifyResult("t2") + .run("repl status " + replicatedDbName) + .verifyResult(bootstrapDump.lastReplicationId) + .run("select a from t1") + .verifyResults(new String[]{"1", "2"}) + .run("select a from t2") + .verifyResults(new String[]{"11", "21"}); + + FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(conf); + Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); + assertTrue(fs.exists(new Path(dumpPath, LOAD_ACKNOWLEDGEMENT.toString()))); + Path dbPath = new Path(dumpPath, primaryDbName.toLowerCase()); + Path tablet1Path = new Path(dbPath, "t1"); + assertTrue(fs.exists(new Path(new Path(tablet1Path, "data"), COPY_ACKNOWLEDGEMENT.toString()))); + //For external table, data is directly copied to target external table base dir + HiveWrapper.Tuple tableTuple + = new HiveWrapper(Hive.get(primary.hiveConf), primaryDbName).table("t2", conf); + Path tablet2Path = ReplExternalTables.externalTableDataPath(conf, new Path(REPLICA_EXTERNAL_BASE), + PathBuilder.fullyQualifiedHDFSUri(tableTuple.object.getDataLocation(), FileSystem.get(conf))); + assertTrue(fs.exists(new Path(tablet2Path, COPY_ACKNOWLEDGEMENT.toString()))); + long modifiedTimeTable1 = fs.getFileStatus(new Path(tablet1Path, "data")).getModificationTime(); + long modifiedTimeTable2 = fs.getFileStatus(tablet2Path).getModificationTime(); + //Delete table 2 copy ack + fs.delete(new Path(tablet2Path, COPY_ACKNOWLEDGEMENT.toString()), true); + fs.delete(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()), true); + assertFalse(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); + assertFalse(fs.exists(new Path(tablet2Path, COPY_ACKNOWLEDGEMENT.toString()))); + + //Do another dump. It should only dump table t2. 
Modification time of table t1 is same while t2 is greater + WarehouseInstance.Tuple nextDump = primary.dump(primaryDbName, withClauseOptions); + replica.load(replicatedDbName, primaryDbName, withClauseOptions) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyResult("t1") + .run("show tables like 't2'") + .verifyResult("t2") + .run("repl status " + replicatedDbName) + .verifyResult(bootstrapDump.lastReplicationId) + .run("select a from t1") + .verifyResults(new String[]{"1", "2"}) + .run("select a from t2") + .verifyResults(new String[]{"11", "21"}); + assertEquals(nextDump.dumpLocation, bootstrapDump.dumpLocation); + assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); + assertTrue(fs.exists(new Path(new Path(tablet1Path, "data"), COPY_ACKNOWLEDGEMENT.toString()))); + assertTrue(fs.exists(new Path(tablet2Path, COPY_ACKNOWLEDGEMENT.toString()))); + assertEquals(modifiedTimeTable1, fs.getFileStatus(new Path(tablet1Path, "data")).getModificationTime()); + assertTrue(modifiedTimeTable2 < fs.getFileStatus(tablet2Path) + .getModificationTime()); + } + private List externalTableBasePathWithClause() throws IOException, SemanticException { return ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java index 3c7274cb4f..64e5888b4c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.plan.CopyWork; @@ -49,6 +50,8 @@ import org.apache.hadoop.hive.ql.plan.api.StageType; import org.apache.hadoop.util.StringUtils; +import javax.security.auth.login.LoginException; + import static org.apache.hadoop.hive.common.FileUtils.HIDDEN_FILES_PATH_FILTER; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION; @@ -134,139 +137,153 @@ private void renameFileCopiedFromCmPath(Path toPath, FileSystem dstFs, List 1 || work.getToPaths().length > 1) { + throw new RuntimeException("Invalid ReplCopyWork: " + + work.getFromPaths() + ", " + work.getToPaths()); + } + Path fromPath = work.getFromPaths()[0]; + toPath = work.getToPaths()[0]; try { - // Note: CopyWork supports copying multiple files, but ReplCopyWork doesn't. - // Not clear of ReplCopyWork should inherit from CopyWork. 
- if (work.getFromPaths().length > 1 || work.getToPaths().length > 1) { - throw new RuntimeException("Invalid ReplCopyWork: " - + work.getFromPaths() + ", " + work.getToPaths()); + if (!work.isCheckpointEnabled()) { + status = copyPaths(fromPath, toPath); + } else { + if (!ReplUtils.dataCopyCompleted(toPath, conf)) { + status = copyPaths(fromPath, toPath); + if (status == 0) { + ReplUtils.addCopyAck(toPath, conf); + } + } } - Path fromPath = work.getFromPaths()[0]; - toPath = work.getToPaths()[0]; - - console.printInfo("Copying data from " + fromPath.toString(), " to " - + toPath.toString()); - - ReplCopyWork rwork = ((ReplCopyWork)work); - - FileSystem srcFs = fromPath.getFileSystem(conf); - dstFs = toPath.getFileSystem(conf); - - // This should only be true for copy tasks created from functions, otherwise there should never - // be a CM uri in the from path. - if (ReplChangeManager.isCMFileUri(fromPath)) { - String[] result = ReplChangeManager.decodeFileUri(fromPath.toString()); - ReplChangeManager.FileInfo sourceInfo = ReplChangeManager - .getFileInfo(new Path(result[0]), result[1], result[2], result[3], conf); - if (FileUtils.copy( - sourceInfo.getSrcFs(), sourceInfo.getSourcePath(), - dstFs, toPath, false, false, conf)) { - return 0; + } catch (Exception e) { + LOG.error(StringUtils.stringifyException(e)); + setException(e); + return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); + } + return status; + } + + private int copyPaths(Path fromPath, Path toPath) throws MetaException, IOException, HiveException, LoginException { + FileSystem dstFs = null; + + console.printInfo("Copying data from " + fromPath.toString(), " to " + + toPath.toString()); + + ReplCopyWork rwork = ((ReplCopyWork)work); + + FileSystem srcFs = fromPath.getFileSystem(conf); + dstFs = toPath.getFileSystem(conf); + + // This should only be true for copy tasks created from functions, otherwise there should never + // be a CM uri in the from path. + if (ReplChangeManager.isCMFileUri(fromPath)) { + String[] result = ReplChangeManager.decodeFileUri(fromPath.toString()); + ReplChangeManager.FileInfo sourceInfo = ReplChangeManager + .getFileInfo(new Path(result[0]), result[1], result[2], result[3], conf); + if (FileUtils.copy( + sourceInfo.getSrcFs(), sourceInfo.getSourcePath(), + dstFs, toPath, false, false, conf)) { + return 0; + } else { + console.printError("Failed to copy: '" + fromPath.toString() + "to: '" + toPath.toString() + + "'"); + return 1; + } + } + + List srcFiles = new ArrayList<>(); + if (rwork.readSrcAsFilesList()) { + // This flow is usually taken for REPL LOAD + // Our input is the result of a _files listing, we should expand out _files. + srcFiles = filesInFileListing(srcFs, fromPath); + if (LOG.isDebugEnabled()) { + LOG.debug("ReplCopyTask _files contains: {}", (srcFiles == null ? "null" : srcFiles.size())); + } + if ((srcFiles == null) || (srcFiles.isEmpty())) { + if (work.isErrorOnSrcEmpty()) { + console.printError("No _files entry found on source: " + fromPath.toString()); + return 5; } else { - console.printError("Failed to copy: '" + fromPath.toString() + "to: '" + toPath.toString() - + "'"); - return 1; + return 0; } } - List srcFiles = new ArrayList<>(); - if (rwork.readSrcAsFilesList()) { - // This flow is usually taken for REPL LOAD - // Our input is the result of a _files listing, we should expand out _files. - srcFiles = filesInFileListing(srcFs, fromPath); - if (LOG.isDebugEnabled()) { - LOG.debug("ReplCopyTask _files contains: {}", (srcFiles == null ? 
"null" : srcFiles.size())); - } - if ((srcFiles == null) || (srcFiles.isEmpty())) { - if (work.isErrorOnSrcEmpty()) { - console.printError("No _files entry found on source: " + fromPath.toString()); - return 5; - } else { + if (work.isCopyToMigratedTxnTable()) { + if (work.isNeedCheckDuplicateCopy()) { + updateSrcFileListForDupCopy(dstFs, toPath, srcFiles, + ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID, ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_STMT_ID); + if (srcFiles.isEmpty()) { + LOG.info("All files are already present in the base directory. Skipping copy task."); return 0; } } - - if (work.isCopyToMigratedTxnTable()) { - if (work.isNeedCheckDuplicateCopy()) { - updateSrcFileListForDupCopy(dstFs, toPath, srcFiles, - ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID, ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_STMT_ID); - if (srcFiles.isEmpty()) { - LOG.info("All files are already present in the base directory. Skipping copy task."); - return 0; - } - } - // If direct (move optimized) copy is triggered for data to a migrated transactional table, then it - // should have a write ID allocated by parent ReplTxnTask. Use it to create the base or delta directory. - // The toPath received in ReplCopyWork is pointing to table/partition base location. - // So, just need to append the base or delta directory. - // getDeleteDestIfExist returns true if it is repl load for replace/insert overwrite event and - // hence need to create base directory. If false, then it is repl load for regular insert into or - // load flow and hence just create delta directory. - Long writeId = ReplUtils.getMigrationCurrentTblWriteId(conf); - if (writeId == null) { - console.printError("ReplCopyTask : Write id is not set in the config by open txn task for migration"); - return 6; - } - // Set stmt id 0 for bootstrap load as the directory needs to be searched during incremental load to avoid any - // duplicate copy from the source. Check HIVE-21197 for more detail. - int stmtId = (writeId.equals(ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID)) ? - ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_STMT_ID : - context.getHiveTxnManager().getStmtIdAndIncrement(); - toPath = new Path(toPath, AcidUtils.baseOrDeltaSubdir(work.getDeleteDestIfExist(), writeId, writeId, stmtId)); - } - } else { - // This flow is usually taken for IMPORT command - FileStatus[] srcs = LoadSemanticAnalyzer.matchFilesOrDir(srcFs, fromPath); - if (LOG.isDebugEnabled()) { - LOG.debug("ReplCopyTasks srcs= {}", (srcs == null ? "null" : srcs.length)); + // If direct (move optimized) copy is triggered for data to a migrated transactional table, then it + // should have a write ID allocated by parent ReplTxnTask. Use it to create the base or delta directory. + // The toPath received in ReplCopyWork is pointing to table/partition base location. + // So, just need to append the base or delta directory. + // getDeleteDestIfExist returns true if it is repl load for replace/insert overwrite event and + // hence need to create base directory. If false, then it is repl load for regular insert into or + // load flow and hence just create delta directory. 
+ Long writeId = ReplUtils.getMigrationCurrentTblWriteId(conf); + if (writeId == null) { + console.printError("ReplCopyTask : Write id is not set in the config by open txn task for migration"); + return 6; } - if (srcs == null || srcs.length == 0) { - if (work.isErrorOnSrcEmpty()) { - console.printError("No files matching path: " + fromPath.toString()); - return 3; - } else { - return 0; - } + // Set stmt id 0 for bootstrap load as the directory needs to be searched during incremental load to avoid any + // duplicate copy from the source. Check HIVE-21197 for more detail. + int stmtId = (writeId.equals(ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID)) ? + ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_STMT_ID : + context.getHiveTxnManager().getStmtIdAndIncrement(); + toPath = new Path(toPath, AcidUtils.baseOrDeltaSubdir(work.getDeleteDestIfExist(), writeId, writeId, stmtId)); + } + } else { + // This flow is usually taken for IMPORT command + FileStatus[] srcs = LoadSemanticAnalyzer.matchFilesOrDir(srcFs, fromPath); + if (LOG.isDebugEnabled()) { + LOG.debug("ReplCopyTasks srcs= {}", (srcs == null ? "null" : srcs.length)); + } + if (srcs == null || srcs.length == 0) { + if (work.isErrorOnSrcEmpty()) { + console.printError("No files matching path: " + fromPath.toString()); + return 3; + } else { + return 0; } + } - for (FileStatus oneSrc : srcs) { - console.printInfo("Copying file: " + oneSrc.getPath().toString()); - LOG.debug("ReplCopyTask :cp:{}=>{}", oneSrc.getPath(), toPath); - srcFiles.add(new ReplChangeManager.FileInfo(oneSrc.getPath().getFileSystem(conf), - oneSrc.getPath(), null)); - } + for (FileStatus oneSrc : srcs) { + console.printInfo("Copying file: " + oneSrc.getPath().toString()); + LOG.debug("ReplCopyTask :cp:{}=>{}", oneSrc.getPath(), toPath); + srcFiles.add(new ReplChangeManager.FileInfo(oneSrc.getPath().getFileSystem(conf), + oneSrc.getPath(), null)); } + } - LOG.debug("ReplCopyTask numFiles: {}", srcFiles.size()); + LOG.debug("ReplCopyTask numFiles: {}", srcFiles.size()); - // in case of move optimization, file is directly copied to destination. So we need to clear the old content, if - // its a replace (insert overwrite ) operation. - if (work.getDeleteDestIfExist() && dstFs.exists(toPath)) { - LOG.debug(" path " + toPath + " is cleaned before renaming"); - getHive().cleanUpOneDirectoryForReplace(toPath, dstFs, HIDDEN_FILES_PATH_FILTER, conf, work.getNeedRecycle(), - work.getIsAutoPurge()); - } + // in case of move optimization, file is directly copied to destination. So we need to clear the old content, if + // its a replace (insert overwrite ) operation. 
+ if (work.getDeleteDestIfExist() && dstFs.exists(toPath)) { + LOG.debug(" path " + toPath + " is cleaned before renaming"); + getHive().cleanUpOneDirectoryForReplace(toPath, dstFs, HIDDEN_FILES_PATH_FILTER, conf, work.getNeedRecycle(), + work.getIsAutoPurge()); + } - if (!FileUtils.mkdir(dstFs, toPath, conf)) { - console.printError("Cannot make target directory: " + toPath.toString()); - return 2; - } - // Copy the files from different source file systems to one destination directory - new CopyUtils(rwork.distCpDoAsUser(), conf, dstFs).copyAndVerify(toPath, srcFiles, fromPath); - - // If a file is copied from CM path, then need to rename them using original source file name - // This is needed to avoid having duplicate files in target if same event is applied twice - // where the first event refers to source path and second event refers to CM path - renameFileCopiedFromCmPath(toPath, dstFs, srcFiles); - return 0; - } catch (Exception e) { - LOG.error(StringUtils.stringifyException(e)); - setException(e); - return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); + if (!FileUtils.mkdir(dstFs, toPath, conf)) { + console.printError("Cannot make target directory: " + toPath.toString()); + return 2; } + // Copy the files from different source file systems to one destination directory + new CopyUtils(rwork.distCpDoAsUser(), conf, dstFs).copyAndVerify(toPath, srcFiles, fromPath); + + // If a file is copied from CM path, then need to rename them using original source file name + // This is needed to avoid having duplicate files in target if same event is applied twice + // where the first event refers to source path and second event refers to CM path + renameFileCopiedFromCmPath(toPath, dstFs, srcFiles); + return 0; } private List filesInFileListing(FileSystem fs, Path dataPath) @@ -325,16 +342,24 @@ public String getName() { HiveConf conf, boolean isAutoPurge, boolean needRecycle, boolean copyToMigratedTxnTable) { return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, isAutoPurge, needRecycle, - copyToMigratedTxnTable, true); + copyToMigratedTxnTable, true, false); } public static Task getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath, HiveConf conf, boolean isAutoPurge, boolean needRecycle, boolean copyToMigratedTxnTable, boolean readSourceAsFileList) { + return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, isAutoPurge, needRecycle, copyToMigratedTxnTable, + readSourceAsFileList, false); + } + + public static Task getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath, + HiveConf conf, boolean isAutoPurge, boolean needRecycle, + boolean copyToMigratedTxnTable, boolean readSourceAsFileList, + boolean shouldCheckpoint) { Task copyTask = null; LOG.debug("ReplCopyTask:getLoadCopyTask: {}=>{}", srcPath, dstPath); if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){ - ReplCopyWork rcwork = new ReplCopyWork(srcPath, dstPath, false); + ReplCopyWork rcwork = new ReplCopyWork(srcPath, dstPath, false, shouldCheckpoint); rcwork.setReadSrcAsFilesList(readSourceAsFileList); if (replicationSpec.isReplace() && (conf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION) || copyToMigratedTxnTable)) { rcwork.setDeleteDestIfExist(true); @@ -364,6 +389,13 @@ public String getName() { public static Task getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath, HiveConf conf, boolean readSourceAsFileList) { - return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, false, false, false, readSourceAsFileList); + 
return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, false, false, + false, readSourceAsFileList, false); + } + + public static Task getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath, + HiveConf conf, boolean readSourceAsFileList, boolean shouldCheckpoint) { + return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, false, false, false, readSourceAsFileList, + shouldCheckpoint); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java index e8a8df1e12..0c7a4fa962 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.parse.repl.CopyUtils; import org.apache.hadoop.hive.ql.plan.api.StageType; import org.apache.hadoop.hive.shims.ShimLoader; @@ -131,14 +132,30 @@ private int handleException(Exception e, Path sourcePath, Path targetPath, @Override public int execute() { - String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER); - - Path sourcePath = work.getFullyQualifiedSourcePath(); Path targetPath = work.getFullyQualifiedTargetPath(); + Path sourcePath = work.getFullyQualifiedSourcePath(); if (conf.getBoolVar(HiveConf.ConfVars.REPL_ADD_RAW_RESERVED_NAMESPACE)) { sourcePath = reservedRawPath(work.getFullyQualifiedSourcePath().toUri()); targetPath = reservedRawPath(work.getFullyQualifiedTargetPath().toUri()); } + int status = 0; + try { + if (!ReplUtils.dataCopyCompleted(targetPath, conf)) { + status = copyDir(sourcePath, targetPath); + if (status == 0) { + ReplUtils.addCopyAck(targetPath, conf); + } + } + } catch (Exception e) { + LOG.error("failed", e); + setException(e); + return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); + } + return status; + } + + private int copyDir(Path sourcePath, Path targetPath) { + String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER); int currentRetry = 0; int error = 0; UserGroupInformation proxyUser = null; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplAck.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplAck.java new file mode 100644 index 0000000000..b4c0c58003 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplAck.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.repl; + +/** + * ReplAck, used for repl acknowledgement constants. 
+ */ +public enum ReplAck { + DUMP_ACKNOWLEDGEMENT("_finished_dump"), + LOAD_ACKNOWLEDGEMENT("_finished_load"), + COPY_ACKNOWLEDGEMENT("_finished_copy"); + private String ack; + ReplAck(String ack) { + this.ack = ack; + } + + @Override + public String toString() { + return ack; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 69f6ffef5a..f7a8081c22 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -93,6 +93,7 @@ import java.util.ArrayList; import java.util.concurrent.TimeUnit; import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.Writer; +import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT; public class ReplDumpTask extends Task implements Serializable { private static final long serialVersionUID = 1L; @@ -135,20 +136,20 @@ public int execute() { Path dumpRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR), Base64.getEncoder().encodeToString(work.dbNameOrPattern.toLowerCase() .getBytes(StandardCharsets.UTF_8.name()))); - Path previousHiveDumpPath = getPreviousDumpMetadataPath(dumpRoot); + Path previousValidHiveDumpPath = getPreviousValidDumpMetadataPath(dumpRoot); //If no previous dump is present or previous dump is already loaded, proceed with the dump operation. - if (shouldDump(previousHiveDumpPath)) { - Path currentDumpPath = new Path(dumpRoot, getNextDumpDir()); + if (shouldDump(previousValidHiveDumpPath)) { + Path currentDumpPath = getCurrentDumpPath(dumpRoot); Path hiveDumpRoot = new Path(currentDumpPath, ReplUtils.REPL_HIVE_BASE_DIR); DumpMetaData dmd = new DumpMetaData(hiveDumpRoot, conf); // Initialize ReplChangeManager instance since we will require it to encode file URI. 
ReplChangeManager.getInstance(conf); Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR)); Long lastReplId; - if (previousHiveDumpPath == null) { + if (previousValidHiveDumpPath == null) { lastReplId = bootStrapDump(hiveDumpRoot, dmd, cmRoot, hiveDb); } else { - work.setEventFrom(getEventFromPreviousDumpMetadata(previousHiveDumpPath)); + work.setEventFrom(getEventFromPreviousDumpMetadata(previousValidHiveDumpPath)); lastReplId = incrementalDump(hiveDumpRoot, dmd, cmRoot, hiveDb); } work.setResultValues(Arrays.asList(currentDumpPath.toUri().toString(), String.valueOf(lastReplId))); @@ -166,6 +167,16 @@ public int execute() { return 0; } + private Path getCurrentDumpPath(Path dumpRoot) throws IOException { + Path previousDumpPath = getPreviousDumpPath(dumpRoot); + if (previousDumpPath != null && !validDump(previousDumpPath)) { + //Resume previous dump + return previousDumpPath; + } else { + return new Path(dumpRoot, getNextDumpDir()); + } + } + private void initiateDataCopyTasks() throws SemanticException, IOException { TaskTracker taskTracker = new TaskTracker(conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS)); List> childTasks = new ArrayList<>(); @@ -183,7 +194,8 @@ private void initiateDataCopyTasks() throws SemanticException, IOException { private void finishRemainingTasks() throws SemanticException, IOException { prepareReturnValues(work.getResultValues()); Path dumpAckFile = new Path(work.getCurrentDumpPath(), - ReplUtils.REPL_HIVE_BASE_DIR + File.separator + ReplUtils.DUMP_ACKNOWLEDGEMENT); + ReplUtils.REPL_HIVE_BASE_DIR + File.separator + + ReplAck.DUMP_ACKNOWLEDGEMENT.toString()); Utils.create(dumpAckFile, conf); deleteAllPreviousDumpMeta(work.getCurrentDumpPath()); } @@ -233,7 +245,7 @@ private Long getEventFromPreviousDumpMetadata(Path previousDumpPath) throws Sema return 0L; } - private Path getPreviousDumpMetadataPath(Path dumpRoot) throws IOException { + private Path getPreviousValidDumpMetadataPath(Path dumpRoot) throws IOException { FileStatus latestValidStatus = null; FileSystem fs = dumpRoot.getFileSystem(conf); if (fs.exists(dumpRoot)) { @@ -241,8 +253,8 @@ private Path getPreviousDumpMetadataPath(Path dumpRoot) throws IOException { for (FileStatus status : statuses) { LOG.info("Evaluating previous dump dir path:{}", status.getPath()); if (latestValidStatus == null) { - latestValidStatus = validDump(fs, status.getPath()) ? status : null; - } else if (validDump(fs, status.getPath()) + latestValidStatus = validDump(status.getPath()) ? 
status : null; + } else if (validDump(status.getPath()) && status.getModificationTime() > latestValidStatus.getModificationTime()) { latestValidStatus = status; } @@ -254,10 +266,14 @@ private Path getPreviousDumpMetadataPath(Path dumpRoot) throws IOException { return latestDumpDir; } - private boolean validDump(FileSystem fs, Path dumpDir) throws IOException { + private boolean validDump(Path dumpDir) throws IOException { //Check if it was a successful dump - Path hiveDumpDir = new Path(dumpDir, ReplUtils.REPL_HIVE_BASE_DIR); - return fs.exists(new Path(hiveDumpDir, ReplUtils.DUMP_ACKNOWLEDGEMENT)); + if (dumpDir != null) { + FileSystem fs = dumpDir.getFileSystem(conf); + Path hiveDumpDir = new Path(dumpDir, ReplUtils.REPL_HIVE_BASE_DIR); + return fs.exists(new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString())); + } + return false; } private boolean shouldDump(Path previousDumpPath) throws IOException { @@ -267,7 +283,7 @@ private boolean shouldDump(Path previousDumpPath) throws IOException { return true; } else { FileSystem fs = previousDumpPath.getFileSystem(conf); - return fs.exists(new Path(previousDumpPath, ReplUtils.LOAD_ACKNOWLEDGEMENT)); + return fs.exists(new Path(previousDumpPath, LOAD_ACKNOWLEDGEMENT.toString())); } } @@ -827,6 +843,24 @@ private String getNextDumpDir() { } } + private Path getPreviousDumpPath(Path dumpRoot) throws IOException { + FileSystem fs = dumpRoot.getFileSystem(conf); + if (fs.exists(dumpRoot)) { + FileStatus[] statuses = fs.listStatus(dumpRoot); + if (statuses.length > 0) { + FileStatus latestValidStatus = statuses[0]; + for (FileStatus status : statuses) { + LOG.info("Evaluating previous dump dir path:{}", status.getPath()); + if (status.getModificationTime() > latestValidStatus.getModificationTime()) { + latestValidStatus = status; + } + } + return latestValidStatus.getPath(); + } + } + return null; + } + void dumpFunctionMetadata(String dbName, Path dumpRoot, Hive hiveDb) throws Exception { Path functionsRoot = new Path(new Path(dumpRoot, dbName), ReplUtils.FUNCTIONS_ROOT_DIR_NAME); List functionNames = hiveDb.getFunctions(dbName, "*"); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java index 1f0d70212c..f37213b305 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java @@ -162,7 +162,7 @@ public void setResultValues(List resultValues) { EximUtil.ManagedTableCopyPath managedTableCopyPath = managedTableCopyPathIterator.next(); Task copyTask = ReplCopyTask.getLoadCopyTask( managedTableCopyPath.getReplicationSpec(), managedTableCopyPath.getSrcPath(), - managedTableCopyPath.getTargetPath(), conf, false); + managedTableCopyPath.getTargetPath(), conf, false, true); tasks.add(copyTask); tracker.addTask(copyTask); LOG.debug("added task for {}", managedTableCopyPath); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java index 3427b59e67..a5935552c4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java @@ -45,7 +45,6 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder; import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves; -import 
org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; import org.apache.hadoop.hive.ql.exec.util.DAGTraversal; import org.apache.hadoop.hive.ql.metadata.Hive; @@ -65,6 +64,7 @@ import java.util.Map; import static org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase.AlterDatabase; +import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT; public class ReplLoadTask extends Task implements Serializable { private final static int ZERO_TASKS = 0; @@ -316,7 +316,7 @@ private void createReplLoadCompleteAckTask() { || (!work.isIncrementalLoad() && !work.hasBootstrapLoadTasks())) { //All repl load tasks are executed and status is 0, create the task to add the acknowledgement AckWork replLoadAckWork = new AckWork( - new Path(work.dumpDirectory, ReplUtils.LOAD_ACKNOWLEDGEMENT)); + new Path(work.dumpDirectory, LOAD_ACKNOWLEDGEMENT.toString())); Task loadAckWorkTask = TaskFactory.get(replLoadAckWork, conf); if (this.childTasks.isEmpty()) { this.childTasks.add(loadAckWorkTask); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java index 211c3f014d..f1a2f7cbe1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec.repl.util; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.common.TableName; import org.apache.hadoop.hive.common.repl.ReplConst; @@ -33,12 +34,14 @@ import org.apache.hadoop.hive.ql.ddl.table.partition.PartitionUtils; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.repl.ReplAck; import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; +import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.hadoop.hive.ql.plan.ColumnStatsUpdateWork; import org.apache.hadoop.hive.ql.plan.ReplTxnWork; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; @@ -94,10 +97,6 @@ // Configuration to enable/disable dumping ACID tables. Used only for testing and shouldn't be // seen in production or in case of tests other than the ones where it's required. public static final String REPL_DUMP_INCLUDE_ACID_TABLES = "hive.repl.dump.include.acid.tables"; - //Acknowledgement for repl dump complete - public static final String DUMP_ACKNOWLEDGEMENT = "_finished_dump"; - //Acknowledgement for repl load complete - public static final String LOAD_ACKNOWLEDGEMENT = "_finished_load"; /** * Bootstrap REPL LOAD operation type on the examined object based on ckpt state. 
*/ @@ -296,4 +295,13 @@ public static boolean includeAcidTableInDump(HiveConf conf) { public static boolean tableIncludedInReplScope(ReplScope replScope, String tableName) { return ((replScope == null) || replScope.tableIncludedInReplScope(tableName)); } + + public static boolean dataCopyCompleted(Path toPath, HiveConf conf) throws IOException { + FileSystem dstFs = toPath.getFileSystem(conf); + return (dstFs.exists(new Path(toPath, ReplAck.COPY_ACKNOWLEDGEMENT.toString()))); + } + + public static void addCopyAck(Path toPath, HiveConf conf) throws SemanticException { + Utils.create(new Path(toPath, ReplAck.COPY_ACKNOWLEDGEMENT.toString()), conf); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 8802139e84..c4ff070da6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.repl.ReplAck; import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; @@ -54,6 +55,7 @@ import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_MOVE_OPTIMIZED_FILE_SCHEMES; +import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_DBNAME; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPLACE; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_CONFIG; @@ -424,8 +426,9 @@ private Path getCurrentLoadPath() throws IOException, SemanticException { } } Path hiveDumpPath = new Path(latestUpdatedStatus.getPath(), ReplUtils.REPL_HIVE_BASE_DIR); - if (loadPathBase.getFileSystem(conf).exists(new Path(hiveDumpPath, ReplUtils.DUMP_ACKNOWLEDGEMENT)) - && !loadPathBase.getFileSystem(conf).exists(new Path(hiveDumpPath, ReplUtils.LOAD_ACKNOWLEDGEMENT))) { + if (loadPathBase.getFileSystem(conf).exists(new Path(hiveDumpPath, + ReplAck.DUMP_ACKNOWLEDGEMENT.toString())) + && !loadPathBase.getFileSystem(conf).exists(new Path(hiveDumpPath, LOAD_ACKNOWLEDGEMENT.toString()))) { return hiveDumpPath; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java index 683f3c0362..4746e54706 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java @@ -20,7 +20,6 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; @@ -296,13 +295,6 @@ private void validateTargetDir(URI rootDirExportFile) throws SemanticException { throw new SemanticException( astRepresentationForErrorMsg + ": " + "Target is not a directory : " + rootDirExportFile); - } 
else { - FileStatus[] files = fs.listStatus(toPath, FileUtils.HIDDEN_FILES_PATH_FILTER); - if (files != null && files.length != 0) { - throw new SemanticException( - astRepresentationForErrorMsg + ": " + "Target is not an empty directory : " - + rootDirExportFile); - } } } catch (FileNotFoundException ignored) { } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java index c631f3d6e7..1fea731207 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java @@ -61,10 +61,17 @@ private boolean checkDuplicateCopy = false; + private boolean checkpointEnabled = false; + public ReplCopyWork(final Path srcPath, final Path destPath, boolean errorOnSrcEmpty) { super(srcPath, destPath, errorOnSrcEmpty); } + public ReplCopyWork(final Path srcPath, final Path destPath, boolean errorOnSrcEmpty, boolean checkpointEnabled) { + this(srcPath, destPath, errorOnSrcEmpty); + this.checkpointEnabled = checkpointEnabled; + } + public void setReadSrcAsFilesList(boolean readSrcAsFilesList) { this.readSrcAsFilesList = readSrcAsFilesList; } @@ -120,4 +127,8 @@ public boolean isNeedCheckDuplicateCopy() { public void setCheckDuplicateCopy(boolean flag) { checkDuplicateCopy = flag; } + + public boolean isCheckpointEnabled() { + return checkpointEnabled; + } }
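
For reviewers, a minimal sketch of the copy-checkpoint gate this patch introduces (the same pattern implemented by ReplUtils.dataCopyCompleted/addCopyAck and consumed by ReplCopyTask and DirCopyTask). The class name and the doCopy helper below are hypothetical and only illustrate the idea; they are not part of the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

public class CopyCheckpointSketch {
  // Mirrors ReplAck.COPY_ACKNOWLEDGEMENT from this patch.
  private static final String COPY_ACK = "_finished_copy";

  /** Copies a directory only if its ack marker is absent, writing the marker on success. */
  public static void copyWithCheckpoint(Path from, Path to, Configuration conf) throws IOException {
    FileSystem dstFs = to.getFileSystem(conf);
    Path ack = new Path(to, COPY_ACK);
    if (dstFs.exists(ack)) {
      // A previous dump attempt already finished this directory; a retried dump skips it.
      return;
    }
    doCopy(from, to, conf);     // actual data movement (DistCp / FileUtils.copy in the real tasks)
    dstFs.create(ack).close();  // marker is written only after the copy succeeds
  }

  private static void doCopy(Path from, Path to, Configuration conf) throws IOException {
    // placeholder for the copy step; intentionally left empty in this sketch
  }
}

Because a failed dump keeps its directory (ReplDumpTask now resumes it instead of creating a new one), a retry re-copies only the directories whose _finished_copy marker is missing, which is what the new testCheckPointingInDumpFailure tests assert via modification times.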