diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java
index 549447eee4..650ea40c5b 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java
@@ -219,6 +219,208 @@ public void externalTableReplicationDropDatabase() throws Throwable {
     verifyTableDataExists(replica, dbDataLocReplica, tableName, true);
   }
 
+  @Test
+  public void managedTableReplicationWithRemoteStagingAndCopyTaskOnTarget() throws Throwable {
+    List<String> withClauseOptions = getStagingLocationConfig(replica.repldDir);
+    withClauseOptions.add("'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='" + false + "'");
+    WarehouseInstance.Tuple tuple = primary
+        .run("use " + primaryDbName)
+        .run("create table t1 (id int)")
+        .run("insert into table t1 values (100)")
+        .run("create table t2 (id int)")
+        .run("insert into table t2 values (200)")
+        .dump(primaryDbName, withClauseOptions);
+
+    replica.load(replicatedDbName, primaryDbName, withClauseOptions)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("select id from t1")
+        .verifyResult("100")
+        .run("show tables like 't2'")
+        .verifyResult("t2")
+        .run("select id from t2")
+        .verifyResult("200");
+
+    tuple = primary.run("use " + primaryDbName)
+        .run("create table t3 (id int)")
+        .run("insert into table t3 values (300)")
+        .run("create table t4 (id int)")
+        .run("insert into table t4 values (400)")
+        .dump(primaryDbName, withClauseOptions);
+
+    replica.load(replicatedDbName, primaryDbName, withClauseOptions)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't2'")
+        .verifyResult("t2")
+        .run("show tables like 't3'")
+        .verifyResult("t3")
+        .run("show tables like 't4'")
+        .verifyResult("t4")
+        .run("select id from t1")
+        .verifyResult("100")
+        .run("select id from t2")
+        .verifyResult("200")
+        .run("select id from t3")
+        .verifyResult("300")
+        .run("select id from t4")
+        .verifyResult("400");
+  }
+
+  @Test
+  public void managedTableReplicationWithLocalStagingAndCopyTaskOnTarget() throws Throwable {
+    List<String> withClauseOptions = getStagingLocationConfig(primary.repldDir);
+    withClauseOptions.add("'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='" + false + "'");
+    WarehouseInstance.Tuple tuple = primary
+        .run("use " + primaryDbName)
+        .run("create table t1 (id int)")
+        .run("insert into table t1 values (100)")
+        .run("create table t2 (id int)")
+        .run("insert into table t2 values (200)")
+        .dump(primaryDbName, withClauseOptions);
+
+    replica.load(replicatedDbName, primaryDbName, withClauseOptions)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("select id from t1")
+        .verifyResult("100")
+        .run("show tables like 't2'")
+        .verifyResult("t2")
+        .run("select id from t2")
+        .verifyResult("200");
+
+    tuple = primary.run("use " + primaryDbName)
+        .run("create table t3 (id int)")
+        .run("insert into table t3 values (300)")
+        .run("create table t4 (id int)")
+        .run("insert into table t4 values (400)")
+        .dump(primaryDbName, withClauseOptions);
+
+    replica.load(replicatedDbName, primaryDbName, withClauseOptions)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't2'")
+        .verifyResult("t2")
+        .run("show tables like 't3'")
+        .verifyResult("t3")
+        .run("show tables like 't4'")
+        .verifyResult("t4")
+        .run("select id from t1")
+        .verifyResult("100")
+        .run("select id from t2")
+        .verifyResult("200")
+        .run("select id from t3")
+        .verifyResult("300")
+        .run("select id from t4")
+        .verifyResult("400");
+  }
+
+  @Test
+  public void managedTableReplicationWithRemoteStagingAndCopyTaskOnSource() throws Throwable {
+    List<String> withClauseOptions = getStagingLocationConfig(replica.repldDir);
+    withClauseOptions.add("'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='" + false + "'");
+    withClauseOptions.add("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='" + false + "'");
+    WarehouseInstance.Tuple tuple = primary
+        .run("use " + primaryDbName)
+        .run("create table t1 (id int)")
+        .run("insert into table t1 values (100)")
+        .run("create table t2 (id int)")
+        .run("insert into table t2 values (200)")
+        .dump(primaryDbName, withClauseOptions);
+
+    replica.load(replicatedDbName, primaryDbName, withClauseOptions)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("select id from t1")
+        .verifyResult("100")
+        .run("show tables like 't2'")
+        .verifyResult("t2")
+        .run("select id from t2")
+        .verifyResult("200");
+
+    tuple = primary.run("use " + primaryDbName)
+        .run("create table t3 (id int)")
+        .run("insert into table t3 values (300)")
+        .run("create table t4 (id int)")
+        .run("insert into table t4 values (400)")
+        .dump(primaryDbName, withClauseOptions);
+
+    replica.load(replicatedDbName, primaryDbName, withClauseOptions)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't2'")
+        .verifyResult("t2")
+        .run("show tables like 't3'")
+        .verifyResult("t3")
+        .run("show tables like 't4'")
+        .verifyResult("t4")
+        .run("select id from t1")
+        .verifyResult("100")
+        .run("select id from t2")
+        .verifyResult("200")
+        .run("select id from t3")
+        .verifyResult("300")
+        .run("select id from t4")
+        .verifyResult("400");
+  }
+
+  @Test
+  public void managedTableReplicationWithLocalStagingAndCopyTaskOnSource() throws Throwable {
+    List<String> withClauseOptions = getStagingLocationConfig(primary.repldDir);
+    withClauseOptions.add("'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='" + false + "'");
+    withClauseOptions.add("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='" + false + "'");
+    WarehouseInstance.Tuple tuple = primary
+        .run("use " + primaryDbName)
+        .run("create table t1 (id int)")
+        .run("insert into table t1 values (100)")
+        .run("create table t2 (id int)")
+        .run("insert into table t2 values (200)")
+        .dump(primaryDbName, withClauseOptions);
+
+    replica.load(replicatedDbName, primaryDbName, withClauseOptions)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("select id from t1")
+        .verifyResult("100")
+        .run("show tables like 't2'")
+        .verifyResult("t2")
+        .run("select id from t2")
+        .verifyResult("200");
+
+    tuple = primary.run("use " + primaryDbName)
+        .run("create table t3 (id int)")
+        .run("insert into table t3 values (300)")
+        .run("create table t4 (id int)")
+        .run("insert into table t4 values (400)")
+        .dump(primaryDbName, withClauseOptions);
+
+    replica.load(replicatedDbName, primaryDbName, withClauseOptions)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't2'")
+        .verifyResult("t2")
+        .run("show tables like 't3'")
+        .verifyResult("t3")
+        .run("show tables like 't4'")
+        .verifyResult("t4")
+        .run("select id from t1")
+        .verifyResult("100")
+        .run("select id from t2")
+        .verifyResult("200")
+        .run("select id from t3")
+        .verifyResult("300")
+        .run("select id from t4")
+        .verifyResult("400");
+  }
+
   private void verifyTableDataExists(WarehouseInstance warehouse, Path dbDataPath, String tableName,
                                      boolean shouldExists) throws IOException {
     FileSystem fileSystem = FileSystem.get(warehouse.warehouseRoot.toUri(), warehouse.getConf());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
index c386aeeda5..8fe0a72b76 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.parse.repl;
 
 import com.google.common.collect.Lists;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -29,7 +30,6 @@
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.metadata.HiveFatalException;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -45,7 +45,6 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Callable;
 import java.util.stream.Collectors;
 
 public class CopyUtils {
@@ -58,7 +57,7 @@ public class CopyUtils {
   private final HiveConf hiveConf;
   private final long maxCopyFileSize;
   private final long maxNumberOfFiles;
-  private final boolean hiveInTest;
+  private final boolean hiveInReplTest;
   private final String copyAsUser;
   private FileSystem destinationFs;
 
@@ -66,7 +65,7 @@ public CopyUtils(String distCpDoAsUser, HiveConf hiveConf, FileSystem destinatio
     this.hiveConf = hiveConf;
     maxNumberOfFiles = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES);
     maxCopyFileSize = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE);
-    hiveInTest = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST);
+    hiveInReplTest = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL);
     this.copyAsUser = distCpDoAsUser;
     this.destinationFs = destinationFs;
   }
@@ -74,16 +73,17 @@ public CopyUtils(String distCpDoAsUser, HiveConf hiveConf, FileSystem destinatio
 
   // Used by replication, copy files from source to destination. It is possible source file is
   // changed/removed during copy, so double check the checksum after copy,
   // if not match, copy again from cm
-  public void copyAndVerify(Path destRoot, List<ReplChangeManager.FileInfo> srcFiles, Path origSrcPtah,
+  public void copyAndVerify(Path destRoot, List<ReplChangeManager.FileInfo> srcFiles, Path origSrcPath,
                             boolean overwrite) throws IOException, LoginException, HiveFatalException {
     UserGroupInformation proxyUser = getProxyUser();
-    FileSystem sourceFs = origSrcPtah.getFileSystem(hiveConf);
+    FileSystem sourceFs = CollectionUtils.isEmpty(srcFiles)
+        ? origSrcPath.getFileSystem(hiveConf) : srcFiles.get(0).getSrcFs();
     boolean useRegularCopy = regularCopy(sourceFs, srcFiles);
     try {
       if (!useRegularCopy) {
         srcFiles.clear();
-        srcFiles.add(new ReplChangeManager.FileInfo(sourceFs, origSrcPtah, null));
+        srcFiles.add(new ReplChangeManager.FileInfo(sourceFs, origSrcPath, null));
         doCopyRetry(sourceFs, srcFiles, destRoot, proxyUser, useRegularCopy, overwrite);
       } else {
         Map<FileSystem, Map<Path, List<ReplChangeManager.FileInfo>>> map = fsToFileMap(srcFiles, destRoot);
@@ -429,7 +429,7 @@ public void doCopy(Path destination, List<Path> srcPaths) throws IOException, Lo
    */
   boolean regularCopy(FileSystem sourceFs, List<ReplChangeManager.FileInfo> fileList) throws IOException {
-    if (hiveInTest) {
+    if (hiveInReplTest) {
       return true;
     }
     if (isLocal(sourceFs) || isLocal(destinationFs)) {