diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java b/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java
index 583c113..7d3335a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java
@@ -53,16 +53,9 @@ public DefaultGraphWalker(Dispatcher disp) {
   }
 
   /**
-   * @return the toWalk
-   */
-  public List<Node> getToWalk() {
-    return toWalk;
-  }
-
-  /**
    * @return the doneList
    */
-  public Set<Node> getDispatchedList() {
+  protected Set<Node> getDispatchedList() {
     return retMap.keySet();
   }
 
@@ -121,7 +114,7 @@ public void startWalking(Collection<Node> startNodes,
    * current operator in the graph
    * @throws SemanticException
    */
-  public void walk(Node nd) throws SemanticException {
+  protected void walk(Node nd) throws SemanticException {
     if (opStack.empty() || nd != opStack.peek()) {
       opStack.push(nd);
     }
@@ -136,8 +129,8 @@ public void walk(Node nd) throws SemanticException {
       return;
     }
     // add children, self to the front of the queue in that order
-    getToWalk().add(0, nd);
-    getToWalk().removeAll(nd.getChildren());
-    getToWalk().addAll(0, nd.getChildren());
+    toWalk.add(0, nd);
+    toWalk.removeAll(nd.getChildren());
+    toWalk.addAll(0, nd.getChildren());
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lib/ForwardWalker.java b/ql/src/java/org/apache/hadoop/hive/ql/lib/ForwardWalker.java
index a2db3b5..0c69103 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lib/ForwardWalker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lib/ForwardWalker.java
@@ -54,8 +54,8 @@ protected boolean allParentsDispatched(Node nd) {
   @SuppressWarnings("unchecked")
   protected void addAllParents(Node nd) {
     Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) nd;
-    getToWalk().removeAll(op.getParentOperators());
-    getToWalk().addAll(0, op.getParentOperators());
+    toWalk.removeAll(op.getParentOperators());
+    toWalk.addAll(0, op.getParentOperators());
   }
 
   /**
@@ -66,21 +66,21 @@ protected void addAllParents(Node nd) {
    * @throws SemanticException
    */
   @Override
-  public void walk(Node nd) throws SemanticException {
+  protected void walk(Node nd) throws SemanticException {
     if (opStack.empty() || nd != opStack.peek()) {
       opStack.push(nd);
     }
     if (allParentsDispatched(nd)) {
       // all children are done or no need to walk the children
       if (!getDispatchedList().contains(nd)) {
-        getToWalk().addAll(nd.getChildren());
+        toWalk.addAll(nd.getChildren());
         dispatch(nd, opStack);
       }
       opStack.pop();
       return;
     }
     // add children, self to the front of the queue in that order
-    getToWalk().add(0, nd);
+    toWalk.add(0, nd);
     addAllParents(nd);
   }
 }
\ No newline at end of file
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lib/LevelOrderWalker.java b/ql/src/java/org/apache/hadoop/hive/ql/lib/LevelOrderWalker.java
new file mode 100644
index 0000000..cf05d5f
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lib/LevelOrderWalker.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.lib;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Stack;
+
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+
+/**
+ * This is a level-wise walker implementation: a node is dispatched only after
+ * all of its parents have been dispatched.
+ *
+ * Each node is visited once but may be dispatched multiple times.
+ * e.g., for a lineage generator over an operator tree, 2 levels of the current
+ * node's ancestors need to be kept in the operator stack.
+ *
+ *          FIL(2)  FIL(4)
+ *            |       |
+ *          RS(3)   RS(5)
+ *              \   /
+ *             JOIN(7)
+ *
+ * The join lineage needs to be computed twice for the JOIN(7) node, once for
+ * each chain of operator ancestors.
+ */
+public class LevelOrderWalker extends DefaultGraphWalker {
+  // Only specified nodes of these types will be walked.
+  // Empty set means all the nodes will be walked.
+  private HashSet<Class<? extends Node>> nodeTypes = new HashSet<Class<? extends Node>>();
+
+  // How many levels of ancestors to keep in the stack during dispatching
+  private final int numLevels;
+
+  /**
+   * Constructor that keeps all the ancestors in the operator stack during dispatching.
+   * @param disp Dispatcher to call for each op encountered
+   */
+  public LevelOrderWalker(Dispatcher disp) {
+    super(disp);
+    this.numLevels = Integer.MAX_VALUE;
+  }
+
+  /**
+   * Constructor with a specified number of ancestor levels to keep in the operator
+   * stack during dispatching.
+   * @param disp      Dispatcher to call for each op encountered
+   * @param numLevels Number of ancestor levels
+   */
+  public LevelOrderWalker(Dispatcher disp, int numLevels) {
+    super(disp);
+    this.numLevels = numLevels;
+  }
+
+  @SuppressWarnings("unchecked")
+  public void setNodeTypes(Class<? extends Node> ...nodeTypes) {
+    this.nodeTypes.addAll(Arrays.asList(nodeTypes));
+  }
+
+  /**
+   * Starting point for walking.
+   *
+   * @throws SemanticException
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public void startWalking(Collection<Node> startNodes,
+      HashMap<Node, Object> nodeOutput) throws SemanticException {
+    toWalk.addAll(startNodes);
+
+    // Starting from the startNodes, add the children whose parents have been
+    // included in the list.
+    HashSet<Node> addedNodes = new HashSet<Node>();
+    for (Node node : startNodes) {
+      addedNodes.add(node);
+    }
+    int index = 0;
+    while (index < toWalk.size()) {
+      if (toWalk.get(index).getChildren() != null) {
+        for (Node child : toWalk.get(index).getChildren()) {
+          Operator<? extends OperatorDesc> childOP =
+              (Operator<? extends OperatorDesc>) child;
+
+          if (!addedNodes.contains(child) &&
+              (childOP.getParentOperators() == null ||
+              addedNodes.containsAll(childOP.getParentOperators()))) {
+            toWalk.add(child);
+            addedNodes.add(child);
+          }
+        }
+      }
+      ++index;
+    }
+
+    for (Node nd : toWalk) {
+      if (!nodeTypes.isEmpty() && !nodeTypes.contains(nd.getClass())) {
+        continue;
+      }
+
+      opStack.clear();
+      opStack.push(nd);
+      walk(nd, 0, opStack);
+      if (nodeOutput != null && getDispatchedList().contains(nd)) {
+        nodeOutput.put(nd, retMap.get(nd));
+      }
+    }
+  }
+
+  /**
+   * Enumerate numLevels of ancestors by putting them in the stack and dispatch
+   * the current node.
+   * @param nd    current operator in the ancestor tree
+   * @param level how many levels of ancestors are included in the stack
+   * @param stack operator stack
+   * @throws SemanticException
+   */
+  @SuppressWarnings("unchecked")
+  private void walk(Node nd, int level, Stack<Node> stack) throws SemanticException {
+    List<Operator<? extends OperatorDesc>> parents =
+        ((Operator<? extends OperatorDesc>) nd).getParentOperators();
+
+    if (level >= numLevels || parents == null || parents.isEmpty()) {
+      dispatch(stack.peek(), stack);
+      return;
+    }
+
+    for (Node parent : parents) {
+      stack.add(0, parent);
+      walk(parent, level + 1, stack);
+      stack.remove(0);
+    }
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lib/PreOrderWalker.java b/ql/src/java/org/apache/hadoop/hive/ql/lib/PreOrderWalker.java
index f22694b..8d8dab8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lib/PreOrderWalker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lib/PreOrderWalker.java
@@ -51,7 +51,7 @@ public PreOrderWalker(Dispatcher disp) {
    * @throws SemanticException
    */
   @Override
-  public void walk(Node nd) throws SemanticException {
+  protected void walk(Node nd) throws SemanticException {
     opStack.push(nd);
     dispatch(nd, opStack);
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java
index 9a45458..561b8fc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java
@@ -150,7 +150,7 @@ public ColumnPrunerWalker(Dispatcher disp) {
    * Walk the given operator.
   */
  @Override
-  public void walk(Node nd) throws SemanticException {
+  protected void walk(Node nd) throws SemanticException {
    boolean walkChildren = true;
    opStack.push(nd);
 
@@ -174,10 +174,10 @@ public void walk(Node nd) throws SemanticException {
      return;
    }
    // move all the children to the front of queue
-    getToWalk().removeAll(nd.getChildren());
-    getToWalk().addAll(0, nd.getChildren());
+    toWalk.removeAll(nd.getChildren());
+    toWalk.addAll(0, nd.getChildren());
    // add self to the end of the queue
-    getToWalk().add(nd);
+    toWalk.add(nd);
    opStack.pop();
  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagate.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagate.java
index dd53ced..aacded6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagate.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagate.java
@@ -140,7 +140,7 @@ public ConstantPropagateWalker(Dispatcher disp) {
  }
 
  @Override
-  public void walk(Node nd) throws SemanticException {
+  protected void walk(Node nd) throws SemanticException {
    List<Node> parents = ((Operator) nd).getParentOperators();
 
    if ((parents == null)
@@ -151,17 +151,17 @@ public void walk(Node nd) throws SemanticException {
      dispatch(nd, opStack);
      opStack.pop();
    } else {
-      getToWalk().removeAll(parents);
-      getToWalk().add(0, nd);
-      getToWalk().addAll(0, parents);
+      toWalk.removeAll(parents);
+      toWalk.add(0, nd);
+      toWalk.addAll(0, parents);
      return;
    }
 
    // move all the children to the front of queue
    List<? extends Node> children = nd.getChildren();
    if (children != null) {
-      getToWalk().removeAll(children);
-      getToWalk().addAll(children);
+      toWalk.removeAll(children);
+      toWalk.addAll(children);
    }
  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java
index 9a5cf55..82e26d9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java
@@ -37,9 +37,9 @@
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
 import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.LevelOrderWalker;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
-import org.apache.hadoop.hive.ql.lib.PreOrderWalker;
 import org.apache.hadoop.hive.ql.lib.Rule;
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
 import org.apache.hadoop.hive.ql.optimizer.Transform;
@@ -94,7 +94,7 @@ public ParseContext transform(ParseContext pctx) throws SemanticException {
    // The dispatcher fires the processor corresponding to the closest matching rule and passes the context along
    Dispatcher disp = new DefaultRuleDispatcher(OpProcFactory.getDefaultProc(),
        opRules, lCtx);
-    GraphWalker ogw = new PreOrderWalker(disp);
+    GraphWalker ogw = new LevelOrderWalker(disp, 2);
 
    // Create a list of topop nodes
    ArrayList<Node> topNodes = new ArrayList<Node>();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/AnnotateWithOpTraits.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/AnnotateWithOpTraits.java
index c304e97..0398115 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/AnnotateWithOpTraits.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/AnnotateWithOpTraits.java
@@ -35,9 +35,9 @@
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
 import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.LevelOrderWalker;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
-import org.apache.hadoop.hive.ql.lib.PreOrderWalker;
 import org.apache.hadoop.hive.ql.lib.Rule;
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
 import org.apache.hadoop.hive.ql.optimizer.Transform;
@@ -55,7 +55,7 @@ public ParseContext transform(ParseContext pctx) throws SemanticException {
 
    AnnotateOpTraitsProcCtx annotateCtx = new AnnotateOpTraitsProcCtx(pctx);
 
-    // create a walker which walks the tree in a DFS manner while maintaining the
+    // create a walker which walks the tree in a BFS manner while maintaining the
    // operator stack. The dispatcher generates the plan from the operator tree
    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
    opRules.put(new RuleRegExp("TS", TableScanOperator.getOperatorName() + "%"),
@@ -83,7 +83,7 @@ public ParseContext transform(ParseContext pctx) throws SemanticException {
    // rule and passes the context along
    Dispatcher disp = new DefaultRuleDispatcher(OpTraitsRulesProcFactory.getDefaultRule(),
        opRules, annotateCtx);
-    GraphWalker ogw = new PreOrderWalker(disp);
+    GraphWalker ogw = new LevelOrderWalker(disp, 0);
 
    // Create a list of topop nodes
    ArrayList<Node> topNodes = new ArrayList<Node>();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/AnnotateWithStatistics.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/AnnotateWithStatistics.java
index 4aeeff2..c8b3545 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/AnnotateWithStatistics.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/AnnotateWithStatistics.java
@@ -33,9 +33,9 @@
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
 import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.LevelOrderWalker;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
-import org.apache.hadoop.hive.ql.lib.PreOrderWalker;
 import org.apache.hadoop.hive.ql.lib.Rule;
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
 import org.apache.hadoop.hive.ql.optimizer.Transform;
@@ -48,7 +48,7 @@ public ParseContext transform(ParseContext pctx) throws SemanticException {
 
    AnnotateStatsProcCtx aspCtx = new AnnotateStatsProcCtx(pctx);
 
-    // create a walker which walks the tree in a DFS manner while maintaining the
+    // create a walker which walks the tree in a BFS manner while maintaining the
    // operator stack. The dispatcher generates the plan from the operator tree
    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
    opRules.put(new RuleRegExp("TS", TableScanOperator.getOperatorName() + "%"),
@@ -70,7 +70,7 @@ public ParseContext transform(ParseContext pctx) throws SemanticException {
    // rule and passes the context along
    Dispatcher disp = new DefaultRuleDispatcher(StatsRulesProcFactory.getDefaultRule(),
        opRules, aspCtx);
-    GraphWalker ogw = new PreOrderWalker(disp);
+    GraphWalker ogw = new LevelOrderWalker(disp, 0);
 
    // Create a list of topop nodes
    ArrayList<Node> topNodes = new ArrayList<Node>();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcessor.java
index 9937343..b1286e0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcessor.java
@@ -31,9 +31,9 @@
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
 import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.LevelOrderWalker;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
-import org.apache.hadoop.hive.ql.lib.PreOrderWalker;
 import org.apache.hadoop.hive.ql.lib.Rule;
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
 import org.apache.hadoop.hive.ql.optimizer.Transform;
@@ -66,7 +66,7 @@ public UnionProcessor() {
   *          the current parse context
   */
  public ParseContext transform(ParseContext pCtx) throws SemanticException {
-    // create a walker which walks the tree in a DFS manner while maintaining
+    // create a walker which walks the tree in a BFS manner while maintaining
    // the operator stack.
    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
    opRules.put(new RuleRegExp("R1",
@@ -85,7 +85,8 @@ public ParseContext transform(ParseContext pCtx) throws SemanticException {
    uCtx.setParseContext(pCtx);
    Dispatcher disp = new DefaultRuleDispatcher(UnionProcFactory.getNoUnion(),
        opRules, uCtx);
-    GraphWalker ogw = new PreOrderWalker(disp);
+    LevelOrderWalker ogw = new LevelOrderWalker(disp);
+    ogw.setNodeTypes(UnionOperator.class);
 
    // Create a list of topop nodes
    ArrayList<Node> topNodes = new ArrayList<Node>();
@@ -109,7 +110,8 @@ public ParseContext transform(ParseContext pCtx) throws SemanticException {
          UnionProcFactory.getUnionNoProcessFile());
      disp = new DefaultRuleDispatcher(UnionProcFactory.getNoUnion(),
          opRules, uCtx);
-      ogw = new PreOrderWalker(disp);
+      ogw = new LevelOrderWalker(disp);
+      ogw.setNodeTypes(FileSinkOperator.class);
 
      // Create a list of topop nodes
      topNodes.clear();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenMapRedWalker.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenMapRedWalker.java
index 9583a1b..c1056ae 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenMapRedWalker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenMapRedWalker.java
@@ -46,7 +46,7 @@ public GenMapRedWalker(Dispatcher disp) {
   * operator being walked
   */
  @Override
-  public void walk(Node nd) throws SemanticException {
+  protected void walk(Node nd) throws SemanticException {
    List<? extends Node> children = nd.getChildren();
 
    // maintain the stack of operators encountered
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWorkWalker.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWorkWalker.java
index 2d8c8b2..8927579 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWorkWalker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWorkWalker.java
@@ -82,7 +82,7 @@ public void startWalking(Collection<Node> startNodes,
   * @param nd operator being walked
   */
  @Override
-  public void walk(Node nd) throws SemanticException {
+  protected void walk(Node nd) throws SemanticException {
    List<? extends Node> children = nd.getChildren();
 
    // maintain the stack of operators encountered
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezWalker.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezWalker.java
index 2f63c1a..3187497 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezWalker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezWalker.java
@@ -46,7 +46,7 @@ public TezWalker(Dispatcher disp) {
   * operator being walked
   */
  @Override
-  public void walk(Node nd) throws SemanticException {
+  protected void walk(Node nd) throws SemanticException {
    List<? extends Node> children = nd.getChildren();
 
    // maintain the stack of operators encountered
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWorkWalker.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWorkWalker.java
index e31c025..4450079 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWorkWalker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWorkWalker.java
@@ -82,7 +82,7 @@ public void startWalking(Collection<Node> startNodes, HashMap<Node, Object> node
   * @param nd operator being walked
   */
  @Override
-  public void walk(Node nd) throws SemanticException {
+  protected void walk(Node nd) throws SemanticException {
    List<? extends Node> children = nd.getChildren();
 
    // maintain the stack of operators encountered
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicateTransitivePropagate.java b/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicateTransitivePropagate.java
index ea1f713..fb76d5d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicateTransitivePropagate.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicateTransitivePropagate.java
@@ -37,10 +37,10 @@
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
 import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.LevelOrderWalker;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
-import org.apache.hadoop.hive.ql.lib.PreOrderWalker;
 import org.apache.hadoop.hive.ql.lib.Rule;
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
 import org.apache.hadoop.hive.ql.optimizer.Transform;
@@ -74,7 +74,7 @@ public ParseContext transform(ParseContext pctx) throws SemanticException {
    // rule and passes the context along
    TransitiveContext context = new TransitiveContext();
    Dispatcher disp = new DefaultRuleDispatcher(null, opRules, context);
-    GraphWalker ogw = new PreOrderWalker(disp);
+    GraphWalker ogw = new LevelOrderWalker(disp, 2);
 
    // Create a list of topop nodes
    List<Node> topNodes = new ArrayList<Node>();
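
Usage note (reviewer sketch, not part of the patch): the snippet below shows how a transform can wire the new LevelOrderWalker into the usual dispatcher setup, mirroring the Generator and UnionProcessor hunks above. The wrapping class and method names are hypothetical and exist only to make the example self-contained; the empty rule map and the null default processor make the dispatch a no-op, so the example only exercises traversal order. It also assumes ParseContext.getTopOps() as the source of start nodes, which is how these transforms collect their top operators.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
import org.apache.hadoop.hive.ql.lib.LevelOrderWalker;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.Rule;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;

// Hypothetical helper class, for illustration only.
public class LevelOrderWalkerExample {

  public static void walkPlan(ParseContext pctx) throws SemanticException {
    // No rules and a null default processor: the dispatcher does nothing,
    // so this only demonstrates the order in which nodes are dispatched.
    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
    Dispatcher disp = new DefaultRuleDispatcher(null, opRules, null);

    // Keep up to 2 levels of ancestors on the operator stack while dispatching,
    // as the lineage Generator above does; new LevelOrderWalker(disp) keeps all levels.
    LevelOrderWalker ogw = new LevelOrderWalker(disp, 2);

    // Optionally dispatch only nodes of the given operator types, as
    // UnionProcessor above does for UnionOperator and FileSinkOperator.
    ogw.setNodeTypes(UnionOperator.class);

    // Start from the top operators of the plan; a node is dispatched only
    // after all of its parents have been dispatched.
    ArrayList<Node> topNodes = new ArrayList<Node>(pctx.getTopOps().values());
    ogw.startWalking(topNodes, null);
  }
}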