diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java index 661be21..652a264 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hive.ql.exec.mr.ExecMapper; import org.apache.hadoop.hive.ql.exec.mr.ExecReducer; import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat; +import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl; import org.apache.hadoop.hive.ql.plan.BaseWork; @@ -51,8 +52,10 @@ import org.apache.hadoop.hive.shims.Hadoop20Shims.NullOutputCommitter; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputFormat; +import org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; @@ -69,7 +72,6 @@ import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.Vertex; -import org.apache.tez.mapreduce.common.MRInputSplitDistributor; import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator; import org.apache.tez.mapreduce.hadoop.InputSplitInfo; import org.apache.tez.mapreduce.hadoop.MRHelpers; @@ -217,9 +219,21 @@ private static Vertex createVertex(JobConf conf, MapWork mapWork, // finally create the vertex Vertex map = null; + // use tez to combine splits + boolean useTezGroupedSplits = false; + int numTasks = -1; Class amSplitGeneratorClass = null; InputSplitInfo inputSplitInfo = null; + Class inputFormatClass = 
conf.getClass("mapred.input.format.class", + InputFormat.class); + + // we'll set up tez to combine splits for us iff the input format + // is HiveInputFormat + if (inputFormatClass == HiveInputFormat.class) { + useTezGroupedSplits = true; + conf.setClass("mapred.input.format.class", TezGroupedSplitsInputFormat.class, InputFormat.class); + } if (HiveConf.getBoolVar(conf, ConfVars.HIVE_AM_SPLIT_GENERATION)) { // if we're generating the splits in the AM, we just need to set @@ -245,7 +259,14 @@ private static Vertex createVertex(JobConf conf, MapWork mapWork, assert mapWork.getAliasToWork().keySet().size() == 1; String alias = mapWork.getAliasToWork().keySet().iterator().next(); - byte[] mrInput = MRHelpers.createMRInputPayload(serializedConf, null); + + byte[] mrInput = null; + if (useTezGroupedSplits) { + mrInput = MRHelpers.createMRInputPayloadWithGrouping(serializedConf, + null, HiveInputFormat.class.getName()); + } else { + mrInput = MRHelpers.createMRInputPayload(serializedConf, null); + } map.addInput(alias, new InputDescriptor(MRInputLegacy.class.getName()). setUserPayload(mrInput), amSplitGeneratorClass);