diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java index d0f30bcc2d..285f6248f6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java @@ -187,15 +187,25 @@ protected int execute(DriverContext driverContext) { } private boolean doCopy(Path dst, FileSystem dstFs, Path src, FileSystem srcFs) throws IOException { - if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)){ - // regular copy in test env. + if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) + || isLocalFile(src) || isLocalFile(dst)){ + // regular copy in test env, or when source or destination is a local file + // distcp runs inside a mapper task, and cannot handle file:/// + LOG.debug("Using regular copy for {} -> {}", src.toUri(), dst.toUri()); return FileUtils.copy(srcFs, src, dstFs, dst, false, true, conf); } else { // distcp in actual deployment with privilege escalation + LOG.debug("Using privleged distcp for {} -> {}", src.toUri(), dst.toUri()); return FileUtils.privilegedCopy(srcFs, src, dst, conf); } } + private boolean isLocalFile(Path p) { + String scheme = p.toUri().getScheme(); + boolean isLocalFile = scheme.equalsIgnoreCase("file"); + LOG.debug("{} was a local file? {}, had scheme {}",p.toUri(), isLocalFile, scheme); + return isLocalFile; + } private List filesInFileListing(FileSystem fs, Path path) throws IOException { @@ -251,7 +261,7 @@ public String getName() { public static Task getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath, HiveConf conf) { Task copyTask = null; LOG.debug("ReplCopyTask:getLoadCopyTask: "+srcPath + "=>" + dstPath); - if (replicationSpec.isInReplicationScope()){ + if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){ ReplCopyWork rcwork = new ReplCopyWork(srcPath, dstPath, false); LOG.debug("ReplCopyTask:\trcwork"); if (replicationSpec.isLazy()){ @@ -269,7 +279,7 @@ public String getName() { public static Task getDumpCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath, HiveConf conf) { Task copyTask = null; LOG.debug("ReplCopyTask:getDumpCopyTask: "+srcPath + "=>" + dstPath); - if (replicationSpec.isInReplicationScope()){ + if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){ ReplCopyWork rcwork = new ReplCopyWork(srcPath, dstPath, false); LOG.debug("ReplCopyTask:\trcwork"); if (replicationSpec.isLazy()){ diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java index 4319bedfd4..e3d1199ac5 100644 --- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java +++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java @@ -1100,6 +1100,7 @@ public void setStoragePolicy(Path path, StoragePolicyValue policy) // if no entries were added via conf, we initiate our defaults params.add("-update"); params.add("-skipcrccheck"); + params.add("-pb"); } params.add(src.toString()); params.add(dst.toString()); diff --git a/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java b/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java index ba1086c02f..6c93df5704 100644 --- a/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java +++ b/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java @@ -48,11 +48,12 @@ public void testConstructDistCpParams() { Hadoop23Shims shims = new Hadoop23Shims(); List paramsDefault = shims.constructDistCpParams(copySrc, copyDst, conf); - assertEquals(4, paramsDefault.size()); + assertEquals(5, paramsDefault.size()); assertTrue("Distcp -update set by default", paramsDefault.contains("-update")); assertTrue("Distcp -skipcrccheck set by default", paramsDefault.contains("-skipcrccheck")); - assertEquals(copySrc.toString(), paramsDefault.get(2)); - assertEquals(copyDst.toString(), paramsDefault.get(3)); + assertTrue("Distcp -pb set by default", paramsDefault.contains("-pb")); + assertEquals(copySrc.toString(), paramsDefault.get(3)); + assertEquals(copyDst.toString(), paramsDefault.get(4)); conf.set("distcp.options.foo", "bar"); // should set "-foo bar" conf.set("distcp.options.blah", ""); // should set "-blah" @@ -67,6 +68,8 @@ public void testConstructDistCpParams() { !paramsWithCustomParamInjection.contains("-update")); assertTrue("Distcp -skipcrccheck not set if not requested", !paramsWithCustomParamInjection.contains("-skipcrccheck")); + assertTrue("Distcp -pb not set if not requested", + !paramsWithCustomParamInjection.contains("-pb")); // the "-foo bar" and "-blah" params order is not guaranteed String firstParam = paramsWithCustomParamInjection.get(0);