diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java index a9fdc57..93b7e04 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java @@ -80,13 +80,11 @@ import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; import org.apache.tez.client.PreWarmContext; +import org.apache.tez.common.TezJobConfig; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.Edge; import org.apache.tez.dag.api.EdgeManagerDescriptor; import org.apache.tez.dag.api.EdgeProperty; -import org.apache.tez.dag.api.EdgeProperty.DataMovementType; -import org.apache.tez.dag.api.EdgeProperty.DataSourceType; -import org.apache.tez.dag.api.EdgeProperty.SchedulingType; import org.apache.tez.dag.api.GroupInputEdge; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.OutputDescriptor; @@ -100,16 +98,13 @@ import org.apache.tez.mapreduce.hadoop.InputSplitInfo; import org.apache.tez.mapreduce.hadoop.MRHelpers; import org.apache.tez.mapreduce.hadoop.MRJobConfig; -import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator; import org.apache.tez.mapreduce.input.MRInputLegacy; import org.apache.tez.mapreduce.output.MROutput; import org.apache.tez.mapreduce.partition.MRPartitioner; +import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer; +import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfigurer; +import org.apache.tez.runtime.library.conf.UnorderedUnpartitionedKVEdgeConfigurer; import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput; -import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy; -import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput; -import org.apache.tez.runtime.library.output.OnFileSortedOutput; -import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput; -import org.apache.tez.runtime.library.output.OnFileUnorderedPartitionedKVOutput; import com.google.common.base.Function; import com.google.common.collect.Iterators; @@ -161,6 +156,7 @@ private JobConf initializeVertexConf(JobConf baseConf, MapWork mapWork) { JobConf conf = new JobConf(baseConf); if (mapWork.getNumMapTasks() != null) { + // Is this required ? conf.setInt(MRJobConfig.NUM_MAPS, mapWork.getNumMapTasks().intValue()); } @@ -199,6 +195,7 @@ private JobConf initializeVertexConf(JobConf baseConf, MapWork mapWork) { inpFormat = CombineHiveInputFormat.class.getName(); } + // Is this required ? conf.set("mapred.mapper.class", ExecMapper.class.getName()); conf.set("mapred.input.format.class", inpFormat); @@ -210,20 +207,19 @@ private JobConf initializeVertexConf(JobConf baseConf, MapWork mapWork) { * Edge between them. * * @param group The parent VertexGroup - * @param wConf The job conf of the child vertex + * @param vConf The job conf of one of the parrent (grouped) vertices * @param w The child vertex * @param edgeProp the edge property of connection between the two * endpoints. */ @SuppressWarnings("rawtypes") - public GroupInputEdge createEdge(VertexGroup group, JobConf wConf, + public GroupInputEdge createEdge(VertexGroup group, JobConf vConf, Vertex w, TezEdgeProperty edgeProp) throws IOException { Class mergeInputClass; LOG.info("Creating Edge between " + group.getGroupName() + " and " + w.getName()); - w.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(wConf)); EdgeType edgeType = edgeProp.getEdgeType(); switch (edgeType) { @@ -254,40 +250,23 @@ public GroupInputEdge createEdge(VertexGroup group, JobConf wConf, break; } - return new GroupInputEdge(group, w, createEdgeProperty(edgeProp), + return new GroupInputEdge(group, w, createEdgeProperty(edgeProp, vConf), new InputDescriptor(mergeInputClass.getName())); } /** - * Given two vertices a, b update their configurations to be used in an Edge a-b - */ - public void updateConfigurationForEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w) - throws IOException { - - // Tez needs to setup output subsequent input pairs correctly - MultiStageMRConfToTezTranslator.translateVertexConfToTez(wConf, vConf); - - // update payloads (configuration for the vertices might have changed) - v.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(vConf)); - w.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(wConf)); - } - - /** - * Given two vertices and their respective configuration objects createEdge + * Given two vertices and the configuration for the source vertex, createEdge * will create an Edge object that connects the two. * - * @param vConf JobConf of the first vertex + * @param vConf JobConf of the first (source) vertex * @param v The first vertex (source) - * @param wConf JobConf of the second vertex * @param w The second vertex (sink) * @return */ - public Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w, + public Edge createEdge(JobConf vConf, Vertex v, Vertex w, TezEdgeProperty edgeProp) throws IOException { - updateConfigurationForEdge(vConf, v, wConf, w); - switch(edgeProp.getEdgeType()) { case CUSTOM_EDGE: { int numBuckets = edgeProp.getNumBuckets(); @@ -306,31 +285,33 @@ public Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w, // nothing } - return new Edge(v, w, createEdgeProperty(edgeProp)); + return new Edge(v, w, createEdgeProperty(edgeProp, vConf)); } /* * Helper function to create an edge property from an edge type. */ @SuppressWarnings("rawtypes") - private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp) throws IOException { - DataMovementType dataMovementType; - Class logicalInputClass; - Class logicalOutputClass; + private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration conf) + throws IOException { + MRHelpers.translateVertexConfToTez(conf); + String keyClass = conf.get(TezJobConfig.TEZ_RUNTIME_KEY_CLASS); + String valClass = conf.get(TezJobConfig.TEZ_RUNTIME_VALUE_CLASS); + String partitionerClassName = conf.get("mapred.partitioner.class"); + Configuration partitionerConf; - EdgeProperty edgeProperty = null; EdgeType edgeType = edgeProp.getEdgeType(); switch (edgeType) { case BROADCAST_EDGE: - dataMovementType = DataMovementType.BROADCAST; - logicalOutputClass = OnFileUnorderedKVOutput.class; - logicalInputClass = ShuffledUnorderedKVInput.class; - break; - + UnorderedUnpartitionedKVEdgeConfigurer et1Conf = UnorderedUnpartitionedKVEdgeConfigurer + .newBuilder(keyClass, valClass).setFromConfiguration(conf).build(); + return et1Conf.createDefaultBroadcastEdgeProperty(); case CUSTOM_EDGE: - dataMovementType = DataMovementType.CUSTOM; - logicalOutputClass = OnFileUnorderedPartitionedKVOutput.class; - logicalInputClass = ShuffledUnorderedKVInput.class; + assert partitionerClassName != null; + partitionerConf = createPartitionerConf(partitionerClassName, conf); + UnorderedPartitionedKVEdgeConfigurer et2Conf = UnorderedPartitionedKVEdgeConfigurer + .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf) + .setFromConfiguration(conf).build(); EdgeManagerDescriptor edgeDesc = new EdgeManagerDescriptor(CustomPartitionEdge.class.getName()); CustomEdgeConfiguration edgeConf = @@ -339,38 +320,43 @@ private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp) throws IOExcep edgeConf.write(dob); byte[] userPayload = dob.getData(); edgeDesc.setUserPayload(userPayload); - edgeProperty = - new EdgeProperty(edgeDesc, - DataSourceType.PERSISTED, - SchedulingType.SEQUENTIAL, - new OutputDescriptor(logicalOutputClass.getName()), - new InputDescriptor(logicalInputClass.getName())); - break; - + return et2Conf.createDefaultCustomEdgeProperty(edgeDesc); case CUSTOM_SIMPLE_EDGE: - dataMovementType = DataMovementType.SCATTER_GATHER; - logicalOutputClass = OnFileUnorderedPartitionedKVOutput.class; - logicalInputClass = ShuffledUnorderedKVInput.class; - break; - + assert partitionerClassName != null; + partitionerConf = createPartitionerConf(partitionerClassName, conf); + UnorderedPartitionedKVEdgeConfigurer et3Conf = UnorderedPartitionedKVEdgeConfigurer + .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf) + .setFromConfiguration(conf).build(); + return et3Conf.createDefaultEdgeProperty(); case SIMPLE_EDGE: default: - dataMovementType = DataMovementType.SCATTER_GATHER; - logicalOutputClass = OnFileSortedOutput.class; - logicalInputClass = ShuffledMergedInputLegacy.class; - break; + assert partitionerClassName != null; + partitionerConf = createPartitionerConf(partitionerClassName, conf); + OrderedPartitionedKVEdgeConfigurer et4Conf = OrderedPartitionedKVEdgeConfigurer + .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf) + .setFromConfiguration(conf).build(); + return et4Conf.createDefaultEdgeProperty(); } + } - if (edgeProperty == null) { - edgeProperty = - new EdgeProperty(dataMovementType, - DataSourceType.PERSISTED, - SchedulingType.SEQUENTIAL, - new OutputDescriptor(logicalOutputClass.getName()), - new InputDescriptor(logicalInputClass.getName())); + /** + * Utility method to create a stripped down configuration for the MR partitioner. + * + * @param partitionerClassName + * the real MR partitioner class name + * @param baseConf + * a base configuration to extract relevant properties + * @return + */ + private Configuration createPartitionerConf(String partitionerClassName, + Configuration baseConf) { + Configuration partitionerConf = new Configuration(false); + partitionerConf.set("mapred.partitioner.class", partitionerClassName); + if (baseConf.get("mapreduce.totalorderpartitioner.path") != null) { + partitionerConf.set("mapreduce.totalorderpartitioner.path", + baseConf.get("mapreduce.totalorderpartitioner.path")); } - - return edgeProperty; + return partitionerConf; } /* @@ -422,9 +408,6 @@ private Vertex createVertex(JobConf conf, MapWork mapWork, // create the directories FileSinkOperators need Utilities.createTmpDirs(conf, mapWork); - // Tez ask us to call this even if there's no preceding vertex - MultiStageMRConfToTezTranslator.translateVertexConfToTez(conf, null); - // finally create the vertex Vertex map = null; @@ -526,6 +509,7 @@ private Vertex createVertex(JobConf conf, MapWork mapWork, private JobConf initializeVertexConf(JobConf baseConf, ReduceWork reduceWork) { JobConf conf = new JobConf(baseConf); + // Is this required ? conf.set("mapred.reducer.class", ExecReducer.class.getName()); boolean useSpeculativeExecReducers = HiveConf.getBoolVar(conf, @@ -549,9 +533,6 @@ private Vertex createVertex(JobConf conf, ReduceWork reduceWork, // create the directories FileSinkOperators need Utilities.createTmpDirs(conf, reduceWork); - // Call once here, will be updated when we find edges - MultiStageMRConfToTezTranslator.translateVertexConfToTez(conf, null); - // create the vertex Vertex reducer = new Vertex(reduceWork.getName(), new ProcessorDescriptor(ReduceTezProcessor.class.getName()). diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 93793d6..e1f5630 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -29,7 +29,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.exec.Operator; @@ -247,16 +246,14 @@ DAG build(JobConf conf, TezWork work, Path scratchDir, } VertexGroup group = dag.createVertexGroup(w.getName(), vertexArray); + // For a vertex group, all Outputs use the same Key-class, Val-class and partitioner. + // Pick any one source vertex to figure out the Edge configuration. + JobConf parentConf = workToConf.get(unionWorkItems.get(0)); + // now hook up the children for (BaseWork v: children) { - // need to pairwise patch up the configuration of the vertices - for (BaseWork part: unionWorkItems) { - utils.updateConfigurationForEdge(workToConf.get(part), workToVertex.get(part), - workToConf.get(v), workToVertex.get(v)); - } - // finally we can create the grouped edge - GroupInputEdge e = utils.createEdge(group, workToConf.get(v), + GroupInputEdge e = utils.createEdge(group, parentConf, workToVertex.get(v), work.getEdgeProperty(w, v)); dag.addEdge(e); @@ -279,7 +276,7 @@ DAG build(JobConf conf, TezWork work, Path scratchDir, TezEdgeProperty edgeProp = work.getEdgeProperty(w, v); - e = utils.createEdge(wxConf, wx, workToConf.get(v), workToVertex.get(v), edgeProp); + e = utils.createEdge(wxConf, wx, workToVertex.get(v), edgeProp); dag.addEdge(e); } } diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java index f4bd834..c9909a3 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java @@ -102,7 +102,7 @@ public Vertex answer(InvocationOnMock invocation) throws Throwable { } }); - when(utils.createEdge(any(JobConf.class), any(Vertex.class), any(JobConf.class), + when(utils.createEdge(any(JobConf.class), any(Vertex.class), any(Vertex.class), any(TezEdgeProperty.class))).thenAnswer(new Answer() { @Override