commit 857517bcd437e02aad673c7a9406f6fd5c463116 Author: Sahil Takiar Date: Fri Feb 24 10:38:05 2017 -0800 HIVE-14864: Distcp is not called from MoveTask when src is a directory diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java index 9e07c08..6998ca6 100644 --- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java +++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java @@ -33,6 +33,7 @@ import java.util.Set; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; @@ -565,19 +566,28 @@ public static boolean copy(FileSystem srcFS, Path src, HiveConf conf) throws IOException { HadoopShims shims = ShimLoader.getHadoopShims(); - boolean copied; + boolean copied = false; + boolean triedDistcp = false; /* Run distcp if source file/dir is too big */ - if (srcFS.getUri().getScheme().equals("hdfs") && - srcFS.getFileStatus(src).getLen() > conf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE)) { - LOG.info("Source is " + srcFS.getFileStatus(src).getLen() + " bytes. (MAX: " + conf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE) + ")"); - LOG.info("Launch distributed copy (distcp) job."); - HiveConfUtil.updateJobCredentialProviders(conf); - copied = shims.runDistCp(src, dst, conf); - if (copied && deleteSource) { - srcFS.delete(src, true); + if (srcFS.getUri().getScheme().equals("hdfs")) { + ContentSummary srcContentSummary = srcFS.getContentSummary(src); + if (srcContentSummary.getFileCount() > conf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES) + && srcContentSummary.getLength() > conf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE)) { + + LOG.info("Source is " + srcContentSummary.getLength() + " bytes. (MAX: " + conf.getLongVar( + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE) + ")"); + LOG.info("Source is " + srcContentSummary.getFileCount() + " files. (MAX: " + conf.getLongVar( + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES) + ")"); + LOG.info("Launch distributed copy (distcp) job."); + triedDistcp = true; + copied = shims.runDistCp(src, dst, conf); + if (copied && deleteSource) { + srcFS.delete(src, true); + } } - } else { + } + if (!triedDistcp) { copied = FileUtil.copy(srcFS, src, dstFS, dst, deleteSource, overwrite, conf); } diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index f0c129b..c21c40b 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1162,9 +1162,12 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal HIVE_GROUPBY_LIMIT_EXTRASTEP("hive.groupby.limit.extrastep", true, "This parameter decides if Hive should \n" + "create new MR job for sorting final output"), - // Max filesize used to do a single copy (after that, distcp is used) + // Max file num and size used to do a single copy (after that, distcp is used) + HIVE_EXEC_COPYFILE_MAXNUMFILES("hive.exec.copyfile.maxnumfiles", 1L, + "Maximum number of files Hive uses to do sequential HDFS copies between directories." + + "Distributed copies (distcp) will be used instead for larger numbers of files so that copies can be done faster."), HIVE_EXEC_COPYFILE_MAXSIZE("hive.exec.copyfile.maxsize", 32L * 1024 * 1024 /*32M*/, - "Maximum file size (in Mb) that Hive uses to do single HDFS copies between directories." + + "Maximum file size (in bytes) that Hive uses to do single HDFS copies between directories." + "Distributed copies (distcp) will be used instead for bigger files so that copies can be done faster."), // for hive udtf operator