diff --git ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java
index 69ea4a6..7d5983a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java
+++ ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java
@@ -36,9 +36,9 @@
 public class DefaultGraphWalker implements GraphWalker {
 
   protected Stack<Node> opStack;
-  private final List<Node> toWalk = new ArrayList<Node>();
-  private final HashMap<Node, Object> retMap = new HashMap<Node, Object>();
-  private final Dispatcher dispatcher;
+  protected final List<Node> toWalk = new ArrayList<Node>();
+  protected final HashMap<Node, Object> retMap = new HashMap<Node, Object>();
+  protected final Dispatcher dispatcher;
 
   /**
    * Constructor.
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
index 3c3dcc0..f4b6016 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
@@ -19,18 +19,16 @@
 package org.apache.hadoop.hive.ql.parse;
 
 import java.io.Serializable;
-import java.util.Deque;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.Stack;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
@@ -61,10 +59,6 @@ public final Set<ReadEntity> inputs;
   public final Set<WriteEntity> outputs;
 
-  // rootOperators are all the table scan operators in sequence
-  // of traversal
-  public final Deque<Operator<?>> rootOperators;
-
   // holds the root of the operator tree we're currently processing
   // this could be a table scan, but also a join, ptf, etc (i.e.:
   // first operator of a reduce task.
@@ -98,6 +92,9 @@
   // a map that maintains operator (file-sink or reduce-sink) to work mapping
   public final Map<Operator<?>, BaseWork> operatorWorkMap;
 
+  // a map to keep track of which root generated which work
+  public final Map<Operator<?>, BaseWork> rootToWorkMap;
+
   // we need to keep the original list of operators in the map join to know
   // what position in the mapjoin the different parent work items will have.
   public final Map<MapJoinOperator, List<Operator<?>>> mapJoinParentMap;
@@ -108,19 +105,10 @@
   // used to group dependent tasks for multi table inserts
   public final DependencyCollectionTask dependencyTask;
 
-  // root of last multi child operator encountered
-  public Stack<Operator<?>> lastRootOfMultiChildOperator;
-
-  // branches of current multi-child operator
-  public Stack<Integer> currentBranchCount;
-
-  // work generated for last multi-child operator
-  public Stack<BaseWork> lastWorkForMultiChildOperator;
-
   @SuppressWarnings("unchecked")
   public GenTezProcContext(HiveConf conf, ParseContext parseContext,
       List<Task<MoveWork>> moveTask, List<Task<? extends Serializable>> rootTasks,
-      Set<ReadEntity> inputs, Set<WriteEntity> outputs, Deque<Operator<?>> rootOperators) {
+      Set<ReadEntity> inputs, Set<WriteEntity> outputs) {
 
     this.conf = conf;
     this.parseContext = parseContext;
@@ -130,16 +118,15 @@ public GenTezProcContext(HiveConf conf, ParseContext parseContext,
     this.outputs = outputs;
     this.currentTask = (TezTask) TaskFactory.get(new TezWork(), conf);
     this.leafOperatorToFollowingWork = new HashMap<Operator<?>, BaseWork>();
-    this.rootOperators = rootOperators;
     this.linkOpWithWorkMap = new HashMap<Operator<?>, List<BaseWork>>();
     this.linkWorkWithReduceSinkMap = new HashMap<BaseWork, List<ReduceSinkOperator>>();
     this.operatorWorkMap = new HashMap<Operator<?>, BaseWork>();
+    this.rootToWorkMap = new HashMap<Operator<?>, BaseWork>();
     this.mapJoinParentMap = new HashMap<MapJoinOperator, List<Operator<?>>>();
     this.linkChildOpWithDummyOp = new HashMap<Operator<?>, List<Operator<?>>>();
     this.dependencyTask = (DependencyCollectionTask)
         TaskFactory.get(new DependencyCollectionWork(), conf);
-    this.lastRootOfMultiChildOperator = new Stack<Operator<?>>();
-    this.currentBranchCount = new Stack<Integer>();
-    this.lastWorkForMultiChildOperator = new Stack<BaseWork>();
+
+    rootTasks.add(currentTask);
   }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
index 724ed8f..f4e973a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
@@ -48,9 +48,9 @@
 
   static final private Log LOG = LogFactory.getLog(GenTezWork.class.getName());
 
+  // sequence number is used to name vertices (e.g.: Map 1, Reduce 14, ...)
   private int sequenceNumber = 0;
 
-  @SuppressWarnings("unchecked")
   @Override
   public Object process(Node nd, Stack<Node> stack,
       NodeProcessorCtx procContext, Object... nodeOutputs)
@@ -62,130 +62,39 @@ public Object process(Node nd, Stack<Node> stack,
     // a new vertex.
     Operator<?> operator = (Operator<?>) nd;
 
-    TezWork tezWork = context.currentTask.getWork();
-    if (!context.rootTasks.contains(context.currentTask)) {
-      context.rootTasks.add(context.currentTask);
-    }
-
     // root is the start of the operator pipeline we're currently
     // packing into a vertex, typically a table scan, union or join
     Operator<?> root = context.currentRootOperator;
-    if (root == null) {
-      // null means that we're starting with a new table scan
-      // the graph walker walks the rootOperators in the same
-      // order so we can just take the next
-      context.preceedingWork = null;
-
-      // if there are branches remaining we can't pop the next
-      // root operator yet.
-      if (context.currentBranchCount.isEmpty()
-          || (!context.lastWorkForMultiChildOperator.isEmpty()
-          && context.lastWorkForMultiChildOperator.peek() == null)) {
-        root = context.rootOperators.pop();
-      }
-    }
 
     LOG.debug("Root operator: " + root);
     LOG.debug("Leaf operator: " + operator);
 
+    TezWork tezWork = context.currentTask.getWork();
+
     // Right now the work graph is pretty simple. If there is no
     // Preceding work we have a root and will generate a map
     // vertex. If there is a preceding work we will generate
     // a reduce vertex
     BaseWork work;
-    if (context.preceedingWork == null) {
-      if (root == null) {
-        // this is the multi-insert case. we need to reuse the last
-        // table scan work.
-        root = context.lastRootOfMultiChildOperator.peek();
-        work = context.lastWorkForMultiChildOperator.peek();
-        LOG.debug("Visiting additional branch in: "+root);
-
-      } else {
-        assert root.getParentOperators().isEmpty();
-        MapWork mapWork = new MapWork("Map "+ (++sequenceNumber));
-        LOG.debug("Adding map work (" + mapWork.getName() + ") for " + root);
-
-        // map work starts with table scan operators
-        assert root instanceof TableScanOperator;
-        String alias = ((TableScanOperator)root).getConf().getAlias();
-
-        GenMapRedUtils.setMapWork(mapWork, context.parseContext,
-            context.inputs, null, root, alias, context.conf, false);
-        tezWork.add(mapWork);
-        work = mapWork;
-
-        // remember this table scan and work item. this is needed for multiple
-        // insert statements where multiple operator pipelines hang of a single
-        // table scan
-        if (!context.lastWorkForMultiChildOperator.isEmpty()
-            && context.lastWorkForMultiChildOperator.peek() == null) {
-          LOG.debug("Capturing current work for 'multiple branches' case");
-          context.lastWorkForMultiChildOperator.pop();
-          context.lastWorkForMultiChildOperator.push(work);
-        }
-      }
-
-      if (!context.currentBranchCount.isEmpty()) {
-        // we've handled one branch. Adjust the counts.
-        int branches = context.currentBranchCount.pop();
-        if (--branches != 0) {
-          LOG.debug("Remaining branches: "+branches);
-          context.currentBranchCount.push(branches);
-        } else {
-          LOG.debug("No more remaining branches.");
-          context.lastRootOfMultiChildOperator.pop();
-          context.lastWorkForMultiChildOperator.pop();
-        }
-      }
-
+    if (context.rootToWorkMap.containsKey(root)) {
+      // having seen the root operator before means there was a branch in the
+      // operator graph. There's typically two reasons for that: a) mux/demux
+      // b) multi insert. Mux/Demux will hit the same leaf again, multi insert
+      // will result into a vertex with multiple FS or RS operators.
+
+      // At this point we don't have to do anything special in this case. Just
+      // run through the regular paces w/o creating a new task.
+      work = context.rootToWorkMap.get(root);
     } else {
-      assert !root.getParentOperators().isEmpty();
-      ReduceWork reduceWork = new ReduceWork("Reducer "+ (++sequenceNumber));
-      LOG.debug("Adding reduce work (" + reduceWork.getName() + ") for " + root);
-      reduceWork.setReducer(root);
-      reduceWork.setNeedsTagging(GenMapRedUtils.needsTagging(reduceWork));
-
-      // All parents should be reduce sinks. We pick the one we just walked
-      // to choose the number of reducers. In the join/union case they will
-      // all be -1. In sort/order case where it matters there will be only
-      // one parent.
-      assert context.parentOfRoot instanceof ReduceSinkOperator;
-      ReduceSinkOperator reduceSink = (ReduceSinkOperator) context.parentOfRoot;
-
-      LOG.debug("Setting up reduce sink: " + reduceSink);
-
-      reduceWork.setNumReduceTasks(reduceSink.getConf().getNumReducers());
-
-      // need to fill in information about the key and value in the reducer
-      GenMapRedUtils.setKeyAndValueDesc(reduceWork, reduceSink);
-
-      // remember which parent belongs to which tag
-      reduceWork.getTagToInput().put(reduceSink.getConf().getTag(),
-          context.preceedingWork.getName());
-
-      // remember the output name of the reduce sink
-      reduceSink.getConf().setOutputName(reduceWork.getName());
-
-      tezWork.add(reduceWork);
-      tezWork.connect(
-          context.preceedingWork,
-          reduceWork, EdgeType.SIMPLE_EDGE);
-
-      work = reduceWork;
-
-      // remember this work item. this is needed for multiple
-      // insert statements where multiple operator pipelines hang of a forward
-      // operator
-      if (!context.lastWorkForMultiChildOperator.isEmpty()
-          && context.lastWorkForMultiChildOperator.peek() == null) {
-        LOG.debug("Capturing current work for 'multiple branches' case");
-        context.lastWorkForMultiChildOperator.pop();
-        context.lastWorkForMultiChildOperator.push(work);
+      // create a new vertex
+      if (context.preceedingWork == null) {
+        work = createMapWork(context, root, tezWork);
+      } else {
+        work = createReduceWork(context, root, tezWork);
       }
+      context.rootToWorkMap.put(root, work);
     }
 
-    // We're scanning the operator from table scan to final file sink.
     // We're scanning a tree from roots to leaf (this is not technically
     // correct, demux and mux operators might form a diamond shape, but
     // we will only scan one path and ignore the others, because the
@@ -233,20 +142,15 @@ public Object process(Node nd, Stack<Node> stack,
       context.parentOfRoot = operator;
       context.currentRootOperator = operator.getChildOperators().get(0);
       context.preceedingWork = work;
-    } else {
-      LOG.debug("Leaf operator - resetting context: " + context.currentRootOperator);
-      context.parentOfRoot = null;
-      context.currentRootOperator = null;
-      context.preceedingWork = null;
     }
 
     /*
      * this happens in case of map join operations.
      * The tree looks like this:
      *
-     * RS <--- we are here perhaps
-     * |
-     * MapJoin
+     *        RS <--- we are here perhaps
+     *        |
+     *     MapJoin
      *     /  \
      *   RS    TS
      *  /
@@ -266,10 +170,10 @@ public Object process(Node nd, Stack<Node> stack,
       }
       for (BaseWork parentWork : linkWorkList) {
         tezWork.connect(parentWork, work, EdgeType.BROADCAST_EDGE);
-        
+
         // need to set up output name for reduce sink not that we know the name
         // of the downstream work
-        for (ReduceSinkOperator r: 
+        for (ReduceSinkOperator r:
             context.linkWorkWithReduceSinkMap.get(parentWork)) {
           r.getConf().setOutputName(work.getName());
         }
@@ -279,4 +183,65 @@ public Object process(Node nd, Stack<Node> stack,
     return null;
   }
 
+  private ReduceWork createReduceWork(GenTezProcContext context, Operator<?> root,
+      TezWork tezWork) {
+    assert !root.getParentOperators().isEmpty();
+    ReduceWork reduceWork = new ReduceWork("Reducer "+ (++sequenceNumber));
+    LOG.debug("Adding reduce work (" + reduceWork.getName() + ") for " + root);
+    reduceWork.setReducer(root);
+    reduceWork.setNeedsTagging(GenMapRedUtils.needsTagging(reduceWork));
+
+    // All parents should be reduce sinks. We pick the one we just walked
+    // to choose the number of reducers. In the join/union case they will
+    // all be -1. In sort/order case where it matters there will be only
+    // one parent.
+    assert context.parentOfRoot instanceof ReduceSinkOperator;
+    ReduceSinkOperator reduceSink = (ReduceSinkOperator) context.parentOfRoot;
+
+    reduceWork.setNumReduceTasks(reduceSink.getConf().getNumReducers());
+
+    setupReduceSink(context, reduceWork, reduceSink);
+
+    tezWork.add(reduceWork);
+    tezWork.connect(
+        context.preceedingWork,
+        reduceWork, EdgeType.SIMPLE_EDGE);
+
+    return reduceWork;
+  }
+
+  private void setupReduceSink(GenTezProcContext context, ReduceWork reduceWork,
+      ReduceSinkOperator reduceSink) {
+
+    LOG.debug("Setting up reduce sink: " + reduceSink
+        + " with following reduce work: " + reduceWork.getName());
+
+    // need to fill in information about the key and value in the reducer
+    GenMapRedUtils.setKeyAndValueDesc(reduceWork, reduceSink);
+
+    // remember which parent belongs to which tag
+    reduceWork.getTagToInput().put(reduceSink.getConf().getTag(),
+        context.preceedingWork.getName());
+
+    // remember the output name of the reduce sink
+    reduceSink.getConf().setOutputName(reduceWork.getName());
+  }
+
+  private MapWork createMapWork(GenTezProcContext context, Operator<?> root,
+      TezWork tezWork) throws SemanticException {
+    assert root.getParentOperators().isEmpty();
+    MapWork mapWork = new MapWork("Map "+ (++sequenceNumber));
+    LOG.debug("Adding map work (" + mapWork.getName() + ") for " + root);
+
+    // map work starts with table scan operators
+    assert root instanceof TableScanOperator;
+    String alias = ((TableScanOperator)root).getConf().getAlias();
+
+    GenMapRedUtils.setMapWork(mapWork, context.parseContext,
+        context.inputs, null, root, alias, context.conf, false);
+    tezWork.add(mapWork);
+
+    return mapWork;
+  }
+
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWorkWalker.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWorkWalker.java
new file mode 100644
index 0000000..08fd61e
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWorkWalker.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+
+/**
+ * Walks the operator tree in DFS fashion.
+ */
+public class GenTezWorkWalker extends DefaultGraphWalker {
+
+  private final GenTezProcContext ctx;
+
+  /**
+   * constructor of the walker - the dispatcher is passed.
+   *
+   * @param disp the dispatcher to be called for each node visited
+   * @param ctx the context where we'll set the current root operator
+   *
+   */
+  public GenTezWorkWalker(Dispatcher disp, GenTezProcContext ctx) {
+    super(disp);
+    this.ctx = ctx;
+  }
+
+  private void setRoot(Node nd) {
+    ctx.currentRootOperator = (Operator<? extends OperatorDesc>) nd;
+    ctx.preceedingWork = null;
+    ctx.parentOfRoot = null;
+  }
+
+  /**
+   * starting point for walking.
+   *
+   * @throws SemanticException
+   */
+  @Override
+  public void startWalking(Collection<Node> startNodes,
+      HashMap<Node, Object> nodeOutput) throws SemanticException {
+    toWalk.addAll(startNodes);
+    while (toWalk.size() > 0) {
+      Node nd = toWalk.remove(0);
+      setRoot(nd);
+      walk(nd);
+      if (nodeOutput != null) {
+        nodeOutput.put(nd, retMap.get(nd));
+      }
+    }
+  }
+
+  /**
+   * Walk the given operator.
+   *
+   * @param nd operator being walked
+   */
+  @Override
+  public void walk(Node nd) throws SemanticException {
+    List<? extends Node> children = nd.getChildren();
+
+    // maintain the stack of operators encountered
+    opStack.push(nd);
+    Boolean skip = dispatchAndReturn(nd, opStack);
+
+    // save some positional state
+    Operator<? extends OperatorDesc> currentRoot = ctx.currentRootOperator;
+    Operator<? extends OperatorDesc> parentOfRoot = ctx.parentOfRoot;
+    BaseWork preceedingWork = ctx.preceedingWork;
+
+    if (skip == null || !skip) {
+      // move all the children to the front of queue
+      for (Node ch : children) {
+
+        // and restore the state before walking each child
+        ctx.currentRootOperator = currentRoot;
+        ctx.parentOfRoot = parentOfRoot;
+        ctx.preceedingWork = preceedingWork;
+
+        walk(ch);
+      }
+    }
+
+    // done with this operator
+    opStack.pop();
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index bda9e92..08ce7d9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -34,12 +34,10 @@
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.exec.ForwardOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
@@ -113,12 +111,8 @@ protected void generateTaskTree(List<Task<? extends Serializable>> rootTasks, Pa
     ParseContext tempParseContext = getParseContext(pCtx, rootTasks);
     GenTezWork genTezWork = new GenTezWork();
 
-    // Sequence of TableScan operators to be walked
-    Deque<Operator<?>> deque = new LinkedList<Operator<?>>();
-    deque.addAll(pCtx.getTopOps().values());
-
     GenTezProcContext procCtx = new GenTezProcContext(
-        conf, tempParseContext, mvTask, rootTasks, inputs, outputs, deque);
+        conf, tempParseContext, mvTask, rootTasks, inputs, outputs);
 
     // create a walker which walks the tree in a DFS manner while maintaining
     // the operator stack.
@@ -147,46 +141,12 @@ public Object process(Node n, Stack<Node> s,
         }
       });
 
-    opRules.put(new RuleRegExp("Setup table scan",
-        TableScanOperator.getOperatorName() + "%"), new NodeProcessor()
-    {
-      @Override
-      public Object process(Node n, Stack<Node> s,
-          NodeProcessorCtx procCtx, Object... os) throws SemanticException {
-        GenTezProcContext context = (GenTezProcContext) procCtx;
-        TableScanOperator tableScan = (TableScanOperator) n;
-        LOG.debug("TableScan operator ("+tableScan
-            +"). Number of branches: "+tableScan.getNumChild());
-        context.lastRootOfMultiChildOperator.push(tableScan);
-        context.currentBranchCount.push(tableScan.getNumChild());
-        context.lastWorkForMultiChildOperator.push(null);
-        return null;
-      }
-    });
-
-    opRules.put(new RuleRegExp("Handle Forward opertor",
-        ForwardOperator.getOperatorName() + "%"), new NodeProcessor()
-    {
-      @Override
-      public Object process(Node n, Stack<Node> s,
-          NodeProcessorCtx procCtx, Object... os) throws SemanticException {
-        GenTezProcContext context = (GenTezProcContext) procCtx;
-        ForwardOperator forward = (ForwardOperator) n;
-        LOG.debug("Forward operator ("+forward+
-            "). Number of branches: "+forward.getNumChild());
-        context.lastRootOfMultiChildOperator.push(context.currentRootOperator);
-        context.currentBranchCount.push(forward.getNumChild());
-        context.lastWorkForMultiChildOperator.push(null);
-        return null;
-      }
-    });
-
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
     Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
     List<Node> topNodes = new ArrayList<Node>();
     topNodes.addAll(pCtx.getTopOps().values());
-    GraphWalker ogw = new TezWalker(disp);
+    GraphWalker ogw = new GenTezWorkWalker(disp, procCtx);
     ogw.startWalking(topNodes, null);
   }
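
Summary of the approach, with an illustrative sketch. The patch drops the three bookkeeping stacks (lastRootOfMultiChildOperator, currentBranchCount, lastWorkForMultiChildOperator) and the TableScan/Forward setup rules, and replaces them with two simpler mechanisms: (1) GenTezWorkWalker saves the positional state (currentRootOperator, parentOfRoot, preceedingWork) after dispatching a node and restores it before descending into each child, so every branch of a multi-insert is dispatched against the same root; and (2) rootToWorkMap memoizes the BaseWork generated for each root, so a root reached again through a second branch reuses its vertex instead of creating a new one. The self-contained Java sketch below shows both mechanisms on a toy operator tree; all names in it (WalkerSketch, Op, dispatch, ...) are illustrative stand-ins, not Hive classes or APIs.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class WalkerSketch {

  // stand-in for an operator tree node
  static final class Op {
    final String name;
    final List<Op> children = new ArrayList<Op>();
    Op(String name) { this.name = name; }
    Op add(Op c) { children.add(c); return c; }
  }

  // traversal state, mirroring currentRootOperator / preceedingWork
  // in GenTezProcContext
  private Op currentRoot;
  private String preceedingWork;

  // mirrors rootToWorkMap: root operator -> the "vertex" it generated
  private final Map<Op, String> rootToWork = new HashMap<Op, String>();
  private int sequence = 0;

  // the save-before / restore-per-child pattern of GenTezWorkWalker.walk()
  private void walk(Op op) {
    dispatch(op);

    Op savedRoot = currentRoot;
    String savedWork = preceedingWork;

    for (Op child : op.children) {
      // restore the positional state before each child, so the second
      // branch of a multi-insert sees the same root/work as the first
      currentRoot = savedRoot;
      preceedingWork = savedWork;
      walk(child);
    }
  }

  // rough stand-in for GenTezWork.process()
  private void dispatch(Op op) {
    String work = rootToWork.get(currentRoot);
    if (work == null) {
      // first visit of this root: create a new "vertex"
      work = (preceedingWork == null ? "Map " : "Reducer ") + (++sequence);
      rootToWork.put(currentRoot, work);
      System.out.println("created " + work + " for root " + currentRoot.name);
    } else {
      // revisited root (branch in the graph): reuse the existing vertex
      System.out.println("reusing " + work + " at " + op.name);
    }
    preceedingWork = work;

    // a reduce sink is a vertex boundary: its child becomes the next root
    if (op.name.startsWith("RS") && !op.children.isEmpty()) {
      currentRoot = op.children.get(0);
    }
  }

  public static void main(String[] args) {
    // multi-insert-like shape: one table scan, one branch through a
    // reduce sink into a group-by, one branch straight to a second sink
    Op ts = new Op("TS");
    Op rs = ts.add(new Op("RS"));
    Op gby = rs.add(new Op("GBY"));
    gby.add(new Op("FS_1"));
    ts.add(new Op("FS_2"));

    WalkerSketch w = new WalkerSketch();
    w.currentRoot = ts; // startWalking() seeds this via setRoot(nd)
    w.walk(ts);
  }
}

Running the sketch prints "created Map 1" once, "created Reducer 2" once for the branch that crosses the reduce sink, and "reusing Map 1" when the walk reaches FS_2. Without the restore in walk(), FS_2 would be attributed to the reducer rather than to Map 1, which is exactly the mis-attribution the restored state (and, in the real patch, GenTezWorkWalker plus rootToWorkMap) guards against.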