diff --git ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java index 421140f..c0a594a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java @@ -24,6 +24,7 @@ import java.io.File; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; @@ -31,6 +32,8 @@ import java.util.List; import java.util.Map; import java.util.Queue; import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -49,12 +52,15 @@ import org.apache.hadoop.hive.shims.HadoopShims.InputSplitShim; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TextInputFormat; +import org.apache.hadoop.util.StringUtils; + /** * CombineHiveInputFormat is a parameterized InputFormat which looks at the path @@ -274,6 +280,9 @@ public class CombineHiveInputFormat inpDirs = new ArrayList(); + List inpFiles = new ArrayList(); Map poolMap = new HashMap(); Set poolSet = new HashSet(); @@ -334,16 +343,6 @@ public class CombineHiveInputFormat> opList = null; @@ -353,14 +352,20 @@ public class CombineHiveInputFormat iss = new ArrayList(); + if (!mrwork.isMapperCannotSpanPartns()) { + iss = Arrays.asList(combine.getSplits(job, 1)); + } else { + for (Path path : inpDirs) { + processPaths(job, combine, iss, path); + } + + if (inpFiles.size() > 0) { + // Processing files + for (Path filterPath : poolSet) { + combine.createPool(job, new CombineFilter(filterPath)); + } + processPaths(job, combine, iss, inpFiles.toArray(new Path[0])); + } + } if (mrwork.getNameToSplitSample() != null && !mrwork.getNameToSplitSample().isEmpty()) { iss = sampleSplits(iss); @@ -395,6 +416,13 @@ public class CombineHiveInputFormat iss, Path... path) throws IOException { + JobConf currJob = new JobConf(job); + FileInputFormat.setInputPaths(currJob, path); + iss.addAll(Arrays.asList(combine.getSplits(currJob, 1))); + } + /** * This function is used to sample inputs for clauses like "TABLESAMPLE(1 PERCENT)" * @@ -406,7 +434,7 @@ public class CombineHiveInputFormat sampleSplits(List splits) { HashMap nameToSamples = mrwork.getNameToSplitSample(); List retLists = new ArrayList(); Map> aliasToSplitList = new HashMap>(); @@ -473,8 +501,7 @@ public class CombineHiveInputFormat