diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
index 3ed2d08..537b778 100644
--- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
@@ -33,6 +33,7 @@
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -569,18 +570,31 @@ public static boolean copy(FileSystem srcFS, Path src,
     HiveConf conf) throws IOException {
     HadoopShims shims = ShimLoader.getHadoopShims();
-    boolean copied;
+    boolean copied = false;
+    boolean triedDistcp = false;
 
     /* Run distcp if source file/dir is too big */
-    if (srcFS.getUri().getScheme().equals("hdfs") &&
-        srcFS.getFileStatus(src).getLen() > conf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE)) {
-      LOG.info("Source is " + srcFS.getFileStatus(src).getLen() + " bytes. (MAX: " + conf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE) + ")");
-      LOG.info("Launch distributed copy (distcp) job.");
-      copied = shims.runDistCp(src, dst, conf);
-      if (copied && deleteSource) {
-        srcFS.delete(src, true);
-      }
-    } else {
+    if (srcFS.getUri().getScheme().equals("hdfs")) {
+      ContentSummary srcContentSummary = srcFS.getContentSummary(src);
+
+      /* Launch distcp only when the source exceeds both the file-count and the size threshold. */
+      if (srcContentSummary.getFileCount() > conf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES)
+          && srcContentSummary.getLength() > conf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE)) {
+        LOG.info("Source is " + srcContentSummary.getLength() + " bytes. (MAX: " + conf.getLongVar(
+            HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE) + ")");
+        LOG.info("Source is " + srcContentSummary.getFileCount() + " files. (MAX: " + conf.getLongVar(
+            HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES) + ")");
+        LOG.info("Launch distributed copy (distcp) job.");
+        triedDistcp = true;
+        copied = shims.runDistCp(src, dst, conf);
+        if (copied && deleteSource) {
+          srcFS.delete(src, true);
+        }
+      }
+    }
+
+    /* Fall back to a sequential copy when distcp was not attempted (non-HDFS source, or source below the thresholds). */
+    if (!triedDistcp) {
       copied = FileUtil.copy(srcFS, src, dstFS, dst, deleteSource, overwrite, conf);
     }
 
     boolean inheritPerms = conf.getBoolVar(HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 18b98e9..378a938 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1092,6 +1092,9 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "create new MR job for sorting final output"),
 
     // Max filesize used to do a single copy (after that, distcp is used)
+    HIVE_EXEC_COPYFILE_MAXNUMFILES("hive.exec.copyfile.maxnumfiles", 1L,
+        "Maximum number of files Hive uses to do sequential HDFS copies between directories. " +
+        "Distributed copies (distcp) will be used instead for larger numbers of files so that copies can be done faster."),
     HIVE_EXEC_COPYFILE_MAXSIZE("hive.exec.copyfile.maxsize", 32L * 1024 * 1024 /*32M*/,
         "Maximum file size (in Mb) that Hive uses to do single HDFS copies between directories." +
         "Distributed copies (distcp) will be used instead for bigger files so that copies can be done faster."),
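
Reviewer note: below is a minimal, standalone sketch (not part of the patch) of how the two thresholds combine, assuming the patch above is applied and the default values are in effect. shouldUseDistCp is a hypothetical helper that mirrors the new condition in FileUtils.copy; ContentSummary.Builder is the stock Hadoop builder.

import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.hive.conf.HiveConf;

public class CopyFileThresholdSketch {

  // Hypothetical helper mirroring the patched condition in FileUtils.copy:
  // distcp is chosen only when BOTH the file count and the total length
  // exceed their configured maxima.
  static boolean shouldUseDistCp(ContentSummary summary, HiveConf conf) {
    return summary.getFileCount() > conf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES)
        && summary.getLength() > conf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE);
  }

  public static void main(String[] args) {
    HiveConf conf = new HiveConf(); // defaults: maxnumfiles = 1, maxsize = 32 MB

    // A single 1 GB file: the file count (1) does not exceed
    // hive.exec.copyfile.maxnumfiles (1), so the copy stays sequential.
    ContentSummary oneBigFile =
        new ContentSummary.Builder().fileCount(1).length(1024L * 1024 * 1024).build();
    System.out.println(shouldUseDistCp(oneBigFile, conf)); // false

    // A directory of 100 files totalling 64 MB exceeds both thresholds,
    // so distcp is launched.
    ContentSummary manyFiles =
        new ContentSummary.Builder().fileCount(100).length(64L * 1024 * 1024).build();
    System.out.println(shouldUseDistCp(manyFiles, conf)); // true
  }
}

One behavioral consequence worth flagging: because both thresholds must be exceeded, a single file larger than hive.exec.copyfile.maxsize now takes the sequential path, whereas before this patch it would have gone through distcp.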