diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java index d872ef3..dd3533c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java @@ -67,14 +67,17 @@ import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.Vertex; -import org.apache.tez.runtime.library.input.ShuffledMergedInput; +import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy; import org.apache.tez.runtime.library.output.OnFileSortedOutput; import org.apache.tez.mapreduce.hadoop.InputSplitInfo; import org.apache.tez.mapreduce.hadoop.MRHelpers; import org.apache.tez.mapreduce.hadoop.MRJobConfig; import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator; +import org.apache.tez.mapreduce.input.MRInputLegacy; +import org.apache.tez.mapreduce.output.MROutput; import org.apache.tez.mapreduce.processor.map.MapProcessor; import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor; +import org.apache.tez.mapreduce.partition.MRPartitioner; /** * DagUtils. DagUtils is a collection of helper methods to convert @@ -160,7 +163,7 @@ public static Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w) DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor(OnFileSortedOutput.class.getName()), - new InputDescriptor(ShuffledMergedInput.class.getName())); + new InputDescriptor(ShuffledMergedInputLegacy.class.getName())); return new Edge(v, w, edgeProperty); } @@ -205,15 +208,19 @@ private static Vertex createVertex(JobConf conf, MapWork mapWork, int seqNo, // finally create the vertex Vertex map = null; + byte[] serializedConf = MRHelpers.createUserPayloadFromConf(conf); if (inputSplitInfo.getNumTasks() != 0) { map = new Vertex("Map "+seqNo, new ProcessorDescriptor(MapProcessor.class.getName()). - setUserPayload(MRHelpers.createUserPayloadFromConf(conf)), + setUserPayload(serializedConf), inputSplitInfo.getNumTasks(), MRHelpers.getMapResource(conf)); Map environment = new HashMap(); MRHelpers.updateEnvironmentForMRTasks(conf, environment, true); map.setTaskEnvironment(environment); map.setJavaOpts(MRHelpers.getMapJavaOpts(conf)); + map.addInput("in_"+seqNo, + new InputDescriptor(MRInputLegacy.class.getName()). + setUserPayload(serializedConf)); map.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints()); @@ -520,6 +527,7 @@ public static JobConf createConfiguration(HiveConf hiveConf) throws IOException conf.set(MRJobConfig.OUTPUT_VALUE_CLASS, BytesWritable.class.getName()); conf.set("mapred.partitioner.class", HiveConf.getVar(conf, HiveConf.ConfVars.HIVEPARTITIONER)); + conf.set("tez.runtime.partitioner.class", MRPartitioner.class.getName()); return conf; } @@ -561,20 +569,30 @@ public static JobConf initializeVertexConf(JobConf conf, BaseWork work) { */ public static Vertex createVertex(JobConf conf, BaseWork work, Path scratchDir, int seqNo, LocalResource appJarLr, List additionalLr, - FileSystem fileSystem, Context ctx) throws Exception { + FileSystem fileSystem, Context ctx, boolean hasChildren) throws Exception { + Vertex v = null; // simply dispatch the call to the right method for the actual (sub-) type of // BaseWork. if (work instanceof MapWork) { - return createVertex(conf, (MapWork) work, seqNo, appJarLr, + v = createVertex(conf, (MapWork) work, seqNo, appJarLr, additionalLr, fileSystem, scratchDir, ctx); } else if (work instanceof ReduceWork) { - return createVertex(conf, (ReduceWork) work, seqNo, appJarLr, + v = createVertex(conf, (ReduceWork) work, seqNo, appJarLr, additionalLr, fileSystem, scratchDir, ctx); } else { assert false; return null; } + + // final vertices need to have at least one output + if (!hasChildren) { + v.addOutput("out_"+seqNo, + new OutputDescriptor(MROutput.class.getName()) + .setUserPayload(MRHelpers.createUserPayloadFromConf(conf))); + } + + return v; } /** diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 22b2321..5cfe755 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -159,10 +159,12 @@ private DAG build(JobConf conf, TezWork work, Path scratchDir, int i = ws.size(); for (BaseWork w: ws) { + boolean isFinal = work.getLeaves().contains(w); + // translate work to vertex JobConf wxConf = DagUtils.initializeVertexConf(conf, w); Vertex wx = DagUtils.createVertex(wxConf, w, tezDir, - i--, appJarLr, additionalLr, fs, ctx); + i--, appJarLr, additionalLr, fs, ctx, !isFinal); dag.addVertex(wx); workToVertex.put(w, wx); workToConf.put(w, wxConf);