diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java
index 07e8787367..f002472762 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java
@@ -30,13 +30,18 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.FILE_NAME;
 
@@ -123,6 +128,77 @@ public void testDistCpCopyWithRemoteStagingAndCopyTaskOnTarget() throws Throwabl
         .verifyResult("400");
   }
 
+  @Test
+  public void testTableLevelReplicationWithRemoteStaging() throws Throwable {
+    List<String> withClauseOptions = getStagingLocationConfig(replica.repldDir, true);
+    withClauseOptions.add("'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='" + false + "'");
+    WarehouseInstance.Tuple tuple = primary
+        .run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (100)")
+        .run("create table t2 (id int)")
+        .run("insert into table t2 values (200)")
+        .dump(primaryDbName + ".'t[0-9]+'", null, withClauseOptions);
+
+    // verify that the external table info is written correctly for bootstrap
+    assertExternalFileInfo(Arrays.asList("t1"), tuple.dumpLocation, false, replica);
+
+    // verify table list
+    verifyTableListForPolicy(replica.miniDFSCluster.getFileSystem(),
+        tuple.dumpLocation, new String[]{"t1", "t2"});
+
+    replica.load(replicatedDbName, primaryDbName, withClauseOptions)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("select id from t1")
+        .verifyResult("100")
+        .run("show tables like 't2'")
+        .verifyResult("t2")
+        .run("select id from t2")
+        .verifyResult("200");
+
+    tuple = primary.run("use " + primaryDbName)
+        .run("create external table t3 (id int)")
+        .run("insert into table t3 values (300)")
+        .run("create table t4 (id int)")
+        .run("insert into table t4 values (400)")
+        .run("create table t5 (id int) partitioned by (p int)")
+        .run("insert into t5 partition(p=1) values(10)")
+        .run("insert into t5 partition(p=2) values(20)")
+        .dump(primaryDbName + ".'t[0-9]+'", null, withClauseOptions);
+
+    // verify that the external table info is written correctly for incremental
+    assertExternalFileInfo(Arrays.asList("t1", "t3"), tuple.dumpLocation, true, replica);
+
+    // verify table list
+    verifyTableListForPolicy(replica.miniDFSCluster.getFileSystem(),
+        tuple.dumpLocation, new String[]{"t1", "t2", "t3", "t4", "t5"});
+
+    replica.load(replicatedDbName, primaryDbName, withClauseOptions)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't2'")
+        .verifyResult("t2")
+        .run("show tables like 't3'")
+        .verifyResult("t3")
+        .run("show tables like 't4'")
+        .verifyResult("t4")
+        .run("show tables like 't5'")
+        .verifyResult("t5")
+        .run("select id from t1")
+        .verifyResult("100")
+        .run("select id from t2")
+        .verifyResult("200")
+        .run("select id from t3")
.verifyResult("300") + .run("select id from t4") + .verifyResult("400") + .run("select id from t5") + .verifyResults(new String[]{"10", "20"}); + } + @Test public void testDistCpCopyWithLocalStagingAndCopyTaskOnTarget() throws Throwable { List withClauseOptions = getStagingLocationConfig(primary.repldDir, true); @@ -481,4 +557,37 @@ private void assertExternalFileInfo(List expected, String dumplocation, } ReplicationTestUtils.assertExternalFileInfo(warehouseInstance, expected, externalTableInfoFile); } + + /* + * Method used from TestTableLevelReplicationScenarios + */ + private void verifyTableListForPolicy(FileSystem fileSystem, String dumpLocation, String[] tableList) throws Throwable { + String hiveDumpLocation = dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR; + Path tableListFile = new Path(hiveDumpLocation, ReplUtils.REPL_TABLE_LIST_DIR_NAME); + tableListFile = new Path(tableListFile, primaryDbName.toLowerCase()); + + if (tableList == null) { + Assert.assertFalse(fileSystem.exists(tableListFile)); + return; + } else { + Assert.assertTrue(fileSystem.exists(tableListFile)); + } + + BufferedReader reader = null; + try { + InputStream inputStream = fileSystem.open(tableListFile); + reader = new BufferedReader(new InputStreamReader(inputStream)); + Set tableNames = new HashSet<>(Arrays.asList(tableList)); + int numTable = 0; + for (String line = reader.readLine(); line != null; line = reader.readLine()) { + numTable++; + Assert.assertTrue(tableNames.contains(line)); + } + Assert.assertEquals(numTable, tableList.length); + } finally { + if (reader != null) { + reader.close(); + } + } + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 7e690fce35..d5e192ecf1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -809,7 +809,7 @@ private void dumpTableListToDumpLocation(List tableList, Path dbRoot, St retryable.executeCallable((Callable) () -> { Path tableListFile = new Path(dbRoot, ReplUtils.REPL_TABLE_LIST_DIR_NAME); tableListFile = new Path(tableListFile, dbName.toLowerCase()); - FSDataOutputStream writer = FileSystem.get(hiveConf).create(tableListFile); + FSDataOutputStream writer = tableListFile.getFileSystem(hiveConf).create(tableListFile); for (String tableName : tableList) { String line = tableName.toLowerCase().concat("\n"); writer.write(line.getBytes(StandardCharsets.UTF_8));