diff --git ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java index 35db50c..16bbf8d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java @@ -21,6 +21,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -41,8 +42,6 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; -import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.parse.SplitSample; import org.apache.hadoop.hive.ql.plan.OperatorDesc; @@ -75,6 +74,21 @@ private static final String CLASS_NAME = CombineHiveInputFormat.class.getName(); public static final Log LOG = LogFactory.getLog(CLASS_NAME); + // Reflective handle used to invoke the protected FileInputFormat#isSplitable method; + // null if lookup fails, in which case the isSplitable check is skipped. + private static final Method IS_SPLITTABLE; + + static { + Method method; + try { + method = FileInputFormat.class.getDeclaredMethod( + "isSplitable", new Class[] {FileSystem.class, Path.class}); + method.setAccessible(true); + } catch (Exception e) { + method = null; + } + IS_SPLITTABLE = method; + } + /** * CombineHiveInputSplit encapsulates an InputSplit with its corresponding * inputFormatClassName. 
A CombineHiveInputSplit comprises of multiple chunks @@ -326,8 +340,7 @@ public int hashCode() { // Hadoop does not handle non-splittable files correctly for CombineFileInputFormat, // so don't use CombineFileInputFormat for non-splittable files - //ie, dont't combine if inputformat is a TextInputFormat and has compression turned on - + //ie, don't combine if inputformat is a TextInputFormat and has compression turned on if (inputFormat instanceof TextInputFormat) { Queue dirs = new LinkedList(); FileStatus fStats = inpFs.getFileStatus(path); @@ -336,7 +349,7 @@ public int hashCode() { if (fStats.isDir()) { dirs.offer(path); } else if ((new CompressionCodecFactory(job)).getCodec(path) != null) { - //if compresssion codec is set, use HiveInputFormat.getSplits (don't combine) + //if compression codec is set, use HiveInputFormat.getSplits (don't combine) splits = super.getSplits(job, numSplits); perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS); return splits; @@ -350,7 +363,7 @@ public int hashCode() { dirs.offer(fStatus[idx].getPath()); } else if ((new CompressionCodecFactory(job)).getCodec( fStatus[idx].getPath()) != null) { - //if compresssion codec is set, use HiveInputFormat.getSplits (don't combine) + //if compression codec is set, use HiveInputFormat.getSplits (don't combine) splits = super.getSplits(job, numSplits); perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS); return splits; @@ -444,6 +457,23 @@ public int hashCode() { return result.toArray(new CombineHiveInputSplit[result.size()]); } + private boolean shouldSkipCombine(InputFormat inputFormat, Path path, JobConf job) + throws IOException { + if (inputFormat instanceof AvoidSplitCombination && + ((AvoidSplitCombination) inputFormat).shouldSkipCombine(path, job)) { + return true; + } + // CombineFileInputFormat does not honor the return value of isSplitable() in FileInputFormat, + // which is the common base class for custom input formats. 
+ try { + return IS_SPLITTABLE != null && inputFormat instanceof FileInputFormat && + !((Boolean) IS_SPLITTABLE.invoke( + inputFormat, path.getFileSystem(job), path)).booleanValue(); + } catch (Exception e) { + return false; + } + } + /** * Create Hive splits based on CombineFileSplit. */ @@ -472,11 +502,10 @@ public int hashCode() { Class inputFormatClass = part.getInputFileFormatClass(); String inputFormatClassName = inputFormatClass.getName(); InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job); - if (inputFormat instanceof AvoidSplitCombination && - ((AvoidSplitCombination) inputFormat).shouldSkipCombine(path, job)) { + + if (shouldSkipCombine(inputFormat, path, job)) { if (LOG.isDebugEnabled()) { - LOG.debug("The split [" + path + - "] is being parked for HiveInputFormat.getSplits"); + LOG.debug("The split [" + path + "] is being parked for HiveInputFormat.getSplits"); } nonCombinablePaths.add(path); } else { @@ -494,6 +523,9 @@ public int hashCode() { // Process the normal splits if (nonCombinablePaths.size() > 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("NonCombinable Paths : " + nonCombinablePaths); + } FileInputFormat.setInputPaths(job, nonCombinablePaths.toArray (new Path[nonCombinablePaths.size()])); InputSplit[] splits = super.getSplits(job, numSplits); @@ -504,6 +536,9 @@ public int hashCode() { // Process the combine splits if (combinablePaths.size() > 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("Combinable Paths : " + combinablePaths); + } FileInputFormat.setInputPaths(job, combinablePaths.toArray (new Path[combinablePaths.size()])); InputSplit[] splits = getCombineSplits(job, numSplits);