diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index 1adec4efa3..3639ab1f61 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -865,23 +865,6 @@ public void testShouldNotCreateDirectoryForNonNativeTableInDumpDirectory() throw assertFalse(fs.exists(cSerdesTableDumpLocation)); } - private void verifyIfCkptSet(WarehouseInstance wh, String dbName, String dumpDir) throws Exception { - Database db = wh.getDatabase(replicatedDbName); - verifyIfCkptSet(db.getParameters(), dumpDir); - - List tblNames = wh.getAllTables(dbName); - for (String tblName : tblNames) { - Table tbl = wh.getTable(dbName, tblName); - verifyIfCkptSet(tbl.getParameters(), dumpDir); - if (tbl.getPartitionKeysSize() != 0) { - List partitions = wh.getAllPartitions(dbName, tblName); - for (Partition ptn : partitions) { - verifyIfCkptSet(ptn.getParameters(), dumpDir); - } - } - } - } - @Test public void testShouldDumpMetaDataForNonNativeTableIfSetMeataDataOnly() throws Throwable { String tableName = testName.getMethodName() + "_table"; @@ -1166,7 +1149,7 @@ public void testIfBootstrapReplLoadFailWhenRetryAfterBootstrapComplete() throws .verifyResults(Collections.singletonList("10")) .run("select country from t2 order by country") .verifyResults(Arrays.asList("india", "uk", "us")); - verifyIfCkptSet(replica, replicatedDbName, tuple.dumpLocation); + replica.verifyIfCkptSet(replicatedDbName, tuple.dumpLocation); WarehouseInstance.Tuple tuple_2 = primary .run("use " + primaryDbName) diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java index 2cdc35f6cc..83f38fa6a9 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java @@ -147,6 +147,9 @@ public void externalTableReplicationWithDefaultPaths() throws Throwable { .run("select country from t2 where country = 'france'") .verifyResult("france"); + // Ckpt should be set on bootstrapped db. + replica.verifyIfCkptSet(replicatedDbName, tuple.dumpLocation); + assertTablePartitionLocation(primaryDbName + ".t1", replicatedDbName + ".t1"); assertTablePartitionLocation(primaryDbName + ".t2", replicatedDbName + ".t2"); @@ -511,6 +514,9 @@ public void bootstrapExternalTablesDuringIncrementalPhase() throws Throwable { .run("show tables like 't4'") .verifyResult("t4"); + // Ckpt should be set on bootstrapped tables. + replica.verifyIfCkptSetForTables(replicatedDbName, Arrays.asList("t2", "t3"), tuple.dumpLocation); + // Drop source tables to see if target points to correct data or not after bootstrap load. 
primary.run("use " + primaryDbName) .run("drop table t2") diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java index e0547d4cce..bd3a557540 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hive.ql.DriverFactory; import org.apache.hadoop.hive.ql.IDriver; import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.parse.repl.PathBuilder; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; @@ -361,6 +362,32 @@ private void printOutput() throws IOException { } } + private void verifyIfCkptSet(Map props, String dumpDir) { + assertTrue(props.containsKey(ReplUtils.REPL_CHECKPOINT_KEY)); + assertTrue(props.get(ReplUtils.REPL_CHECKPOINT_KEY).equals(dumpDir)); + } + + public void verifyIfCkptSet(String dbName, String dumpDir) throws Exception { + Database db = getDatabase(dbName); + verifyIfCkptSet(db.getParameters(), dumpDir); + + List tblNames = getAllTables(dbName); + verifyIfCkptSetForTables(dbName, tblNames, dumpDir); + } + + public void verifyIfCkptSetForTables(String dbName, List tblNames, String dumpDir) throws Exception { + for (String tblName : tblNames) { + Table tbl = getTable(dbName, tblName); + verifyIfCkptSet(tbl.getParameters(), dumpDir); + if (tbl.getPartitionKeysSize() != 0) { + List partitions = getAllPartitions(dbName, tblName); + for (Partition ptn : partitions) { + verifyIfCkptSet(ptn.getParameters(), dumpDir); + } + } + } + } + public Database getDatabase(String dbName) throws Exception { try { return client.getDatabase(dbName); diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java index 65b7aa00d2..fa72527de2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java @@ -199,12 +199,22 @@ private void addPartition(boolean hasMorePartitions, AddPartitionDesc addPartiti context.hiveConf ); + Task ckptTask = ReplUtils.getTableCheckpointTask( + tableDesc, + (HashMap)partSpec.getPartSpec(), + context.dumpDirectory, + context.hiveConf + ); + boolean isOnlyDDLOperation = event.replicationSpec().isMetadataOnly() || (TableType.EXTERNAL_TABLE.equals(table.getTableType()) && !event.replicationSpec().isMigratingToExternalTable() ); if (isOnlyDDLOperation) { + // Set Checkpoint task as dependent to add partition tasks. So, if same dump is retried for + // bootstrap, we skip current partition update. + addPartTask.addDependentTask(ckptTask); if (ptnRootTask == null) { ptnRootTask = addPartTask; } else { @@ -246,21 +256,14 @@ private void addPartition(boolean hasMorePartitions, AddPartitionDesc addPartiti movePartitionTask = movePartitionTask(table, partSpec, stagingDir, loadFileType); } - // Set Checkpoint task as dependant to add partition tasks. So, if same dump is retried for - // bootstrap, we skip current partition update. - Task ckptTask = ReplUtils.getTableCheckpointTask( - tableDesc, - (HashMap)partSpec.getPartSpec(), - context.dumpDirectory, - context.hiveConf - ); - if (ptnRootTask == null) { ptnRootTask = copyTask; } else { ptnRootTask.addDependentTask(copyTask); } + // Set Checkpoint task as dependent to the tail of add partition tasks. So, if same dump is + // retried for bootstrap, we skip current partition update. 
copyTask.addDependentTask(addPartTask); if (movePartitionTask != null) { addPartTask.addDependentTask(movePartitionTask);