diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java b/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java
index cf9131d..5f13370 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hive.ql.lib;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.IdentityHashMap;
@@ -37,8 +36,10 @@ public class DefaultGraphWalker implements GraphWalker {
 
   protected Stack opStack;
-  protected final List toWalk = new ArrayList();
+  protected final Stack toWalk = new Stack();
   protected final IdentityHashMap retMap = new IdentityHashMap();
+  protected final HashMap nodeToWalkStatus = new HashMap();
+  public enum WalkStatus {DISCOVERED, DISPATCHED};
   protected final Dispatcher dispatcher;
 
   /**
@@ -104,40 +105,61 @@ public void dispatch(Node nd, Stack ndStack) throws SemanticException {
    */
   public void startWalking(Collection startNodes,
       HashMap nodeOutput) throws SemanticException {
-    toWalk.addAll(startNodes);
-    while (toWalk.size() > 0) {
-      Node nd = toWalk.remove(0);
-      walk(nd);
-      if (nodeOutput != null) {
-        nodeOutput.put(nd, retMap.get(nd));
+    nodeToWalkStatus.clear();
+    for (Node currStartNode : startNodes) {
+      // The current node is already done, continue the loop.
+      if (nodeToWalkStatus.containsKey(currStartNode)) {
+        continue;
+      }
+      // Push the current node to be walked.
+      toWalk.push(currStartNode);
+      // Do a postorder DFS with the currStartNode as the origin.
+      while (!toWalk.empty()) {
+        Node currWalkNode = toWalk.peek();
+        walk(currWalkNode);
+        // If we have dispatched the current node, add the return value to the output.
+        if (nodeOutput != null && nodeToWalkStatus.get(currWalkNode) == WalkStatus.DISPATCHED) {
+          nodeOutput.put(currWalkNode, retMap.get(currWalkNode));
+        }
       }
     }
+    nodeToWalkStatus.clear();
   }
 
   /**
-   * walk the current operator and its descendants.
+   * walk the current operator and its descendants in a postorder fashion.
    *
    * @param nd
    *          current operator in the graph
    * @throws SemanticException
    */
   public void walk(Node nd) throws SemanticException {
+    // The current node is part of the operator stack.
     if (opStack.empty() || nd != opStack.peek()) {
       opStack.push(nd);
     }
-
-    if ((nd.getChildren() == null)
-        || getDispatchedList().containsAll(nd.getChildren())) {
-      // all children are done or no need to walk the children
-      if (!getDispatchedList().contains(nd)) {
-        dispatch(nd, opStack);
-      }
+    // We can dispatch the current node if :
+    // 1. The current node does not have any children OR
+    // 2. The node has been previously visited, i.e. children would have been dispatched since
+    //    we traverse in post-order fashion.
+    if (nd.getChildren() == null ||
+        (nodeToWalkStatus.containsKey(nd) &&
+        nodeToWalkStatus.get(nd) == WalkStatus.DISCOVERED)) {
+      dispatch(nd, opStack);
       opStack.pop();
+      toWalk.pop();
+      // We are done with the node and the subgraph originating from this node.
+      // Mark the node as dispatched.
+      nodeToWalkStatus.put(nd, WalkStatus.DISPATCHED);
       return;
     }
-    // add children, self to the front of the queue in that order
-    getToWalk().add(0, nd);
-    getToWalk().removeAll(nd.getChildren());
-    getToWalk().addAll(0, nd.getChildren());
+    // If we have not visited the child, add it to the toWalk stack.
+    for (Node child : nd.getChildren()) {
+      if (!nodeToWalkStatus.containsKey(child)) {
+        toWalk.push(child);
+      }
+    }
+    // We have discovered the node for the first time. Mark the node as discovered.
+    nodeToWalkStatus.put(nd, WalkStatus.DISCOVERED);
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lib/ForwardWalker.java b/ql/src/java/org/apache/hadoop/hive/ql/lib/ForwardWalker.java
index a2db3b5..8bc9860 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lib/ForwardWalker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lib/ForwardWalker.java
@@ -18,6 +18,10 @@
 package org.apache.hadoop.hive.ql.lib;
 
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Set;
+
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -43,8 +47,9 @@ protected boolean allParentsDispatched(Node nd) {
     if (op.getParentOperators() == null) {
       return true;
     }
+    Set dispatchedList = getDispatchedList();
     for (Node pNode : op.getParentOperators()) {
-      if (!getDispatchedList().contains(pNode)) {
+      if (!dispatchedList.contains(pNode)) {
         return false;
       }
     }
@@ -52,14 +57,59 @@ protected boolean allParentsDispatched(Node nd) {
   }
 
   @SuppressWarnings("unchecked")
-  protected void addAllParents(Node nd) {
+  protected void addAllUndispatchedParents(Node nd) {
     Operator op = (Operator) nd;
-    getToWalk().removeAll(op.getParentOperators());
-    getToWalk().addAll(0, op.getParentOperators());
+    Set dispatchedList = getDispatchedList();
+    for (Node pNode : op.getParentOperators()) {
+      if (!dispatchedList.contains(pNode)) {
+        toWalk.push(pNode);
+      }
+    }
+  }
+
+  protected void addAllUndispatchedChildren(Node nd) {
+    if (nd.getChildren() == null) {
+      return;
+    }
+    Set dispatchedList = getDispatchedList();
+    for (Node cNode : nd.getChildren()) {
+      if (!dispatchedList.contains(cNode)) {
+        toWalk.push(cNode);
+      }
+    }
   }
 
   /**
-* walk the current operator and its descendants.
+   * starting point for walking.
+   *
+   * @throws SemanticException
+   */
+  @Override
+  public void startWalking(Collection startNodes,
+      HashMap nodeOutput) throws SemanticException {
+    for (Node currStartNode : startNodes) {
+      // The current node is already done, continue the loop.
+      if (getDispatchedList().contains(currStartNode)) {
+        continue;
+      }
+      // Push the current node to be walked.
+      toWalk.push(currStartNode);
+      // Do a walk with the currStartNode as the origin.
+      while (!toWalk.empty()) {
+        Node currWalkNode = toWalk.peek();
+        walk(currWalkNode);
+        // If we have dispatched the current node, add the return value to the output.
+        if (nodeOutput != null && getDispatchedList().contains(currWalkNode)) {
+          nodeOutput.put(currWalkNode, retMap.get(currWalkNode));
+        }
+      }
+    }
+  }
+
+/**
+* walk the current operator and its descendants in the following fashion:
+* First the undispatched parent nodes, followed by current operator(if undispatched),
+* undispatched children.
 *
 * @param nd
 * current operator in the graph
@@ -70,17 +120,21 @@ public void walk(Node nd) throws SemanticException {
     if (opStack.empty() || nd != opStack.peek()) {
       opStack.push(nd);
     }
+    // If the parent nodes have all been dispatched, we can dispatch nd.
     if (allParentsDispatched(nd)) {
-      // all children are done or no need to walk the children
+      // If the current node is not dispatched yet, dispatch the current node and
+      // add the children to the toWalk stack.
       if (!getDispatchedList().contains(nd)) {
-        getToWalk().addAll(nd.getChildren());
         dispatch(nd, opStack);
+        addAllUndispatchedChildren(nd);
       }
+      // We are done with the current node.
+      toWalk.remove(nd);
       opStack.pop();
       return;
     }
-    // add children, self to the front of the queue in that order
-    getToWalk().add(0, nd);
-    addAllParents(nd);
+    // All the parents have not been dispatched yet, hence add the undispatched parents
+    // to the toWalk stack.
+    addAllUndispatchedParents(nd);
   }
 }
\ No newline at end of file
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lib/PreOrderWalker.java b/ql/src/java/org/apache/hadoop/hive/ql/lib/PreOrderWalker.java
index f22694b..0c749f4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lib/PreOrderWalker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lib/PreOrderWalker.java
@@ -18,6 +18,9 @@
 package org.apache.hadoop.hive.ql.lib;
 
+import java.util.Collection;
+import java.util.HashMap;
+
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -44,6 +47,27 @@ public PreOrderWalker(Dispatcher disp) {
   }
 
   /**
+   * starting point for walking.
+   *
+   * @throws SemanticException
+   */
+  @Override
+  public void startWalking(Collection startNodes,
+      HashMap nodeOutput) throws SemanticException {
+    for (Node currStartNode : startNodes) {
+      // The current node is already done, continue the loop.
+      if (nodeToWalkStatus.containsKey(currStartNode)) {
+        continue;
+      }
+      walk(currStartNode);
+      // If we have dispatched the current node, add the return value to the output.
+      if (nodeOutput != null && getDispatchedList().contains(currStartNode)) {
+        nodeOutput.put(currStartNode, retMap.get(currStartNode));
+      }
+    }
+  }
+
+  /**
    * Walk the current operator and its descendants.
    *
    * @param nd
@@ -54,15 +78,18 @@ public PreOrderWalker(Dispatcher disp) {
   public void walk(Node nd) throws SemanticException {
     opStack.push(nd);
     dispatch(nd, opStack);
+    nodeToWalkStatus.put(nd, WalkStatus.DISPATCHED);
 
-    // move all the children to the front of queue
+    // move all the children to the front of queue
     if (nd.getChildren() != null) {
       for (Node n : nd.getChildren()) {
-        walk(n);
+        if (!nodeToWalkStatus.containsKey(n)) {
+          walk(n);
+        }
       }
     } else if (nd instanceof ConditionalTask) {
       for (Task n : ((ConditionalTask) nd).getListTasks()) {
-        if (n.getParentTasks() == null || n.getParentTasks().isEmpty()) {
+        if ((n.getParentTasks() == null || n.getParentTasks().isEmpty()) && !nodeToWalkStatus.containsKey(n)) {
           walk(n);
         }
       }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java
index b8f5c71..adad51b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java
@@ -158,23 +158,27 @@ public void walk(Node nd) throws SemanticException {
           if ((child instanceof FileSinkOperator)
               || (child instanceof ScriptOperator)) {
             walkChildren = false;
+            break;
           }
         }
       }
 
-      if ((nd.getChildren() == null)
-          || getDispatchedList().containsAll(nd.getChildren()) || !walkChildren) {
+      if ((nd.getChildren() == null
+          || (nodeToWalkStatus.containsKey(nd) && nodeToWalkStatus.get(nd) == WalkStatus.DISCOVERED)) ||
+          !walkChildren) {
         // all children are done or no need to walk the children
         dispatch(nd, opStack);
         opStack.pop();
+        toWalk.pop();
+        nodeToWalkStatus.put(nd, WalkStatus.DISPATCHED);
         return;
       }
-      // move all the children to the front of queue
-      getToWalk().removeAll(nd.getChildren());
-      getToWalk().addAll(0, nd.getChildren());
-      // add self to the end of the queue
-      getToWalk().add(nd);
-      opStack.pop();
+      for (Node child : nd.getChildren()) {
+        if (!nodeToWalkStatus.containsKey(child)) {
+          toWalk.push(child);
+        }
+      }
+      nodeToWalkStatus.put(nd, WalkStatus.DISCOVERED);
     }
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagate.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagate.java
index dd53ced..181eb33 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagate.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagate.java
@@ -38,6 +38,7 @@
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.ForwardWalker;
 import org.apache.hadoop.hive.ql.lib.GraphWalker;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
@@ -133,37 +134,11 @@ public ParseContext transform(ParseContext pactx) throws SemanticException {
   /**
    * Walks the op tree in root first order.
    */
-  public static class ConstantPropagateWalker extends DefaultGraphWalker {
+  public static class ConstantPropagateWalker extends ForwardWalker {
 
     public ConstantPropagateWalker(Dispatcher disp) {
       super(disp);
     }
-
-    @Override
-    public void walk(Node nd) throws SemanticException {
-
-      List parents = ((Operator) nd).getParentOperators();
-      if ((parents == null)
-          || getDispatchedList().containsAll(parents)) {
-        opStack.push(nd);
-
-        // all children are done or no need to walk the children
-        dispatch(nd, opStack);
-        opStack.pop();
-      } else {
-        getToWalk().removeAll(parents);
-        getToWalk().add(0, nd);
-        getToWalk().addAll(0, parents);
-        return;
-      }
-
-      // move all the children to the front of queue
-      List children = nd.getChildren();
-      if (children != null) {
-        getToWalk().removeAll(children);
-        getToWalk().addAll(children);
-      }
-    }
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenMapRedWalker.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenMapRedWalker.java
index 9583a1b..8bcc279 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenMapRedWalker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenMapRedWalker.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hive.ql.parse;
 
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
@@ -40,6 +42,23 @@ public GenMapRedWalker(Dispatcher disp) {
   }
 
   /**
+   * starting point for walking.
+   *
+   * @throws SemanticException
+   */
+  @Override
+  public void startWalking(Collection startNodes,
+      HashMap nodeOutput) throws SemanticException {
+    for (Node currStartNode : startNodes) {
+      walk(currStartNode);
+      // If we have dispatched the current node, add the return value to the output.
+      if (nodeOutput != null && getDispatchedList().contains(currStartNode)) {
+        nodeOutput.put(currStartNode, retMap.get(currStartNode));
+      }
+    }
+  }
+
+  /**
    * Walk the given operator.
    *
    * @param nd
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWorkWalker.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWorkWalker.java
index 2d8c8b2..37943c6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWorkWalker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWorkWalker.java
@@ -65,9 +65,7 @@ private void setRoot(Node nd) {
   @Override
   public void startWalking(Collection startNodes,
       HashMap nodeOutput) throws SemanticException {
-    toWalk.addAll(startNodes);
-    while (toWalk.size() > 0) {
-      Node nd = toWalk.remove(0);
+    for (Node nd : startNodes) {
       setRoot(nd);
       walk(nd);
       if (nodeOutput != null) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezWalker.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezWalker.java
index 2f63c1a..b08fd43 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezWalker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezWalker.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hive.ql.parse;
 
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
@@ -40,6 +42,23 @@ public TezWalker(Dispatcher disp) {
   }
 
   /**
+   * starting point for walking.
+   *
+   * @throws SemanticException
+   */
+  @Override
+  public void startWalking(Collection startNodes,
+      HashMap nodeOutput) throws SemanticException {
+    for (Node currStartNode : startNodes) {
+      walk(currStartNode);
+      // If we have dispatched the current node, add the return value to the output.
+      if (nodeOutput != null && getDispatchedList().contains(currStartNode)) {
+        nodeOutput.put(currStartNode, retMap.get(currStartNode));
+      }
+    }
+  }
+
+  /**
    * Walk the given operator.
    *
    * @param nd
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWorkWalker.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWorkWalker.java
index e31c025..3f90c35 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWorkWalker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWorkWalker.java
@@ -65,9 +65,7 @@ private void setRoot(Node nd) {
   @Override
   public void startWalking(Collection startNodes,
       HashMap nodeOutput) throws SemanticException {
-    toWalk.addAll(startNodes);
-    while (toWalk.size() > 0) {
-      Node nd = toWalk.remove(0);
+    for (Node nd : startNodes) {
       setRoot(nd);
       walk(nd);
       if (nodeOutput != null) {