diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java index 8b99db1..eea8cdb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java @@ -77,16 +77,23 @@ public RecordReader getRecordReader(InputSplit split, JobConf job, return rr; } - protected FileStatus[] listStatus(JobConf job, Path path) throws IOException { + /** + * Recursively lists status for all files starting from the directory dir + * @param job + * @param dir + * @return + * @throws IOException + */ + protected FileStatus[] listStatus(JobConf job, Path dir) throws IOException { ArrayList result = new ArrayList(); List errors = new ArrayList(); - FileSystem fs = path.getFileSystem(job); - FileStatus[] matches = fs.globStatus(path); + FileSystem fs = dir.getFileSystem(job); + FileStatus[] matches = fs.globStatus(dir); if (matches == null) { - errors.add(new IOException("Input path does not exist: " + path)); + errors.add(new IOException("Input path does not exist: " + dir)); } else if (matches.length == 0) { - errors.add(new IOException("Input Pattern " + path + " matches 0 files")); + errors.add(new IOException("Input Pattern " + dir + " matches 0 files")); } else { for (FileStatus globStat : matches) { FileUtils.listStatusRecursively(fs, globStat, result); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java index 6ca4d5b..56d9808 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java @@ -317,6 +317,8 @@ public int hashCode() { // https://issues.apache.org/jira/browse/MAPREDUCE-1597 is fixed. 
// Hadoop does not handle non-splittable files correctly for CombineFileInputFormat, // so don't use CombineFileInputFormat for non-splittable files + + //i.e., don't combine if inputformat is a TextInputFormat and has compression turned on FileSystem inpFs = path.getFileSystem(job); if (inputFormat instanceof TextInputFormat) { @@ -327,6 +329,7 @@ public int hashCode() { if (fStats.isDir()) { dirs.offer(path); } else if ((new CompressionCodecFactory(job)).getCodec(path) != null) { + //if compression codec is set, use HiveInputFormat.getSplits (don't combine) splits = super.getSplits(job, numSplits); perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS); return splits; @@ -340,6 +343,7 @@ public int hashCode() { dirs.offer(fStatus[idx].getPath()); } else if ((new CompressionCodecFactory(job)).getCodec( fStatus[idx].getPath()) != null) { + //if compression codec is set, use HiveInputFormat.getSplits (don't combine) splits = super.getSplits(job, numSplits); perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS); return splits; @@ -348,7 +352,7 @@ public int hashCode() { } } } - + //don't combine if inputformat is a SymlinkTextInputFormat if (inputFormat instanceof SymlinkTextInputFormat) { splits = super.getSplits(job, numSplits); perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS); @@ -362,6 +366,10 @@ public int hashCode() { List> opList = null; if (!mrwork.isMapperCannotSpanPartns()) { + //if mapper can span partitions, make sure a split does not contain multiple + // opList + inputFormatClassName + deserializerClassName combination + // This is done using the Map of CombinePathInputFormat to PathFilter + opList = HiveFileFormatUtils.doGetWorksFromPath( pathToAliases, aliasToWork, filterPath); CombinePathInputFormat combinePathInputFormat = @@ -397,6 +405,9 @@ public int hashCode() { // Processing directories List iss = new ArrayList(); if (!mrwork.isMapperCannotSpanPartns()) { + //mapper can span partitions + //combine into as few as one split, 
subject to the PathFilters set + // using combine.createPool. iss = Arrays.asList(combine.getSplits(job, 1)); } else { for (Path path : inpDirs) {