diff --git ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
index 35db50c..3dff582 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
@@ -21,6 +21,7 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -41,8 +42,6 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
-import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.parse.SplitSample;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -75,6 +74,21 @@
   private static final String CLASS_NAME = CombineHiveInputFormat.class.getName();
   public static final Log LOG = LogFactory.getLog(CLASS_NAME);
 
+  // Reflective handle used to call FileInputFormat.isSplitable(), which is protected.
+  private static final Method IS_SPLITABLE;
+
+  static {
+    Method method;
+    try {
+      method = FileInputFormat.class.getDeclaredMethod(
+          "isSplitable", new Class[] {FileSystem.class, Path.class});
+      method.setAccessible(true);
+    } catch (Exception e) {
+      method = null;
+    }
+    IS_SPLITABLE = method;
+  }
+
   /**
    * CombineHiveInputSplit encapsulates an InputSplit with its corresponding
    * inputFormatClassName. A CombineHiveInputSplit comprises of multiple chunks
@@ -318,6 +332,12 @@ public int hashCode() {
       }
 
       FileSystem inpFs = path.getFileSystem(job);
+      // CombineFileInputFormat ignores the return value of isSplitable() declared in
+      // FileInputFormat, the common base class of custom input formats, so honor it here.
+      if (!isSplitable(inputFormat, inpFs, path)) {
+        return super.getSplits(job, numSplits);
+      }
+
       // Since there is no easy way of knowing whether MAPREDUCE-1597 is present in the tree or not,
       // we use a configuration variable for the same
       if (this.mrwork != null && !this.mrwork.getHadoopSupportsSplittable()) {
@@ -326,8 +346,7 @@ public int hashCode() {
       // Hadoop does not handle non-splittable files correctly for CombineFileInputFormat,
       // so don't use CombineFileInputFormat for non-splittable files
-      //ie, dont't combine if inputformat is a TextInputFormat and has compression turned on
-
+      //ie, don't combine if inputformat is a TextInputFormat and has compression turned on
       if (inputFormat instanceof TextInputFormat) {
         Queue<Path> dirs = new LinkedList<Path>();
         FileStatus fStats = inpFs.getFileStatus(path);
 
@@ -336,7 +355,7 @@ public int hashCode() {
         if (fStats.isDir()) {
           dirs.offer(path);
         } else if ((new CompressionCodecFactory(job)).getCodec(path) != null) {
-          //if compresssion codec is set, use HiveInputFormat.getSplits (don't combine)
+          //if compression codec is set, use HiveInputFormat.getSplits (don't combine)
           splits = super.getSplits(job, numSplits);
           perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
           return splits;
@@ -350,7 +369,7 @@ public int hashCode() {
             dirs.offer(fStatus[idx].getPath());
           } else if ((new CompressionCodecFactory(job)).getCodec(
               fStatus[idx].getPath()) != null) {
-            //if compresssion codec is set, use HiveInputFormat.getSplits (don't combine)
+            //if compression codec is set, use HiveInputFormat.getSplits (don't combine)
             splits = super.getSplits(job, numSplits);
             perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
             return splits;
@@ -444,6 +463,20 @@ public int hashCode() {
     return result.toArray(new CombineHiveInputSplit[result.size()]);
   }
 
+  // Returns the value of FileInputFormat.isSplitable() for the given format and path.
+  // Defaults to true (combinable) when the format does not extend FileInputFormat, or
+  // when reflection is unavailable or fails, preserving the pre-existing combine behavior.
+  private boolean isSplitable(InputFormat inputFormat, FileSystem inpFs, Path path)
+      throws IOException {
+    try {
+      return IS_SPLITABLE == null
+          || !FileInputFormat.class.isAssignableFrom(inputFormat.getClass())
+          || ((Boolean) IS_SPLITABLE.invoke(inputFormat, inpFs, path)).booleanValue();
+    } catch (Exception e) {
+      return true;
+    }
+  }
+
   /**
    * Create Hive splits based on CombineFileSplit.
    */