diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 489bd25..6e9c4cd 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -70,11 +70,12 @@ import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
 import org.apache.tez.runtime.library.output.OnFileSortedOutput;
+import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
-import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.processor.map.MapProcessor;
 import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
@@ -160,17 +161,21 @@ public static Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w,
     w.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(wConf));

     DataMovementType dataMovementType;
+    Class logicalInputClass;
+    Class logicalOutputClass;
     switch (edgeType) {
     case BROADCAST_EDGE:
       dataMovementType = DataMovementType.BROADCAST;
+      logicalOutputClass = OnFileUnorderedKVOutput.class;
+      logicalInputClass = MRInput.class;
       break;
     case SIMPLE_EDGE:
-      dataMovementType = DataMovementType.SCATTER_GATHER;
-      break;
     default:
       dataMovementType = DataMovementType.SCATTER_GATHER;
+      logicalOutputClass = OnFileSortedOutput.class;
+      logicalInputClass = ShuffledMergedInputLegacy.class;
       break;
     }
@@ -178,15 +183,15 @@ public static Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w,
         new EdgeProperty(dataMovementType,
             DataSourceType.PERSISTED,
             SchedulingType.SEQUENTIAL,
-            new OutputDescriptor(OnFileSortedOutput.class.getName()),
-            new InputDescriptor(ShuffledMergedInputLegacy.class.getName()));
+            new OutputDescriptor(logicalOutputClass.getName()),
+            new InputDescriptor(logicalInputClass.getName()));

     return new Edge(v, w, edgeProperty);
   }

   /*
    * Helper function to create Vertex from MapWork.
    */
-  private static Vertex createVertex(JobConf conf, MapWork mapWork, int seqNo,
+  private static Vertex createVertex(JobConf conf, MapWork mapWork,
       LocalResource appJarLr, List<LocalResource> additionalLr, FileSystem fs,
       Path mrScratchDir, Context ctx) throws Exception {
@@ -214,7 +219,7 @@ private static Vertex createVertex(JobConf conf, MapWork mapWork, int seqNo,
     Utilities.setInputPaths(conf, inputPaths);

     InputSplitInfo inputSplitInfo = MRHelpers.generateInputSplits(conf,
-        new Path(tezDir, ""+seqNo));
+        new Path(tezDir, "split_"+mapWork.getName().replaceAll(" ", "_")));

     // create the directories FileSinkOperators need
     Utilities.createTmpDirs(conf, mapWork);
@@ -226,7 +231,7 @@ private static Vertex createVertex(JobConf conf, MapWork mapWork, int seqNo,
     Vertex map = null;
     byte[] serializedConf = MRHelpers.createUserPayloadFromConf(conf);
     if (inputSplitInfo.getNumTasks() != 0) {
-      map = new Vertex("Map "+seqNo,
+      map = new Vertex(mapWork.getName(),
          new ProcessorDescriptor(TezProcessor.class.getName()).
              setUserPayload(serializedConf),
          inputSplitInfo.getNumTasks(), MRHelpers.getMapResource(conf));
@@ -234,8 +239,12 @@ private static Vertex createVertex(JobConf conf, MapWork mapWork, int seqNo,
       MRHelpers.updateEnvironmentForMRTasks(conf, environment, true);
       map.setTaskEnvironment(environment);
       map.setJavaOpts(MRHelpers.getMapJavaOpts(conf));
-      map.addInput("in_"+seqNo,
-          new InputDescriptor(MRInputLegacy.class.getName()).
+
+      assert mapWork.getAliasToWork().keySet().size() == 1;
+
+      String alias = mapWork.getAliasToWork().keySet().iterator().next();
+      map.addInput(alias,
+          new InputDescriptor(MRInput.class.getName()).
              setUserPayload(serializedConf));

       map.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
@@ -285,7 +294,7 @@ private static JobConf initializeVertexConf(JobConf baseConf, ReduceWork reduceWork) {
   /*
    * Helper function to create Vertex for given ReduceWork.
    */
-  private static Vertex createVertex(JobConf conf, ReduceWork reduceWork, int seqNo,
+  private static Vertex createVertex(JobConf conf, ReduceWork reduceWork,
       LocalResource appJarLr, List<LocalResource> additionalLr, FileSystem fs,
       Path mrScratchDir, Context ctx) throws Exception {
@@ -302,7 +311,7 @@ private static Vertex createVertex(JobConf conf, ReduceWork reduceWork, int seqNo,
     MultiStageMRConfToTezTranslator.translateVertexConfToTez(conf, null);

     // create the vertex
-    Vertex reducer = new Vertex("Reducer "+seqNo,
+    Vertex reducer = new Vertex(reduceWork.getName(),
        new ProcessorDescriptor(ReduceProcessor.class.getName()).
            setUserPayload(MRHelpers.createUserPayloadFromConf(conf)),
        reduceWork.getNumReduceTasks(), MRHelpers.getReduceResource(conf));
@@ -576,7 +585,6 @@ public static JobConf initializeVertexConf(JobConf conf, BaseWork work) {
    * @param work The instance of BaseWork representing the actual work to be performed
    * by this vertex.
    * @param scratchDir HDFS scratch dir for this execution unit.
-   * @param seqNo Unique number for this DAG. Used to name the vertex.
    * @param appJarLr Local resource for hive-exec.
    * @param additionalLr
    * @param fileSystem FS corresponding to scratchDir and LocalResources
@@ -584,17 +592,17 @@
    * @return Vertex
    */
  public static Vertex createVertex(JobConf conf, BaseWork work,
-      Path scratchDir, int seqNo, LocalResource appJarLr, List<LocalResource> additionalLr,
+      Path scratchDir, LocalResource appJarLr, List<LocalResource> additionalLr,
      FileSystem fileSystem, Context ctx, boolean hasChildren) throws Exception {

    Vertex v = null;
    // simply dispatch the call to the right method for the actual (sub-) type of
    // BaseWork.
    if (work instanceof MapWork) {
-      v = createVertex(conf, (MapWork) work, seqNo, appJarLr,
+      v = createVertex(conf, (MapWork) work, appJarLr,
          additionalLr, fileSystem, scratchDir, ctx);
    } else if (work instanceof ReduceWork) {
-      v = createVertex(conf, (ReduceWork) work, seqNo, appJarLr,
+      v = createVertex(conf, (ReduceWork) work, appJarLr,
          additionalLr, fileSystem, scratchDir, ctx);
    } else {
      assert false;
@@ -603,7 +611,7 @@ public static Vertex createVertex(JobConf conf, BaseWork work,

    // final vertices need to have at least one output
    if (!hasChildren) {
-      v.addOutput("out_"+seqNo,
+      v.addOutput("out_"+work.getName(),
          new OutputDescriptor(MROutput.class.getName())
              .setUserPayload(MRHelpers.createUserPayloadFromConf(conf)));
    }
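The new imports pair with the createEdge change above: the edge type now selects the logical output/input classes instead of hard-coding the shuffle pair. As a rough illustration of what the method can now build for each edge type, using the same constructors the hunk itself calls (a sketch that would live inside DagUtils, where all of these classes are already imported; it is not part of the patch):

    // BROADCAST_EDGE: ship the full, unsorted output to every consumer task.
    EdgeProperty broadcastEdge = new EdgeProperty(DataMovementType.BROADCAST,
        DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
        new OutputDescriptor(OnFileUnorderedKVOutput.class.getName()),
        new InputDescriptor(MRInput.class.getName()));

    // SIMPLE_EDGE (and default): classic scatter-gather shuffle with
    // sorted map output merged on the consumer side.
    EdgeProperty shuffleEdge = new EdgeProperty(DataMovementType.SCATTER_GATHER,
        DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
        new OutputDescriptor(OnFileSortedOutput.class.getName()),
        new InputDescriptor(ShuffledMergedInputLegacy.class.getName()));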
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 798f4a3..e8a35ed 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -157,7 +157,6 @@ private DAG build(JobConf conf, TezWork work, Path scratchDir,
        Utilities.abbreviate(HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYSTRING),
        HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVEJOBNAMELENGTH)));

-    int i = ws.size();
    for (BaseWork w: ws) {

      boolean isFinal = work.getLeaves().contains(w);
@@ -165,7 +164,7 @@ private DAG build(JobConf conf, TezWork work, Path scratchDir,
      // translate work to vertex
      JobConf wxConf = DagUtils.initializeVertexConf(conf, w);
      Vertex wx = DagUtils.createVertex(wxConf, w, tezDir,
-          i--, appJarLr, additionalLr, fs, ctx, !isFinal);
+          appJarLr, additionalLr, fs, ctx, !isFinal);
      dag.addVertex(wx);
      workToVertex.put(w, wx);
      workToConf.put(w, wxConf);
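TezTask no longer threads a decrementing counter through vertex creation; a vertex is now identified solely by its work item's name. One consequence, visible in the DagUtils hunk above, is that the split directory is derived from that name, which is why the names handed out by GenTezWork must be unique within a DAG. A runnable sketch of the derivation (the work name is illustrative):

    // Demonstrates how a work name maps to a split directory name,
    // mirroring the expression used in DagUtils.createVertex.
    public class SplitDirNameDemo {
      public static void main(String[] args) {
        String workName = "Map 1"; // assigned by GenTezWork in this patch
        System.out.println("split_" + workName.replaceAll(" ", "_"));
        // prints: split_Map_1
      }
    }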
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
index 59ae774..9233a9f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
@@ -47,6 +47,8 @@
  static final private Log LOG = LogFactory.getLog(GenTezWork.class.getName());

+  private int sequenceNumber = 0;
+
  @SuppressWarnings("unchecked")
  @Override
  public Object process(Node nd, Stack<Node> stack,
@@ -85,8 +87,8 @@ public Object process(Node nd, Stack<Node> stack,
    BaseWork work;
    if (context.preceedingWork == null) {
      assert root.getParentOperators().isEmpty();
-      LOG.debug("Adding map work for " + root);
-      MapWork mapWork = new MapWork();
+      MapWork mapWork = new MapWork("Map "+ (++sequenceNumber));
+      LOG.debug("Adding map work (" + mapWork.getName() + ") for " + root);

      // map work starts with table scan operators
      assert root instanceof TableScanOperator;
@@ -98,8 +100,8 @@ public Object process(Node nd, Stack<Node> stack,
      work = mapWork;
    } else {
      assert !root.getParentOperators().isEmpty();
-      LOG.debug("Adding reduce work for " + root);
-      ReduceWork reduceWork = new ReduceWork();
+      ReduceWork reduceWork = new ReduceWork("Reducer "+ (++sequenceNumber));
+      LOG.debug("Adding reduce work (" + reduceWork.getName() + ") for " + root);
      reduceWork.setReducer(root);
      reduceWork.setNeedsTagging(GenMapRedUtils.needsTagging(reduceWork));
@@ -115,10 +117,9 @@ public Object process(Node nd, Stack<Node> stack,
      // need to fill in information about the key and value in the reducer
      GenMapRedUtils.setKeyAndValueDesc(reduceWork, reduceSink);

-      // needs to be fixed in HIVE-5052. This should be driven off of stats
-      if (reduceWork.getNumReduceTasks() <= 0) {
-        reduceWork.setNumReduceTasks(1);
-      }
+      // remember which parent belongs to which tag
+      reduceWork.getTagToInput().put(reduceSink.getConf().getTag(),
+          context.preceedingWork.getName());

      tezWork.add(reduceWork);
      tezWork.connect(
@@ -150,6 +151,9 @@ public Object process(Node nd, Stack<Node> stack,
        ReduceSinkOperator rs = (ReduceSinkOperator) operator;
        ReduceWork rWork = (ReduceWork) followingWork;
        GenMapRedUtils.setKeyAndValueDesc(rWork, rs);
+
+        // remember which parent belongs to which tag
+        rWork.getTagToInput().put(rs.getConf().getTag(), work.getName());

        // add dependency between the two work items
        tezWork.connect(work, context.leafOperatorToFollowingWork.get(operator),
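GenTezWork now records, for every ReduceSink feeding a reducer, which parent work item owns each tag. Downstream, a reduce-side processor can use that map to tell which vertex supplies which input. A minimal, self-contained sketch of the lookup (plain Java, not Hive code; vertex names are illustrative):

    import java.util.HashMap;
    import java.util.Map;

    public class TagToInputDemo {
      public static void main(String[] args) {
        // Mirrors ReduceWork.getTagToInput(): ReduceSink tag -> producing vertex.
        Map<Integer, String> tagToInput = new HashMap<Integer, String>();
        tagToInput.put(0, "Map 1"); // tag 0 fed by vertex "Map 1"
        tagToInput.put(1, "Map 2"); // tag 1 fed by vertex "Map 2"
        System.out.println(tagToInput.get(1)); // prints: Map 2
      }
    }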
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index 1632988..fdc072d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -113,6 +113,7 @@ protected void generateTaskTree(List<Task<? extends Serializable>> rootTasks, ParseContext pCtx,
      throws SemanticException {

    ParseContext tempParseContext = getParseContext(pCtx, rootTasks);
+    GenTezWork genTezWork = new GenTezWork();

    // Sequence of TableScan operators to be walked
    Deque<Operator<?>> deque = new LinkedList<Operator<?>>();
@@ -127,13 +128,13 @@ protected void generateTaskTree(List<Task<? extends Serializable>> rootTasks, ParseContext pCtx,
    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
    opRules.put(new RuleRegExp(new String("Split Work - ReduceSink"),
        ReduceSinkOperator.getOperatorName() + "%"),
-        new GenTezWork());
+        genTezWork);

    opRules.put(new RuleRegExp(new String("No more walking on ReduceSink-MapJoin"),
        ReduceSinkOperator.getOperatorName() + "%" +
        MapJoinOperator.getOperatorName() + "%"),
        new ReduceSinkMapJoinProc());

    opRules.put(new RuleRegExp(new String("Split Work - FileSink"),
        FileSinkOperator.getOperatorName() + "%"),
-        new GenTezWork());
+        genTezWork);

    // The dispatcher fires the processor corresponding to the closest matching
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
index 2e09de9..8654da6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
@@ -32,8 +32,16 @@
 @SuppressWarnings({"serial", "deprecation"})
 public abstract class BaseWork extends AbstractOperatorDesc {

+  public BaseWork() {}
+
+  public BaseWork(String name) {
+    setName(name);
+  }
+
  private boolean gatheringStats;

+  private String name;
+
  public void setGatheringStats(boolean gatherStats) {
    this.gatheringStats = gatherStats;
  }
@@ -42,6 +50,14 @@ public boolean isGatheringStats() {
    return this.gatheringStats;
  }

+  public String getName() {
+    return name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
  protected abstract List<Operator<?>> getAllRootOperators();

  public List<Operator<?>> getAllOperators() {
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index a72ec8b..957d534 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -115,7 +115,10 @@
  private Map<String, Map<Integer, String>> scratchColumnVectorTypes = null;
  private boolean vectorMode = false;

-  public MapWork() {
+  public MapWork() {}
+
+  public MapWork(String name) {
+    super(name);
  }

  @Explain(displayName = "Path -> Alias", normalExplain = false)
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
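The TezCompiler hunk above registers a single GenTezWork instance for both rules. That matters because the sequence number lives on the instance: with one shared processor, every work item in a query gets a distinct number. A stripped-down illustration of the failure mode two separate instances would reintroduce (plain Java, not Hive code):

    public class SharedCounterDemo {
      private int sequenceNumber = 0;

      String nextName(String prefix) {
        return prefix + " " + (++sequenceNumber);
      }

      public static void main(String[] args) {
        SharedCounterDemo shared = new SharedCounterDemo();
        System.out.println(shared.nextName("Map"));     // Map 1
        System.out.println(shared.nextName("Reducer")); // Reducer 2
        // Two independent instances would each start over at 1 and could
        // hand out two vertices both named "Map 1".
      }
    }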
index a6d9c55..03edcc1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
@@ -19,7 +19,9 @@
 package org.apache.hadoop.hive.ql.plan;

 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -44,6 +46,12 @@
 @SuppressWarnings({"serial", "deprecation"})
 public class ReduceWork extends BaseWork {

+  public ReduceWork() {}
+
+  public ReduceWork(String name) {
+    super(name);
+  }
+
  private static transient final Log LOG = LogFactory.getLog(ReduceWork.class);

  // schema of the map-reduce 'key' object - this is homogeneous
@@ -63,6 +71,8 @@ public class ReduceWork extends BaseWork {
  // not (e.g.: group by)
  private boolean needsTagging;

+  private Map<Integer, String> tagToInput = new HashMap<Integer, String>();
+
  /**
   * If the plan has a reducer and correspondingly a reduce-sink, then store the TableDesc pointing
   * to keySerializeInfo of the ReduceSink
@@ -103,6 +113,14 @@ public void setNeedsTagging(boolean needsTagging) {
    this.needsTagging = needsTagging;
  }

+  public void setTagToInput(final Map<Integer, String> tagToInput) {
+    this.tagToInput = tagToInput;
+  }
+
+  public Map<Integer, String> getTagToInput() {
+    return tagToInput;
+  }
+
  @Override
  protected List<Operator<?>> getAllRootOperators() {
    ArrayList<Operator<?>> opList = new ArrayList<Operator<?>>();
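Taken together, the plan-side pieces compose like this from a caller's point of view, using only the constructors and accessors this patch adds (the names are illustrative):

    MapWork mapWork = new MapWork("Map 1");
    ReduceWork reduceWork = new ReduceWork("Reducer 2");

    // record that the reducer's tag 0 is fed by the map work item
    reduceWork.getTagToInput().put(0, mapWork.getName());

    // DagUtils then names vertices and outputs after the work items:
    //   new Vertex(mapWork.getName(), ...)        -> vertex "Map 1"
    //   v.addOutput("out_" + work.getName(), ...) -> output "out_Reducer 2"

The no-argument constructors are kept alongside the new ones, presumably so plan serialization can still instantiate these classes reflectively.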