diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index c03b252e15..614e5aac3b 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -53,6 +53,7 @@
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -2023,4 +2024,262 @@ public void testMaterializedViewOnAcidTableReplication() throws Throwable {
         .run("show tables")
         .verifyResults(new String[]{"t1"});
   }
+
+  @Test
+  public void testORCTableRegularCopyWithCopyOnTarget() throws Throwable {
+    ArrayList<String> withClause = new ArrayList<>();
+    withClause.add("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='true'");
+    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+        .run("CREATE TABLE t1(a int) stored as orc TBLPROPERTIES ('transactional'='true')")
+        .run("CREATE TABLE t2(a string) clustered by (a) into 2 buckets" + " stored as orc TBLPROPERTIES ('transactional'='true')")
+        .run("CREATE TABLE t3(a string) clustered by (a) into 2 buckets" + " stored as orc TBLPROPERTIES ('transactional'='true')")
+        .run("CREATE TABLE tpart1(a int) partitioned by (name string)" + " stored as orc TBLPROPERTIES ('transactional'='true')")
+        .run("CREATE TABLE tpart2(a int) partitioned by (name string) clustered by (a) into 2 buckets" + " stored as orc TBLPROPERTIES ('transactional'='true')")
+        .run("CREATE TABLE text1(a string) STORED AS TEXTFILE")
+        .run("insert into t1 values (1)")
+        .run("insert into t1 values (11)")
+        .run("insert into t2 values (2)")
+        .run("insert into t2 values (22)")
+        .run("insert into t3 values (33)")
+        .run("insert into tpart1 partition(name='Tom') values(100)")
+        .run("insert into tpart1 partition(name='Jerry') values(101)")
+        .run("insert into tpart2 partition(name='Bob') values(200)")
+        .run("insert into tpart2 partition(name='Carl') values(201)")
+        .run("insert into text1 values ('ricky')")
+        .dump(primaryDbName, withClause);
+
+    replica.run("DROP TABLE t3");
+
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("use " + replicatedDbName)
+        .run("show tables")
+        .verifyResults(new String[]{"t1", "t2", "t3", "tpart1", "tpart2", "text1"})
+        .run("select * from " + replicatedDbName + ".t1")
+        .verifyResults(new String[] {"1", "11"})
+        .run("select * from " + replicatedDbName + ".t2")
+        .verifyResults(new String[]{"2", "22"})
+        .run("select a from " + replicatedDbName + ".tpart1")
+        .verifyResults(new String[]{"100", "101"})
+        .run("show partitions " + replicatedDbName + ".tpart1")
+        .verifyResults(new String[]{"name=Tom", "name=Jerry"})
+        .run("select a from " + replicatedDbName + ".tpart2")
+        .verifyResults(new String[]{"200", "201"})
+        .run("show partitions " + replicatedDbName + ".tpart2")
+        .verifyResults(new String[]{"name=Bob", "name=Carl"})
+        .run("select a from " + replicatedDbName + ".text1")
+        .verifyResults(new String[]{"ricky"});
+
+    WarehouseInstance.Tuple incrementalDump = primary.run("use " + primaryDbName)
+        .run("CREATE TABLE t4(a int) clustered by (a) into 2 buckets" + " stored as orc TBLPROPERTIES ('transactional'='true')")
+        .run("CREATE TABLE tpart3(a int) partitioned by (name string)" + " stored as orc TBLPROPERTIES ('transactional'='true')")
+        .run("CREATE TABLE tpart4(a int) partitioned by (name string) clustered by (a) into 2 buckets" + " stored as orc TBLPROPERTIES ('transactional'='true')")
+        .run("insert into t1 values (111)")
+        .run("insert into t2 values (222)")
+        .run("insert into t4 values (4)")
+        .run("insert into tpart1 partition(name='Tom') values(102)")
+        .run("insert into tpart1 partition(name='Jerry') values(103)")
+        .run("insert into tpart2 partition(name='Bob') values(202)")
+        .run("insert into tpart2 partition(name='Carl') values(203)")
+        .run("insert into tpart3 partition(name='Tom3') values(300)")
+        .run("insert into tpart3 partition(name='Jerry3') values(301)")
+        .run("insert into tpart4 partition(name='Bob4') values(400)")
+        .run("insert into tpart4 partition(name='Carl4') values(401)")
+        .run("insert into text1 values ('martin')")
+        .dump(primaryDbName, withClause);
+
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("use " + replicatedDbName)
+        .run("show tables ")
+        .verifyResults(new String[]{"t1", "t2", "t4", "tpart1", "tpart2", "tpart3", "tpart4", "text1"})
+        .run("select * from " + replicatedDbName + ".t1")
+        .verifyResults(new String[] {"1", "11", "111"})
+        .run("select * from " + replicatedDbName + ".t2")
+        .verifyResults(new String[]{"2", "22", "222"})
+        .run("select * from " + replicatedDbName + ".t4")
+        .verifyResults(new String[]{"4"})
+        .run("select a from " + replicatedDbName + ".tpart1")
+        .verifyResults(new String[]{"100", "101", "102", "103"})
+        .run("show partitions " + replicatedDbName + ".tpart1")
+        .verifyResults(new String[]{"name=Tom", "name=Jerry"})
+        .run("select a from " + replicatedDbName + ".tpart2")
+        .verifyResults(new String[]{"200", "201", "202", "203"})
+        .run("show partitions " + replicatedDbName + ".tpart2")
+        .verifyResults(new String[]{"name=Bob", "name=Carl"})
+        .run("select a from " + replicatedDbName + ".tpart3")
+        .verifyResults(new String[]{"300", "301"})
+        .run("show partitions " + replicatedDbName + ".tpart3")
+        .verifyResults(new String[]{"name=Tom3", "name=Jerry3"})
+        .run("select a from " + replicatedDbName + ".tpart4")
+        .verifyResults(new String[]{"400", "401"})
+        .run("show partitions " + replicatedDbName + ".tpart4")
+        .verifyResults(new String[]{"name=Bob4", "name=Carl4"})
+        .run("select a from " + replicatedDbName + ".text1")
+        .verifyResults(new String[]{"ricky", "martin"});
+
+    incrementalDump = primary.run("use " + primaryDbName)
+        .run("insert into t4 values (44)")
+        .run("insert into t1 values (1111)")
+        .run("DROP TABLE t1")
+        .run("insert into t2 values (2222)")
+        .run("insert into tpart1 partition(name='Tom') values(104)")
+        .run("insert into tpart1 partition(name='Tom_del') values(1000)")
+        .run("insert into tpart1 partition(name='Harry') values(10001)")
+        .run("insert into tpart1 partition(name='Jerry') values(105)")
+        .run("ALTER TABLE tpart1 DROP PARTITION(name='Tom')")
+        .run("DROP TABLE tpart2")
+        .dump(primaryDbName, withClause);
+
+    replica.run("DROP TABLE t4")
+        .run("ALTER TABLE tpart1 DROP PARTITION(name='Tom_del')");
+
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("use " + replicatedDbName)
+        .run("show tables ")
+        .verifyResults(new String[]{"t2", "t4", "tpart1", "tpart3", "tpart4", "text1"})
+        .run("select * from " + replicatedDbName + ".t2")
+        .verifyResults(new String[]{"2", "22", "222", "2222"})
+        .run("select a from " + replicatedDbName + ".tpart1")
+        .verifyResults(new String[]{"101", "103", "105", "1000", "10001"})
+        .run("show partitions " + replicatedDbName + ".tpart1")
+        .verifyResults(new String[]{"name=Harry", "name=Jerry", "name=Tom_del"});
+  }
+
+  @Test
+  public void testORCTableDistcpCopyWithCopyOnTarget() throws Throwable {
+    //Distcp copy
+    List<String> withClause = Arrays.asList(
+        "'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='true'",
+        "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'",
+        "'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='false'",
+        "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'",
+        "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='" +
+            UserGroupInformation.getCurrentUser().getUserName() + "'");
+    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+        .run("CREATE TABLE t1(a int) stored as orc TBLPROPERTIES ('transactional'='true')")
+        .run("CREATE TABLE t2(a string) clustered by (a) into 2 buckets" + " stored as orc TBLPROPERTIES ('transactional'='true')")
+        .run("CREATE TABLE t3(a string) clustered by (a) into 2 buckets" + " stored as orc TBLPROPERTIES ('transactional'='true')")
+        .run("CREATE TABLE tpart1(a int) partitioned by (name string)" + " stored as orc TBLPROPERTIES ('transactional'='true')")
+        .run("CREATE TABLE tpart2(a int) partitioned by (name string) clustered by (a) into 2 buckets" + " stored as orc TBLPROPERTIES ('transactional'='true')")
+        .run("CREATE TABLE text1(a string) STORED AS TEXTFILE")
+        .run("insert into t1 values (1)")
+        .run("insert into t1 values (11)")
+        .run("insert into t2 values (2)")
+        .run("insert into t2 values (22)")
+        .run("insert into t3 values (33)")
+        .run("insert into tpart1 partition(name='Tom') values(100)")
+        .run("insert into tpart1 partition(name='Jerry') values(101)")
+        .run("insert into tpart2 partition(name='Bob') values(200)")
+        .run("insert into tpart2 partition(name='Carl') values(201)")
+        .run("insert into text1 values ('ricky')")
+        .dump(primaryDbName, withClause);
+
+    replica.run("DROP TABLE t3");
+
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("use " + replicatedDbName)
+        .run("show tables")
+        .verifyResults(new String[]{"t1", "t2", "t3", "tpart1", "tpart2", "text1"})
+        .run("select * from " + replicatedDbName + ".t1")
+        .verifyResults(new String[] {"1", "11"})
+        .run("select * from " + replicatedDbName + ".t2")
+        .verifyResults(new String[]{"2", "22"})
+        .run("select a from " + replicatedDbName + ".tpart1")
+        .verifyResults(new String[]{"100", "101"})
+        .run("show partitions " + replicatedDbName + ".tpart1")
+        .verifyResults(new String[]{"name=Tom", "name=Jerry"})
+        .run("select a from " + replicatedDbName + ".tpart2")
+        .verifyResults(new String[]{"200", "201"})
+        .run("show partitions " + replicatedDbName + ".tpart2")
+        .verifyResults(new String[]{"name=Bob", "name=Carl"})
+        .run("select a from " + replicatedDbName + ".text1")
+        .verifyResults(new String[]{"ricky"});
+
+    WarehouseInstance.Tuple incrementalDump = primary.run("use " + primaryDbName)
+        .run("CREATE TABLE t4(a int) clustered by (a) into 2 buckets" + " stored as orc TBLPROPERTIES ('transactional'='true')")
+        .run("CREATE TABLE tpart3(a int) partitioned by (name string)" + " stored as orc TBLPROPERTIES ('transactional'='true')")
+        .run("CREATE TABLE tpart4(a int) partitioned by (name string) clustered by (a) into 2 buckets" + " stored as orc TBLPROPERTIES ('transactional'='true')")
+        .run("insert into t1 values (111)")
+        .run("insert into t2 values (222)")
+        .run("insert into t4 values (4)")
+        .run("insert into tpart1 partition(name='Tom') values(102)")
+        .run("insert into tpart1 partition(name='Jerry') values(103)")
+        .run("insert into tpart2 partition(name='Bob') values(202)")
+        .run("insert into tpart2 partition(name='Carl') values(203)")
+        .run("insert into tpart3 partition(name='Tom3') values(300)")
+        .run("insert into tpart3 partition(name='Jerry3') values(301)")
+        .run("insert into tpart4 partition(name='Bob4') values(400)")
+        .run("insert into tpart4 partition(name='Carl4') values(401)")
+        .run("insert into text1 values ('martin')")
+        .dump(primaryDbName, withClause);
+
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("use " + replicatedDbName)
+        .run("show tables ")
+        .verifyResults(new String[]{"t1", "t2", "t4", "tpart1", "tpart2", "tpart3", "tpart4", "text1"})
+        .run("select * from " + replicatedDbName + ".t1")
+        .verifyResults(new String[] {"1", "11", "111"})
+        .run("select * from " + replicatedDbName + ".t2")
+        .verifyResults(new String[]{"2", "22", "222"})
+        .run("select * from " + replicatedDbName + ".t4")
+        .verifyResults(new String[]{"4"})
+        .run("select a from " + replicatedDbName + ".tpart1")
+        .verifyResults(new String[]{"100", "101", "102", "103"})
+        .run("show partitions " + replicatedDbName + ".tpart1")
+        .verifyResults(new String[]{"name=Tom", "name=Jerry"})
+        .run("select a from " + replicatedDbName + ".tpart2")
+        .verifyResults(new String[]{"200", "201", "202", "203"})
+        .run("show partitions " + replicatedDbName + ".tpart2")
+        .verifyResults(new String[]{"name=Bob", "name=Carl"})
+        .run("select a from " + replicatedDbName + ".tpart3")
+        .verifyResults(new String[]{"300", "301"})
+        .run("show partitions " + replicatedDbName + ".tpart3")
+        .verifyResults(new String[]{"name=Tom3", "name=Jerry3"})
+        .run("select a from " + replicatedDbName + ".tpart4")
+        .verifyResults(new String[]{"400", "401"})
+        .run("show partitions " + replicatedDbName + ".tpart4")
+        .verifyResults(new String[]{"name=Bob4", "name=Carl4"})
+        .run("select a from " + replicatedDbName + ".text1")
+        .verifyResults(new String[]{"ricky", "martin"});
+
+    incrementalDump = primary.run("use " + primaryDbName)
+        .run("insert into t4 values (44)")
+        .run("insert into t1 values (1111)")
+        .run("DROP TABLE t1")
+        .run("insert into t2 values (2222)")
+        .run("insert into tpart1 partition(name='Tom') values(104)")
+        .run("insert into tpart1 partition(name='Tom_del') values(1000)")
+        .run("insert into tpart1 partition(name='Harry') values(10001)")
+        .run("insert into tpart1 partition(name='Jerry') values(105)")
+        .run("ALTER TABLE tpart1 DROP PARTITION(name='Tom')")
+        .run("DROP TABLE tpart2")
+        .dump(primaryDbName, withClause);
+
+    replica.run("DROP TABLE t4")
+        .run("ALTER TABLE tpart1 DROP PARTITION(name='Tom_del')");
+
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("use " + replicatedDbName)
+        .run("show tables ")
+        .verifyResults(new String[]{"t2", "t4", "tpart1", "tpart3", "tpart4", "text1"})
+        .run("select * from " + replicatedDbName + ".t2")
+        .verifyResults(new String[]{"2", "22", "222", "2222"})
+        .run("select a from " + replicatedDbName + ".tpart1")
+        .verifyResults(new String[]{"101", "103", "105", "1000", "10001"})
+        .run("show partitions " + replicatedDbName + ".tpart1")
+        .verifyResults(new String[]{"name=Harry", "name=Jerry", "name=Tom_del"});
+  }
 }
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java
index 452ba64e6f..07e8787367 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java
@@ -68,8 +68,8 @@ public void tearDown() throws Throwable {
   }
 
   @Test
-  public void testRemoteStagingAndCopyTaskOnTarget() throws Throwable {
-    List<String> withClauseOptions = getStagingLocationConfig(replica.repldDir);
+  public void testDistCpCopyWithRemoteStagingAndCopyTaskOnTarget() throws Throwable {
+    List<String> withClauseOptions = getStagingLocationConfig(replica.repldDir, true);
     withClauseOptions.add("'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='" + false + "'");
     WarehouseInstance.Tuple tuple = primary
         .run("use " + primaryDbName)
@@ -124,8 +124,8 @@ public void testRemoteStagingAndCopyTaskOnTarget() throws Throwable {
   }
 
   @Test
-  public void testLocalStagingAndCopyTaskOnTarget() throws Throwable {
-    List<String> withClauseOptions = getStagingLocationConfig(primary.repldDir);
+  public void testDistCpCopyWithLocalStagingAndCopyTaskOnTarget() throws Throwable {
+    List<String> withClauseOptions = getStagingLocationConfig(primary.repldDir, true);
     withClauseOptions.add("'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='" + false + "'");
     WarehouseInstance.Tuple tuple = primary
         .run("use " + primaryDbName)
@@ -180,8 +180,8 @@ public void testLocalStagingAndCopyTaskOnTarget() throws Throwable {
   }
 
   @Test
-  public void testRemoteStagingAndCopyTaskOnSource() throws Throwable {
-    List<String> withClauseOptions = getStagingLocationConfig(replica.repldDir);
+  public void testDistCpCopyWithRemoteStagingAndCopyTaskOnSource() throws Throwable {
+    List<String> withClauseOptions = getStagingLocationConfig(replica.repldDir, true);
     withClauseOptions.add("'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='" + false + "'");
     withClauseOptions.add("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='" + false + "'");
     WarehouseInstance.Tuple tuple = primary
@@ -237,8 +237,8 @@ public void testRemoteStagingAndCopyTaskOnSource() throws Throwable {
   }
 
   @Test
-  public void testLocalStagingAndCopyTaskOnSource() throws Throwable {
-    List<String> withClauseOptions = getStagingLocationConfig(primary.repldDir);
+  public void testDistCpCopyWithLocalStagingAndCopyTaskOnSource() throws Throwable {
+    List<String> withClauseOptions = getStagingLocationConfig(primary.repldDir, true);
     withClauseOptions.add("'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='" + false + "'");
     withClauseOptions.add("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='" + false + "'");
     WarehouseInstance.Tuple tuple = primary
@@ -295,7 +295,7 @@ public void testLocalStagingAndCopyTaskOnSource() throws Throwable {
 
   @Test
   public void testRegularCopyRemoteStagingAndCopyTaskOnSource() throws Throwable {
-    List<String> withClauseOptions = getStagingLocationConfig(replica.repldDir);
+    List<String> withClauseOptions = getStagingLocationConfig(replica.repldDir, false);
     withClauseOptions.add("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='" + false + "'");
     WarehouseInstance.Tuple tuple = primary
         .run("use " + primaryDbName)
@@ -351,7 +351,7 @@ public void testRegularCopyRemoteStagingAndCopyTaskOnSource() throws Throwable {
 
   @Test
   public void testRegularCopyWithLocalStagingAndCopyTaskOnTarget() throws Throwable {
-    List<String> withClauseOptions = getStagingLocationConfig(primary.repldDir);
+    List<String> withClauseOptions = getStagingLocationConfig(primary.repldDir, false);
     WarehouseInstance.Tuple tuple = primary
         .run("use " + primaryDbName)
         .run("create external table t1 (id int)")
@@ -409,7 +409,7 @@ public void externalTableReplicationDropDatabase() throws Throwable {
     String primaryDb = "primarydb1";
     String replicaDb = "repldb1";
     String tableName = "t1";
-    List<String> withClauseOptions = getStagingLocationConfig(primary.repldDir);
+    List<String> withClauseOptions = getStagingLocationConfig(primary.repldDir, false);
     WarehouseInstance.Tuple tuple = primary
         .run("create database " + primaryDb)
         .run("alter database "+ primaryDb + " set dbproperties('repl.source.for'='1,2,3')")
@@ -456,9 +456,15 @@ private void verifyTableDataExists(WarehouseInstance warehouse, Path dbDataPath,
     Assert.assertEquals(shouldExists, fileSystem.exists(dataFilePath));
   }
 
-  private List<String> getStagingLocationConfig(String stagingLoc) {
+  private List<String> getStagingLocationConfig(String stagingLoc, boolean addDistCpConfigs) throws IOException {
     List<String> confList = new ArrayList<>();
     confList.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + stagingLoc + "'");
+    if (addDistCpConfigs) {
+      confList.add("'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'");
+      confList.add("'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'");
+      confList.add("'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='" +
+          UserGroupInformation.getCurrentUser().getUserName() + "'");
+    }
     return confList;
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index 58d8e8c56a..1f40dd02e1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -156,7 +156,7 @@ public int execute() {
       }
       // Copy the files from different source file systems to one destination directory
       CopyUtils copyUtils = new CopyUtils(rwork.distCpDoAsUser(), conf, dstFs);
-      copyUtils.copyAndVerify(toPath, srcFiles, fromPath, work.isOverWrite());
+      copyUtils.copyAndVerify(toPath, srcFiles, fromPath, work.readSrcAsFilesList(), work.isOverWrite());
 
       // If a file is copied from CM path, then need to rename them using original source file name
      // This is needed to avoid having duplicate files in target if same event is applied twice
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
index c313c383db..5a662ff3c1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
@@ -74,7 +74,7 @@ public CopyUtils(String distCpDoAsUser, HiveConf hiveConf, FileSystem destinatio
   // changed/removed during copy, so double check the checksum after copy,
   // if not match, copy again from cm
   public void copyAndVerify(Path destRoot, List<ReplChangeManager.FileInfo> srcFiles, Path origSrcPath,
-                            boolean overwrite)
+                            boolean readSrcAsFilesList, boolean overwrite)
       throws IOException, LoginException, HiveFatalException {
     UserGroupInformation proxyUser = getProxyUser();
     if (CollectionUtils.isEmpty(srcFiles)) {
@@ -83,11 +83,8 @@ public void copyAndVerify(Path destRoot, List srcFil
     FileSystem sourceFs = srcFiles.get(0).getSrcFs();
     boolean useRegularCopy = regularCopy(sourceFs, srcFiles);
     try {
-      if (!useRegularCopy) {
-        srcFiles.clear();
-        srcFiles.add(new ReplChangeManager.FileInfo(sourceFs, origSrcPath, null));
-        doCopyRetry(sourceFs, srcFiles, destRoot, proxyUser, useRegularCopy, overwrite);
-      } else {
+      if (useRegularCopy || readSrcAsFilesList) {
+        // Layout of data files may differ based on the type of tables.
         Map<FileSystem, Map<Path, List<ReplChangeManager.FileInfo>>> map = fsToFileMap(srcFiles, destRoot);
         for (Map.Entry<FileSystem, Map<Path, List<ReplChangeManager.FileInfo>>> entry : map.entrySet()) {
           Map<Path, List<ReplChangeManager.FileInfo>> destMap = entry.getValue();
@@ -104,9 +101,16 @@ public void copyAndVerify(Path destRoot, List srcFil
             }
 
             // Copy files with retry logic on failure or source file is dropped or changed.
-            doCopyRetry(sourceFs, fileInfoList, destination, proxyUser, true, overwrite);
+            doCopyRetry(sourceFs, fileInfoList, destination, proxyUser, useRegularCopy, overwrite);
           }
         }
+      } else {
+        // When distCp is to be used and the srcFiles doesn't contain subDirs (readSrcAsFilesList=false),
+        // original from path should be used during distCp, as distCp copies dirItems of srcPath,
+        // not the srcPath folder itself.
+        srcFiles.clear();
+        srcFiles.add(new ReplChangeManager.FileInfo(sourceFs, origSrcPath, null));
+        doCopyRetry(sourceFs, srcFiles, destRoot, proxyUser, useRegularCopy, overwrite);
       }
     } finally {
       if (proxyUser != null) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
index e758c8da53..1aff73843c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
@@ -127,7 +127,7 @@ protected void writeFileEntry(Table table, Partition ptn, String file, Context w
       filePaths.add(fileInfo);
       FileSystem dstFs = dataPath.getFileSystem(hiveConf);
       CopyUtils copyUtils = new CopyUtils(distCpDoAsUser, hiveConf, dstFs);
-      copyUtils.copyAndVerify(dataPath, filePaths, srcDataPath, false);
+      copyUtils.copyAndVerify(dataPath, filePaths, srcDataPath, true, false);
       copyUtils.renameFileCopiedFromCmPath(dataPath, dstFs, filePaths);
     }
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
index 0d66128e2a..b5a910f26a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
@@ -80,7 +80,7 @@ private void copyFunctionBinaries(List functionBinaryCopyPaths, Hi
       Path destRoot = funcBinCopyPath.getTargetPath().getParent();
       FileSystem dstFs = destRoot.getFileSystem(hiveConf);
       CopyUtils copyUtils = new CopyUtils(distCpDoAsUser, hiveConf, dstFs);
-      copyUtils.copyAndVerify(destRoot, filePaths, funcBinCopyPath.getSrcPath(), false);
+      copyUtils.copyAndVerify(destRoot, filePaths, funcBinCopyPath.getSrcPath(), true, false);
       copyUtils.renameFileCopiedFromCmPath(destRoot, dstFs, filePaths);
     }
   }
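
For readers skimming the patch, the behavioral core is the new readSrcAsFilesList flag threaded into CopyUtils.copyAndVerify(): when a regular FileSystem copy is used, or when the caller has enumerated leaf files, the copy proceeds file by file; otherwise the original source directory is handed to distCp, because distCp copies the children of the supplied path rather than the path itself. The following standalone sketch restates only that branching, using java.nio types instead of Hadoop's FileSystem/ReplChangeManager.FileInfo; the class and method names (CopySourceSelection, chooseCopySources) are hypothetical illustrations, not Hive API.

// Illustrative sketch only -- not Hive code. It mirrors the branching that the
// patched CopyUtils.copyAndVerify() performs, with plain java.nio paths.
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CopySourceSelection {

  // Returns the paths that would be handed to the copy job.
  // srcFiles           - individual data files enumerated under origSrcPath
  // origSrcPath        - the source directory being replicated
  // useRegularCopy     - true when the files are small/few enough for a FileSystem copy
  // readSrcAsFilesList - true when the enumerated files (no sub-directories) should be copied one by one
  static List<Path> chooseCopySources(List<Path> srcFiles, Path origSrcPath,
      boolean useRegularCopy, boolean readSrcAsFilesList) {
    if (useRegularCopy || readSrcAsFilesList) {
      // Copy file by file; each file keeps its layout under the destination root.
      return new ArrayList<>(srcFiles);
    }
    // DistCp copies the children of the supplied path, not the path itself, so when the
    // source may contain sub-directories the whole origSrcPath is handed over instead.
    List<Path> dirAsSource = new ArrayList<>();
    dirAsSource.add(origSrcPath);
    return dirAsSource;
  }

  public static void main(String[] args) {
    List<Path> files = Arrays.asList(Paths.get("/warehouse/t1/delta_0000001_0000001/bucket_00000"));
    Path tableDir = Paths.get("/warehouse/t1");
    System.out.println(chooseCopySources(files, tableDir, false, true));  // the enumerated files
    System.out.println(chooseCopySources(files, tableDir, false, false)); // [/warehouse/t1]
  }
}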