diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java b/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java index cf9131d..3517ecd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hive.ql.lib; -import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.IdentityHashMap; @@ -37,7 +36,7 @@ public class DefaultGraphWalker implements GraphWalker { protected Stack opStack; - protected final List toWalk = new ArrayList(); + protected final OptimizedStack toWalk = new OptimizedStack(); protected final IdentityHashMap retMap = new IdentityHashMap(); protected final Dispatcher dispatcher; @@ -104,40 +103,52 @@ public void dispatch(Node nd, Stack ndStack) throws SemanticException { */ public void startWalking(Collection startNodes, HashMap nodeOutput) throws SemanticException { - toWalk.addAll(startNodes); - while (toWalk.size() > 0) { - Node nd = toWalk.remove(0); - walk(nd); - if (nodeOutput != null) { - nodeOutput.put(nd, retMap.get(nd)); + for (Node currStartNode : startNodes) { + // The current node is already done, continue the loop. + if (getDispatchedList().contains(currStartNode)) { + continue; + } + // Push the current node to be walked. + toWalk.push(currStartNode); + // Do a postorder DFS with the currStartNode as the origin. + while (!toWalk.empty()) { + Node currWalkNode = toWalk.peek(); + walk(currWalkNode); + // If we have dispatched the current node, add the return value to the output. + if (nodeOutput != null) { + nodeOutput.put(currWalkNode, retMap.get(currWalkNode)); + } } } } /** - * walk the current operator and its descendants. + * walk the current operator and its descendants in a postorder fashion. 
* * @param nd * current operator in the graph * @throws SemanticException */ public void walk(Node nd) throws SemanticException { + // The current node is part of the operator stack. if (opStack.empty() || nd != opStack.peek()) { opStack.push(nd); } - - if ((nd.getChildren() == null) - || getDispatchedList().containsAll(nd.getChildren())) { - // all children are done or no need to walk the children + // We can dispatch the current node if: + // 1. The current node does not have any children OR + // 2. Children would have been dispatched since + // we traverse in post-order fashion. if (nd.getChildren() == null || + getDispatchedList().containsAll(nd.getChildren())) { if (!getDispatchedList().contains(nd)) { dispatch(nd, opStack); } opStack.pop(); + toWalk.remove(nd); return; } - // add children, self to the front of the queue in that order - getToWalk().add(0, nd); - getToWalk().removeAll(nd.getChildren()); - getToWalk().addAll(0, nd.getChildren()); + // Push the children onto the toWalk stack so the first child is walked first. 
+ toWalk.removeAll(nd.getChildren()); + toWalk.addAllReverse(nd.getChildren()); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lib/ForwardWalker.java b/ql/src/java/org/apache/hadoop/hive/ql/lib/ForwardWalker.java index a2db3b5..4c607d5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lib/ForwardWalker.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lib/ForwardWalker.java @@ -18,6 +18,10 @@ package org.apache.hadoop.hive.ql.lib; +import java.util.Collection; +import java.util.HashMap; +import java.util.Set; + import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; import org.apache.hadoop.hive.ql.lib.Dispatcher; @@ -43,8 +47,9 @@ protected boolean allParentsDispatched(Node nd) { if (op.getParentOperators() == null) { return true; } + Set dispatchedList = getDispatchedList(); for (Node pNode : op.getParentOperators()) { - if (!getDispatchedList().contains(pNode)) { + if (!dispatchedList.contains(pNode)) { return false; } } @@ -52,14 +57,32 @@ protected boolean allParentsDispatched(Node nd) { } @SuppressWarnings("unchecked") - protected void addAllParents(Node nd) { + protected void addAllUndispatchedParents(Node nd) { Operator op = (Operator) nd; - getToWalk().removeAll(op.getParentOperators()); - getToWalk().addAll(0, op.getParentOperators()); + Set dispatchedList = getDispatchedList(); + for (Node pNode : op.getParentOperators()) { + if (!dispatchedList.contains(pNode)) { + toWalk.push(pNode); + } + } } - /** -* walk the current operator and its descendants. + protected void addAllUndispatchedChildren(Node nd) { + if (nd.getChildren() == null) { + return; + } + Set dispatchedList = getDispatchedList(); + for (Node cNode : nd.getChildren()) { + if (!dispatchedList.contains(cNode)) { + toWalk.push(cNode); + } + } + } + +/** +* walk the current operator and its descendants in the following fashion: +* First the undispatched parent nodes, followed by the current operator (if undispatched), +* undispatched children. 
* * @param nd * current operator in the graph @@ -70,17 +93,21 @@ public void walk(Node nd) throws SemanticException { if (opStack.empty() || nd != opStack.peek()) { opStack.push(nd); } + // If the parent nodes have all been dispatched, we can dispatch nd. if (allParentsDispatched(nd)) { - // all children are done or no need to walk the children + // If the current node is not dispatched yet, dispatch the current node and + // add the children to the toWalk stack. if (!getDispatchedList().contains(nd)) { - getToWalk().addAll(nd.getChildren()); dispatch(nd, opStack); + addAllUndispatchedChildren(nd); } + // We are done with the current node. + toWalk.remove(nd); opStack.pop(); return; } - // add children, self to the front of the queue in that order - getToWalk().add(0, nd); - addAllParents(nd); + // All the parents have not been dispatched yet, hence add the undispatched parents + // to the toWalk stack. + addAllUndispatchedParents(nd); } } \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lib/OptimizedStack.java b/ql/src/java/org/apache/hadoop/hive/ql/lib/OptimizedStack.java new file mode 100644 index 0000000..2bb441b --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/lib/OptimizedStack.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.lib; +import java.util.*; + +import org.apache.commons.lang.ArrayUtils; + +public class OptimizedStack extends Stack { + private static final long serialVersionUID = 1986881027644228873L; + + @Override + public boolean removeAll(Collection c) { + int oldHi=0, newHi=0, top=0; + for (int i=0; i 0) { + System.arraycopy(elementData, newHi, elementData, top, elementCount - newHi); + elementCount += top - newHi; + return true; + } else { + return false; + } + } + + /** + * This implements the unsynchronized semantics of ensureCapacity. + * Synchronized methods in this class can internally call this + * method for ensuring capacity without incurring the cost of an + * extra synchronization. + * + * @see #ensureCapacity(int) + */ + private void ensureCapacityHelper(int minCapacity) { + // overflow-conscious code + if (minCapacity - elementData.length > 0) + grow(minCapacity); + } + + /** + * The maximum size of array to allocate. + * Some VMs reserve some header words in an array. + * Attempts to allocate larger arrays may result in + * OutOfMemoryError: Requested array size exceeds VM limit + */ + private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; + + private void grow(int minCapacity) { + // overflow-conscious code + int oldCapacity = elementData.length; + int newCapacity = oldCapacity + ((capacityIncrement > 0) ? + capacityIncrement : oldCapacity); + if (newCapacity - minCapacity < 0) + newCapacity = minCapacity; + if (newCapacity - MAX_ARRAY_SIZE > 0) + newCapacity = hugeCapacity(minCapacity); + elementData = Arrays.copyOf(elementData, newCapacity); + } + + private static int hugeCapacity(int minCapacity) { + if (minCapacity < 0) // overflow + throw new OutOfMemoryError(); + return (minCapacity > MAX_ARRAY_SIZE) ? 
+ Integer.MAX_VALUE : + MAX_ARRAY_SIZE; + } + + public synchronized boolean addAllReverse(Collection c) { + modCount++; + Object[] a = c.toArray(); + ArrayUtils.reverse(a); + int numNew = a.length; + ensureCapacityHelper(elementCount + numNew); + System.arraycopy(a, 0, elementData, elementCount, numNew); + elementCount += numNew; + return numNew != 0; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lib/PreOrderWalker.java b/ql/src/java/org/apache/hadoop/hive/ql/lib/PreOrderWalker.java index f22694b..fb62302 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lib/PreOrderWalker.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lib/PreOrderWalker.java @@ -18,6 +18,9 @@ package org.apache.hadoop.hive.ql.lib; +import java.util.Collection; +import java.util.HashMap; + import org.apache.hadoop.hive.ql.exec.ConditionalTask; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.parse.SemanticException; @@ -44,6 +47,27 @@ public PreOrderWalker(Dispatcher disp) { } /** + * starting point for walking. + * + * @throws SemanticException + */ + @Override + public void startWalking(Collection startNodes, + HashMap nodeOutput) throws SemanticException { + for (Node currStartNode : startNodes) { + // The current node is already done, continue the loop. + if (getDispatchedList().contains(currStartNode)) { + continue; + } + walk(currStartNode); + // If we have dispatched the current node, add the return value to the output. + if (nodeOutput != null) { + nodeOutput.put(currStartNode, retMap.get(currStartNode)); + } + } + } + + /** * Walk the current operator and its descendants. 
* * @param nd diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java index 9a45458..413d64a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java @@ -162,22 +162,22 @@ public void walk(Node nd) throws SemanticException { if ((child instanceof FileSinkOperator) || (child instanceof ScriptOperator)) { walkChildren = false; + break; } } } - if ((nd.getChildren() == null) - || getDispatchedList().containsAll(nd.getChildren()) || !walkChildren) { + if ((nd.getChildren() == null) || + getDispatchedList().containsAll(nd.getChildren()) || !walkChildren) { // all children are done or no need to walk the children dispatch(nd, opStack); opStack.pop(); + toWalk.remove(nd); return; } - // move all the children to the front of queue - getToWalk().removeAll(nd.getChildren()); - getToWalk().addAll(0, nd.getChildren()); - // add self to the end of the queue - getToWalk().add(nd); + // If we have not visited the child, add it to the toWalk stack. 
+ toWalk.removeAll(nd.getChildren()); + toWalk.addAllReverse(nd.getChildren()); opStack.pop(); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagate.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagate.java index dd53ced..181eb33 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagate.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagate.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.ForwardWalker; import org.apache.hadoop.hive.ql.lib.GraphWalker; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; @@ -133,37 +134,11 @@ public ParseContext transform(ParseContext pactx) throws SemanticException { /** * Walks the op tree in root first order. */ - public static class ConstantPropagateWalker extends DefaultGraphWalker { + public static class ConstantPropagateWalker extends ForwardWalker { public ConstantPropagateWalker(Dispatcher disp) { super(disp); } - - @Override - public void walk(Node nd) throws SemanticException { - - List parents = ((Operator) nd).getParentOperators(); - if ((parents == null) - || getDispatchedList().containsAll(parents)) { - opStack.push(nd); - - // all children are done or no need to walk the children - dispatch(nd, opStack); - opStack.pop(); - } else { - getToWalk().removeAll(parents); - getToWalk().add(0, nd); - getToWalk().addAll(0, parents); - return; - } - - // move all the children to the front of queue - List children = nd.getChildren(); - if (children != null) { - getToWalk().removeAll(children); - getToWalk().addAll(children); - } - } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenMapRedWalker.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenMapRedWalker.java index 9583a1b..58e9c15 100644 --- 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenMapRedWalker.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenMapRedWalker.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.ql.parse; +import java.util.Collection; +import java.util.HashMap; import java.util.List; import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; @@ -40,6 +42,23 @@ public GenMapRedWalker(Dispatcher disp) { } /** + * starting point for walking. + * + * @throws SemanticException + */ + @Override + public void startWalking(Collection startNodes, + HashMap nodeOutput) throws SemanticException { + for (Node currStartNode : startNodes) { + walk(currStartNode); + // If we have dispatched the current node, add the return value to the output. + if (nodeOutput != null) { + nodeOutput.put(currStartNode, retMap.get(currStartNode)); + } + } + } + + /** * Walk the given operator. * * @param nd diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWorkWalker.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWorkWalker.java index 2d8c8b2..37943c6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWorkWalker.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWorkWalker.java @@ -65,9 +65,7 @@ private void setRoot(Node nd) { @Override public void startWalking(Collection startNodes, HashMap nodeOutput) throws SemanticException { - toWalk.addAll(startNodes); - while (toWalk.size() > 0) { - Node nd = toWalk.remove(0); + for (Node nd : startNodes) { setRoot(nd); walk(nd); if (nodeOutput != null) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezWalker.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezWalker.java index 2f63c1a..96ef037 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezWalker.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezWalker.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.ql.parse; +import java.util.Collection; +import java.util.HashMap; import java.util.List; import 
org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; @@ -40,6 +42,23 @@ public TezWalker(Dispatcher disp) { } /** + * starting point for walking. + * + * @throws SemanticException + */ + @Override + public void startWalking(Collection startNodes, + HashMap nodeOutput) throws SemanticException { + for (Node currStartNode : startNodes) { + walk(currStartNode); + // If we have dispatched the current node, add the return value to the output. + if (nodeOutput != null) { + nodeOutput.put(currStartNode, retMap.get(currStartNode)); + } + } + } + + /** * Walk the given operator. * * @param nd diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWorkWalker.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWorkWalker.java index e31c025..3f90c35 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWorkWalker.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWorkWalker.java @@ -65,9 +65,7 @@ private void setRoot(Node nd) { @Override public void startWalking(Collection startNodes, HashMap nodeOutput) throws SemanticException { - toWalk.addAll(startNodes); - while (toWalk.size() > 0) { - Node nd = toWalk.remove(0); + for (Node nd : startNodes) { setRoot(nd); walk(nd); if (nodeOutput != null) {