Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java	(revision 1135335)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java	(working copy)
@@ -31,6 +31,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Enumeration;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -809,8 +810,9 @@
       throws Exception {
     int numEmptyPaths = 0;
 
-    List<String> pathsProcessed = new ArrayList<String>();
-
+    Set<String> pathsProcessed = new HashSet<String>();
+    Set<String> emptyPathSet = new HashSet<String>();
+    int size = 0;
     // AliasToWork contains all the aliases
     for (String oneAlias : work.getAliasToWork().keySet()) {
       LOG.info("Processing alias " + oneAlias);
@@ -828,15 +830,24 @@
           if (pathsProcessed.contains(path)) {
             continue;
           }
+
+          // 1 for comma
+          size = size + path.length() + 1;
           pathsProcessed.add(path);
 
           LOG.info("Adding input file " + path);
-
-          Path dirPath = new Path(path);
-          if (!Utilities.isEmptyPath(job, path, ctx)) {
-            FileInputFormat.addInputPath(job, dirPath);
-          } else {
+          LinkedHashMap<PartitionDesc, Boolean> partToRework = work.getPartDescToRework();
+          PartitionDesc partDesc = work.getPathToPartitionInfo().get(onefile);
+          boolean checkEmptyPaths = true;
+          if (partToRework != null) {
+            Boolean val = partToRework.get(partDesc);
+            if (val != null && val) {
+              checkEmptyPaths = false;
+            }
+          }
+          if (checkEmptyPaths && Utilities.isEmptyPath(job, path, ctx)) {
             emptyPaths.add(path);
+            emptyPathSet.add(path);
           }
         }
       }
@@ -860,6 +871,67 @@
             oneAlias);
       }
     }
+    setInputPaths(job, pathsProcessed, emptyPathSet, size);
+  }
+
+  private static void setInputPaths(JobConf job, Set<String> pathsProcessed,
+      Set<String> emptyPathSet, int size) {
+    String existingPaths = job.get("mapred.input.dir");
+    boolean first = true;
+    if (existingPaths != null && !existingPaths.trim().equals("")) {
+      first = false;
+      size = size + existingPaths.length() + 1;
+    }
+
+    // plus 100 for comma escape
+    StringBuilder sb = new StringBuilder(size + 100);
+    // existing paths for empty paths
+    if (!first) {
+      sb.append(existingPaths);
+    }
+    for (String toAdd : pathsProcessed) {
+      if (emptyPathSet.contains(toAdd)) {
+        continue;
+      }
+      String afterEscape = escapeComma(toAdd, '\\');
+      if (!first) {
+        sb.append(",");
+      } else {
+        first = false;
+      }
+      sb.append(afterEscape);
+    }
+
+    job.set("mapred.input.dir", sb.toString());
+  }
+
+  public static String escapeComma(String str, char escapeChar) {
+    if (str == null) {
+      return null;
+    }
+
+    boolean appearComma = false;
+    for (int i = 0; i < str.length(); i++) {
+      char curChar = str.charAt(i);
+      if (curChar == ',') {
+        appearComma = true;
+        break;
+      }
+    }
+
+    if (!appearComma) {
+      return str;
+    }
+    StringBuilder result = new StringBuilder();
+    for (int i = 0; i < str.length(); i++) {
+      char curChar = str.charAt(i);
+      if (curChar == ',') {
+        // special char
+        result.append(escapeChar);
+      }
+      result.append(curChar);
+    }
+    return result.toString();
   }
 
   @Override
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java	(revision 1135335)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java	(working copy)
@@ -2047,12 +2047,16 @@
     try {
       MapredWork mapredWork = ((MapRedTask) task).getWork();
       Set<Class<? extends InputFormat>> reworkInputFormats = new HashSet<Class<? extends InputFormat>>();
+      if (mapredWork.getPartDescToRework() == null) {
+        mapredWork.setPartDescToRework(new LinkedHashMap<PartitionDesc, Boolean>());
+      }
       for (PartitionDesc part : mapredWork.getPathToPartitionInfo().values()) {
         Class<? extends InputFormat> inputFormatCls = part
             .getInputFileFormatClass();
         if (ReworkMapredInputFormat.class.isAssignableFrom(inputFormatCls)) {
           reworkInputFormats.add(inputFormatCls);
         }
+        mapredWork.getPartDescToRework().put(part, Boolean.TRUE);
       }
 
       if (reworkInputFormats.size() > 0) {
Index: ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java	(revision 1135335)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java	(working copy)
@@ -41,6 +41,7 @@
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.parse.SplitSample;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.shims.ShimLoader;
@@ -271,6 +272,62 @@
     }
 
     ArrayList<InputSplit> result = new ArrayList<InputSplit>();
+    boolean prepareNormalCombine = true;
+    if (this.mrwork != null && this.mrwork.getHadoopSupportsSplittable()
+        && !mrwork.isMapperCannotSpanPartns()
+        && mrwork.getPartDescToRework() != null
+        && mrwork.getPartDescToRework().size() > 0) {
+      // check if we need to use NullCombineFilter
+      // will use NullCombineFilter if all partitionDesc has been reworked,
+      // which means the query is only selecting from tables/partitions which
+      // use ReworkMapredInputFormat, and the rework option has been applied.
+      boolean findNoReworkPart = false;
+      for (PartitionDesc part : mrwork.getPathToPartitionInfo().values()) {
+        Boolean val = mrwork.getPartDescToRework().get(part);
+        if (val == null || !val) {
+          findNoReworkPart = true;
+          break;
+        }
+      }
+
+      if (!findNoReworkPart) {
+        prepareNormalCombine = false;
+        prepareNullCombineFilter(job, combine);
+      }
+    }
+
+    if (prepareNormalCombine) {
+      InputSplit[] ret = prepareNormalCombineFilter(job, numSplits, pathToAliases, aliasToWork,
+          combine);
+      if (ret != null) {
+        return ret;
+      }
+    }
+
+    InputSplitShim[] iss = combine.getSplits(job, 1);
+
+    if (mrwork.getNameToSplitSample() != null && !mrwork.getNameToSplitSample().isEmpty()) {
+      iss = sampleSplits(iss);
+    }
+
+    for (InputSplitShim is : iss) {
+      CombineHiveInputSplit csplit = new CombineHiveInputSplit(job, is);
+      result.add(csplit);
+    }
+
+    LOG.info("number of splits " + result.size());
+    return result.toArray(new CombineHiveInputSplit[result.size()]);
+  }
+
+  private void prepareNullCombineFilter(JobConf job,
+      CombineFileInputFormatShim combine) {
+    combine.createPool(job, new NullCombineFilter());
+  }
+
+  private InputSplit[] prepareNormalCombineFilter(JobConf job, int numSplits,
+      Map<String, ArrayList<String>> pathToAliases,
+      Map<String, Operator<? extends Serializable>> aliasToWork,
+      CombineFileInputFormatShim combine) throws IOException {
     // combine splits only from same tables and same partitions. Do not combine splits from multiple
     // tables or multiple partitions.
     Path[] paths = combine.getInputPathsShim(job);
@@ -379,20 +436,8 @@
           }
         }
       }
-
-    InputSplitShim[] iss = combine.getSplits(job, 1);
-
-    if (mrwork.getNameToSplitSample() != null && !mrwork.getNameToSplitSample().isEmpty()) {
-      iss = sampleSplits(iss);
-    }
-
-    for (InputSplitShim is : iss) {
-      CombineHiveInputSplit csplit = new CombineHiveInputSplit(job, is);
-      result.add(csplit);
-    }
-
-    LOG.info("number of splits " + result.size());
-    return result.toArray(new CombineHiveInputSplit[result.size()]);
+
+    return null;
   }
 
   /**
@@ -508,6 +553,15 @@
         ((CombineHiveInputSplit) split).getInputSplitShim(),
         reporter, CombineHiveRecordReader.class);
   }
+
+  static class NullCombineFilter implements PathFilter {
+
+    @Override
+    public boolean accept(Path path) {
+      return true;
+    }
+
+  }
 
   static class CombineFilter implements PathFilter {
     private final List<String> pStrings = new ArrayList<String>();
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java	(revision 1135335)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java	(working copy)
@@ -47,6 +47,8 @@
 
   private LinkedHashMap<String, PartitionDesc> pathToPartitionInfo;
 
+  private transient LinkedHashMap<PartitionDesc, Boolean> partDescToRework;
+
   private LinkedHashMap<String, Operator<? extends Serializable>> aliasToWork;
 
   private LinkedHashMap<String, PartitionDesc> aliasToPartnInfo;
@@ -430,5 +432,14 @@
       LinkedHashMap<Operator<? extends Serializable>, OpParseContext> opParseCtxMap) {
     this.opParseCtxMap = opParseCtxMap;
   }
+
+  public LinkedHashMap<PartitionDesc, Boolean> getPartDescToRework() {
+    return partDescToRework;
+  }
+
+  public void setPartDescToRework(
+      LinkedHashMap<PartitionDesc, Boolean> partDescToRework) {
+    this.partDescToRework = partDescToRework;
+  }
 }
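For reference, a minimal standalone sketch of the comma-escaping scheme that the new setInputPaths()/escapeComma() code in ExecDriver applies before writing the joined path list into mapred.input.dir. The class name and the sample paths below are hypothetical and are not part of the patch; the sketch only mirrors the escapeComma logic to show that a path containing a comma survives the join because its comma is prefixed with the escape character.

import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

public class CommaEscapeSketch {

  // Mirrors the escapeComma() helper added to ExecDriver: prefix every
  // comma in the path with the escape character so the joined string can
  // still be split back into individual paths.
  static String escapeComma(String str, char escapeChar) {
    if (str == null || str.indexOf(',') < 0) {
      return str;
    }
    StringBuilder result = new StringBuilder();
    for (int i = 0; i < str.length(); i++) {
      char c = str.charAt(i);
      if (c == ',') {
        result.append(escapeChar);
      }
      result.append(c);
    }
    return result.toString();
  }

  public static void main(String[] args) {
    // Hypothetical input paths; the second one contains a comma.
    Set<String> paths = new LinkedHashSet<String>(Arrays.asList(
        "/user/hive/warehouse/t1",
        "/user/hive/warehouse/t2/ds=2011-06-13,err"));

    // Join the paths with commas, escaping commas inside each path,
    // the same way setInputPaths() builds the mapred.input.dir value.
    StringBuilder sb = new StringBuilder();
    boolean first = true;
    for (String p : paths) {
      if (!first) {
        sb.append(",");
      } else {
        first = false;
      }
      sb.append(escapeComma(p, '\\'));
    }

    // Prints: /user/hive/warehouse/t1,/user/hive/warehouse/t2/ds=2011-06-13\,err
    System.out.println(sb);
  }
}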