Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(revision b4af62e037b07b70ad0a95d49681bff53ac1071f)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(date 1640830810000)
@@ -1842,6 +1842,8 @@
     HIVEMERGEMAPFILES("hive.merge.mapfiles", true,
         "Merge small files at the end of a map-only job"),
+    HIVEMERGEMAXNUMFILEPERMAP("hive.merge.max.number.file.per.map", 1024,
+        "Maximum number of small files that a single map task merges"),
     HIVEMERGEMAPREDFILES("hive.merge.mapredfiles", false,
         "Merge small files at the end of a map-reduce job"),
     HIVEMERGETEZFILES("hive.merge.tezfiles", false,
         "Merge small files at the end of a Tez DAG"),
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java	(revision b4af62e037b07b70ad0a95d49681bff53ac1071f)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java	(date 1640833610000)
@@ -170,11 +170,11 @@
       } else { // no dynamic partitions
         if(lbLevel == 0) {
           // static partition without list bucketing
-          long totalSz = getMergeSize(inpFs, dirPath, avgConditionSize);
-          Utilities.FILE_OP_LOGGER.debug("merge resolve simple case - totalSz " + totalSz + " from " + dirPath);
-
-          if (totalSz >= 0) { // add the merge job
-            setupMapRedWork(conf, work, trgtSize, totalSz);
+          AverageSize avgSize = getMergeSize(inpFs, dirPath, avgConditionSize);
+          if (null != avgSize) { // add the merge job
+            Utilities.FILE_OP_LOGGER.debug("merge resolve simple case - totalSz "
+                + avgSize.getTotalSize() + " from " + dirPath);
+            setupMapRedWork(conf, work, trgtSize, avgSize);
             resTsks.add(mrTask);
           } else { // don't need to merge, add the move job
             resTsks.add(mvTask);
@@ -252,14 +252,17 @@
       // populate pathToPartitionInfo and pathToAliases w/ DP paths
       long totalSz = 0;
+      int totalNumFiles = 0;
+      boolean doMerge = false;
       // list of paths that don't need to merge but need to move to the dest location
       List<Path> toMove = new ArrayList<Path>();
       for (int i = 0; i < status.length; ++i) {
-        long len = getMergeSize(inpFs, status[i].getPath(), avgConditionSize);
-        if (len >= 0) {
+        AverageSize avgSize = getMergeSize(inpFs, status[i].getPath(), avgConditionSize);
+        if (null != avgSize) {
           doMerge = true;
-          totalSz += len;
+          totalSz += avgSize.getTotalSize();
+          totalNumFiles += avgSize.getNumFiles();
           PartitionDesc pDesc = (dpCtx != null) ?
               generateDPFullPartSpec(dpCtx, status, tblDesc, i) : partDesc;
           if (pDesc == null) {
@@ -277,7 +280,8 @@
       }
       if (doMerge) {
         // add the merge MR job
-        setupMapRedWork(conf, work, trgtSize, totalSz);
+        AverageSize totalAvgSize = new AverageSize(totalSz, totalNumFiles);
+        setupMapRedWork(conf, work, trgtSize, totalAvgSize);
 
         // add the move task for those partitions that do not need merging
         if (toMove.size() > 0) {
@@ -341,21 +345,38 @@
     return new PartitionDesc(tblDesc, fullPartSpec);
   }
 
-  private void setupMapRedWork(HiveConf conf, MapWork mWork, long targetSize, long totalSize) {
-    mWork.setMaxSplitSize(targetSize);
-    mWork.setMinSplitSize(targetSize);
-    mWork.setMinSplitSizePerNode(targetSize);
-    mWork.setMinSplitSizePerRack(targetSize);
+
+  private void setupMapRedWork(HiveConf conf, MapWork mWork, long targetSize, AverageSize avgSize) {
+    int maxNumFilesPerMap = conf.getIntVar(HiveConf.ConfVars.HIVEMERGEMAXNUMFILEPERMAP);
+    if (avgSize.getNumFiles() > maxNumFilesPerMap && avgSize.getAvgSize() < targetSize / maxNumFilesPerMap) {
+      long targetMaxFileSize = avgSize.getTotalSize() / (avgSize.getNumFiles() / maxNumFilesPerMap + 1);
+      mWork.setMaxSplitSize(targetMaxFileSize);
+      mWork.setMinSplitSize(targetMaxFileSize);
+      mWork.setMinSplitSizePerNode(targetMaxFileSize);
+      mWork.setMinSplitSizePerRack(targetMaxFileSize);
+    } else {
+      mWork.setMaxSplitSize(targetSize);
+      mWork.setMinSplitSize(targetSize);
+      mWork.setMinSplitSizePerNode(targetSize);
+      mWork.setMinSplitSizePerRack(targetSize);
+    }
+    mWork.setIsMergeFromResolver(true);
   }
 
   private static class AverageSize {
     private final long totalSize;
+    private final long avgSize;
     private final int numFiles;
 
     public AverageSize(long totalSize, int numFiles) {
       this.totalSize = totalSize;
       this.numFiles = numFiles;
+      if (numFiles > 0) {
+        this.avgSize = totalSize / numFiles;
+      } else {
+        this.avgSize = 0;
+      }
     }
 
     public long getTotalSize() {
@@ -365,6 +386,10 @@
     public int getNumFiles() {
       return numFiles;
     }
+
+    public long getAvgSize() {
+      return avgSize;
+    }
   }
 
   private AverageSize getAverageSize(FileSystem inpFs, Path dirPath) {
@@ -407,19 +432,19 @@
    * If return value is 0 that means there are multiple files each of which is an empty file.
    * This could be true when the table is bucketized and all buckets are empty.
    */
-  private long getMergeSize(FileSystem inpFs, Path dirPath, long avgSize) {
+  private AverageSize getMergeSize(FileSystem inpFs, Path dirPath, long avgSize) {
     AverageSize averageSize = getAverageSize(inpFs, dirPath);
     if (averageSize.getTotalSize() < 0) {
-      return -1;
+      return null;
     }
 
     if (averageSize.getNumFiles() <= 1) {
-      return -1;
+      return null;
    }
 
     if (averageSize.getTotalSize()/averageSize.getNumFiles() < avgSize) {
-      return averageSize.getTotalSize();
+      return averageSize;
     }
-    return -1;
+    return null;
   }
 }
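
Reviewer note (illustrative, not part of the patch): the split-size heuristic added
in setupMapRedWork can be exercised in isolation. The sketch below is a minimal
standalone rendering of that arithmetic; the class and method names
(SplitSizeSketch, computeSplitSize) are mine, and the worked example assumes the
roughly 256 MB default merge target of hive.merge.size.per.task.

public class SplitSizeSketch {

  // Mirrors the patched setupMapRedWork: when a directory holds more than
  // maxNumFilesPerMap files AND the files are small enough that a
  // targetSize-wide split would pack in more than maxNumFilesPerMap of them,
  // shrink the split size so each map task merges roughly maxNumFilesPerMap files.
  static long computeSplitSize(long totalSize, int numFiles, long targetSize, int maxNumFilesPerMap) {
    long avgFileSize = numFiles > 0 ? totalSize / numFiles : 0;
    if (numFiles > maxNumFilesPerMap && avgFileSize < targetSize / maxNumFilesPerMap) {
      // Spread totalSize across enough splits that none covers much more than
      // maxNumFilesPerMap files; the "+ 1" rounds the split count up.
      return totalSize / (numFiles / maxNumFilesPerMap + 1);
    }
    return targetSize;
  }

  public static void main(String[] args) {
    long kb = 1024L, mb = 1024L * kb;
    // 10,000 files of ~100 KB each, a 256 MB merge target, and the default 1024-file cap.
    long split = computeSplitSize(10_000L * 100 * kb, 10_000, 256 * mb, 1024);
    // Prints "97 MB per split": ten splits of ~1,000 files each, instead of the
    // roughly four splits of ~2,600 files each that a flat 256 MB split size
    // would produce via CombineHiveInputFormat.
    System.out.println(split / mb + " MB per split");
  }
}

With the default hive.merge.max.number.file.per.map of 1024, the cap only kicks in
for partitions holding more than 1024 mergeable files whose average size is below
targetSize / 1024; in every other case the resolver behaves as before, setting all
four split-size knobs to targetSize.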