Index: ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java	(revision 955674)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java	(working copy)
@@ -23,9 +23,11 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.Queue;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -36,9 +38,9 @@
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
-import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.HadoopShims.CombineFileInputFormatShim;
 import org.apache.hadoop.hive.shims.HadoopShims.InputSplitShim;
+import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
@@ -230,8 +232,8 @@
     // combine splits only from same tables and same partitions. Do not combine splits from multiple
     // tables or multiple partitions.
     Path[] paths = combine.getInputPathsShim(job);
+    Set<Path> poolSet = new HashSet<Path>();
     for (Path path : paths) {
-      LOG.info("CombineHiveInputSplit creating pool for " + path);
       PartitionDesc part = getPartitionDescFromPath(pathToPartitionInfo, path);
       TableDesc tableDesc = part.getTableDesc();
 
@@ -283,7 +285,24 @@
         return super.getSplits(job, numSplits);
       }
 
-      combine.createPool(job, new CombineFilter(path));
+      // In the case of tablesample, the input paths point to files rather than directories.
+      // Use the parent directory as the filtering path so that all files in the same
+      // parent directory are grouped into one pool, but files from different parent
+      // directories are not. This guarantees that a split combines only files in the
+      // same partition and never crosses partition boundaries.
+      Path filterPath = path;
+      if (!path.getFileSystem(job).getFileStatus(path).isDir()) { // path is not a directory
+        filterPath = path.getParent();
+      }
+      if (!poolSet.contains(filterPath)) {
+        LOG.info("CombineHiveInputSplit creating pool for " + path
+            + "; using filter path " + filterPath);
+        combine.createPool(job, new CombineFilter(filterPath));
+        poolSet.add(filterPath);
+      } else {
+        LOG.info("CombineHiveInputSplit: pool is already created for " + path
+            + "; using filter path " + filterPath);
+      }
     }
     InputSplitShim[] iss = combine.getSplits(job, 1);
     for (InputSplitShim is : iss) {
@@ -389,10 +408,12 @@
     private final String pString;
 
     // store a path prefix in this TestFilter
+    // PRECONDITION: p should always be a directory
     public CombineFilter(Path p) {
       // we need to keep the path part only because the Hadoop CombineFileInputFormat will
       // pass the path part only to accept().
-      pString = p.toUri().getPath().toString() + File.separator;
+      // Append a trailing separator to prevent partial prefix matches.
+      pString = p.toUri().getPath().toString() + File.separator;
     }
 
     // returns true if the specified path matches the prefix stored
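
Notes (not part of the patch):

The pooling rule added above can be summarized without any Hadoop machinery. Below is a minimal, self-contained Java sketch of that rule, using hypothetical warehouse paths and a plain boolean flag standing in for the real FileSystem.getFileStatus(path).isDir() check: a file contributes its parent directory as the filter path, a directory contributes itself, and at most one pool is created per distinct filter path.

import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class PoolKeyDemo {
  public static void main(String[] args) {
    // Hypothetical input paths, each paired with whether it is a directory.
    // Tablesample inputs are files; whole-partition inputs are directories.
    Map<String, Boolean> inputs = new LinkedHashMap<String, Boolean>();
    inputs.put("/warehouse/t1/ds=1/file0", false);
    inputs.put("/warehouse/t1/ds=1/file1", false);
    inputs.put("/warehouse/t1/ds=2", true);

    Set<String> poolSet = new HashSet<String>();
    for (Map.Entry<String, Boolean> e : inputs.entrySet()) {
      String path = e.getKey();
      // A file uses its parent directory as the filter path, so all files
      // in one partition directory land in the same pool.
      String filterPath = e.getValue()
          ? path
          : path.substring(0, path.lastIndexOf('/'));
      if (poolSet.add(filterPath)) {
        System.out.println("creating pool for " + path
            + "; using filter path " + filterPath);
      } else {
        System.out.println("pool already created for " + path
            + "; using filter path " + filterPath);
      }
    }
  }
}

Running this prints one "creating pool" line for /warehouse/t1/ds=1 and one for /warehouse/t1/ds=2, plus a "pool already created" line for the second ds=1 file: combined splits may merge files within a partition but never across partitions.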
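
The trailing File.separator appended in the CombineFilter constructor matters because, per the comment in the surrounding code, accept() receives only the path part and matches it against the stored prefix. The sketch below shows why the separator is needed, assuming a Unix-style "/" separator and a hypothetical startsWith-based check standing in for the real accept():

public class PrefixFilterDemo {
  // Stand-in for CombineFilter.accept(): a plain prefix match on the path part.
  static boolean accept(String prefix, String path) {
    return path.startsWith(prefix);
  }

  public static void main(String[] args) {
    String bare = "/warehouse/t1/ds=1";
    String withSep = bare + "/";

    // Without the trailing separator, a sibling partition whose name merely
    // extends the prefix would be accepted into the wrong pool:
    System.out.println(accept(bare, "/warehouse/t1/ds=10/file0"));    // true  (wrong pool)
    System.out.println(accept(withSep, "/warehouse/t1/ds=10/file0")); // false (correct)
    System.out.println(accept(withSep, "/warehouse/t1/ds=1/file0"));  // true  (correct)
  }
}

This is also why the PRECONDITION comment requires p to be a directory: appending a separator to a file path would produce a prefix that never matches the file itself.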